1# orm/strategies.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10"""sqlalchemy.orm.interfaces.LoaderStrategy
11 implementations, and related MapperOptions."""
12
13from __future__ import annotations
14
15import collections
16import itertools
17from typing import Any
18from typing import Dict
19from typing import Optional
20from typing import Tuple
21from typing import TYPE_CHECKING
22from typing import Union
23
24from . import attributes
25from . import exc as orm_exc
26from . import interfaces
27from . import loading
28from . import path_registry
29from . import properties
30from . import query
31from . import relationships
32from . import unitofwork
33from . import util as orm_util
34from .base import _DEFER_FOR_STATE
35from .base import _RAISE_FOR_STATE
36from .base import _SET_DEFERRED_EXPIRED
37from .base import ATTR_WAS_SET
38from .base import LoaderCallableStatus
39from .base import PASSIVE_OFF
40from .base import PassiveFlag
41from .context import _column_descriptions
42from .context import ORMCompileState
43from .context import ORMSelectCompileState
44from .context import QueryContext
45from .interfaces import LoaderStrategy
46from .interfaces import StrategizedProperty
47from .session import _state_session
48from .state import InstanceState
49from .strategy_options import Load
50from .util import _none_only_set
51from .util import AliasedClass
52from .. import event
53from .. import exc as sa_exc
54from .. import inspect
55from .. import log
56from .. import sql
57from .. import util
58from ..sql import util as sql_util
59from ..sql import visitors
60from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
61from ..sql.selectable import Select
62from ..util.typing import Literal
63
64if TYPE_CHECKING:
65 from .mapper import Mapper
66 from .relationships import RelationshipProperty
67 from ..sql.elements import ColumnElement
68
69
70def _register_attribute(
71 prop,
72 mapper,
73 useobject,
74 compare_function=None,
75 typecallable=None,
76 callable_=None,
77 proxy_property=None,
78 active_history=False,
79 impl_class=None,
80 **kw,
81):
82 listen_hooks = []
83
84 uselist = useobject and prop.uselist
85
86 if useobject and prop.single_parent:
87 listen_hooks.append(single_parent_validator)
88
89 if prop.key in prop.parent.validators:
90 fn, opts = prop.parent.validators[prop.key]
91 listen_hooks.append(
92 lambda desc, prop: orm_util._validator_events(
93 desc, prop.key, fn, **opts
94 )
95 )
96
97 if useobject:
98 listen_hooks.append(unitofwork.track_cascade_events)
99
100 # need to assemble backref listeners
101 # after the singleparentvalidator, mapper validator
102 if useobject:
103 backref = prop.back_populates
104 if backref and prop._effective_sync_backref:
105 listen_hooks.append(
106 lambda desc, prop: attributes.backref_listeners(
107 desc, backref, uselist
108 )
109 )
110
111 # a single MapperProperty is shared down a class inheritance
112 # hierarchy, so we set up attribute instrumentation and backref event
113 # for each mapper down the hierarchy.
114
115 # typically, "mapper" is the same as prop.parent, due to the way
116 # the configure_mappers() process runs, however this is not strongly
117 # enforced, and in the case of a second configure_mappers() run the
118 # mapper here might not be prop.parent; also, a subclass mapper may
119 # be called here before a superclass mapper. That is, can't depend
120 # on mappers not already being set up so we have to check each one.
121
122 for m in mapper.self_and_descendants:
123 if prop is m._props.get(
124 prop.key
125 ) and not m.class_manager._attr_has_impl(prop.key):
126 desc = attributes.register_attribute_impl(
127 m.class_,
128 prop.key,
129 parent_token=prop,
130 uselist=uselist,
131 compare_function=compare_function,
132 useobject=useobject,
133 trackparent=useobject
134 and (
135 prop.single_parent
136 or prop.direction is interfaces.ONETOMANY
137 ),
138 typecallable=typecallable,
139 callable_=callable_,
140 active_history=active_history,
141 impl_class=impl_class,
142 send_modified_events=not useobject or not prop.viewonly,
143 doc=prop.doc,
144 **kw,
145 )
146
147 for hook in listen_hooks:
148 hook(desc, prop)
149
150
151@properties.ColumnProperty.strategy_for(instrument=False, deferred=False)
152class UninstrumentedColumnLoader(LoaderStrategy):
153 """Represent a non-instrumented MapperProperty.
154
155 The polymorphic_on argument of mapper() often results in this,
156 if the argument is against the with_polymorphic selectable.
157
158 """
159
160 __slots__ = ("columns",)
161
162 def __init__(self, parent, strategy_key):
163 super().__init__(parent, strategy_key)
164 self.columns = self.parent_property.columns
165
166 def setup_query(
167 self,
168 compile_state,
169 query_entity,
170 path,
171 loadopt,
172 adapter,
173 column_collection=None,
174 **kwargs,
175 ):
176 for c in self.columns:
177 if adapter:
178 c = adapter.columns[c]
179 compile_state._append_dedupe_col_collection(c, column_collection)
180
181 def create_row_processor(
182 self,
183 context,
184 query_entity,
185 path,
186 loadopt,
187 mapper,
188 result,
189 adapter,
190 populators,
191 ):
192 pass
193
194
195@log.class_logger
196@properties.ColumnProperty.strategy_for(instrument=True, deferred=False)
197class ColumnLoader(LoaderStrategy):
198 """Provide loading behavior for a :class:`.ColumnProperty`."""
199
200 __slots__ = "columns", "is_composite"
201
202 def __init__(self, parent, strategy_key):
203 super().__init__(parent, strategy_key)
204 self.columns = self.parent_property.columns
205 self.is_composite = hasattr(self.parent_property, "composite_class")
206
207 def setup_query(
208 self,
209 compile_state,
210 query_entity,
211 path,
212 loadopt,
213 adapter,
214 column_collection,
215 memoized_populators,
216 check_for_adapt=False,
217 **kwargs,
218 ):
219 for c in self.columns:
220 if adapter:
221 if check_for_adapt:
222 c = adapter.adapt_check_present(c)
223 if c is None:
224 return
225 else:
226 c = adapter.columns[c]
227
228 compile_state._append_dedupe_col_collection(c, column_collection)
229
230 fetch = self.columns[0]
231 if adapter:
232 fetch = adapter.columns[fetch]
233 if fetch is None:
234 # None happens here only for dml bulk_persistence cases
235 # when context.DMLReturningColFilter is used
236 return
237
238 memoized_populators[self.parent_property] = fetch
239
240 def init_class_attribute(self, mapper):
241 self.is_class_level = True
242 coltype = self.columns[0].type
243 # TODO: check all columns ? check for foreign key as well?
244 active_history = (
245 self.parent_property.active_history
246 or self.columns[0].primary_key
247 or (
248 mapper.version_id_col is not None
249 and mapper._columntoproperty.get(mapper.version_id_col, None)
250 is self.parent_property
251 )
252 )
253
254 _register_attribute(
255 self.parent_property,
256 mapper,
257 useobject=False,
258 compare_function=coltype.compare_values,
259 active_history=active_history,
260 )
261
262 def create_row_processor(
263 self,
264 context,
265 query_entity,
266 path,
267 loadopt,
268 mapper,
269 result,
270 adapter,
271 populators,
272 ):
273 # look through list of columns represented here
274 # to see which, if any, is present in the row.
275
276 for col in self.columns:
277 if adapter:
278 col = adapter.columns[col]
279 getter = result._getter(col, False)
280 if getter:
281 populators["quick"].append((self.key, getter))
282 break
283 else:
284 populators["expire"].append((self.key, True))
285
286
287@log.class_logger
288@properties.ColumnProperty.strategy_for(query_expression=True)
289class ExpressionColumnLoader(ColumnLoader):
290 def __init__(self, parent, strategy_key):
291 super().__init__(parent, strategy_key)
292
293 # compare to the "default" expression that is mapped in
294 # the column. If it's sql.null, we don't need to render
295 # unless an expr is passed in the options.
296 null = sql.null().label(None)
297 self._have_default_expression = any(
298 not c.compare(null) for c in self.parent_property.columns
299 )
300
301 def setup_query(
302 self,
303 compile_state,
304 query_entity,
305 path,
306 loadopt,
307 adapter,
308 column_collection,
309 memoized_populators,
310 **kwargs,
311 ):
312 columns = None
313 if loadopt and loadopt._extra_criteria:
314 columns = loadopt._extra_criteria
315
316 elif self._have_default_expression:
317 columns = self.parent_property.columns
318
319 if columns is None:
320 return
321
322 for c in columns:
323 if adapter:
324 c = adapter.columns[c]
325 compile_state._append_dedupe_col_collection(c, column_collection)
326
327 fetch = columns[0]
328 if adapter:
329 fetch = adapter.columns[fetch]
330 if fetch is None:
331 # None is not expected to be the result of any
332 # adapter implementation here, however there may be theoretical
333 # usages of returning() with context.DMLReturningColFilter
334 return
335
336 memoized_populators[self.parent_property] = fetch
337
338 def create_row_processor(
339 self,
340 context,
341 query_entity,
342 path,
343 loadopt,
344 mapper,
345 result,
346 adapter,
347 populators,
348 ):
349 # look through list of columns represented here
350 # to see which, if any, is present in the row.
351 if loadopt and loadopt._extra_criteria:
352 columns = loadopt._extra_criteria
353
354 for col in columns:
355 if adapter:
356 col = adapter.columns[col]
357 getter = result._getter(col, False)
358 if getter:
359 populators["quick"].append((self.key, getter))
360 break
361 else:
362 populators["expire"].append((self.key, True))
363
364 def init_class_attribute(self, mapper):
365 self.is_class_level = True
366
367 _register_attribute(
368 self.parent_property,
369 mapper,
370 useobject=False,
371 compare_function=self.columns[0].type.compare_values,
372 accepts_scalar_loader=False,
373 )
374
375
376@log.class_logger
377@properties.ColumnProperty.strategy_for(deferred=True, instrument=True)
378@properties.ColumnProperty.strategy_for(
379 deferred=True, instrument=True, raiseload=True
380)
381@properties.ColumnProperty.strategy_for(do_nothing=True)
382class DeferredColumnLoader(LoaderStrategy):
383 """Provide loading behavior for a deferred :class:`.ColumnProperty`."""
384
385 __slots__ = "columns", "group", "raiseload"
386
387 def __init__(self, parent, strategy_key):
388 super().__init__(parent, strategy_key)
389 if hasattr(self.parent_property, "composite_class"):
390 raise NotImplementedError(
391 "Deferred loading for composite types not implemented yet"
392 )
393 self.raiseload = self.strategy_opts.get("raiseload", False)
394 self.columns = self.parent_property.columns
395 self.group = self.parent_property.group
396
397 def create_row_processor(
398 self,
399 context,
400 query_entity,
401 path,
402 loadopt,
403 mapper,
404 result,
405 adapter,
406 populators,
407 ):
408 # for a DeferredColumnLoader, this method is only used during a
409 # "row processor only" query; see test_deferred.py ->
410 # tests with "rowproc_only" in their name. As of the 1.0 series,
411 # loading._instance_processor doesn't use a "row processing" function
412 # to populate columns, instead it uses data in the "populators"
413 # dictionary. Normally, the DeferredColumnLoader.setup_query()
414 # sets up that data in the "memoized_populators" dictionary
415 # and "create_row_processor()" here is never invoked.
416
417 if (
418 context.refresh_state
419 and context.query._compile_options._only_load_props
420 and self.key in context.query._compile_options._only_load_props
421 ):
422 self.parent_property._get_strategy(
423 (("deferred", False), ("instrument", True))
424 ).create_row_processor(
425 context,
426 query_entity,
427 path,
428 loadopt,
429 mapper,
430 result,
431 adapter,
432 populators,
433 )
434
435 elif not self.is_class_level:
436 if self.raiseload:
437 set_deferred_for_local_state = (
438 self.parent_property._raise_column_loader
439 )
440 else:
441 set_deferred_for_local_state = (
442 self.parent_property._deferred_column_loader
443 )
444 populators["new"].append((self.key, set_deferred_for_local_state))
445 else:
446 populators["expire"].append((self.key, False))
447
448 def init_class_attribute(self, mapper):
449 self.is_class_level = True
450
451 _register_attribute(
452 self.parent_property,
453 mapper,
454 useobject=False,
455 compare_function=self.columns[0].type.compare_values,
456 callable_=self._load_for_state,
457 load_on_unexpire=False,
458 )
459
460 def setup_query(
461 self,
462 compile_state,
463 query_entity,
464 path,
465 loadopt,
466 adapter,
467 column_collection,
468 memoized_populators,
469 only_load_props=None,
470 **kw,
471 ):
472 if (
473 (
474 compile_state.compile_options._render_for_subquery
475 and self.parent_property._renders_in_subqueries
476 )
477 or (
478 loadopt
479 and set(self.columns).intersection(
480 self.parent._should_undefer_in_wildcard
481 )
482 )
483 or (
484 loadopt
485 and self.group
486 and loadopt.local_opts.get(
487 "undefer_group_%s" % self.group, False
488 )
489 )
490 or (only_load_props and self.key in only_load_props)
491 ):
492 self.parent_property._get_strategy(
493 (("deferred", False), ("instrument", True))
494 ).setup_query(
495 compile_state,
496 query_entity,
497 path,
498 loadopt,
499 adapter,
500 column_collection,
501 memoized_populators,
502 **kw,
503 )
504 elif self.is_class_level:
505 memoized_populators[self.parent_property] = _SET_DEFERRED_EXPIRED
506 elif not self.raiseload:
507 memoized_populators[self.parent_property] = _DEFER_FOR_STATE
508 else:
509 memoized_populators[self.parent_property] = _RAISE_FOR_STATE
510
511 def _load_for_state(self, state, passive):
512 if not state.key:
513 return LoaderCallableStatus.ATTR_EMPTY
514
515 if not passive & PassiveFlag.SQL_OK:
516 return LoaderCallableStatus.PASSIVE_NO_RESULT
517
518 localparent = state.manager.mapper
519
520 if self.group:
521 toload = [
522 p.key
523 for p in localparent.iterate_properties
524 if isinstance(p, StrategizedProperty)
525 and isinstance(p.strategy, DeferredColumnLoader)
526 and p.group == self.group
527 ]
528 else:
529 toload = [self.key]
530
531 # narrow the keys down to just those which have no history
532 group = [k for k in toload if k in state.unmodified]
533
534 session = _state_session(state)
535 if session is None:
536 raise orm_exc.DetachedInstanceError(
537 "Parent instance %s is not bound to a Session; "
538 "deferred load operation of attribute '%s' cannot proceed"
539 % (orm_util.state_str(state), self.key)
540 )
541
542 if self.raiseload:
543 self._invoke_raise_load(state, passive, "raise")
544
545 loading.load_scalar_attributes(
546 state.mapper, state, set(group), PASSIVE_OFF
547 )
548
549 return LoaderCallableStatus.ATTR_WAS_SET
550
551 def _invoke_raise_load(self, state, passive, lazy):
552 raise sa_exc.InvalidRequestError(
553 "'%s' is not available due to raiseload=True" % (self,)
554 )
555
556
557class LoadDeferredColumns:
558 """serializable loader object used by DeferredColumnLoader"""
559
560 def __init__(self, key: str, raiseload: bool = False):
561 self.key = key
562 self.raiseload = raiseload
563
564 def __call__(self, state, passive=attributes.PASSIVE_OFF):
565 key = self.key
566
567 localparent = state.manager.mapper
568 prop = localparent._props[key]
569 if self.raiseload:
570 strategy_key = (
571 ("deferred", True),
572 ("instrument", True),
573 ("raiseload", True),
574 )
575 else:
576 strategy_key = (("deferred", True), ("instrument", True))
577 strategy = prop._get_strategy(strategy_key)
578 return strategy._load_for_state(state, passive)
579
580
581class AbstractRelationshipLoader(LoaderStrategy):
582 """LoaderStratgies which deal with related objects."""
583
584 __slots__ = "mapper", "target", "uselist", "entity"
585
586 def __init__(self, parent, strategy_key):
587 super().__init__(parent, strategy_key)
588 self.mapper = self.parent_property.mapper
589 self.entity = self.parent_property.entity
590 self.target = self.parent_property.target
591 self.uselist = self.parent_property.uselist
592
593 def _immediateload_create_row_processor(
594 self,
595 context,
596 query_entity,
597 path,
598 loadopt,
599 mapper,
600 result,
601 adapter,
602 populators,
603 ):
604 return self.parent_property._get_strategy(
605 (("lazy", "immediate"),)
606 ).create_row_processor(
607 context,
608 query_entity,
609 path,
610 loadopt,
611 mapper,
612 result,
613 adapter,
614 populators,
615 )
616
617
618@log.class_logger
619@relationships.RelationshipProperty.strategy_for(do_nothing=True)
620class DoNothingLoader(LoaderStrategy):
621 """Relationship loader that makes no change to the object's state.
622
623 Compared to NoLoader, this loader does not initialize the
624 collection/attribute to empty/none; the usual default LazyLoader will
625 take effect.
626
627 """
628
629
630@log.class_logger
631@relationships.RelationshipProperty.strategy_for(lazy="noload")
632@relationships.RelationshipProperty.strategy_for(lazy=None)
633class NoLoader(AbstractRelationshipLoader):
634 """Provide loading behavior for a :class:`.Relationship`
635 with "lazy=None".
636
637 """
638
639 __slots__ = ()
640
641 def init_class_attribute(self, mapper):
642 self.is_class_level = True
643
644 _register_attribute(
645 self.parent_property,
646 mapper,
647 useobject=True,
648 typecallable=self.parent_property.collection_class,
649 )
650
651 def create_row_processor(
652 self,
653 context,
654 query_entity,
655 path,
656 loadopt,
657 mapper,
658 result,
659 adapter,
660 populators,
661 ):
662 def invoke_no_load(state, dict_, row):
663 if self.uselist:
664 attributes.init_state_collection(state, dict_, self.key)
665 else:
666 dict_[self.key] = None
667
668 populators["new"].append((self.key, invoke_no_load))
669
670
671@log.class_logger
672@relationships.RelationshipProperty.strategy_for(lazy=True)
673@relationships.RelationshipProperty.strategy_for(lazy="select")
674@relationships.RelationshipProperty.strategy_for(lazy="raise")
675@relationships.RelationshipProperty.strategy_for(lazy="raise_on_sql")
676@relationships.RelationshipProperty.strategy_for(lazy="baked_select")
677class LazyLoader(
678 AbstractRelationshipLoader, util.MemoizedSlots, log.Identified
679):
680 """Provide loading behavior for a :class:`.Relationship`
681 with "lazy=True", that is loads when first accessed.
682
683 """
684
685 __slots__ = (
686 "_lazywhere",
687 "_rev_lazywhere",
688 "_lazyload_reverse_option",
689 "_order_by",
690 "use_get",
691 "is_aliased_class",
692 "_bind_to_col",
693 "_equated_columns",
694 "_rev_bind_to_col",
695 "_rev_equated_columns",
696 "_simple_lazy_clause",
697 "_raise_always",
698 "_raise_on_sql",
699 )
700
701 _lazywhere: ColumnElement[bool]
702 _bind_to_col: Dict[str, ColumnElement[Any]]
703 _rev_lazywhere: ColumnElement[bool]
704 _rev_bind_to_col: Dict[str, ColumnElement[Any]]
705
706 parent_property: RelationshipProperty[Any]
707
708 def __init__(
709 self, parent: RelationshipProperty[Any], strategy_key: Tuple[Any, ...]
710 ):
711 super().__init__(parent, strategy_key)
712 self._raise_always = self.strategy_opts["lazy"] == "raise"
713 self._raise_on_sql = self.strategy_opts["lazy"] == "raise_on_sql"
714
715 self.is_aliased_class = inspect(self.entity).is_aliased_class
716
717 join_condition = self.parent_property._join_condition
718 (
719 self._lazywhere,
720 self._bind_to_col,
721 self._equated_columns,
722 ) = join_condition.create_lazy_clause()
723
724 (
725 self._rev_lazywhere,
726 self._rev_bind_to_col,
727 self._rev_equated_columns,
728 ) = join_condition.create_lazy_clause(reverse_direction=True)
729
730 if self.parent_property.order_by:
731 self._order_by = [
732 sql_util._deep_annotate(elem, {"_orm_adapt": True})
733 for elem in util.to_list(self.parent_property.order_by)
734 ]
735 else:
736 self._order_by = None
737
738 self.logger.info("%s lazy loading clause %s", self, self._lazywhere)
739
740 # determine if our "lazywhere" clause is the same as the mapper's
741 # get() clause. then we can just use mapper.get()
742 #
743 # TODO: the "not self.uselist" can be taken out entirely; a m2o
744 # load that populates for a list (very unusual, but is possible with
745 # the API) can still set for "None" and the attribute system will
746 # populate as an empty list.
747 self.use_get = (
748 not self.is_aliased_class
749 and not self.uselist
750 and self.entity._get_clause[0].compare(
751 self._lazywhere,
752 use_proxies=True,
753 compare_keys=False,
754 equivalents=self.mapper._equivalent_columns,
755 )
756 )
757
758 if self.use_get:
759 for col in list(self._equated_columns):
760 if col in self.mapper._equivalent_columns:
761 for c in self.mapper._equivalent_columns[col]:
762 self._equated_columns[c] = self._equated_columns[col]
763
764 self.logger.info(
765 "%s will use Session.get() to optimize instance loads", self
766 )
767
768 def init_class_attribute(self, mapper):
769 self.is_class_level = True
770
771 _legacy_inactive_history_style = (
772 self.parent_property._legacy_inactive_history_style
773 )
774
775 if self.parent_property.active_history:
776 active_history = True
777 _deferred_history = False
778
779 elif (
780 self.parent_property.direction is not interfaces.MANYTOONE
781 or not self.use_get
782 ):
783 if _legacy_inactive_history_style:
784 active_history = True
785 _deferred_history = False
786 else:
787 active_history = False
788 _deferred_history = True
789 else:
790 active_history = _deferred_history = False
791
792 _register_attribute(
793 self.parent_property,
794 mapper,
795 useobject=True,
796 callable_=self._load_for_state,
797 typecallable=self.parent_property.collection_class,
798 active_history=active_history,
799 _deferred_history=_deferred_history,
800 )
801
802 def _memoized_attr__simple_lazy_clause(self):
803 lazywhere = sql_util._deep_annotate(
804 self._lazywhere, {"_orm_adapt": True}
805 )
806
807 criterion, bind_to_col = (lazywhere, self._bind_to_col)
808
809 params = []
810
811 def visit_bindparam(bindparam):
812 bindparam.unique = False
813
814 visitors.traverse(criterion, {}, {"bindparam": visit_bindparam})
815
816 def visit_bindparam(bindparam):
817 if bindparam._identifying_key in bind_to_col:
818 params.append(
819 (
820 bindparam.key,
821 bind_to_col[bindparam._identifying_key],
822 None,
823 )
824 )
825 elif bindparam.callable is None:
826 params.append((bindparam.key, None, bindparam.value))
827
828 criterion = visitors.cloned_traverse(
829 criterion, {}, {"bindparam": visit_bindparam}
830 )
831
832 return criterion, params
833
834 def _generate_lazy_clause(self, state, passive):
835 criterion, param_keys = self._simple_lazy_clause
836
837 if state is None:
838 return sql_util.adapt_criterion_to_null(
839 criterion, [key for key, ident, value in param_keys]
840 )
841
842 mapper = self.parent_property.parent
843
844 o = state.obj() # strong ref
845 dict_ = attributes.instance_dict(o)
846
847 if passive & PassiveFlag.INIT_OK:
848 passive ^= PassiveFlag.INIT_OK
849
850 params = {}
851 for key, ident, value in param_keys:
852 if ident is not None:
853 if passive and passive & PassiveFlag.LOAD_AGAINST_COMMITTED:
854 value = mapper._get_committed_state_attr_by_column(
855 state, dict_, ident, passive
856 )
857 else:
858 value = mapper._get_state_attr_by_column(
859 state, dict_, ident, passive
860 )
861
862 params[key] = value
863
864 return criterion, params
865
866 def _invoke_raise_load(self, state, passive, lazy):
867 raise sa_exc.InvalidRequestError(
868 "'%s' is not available due to lazy='%s'" % (self, lazy)
869 )
870
871 def _load_for_state(
872 self,
873 state,
874 passive,
875 loadopt=None,
876 extra_criteria=(),
877 extra_options=(),
878 alternate_effective_path=None,
879 execution_options=util.EMPTY_DICT,
880 ):
881 if not state.key and (
882 (
883 not self.parent_property.load_on_pending
884 and not state._load_pending
885 )
886 or not state.session_id
887 ):
888 return LoaderCallableStatus.ATTR_EMPTY
889
890 pending = not state.key
891 primary_key_identity = None
892
893 use_get = self.use_get and (not loadopt or not loadopt._extra_criteria)
894
895 if (not passive & PassiveFlag.SQL_OK and not use_get) or (
896 not passive & attributes.NON_PERSISTENT_OK and pending
897 ):
898 return LoaderCallableStatus.PASSIVE_NO_RESULT
899
900 if (
901 # we were given lazy="raise"
902 self._raise_always
903 # the no_raise history-related flag was not passed
904 and not passive & PassiveFlag.NO_RAISE
905 and (
906 # if we are use_get and related_object_ok is disabled,
907 # which means we are at most looking in the identity map
908 # for history purposes or otherwise returning
909 # PASSIVE_NO_RESULT, don't raise. This is also a
910 # history-related flag
911 not use_get
912 or passive & PassiveFlag.RELATED_OBJECT_OK
913 )
914 ):
915 self._invoke_raise_load(state, passive, "raise")
916
917 session = _state_session(state)
918 if not session:
919 if passive & PassiveFlag.NO_RAISE:
920 return LoaderCallableStatus.PASSIVE_NO_RESULT
921
922 raise orm_exc.DetachedInstanceError(
923 "Parent instance %s is not bound to a Session; "
924 "lazy load operation of attribute '%s' cannot proceed"
925 % (orm_util.state_str(state), self.key)
926 )
927
928 # if we have a simple primary key load, check the
929 # identity map without generating a Query at all
930 if use_get:
931 primary_key_identity = self._get_ident_for_use_get(
932 session, state, passive
933 )
934 if LoaderCallableStatus.PASSIVE_NO_RESULT in primary_key_identity:
935 return LoaderCallableStatus.PASSIVE_NO_RESULT
936 elif LoaderCallableStatus.NEVER_SET in primary_key_identity:
937 return LoaderCallableStatus.NEVER_SET
938
939 # test for None alone in primary_key_identity based on
940 # allow_partial_pks preference. PASSIVE_NO_RESULT and NEVER_SET
941 # have already been tested above
942 if not self.mapper.allow_partial_pks:
943 if _none_only_set.intersection(primary_key_identity):
944 return None
945 else:
946 if _none_only_set.issuperset(primary_key_identity):
947 return None
948
949 if (
950 self.key in state.dict
951 and not passive & PassiveFlag.DEFERRED_HISTORY_LOAD
952 ):
953 return LoaderCallableStatus.ATTR_WAS_SET
954
955 # look for this identity in the identity map. Delegate to the
956 # Query class in use, as it may have special rules for how it
957 # does this, including how it decides what the correct
958 # identity_token would be for this identity.
959
960 instance = session._identity_lookup(
961 self.entity,
962 primary_key_identity,
963 passive=passive,
964 lazy_loaded_from=state,
965 )
966
967 if instance is not None:
968 if instance is LoaderCallableStatus.PASSIVE_CLASS_MISMATCH:
969 return None
970 else:
971 return instance
972 elif (
973 not passive & PassiveFlag.SQL_OK
974 or not passive & PassiveFlag.RELATED_OBJECT_OK
975 ):
976 return LoaderCallableStatus.PASSIVE_NO_RESULT
977
978 return self._emit_lazyload(
979 session,
980 state,
981 primary_key_identity,
982 passive,
983 loadopt,
984 extra_criteria,
985 extra_options,
986 alternate_effective_path,
987 execution_options,
988 )
989
990 def _get_ident_for_use_get(self, session, state, passive):
991 instance_mapper = state.manager.mapper
992
993 if passive & PassiveFlag.LOAD_AGAINST_COMMITTED:
994 get_attr = instance_mapper._get_committed_state_attr_by_column
995 else:
996 get_attr = instance_mapper._get_state_attr_by_column
997
998 dict_ = state.dict
999
1000 return [
1001 get_attr(state, dict_, self._equated_columns[pk], passive=passive)
1002 for pk in self.mapper.primary_key
1003 ]
1004
1005 @util.preload_module("sqlalchemy.orm.strategy_options")
1006 def _emit_lazyload(
1007 self,
1008 session,
1009 state,
1010 primary_key_identity,
1011 passive,
1012 loadopt,
1013 extra_criteria,
1014 extra_options,
1015 alternate_effective_path,
1016 execution_options,
1017 ):
1018 strategy_options = util.preloaded.orm_strategy_options
1019
1020 clauseelement = self.entity.__clause_element__()
1021 stmt = Select._create_raw_select(
1022 _raw_columns=[clauseelement],
1023 _propagate_attrs=clauseelement._propagate_attrs,
1024 _label_style=LABEL_STYLE_TABLENAME_PLUS_COL,
1025 _compile_options=ORMCompileState.default_compile_options,
1026 )
1027 load_options = QueryContext.default_load_options
1028
1029 load_options += {
1030 "_invoke_all_eagers": False,
1031 "_lazy_loaded_from": state,
1032 }
1033
1034 if self.parent_property.secondary is not None:
1035 stmt = stmt.select_from(
1036 self.mapper, self.parent_property.secondary
1037 )
1038
1039 pending = not state.key
1040
1041 # don't autoflush on pending
1042 if pending or passive & attributes.NO_AUTOFLUSH:
1043 stmt._execution_options = util.immutabledict({"autoflush": False})
1044
1045 use_get = self.use_get
1046
1047 if state.load_options or (loadopt and loadopt._extra_criteria):
1048 if alternate_effective_path is None:
1049 effective_path = state.load_path[self.parent_property]
1050 else:
1051 effective_path = alternate_effective_path[self.parent_property]
1052
1053 opts = state.load_options
1054
1055 if loadopt and loadopt._extra_criteria:
1056 use_get = False
1057 opts += (
1058 orm_util.LoaderCriteriaOption(self.entity, extra_criteria),
1059 )
1060
1061 stmt._with_options = opts
1062 elif alternate_effective_path is None:
1063 # this path is used if there are not already any options
1064 # in the query, but an event may want to add them
1065 effective_path = state.mapper._path_registry[self.parent_property]
1066 else:
1067 # added by immediateloader
1068 effective_path = alternate_effective_path[self.parent_property]
1069
1070 if extra_options:
1071 stmt._with_options += extra_options
1072
1073 stmt._compile_options += {"_current_path": effective_path}
1074
1075 if use_get:
1076 if self._raise_on_sql and not passive & PassiveFlag.NO_RAISE:
1077 self._invoke_raise_load(state, passive, "raise_on_sql")
1078
1079 return loading.load_on_pk_identity(
1080 session,
1081 stmt,
1082 primary_key_identity,
1083 load_options=load_options,
1084 execution_options=execution_options,
1085 )
1086
1087 if self._order_by:
1088 stmt._order_by_clauses = self._order_by
1089
1090 def _lazyload_reverse(compile_context):
1091 for rev in self.parent_property._reverse_property:
1092 # reverse props that are MANYTOONE are loading *this*
1093 # object from get(), so don't need to eager out to those.
1094 if (
1095 rev.direction is interfaces.MANYTOONE
1096 and rev._use_get
1097 and not isinstance(rev.strategy, LazyLoader)
1098 ):
1099 strategy_options.Load._construct_for_existing_path(
1100 compile_context.compile_options._current_path[
1101 rev.parent
1102 ]
1103 ).lazyload(rev).process_compile_state(compile_context)
1104
1105 stmt._with_context_options += (
1106 (_lazyload_reverse, self.parent_property),
1107 )
1108
1109 lazy_clause, params = self._generate_lazy_clause(state, passive)
1110
1111 if execution_options:
1112 execution_options = util.EMPTY_DICT.merge_with(
1113 execution_options,
1114 {
1115 "_sa_orm_load_options": load_options,
1116 },
1117 )
1118 else:
1119 execution_options = {
1120 "_sa_orm_load_options": load_options,
1121 }
1122
1123 if (
1124 self.key in state.dict
1125 and not passive & PassiveFlag.DEFERRED_HISTORY_LOAD
1126 ):
1127 return LoaderCallableStatus.ATTR_WAS_SET
1128
1129 if pending:
1130 if util.has_intersection(orm_util._none_set, params.values()):
1131 return None
1132
1133 elif util.has_intersection(orm_util._never_set, params.values()):
1134 return None
1135
1136 if self._raise_on_sql and not passive & PassiveFlag.NO_RAISE:
1137 self._invoke_raise_load(state, passive, "raise_on_sql")
1138
1139 stmt._where_criteria = (lazy_clause,)
1140
1141 result = session.execute(
1142 stmt, params, execution_options=execution_options
1143 )
1144
1145 result = result.unique().scalars().all()
1146
1147 if self.uselist:
1148 return result
1149 else:
1150 l = len(result)
1151 if l:
1152 if l > 1:
1153 util.warn(
1154 "Multiple rows returned with "
1155 "uselist=False for lazily-loaded attribute '%s' "
1156 % self.parent_property
1157 )
1158
1159 return result[0]
1160 else:
1161 return None
1162
1163 def create_row_processor(
1164 self,
1165 context,
1166 query_entity,
1167 path,
1168 loadopt,
1169 mapper,
1170 result,
1171 adapter,
1172 populators,
1173 ):
1174 key = self.key
1175
1176 if (
1177 context.load_options._is_user_refresh
1178 and context.query._compile_options._only_load_props
1179 and self.key in context.query._compile_options._only_load_props
1180 ):
1181 return self._immediateload_create_row_processor(
1182 context,
1183 query_entity,
1184 path,
1185 loadopt,
1186 mapper,
1187 result,
1188 adapter,
1189 populators,
1190 )
1191
1192 if not self.is_class_level or (loadopt and loadopt._extra_criteria):
1193 # we are not the primary manager for this attribute
1194 # on this class - set up a
1195 # per-instance lazyloader, which will override the
1196 # class-level behavior.
1197 # this currently only happens when using a
1198 # "lazyload" option on a "no load"
1199 # attribute - "eager" attributes always have a
1200 # class-level lazyloader installed.
1201 set_lazy_callable = (
1202 InstanceState._instance_level_callable_processor
1203 )(
1204 mapper.class_manager,
1205 LoadLazyAttribute(
1206 key,
1207 self,
1208 loadopt,
1209 (
1210 loadopt._generate_extra_criteria(context)
1211 if loadopt._extra_criteria
1212 else None
1213 ),
1214 ),
1215 key,
1216 )
1217
1218 populators["new"].append((self.key, set_lazy_callable))
1219 elif context.populate_existing or mapper.always_refresh:
1220
1221 def reset_for_lazy_callable(state, dict_, row):
1222 # we are the primary manager for this attribute on
1223 # this class - reset its
1224 # per-instance attribute state, so that the class-level
1225 # lazy loader is
1226 # executed when next referenced on this instance.
1227 # this is needed in
1228 # populate_existing() types of scenarios to reset
1229 # any existing state.
1230 state._reset(dict_, key)
1231
1232 populators["new"].append((self.key, reset_for_lazy_callable))
1233
1234
1235class LoadLazyAttribute:
1236 """semi-serializable loader object used by LazyLoader
1237
1238 Historically, this object would be carried along with instances that
1239 needed to run lazyloaders, so it had to be serializable to support
1240 cached instances.
1241
1242 this is no longer a general requirement, and the case where this object
1243 is used is exactly the case where we can't really serialize easily,
1244 which is when extra criteria in the loader option is present.
1245
1246 We can't reliably serialize that as it refers to mapped entities and
1247 AliasedClass objects that are local to the current process, which would
1248 need to be matched up on deserialize e.g. the sqlalchemy.ext.serializer
1249 approach.
1250
1251 """
1252
1253 def __init__(self, key, initiating_strategy, loadopt, extra_criteria):
1254 self.key = key
1255 self.strategy_key = initiating_strategy.strategy_key
1256 self.loadopt = loadopt
1257 self.extra_criteria = extra_criteria
1258
1259 def __getstate__(self):
1260 if self.extra_criteria is not None:
1261 util.warn(
1262 "Can't reliably serialize a lazyload() option that "
1263 "contains additional criteria; please use eager loading "
1264 "for this case"
1265 )
1266 return {
1267 "key": self.key,
1268 "strategy_key": self.strategy_key,
1269 "loadopt": self.loadopt,
1270 "extra_criteria": (),
1271 }
1272
1273 def __call__(self, state, passive=attributes.PASSIVE_OFF):
1274 key = self.key
1275 instance_mapper = state.manager.mapper
1276 prop = instance_mapper._props[key]
1277 strategy = prop._strategies[self.strategy_key]
1278
1279 return strategy._load_for_state(
1280 state,
1281 passive,
1282 loadopt=self.loadopt,
1283 extra_criteria=self.extra_criteria,
1284 )
1285
1286
1287class PostLoader(AbstractRelationshipLoader):
1288 """A relationship loader that emits a second SELECT statement."""
1289
1290 __slots__ = ()
1291
1292 def _setup_for_recursion(self, context, path, loadopt, join_depth=None):
1293 effective_path = (
1294 context.compile_state.current_path or orm_util.PathRegistry.root
1295 ) + path
1296
1297 top_level_context = context._get_top_level_context()
1298 execution_options = util.immutabledict(
1299 {"sa_top_level_orm_context": top_level_context}
1300 )
1301
1302 if loadopt:
1303 recursion_depth = loadopt.local_opts.get("recursion_depth", None)
1304 unlimited_recursion = recursion_depth == -1
1305 else:
1306 recursion_depth = None
1307 unlimited_recursion = False
1308
1309 if recursion_depth is not None:
1310 if not self.parent_property._is_self_referential:
1311 raise sa_exc.InvalidRequestError(
1312 f"recursion_depth option on relationship "
1313 f"{self.parent_property} not valid for "
1314 "non-self-referential relationship"
1315 )
1316 recursion_depth = context.execution_options.get(
1317 f"_recursion_depth_{id(self)}", recursion_depth
1318 )
1319
1320 if not unlimited_recursion and recursion_depth < 0:
1321 return (
1322 effective_path,
1323 False,
1324 execution_options,
1325 recursion_depth,
1326 )
1327
1328 if not unlimited_recursion:
1329 execution_options = execution_options.union(
1330 {
1331 f"_recursion_depth_{id(self)}": recursion_depth - 1,
1332 }
1333 )
1334
1335 if loading.PostLoad.path_exists(
1336 context, effective_path, self.parent_property
1337 ):
1338 return effective_path, False, execution_options, recursion_depth
1339
1340 path_w_prop = path[self.parent_property]
1341 effective_path_w_prop = effective_path[self.parent_property]
1342
1343 if not path_w_prop.contains(context.attributes, "loader"):
1344 if join_depth:
1345 if effective_path_w_prop.length / 2 > join_depth:
1346 return (
1347 effective_path,
1348 False,
1349 execution_options,
1350 recursion_depth,
1351 )
1352 elif effective_path_w_prop.contains_mapper(self.mapper):
1353 return (
1354 effective_path,
1355 False,
1356 execution_options,
1357 recursion_depth,
1358 )
1359
1360 return effective_path, True, execution_options, recursion_depth
1361
1362
1363@relationships.RelationshipProperty.strategy_for(lazy="immediate")
1364class ImmediateLoader(PostLoader):
1365 __slots__ = ("join_depth",)
1366
1367 def __init__(self, parent, strategy_key):
1368 super().__init__(parent, strategy_key)
1369 self.join_depth = self.parent_property.join_depth
1370
1371 def init_class_attribute(self, mapper):
1372 self.parent_property._get_strategy(
1373 (("lazy", "select"),)
1374 ).init_class_attribute(mapper)
1375
1376 def create_row_processor(
1377 self,
1378 context,
1379 query_entity,
1380 path,
1381 loadopt,
1382 mapper,
1383 result,
1384 adapter,
1385 populators,
1386 ):
1387 if not context.compile_state.compile_options._enable_eagerloads:
1388 return
1389
1390 (
1391 effective_path,
1392 run_loader,
1393 execution_options,
1394 recursion_depth,
1395 ) = self._setup_for_recursion(context, path, loadopt, self.join_depth)
1396
1397 if not run_loader:
1398 # this will not emit SQL and will only emit for a many-to-one
1399 # "use get" load. the "_RELATED" part means it may return
1400 # instance even if its expired, since this is a mutually-recursive
1401 # load operation.
1402 flags = attributes.PASSIVE_NO_FETCH_RELATED | PassiveFlag.NO_RAISE
1403 else:
1404 flags = attributes.PASSIVE_OFF | PassiveFlag.NO_RAISE
1405
1406 loading.PostLoad.callable_for_path(
1407 context,
1408 effective_path,
1409 self.parent,
1410 self.parent_property,
1411 self._load_for_path,
1412 loadopt,
1413 flags,
1414 recursion_depth,
1415 execution_options,
1416 )
1417
1418 def _load_for_path(
1419 self,
1420 context,
1421 path,
1422 states,
1423 load_only,
1424 loadopt,
1425 flags,
1426 recursion_depth,
1427 execution_options,
1428 ):
1429 if recursion_depth:
1430 new_opt = Load(loadopt.path.entity)
1431 new_opt.context = (
1432 loadopt,
1433 loadopt._recurse(),
1434 )
1435 alternate_effective_path = path._truncate_recursive()
1436 extra_options = (new_opt,)
1437 else:
1438 alternate_effective_path = path
1439 extra_options = ()
1440
1441 key = self.key
1442 lazyloader = self.parent_property._get_strategy((("lazy", "select"),))
1443 for state, overwrite in states:
1444 dict_ = state.dict
1445
1446 if overwrite or key not in dict_:
1447 value = lazyloader._load_for_state(
1448 state,
1449 flags,
1450 extra_options=extra_options,
1451 alternate_effective_path=alternate_effective_path,
1452 execution_options=execution_options,
1453 )
1454 if value not in (
1455 ATTR_WAS_SET,
1456 LoaderCallableStatus.PASSIVE_NO_RESULT,
1457 ):
1458 state.get_impl(key).set_committed_value(
1459 state, dict_, value
1460 )
1461
1462
1463@log.class_logger
1464@relationships.RelationshipProperty.strategy_for(lazy="subquery")
1465class SubqueryLoader(PostLoader):
1466 __slots__ = ("join_depth",)
1467
1468 def __init__(self, parent, strategy_key):
1469 super().__init__(parent, strategy_key)
1470 self.join_depth = self.parent_property.join_depth
1471
1472 def init_class_attribute(self, mapper):
1473 self.parent_property._get_strategy(
1474 (("lazy", "select"),)
1475 ).init_class_attribute(mapper)
1476
1477 def _get_leftmost(
1478 self,
1479 orig_query_entity_index,
1480 subq_path,
1481 current_compile_state,
1482 is_root,
1483 ):
1484 given_subq_path = subq_path
1485 subq_path = subq_path.path
1486 subq_mapper = orm_util._class_to_mapper(subq_path[0])
1487
1488 # determine attributes of the leftmost mapper
1489 if (
1490 self.parent.isa(subq_mapper)
1491 and self.parent_property is subq_path[1]
1492 ):
1493 leftmost_mapper, leftmost_prop = self.parent, self.parent_property
1494 else:
1495 leftmost_mapper, leftmost_prop = subq_mapper, subq_path[1]
1496
1497 if is_root:
1498 # the subq_path is also coming from cached state, so when we start
1499 # building up this path, it has to also be converted to be in terms
1500 # of the current state. this is for the specific case of the entity
1501 # is an AliasedClass against a subquery that's not otherwise going
1502 # to adapt
1503 new_subq_path = current_compile_state._entities[
1504 orig_query_entity_index
1505 ].entity_zero._path_registry[leftmost_prop]
1506 additional = len(subq_path) - len(new_subq_path)
1507 if additional:
1508 new_subq_path += path_registry.PathRegistry.coerce(
1509 subq_path[-additional:]
1510 )
1511 else:
1512 new_subq_path = given_subq_path
1513
1514 leftmost_cols = leftmost_prop.local_columns
1515
1516 leftmost_attr = [
1517 getattr(
1518 new_subq_path.path[0].entity,
1519 leftmost_mapper._columntoproperty[c].key,
1520 )
1521 for c in leftmost_cols
1522 ]
1523
1524 return leftmost_mapper, leftmost_attr, leftmost_prop, new_subq_path
1525
1526 def _generate_from_original_query(
1527 self,
1528 orig_compile_state,
1529 orig_query,
1530 leftmost_mapper,
1531 leftmost_attr,
1532 leftmost_relationship,
1533 orig_entity,
1534 ):
1535 # reformat the original query
1536 # to look only for significant columns
1537 q = orig_query._clone().correlate(None)
1538
1539 # LEGACY: make a Query back from the select() !!
1540 # This suits at least two legacy cases:
1541 # 1. applications which expect before_compile() to be called
1542 # below when we run .subquery() on this query (Keystone)
1543 # 2. applications which are doing subqueryload with complex
1544 # from_self() queries, as query.subquery() / .statement
1545 # has to do the full compile context for multiply-nested
1546 # from_self() (Neutron) - see test_subqload_from_self
1547 # for demo.
1548 q2 = query.Query.__new__(query.Query)
1549 q2.__dict__.update(q.__dict__)
1550 q = q2
1551
1552 # set the query's "FROM" list explicitly to what the
1553 # FROM list would be in any case, as we will be limiting
1554 # the columns in the SELECT list which may no longer include
1555 # all entities mentioned in things like WHERE, JOIN, etc.
1556 if not q._from_obj:
1557 q._enable_assertions = False
1558 q.select_from.non_generative(
1559 q,
1560 *{
1561 ent["entity"]
1562 for ent in _column_descriptions(
1563 orig_query, compile_state=orig_compile_state
1564 )
1565 if ent["entity"] is not None
1566 },
1567 )
1568
1569 # select from the identity columns of the outer (specifically, these
1570 # are the 'local_cols' of the property). This will remove other
1571 # columns from the query that might suggest the right entity which is
1572 # why we do set select_from above. The attributes we have are
1573 # coerced and adapted using the original query's adapter, which is
1574 # needed only for the case of adapting a subclass column to
1575 # that of a polymorphic selectable, e.g. we have
1576 # Engineer.primary_language and the entity is Person. All other
1577 # adaptations, e.g. from_self, select_entity_from(), will occur
1578 # within the new query when it compiles, as the compile_state we are
1579 # using here is only a partial one. If the subqueryload is from a
1580 # with_polymorphic() or other aliased() object, left_attr will already
1581 # be the correct attributes so no adaptation is needed.
1582 target_cols = orig_compile_state._adapt_col_list(
1583 [
1584 sql.coercions.expect(sql.roles.ColumnsClauseRole, o)
1585 for o in leftmost_attr
1586 ],
1587 orig_compile_state._get_current_adapter(),
1588 )
1589 q._raw_columns = target_cols
1590
1591 distinct_target_key = leftmost_relationship.distinct_target_key
1592
1593 if distinct_target_key is True:
1594 q._distinct = True
1595 elif distinct_target_key is None:
1596 # if target_cols refer to a non-primary key or only
1597 # part of a composite primary key, set the q as distinct
1598 for t in {c.table for c in target_cols}:
1599 if not set(target_cols).issuperset(t.primary_key):
1600 q._distinct = True
1601 break
1602
1603 # don't need ORDER BY if no limit/offset
1604 if not q._has_row_limiting_clause:
1605 q._order_by_clauses = ()
1606
1607 if q._distinct is True and q._order_by_clauses:
1608 # the logic to automatically add the order by columns to the query
1609 # when distinct is True is deprecated in the query
1610 to_add = sql_util.expand_column_list_from_order_by(
1611 target_cols, q._order_by_clauses
1612 )
1613 if to_add:
1614 q._set_entities(target_cols + to_add)
1615
1616 # the original query now becomes a subquery
1617 # which we'll join onto.
1618 # LEGACY: as "q" is a Query, the before_compile() event is invoked
1619 # here.
1620 embed_q = q.set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL).subquery()
1621 left_alias = orm_util.AliasedClass(
1622 leftmost_mapper, embed_q, use_mapper_path=True
1623 )
1624 return left_alias
1625
1626 def _prep_for_joins(self, left_alias, subq_path):
1627 # figure out what's being joined. a.k.a. the fun part
1628 to_join = []
1629 pairs = list(subq_path.pairs())
1630
1631 for i, (mapper, prop) in enumerate(pairs):
1632 if i > 0:
1633 # look at the previous mapper in the chain -
1634 # if it is as or more specific than this prop's
1635 # mapper, use that instead.
1636 # note we have an assumption here that
1637 # the non-first element is always going to be a mapper,
1638 # not an AliasedClass
1639
1640 prev_mapper = pairs[i - 1][1].mapper
1641 to_append = prev_mapper if prev_mapper.isa(mapper) else mapper
1642 else:
1643 to_append = mapper
1644
1645 to_join.append((to_append, prop.key))
1646
1647 # determine the immediate parent class we are joining from,
1648 # which needs to be aliased.
1649
1650 if len(to_join) < 2:
1651 # in the case of a one level eager load, this is the
1652 # leftmost "left_alias".
1653 parent_alias = left_alias
1654 else:
1655 info = inspect(to_join[-1][0])
1656 if info.is_aliased_class:
1657 parent_alias = info.entity
1658 else:
1659 # alias a plain mapper as we may be
1660 # joining multiple times
1661 parent_alias = orm_util.AliasedClass(
1662 info.entity, use_mapper_path=True
1663 )
1664
1665 local_cols = self.parent_property.local_columns
1666
1667 local_attr = [
1668 getattr(parent_alias, self.parent._columntoproperty[c].key)
1669 for c in local_cols
1670 ]
1671 return to_join, local_attr, parent_alias
1672
1673 def _apply_joins(
1674 self, q, to_join, left_alias, parent_alias, effective_entity
1675 ):
1676 ltj = len(to_join)
1677 if ltj == 1:
1678 to_join = [
1679 getattr(left_alias, to_join[0][1]).of_type(effective_entity)
1680 ]
1681 elif ltj == 2:
1682 to_join = [
1683 getattr(left_alias, to_join[0][1]).of_type(parent_alias),
1684 getattr(parent_alias, to_join[-1][1]).of_type(
1685 effective_entity
1686 ),
1687 ]
1688 elif ltj > 2:
1689 middle = [
1690 (
1691 (
1692 orm_util.AliasedClass(item[0])
1693 if not inspect(item[0]).is_aliased_class
1694 else item[0].entity
1695 ),
1696 item[1],
1697 )
1698 for item in to_join[1:-1]
1699 ]
1700 inner = []
1701
1702 while middle:
1703 item = middle.pop(0)
1704 attr = getattr(item[0], item[1])
1705 if middle:
1706 attr = attr.of_type(middle[0][0])
1707 else:
1708 attr = attr.of_type(parent_alias)
1709
1710 inner.append(attr)
1711
1712 to_join = (
1713 [getattr(left_alias, to_join[0][1]).of_type(inner[0].parent)]
1714 + inner
1715 + [
1716 getattr(parent_alias, to_join[-1][1]).of_type(
1717 effective_entity
1718 )
1719 ]
1720 )
1721
1722 for attr in to_join:
1723 q = q.join(attr)
1724
1725 return q
1726
1727 def _setup_options(
1728 self,
1729 context,
1730 q,
1731 subq_path,
1732 rewritten_path,
1733 orig_query,
1734 effective_entity,
1735 loadopt,
1736 ):
1737 # note that because the subqueryload object
1738 # does not re-use the cached query, instead always making
1739 # use of the current invoked query, while we have two queries
1740 # here (orig and context.query), they are both non-cached
1741 # queries and we can transfer the options as is without
1742 # adjusting for new criteria. Some work on #6881 / #6889
1743 # brought this into question.
1744 new_options = orig_query._with_options
1745
1746 if loadopt and loadopt._extra_criteria:
1747 new_options += (
1748 orm_util.LoaderCriteriaOption(
1749 self.entity,
1750 loadopt._generate_extra_criteria(context),
1751 ),
1752 )
1753
1754 # propagate loader options etc. to the new query.
1755 # these will fire relative to subq_path.
1756 q = q._with_current_path(rewritten_path)
1757 q = q.options(*new_options)
1758
1759 return q
1760
1761 def _setup_outermost_orderby(self, q):
1762 if self.parent_property.order_by:
1763
1764 def _setup_outermost_orderby(compile_context):
1765 compile_context.eager_order_by += tuple(
1766 util.to_list(self.parent_property.order_by)
1767 )
1768
1769 q = q._add_context_option(
1770 _setup_outermost_orderby, self.parent_property
1771 )
1772
1773 return q
1774
1775 class _SubqCollections:
1776 """Given a :class:`_query.Query` used to emit the "subquery load",
1777 provide a load interface that executes the query at the
1778 first moment a value is needed.
1779
1780 """
1781
1782 __slots__ = (
1783 "session",
1784 "execution_options",
1785 "load_options",
1786 "params",
1787 "subq",
1788 "_data",
1789 )
1790
1791 def __init__(self, context, subq):
1792 # avoid creating a cycle by storing context
1793 # even though that's preferable
1794 self.session = context.session
1795 self.execution_options = context.execution_options
1796 self.load_options = context.load_options
1797 self.params = context.params or {}
1798 self.subq = subq
1799 self._data = None
1800
1801 def get(self, key, default):
1802 if self._data is None:
1803 self._load()
1804 return self._data.get(key, default)
1805
1806 def _load(self):
1807 self._data = collections.defaultdict(list)
1808
1809 q = self.subq
1810 assert q.session is None
1811
1812 q = q.with_session(self.session)
1813
1814 if self.load_options._populate_existing:
1815 q = q.populate_existing()
1816 # to work with baked query, the parameters may have been
1817 # updated since this query was created, so take these into account
1818
1819 rows = list(q.params(self.params))
1820 for k, v in itertools.groupby(rows, lambda x: x[1:]):
1821 self._data[k].extend(vv[0] for vv in v)
1822
1823 def loader(self, state, dict_, row):
1824 if self._data is None:
1825 self._load()
1826
1827 def _setup_query_from_rowproc(
1828 self,
1829 context,
1830 query_entity,
1831 path,
1832 entity,
1833 loadopt,
1834 adapter,
1835 ):
1836 compile_state = context.compile_state
1837 if (
1838 not compile_state.compile_options._enable_eagerloads
1839 or compile_state.compile_options._for_refresh_state
1840 ):
1841 return
1842
1843 orig_query_entity_index = compile_state._entities.index(query_entity)
1844 context.loaders_require_buffering = True
1845
1846 path = path[self.parent_property]
1847
1848 # build up a path indicating the path from the leftmost
1849 # entity to the thing we're subquery loading.
1850 with_poly_entity = path.get(
1851 compile_state.attributes, "path_with_polymorphic", None
1852 )
1853 if with_poly_entity is not None:
1854 effective_entity = with_poly_entity
1855 else:
1856 effective_entity = self.entity
1857
1858 subq_path, rewritten_path = context.query._execution_options.get(
1859 ("subquery_paths", None),
1860 (orm_util.PathRegistry.root, orm_util.PathRegistry.root),
1861 )
1862 is_root = subq_path is orm_util.PathRegistry.root
1863 subq_path = subq_path + path
1864 rewritten_path = rewritten_path + path
1865
1866 # use the current query being invoked, not the compile state
1867 # one. this is so that we get the current parameters. however,
1868 # it means we can't use the existing compile state, we have to make
1869 # a new one. other approaches include possibly using the
1870 # compiled query but swapping the params, seems only marginally
1871 # less time spent but more complicated
1872 orig_query = context.query._execution_options.get(
1873 ("orig_query", SubqueryLoader), context.query
1874 )
1875
1876 # make a new compile_state for the query that's probably cached, but
1877 # we're sort of undoing a bit of that caching :(
1878 compile_state_cls = ORMCompileState._get_plugin_class_for_plugin(
1879 orig_query, "orm"
1880 )
1881
1882 if orig_query._is_lambda_element:
1883 if context.load_options._lazy_loaded_from is None:
1884 util.warn(
1885 'subqueryloader for "%s" must invoke lambda callable '
1886 "at %r in "
1887 "order to produce a new query, decreasing the efficiency "
1888 "of caching for this statement. Consider using "
1889 "selectinload() for more effective full-lambda caching"
1890 % (self, orig_query)
1891 )
1892 orig_query = orig_query._resolved
1893
1894 # this is the more "quick" version, however it's not clear how
1895 # much of this we need. in particular I can't get a test to
1896 # fail if the "set_base_alias" is missing and not sure why that is.
1897 orig_compile_state = compile_state_cls._create_entities_collection(
1898 orig_query, legacy=False
1899 )
1900
1901 (
1902 leftmost_mapper,
1903 leftmost_attr,
1904 leftmost_relationship,
1905 rewritten_path,
1906 ) = self._get_leftmost(
1907 orig_query_entity_index,
1908 rewritten_path,
1909 orig_compile_state,
1910 is_root,
1911 )
1912
1913 # generate a new Query from the original, then
1914 # produce a subquery from it.
1915 left_alias = self._generate_from_original_query(
1916 orig_compile_state,
1917 orig_query,
1918 leftmost_mapper,
1919 leftmost_attr,
1920 leftmost_relationship,
1921 entity,
1922 )
1923
1924 # generate another Query that will join the
1925 # left alias to the target relationships.
1926 # basically doing a longhand
1927 # "from_self()". (from_self() itself not quite industrial
1928 # strength enough for all contingencies...but very close)
1929
1930 q = query.Query(effective_entity)
1931
1932 q._execution_options = context.query._execution_options.merge_with(
1933 context.execution_options,
1934 {
1935 ("orig_query", SubqueryLoader): orig_query,
1936 ("subquery_paths", None): (subq_path, rewritten_path),
1937 },
1938 )
1939
1940 q = q._set_enable_single_crit(False)
1941 to_join, local_attr, parent_alias = self._prep_for_joins(
1942 left_alias, subq_path
1943 )
1944
1945 q = q.add_columns(*local_attr)
1946 q = self._apply_joins(
1947 q, to_join, left_alias, parent_alias, effective_entity
1948 )
1949
1950 q = self._setup_options(
1951 context,
1952 q,
1953 subq_path,
1954 rewritten_path,
1955 orig_query,
1956 effective_entity,
1957 loadopt,
1958 )
1959 q = self._setup_outermost_orderby(q)
1960
1961 return q
1962
1963 def create_row_processor(
1964 self,
1965 context,
1966 query_entity,
1967 path,
1968 loadopt,
1969 mapper,
1970 result,
1971 adapter,
1972 populators,
1973 ):
1974 if (
1975 loadopt
1976 and context.compile_state.statement is not None
1977 and context.compile_state.statement.is_dml
1978 ):
1979 util.warn_deprecated(
1980 "The subqueryload loader option is not compatible with DML "
1981 "statements such as INSERT, UPDATE. Only SELECT may be used."
1982 "This warning will become an exception in a future release.",
1983 "2.0",
1984 )
1985
1986 if context.refresh_state:
1987 return self._immediateload_create_row_processor(
1988 context,
1989 query_entity,
1990 path,
1991 loadopt,
1992 mapper,
1993 result,
1994 adapter,
1995 populators,
1996 )
1997
1998 _, run_loader, _, _ = self._setup_for_recursion(
1999 context, path, loadopt, self.join_depth
2000 )
2001 if not run_loader:
2002 return
2003
2004 if not isinstance(context.compile_state, ORMSelectCompileState):
2005 # issue 7505 - subqueryload() in 1.3 and previous would silently
2006 # degrade for from_statement() without warning. this behavior
2007 # is restored here
2008 return
2009
2010 if not self.parent.class_manager[self.key].impl.supports_population:
2011 raise sa_exc.InvalidRequestError(
2012 "'%s' does not support object "
2013 "population - eager loading cannot be applied." % self
2014 )
2015
2016 # a little dance here as the "path" is still something that only
2017 # semi-tracks the exact series of things we are loading, still not
2018 # telling us about with_polymorphic() and stuff like that when it's at
2019 # the root.. the initial MapperEntity is more accurate for this case.
2020 if len(path) == 1:
2021 if not orm_util._entity_isa(query_entity.entity_zero, self.parent):
2022 return
2023 elif not orm_util._entity_isa(path[-1], self.parent):
2024 return
2025
2026 subq = self._setup_query_from_rowproc(
2027 context,
2028 query_entity,
2029 path,
2030 path[-1],
2031 loadopt,
2032 adapter,
2033 )
2034
2035 if subq is None:
2036 return
2037
2038 assert subq.session is None
2039
2040 path = path[self.parent_property]
2041
2042 local_cols = self.parent_property.local_columns
2043
2044 # cache the loaded collections in the context
2045 # so that inheriting mappers don't re-load when they
2046 # call upon create_row_processor again
2047 collections = path.get(context.attributes, "collections")
2048 if collections is None:
2049 collections = self._SubqCollections(context, subq)
2050 path.set(context.attributes, "collections", collections)
2051
2052 if adapter:
2053 local_cols = [adapter.columns[c] for c in local_cols]
2054
2055 if self.uselist:
2056 self._create_collection_loader(
2057 context, result, collections, local_cols, populators
2058 )
2059 else:
2060 self._create_scalar_loader(
2061 context, result, collections, local_cols, populators
2062 )
2063
2064 def _create_collection_loader(
2065 self, context, result, collections, local_cols, populators
2066 ):
2067 tuple_getter = result._tuple_getter(local_cols)
2068
2069 def load_collection_from_subq(state, dict_, row):
2070 collection = collections.get(tuple_getter(row), ())
2071 state.get_impl(self.key).set_committed_value(
2072 state, dict_, collection
2073 )
2074
2075 def load_collection_from_subq_existing_row(state, dict_, row):
2076 if self.key not in dict_:
2077 load_collection_from_subq(state, dict_, row)
2078
2079 populators["new"].append((self.key, load_collection_from_subq))
2080 populators["existing"].append(
2081 (self.key, load_collection_from_subq_existing_row)
2082 )
2083
2084 if context.invoke_all_eagers:
2085 populators["eager"].append((self.key, collections.loader))
2086
2087 def _create_scalar_loader(
2088 self, context, result, collections, local_cols, populators
2089 ):
2090 tuple_getter = result._tuple_getter(local_cols)
2091
2092 def load_scalar_from_subq(state, dict_, row):
2093 collection = collections.get(tuple_getter(row), (None,))
2094 if len(collection) > 1:
2095 util.warn(
2096 "Multiple rows returned with "
2097 "uselist=False for eagerly-loaded attribute '%s' " % self
2098 )
2099
2100 scalar = collection[0]
2101 state.get_impl(self.key).set_committed_value(state, dict_, scalar)
2102
2103 def load_scalar_from_subq_existing_row(state, dict_, row):
2104 if self.key not in dict_:
2105 load_scalar_from_subq(state, dict_, row)
2106
2107 populators["new"].append((self.key, load_scalar_from_subq))
2108 populators["existing"].append(
2109 (self.key, load_scalar_from_subq_existing_row)
2110 )
2111 if context.invoke_all_eagers:
2112 populators["eager"].append((self.key, collections.loader))
2113
2114
2115@log.class_logger
2116@relationships.RelationshipProperty.strategy_for(lazy="joined")
2117@relationships.RelationshipProperty.strategy_for(lazy=False)
2118class JoinedLoader(AbstractRelationshipLoader):
2119 """Provide loading behavior for a :class:`.Relationship`
2120 using joined eager loading.
2121
2122 """
2123
2124 __slots__ = "join_depth"
2125
2126 def __init__(self, parent, strategy_key):
2127 super().__init__(parent, strategy_key)
2128 self.join_depth = self.parent_property.join_depth
2129
2130 def init_class_attribute(self, mapper):
2131 self.parent_property._get_strategy(
2132 (("lazy", "select"),)
2133 ).init_class_attribute(mapper)
2134
2135 def setup_query(
2136 self,
2137 compile_state,
2138 query_entity,
2139 path,
2140 loadopt,
2141 adapter,
2142 column_collection=None,
2143 parentmapper=None,
2144 chained_from_outerjoin=False,
2145 **kwargs,
2146 ):
2147 """Add a left outer join to the statement that's being constructed."""
2148
2149 if not compile_state.compile_options._enable_eagerloads:
2150 return
2151 elif (
2152 loadopt
2153 and compile_state.statement is not None
2154 and compile_state.statement.is_dml
2155 ):
2156 util.warn_deprecated(
2157 "The joinedload loader option is not compatible with DML "
2158 "statements such as INSERT, UPDATE. Only SELECT may be used."
2159 "This warning will become an exception in a future release.",
2160 "2.0",
2161 )
2162 elif self.uselist:
2163 compile_state.multi_row_eager_loaders = True
2164
2165 path = path[self.parent_property]
2166
2167 user_defined_adapter = (
2168 self._init_user_defined_eager_proc(
2169 loadopt, compile_state, compile_state.attributes
2170 )
2171 if loadopt
2172 else False
2173 )
2174
2175 if user_defined_adapter is not False:
2176 # setup an adapter but dont create any JOIN, assume it's already
2177 # in the query
2178 (
2179 clauses,
2180 adapter,
2181 add_to_collection,
2182 ) = self._setup_query_on_user_defined_adapter(
2183 compile_state,
2184 query_entity,
2185 path,
2186 adapter,
2187 user_defined_adapter,
2188 )
2189
2190 # don't do "wrap" for multi-row, we want to wrap
2191 # limited/distinct SELECT,
2192 # because we want to put the JOIN on the outside.
2193
2194 else:
2195 # if not via query option, check for
2196 # a cycle
2197 if not path.contains(compile_state.attributes, "loader"):
2198 if self.join_depth:
2199 if path.length / 2 > self.join_depth:
2200 return
2201 elif path.contains_mapper(self.mapper):
2202 return
2203
2204 # add the JOIN and create an adapter
2205 (
2206 clauses,
2207 adapter,
2208 add_to_collection,
2209 chained_from_outerjoin,
2210 ) = self._generate_row_adapter(
2211 compile_state,
2212 query_entity,
2213 path,
2214 loadopt,
2215 adapter,
2216 column_collection,
2217 parentmapper,
2218 chained_from_outerjoin,
2219 )
2220
2221 # for multi-row, we want to wrap limited/distinct SELECT,
2222 # because we want to put the JOIN on the outside.
2223 compile_state.eager_adding_joins = True
2224
2225 with_poly_entity = path.get(
2226 compile_state.attributes, "path_with_polymorphic", None
2227 )
2228 if with_poly_entity is not None:
2229 with_polymorphic = inspect(
2230 with_poly_entity
2231 ).with_polymorphic_mappers
2232 else:
2233 with_polymorphic = None
2234
2235 path = path[self.entity]
2236
2237 loading._setup_entity_query(
2238 compile_state,
2239 self.mapper,
2240 query_entity,
2241 path,
2242 clauses,
2243 add_to_collection,
2244 with_polymorphic=with_polymorphic,
2245 parentmapper=self.mapper,
2246 chained_from_outerjoin=chained_from_outerjoin,
2247 )
2248
2249 has_nones = util.NONE_SET.intersection(compile_state.secondary_columns)
2250
2251 if has_nones:
2252 if with_poly_entity is not None:
2253 raise sa_exc.InvalidRequestError(
2254 "Detected unaliased columns when generating joined "
2255 "load. Make sure to use aliased=True or flat=True "
2256 "when using joined loading with with_polymorphic()."
2257 )
2258 else:
2259 compile_state.secondary_columns = [
2260 c for c in compile_state.secondary_columns if c is not None
2261 ]
2262
2263 def _init_user_defined_eager_proc(
2264 self, loadopt, compile_state, target_attributes
2265 ):
2266 # check if the opt applies at all
2267 if "eager_from_alias" not in loadopt.local_opts:
2268 # nope
2269 return False
2270
2271 path = loadopt.path.parent
2272
2273 # the option applies. check if the "user_defined_eager_row_processor"
2274 # has been built up.
2275 adapter = path.get(
2276 compile_state.attributes, "user_defined_eager_row_processor", False
2277 )
2278 if adapter is not False:
2279 # just return it
2280 return adapter
2281
2282 # otherwise figure it out.
2283 alias = loadopt.local_opts["eager_from_alias"]
2284 root_mapper, prop = path[-2:]
2285
2286 if alias is not None:
2287 if isinstance(alias, str):
2288 alias = prop.target.alias(alias)
2289 adapter = orm_util.ORMAdapter(
2290 orm_util._TraceAdaptRole.JOINEDLOAD_USER_DEFINED_ALIAS,
2291 prop.mapper,
2292 selectable=alias,
2293 equivalents=prop.mapper._equivalent_columns,
2294 limit_on_entity=False,
2295 )
2296 else:
2297 if path.contains(
2298 compile_state.attributes, "path_with_polymorphic"
2299 ):
2300 with_poly_entity = path.get(
2301 compile_state.attributes, "path_with_polymorphic"
2302 )
2303 adapter = orm_util.ORMAdapter(
2304 orm_util._TraceAdaptRole.JOINEDLOAD_PATH_WITH_POLYMORPHIC,
2305 with_poly_entity,
2306 equivalents=prop.mapper._equivalent_columns,
2307 )
2308 else:
2309 adapter = compile_state._polymorphic_adapters.get(
2310 prop.mapper, None
2311 )
2312 path.set(
2313 target_attributes,
2314 "user_defined_eager_row_processor",
2315 adapter,
2316 )
2317
2318 return adapter
2319
2320 def _setup_query_on_user_defined_adapter(
2321 self, context, entity, path, adapter, user_defined_adapter
2322 ):
2323 # apply some more wrapping to the "user defined adapter"
2324 # if we are setting up the query for SQL render.
2325 adapter = entity._get_entity_clauses(context)
2326
2327 if adapter and user_defined_adapter:
2328 user_defined_adapter = user_defined_adapter.wrap(adapter)
2329 path.set(
2330 context.attributes,
2331 "user_defined_eager_row_processor",
2332 user_defined_adapter,
2333 )
2334 elif adapter:
2335 user_defined_adapter = adapter
2336 path.set(
2337 context.attributes,
2338 "user_defined_eager_row_processor",
2339 user_defined_adapter,
2340 )
2341
2342 add_to_collection = context.primary_columns
2343 return user_defined_adapter, adapter, add_to_collection
2344
2345 def _generate_row_adapter(
2346 self,
2347 compile_state,
2348 entity,
2349 path,
2350 loadopt,
2351 adapter,
2352 column_collection,
2353 parentmapper,
2354 chained_from_outerjoin,
2355 ):
2356 with_poly_entity = path.get(
2357 compile_state.attributes, "path_with_polymorphic", None
2358 )
2359 if with_poly_entity:
2360 to_adapt = with_poly_entity
2361 else:
2362 insp = inspect(self.entity)
2363 if insp.is_aliased_class:
2364 alt_selectable = insp.selectable
2365 else:
2366 alt_selectable = None
2367
2368 to_adapt = orm_util.AliasedClass(
2369 self.mapper,
2370 alias=(
2371 alt_selectable._anonymous_fromclause(flat=True)
2372 if alt_selectable is not None
2373 else None
2374 ),
2375 flat=True,
2376 use_mapper_path=True,
2377 )
2378
2379 to_adapt_insp = inspect(to_adapt)
2380
2381 clauses = to_adapt_insp._memo(
2382 ("joinedloader_ormadapter", self),
2383 orm_util.ORMAdapter,
2384 orm_util._TraceAdaptRole.JOINEDLOAD_MEMOIZED_ADAPTER,
2385 to_adapt_insp,
2386 equivalents=self.mapper._equivalent_columns,
2387 adapt_required=True,
2388 allow_label_resolve=False,
2389 anonymize_labels=True,
2390 )
2391
2392 assert clauses.is_aliased_class
2393
2394 innerjoin = (
2395 loadopt.local_opts.get("innerjoin", self.parent_property.innerjoin)
2396 if loadopt is not None
2397 else self.parent_property.innerjoin
2398 )
2399
2400 if not innerjoin:
2401 # if this is an outer join, all non-nested eager joins from
2402 # this path must also be outer joins
2403 chained_from_outerjoin = True
2404
2405 compile_state.create_eager_joins.append(
2406 (
2407 self._create_eager_join,
2408 entity,
2409 path,
2410 adapter,
2411 parentmapper,
2412 clauses,
2413 innerjoin,
2414 chained_from_outerjoin,
2415 loadopt._extra_criteria if loadopt else (),
2416 )
2417 )
2418
2419 add_to_collection = compile_state.secondary_columns
2420 path.set(compile_state.attributes, "eager_row_processor", clauses)
2421
2422 return clauses, adapter, add_to_collection, chained_from_outerjoin
2423
2424 def _create_eager_join(
2425 self,
2426 compile_state,
2427 query_entity,
2428 path,
2429 adapter,
2430 parentmapper,
2431 clauses,
2432 innerjoin,
2433 chained_from_outerjoin,
2434 extra_criteria,
2435 ):
2436 if parentmapper is None:
2437 localparent = query_entity.mapper
2438 else:
2439 localparent = parentmapper
2440
2441 # whether or not the Query will wrap the selectable in a subquery,
2442 # and then attach eager load joins to that (i.e., in the case of
2443 # LIMIT/OFFSET etc.)
2444 should_nest_selectable = (
2445 compile_state.multi_row_eager_loaders
2446 and compile_state._should_nest_selectable
2447 )
2448
2449 query_entity_key = None
2450
2451 if (
2452 query_entity not in compile_state.eager_joins
2453 and not should_nest_selectable
2454 and compile_state.from_clauses
2455 ):
2456 indexes = sql_util.find_left_clause_that_matches_given(
2457 compile_state.from_clauses, query_entity.selectable
2458 )
2459
2460 if len(indexes) > 1:
2461 # for the eager load case, I can't reproduce this right
2462 # now. For query.join() I can.
2463 raise sa_exc.InvalidRequestError(
2464 "Can't identify which query entity in which to joined "
2465 "eager load from. Please use an exact match when "
2466 "specifying the join path."
2467 )
2468
2469 if indexes:
2470 clause = compile_state.from_clauses[indexes[0]]
2471 # join to an existing FROM clause on the query.
2472 # key it to its list index in the eager_joins dict.
2473 # Query._compile_context will adapt as needed and
2474 # append to the FROM clause of the select().
2475 query_entity_key, default_towrap = indexes[0], clause
2476
2477 if query_entity_key is None:
2478 query_entity_key, default_towrap = (
2479 query_entity,
2480 query_entity.selectable,
2481 )
2482
2483 towrap = compile_state.eager_joins.setdefault(
2484 query_entity_key, default_towrap
2485 )
2486
2487 if adapter:
2488 if getattr(adapter, "is_aliased_class", False):
2489 # joining from an adapted entity. The adapted entity
2490 # might be a "with_polymorphic", so resolve that to our
2491 # specific mapper's entity before looking for our attribute
2492 # name on it.
2493 efm = adapter.aliased_insp._entity_for_mapper(
2494 localparent
2495 if localparent.isa(self.parent)
2496 else self.parent
2497 )
2498
2499 # look for our attribute on the adapted entity, else fall back
2500 # to our straight property
2501 onclause = getattr(efm.entity, self.key, self.parent_property)
2502 else:
2503 onclause = getattr(
2504 orm_util.AliasedClass(
2505 self.parent, adapter.selectable, use_mapper_path=True
2506 ),
2507 self.key,
2508 self.parent_property,
2509 )
2510
2511 else:
2512 onclause = self.parent_property
2513
2514 assert clauses.is_aliased_class
2515
2516 attach_on_outside = (
2517 not chained_from_outerjoin
2518 or not innerjoin
2519 or innerjoin == "unnested"
2520 or query_entity.entity_zero.represents_outer_join
2521 )
2522
2523 extra_join_criteria = extra_criteria
2524 additional_entity_criteria = compile_state.global_attributes.get(
2525 ("additional_entity_criteria", self.mapper), ()
2526 )
2527 if additional_entity_criteria:
2528 extra_join_criteria += tuple(
2529 ae._resolve_where_criteria(self.mapper)
2530 for ae in additional_entity_criteria
2531 if ae.propagate_to_loaders
2532 )
2533
2534 if attach_on_outside:
2535 # this is the "classic" eager join case.
2536 eagerjoin = orm_util._ORMJoin(
2537 towrap,
2538 clauses.aliased_insp,
2539 onclause,
2540 isouter=not innerjoin
2541 or query_entity.entity_zero.represents_outer_join
2542 or (chained_from_outerjoin and isinstance(towrap, sql.Join)),
2543 _left_memo=self.parent,
2544 _right_memo=path[self.mapper],
2545 _extra_criteria=extra_join_criteria,
2546 )
2547 else:
2548 # all other cases are innerjoin=='nested' approach
2549 eagerjoin = self._splice_nested_inner_join(
2550 path, path[-2], towrap, clauses, onclause, extra_join_criteria
2551 )
2552
2553 compile_state.eager_joins[query_entity_key] = eagerjoin
2554
2555 # send a hint to the Query as to where it may "splice" this join
2556 eagerjoin.stop_on = query_entity.selectable
2557
2558 if not parentmapper:
2559 # for parentclause that is the non-eager end of the join,
2560 # ensure all the parent cols in the primaryjoin are actually
2561 # in the
2562 # columns clause (i.e. are not deferred), so that aliasing applied
2563 # by the Query propagates those columns outward.
2564 # This has the effect
2565 # of "undefering" those columns.
2566 for col in sql_util._find_columns(
2567 self.parent_property.primaryjoin
2568 ):
2569 if localparent.persist_selectable.c.contains_column(col):
2570 if adapter:
2571 col = adapter.columns[col]
2572 compile_state._append_dedupe_col_collection(
2573 col, compile_state.primary_columns
2574 )
2575
2576 if self.parent_property.order_by:
2577 compile_state.eager_order_by += tuple(
2578 (eagerjoin._target_adapter.copy_and_process)(
2579 util.to_list(self.parent_property.order_by)
2580 )
2581 )
2582
2583 def _splice_nested_inner_join(
2584 self,
2585 path,
2586 entity_we_want_to_splice_onto,
2587 join_obj,
2588 clauses,
2589 onclause,
2590 extra_criteria,
2591 entity_inside_join_structure: Union[
2592 Mapper, None, Literal[False]
2593 ] = False,
2594 detected_existing_path: Optional[path_registry.PathRegistry] = None,
2595 ):
2596 # recursive fn to splice a nested join into an existing one.
2597 # entity_inside_join_structure=False means this is the outermost call,
2598 # and it should return a value. entity_inside_join_structure=<mapper>
2599 # indicates we've descended into a join and are looking at a FROM
2600 # clause representing this mapper; if this is not
2601 # entity_we_want_to_splice_onto then return None to end the recursive
2602 # branch
2603
2604 assert entity_we_want_to_splice_onto is path[-2]
2605
2606 if entity_inside_join_structure is False:
2607 assert isinstance(join_obj, orm_util._ORMJoin)
2608
2609 if isinstance(join_obj, sql.selectable.FromGrouping):
2610 # FromGrouping - continue descending into the structure
2611 return self._splice_nested_inner_join(
2612 path,
2613 entity_we_want_to_splice_onto,
2614 join_obj.element,
2615 clauses,
2616 onclause,
2617 extra_criteria,
2618 entity_inside_join_structure,
2619 )
2620 elif isinstance(join_obj, orm_util._ORMJoin):
2621 # _ORMJoin - continue descending into the structure
2622
2623 join_right_path = join_obj._right_memo
2624
2625 # see if right side of join is viable
2626 target_join = self._splice_nested_inner_join(
2627 path,
2628 entity_we_want_to_splice_onto,
2629 join_obj.right,
2630 clauses,
2631 onclause,
2632 extra_criteria,
2633 entity_inside_join_structure=(
2634 join_right_path[-1].mapper
2635 if join_right_path is not None
2636 else None
2637 ),
2638 )
2639
2640 if target_join is not None:
2641 # for a right splice, attempt to flatten out
2642 # a JOIN b JOIN c JOIN .. to avoid needless
2643 # parenthesis nesting
2644 if not join_obj.isouter and not target_join.isouter:
2645 eagerjoin = join_obj._splice_into_center(target_join)
2646 else:
2647 eagerjoin = orm_util._ORMJoin(
2648 join_obj.left,
2649 target_join,
2650 join_obj.onclause,
2651 isouter=join_obj.isouter,
2652 _left_memo=join_obj._left_memo,
2653 )
2654
2655 eagerjoin._target_adapter = target_join._target_adapter
2656 return eagerjoin
2657
2658 else:
2659 # see if left side of join is viable
2660 target_join = self._splice_nested_inner_join(
2661 path,
2662 entity_we_want_to_splice_onto,
2663 join_obj.left,
2664 clauses,
2665 onclause,
2666 extra_criteria,
2667 entity_inside_join_structure=join_obj._left_memo,
2668 detected_existing_path=join_right_path,
2669 )
2670
2671 if target_join is not None:
2672 eagerjoin = orm_util._ORMJoin(
2673 target_join,
2674 join_obj.right,
2675 join_obj.onclause,
2676 isouter=join_obj.isouter,
2677 _right_memo=join_obj._right_memo,
2678 )
2679 eagerjoin._target_adapter = target_join._target_adapter
2680 return eagerjoin
2681
2682 # neither side viable, return None, or fail if this was the top
2683 # most call
2684 if entity_inside_join_structure is False:
2685 assert (
2686 False
2687 ), "assertion failed attempting to produce joined eager loads"
2688 return None
2689
2690 # reached an endpoint (e.g. a table that's mapped, or an alias of that
2691 # table). determine if we can use this endpoint to splice onto
2692
2693 # is this the entity we want to splice onto in the first place?
2694 if not entity_we_want_to_splice_onto.isa(entity_inside_join_structure):
2695 return None
2696
2697 # path check. if we know the path how this join endpoint got here,
2698 # lets look at our path we are satisfying and see if we're in the
2699 # wrong place. This is specifically for when our entity may
2700 # appear more than once in the path, issue #11449
2701 # updated in issue #11965.
2702 if detected_existing_path and len(detected_existing_path) > 2:
2703 # this assertion is currently based on how this call is made,
2704 # where given a join_obj, the call will have these parameters as
2705 # entity_inside_join_structure=join_obj._left_memo
2706 # and entity_inside_join_structure=join_obj._right_memo.mapper
2707 assert detected_existing_path[-3] is entity_inside_join_structure
2708
2709 # from that, see if the path we are targeting matches the
2710 # "existing" path of this join all the way up to the midpoint
2711 # of this join object (e.g. the relationship).
2712 # if not, then this is not our target
2713 #
2714 # a test condition where this test is false looks like:
2715 #
2716 # desired splice: Node->kind->Kind
2717 # path of desired splice: NodeGroup->nodes->Node->kind
2718 # path we've located: NodeGroup->nodes->Node->common_node->Node
2719 #
2720 # above, because we want to splice kind->Kind onto
2721 # NodeGroup->nodes->Node, this is not our path because it actually
2722 # goes more steps than we want into self-referential
2723 # ->common_node->Node
2724 #
2725 # a test condition where this test is true looks like:
2726 #
2727 # desired splice: B->c2s->C2
2728 # path of desired splice: A->bs->B->c2s
2729 # path we've located: A->bs->B->c1s->C1
2730 #
2731 # above, we want to splice c2s->C2 onto B, and the located path
2732 # shows that the join ends with B->c1s->C1. so we will
2733 # add another join onto that, which would create a "branch" that
2734 # we might represent in a pseudopath as:
2735 #
2736 # B->c1s->C1
2737 # ->c2s->C2
2738 #
2739 # i.e. A JOIN B ON <bs> JOIN C1 ON <c1s>
2740 # JOIN C2 ON <c2s>
2741 #
2742
2743 if detected_existing_path[0:-2] != path.path[0:-1]:
2744 return None
2745
2746 return orm_util._ORMJoin(
2747 join_obj,
2748 clauses.aliased_insp,
2749 onclause,
2750 isouter=False,
2751 _left_memo=entity_inside_join_structure,
2752 _right_memo=path[path[-1].mapper],
2753 _extra_criteria=extra_criteria,
2754 )
2755
2756 def _create_eager_adapter(self, context, result, adapter, path, loadopt):
2757 compile_state = context.compile_state
2758
2759 user_defined_adapter = (
2760 self._init_user_defined_eager_proc(
2761 loadopt, compile_state, context.attributes
2762 )
2763 if loadopt
2764 else False
2765 )
2766
2767 if user_defined_adapter is not False:
2768 decorator = user_defined_adapter
2769 # user defined eagerloads are part of the "primary"
2770 # portion of the load.
2771 # the adapters applied to the Query should be honored.
2772 if compile_state.compound_eager_adapter and decorator:
2773 decorator = decorator.wrap(
2774 compile_state.compound_eager_adapter
2775 )
2776 elif compile_state.compound_eager_adapter:
2777 decorator = compile_state.compound_eager_adapter
2778 else:
2779 decorator = path.get(
2780 compile_state.attributes, "eager_row_processor"
2781 )
2782 if decorator is None:
2783 return False
2784
2785 if self.mapper._result_has_identity_key(result, decorator):
2786 return decorator
2787 else:
2788 # no identity key - don't return a row
2789 # processor, will cause a degrade to lazy
2790 return False
2791
2792 def create_row_processor(
2793 self,
2794 context,
2795 query_entity,
2796 path,
2797 loadopt,
2798 mapper,
2799 result,
2800 adapter,
2801 populators,
2802 ):
2803
2804 if not context.compile_state.compile_options._enable_eagerloads:
2805 return
2806
2807 if not self.parent.class_manager[self.key].impl.supports_population:
2808 raise sa_exc.InvalidRequestError(
2809 "'%s' does not support object "
2810 "population - eager loading cannot be applied." % self
2811 )
2812
2813 if self.uselist:
2814 context.loaders_require_uniquing = True
2815
2816 our_path = path[self.parent_property]
2817
2818 eager_adapter = self._create_eager_adapter(
2819 context, result, adapter, our_path, loadopt
2820 )
2821
2822 if eager_adapter is not False:
2823 key = self.key
2824
2825 _instance = loading._instance_processor(
2826 query_entity,
2827 self.mapper,
2828 context,
2829 result,
2830 our_path[self.entity],
2831 eager_adapter,
2832 )
2833
2834 if not self.uselist:
2835 self._create_scalar_loader(context, key, _instance, populators)
2836 else:
2837 self._create_collection_loader(
2838 context, key, _instance, populators
2839 )
2840 else:
2841 self.parent_property._get_strategy(
2842 (("lazy", "select"),)
2843 ).create_row_processor(
2844 context,
2845 query_entity,
2846 path,
2847 loadopt,
2848 mapper,
2849 result,
2850 adapter,
2851 populators,
2852 )
2853
2854 def _create_collection_loader(self, context, key, _instance, populators):
2855 def load_collection_from_joined_new_row(state, dict_, row):
2856 # note this must unconditionally clear out any existing collection.
2857 # an existing collection would be present only in the case of
2858 # populate_existing().
2859 collection = attributes.init_state_collection(state, dict_, key)
2860 result_list = util.UniqueAppender(
2861 collection, "append_without_event"
2862 )
2863 context.attributes[(state, key)] = result_list
2864 inst = _instance(row)
2865 if inst is not None:
2866 result_list.append(inst)
2867
2868 def load_collection_from_joined_existing_row(state, dict_, row):
2869 if (state, key) in context.attributes:
2870 result_list = context.attributes[(state, key)]
2871 else:
2872 # appender_key can be absent from context.attributes
2873 # with isnew=False when self-referential eager loading
2874 # is used; the same instance may be present in two
2875 # distinct sets of result columns
2876 collection = attributes.init_state_collection(
2877 state, dict_, key
2878 )
2879 result_list = util.UniqueAppender(
2880 collection, "append_without_event"
2881 )
2882 context.attributes[(state, key)] = result_list
2883 inst = _instance(row)
2884 if inst is not None:
2885 result_list.append(inst)
2886
2887 def load_collection_from_joined_exec(state, dict_, row):
2888 _instance(row)
2889
2890 populators["new"].append(
2891 (self.key, load_collection_from_joined_new_row)
2892 )
2893 populators["existing"].append(
2894 (self.key, load_collection_from_joined_existing_row)
2895 )
2896 if context.invoke_all_eagers:
2897 populators["eager"].append(
2898 (self.key, load_collection_from_joined_exec)
2899 )
2900
2901 def _create_scalar_loader(self, context, key, _instance, populators):
2902 def load_scalar_from_joined_new_row(state, dict_, row):
2903 # set a scalar object instance directly on the parent
2904 # object, bypassing InstrumentedAttribute event handlers.
2905 dict_[key] = _instance(row)
2906
2907 def load_scalar_from_joined_existing_row(state, dict_, row):
2908 # call _instance on the row, even though the object has
2909 # been created, so that we further descend into properties
2910 existing = _instance(row)
2911
2912 # conflicting value already loaded, this shouldn't happen
2913 if key in dict_:
2914 if existing is not dict_[key]:
2915 util.warn(
2916 "Multiple rows returned with "
2917 "uselist=False for eagerly-loaded attribute '%s' "
2918 % self
2919 )
2920 else:
2921 # this case is when one row has multiple loads of the
2922 # same entity (e.g. via aliasing), one has an attribute
2923 # that the other doesn't.
2924 dict_[key] = existing
2925
2926 def load_scalar_from_joined_exec(state, dict_, row):
2927 _instance(row)
2928
2929 populators["new"].append((self.key, load_scalar_from_joined_new_row))
2930 populators["existing"].append(
2931 (self.key, load_scalar_from_joined_existing_row)
2932 )
2933 if context.invoke_all_eagers:
2934 populators["eager"].append(
2935 (self.key, load_scalar_from_joined_exec)
2936 )
2937
2938
2939@log.class_logger
2940@relationships.RelationshipProperty.strategy_for(lazy="selectin")
2941class SelectInLoader(PostLoader, util.MemoizedSlots):
2942 __slots__ = (
2943 "join_depth",
2944 "omit_join",
2945 "_parent_alias",
2946 "_query_info",
2947 "_fallback_query_info",
2948 )
2949
2950 query_info = collections.namedtuple(
2951 "queryinfo",
2952 [
2953 "load_only_child",
2954 "load_with_join",
2955 "in_expr",
2956 "pk_cols",
2957 "zero_idx",
2958 "child_lookup_cols",
2959 ],
2960 )
2961
2962 _chunksize = 500
2963
2964 def __init__(self, parent, strategy_key):
2965 super().__init__(parent, strategy_key)
2966 self.join_depth = self.parent_property.join_depth
2967 is_m2o = self.parent_property.direction is interfaces.MANYTOONE
2968
2969 if self.parent_property.omit_join is not None:
2970 self.omit_join = self.parent_property.omit_join
2971 else:
2972 lazyloader = self.parent_property._get_strategy(
2973 (("lazy", "select"),)
2974 )
2975 if is_m2o:
2976 self.omit_join = lazyloader.use_get
2977 else:
2978 self.omit_join = self.parent._get_clause[0].compare(
2979 lazyloader._rev_lazywhere,
2980 use_proxies=True,
2981 compare_keys=False,
2982 equivalents=self.parent._equivalent_columns,
2983 )
2984
2985 if self.omit_join:
2986 if is_m2o:
2987 self._query_info = self._init_for_omit_join_m2o()
2988 self._fallback_query_info = self._init_for_join()
2989 else:
2990 self._query_info = self._init_for_omit_join()
2991 else:
2992 self._query_info = self._init_for_join()
2993
2994 def _init_for_omit_join(self):
2995 pk_to_fk = dict(
2996 self.parent_property._join_condition.local_remote_pairs
2997 )
2998 pk_to_fk.update(
2999 (equiv, pk_to_fk[k])
3000 for k in list(pk_to_fk)
3001 for equiv in self.parent._equivalent_columns.get(k, ())
3002 )
3003
3004 pk_cols = fk_cols = [
3005 pk_to_fk[col] for col in self.parent.primary_key if col in pk_to_fk
3006 ]
3007 if len(fk_cols) > 1:
3008 in_expr = sql.tuple_(*fk_cols)
3009 zero_idx = False
3010 else:
3011 in_expr = fk_cols[0]
3012 zero_idx = True
3013
3014 return self.query_info(False, False, in_expr, pk_cols, zero_idx, None)
3015
3016 def _init_for_omit_join_m2o(self):
3017 pk_cols = self.mapper.primary_key
3018 if len(pk_cols) > 1:
3019 in_expr = sql.tuple_(*pk_cols)
3020 zero_idx = False
3021 else:
3022 in_expr = pk_cols[0]
3023 zero_idx = True
3024
3025 lazyloader = self.parent_property._get_strategy((("lazy", "select"),))
3026 lookup_cols = [lazyloader._equated_columns[pk] for pk in pk_cols]
3027
3028 return self.query_info(
3029 True, False, in_expr, pk_cols, zero_idx, lookup_cols
3030 )
3031
3032 def _init_for_join(self):
3033 self._parent_alias = AliasedClass(self.parent.class_)
3034 pa_insp = inspect(self._parent_alias)
3035 pk_cols = [
3036 pa_insp._adapt_element(col) for col in self.parent.primary_key
3037 ]
3038 if len(pk_cols) > 1:
3039 in_expr = sql.tuple_(*pk_cols)
3040 zero_idx = False
3041 else:
3042 in_expr = pk_cols[0]
3043 zero_idx = True
3044 return self.query_info(False, True, in_expr, pk_cols, zero_idx, None)
3045
3046 def init_class_attribute(self, mapper):
3047 self.parent_property._get_strategy(
3048 (("lazy", "select"),)
3049 ).init_class_attribute(mapper)
3050
3051 def create_row_processor(
3052 self,
3053 context,
3054 query_entity,
3055 path,
3056 loadopt,
3057 mapper,
3058 result,
3059 adapter,
3060 populators,
3061 ):
3062 if context.refresh_state:
3063 return self._immediateload_create_row_processor(
3064 context,
3065 query_entity,
3066 path,
3067 loadopt,
3068 mapper,
3069 result,
3070 adapter,
3071 populators,
3072 )
3073
3074 (
3075 effective_path,
3076 run_loader,
3077 execution_options,
3078 recursion_depth,
3079 ) = self._setup_for_recursion(
3080 context, path, loadopt, join_depth=self.join_depth
3081 )
3082
3083 if not run_loader:
3084 return
3085
3086 if not context.compile_state.compile_options._enable_eagerloads:
3087 return
3088
3089 if not self.parent.class_manager[self.key].impl.supports_population:
3090 raise sa_exc.InvalidRequestError(
3091 "'%s' does not support object "
3092 "population - eager loading cannot be applied." % self
3093 )
3094
3095 # a little dance here as the "path" is still something that only
3096 # semi-tracks the exact series of things we are loading, still not
3097 # telling us about with_polymorphic() and stuff like that when it's at
3098 # the root.. the initial MapperEntity is more accurate for this case.
3099 if len(path) == 1:
3100 if not orm_util._entity_isa(query_entity.entity_zero, self.parent):
3101 return
3102 elif not orm_util._entity_isa(path[-1], self.parent):
3103 return
3104
3105 selectin_path = effective_path
3106
3107 path_w_prop = path[self.parent_property]
3108
3109 # build up a path indicating the path from the leftmost
3110 # entity to the thing we're subquery loading.
3111 with_poly_entity = path_w_prop.get(
3112 context.attributes, "path_with_polymorphic", None
3113 )
3114 if with_poly_entity is not None:
3115 effective_entity = inspect(with_poly_entity)
3116 else:
3117 effective_entity = self.entity
3118
3119 loading.PostLoad.callable_for_path(
3120 context,
3121 selectin_path,
3122 self.parent,
3123 self.parent_property,
3124 self._load_for_path,
3125 effective_entity,
3126 loadopt,
3127 recursion_depth,
3128 execution_options,
3129 )
3130
3131 def _load_for_path(
3132 self,
3133 context,
3134 path,
3135 states,
3136 load_only,
3137 effective_entity,
3138 loadopt,
3139 recursion_depth,
3140 execution_options,
3141 ):
3142 if load_only and self.key not in load_only:
3143 return
3144
3145 query_info = self._query_info
3146
3147 if query_info.load_only_child:
3148 our_states = collections.defaultdict(list)
3149 none_states = []
3150
3151 mapper = self.parent
3152
3153 for state, overwrite in states:
3154 state_dict = state.dict
3155 related_ident = tuple(
3156 mapper._get_state_attr_by_column(
3157 state,
3158 state_dict,
3159 lk,
3160 passive=attributes.PASSIVE_NO_FETCH,
3161 )
3162 for lk in query_info.child_lookup_cols
3163 )
3164 # if the loaded parent objects do not have the foreign key
3165 # to the related item loaded, then degrade into the joined
3166 # version of selectinload
3167 if LoaderCallableStatus.PASSIVE_NO_RESULT in related_ident:
3168 query_info = self._fallback_query_info
3169 break
3170
3171 # organize states into lists keyed to particular foreign
3172 # key values.
3173 if None not in related_ident:
3174 our_states[related_ident].append(
3175 (state, state_dict, overwrite)
3176 )
3177 else:
3178 # For FK values that have None, add them to a
3179 # separate collection that will be populated separately
3180 none_states.append((state, state_dict, overwrite))
3181
3182 # note the above conditional may have changed query_info
3183 if not query_info.load_only_child:
3184 our_states = [
3185 (state.key[1], state, state.dict, overwrite)
3186 for state, overwrite in states
3187 ]
3188
3189 pk_cols = query_info.pk_cols
3190 in_expr = query_info.in_expr
3191
3192 if not query_info.load_with_join:
3193 # in "omit join" mode, the primary key column and the
3194 # "in" expression are in terms of the related entity. So
3195 # if the related entity is polymorphic or otherwise aliased,
3196 # we need to adapt our "pk_cols" and "in_expr" to that
3197 # entity. in non-"omit join" mode, these are against the
3198 # parent entity and do not need adaption.
3199 if effective_entity.is_aliased_class:
3200 pk_cols = [
3201 effective_entity._adapt_element(col) for col in pk_cols
3202 ]
3203 in_expr = effective_entity._adapt_element(in_expr)
3204
3205 bundle_ent = orm_util.Bundle("pk", *pk_cols)
3206 bundle_sql = bundle_ent.__clause_element__()
3207
3208 entity_sql = effective_entity.__clause_element__()
3209 q = Select._create_raw_select(
3210 _raw_columns=[bundle_sql, entity_sql],
3211 _label_style=LABEL_STYLE_TABLENAME_PLUS_COL,
3212 _compile_options=ORMCompileState.default_compile_options,
3213 _propagate_attrs={
3214 "compile_state_plugin": "orm",
3215 "plugin_subject": effective_entity,
3216 },
3217 )
3218
3219 if not query_info.load_with_join:
3220 # the Bundle we have in the "omit_join" case is against raw, non
3221 # annotated columns, so to ensure the Query knows its primary
3222 # entity, we add it explicitly. If we made the Bundle against
3223 # annotated columns, we hit a performance issue in this specific
3224 # case, which is detailed in issue #4347.
3225 q = q.select_from(effective_entity)
3226 else:
3227 # in the non-omit_join case, the Bundle is against the annotated/
3228 # mapped column of the parent entity, but the #4347 issue does not
3229 # occur in this case.
3230 q = q.select_from(self._parent_alias).join(
3231 getattr(self._parent_alias, self.parent_property.key).of_type(
3232 effective_entity
3233 )
3234 )
3235
3236 q = q.filter(in_expr.in_(sql.bindparam("primary_keys")))
3237
3238 # a test which exercises what these comments talk about is
3239 # test_selectin_relations.py -> test_twolevel_selectin_w_polymorphic
3240 #
3241 # effective_entity above is given to us in terms of the cached
3242 # statement, namely this one:
3243 orig_query = context.compile_state.select_statement
3244
3245 # the actual statement that was requested is this one:
3246 # context_query = context.user_passed_query
3247 #
3248 # that's not the cached one, however. So while it is of the identical
3249 # structure, if it has entities like AliasedInsp, which we get from
3250 # aliased() or with_polymorphic(), the AliasedInsp will likely be a
3251 # different object identity each time, and will not match up
3252 # hashing-wise to the corresponding AliasedInsp that's in the
3253 # cached query, meaning it won't match on paths and loader lookups
3254 # and loaders like this one will be skipped if it is used in options.
3255 #
3256 # as it turns out, standard loader options like selectinload(),
3257 # lazyload() that have a path need
3258 # to come from the cached query so that the AliasedInsp etc. objects
3259 # that are in the query line up with the object that's in the path
3260 # of the strategy object. however other options like
3261 # with_loader_criteria() that doesn't have a path (has a fixed entity)
3262 # and needs to have access to the latest closure state in order to
3263 # be correct, we need to use the uncached one.
3264 #
3265 # as of #8399 we let the loader option itself figure out what it
3266 # wants to do given cached and uncached version of itself.
3267
3268 effective_path = path[self.parent_property]
3269
3270 if orig_query is context.user_passed_query:
3271 new_options = orig_query._with_options
3272 else:
3273 cached_options = orig_query._with_options
3274 uncached_options = context.user_passed_query._with_options
3275
3276 # propagate compile state options from the original query,
3277 # updating their "extra_criteria" as necessary.
3278 # note this will create a different cache key than
3279 # "orig" options if extra_criteria is present, because the copy
3280 # of extra_criteria will have different boundparam than that of
3281 # the QueryableAttribute in the path
3282 new_options = [
3283 orig_opt._adapt_cached_option_to_uncached_option(
3284 context, uncached_opt
3285 )
3286 for orig_opt, uncached_opt in zip(
3287 cached_options, uncached_options
3288 )
3289 ]
3290
3291 if loadopt and loadopt._extra_criteria:
3292 new_options += (
3293 orm_util.LoaderCriteriaOption(
3294 effective_entity,
3295 loadopt._generate_extra_criteria(context),
3296 ),
3297 )
3298
3299 if recursion_depth is not None:
3300 effective_path = effective_path._truncate_recursive()
3301
3302 q = q.options(*new_options)
3303
3304 q = q._update_compile_options({"_current_path": effective_path})
3305 if context.populate_existing:
3306 q = q.execution_options(populate_existing=True)
3307
3308 if self.parent_property.order_by:
3309 if not query_info.load_with_join:
3310 eager_order_by = self.parent_property.order_by
3311 if effective_entity.is_aliased_class:
3312 eager_order_by = [
3313 effective_entity._adapt_element(elem)
3314 for elem in eager_order_by
3315 ]
3316 q = q.order_by(*eager_order_by)
3317 else:
3318
3319 def _setup_outermost_orderby(compile_context):
3320 compile_context.eager_order_by += tuple(
3321 util.to_list(self.parent_property.order_by)
3322 )
3323
3324 q = q._add_context_option(
3325 _setup_outermost_orderby, self.parent_property
3326 )
3327
3328 if query_info.load_only_child:
3329 self._load_via_child(
3330 our_states,
3331 none_states,
3332 query_info,
3333 q,
3334 context,
3335 execution_options,
3336 )
3337 else:
3338 self._load_via_parent(
3339 our_states, query_info, q, context, execution_options
3340 )
3341
3342 def _load_via_child(
3343 self,
3344 our_states,
3345 none_states,
3346 query_info,
3347 q,
3348 context,
3349 execution_options,
3350 ):
3351 uselist = self.uselist
3352
3353 # this sort is really for the benefit of the unit tests
3354 our_keys = sorted(our_states)
3355 while our_keys:
3356 chunk = our_keys[0 : self._chunksize]
3357 our_keys = our_keys[self._chunksize :]
3358 data = {
3359 k: v
3360 for k, v in context.session.execute(
3361 q,
3362 params={
3363 "primary_keys": [
3364 key[0] if query_info.zero_idx else key
3365 for key in chunk
3366 ]
3367 },
3368 execution_options=execution_options,
3369 ).unique()
3370 }
3371
3372 for key in chunk:
3373 # for a real foreign key and no concurrent changes to the
3374 # DB while running this method, "key" is always present in
3375 # data. However, for primaryjoins without real foreign keys
3376 # a non-None primaryjoin condition may still refer to no
3377 # related object.
3378 related_obj = data.get(key, None)
3379 for state, dict_, overwrite in our_states[key]:
3380 if not overwrite and self.key in dict_:
3381 continue
3382
3383 state.get_impl(self.key).set_committed_value(
3384 state,
3385 dict_,
3386 related_obj if not uselist else [related_obj],
3387 )
3388 # populate none states with empty value / collection
3389 for state, dict_, overwrite in none_states:
3390 if not overwrite and self.key in dict_:
3391 continue
3392
3393 # note it's OK if this is a uselist=True attribute, the empty
3394 # collection will be populated
3395 state.get_impl(self.key).set_committed_value(state, dict_, None)
3396
3397 def _load_via_parent(
3398 self, our_states, query_info, q, context, execution_options
3399 ):
3400 uselist = self.uselist
3401 _empty_result = () if uselist else None
3402
3403 while our_states:
3404 chunk = our_states[0 : self._chunksize]
3405 our_states = our_states[self._chunksize :]
3406
3407 primary_keys = [
3408 key[0] if query_info.zero_idx else key
3409 for key, state, state_dict, overwrite in chunk
3410 ]
3411
3412 data = collections.defaultdict(list)
3413 for k, v in itertools.groupby(
3414 context.session.execute(
3415 q,
3416 params={"primary_keys": primary_keys},
3417 execution_options=execution_options,
3418 ).unique(),
3419 lambda x: x[0],
3420 ):
3421 data[k].extend(vv[1] for vv in v)
3422
3423 for key, state, state_dict, overwrite in chunk:
3424 if not overwrite and self.key in state_dict:
3425 continue
3426
3427 collection = data.get(key, _empty_result)
3428
3429 if not uselist and collection:
3430 if len(collection) > 1:
3431 util.warn(
3432 "Multiple rows returned with "
3433 "uselist=False for eagerly-loaded "
3434 "attribute '%s' " % self
3435 )
3436 state.get_impl(self.key).set_committed_value(
3437 state, state_dict, collection[0]
3438 )
3439 else:
3440 # note that empty tuple set on uselist=False sets the
3441 # value to None
3442 state.get_impl(self.key).set_committed_value(
3443 state, state_dict, collection
3444 )
3445
3446
3447def single_parent_validator(desc, prop):
3448 def _do_check(state, value, oldvalue, initiator):
3449 if value is not None and initiator.key == prop.key:
3450 hasparent = initiator.hasparent(attributes.instance_state(value))
3451 if hasparent and oldvalue is not value:
3452 raise sa_exc.InvalidRequestError(
3453 "Instance %s is already associated with an instance "
3454 "of %s via its %s attribute, and is only allowed a "
3455 "single parent."
3456 % (orm_util.instance_str(value), state.class_, prop),
3457 code="bbf1",
3458 )
3459 return value
3460
3461 def append(state, value, initiator):
3462 return _do_check(state, value, None, initiator)
3463
3464 def set_(state, value, oldvalue, initiator):
3465 return _do_check(state, value, oldvalue, initiator)
3466
3467 event.listen(
3468 desc, "append", append, raw=True, retval=True, active_history=True
3469 )
3470 event.listen(desc, "set", set_, raw=True, retval=True, active_history=True)