1# orm/relationships.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Heuristics related to join conditions as used in
9:func:`_orm.relationship`.
10
11Provides the :class:`.JoinCondition` object, which encapsulates
12SQL annotation and aliasing behavior focused on the `primaryjoin`
13and `secondaryjoin` aspects of :func:`_orm.relationship`.
14
15"""
16from __future__ import annotations
17
18import collections
19from collections import abc
20import dataclasses
21import inspect as _py_inspect
22import itertools
23import re
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Collection
29from typing import Dict
30from typing import FrozenSet
31from typing import Generic
32from typing import Iterable
33from typing import Iterator
34from typing import List
35from typing import Literal
36from typing import NamedTuple
37from typing import NoReturn
38from typing import Optional
39from typing import Sequence
40from typing import Set
41from typing import Tuple
42from typing import Type
43from typing import TYPE_CHECKING
44from typing import TypeVar
45from typing import Union
46import weakref
47
48from . import attributes
49from . import strategy_options
50from ._typing import insp_is_aliased_class
51from ._typing import is_has_collection_adapter
52from .base import _DeclarativeMapped
53from .base import _is_mapped_class
54from .base import class_mapper
55from .base import DynamicMapped
56from .base import LoaderCallableStatus
57from .base import PassiveFlag
58from .base import state_str
59from .base import WriteOnlyMapped
60from .interfaces import _AttributeOptions
61from .interfaces import _DataclassDefaultsDontSet
62from .interfaces import _IntrospectsAnnotations
63from .interfaces import MANYTOMANY
64from .interfaces import MANYTOONE
65from .interfaces import ONETOMANY
66from .interfaces import PropComparator
67from .interfaces import RelationshipDirection
68from .interfaces import StrategizedProperty
69from .util import CascadeOptions
70from .. import exc as sa_exc
71from .. import Exists
72from .. import log
73from .. import schema
74from .. import sql
75from .. import util
76from ..inspection import inspect
77from ..sql import coercions
78from ..sql import expression
79from ..sql import operators
80from ..sql import roles
81from ..sql import visitors
82from ..sql._typing import _ColumnExpressionArgument
83from ..sql._typing import _HasClauseElement
84from ..sql.annotation import _safe_annotate
85from ..sql.base import _NoArg
86from ..sql.elements import ColumnClause
87from ..sql.elements import ColumnElement
88from ..sql.util import _deep_annotate
89from ..sql.util import _deep_deannotate
90from ..sql.util import _shallow_annotate
91from ..sql.util import adapt_criterion_to_null
92from ..sql.util import ClauseAdapter
93from ..sql.util import join_condition
94from ..sql.util import selectables_overlap
95from ..sql.util import visit_binary_product
96from ..util.typing import de_optionalize_union_types
97from ..util.typing import resolve_name_to_real_class_name
98
99if typing.TYPE_CHECKING:
100 from ._typing import _EntityType
101 from ._typing import _ExternalEntityType
102 from ._typing import _IdentityKeyType
103 from ._typing import _InstanceDict
104 from ._typing import _InternalEntityType
105 from ._typing import _O
106 from ._typing import _RegistryType
107 from .base import Mapped
108 from .clsregistry import _class_resolver
109 from .clsregistry import _ModNS
110 from .decl_base import _ClassScanMapperConfig
111 from .dependency import _DependencyProcessor
112 from .mapper import Mapper
113 from .query import Query
114 from .session import Session
115 from .state import InstanceState
116 from .strategies import _LazyLoader
117 from .util import AliasedClass
118 from .util import AliasedInsp
119 from ..sql._typing import _CoreAdapterProto
120 from ..sql._typing import _EquivalentColumnMap
121 from ..sql._typing import _InfoType
122 from ..sql.annotation import _AnnotationDict
123 from ..sql.annotation import SupportsAnnotations
124 from ..sql.elements import BinaryExpression
125 from ..sql.elements import BindParameter
126 from ..sql.elements import ClauseElement
127 from ..sql.schema import Table
128 from ..sql.selectable import FromClause
129 from ..util.typing import _AnnotationScanType
130 from ..util.typing import RODescriptorReference
131
132_T = TypeVar("_T", bound=Any)
133_T1 = TypeVar("_T1", bound=Any)
134_T2 = TypeVar("_T2", bound=Any)
135
136_PT = TypeVar("_PT", bound=Any)
137
138_PT2 = TypeVar("_PT2", bound=Any)
139
140
141_RelationshipArgumentType = Union[
142 str,
143 Type[_T],
144 Callable[[], Type[_T]],
145 "Mapper[_T]",
146 "AliasedClass[_T]",
147 Callable[[], "Mapper[_T]"],
148 Callable[[], "AliasedClass[_T]"],
149]
150
151_LazyLoadArgumentType = Literal[
152 "select",
153 "joined",
154 "selectin",
155 "subquery",
156 "raise",
157 "raise_on_sql",
158 "noload",
159 "immediate",
160 "write_only",
161 "dynamic",
162 True,
163 False,
164 None,
165]
166
167
168_RelationshipJoinConditionArgument = Union[
169 str, _ColumnExpressionArgument[bool]
170]
171_RelationshipSecondaryArgument = Union[
172 "FromClause", str, Callable[[], "FromClause"]
173]
174_ORMOrderByArgument = Union[
175 Literal[False],
176 str,
177 _ColumnExpressionArgument[Any],
178 Callable[[], _ColumnExpressionArgument[Any]],
179 Callable[[], Iterable[_ColumnExpressionArgument[Any]]],
180 Iterable[Union[str, _ColumnExpressionArgument[Any]]],
181]
182_RelationshipBackPopulatesArgument = Union[
183 str,
184 PropComparator[Any],
185 Callable[[], Union[str, PropComparator[Any]]],
186]
187
188
189ORMBackrefArgument = Union[str, Tuple[str, Dict[str, Any]]]
190
191_ORMColCollectionElement = Union[
192 ColumnClause[Any],
193 _HasClauseElement[Any],
194 roles.DMLColumnRole,
195 "Mapped[Any]",
196]
197_ORMColCollectionArgument = Union[
198 str,
199 Sequence[_ORMColCollectionElement],
200 Callable[[], Sequence[_ORMColCollectionElement]],
201 Callable[[], _ORMColCollectionElement],
202 _ORMColCollectionElement,
203]
204
205
206_CEA = TypeVar("_CEA", bound=_ColumnExpressionArgument[Any])
207
208_CE = TypeVar("_CE", bound="ColumnElement[Any]")
209
210
211_ColumnPairIterable = Iterable[Tuple[ColumnElement[Any], ColumnElement[Any]]]
212
213_ColumnPairs = Sequence[Tuple[ColumnElement[Any], ColumnElement[Any]]]
214
215_MutableColumnPairs = List[Tuple[ColumnElement[Any], ColumnElement[Any]]]
216
217
218def remote(expr: _CEA) -> _CEA:
219 """Annotate a portion of a primaryjoin expression
220 with a 'remote' annotation.
221
222 See the section :ref:`relationship_custom_foreign` for a
223 description of use.
224
225 .. seealso::
226
227 :ref:`relationship_custom_foreign`
228
229 :func:`.foreign`
230
231 """
232 return _annotate_columns( # type: ignore
233 coercions.expect(roles.ColumnArgumentRole, expr), {"remote": True}
234 )
235
236
237def foreign(expr: _CEA) -> _CEA:
238 """Annotate a portion of a primaryjoin expression
239 with a 'foreign' annotation.
240
241 See the section :ref:`relationship_custom_foreign` for a
242 description of use.
243
244 .. seealso::
245
246 :ref:`relationship_custom_foreign`
247
248 :func:`.remote`
249
250 """
251
252 return _annotate_columns( # type: ignore
253 coercions.expect(roles.ColumnArgumentRole, expr), {"foreign": True}
254 )
255
256
257@dataclasses.dataclass
258class _RelationshipArg(Generic[_T1, _T2]):
259 """stores a user-defined parameter value that must be resolved and
260 parsed later at mapper configuration time.
261
262 """
263
264 __slots__ = "name", "argument", "resolved"
265 name: str
266 argument: _T1
267 resolved: Optional[_T2]
268
269 def _is_populated(self) -> bool:
270 return self.argument is not None
271
272 def _resolve_against_registry(
273 self, clsregistry_resolver: Callable[[str, bool], _class_resolver]
274 ) -> None:
275 attr_value = self.argument
276
277 if isinstance(attr_value, str):
278 self.resolved = clsregistry_resolver(
279 attr_value, self.name == "secondary"
280 )()
281 elif callable(attr_value) and not _is_mapped_class(attr_value):
282 self.resolved = attr_value()
283 else:
284 self.resolved = attr_value
285
286 def effective_value(self) -> Any:
287 if self.resolved is not None:
288 return self.resolved
289 else:
290 return self.argument
291
292
293_RelationshipOrderByArg = Union[Literal[False], Tuple[ColumnElement[Any], ...]]
294
295
296@dataclasses.dataclass
297class _StringRelationshipArg(_RelationshipArg[_T1, _T2]):
298 def _resolve_against_registry(
299 self, clsregistry_resolver: Callable[[str, bool], _class_resolver]
300 ) -> None:
301 attr_value = self.argument
302
303 if callable(attr_value):
304 attr_value = attr_value()
305
306 if isinstance(attr_value, attributes.QueryableAttribute):
307 attr_value = attr_value.key # type: ignore
308
309 self.resolved = attr_value
310
311
312class _RelationshipArgs(NamedTuple):
313 """stores user-passed parameters that are resolved at mapper configuration
314 time.
315
316 """
317
318 secondary: _RelationshipArg[
319 Optional[_RelationshipSecondaryArgument],
320 Optional[FromClause],
321 ]
322 primaryjoin: _RelationshipArg[
323 Optional[_RelationshipJoinConditionArgument],
324 Optional[ColumnElement[Any]],
325 ]
326 secondaryjoin: _RelationshipArg[
327 Optional[_RelationshipJoinConditionArgument],
328 Optional[ColumnElement[Any]],
329 ]
330 order_by: _RelationshipArg[_ORMOrderByArgument, _RelationshipOrderByArg]
331 foreign_keys: _RelationshipArg[
332 Optional[_ORMColCollectionArgument], Set[ColumnElement[Any]]
333 ]
334 remote_side: _RelationshipArg[
335 Optional[_ORMColCollectionArgument], Set[ColumnElement[Any]]
336 ]
337 back_populates: _StringRelationshipArg[
338 Optional[_RelationshipBackPopulatesArgument], str
339 ]
340
341
342@log.class_logger
343class RelationshipProperty(
344 _DataclassDefaultsDontSet,
345 _IntrospectsAnnotations,
346 StrategizedProperty[_T],
347 log.Identified,
348):
349 """Describes an object property that holds a single item or list
350 of items that correspond to a related database table.
351
352 Public constructor is the :func:`_orm.relationship` function.
353
354 .. seealso::
355
356 :ref:`relationship_config_toplevel`
357
358 """
359
360 strategy_wildcard_key = strategy_options._RELATIONSHIP_TOKEN
361 inherit_cache = True
362 """:meta private:"""
363
364 _links_to_entity = True
365 _is_relationship = True
366
367 _overlaps: Sequence[str]
368
369 _lazy_strategy: _LazyLoader
370
371 _persistence_only = dict(
372 passive_deletes=False,
373 passive_updates=True,
374 enable_typechecks=True,
375 active_history=False,
376 cascade_backrefs=False,
377 )
378
379 _dependency_processor: Optional[_DependencyProcessor] = None
380
381 primaryjoin: ColumnElement[bool]
382 secondaryjoin: Optional[ColumnElement[bool]]
383 secondary: Optional[FromClause]
384 _join_condition: _JoinCondition
385 order_by: _RelationshipOrderByArg
386
387 _user_defined_foreign_keys: Set[ColumnElement[Any]]
388 _calculated_foreign_keys: Set[ColumnElement[Any]]
389
390 remote_side: Set[ColumnElement[Any]]
391 local_columns: Set[ColumnElement[Any]]
392
393 synchronize_pairs: _ColumnPairs
394 secondary_synchronize_pairs: Optional[_ColumnPairs]
395
396 local_remote_pairs: _ColumnPairs
397
398 direction: RelationshipDirection
399
400 _init_args: _RelationshipArgs
401 _disable_dataclass_default_factory = True
402
403 def __init__(
404 self,
405 argument: Optional[_RelationshipArgumentType[_T]] = None,
406 secondary: Optional[_RelationshipSecondaryArgument] = None,
407 *,
408 uselist: Optional[bool] = None,
409 collection_class: Optional[
410 Union[Type[Collection[Any]], Callable[[], Collection[Any]]]
411 ] = None,
412 primaryjoin: Optional[_RelationshipJoinConditionArgument] = None,
413 secondaryjoin: Optional[_RelationshipJoinConditionArgument] = None,
414 back_populates: Optional[_RelationshipBackPopulatesArgument] = None,
415 order_by: _ORMOrderByArgument = False,
416 backref: Optional[ORMBackrefArgument] = None,
417 overlaps: Optional[str] = None,
418 post_update: bool = False,
419 cascade: str = "save-update, merge",
420 viewonly: bool = False,
421 attribute_options: Optional[_AttributeOptions] = None,
422 lazy: _LazyLoadArgumentType = "select",
423 passive_deletes: Union[Literal["all"], bool] = False,
424 passive_updates: bool = True,
425 active_history: bool = False,
426 enable_typechecks: bool = True,
427 foreign_keys: Optional[_ORMColCollectionArgument] = None,
428 remote_side: Optional[_ORMColCollectionArgument] = None,
429 join_depth: Optional[int] = None,
430 comparator_factory: Optional[
431 Type[RelationshipProperty.Comparator[Any]]
432 ] = None,
433 single_parent: bool = False,
434 innerjoin: bool = False,
435 distinct_target_key: Optional[bool] = None,
436 load_on_pending: bool = False,
437 query_class: Optional[Type[Query[Any]]] = None,
438 info: Optional[_InfoType] = None,
439 omit_join: Literal[None, False] = None,
440 sync_backref: Optional[bool] = None,
441 doc: Optional[str] = None,
442 bake_queries: Literal[True] = True,
443 cascade_backrefs: Literal[False] = False,
444 _local_remote_pairs: Optional[_ColumnPairs] = None,
445 _legacy_inactive_history_style: bool = False,
446 ):
447 super().__init__(attribute_options=attribute_options)
448
449 self.uselist = uselist
450 self.argument = argument
451
452 self._init_args = _RelationshipArgs(
453 _RelationshipArg("secondary", secondary, None),
454 _RelationshipArg("primaryjoin", primaryjoin, None),
455 _RelationshipArg("secondaryjoin", secondaryjoin, None),
456 _RelationshipArg("order_by", order_by, None),
457 _RelationshipArg("foreign_keys", foreign_keys, None),
458 _RelationshipArg("remote_side", remote_side, None),
459 _StringRelationshipArg("back_populates", back_populates, None),
460 )
461
462 if self._attribute_options.dataclasses_default not in (
463 _NoArg.NO_ARG,
464 None,
465 ):
466 raise sa_exc.ArgumentError(
467 "Only 'None' is accepted as dataclass "
468 "default for a relationship()"
469 )
470
471 self.post_update = post_update
472 self.viewonly = viewonly
473 if viewonly:
474 self._warn_for_persistence_only_flags(
475 passive_deletes=passive_deletes,
476 passive_updates=passive_updates,
477 enable_typechecks=enable_typechecks,
478 active_history=active_history,
479 cascade_backrefs=cascade_backrefs,
480 )
481 if viewonly and sync_backref:
482 raise sa_exc.ArgumentError(
483 "sync_backref and viewonly cannot both be True"
484 )
485 self.sync_backref = sync_backref
486 self.lazy = lazy
487 self.single_parent = single_parent
488 self.collection_class = collection_class
489 self.passive_deletes = passive_deletes
490
491 if cascade_backrefs:
492 raise sa_exc.ArgumentError(
493 "The 'cascade_backrefs' parameter passed to "
494 "relationship() may only be set to False."
495 )
496
497 self.passive_updates = passive_updates
498 self.enable_typechecks = enable_typechecks
499 self.query_class = query_class
500 self.innerjoin = innerjoin
501 self.distinct_target_key = distinct_target_key
502 self.doc = doc
503 self.active_history = active_history
504 self._legacy_inactive_history_style = _legacy_inactive_history_style
505
506 self.join_depth = join_depth
507 if omit_join:
508 util.warn(
509 "setting omit_join to True is not supported; selectin "
510 "loading of this relationship may not work correctly if this "
511 "flag is set explicitly. omit_join optimization is "
512 "automatically detected for conditions under which it is "
513 "supported."
514 )
515
516 self.omit_join = omit_join
517 self.local_remote_pairs = _local_remote_pairs or ()
518 self.load_on_pending = load_on_pending
519 self.comparator_factory = (
520 comparator_factory or RelationshipProperty.Comparator
521 )
522 util.set_creation_order(self)
523
524 if info is not None:
525 self.info.update(info)
526
527 self.strategy_key = (("lazy", self.lazy),)
528
529 self._reverse_property: Set[RelationshipProperty[Any]] = set()
530
531 if overlaps:
532 self._overlaps = set(re.split(r"\s*,\s*", overlaps)) # type: ignore # noqa: E501
533 else:
534 self._overlaps = ()
535
536 self.cascade = cascade
537
538 if back_populates:
539 if backref:
540 raise sa_exc.ArgumentError(
541 "backref and back_populates keyword arguments "
542 "are mutually exclusive"
543 )
544 self.backref = None
545 else:
546 self.backref = backref
547
548 @property
549 def back_populates(self) -> str:
550 return self._init_args.back_populates.effective_value() # type: ignore
551
552 @back_populates.setter
553 def back_populates(self, value: str) -> None:
554 self._init_args.back_populates.argument = value
555
556 def _warn_for_persistence_only_flags(self, **kw: Any) -> None:
557 for k, v in kw.items():
558 if v != self._persistence_only[k]:
559 # we are warning here rather than warn deprecated as this is a
560 # configuration mistake, and Python shows regular warnings more
561 # aggressively than deprecation warnings by default. Unlike the
562 # case of setting viewonly with cascade, the settings being
563 # warned about here are not actively doing the wrong thing
564 # against viewonly=True, so it is not as urgent to have these
565 # raise an error.
566 util.warn(
567 "Setting %s on relationship() while also "
568 "setting viewonly=True does not make sense, as a "
569 "viewonly=True relationship does not perform persistence "
570 "operations. This configuration may raise an error "
571 "in a future release." % (k,)
572 )
573
574 def instrument_class(self, mapper: Mapper[Any]) -> None:
575 attributes._register_descriptor(
576 mapper.class_,
577 self.key,
578 comparator=self.comparator_factory(self, mapper),
579 parententity=mapper,
580 doc=self.doc,
581 )
582
583 class Comparator(util.MemoizedSlots, PropComparator[_PT]):
584 """Produce boolean, comparison, and other operators for
585 :class:`.RelationshipProperty` attributes.
586
587 See the documentation for :class:`.PropComparator` for a brief
588 overview of ORM level operator definition.
589
590 .. seealso::
591
592 :class:`.PropComparator`
593
594 :class:`.ColumnProperty.Comparator`
595
596 :class:`.ColumnOperators`
597
598 :ref:`types_operators`
599
600 :attr:`.TypeEngine.comparator_factory`
601
602 """
603
604 __slots__ = (
605 "entity",
606 "mapper",
607 "property",
608 "_of_type",
609 "_extra_criteria",
610 )
611
612 prop: RODescriptorReference[RelationshipProperty[_PT]]
613 _of_type: Optional[_EntityType[_PT]]
614
615 def __init__(
616 self,
617 prop: RelationshipProperty[_PT],
618 parentmapper: _InternalEntityType[Any],
619 adapt_to_entity: Optional[AliasedInsp[Any]] = None,
620 of_type: Optional[_EntityType[_PT]] = None,
621 extra_criteria: Tuple[ColumnElement[bool], ...] = (),
622 ):
623 """Construction of :class:`.RelationshipProperty.Comparator`
624 is internal to the ORM's attribute mechanics.
625
626 """
627 self.prop = prop
628 self._parententity = parentmapper
629 self._adapt_to_entity = adapt_to_entity
630 if of_type:
631 self._of_type = of_type
632 else:
633 self._of_type = None
634 self._extra_criteria = extra_criteria
635
636 def adapt_to_entity(
637 self, adapt_to_entity: AliasedInsp[Any]
638 ) -> RelationshipProperty.Comparator[Any]:
639 return self.__class__(
640 self.prop,
641 self._parententity,
642 adapt_to_entity=adapt_to_entity,
643 of_type=self._of_type,
644 )
645
646 entity: _InternalEntityType[_PT]
647 """The target entity referred to by this
648 :class:`.RelationshipProperty.Comparator`.
649
650 This is either a :class:`_orm.Mapper` or :class:`.AliasedInsp`
651 object.
652
653 This is the "target" or "remote" side of the
654 :func:`_orm.relationship`.
655
656 """
657
658 mapper: Mapper[_PT]
659 """The target :class:`_orm.Mapper` referred to by this
660 :class:`.RelationshipProperty.Comparator`.
661
662 This is the "target" or "remote" side of the
663 :func:`_orm.relationship`.
664
665 """
666
667 def _memoized_attr_entity(self) -> _InternalEntityType[_PT]:
668 if self._of_type:
669 return inspect(self._of_type) # type: ignore
670 else:
671 return self.prop.entity
672
673 def _memoized_attr_mapper(self) -> Mapper[_PT]:
674 return self.entity.mapper
675
676 def _source_selectable(self) -> FromClause:
677 if self._adapt_to_entity:
678 return self._adapt_to_entity.selectable
679 else:
680 return self.property.parent._with_polymorphic_selectable
681
682 def __clause_element__(self) -> ColumnElement[bool]:
683 adapt_from = self._source_selectable()
684 if self._of_type:
685 of_type_entity = inspect(self._of_type)
686 else:
687 of_type_entity = None
688
689 (
690 pj,
691 sj,
692 source,
693 dest,
694 secondary,
695 target_adapter,
696 ) = self.prop._create_joins(
697 source_selectable=adapt_from,
698 source_polymorphic=True,
699 of_type_entity=of_type_entity,
700 alias_secondary=True,
701 extra_criteria=self._extra_criteria,
702 )
703 if sj is not None:
704 return pj & sj
705 else:
706 return pj
707
708 def of_type(self, class_: _EntityType[Any]) -> PropComparator[_PT]:
709 r"""Redefine this object in terms of a polymorphic subclass.
710
711 See :meth:`.PropComparator.of_type` for an example.
712
713
714 """
715 return RelationshipProperty.Comparator(
716 self.prop,
717 self._parententity,
718 adapt_to_entity=self._adapt_to_entity,
719 of_type=class_,
720 extra_criteria=self._extra_criteria,
721 )
722
723 def and_(
724 self, *criteria: _ColumnExpressionArgument[bool]
725 ) -> PropComparator[Any]:
726 """Add AND criteria.
727
728 See :meth:`.PropComparator.and_` for an example.
729
730 .. versionadded:: 1.4
731
732 """
733 exprs = tuple(
734 coercions.expect(roles.WhereHavingRole, clause)
735 for clause in util.coerce_generator_arg(criteria)
736 )
737
738 return RelationshipProperty.Comparator(
739 self.prop,
740 self._parententity,
741 adapt_to_entity=self._adapt_to_entity,
742 of_type=self._of_type,
743 extra_criteria=self._extra_criteria + exprs,
744 )
745
746 def in_(self, other: Any) -> NoReturn:
747 """Produce an IN clause - this is not implemented
748 for :func:`_orm.relationship`-based attributes at this time.
749
750 """
751 raise NotImplementedError(
752 "in_() not yet supported for "
753 "relationships. For a simple "
754 "many-to-one, use in_() against "
755 "the set of foreign key values."
756 )
757
758 # https://github.com/python/mypy/issues/4266
759 __hash__ = None # type: ignore
760
761 def __eq__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501
762 """Implement the ``==`` operator.
763
764 In a many-to-one context, such as:
765
766 .. sourcecode:: text
767
768 MyClass.some_prop == <some object>
769
770 this will typically produce a
771 clause such as:
772
773 .. sourcecode:: text
774
775 mytable.related_id == <some id>
776
777 Where ``<some id>`` is the primary key of the given
778 object.
779
780 The ``==`` operator provides partial functionality for non-
781 many-to-one comparisons:
782
783 * Comparisons against collections are not supported.
784 Use :meth:`~.Relationship.Comparator.contains`.
785 * Compared to a scalar one-to-many, will produce a
786 clause that compares the target columns in the parent to
787 the given target.
788 * Compared to a scalar many-to-many, an alias
789 of the association table will be rendered as
790 well, forming a natural join that is part of the
791 main body of the query. This will not work for
792 queries that go beyond simple AND conjunctions of
793 comparisons, such as those which use OR. Use
794 explicit joins, outerjoins, or
795 :meth:`~.Relationship.Comparator.has` for
796 more comprehensive non-many-to-one scalar
797 membership tests.
798 * Comparisons against ``None`` given in a one-to-many
799 or many-to-many context produce a NOT EXISTS clause.
800
801 """
802 if other is None or isinstance(other, expression.Null):
803 if self.property.direction in [ONETOMANY, MANYTOMANY]:
804 return ~self._criterion_exists()
805 else:
806 return self.property._optimized_compare(
807 None, adapt_source=self.adapter
808 )
809 elif self.property.uselist:
810 raise sa_exc.InvalidRequestError(
811 "Can't compare a collection to an object or collection; "
812 "use contains() to test for membership."
813 )
814 else:
815 return self.property._optimized_compare(
816 other, adapt_source=self.adapter
817 )
818
819 def _criterion_exists(
820 self,
821 criterion: Optional[_ColumnExpressionArgument[bool]] = None,
822 **kwargs: Any,
823 ) -> Exists:
824 where_criteria = (
825 coercions.expect(roles.WhereHavingRole, criterion)
826 if criterion is not None
827 else None
828 )
829
830 if getattr(self, "_of_type", None):
831 info: Optional[_InternalEntityType[Any]] = inspect(
832 self._of_type
833 )
834 assert info is not None
835 target_mapper, to_selectable, is_aliased_class = (
836 info.mapper,
837 info.selectable,
838 info.is_aliased_class,
839 )
840 if self.property._is_self_referential and not is_aliased_class:
841 to_selectable = to_selectable._anonymous_fromclause()
842
843 single_crit = target_mapper._single_table_criterion
844 if single_crit is not None:
845 if where_criteria is not None:
846 where_criteria = single_crit & where_criteria
847 else:
848 where_criteria = single_crit
849 else:
850 is_aliased_class = False
851 to_selectable = None
852
853 if self.adapter:
854 source_selectable = self._source_selectable()
855 else:
856 source_selectable = None
857
858 (
859 pj,
860 sj,
861 source,
862 dest,
863 secondary,
864 target_adapter,
865 ) = self.property._create_joins(
866 dest_selectable=to_selectable,
867 source_selectable=source_selectable,
868 )
869
870 for k in kwargs:
871 crit = getattr(self.property.mapper.class_, k) == kwargs[k]
872 if where_criteria is None:
873 where_criteria = crit
874 else:
875 where_criteria = where_criteria & crit
876
877 # annotate the *local* side of the join condition, in the case
878 # of pj + sj this is the full primaryjoin, in the case of just
879 # pj its the local side of the primaryjoin.
880 j: ColumnElement[bool]
881 if sj is not None:
882 j = pj & sj
883 else:
884 j = pj
885
886 if (
887 where_criteria is not None
888 and target_adapter
889 and not is_aliased_class
890 ):
891 # limit this adapter to annotated only?
892 where_criteria = target_adapter.traverse(where_criteria)
893
894 # only have the "joined left side" of what we
895 # return be subject to Query adaption. The right
896 # side of it is used for an exists() subquery and
897 # should not correlate or otherwise reach out
898 # to anything in the enclosing query.
899 if where_criteria is not None:
900 where_criteria = where_criteria._annotate(
901 {"no_replacement_traverse": True}
902 )
903
904 crit = j & sql.True_._ifnone(where_criteria)
905
906 if secondary is not None:
907 ex = (
908 sql.exists(1)
909 .where(crit)
910 .select_from(dest, secondary)
911 .correlate_except(dest, secondary)
912 )
913 else:
914 ex = (
915 sql.exists(1)
916 .where(crit)
917 .select_from(dest)
918 .correlate_except(dest)
919 )
920 return ex
921
922 def any(
923 self,
924 criterion: Optional[_ColumnExpressionArgument[bool]] = None,
925 **kwargs: Any,
926 ) -> ColumnElement[bool]:
927 """Produce an expression that tests a collection against
928 particular criterion, using EXISTS.
929
930 An expression like::
931
932 session.query(MyClass).filter(
933 MyClass.somereference.any(SomeRelated.x == 2)
934 )
935
936 Will produce a query like:
937
938 .. sourcecode:: sql
939
940 SELECT * FROM my_table WHERE
941 EXISTS (SELECT 1 FROM related WHERE related.my_id=my_table.id
942 AND related.x=2)
943
944 Because :meth:`~.Relationship.Comparator.any` uses
945 a correlated subquery, its performance is not nearly as
946 good when compared against large target tables as that of
947 using a join.
948
949 :meth:`~.Relationship.Comparator.any` is particularly
950 useful for testing for empty collections::
951
952 session.query(MyClass).filter(~MyClass.somereference.any())
953
954 will produce:
955
956 .. sourcecode:: sql
957
958 SELECT * FROM my_table WHERE
959 NOT (EXISTS (SELECT 1 FROM related WHERE
960 related.my_id=my_table.id))
961
962 :meth:`~.Relationship.Comparator.any` is only
963 valid for collections, i.e. a :func:`_orm.relationship`
964 that has ``uselist=True``. For scalar references,
965 use :meth:`~.Relationship.Comparator.has`.
966
967 """
968 if not self.property.uselist:
969 raise sa_exc.InvalidRequestError(
970 "'any()' not implemented for scalar "
971 "attributes. Use has()."
972 )
973
974 return self._criterion_exists(criterion, **kwargs)
975
976 def has(
977 self,
978 criterion: Optional[_ColumnExpressionArgument[bool]] = None,
979 **kwargs: Any,
980 ) -> ColumnElement[bool]:
981 """Produce an expression that tests a scalar reference against
982 particular criterion, using EXISTS.
983
984 An expression like::
985
986 session.query(MyClass).filter(
987 MyClass.somereference.has(SomeRelated.x == 2)
988 )
989
990 Will produce a query like:
991
992 .. sourcecode:: sql
993
994 SELECT * FROM my_table WHERE
995 EXISTS (SELECT 1 FROM related WHERE
996 related.id==my_table.related_id AND related.x=2)
997
998 Because :meth:`~.Relationship.Comparator.has` uses
999 a correlated subquery, its performance is not nearly as
1000 good when compared against large target tables as that of
1001 using a join.
1002
1003 :meth:`~.Relationship.Comparator.has` is only
1004 valid for scalar references, i.e. a :func:`_orm.relationship`
1005 that has ``uselist=False``. For collection references,
1006 use :meth:`~.Relationship.Comparator.any`.
1007
1008 """
1009 if self.property.uselist:
1010 raise sa_exc.InvalidRequestError(
1011 "'has()' not implemented for collections. Use any()."
1012 )
1013 return self._criterion_exists(criterion, **kwargs)
1014
1015 def contains(
1016 self, other: _ColumnExpressionArgument[Any], **kwargs: Any
1017 ) -> ColumnElement[bool]:
1018 """Return a simple expression that tests a collection for
1019 containment of a particular item.
1020
1021 :meth:`~.Relationship.Comparator.contains` is
1022 only valid for a collection, i.e. a
1023 :func:`_orm.relationship` that implements
1024 one-to-many or many-to-many with ``uselist=True``.
1025
1026 When used in a simple one-to-many context, an
1027 expression like::
1028
1029 MyClass.contains(other)
1030
1031 Produces a clause like:
1032
1033 .. sourcecode:: sql
1034
1035 mytable.id == <some id>
1036
1037 Where ``<some id>`` is the value of the foreign key
1038 attribute on ``other`` which refers to the primary
1039 key of its parent object. From this it follows that
1040 :meth:`~.Relationship.Comparator.contains` is
1041 very useful when used with simple one-to-many
1042 operations.
1043
1044 For many-to-many operations, the behavior of
1045 :meth:`~.Relationship.Comparator.contains`
1046 has more caveats. The association table will be
1047 rendered in the statement, producing an "implicit"
1048 join, that is, includes multiple tables in the FROM
1049 clause which are equated in the WHERE clause::
1050
1051 query(MyClass).filter(MyClass.contains(other))
1052
1053 Produces a query like:
1054
1055 .. sourcecode:: sql
1056
1057 SELECT * FROM my_table, my_association_table AS
1058 my_association_table_1 WHERE
1059 my_table.id = my_association_table_1.parent_id
1060 AND my_association_table_1.child_id = <some id>
1061
1062 Where ``<some id>`` would be the primary key of
1063 ``other``. From the above, it is clear that
1064 :meth:`~.Relationship.Comparator.contains`
1065 will **not** work with many-to-many collections when
1066 used in queries that move beyond simple AND
1067 conjunctions, such as multiple
1068 :meth:`~.Relationship.Comparator.contains`
1069 expressions joined by OR. In such cases subqueries or
1070 explicit "outer joins" will need to be used instead.
1071 See :meth:`~.Relationship.Comparator.any` for
1072 a less-performant alternative using EXISTS, or refer
1073 to :meth:`_query.Query.outerjoin`
1074 as well as :ref:`orm_queryguide_joins`
1075 for more details on constructing outer joins.
1076
1077 kwargs may be ignored by this operator but are required for API
1078 conformance.
1079 """
1080 if not self.prop.uselist:
1081 raise sa_exc.InvalidRequestError(
1082 "'contains' not implemented for scalar "
1083 "attributes. Use =="
1084 )
1085
1086 clause = self.prop._optimized_compare(
1087 other, adapt_source=self.adapter
1088 )
1089
1090 if self.prop.secondaryjoin is not None:
1091 clause.negation_clause = self.__negated_contains_or_equals(
1092 other
1093 )
1094
1095 return clause
1096
1097 def __negated_contains_or_equals(
1098 self, other: Any
1099 ) -> ColumnElement[bool]:
1100 if self.prop.direction == MANYTOONE:
1101 state = attributes.instance_state(other)
1102
1103 def state_bindparam(
1104 local_col: ColumnElement[Any],
1105 state: InstanceState[Any],
1106 remote_col: ColumnElement[Any],
1107 ) -> BindParameter[Any]:
1108 dict_ = state.dict
1109 return sql.bindparam(
1110 local_col.key,
1111 type_=local_col.type,
1112 unique=True,
1113 callable_=self.prop._get_attr_w_warn_on_none(
1114 self.prop.mapper, state, dict_, remote_col
1115 ),
1116 )
1117
1118 def adapt(col: _CE) -> _CE:
1119 if self.adapter:
1120 return self.adapter(col)
1121 else:
1122 return col
1123
1124 if self.property._use_get:
1125 return sql.and_(
1126 *[
1127 sql.or_(
1128 adapt(x)
1129 != state_bindparam(adapt(x), state, y),
1130 adapt(x) == None,
1131 )
1132 for (x, y) in self.property.local_remote_pairs
1133 ]
1134 )
1135
1136 criterion = sql.and_(
1137 *[
1138 x == y
1139 for (x, y) in zip(
1140 self.property.mapper.primary_key,
1141 self.property.mapper.primary_key_from_instance(other),
1142 )
1143 ]
1144 )
1145
1146 return ~self._criterion_exists(criterion)
1147
1148 def __ne__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501
1149 """Implement the ``!=`` operator.
1150
1151 In a many-to-one context, such as:
1152
1153 .. sourcecode:: text
1154
1155 MyClass.some_prop != <some object>
1156
1157 This will typically produce a clause such as:
1158
1159 .. sourcecode:: sql
1160
1161 mytable.related_id != <some id>
1162
1163 Where ``<some id>`` is the primary key of the
1164 given object.
1165
1166 The ``!=`` operator provides partial functionality for non-
1167 many-to-one comparisons:
1168
1169 * Comparisons against collections are not supported.
1170 Use
1171 :meth:`~.Relationship.Comparator.contains`
1172 in conjunction with :func:`_expression.not_`.
1173 * Compared to a scalar one-to-many, will produce a
1174 clause that compares the target columns in the parent to
1175 the given target.
1176 * Compared to a scalar many-to-many, an alias
1177 of the association table will be rendered as
1178 well, forming a natural join that is part of the
1179 main body of the query. This will not work for
1180 queries that go beyond simple AND conjunctions of
1181 comparisons, such as those which use OR. Use
1182 explicit joins, outerjoins, or
1183 :meth:`~.Relationship.Comparator.has` in
1184 conjunction with :func:`_expression.not_` for
1185 more comprehensive non-many-to-one scalar
1186 membership tests.
1187 * Comparisons against ``None`` given in a one-to-many
1188 or many-to-many context produce an EXISTS clause.
1189
1190 """
1191 if other is None or isinstance(other, expression.Null):
1192 if self.property.direction == MANYTOONE:
1193 return ~self.property._optimized_compare(
1194 None, adapt_source=self.adapter
1195 )
1196
1197 else:
1198 return self._criterion_exists()
1199 elif self.property.uselist:
1200 raise sa_exc.InvalidRequestError(
1201 "Can't compare a collection"
1202 " to an object or collection; use "
1203 "contains() to test for membership."
1204 )
1205 else:
1206 return self.__negated_contains_or_equals(other)
1207
1208 if TYPE_CHECKING:
1209 property: RelationshipProperty[_PT] # noqa: A001
1210
1211 def _memoized_attr_property(self) -> RelationshipProperty[_PT]:
1212 self.prop.parent._check_configure()
1213 return self.prop
1214
1215 def _with_parent(
1216 self,
1217 instance: object,
1218 alias_secondary: bool = True,
1219 from_entity: Optional[_EntityType[Any]] = None,
1220 ) -> ColumnElement[bool]:
1221 assert instance is not None
1222 adapt_source: Optional[_CoreAdapterProto] = None
1223 if from_entity is not None:
1224 insp: Optional[_InternalEntityType[Any]] = inspect(from_entity)
1225 assert insp is not None
1226 if insp_is_aliased_class(insp):
1227 adapt_source = insp._adapter.adapt_clause
1228 return self._optimized_compare(
1229 instance,
1230 value_is_parent=True,
1231 adapt_source=adapt_source,
1232 alias_secondary=alias_secondary,
1233 )
1234
1235 def _optimized_compare(
1236 self,
1237 state: Any,
1238 value_is_parent: bool = False,
1239 adapt_source: Optional[_CoreAdapterProto] = None,
1240 alias_secondary: bool = True,
1241 ) -> ColumnElement[bool]:
1242 if state is not None:
1243 try:
1244 state = inspect(state)
1245 except sa_exc.NoInspectionAvailable:
1246 state = None
1247
1248 if state is None or not getattr(state, "is_instance", False):
1249 raise sa_exc.ArgumentError(
1250 "Mapped instance expected for relationship "
1251 "comparison to object. Classes, queries and other "
1252 "SQL elements are not accepted in this context; for "
1253 "comparison with a subquery, "
1254 "use %s.has(**criteria)." % self
1255 )
1256 reverse_direction = not value_is_parent
1257
1258 if state is None:
1259 return self._lazy_none_clause(
1260 reverse_direction, adapt_source=adapt_source
1261 )
1262
1263 if not reverse_direction:
1264 criterion, bind_to_col = (
1265 self._lazy_strategy._lazywhere,
1266 self._lazy_strategy._bind_to_col,
1267 )
1268 else:
1269 criterion, bind_to_col = (
1270 self._lazy_strategy._rev_lazywhere,
1271 self._lazy_strategy._rev_bind_to_col,
1272 )
1273
1274 if reverse_direction:
1275 mapper = self.mapper
1276 else:
1277 mapper = self.parent
1278
1279 dict_ = attributes.instance_dict(state.obj())
1280
1281 def visit_bindparam(bindparam: BindParameter[Any]) -> None:
1282 if bindparam._identifying_key in bind_to_col:
1283 bindparam.callable = self._get_attr_w_warn_on_none(
1284 mapper,
1285 state,
1286 dict_,
1287 bind_to_col[bindparam._identifying_key],
1288 )
1289
1290 if self.secondary is not None and alias_secondary:
1291 criterion = ClauseAdapter(
1292 self.secondary._anonymous_fromclause()
1293 ).traverse(criterion)
1294
1295 criterion = visitors.cloned_traverse(
1296 criterion, {}, {"bindparam": visit_bindparam}
1297 )
1298
1299 if adapt_source:
1300 criterion = adapt_source(criterion)
1301 return criterion
1302
1303 def _get_attr_w_warn_on_none(
1304 self,
1305 mapper: Mapper[Any],
1306 state: InstanceState[Any],
1307 dict_: _InstanceDict,
1308 column: ColumnElement[Any],
1309 ) -> Callable[[], Any]:
1310 """Create the callable that is used in a many-to-one expression.
1311
1312 E.g.::
1313
1314 u1 = s.query(User).get(5)
1315
1316 expr = Address.user == u1
1317
1318 Above, the SQL should be "address.user_id = 5". The callable
1319 returned by this method produces the value "5" based on the identity
1320 of ``u1``.
1321
1322 """
1323
1324 # in this callable, we're trying to thread the needle through
1325 # a wide variety of scenarios, including:
1326 #
1327 # * the object hasn't been flushed yet and there's no value for
1328 # the attribute as of yet
1329 #
1330 # * the object hasn't been flushed yet but it has a user-defined
1331 # value
1332 #
1333 # * the object has a value but it's expired and not locally present
1334 #
1335 # * the object has a value but it's expired and not locally present,
1336 # and the object is also detached
1337 #
1338 # * The object hadn't been flushed yet, there was no value, but
1339 # later, the object has been expired and detached, and *now*
1340 # they're trying to evaluate it
1341 #
1342 # * the object had a value, but it was changed to a new value, and
1343 # then expired
1344 #
1345 # * the object had a value, but it was changed to a new value, and
1346 # then expired, then the object was detached
1347 #
1348 # * the object has a user-set value, but it's None and we don't do
1349 # the comparison correctly for that so warn
1350 #
1351
1352 prop = mapper.get_property_by_column(column)
1353
1354 # by invoking this method, InstanceState will track the last known
1355 # value for this key each time the attribute is to be expired.
1356 # this feature was added explicitly for use in this method.
1357 state._track_last_known_value(prop.key)
1358
1359 lkv_fixed = state._last_known_values
1360
1361 def _go() -> Any:
1362 assert lkv_fixed is not None
1363 last_known = to_return = lkv_fixed[prop.key]
1364 existing_is_available = (
1365 last_known is not LoaderCallableStatus.NO_VALUE
1366 )
1367
1368 # we support that the value may have changed. so here we
1369 # try to get the most recent value including re-fetching.
1370 # only if we can't get a value now due to detachment do we return
1371 # the last known value
1372 current_value = mapper._get_state_attr_by_column(
1373 state,
1374 dict_,
1375 column,
1376 passive=(
1377 PassiveFlag.PASSIVE_OFF
1378 if state.persistent
1379 else PassiveFlag.PASSIVE_NO_FETCH ^ PassiveFlag.INIT_OK
1380 ),
1381 )
1382
1383 if current_value is LoaderCallableStatus.NEVER_SET:
1384 if not existing_is_available:
1385 raise sa_exc.InvalidRequestError(
1386 "Can't resolve value for column %s on object "
1387 "%s; no value has been set for this column"
1388 % (column, state_str(state))
1389 )
1390 elif current_value is LoaderCallableStatus.PASSIVE_NO_RESULT:
1391 if not existing_is_available:
1392 raise sa_exc.InvalidRequestError(
1393 "Can't resolve value for column %s on object "
1394 "%s; the object is detached and the value was "
1395 "expired" % (column, state_str(state))
1396 )
1397 else:
1398 to_return = current_value
1399 if to_return is None:
1400 util.warn(
1401 "Got None for value of column %s; this is unsupported "
1402 "for a relationship comparison and will not "
1403 "currently produce an IS comparison "
1404 "(but may in a future release)" % column
1405 )
1406 return to_return
1407
1408 return _go
1409
1410 def _lazy_none_clause(
1411 self,
1412 reverse_direction: bool = False,
1413 adapt_source: Optional[_CoreAdapterProto] = None,
1414 ) -> ColumnElement[bool]:
1415 if not reverse_direction:
1416 criterion, bind_to_col = (
1417 self._lazy_strategy._lazywhere,
1418 self._lazy_strategy._bind_to_col,
1419 )
1420 else:
1421 criterion, bind_to_col = (
1422 self._lazy_strategy._rev_lazywhere,
1423 self._lazy_strategy._rev_bind_to_col,
1424 )
1425
1426 criterion = adapt_criterion_to_null(criterion, bind_to_col)
1427
1428 if adapt_source:
1429 criterion = adapt_source(criterion)
1430 return criterion
1431
1432 def _format_as_string(self, class_: type, key: str) -> str:
1433 return f"{class_.__name__}.{key}"
1434
1435 def __str__(self) -> str:
1436 return self._format_as_string(self.parent.class_, self.key)
1437
1438 def merge(
1439 self,
1440 session: Session,
1441 source_state: InstanceState[Any],
1442 source_dict: _InstanceDict,
1443 dest_state: InstanceState[Any],
1444 dest_dict: _InstanceDict,
1445 load: bool,
1446 _recursive: Dict[Any, object],
1447 _resolve_conflict_map: Dict[_IdentityKeyType[Any], object],
1448 ) -> None:
1449 if load:
1450 for r in self._reverse_property:
1451 if (source_state, r) in _recursive:
1452 return
1453
1454 if "merge" not in self._cascade:
1455 return
1456
1457 if self.key not in source_dict:
1458 return
1459
1460 if self.uselist:
1461 impl = source_state.get_impl(self.key)
1462
1463 assert is_has_collection_adapter(impl)
1464 instances_iterable = impl.get_collection(source_state, source_dict)
1465
1466 # if this is a CollectionAttributeImpl, then empty should
1467 # be False, otherwise "self.key in source_dict" should not be
1468 # True
1469 assert not instances_iterable.empty if impl.collection else True
1470
1471 if load:
1472 # for a full merge, pre-load the destination collection,
1473 # so that individual _merge of each item pulls from identity
1474 # map for those already present.
1475 # also assumes CollectionAttributeImpl behavior of loading
1476 # "old" list in any case
1477 dest_state.get_impl(self.key).get(
1478 dest_state, dest_dict, passive=PassiveFlag.PASSIVE_MERGE
1479 )
1480
1481 dest_list = []
1482 for current in instances_iterable:
1483 current_state = attributes.instance_state(current)
1484 current_dict = attributes.instance_dict(current)
1485 _recursive[(current_state, self)] = True
1486 obj = session._merge(
1487 current_state,
1488 current_dict,
1489 load=load,
1490 _recursive=_recursive,
1491 _resolve_conflict_map=_resolve_conflict_map,
1492 )
1493 if obj is not None:
1494 dest_list.append(obj)
1495
1496 if not load:
1497 coll = attributes.init_state_collection(
1498 dest_state, dest_dict, self.key
1499 )
1500 for c in dest_list:
1501 coll.append_without_event(c)
1502 else:
1503 dest_impl = dest_state.get_impl(self.key)
1504 assert is_has_collection_adapter(dest_impl)
1505 dest_impl.set(
1506 dest_state,
1507 dest_dict,
1508 dest_list,
1509 _adapt=False,
1510 passive=PassiveFlag.PASSIVE_MERGE,
1511 )
1512 else:
1513 current = source_dict[self.key]
1514 if current is not None:
1515 current_state = attributes.instance_state(current)
1516 current_dict = attributes.instance_dict(current)
1517 _recursive[(current_state, self)] = True
1518 obj = session._merge(
1519 current_state,
1520 current_dict,
1521 load=load,
1522 _recursive=_recursive,
1523 _resolve_conflict_map=_resolve_conflict_map,
1524 )
1525 else:
1526 obj = None
1527
1528 if not load:
1529 dest_dict[self.key] = obj
1530 else:
1531 dest_state.get_impl(self.key).set(
1532 dest_state, dest_dict, obj, None
1533 )
1534
1535 def _value_as_iterable(
1536 self,
1537 state: InstanceState[_O],
1538 dict_: _InstanceDict,
1539 key: str,
1540 passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
1541 ) -> Sequence[Tuple[InstanceState[_O], _O]]:
1542 """Return a list of tuples (state, obj) for the given
1543 key.
1544
1545 returns an empty list if the value is None/empty/PASSIVE_NO_RESULT
1546 """
1547
1548 impl = state.manager[key].impl
1549 x = impl.get(state, dict_, passive=passive)
1550 if x is LoaderCallableStatus.PASSIVE_NO_RESULT or x is None:
1551 return []
1552 elif is_has_collection_adapter(impl):
1553 return [
1554 (attributes.instance_state(o), o)
1555 for o in impl.get_collection(state, dict_, x, passive=passive)
1556 ]
1557 else:
1558 return [(attributes.instance_state(x), x)]
1559
1560 def cascade_iterator(
1561 self,
1562 type_: str,
1563 state: InstanceState[Any],
1564 dict_: _InstanceDict,
1565 visited_states: Set[InstanceState[Any]],
1566 halt_on: Optional[Callable[[InstanceState[Any]], bool]] = None,
1567 ) -> Iterator[Tuple[Any, Mapper[Any], InstanceState[Any], _InstanceDict]]:
1568 # assert type_ in self._cascade
1569
1570 # only actively lazy load on the 'delete' cascade
1571 if type_ != "delete" or self.passive_deletes:
1572 passive = PassiveFlag.PASSIVE_NO_INITIALIZE
1573 else:
1574 passive = PassiveFlag.PASSIVE_OFF | PassiveFlag.NO_RAISE
1575
1576 if type_ == "save-update":
1577 tuples = state.manager[self.key].impl.get_all_pending(state, dict_)
1578 else:
1579 tuples = self._value_as_iterable(
1580 state, dict_, self.key, passive=passive
1581 )
1582
1583 skip_pending = (
1584 type_ == "refresh-expire" and "delete-orphan" not in self._cascade
1585 )
1586
1587 for instance_state, c in tuples:
1588 if instance_state in visited_states:
1589 continue
1590
1591 if c is None:
1592 # would like to emit a warning here, but
1593 # would not be consistent with collection.append(None)
1594 # current behavior of silently skipping.
1595 # see [ticket:2229]
1596 continue
1597
1598 assert instance_state is not None
1599 instance_dict = attributes.instance_dict(c)
1600
1601 if halt_on and halt_on(instance_state):
1602 continue
1603
1604 if skip_pending and not instance_state.key:
1605 continue
1606
1607 instance_mapper = instance_state.manager.mapper
1608
1609 if not instance_mapper.isa(self.mapper.class_manager.mapper):
1610 raise AssertionError(
1611 "Attribute '%s' on class '%s' "
1612 "doesn't handle objects "
1613 "of type '%s'"
1614 % (self.key, self.parent.class_, c.__class__)
1615 )
1616
1617 visited_states.add(instance_state)
1618
1619 yield c, instance_mapper, instance_state, instance_dict
1620
1621 @property
1622 def _effective_sync_backref(self) -> bool:
1623 if self.viewonly:
1624 return False
1625 else:
1626 return self.sync_backref is not False
1627
1628 @staticmethod
1629 def _check_sync_backref(
1630 rel_a: RelationshipProperty[Any], rel_b: RelationshipProperty[Any]
1631 ) -> None:
1632 if rel_a.viewonly and rel_b.sync_backref:
1633 raise sa_exc.InvalidRequestError(
1634 "Relationship %s cannot specify sync_backref=True since %s "
1635 "includes viewonly=True." % (rel_b, rel_a)
1636 )
1637 if (
1638 rel_a.viewonly
1639 and not rel_b.viewonly
1640 and rel_b.sync_backref is not False
1641 ):
1642 rel_b.sync_backref = False
1643
1644 def _add_reverse_property(self, key: str) -> None:
1645 other = self.mapper.get_property(key, _configure_mappers=False)
1646 if not isinstance(other, RelationshipProperty):
1647 raise sa_exc.InvalidRequestError(
1648 "back_populates on relationship '%s' refers to attribute '%s' "
1649 "that is not a relationship. The back_populates parameter "
1650 "should refer to the name of a relationship on the target "
1651 "class." % (self, other)
1652 )
1653 # viewonly and sync_backref cases
1654 # 1. self.viewonly==True and other.sync_backref==True -> error
1655 # 2. self.viewonly==True and other.viewonly==False and
1656 # other.sync_backref==None -> warn sync_backref=False, set to False
1657 self._check_sync_backref(self, other)
1658 # 3. other.viewonly==True and self.sync_backref==True -> error
1659 # 4. other.viewonly==True and self.viewonly==False and
1660 # self.sync_backref==None -> warn sync_backref=False, set to False
1661 self._check_sync_backref(other, self)
1662
1663 self._reverse_property.add(other)
1664 other._reverse_property.add(self)
1665
1666 other._setup_entity()
1667
1668 if not other.mapper.common_parent(self.parent):
1669 raise sa_exc.ArgumentError(
1670 "reverse_property %r on "
1671 "relationship %s references relationship %s, which "
1672 "does not reference mapper %s"
1673 % (key, self, other, self.parent)
1674 )
1675
1676 if (
1677 other._configure_started
1678 and self.direction in (ONETOMANY, MANYTOONE)
1679 and self.direction == other.direction
1680 ):
1681 raise sa_exc.ArgumentError(
1682 "%s and back-reference %s are "
1683 "both of the same direction %r. Did you mean to "
1684 "set remote_side on the many-to-one side ?"
1685 % (other, self, self.direction)
1686 )
1687
1688 @util.memoized_property
1689 def entity(self) -> _InternalEntityType[_T]:
1690 """Return the target mapped entity, which is an inspect() of the
1691 class or aliased class that is referenced by this
1692 :class:`.RelationshipProperty`.
1693
1694 """
1695 self.parent._check_configure()
1696 return self.entity
1697
1698 @util.memoized_property
1699 def mapper(self) -> Mapper[_T]:
1700 """Return the targeted :class:`_orm.Mapper` for this
1701 :class:`.RelationshipProperty`.
1702
1703 """
1704 return self.entity.mapper
1705
1706 def do_init(self) -> None:
1707 self._process_dependent_arguments()
1708 self._setup_entity()
1709 self._setup_registry_dependencies()
1710 self._setup_join_conditions()
1711 self._check_cascade_settings(self._cascade)
1712 self._post_init()
1713 self._generate_backref()
1714 self._join_condition._warn_for_conflicting_sync_targets()
1715 super().do_init()
1716 self._lazy_strategy = cast(
1717 "_LazyLoader", self._get_strategy((("lazy", "select"),))
1718 )
1719
1720 def _setup_registry_dependencies(self) -> None:
1721 self.parent.mapper.registry._set_depends_on(
1722 self.entity.mapper.registry
1723 )
1724
1725 def _process_dependent_arguments(self) -> None:
1726 """Convert incoming configuration arguments to their
1727 proper form.
1728
1729 Callables are resolved, ORM annotations removed.
1730
1731 """
1732
1733 # accept callables for other attributes which may require
1734 # deferred initialization. This technique is used
1735 # by declarative "string configs" and some recipes.
1736 init_args = self._init_args
1737
1738 for attr in (
1739 "order_by",
1740 "primaryjoin",
1741 "secondaryjoin",
1742 "secondary",
1743 "foreign_keys",
1744 "remote_side",
1745 "back_populates",
1746 ):
1747 rel_arg = getattr(init_args, attr)
1748
1749 rel_arg._resolve_against_registry(self._clsregistry_resolvers[1])
1750
1751 # remove "annotations" which are present if mapped class
1752 # descriptors are used to create the join expression.
1753 for attr in "primaryjoin", "secondaryjoin":
1754 rel_arg = getattr(init_args, attr)
1755 val = rel_arg.resolved
1756 if val is not None:
1757 rel_arg.resolved = coercions.expect(
1758 roles.ColumnArgumentRole, val, argname=attr
1759 )
1760
1761 secondary = init_args.secondary.resolved
1762 if secondary is not None and _is_mapped_class(secondary):
1763 raise sa_exc.ArgumentError(
1764 "secondary argument %s passed to to relationship() %s must "
1765 "be a Table object or other FROM clause; can't send a mapped "
1766 "class directly as rows in 'secondary' are persisted "
1767 "independently of a class that is mapped "
1768 "to that same table." % (secondary, self)
1769 )
1770
1771 # ensure expressions in self.order_by, foreign_keys,
1772 # remote_side are all columns, not strings.
1773 if (
1774 init_args.order_by.resolved is not False
1775 and init_args.order_by.resolved is not None
1776 ):
1777 self.order_by = tuple(
1778 coercions.expect(
1779 roles.ColumnArgumentRole, x, argname="order_by"
1780 )
1781 for x in util.to_list(init_args.order_by.resolved)
1782 )
1783 else:
1784 self.order_by = False
1785
1786 self._user_defined_foreign_keys = util.column_set(
1787 coercions.expect(
1788 roles.ColumnArgumentRole, x, argname="foreign_keys"
1789 )
1790 for x in util.to_column_set(init_args.foreign_keys.resolved)
1791 )
1792
1793 self.remote_side = util.column_set(
1794 coercions.expect(
1795 roles.ColumnArgumentRole, x, argname="remote_side"
1796 )
1797 for x in util.to_column_set(init_args.remote_side.resolved)
1798 )
1799
1800 def declarative_scan(
1801 self,
1802 decl_scan: _ClassScanMapperConfig,
1803 registry: _RegistryType,
1804 cls: Type[Any],
1805 originating_module: Optional[str],
1806 key: str,
1807 mapped_container: Optional[Type[Mapped[Any]]],
1808 annotation: Optional[_AnnotationScanType],
1809 extracted_mapped_annotation: Optional[_AnnotationScanType],
1810 is_dataclass_field: bool,
1811 ) -> None:
1812 if extracted_mapped_annotation is None:
1813 if self.argument is None:
1814 self._raise_for_required(key, cls)
1815 else:
1816 return
1817
1818 argument = extracted_mapped_annotation
1819 assert originating_module is not None
1820
1821 if mapped_container is not None:
1822 is_write_only = issubclass(mapped_container, WriteOnlyMapped)
1823 is_dynamic = issubclass(mapped_container, DynamicMapped)
1824 if is_write_only:
1825 self.lazy = "write_only"
1826 self.strategy_key = (("lazy", self.lazy),)
1827 elif is_dynamic:
1828 self.lazy = "dynamic"
1829 self.strategy_key = (("lazy", self.lazy),)
1830 else:
1831 is_write_only = is_dynamic = False
1832
1833 argument = de_optionalize_union_types(argument)
1834
1835 if hasattr(argument, "__origin__"):
1836 arg_origin = argument.__origin__
1837 if isinstance(arg_origin, type) and issubclass(
1838 arg_origin, abc.Collection
1839 ):
1840 if self.collection_class is None:
1841 if _py_inspect.isabstract(arg_origin):
1842 raise sa_exc.ArgumentError(
1843 f"Collection annotation type {arg_origin} cannot "
1844 "be instantiated; please provide an explicit "
1845 "'collection_class' parameter "
1846 "(e.g. list, set, etc.) to the "
1847 "relationship() function to accompany this "
1848 "annotation"
1849 )
1850
1851 self.collection_class = arg_origin
1852
1853 elif not is_write_only and not is_dynamic:
1854 self.uselist = False
1855
1856 if argument.__args__: # type: ignore
1857 if isinstance(arg_origin, type) and issubclass(
1858 arg_origin, typing.Mapping
1859 ):
1860 type_arg = argument.__args__[-1] # type: ignore
1861 else:
1862 type_arg = argument.__args__[0] # type: ignore
1863 if hasattr(type_arg, "__forward_arg__"):
1864 str_argument = type_arg.__forward_arg__
1865
1866 argument = resolve_name_to_real_class_name(
1867 str_argument, originating_module
1868 )
1869 else:
1870 argument = type_arg
1871 else:
1872 raise sa_exc.ArgumentError(
1873 f"Generic alias {argument} requires an argument"
1874 )
1875 elif hasattr(argument, "__forward_arg__"):
1876 argument = argument.__forward_arg__
1877
1878 argument = resolve_name_to_real_class_name(
1879 argument, originating_module
1880 )
1881
1882 if (
1883 self.collection_class is None
1884 and not is_write_only
1885 and not is_dynamic
1886 ):
1887 self.uselist = False
1888
1889 # ticket #8759
1890 # if a lead argument was given to relationship(), like
1891 # `relationship("B")`, use that, don't replace it with class we
1892 # found in the annotation. The declarative_scan() method call here is
1893 # still useful, as we continue to derive collection type and do
1894 # checking of the annotation in any case.
1895 if self.argument is None:
1896 self.argument = cast("_RelationshipArgumentType[_T]", argument)
1897
1898 if (
1899 self._attribute_options.dataclasses_default_factory
1900 is not _NoArg.NO_ARG
1901 and self._attribute_options.dataclasses_default_factory
1902 is not self.collection_class
1903 ):
1904 raise sa_exc.ArgumentError(
1905 f"For relationship {self._format_as_string(cls, key)} using "
1906 "dataclass options, default_factory must be exactly "
1907 f"{self.collection_class}"
1908 )
1909
1910 @util.preload_module("sqlalchemy.orm.mapper")
1911 def _setup_entity(self, __argument: Any = None, /) -> None:
1912 if "entity" in self.__dict__:
1913 return
1914
1915 mapperlib = util.preloaded.orm_mapper
1916
1917 if __argument:
1918 argument = __argument
1919 else:
1920 argument = self.argument
1921
1922 resolved_argument: _ExternalEntityType[Any]
1923
1924 if isinstance(argument, str):
1925 # we might want to cleanup clsregistry API to make this
1926 # more straightforward
1927 resolved_argument = cast(
1928 "_ExternalEntityType[Any]",
1929 self._clsregistry_resolve_name(argument)(),
1930 )
1931 elif callable(argument) and not isinstance(
1932 argument, (type, mapperlib.Mapper)
1933 ):
1934 resolved_argument = argument()
1935 else:
1936 resolved_argument = argument
1937
1938 entity: _InternalEntityType[Any]
1939
1940 if isinstance(resolved_argument, type):
1941 entity = class_mapper(resolved_argument, configure=False)
1942 else:
1943 try:
1944 entity = inspect(resolved_argument)
1945 except sa_exc.NoInspectionAvailable:
1946 entity = None # type: ignore
1947
1948 if not hasattr(entity, "mapper"):
1949 raise sa_exc.ArgumentError(
1950 "relationship '%s' expects "
1951 "a class or a mapper argument (received: %s)"
1952 % (self.key, type(resolved_argument))
1953 )
1954
1955 self.entity = entity
1956 self.target = self.entity.persist_selectable
1957
1958 def _setup_join_conditions(self) -> None:
1959 self._join_condition = jc = _JoinCondition(
1960 parent_persist_selectable=self.parent.persist_selectable,
1961 child_persist_selectable=self.entity.persist_selectable,
1962 parent_local_selectable=self.parent.local_table,
1963 child_local_selectable=self.entity.local_table,
1964 primaryjoin=self._init_args.primaryjoin.resolved,
1965 secondary=self._init_args.secondary.resolved,
1966 secondaryjoin=self._init_args.secondaryjoin.resolved,
1967 parent_equivalents=self.parent._equivalent_columns,
1968 child_equivalents=self.mapper._equivalent_columns,
1969 consider_as_foreign_keys=self._user_defined_foreign_keys,
1970 local_remote_pairs=self.local_remote_pairs,
1971 remote_side=self.remote_side,
1972 self_referential=self._is_self_referential,
1973 prop=self,
1974 support_sync=not self.viewonly,
1975 can_be_synced_fn=self._columns_are_mapped,
1976 )
1977 self.primaryjoin = jc.primaryjoin
1978 self.secondaryjoin = jc.secondaryjoin
1979 self.secondary = jc.secondary
1980 self.direction = jc.direction
1981 self.local_remote_pairs = jc.local_remote_pairs
1982 self.remote_side = jc.remote_columns
1983 self.local_columns = jc.local_columns
1984 self.synchronize_pairs = jc.synchronize_pairs
1985 self._calculated_foreign_keys = jc.foreign_key_columns
1986 self.secondary_synchronize_pairs = jc.secondary_synchronize_pairs
1987
1988 @property
1989 def _clsregistry_resolve_arg(
1990 self,
1991 ) -> Callable[[str, bool], _class_resolver]:
1992 return self._clsregistry_resolvers[1]
1993
1994 @property
1995 def _clsregistry_resolve_name(
1996 self,
1997 ) -> Callable[[str], Callable[[], Union[Type[Any], Table, _ModNS]]]:
1998 return self._clsregistry_resolvers[0]
1999
2000 @util.memoized_property
2001 @util.preload_module("sqlalchemy.orm.clsregistry")
2002 def _clsregistry_resolvers(
2003 self,
2004 ) -> Tuple[
2005 Callable[[str], Callable[[], Union[Type[Any], Table, _ModNS]]],
2006 Callable[[str, bool], _class_resolver],
2007 ]:
2008 _resolver = util.preloaded.orm_clsregistry._resolver
2009
2010 return _resolver(self.parent.class_, self)
2011
2012 @property
2013 def cascade(self) -> CascadeOptions:
2014 """Return the current cascade setting for this
2015 :class:`.RelationshipProperty`.
2016 """
2017 return self._cascade
2018
2019 @cascade.setter
2020 def cascade(self, cascade: Union[str, CascadeOptions]) -> None:
2021 self._set_cascade(cascade)
2022
2023 def _set_cascade(self, cascade_arg: Union[str, CascadeOptions]) -> None:
2024 cascade = CascadeOptions(cascade_arg)
2025
2026 if self.viewonly:
2027 cascade = CascadeOptions(
2028 cascade.intersection(CascadeOptions._viewonly_cascades)
2029 )
2030
2031 if "mapper" in self.__dict__:
2032 self._check_cascade_settings(cascade)
2033 self._cascade = cascade
2034
2035 if self._dependency_processor:
2036 self._dependency_processor.cascade = cascade
2037
2038 def _check_cascade_settings(self, cascade: CascadeOptions) -> None:
2039 if (
2040 cascade.delete_orphan
2041 and not self.single_parent
2042 and (self.direction is MANYTOMANY or self.direction is MANYTOONE)
2043 ):
2044 raise sa_exc.ArgumentError(
2045 "For %(direction)s relationship %(rel)s, delete-orphan "
2046 "cascade is normally "
2047 'configured only on the "one" side of a one-to-many '
2048 "relationship, "
2049 'and not on the "many" side of a many-to-one or many-to-many '
2050 "relationship. "
2051 "To force this relationship to allow a particular "
2052 '"%(relatedcls)s" object to be referenced by only '
2053 'a single "%(clsname)s" object at a time via the '
2054 "%(rel)s relationship, which "
2055 "would allow "
2056 "delete-orphan cascade to take place in this direction, set "
2057 "the single_parent=True flag."
2058 % {
2059 "rel": self,
2060 "direction": (
2061 "many-to-one"
2062 if self.direction is MANYTOONE
2063 else "many-to-many"
2064 ),
2065 "clsname": self.parent.class_.__name__,
2066 "relatedcls": self.mapper.class_.__name__,
2067 },
2068 code="bbf0",
2069 )
2070
2071 if self.passive_deletes == "all" and (
2072 "delete" in cascade or "delete-orphan" in cascade
2073 ):
2074 raise sa_exc.ArgumentError(
2075 "On %s, can't set passive_deletes='all' in conjunction "
2076 "with 'delete' or 'delete-orphan' cascade" % self
2077 )
2078
2079 if cascade.delete_orphan:
2080 self.mapper.primary_mapper()._delete_orphans.append(
2081 (self.key, self.parent.class_)
2082 )
2083
2084 def _persists_for(self, mapper: Mapper[Any]) -> bool:
2085 """Return True if this property will persist values on behalf
2086 of the given mapper.
2087
2088 """
2089
2090 return (
2091 self.key in mapper.relationships
2092 and mapper.relationships[self.key] is self
2093 )
2094
2095 def _columns_are_mapped(self, *cols: ColumnElement[Any]) -> bool:
2096 """Return True if all columns in the given collection are
2097 mapped by the tables referenced by this :class:`.RelationshipProperty`.
2098
2099 """
2100
2101 secondary = self._init_args.secondary.resolved
2102 for c in cols:
2103 if secondary is not None and secondary.c.contains_column(c):
2104 continue
2105 if not self.parent.persist_selectable.c.contains_column(
2106 c
2107 ) and not self.target.c.contains_column(c):
2108 return False
2109 return True
2110
2111 def _generate_backref(self) -> None:
2112 """Interpret the 'backref' instruction to create a
2113 :func:`_orm.relationship` complementary to this one."""
2114
2115 resolve_back_populates = self._init_args.back_populates.resolved
2116
2117 if self.backref is not None and not resolve_back_populates:
2118 kwargs: Dict[str, Any]
2119 if isinstance(self.backref, str):
2120 backref_key, kwargs = self.backref, {}
2121 else:
2122 backref_key, kwargs = self.backref
2123 mapper = self.mapper.primary_mapper()
2124
2125 if not mapper.concrete:
2126 check = set(mapper.iterate_to_root()).union(
2127 mapper.self_and_descendants
2128 )
2129 for m in check:
2130 if m.has_property(backref_key) and not m.concrete:
2131 raise sa_exc.ArgumentError(
2132 "Error creating backref "
2133 "'%s' on relationship '%s': property of that "
2134 "name exists on mapper '%s'"
2135 % (backref_key, self, m)
2136 )
2137
2138 # determine primaryjoin/secondaryjoin for the
2139 # backref. Use the one we had, so that
2140 # a custom join doesn't have to be specified in
2141 # both directions.
2142 if self.secondary is not None:
2143 # for many to many, just switch primaryjoin/
2144 # secondaryjoin. use the annotated
2145 # pj/sj on the _join_condition.
2146 pj = kwargs.pop(
2147 "primaryjoin",
2148 self._join_condition.secondaryjoin_minus_local,
2149 )
2150 sj = kwargs.pop(
2151 "secondaryjoin",
2152 self._join_condition.primaryjoin_minus_local,
2153 )
2154 else:
2155 pj = kwargs.pop(
2156 "primaryjoin",
2157 self._join_condition.primaryjoin_reverse_remote,
2158 )
2159 sj = kwargs.pop("secondaryjoin", None)
2160 if sj:
2161 raise sa_exc.InvalidRequestError(
2162 "Can't assign 'secondaryjoin' on a backref "
2163 "against a non-secondary relationship."
2164 )
2165
2166 foreign_keys = kwargs.pop(
2167 "foreign_keys", self._user_defined_foreign_keys
2168 )
2169 parent = self.parent.primary_mapper()
2170 kwargs.setdefault("viewonly", self.viewonly)
2171 kwargs.setdefault("post_update", self.post_update)
2172 kwargs.setdefault("passive_updates", self.passive_updates)
2173 kwargs.setdefault("sync_backref", self.sync_backref)
2174 self.back_populates = backref_key
2175 relationship = RelationshipProperty(
2176 parent,
2177 self.secondary,
2178 primaryjoin=pj,
2179 secondaryjoin=sj,
2180 foreign_keys=foreign_keys,
2181 back_populates=self.key,
2182 **kwargs,
2183 )
2184 mapper._configure_property(
2185 backref_key, relationship, warn_for_existing=True
2186 )
2187
2188 if resolve_back_populates:
2189 if isinstance(resolve_back_populates, PropComparator):
2190 back_populates = resolve_back_populates.prop.key
2191 elif isinstance(resolve_back_populates, str):
2192 back_populates = resolve_back_populates
2193 else:
2194 # need test coverage for this case as well
2195 raise sa_exc.ArgumentError(
2196 f"Invalid back_populates value: {resolve_back_populates!r}"
2197 )
2198
2199 self._add_reverse_property(back_populates)
2200
2201 @util.preload_module("sqlalchemy.orm.dependency")
2202 def _post_init(self) -> None:
2203 dependency = util.preloaded.orm_dependency
2204
2205 if self.uselist is None:
2206 self.uselist = self.direction is not MANYTOONE
2207 if not self.viewonly:
2208 self._dependency_processor = ( # type: ignore
2209 dependency._DependencyProcessor.from_relationship
2210 )(self)
2211
2212 if (
2213 self.uselist
2214 and self._attribute_options.dataclasses_default
2215 is not _NoArg.NO_ARG
2216 ):
2217 raise sa_exc.ArgumentError(
2218 f"On relationship {self}, the dataclass default for "
2219 "relationship may only be set for "
2220 "a relationship that references a scalar value, i.e. "
2221 "many-to-one or explicitly uselist=False"
2222 )
2223
2224 @util.memoized_property
2225 def _use_get(self) -> bool:
2226 """memoize the 'use_get' attribute of this RelationshipLoader's
2227 lazyloader."""
2228
2229 strategy = self._lazy_strategy
2230 return strategy.use_get
2231
2232 @util.memoized_property
2233 def _is_self_referential(self) -> bool:
2234 return self.mapper.common_parent(self.parent)
2235
2236 def _create_joins(
2237 self,
2238 source_polymorphic: bool = False,
2239 source_selectable: Optional[FromClause] = None,
2240 dest_selectable: Optional[FromClause] = None,
2241 of_type_entity: Optional[_InternalEntityType[Any]] = None,
2242 alias_secondary: bool = False,
2243 extra_criteria: Tuple[ColumnElement[bool], ...] = (),
2244 ) -> Tuple[
2245 ColumnElement[bool],
2246 Optional[ColumnElement[bool]],
2247 FromClause,
2248 FromClause,
2249 Optional[FromClause],
2250 Optional[ClauseAdapter],
2251 ]:
2252 aliased = False
2253
2254 if alias_secondary and self.secondary is not None:
2255 aliased = True
2256
2257 if source_selectable is None:
2258 if source_polymorphic and self.parent.with_polymorphic:
2259 source_selectable = self.parent._with_polymorphic_selectable
2260
2261 if of_type_entity:
2262 dest_mapper = of_type_entity.mapper
2263 if dest_selectable is None:
2264 dest_selectable = of_type_entity.selectable
2265 aliased = True
2266 else:
2267 dest_mapper = self.mapper
2268
2269 if dest_selectable is None:
2270 dest_selectable = self.entity.selectable
2271 if self.mapper.with_polymorphic:
2272 aliased = True
2273
2274 if self._is_self_referential and source_selectable is None:
2275 dest_selectable = dest_selectable._anonymous_fromclause()
2276 aliased = True
2277 elif (
2278 dest_selectable is not self.mapper._with_polymorphic_selectable
2279 or self.mapper.with_polymorphic
2280 ):
2281 aliased = True
2282
2283 single_crit = dest_mapper._single_table_criterion
2284 aliased = aliased or (
2285 source_selectable is not None
2286 and (
2287 source_selectable
2288 is not self.parent._with_polymorphic_selectable
2289 or source_selectable._is_subquery
2290 )
2291 )
2292
2293 (
2294 primaryjoin,
2295 secondaryjoin,
2296 secondary,
2297 target_adapter,
2298 dest_selectable,
2299 ) = self._join_condition.join_targets(
2300 source_selectable,
2301 dest_selectable,
2302 aliased,
2303 single_crit,
2304 extra_criteria,
2305 )
2306 if source_selectable is None:
2307 source_selectable = self.parent.local_table
2308 if dest_selectable is None:
2309 dest_selectable = self.entity.local_table
2310 return (
2311 primaryjoin,
2312 secondaryjoin,
2313 source_selectable,
2314 dest_selectable,
2315 secondary,
2316 target_adapter,
2317 )
2318
2319
2320def _annotate_columns(element: _CE, annotations: _AnnotationDict) -> _CE:
2321 def clone(elem: _CE) -> _CE:
2322 if isinstance(elem, expression.ColumnClause):
2323 elem = elem._annotate(annotations.copy()) # type: ignore
2324 elem._copy_internals(clone=clone)
2325 return elem
2326
2327 if element is not None:
2328 element = clone(element)
2329 clone = None # type: ignore # remove gc cycles
2330 return element
2331
2332
2333class _JoinCondition:
2334 primaryjoin_initial: Optional[ColumnElement[bool]]
2335 primaryjoin: ColumnElement[bool]
2336 secondaryjoin: Optional[ColumnElement[bool]]
2337 secondary: Optional[FromClause]
2338 prop: RelationshipProperty[Any]
2339
2340 synchronize_pairs: _ColumnPairs
2341 secondary_synchronize_pairs: _ColumnPairs
2342 direction: RelationshipDirection
2343
2344 parent_persist_selectable: FromClause
2345 child_persist_selectable: FromClause
2346 parent_local_selectable: FromClause
2347 child_local_selectable: FromClause
2348
2349 _local_remote_pairs: Optional[_ColumnPairs]
2350
2351 def __init__(
2352 self,
2353 parent_persist_selectable: FromClause,
2354 child_persist_selectable: FromClause,
2355 parent_local_selectable: FromClause,
2356 child_local_selectable: FromClause,
2357 *,
2358 primaryjoin: Optional[ColumnElement[bool]] = None,
2359 secondary: Optional[FromClause] = None,
2360 secondaryjoin: Optional[ColumnElement[bool]] = None,
2361 parent_equivalents: Optional[_EquivalentColumnMap] = None,
2362 child_equivalents: Optional[_EquivalentColumnMap] = None,
2363 consider_as_foreign_keys: Any = None,
2364 local_remote_pairs: Optional[_ColumnPairs] = None,
2365 remote_side: Any = None,
2366 self_referential: Any = False,
2367 prop: RelationshipProperty[Any],
2368 support_sync: bool = True,
2369 can_be_synced_fn: Callable[..., bool] = lambda *c: True,
2370 ):
2371 self.parent_persist_selectable = parent_persist_selectable
2372 self.parent_local_selectable = parent_local_selectable
2373 self.child_persist_selectable = child_persist_selectable
2374 self.child_local_selectable = child_local_selectable
2375 self.parent_equivalents = parent_equivalents
2376 self.child_equivalents = child_equivalents
2377 self.primaryjoin_initial = primaryjoin
2378 self.secondaryjoin = secondaryjoin
2379 self.secondary = secondary
2380 self.consider_as_foreign_keys = consider_as_foreign_keys
2381 self._local_remote_pairs = local_remote_pairs
2382 self._remote_side = remote_side
2383 self.prop = prop
2384 self.self_referential = self_referential
2385 self.support_sync = support_sync
2386 self.can_be_synced_fn = can_be_synced_fn
2387
2388 self._determine_joins()
2389 assert self.primaryjoin is not None
2390
2391 self._annotate_fks()
2392 self._annotate_remote()
2393 self._annotate_local()
2394 self._annotate_parentmapper()
2395 self._setup_pairs()
2396 self._check_foreign_cols(self.primaryjoin, True)
2397 if self.secondaryjoin is not None:
2398 self._check_foreign_cols(self.secondaryjoin, False)
2399 self._determine_direction()
2400 self._check_remote_side()
2401 self._log_joins()
2402
2403 def _log_joins(self) -> None:
2404 log = self.prop.logger
2405 log.info("%s setup primary join %s", self.prop, self.primaryjoin)
2406 log.info("%s setup secondary join %s", self.prop, self.secondaryjoin)
2407 log.info(
2408 "%s synchronize pairs [%s]",
2409 self.prop,
2410 ",".join(
2411 "(%s => %s)" % (l, r) for (l, r) in self.synchronize_pairs
2412 ),
2413 )
2414 log.info(
2415 "%s secondary synchronize pairs [%s]",
2416 self.prop,
2417 ",".join(
2418 "(%s => %s)" % (l, r)
2419 for (l, r) in self.secondary_synchronize_pairs or []
2420 ),
2421 )
2422 log.info(
2423 "%s local/remote pairs [%s]",
2424 self.prop,
2425 ",".join(
2426 "(%s / %s)" % (l, r) for (l, r) in self.local_remote_pairs
2427 ),
2428 )
2429 log.info(
2430 "%s remote columns [%s]",
2431 self.prop,
2432 ",".join("%s" % col for col in self.remote_columns),
2433 )
2434 log.info(
2435 "%s local columns [%s]",
2436 self.prop,
2437 ",".join("%s" % col for col in self.local_columns),
2438 )
2439 log.info("%s relationship direction %s", self.prop, self.direction)
2440
2441 def _determine_joins(self) -> None:
2442 """Determine the 'primaryjoin' and 'secondaryjoin' attributes,
2443 if not passed to the constructor already.
2444
2445 This is based on analysis of the foreign key relationships
2446 between the parent and target mapped selectables.
2447
2448 """
2449 if self.secondaryjoin is not None and self.secondary is None:
2450 raise sa_exc.ArgumentError(
2451 "Property %s specified with secondary "
2452 "join condition but "
2453 "no secondary argument" % self.prop
2454 )
2455
2456 # find a join between the given mapper's mapped table and
2457 # the given table. will try the mapper's local table first
2458 # for more specificity, then if not found will try the more
2459 # general mapped table, which in the case of inheritance is
2460 # a join.
2461 try:
2462 consider_as_foreign_keys = self.consider_as_foreign_keys or None
2463 if self.secondary is not None:
2464 if self.secondaryjoin is None:
2465 self.secondaryjoin = join_condition(
2466 self.child_persist_selectable,
2467 self.secondary,
2468 a_subset=self.child_local_selectable,
2469 consider_as_foreign_keys=consider_as_foreign_keys,
2470 )
2471 if self.primaryjoin_initial is None:
2472 self.primaryjoin = join_condition(
2473 self.parent_persist_selectable,
2474 self.secondary,
2475 a_subset=self.parent_local_selectable,
2476 consider_as_foreign_keys=consider_as_foreign_keys,
2477 )
2478 else:
2479 self.primaryjoin = self.primaryjoin_initial
2480 else:
2481 if self.primaryjoin_initial is None:
2482 self.primaryjoin = join_condition(
2483 self.parent_persist_selectable,
2484 self.child_persist_selectable,
2485 a_subset=self.parent_local_selectable,
2486 consider_as_foreign_keys=consider_as_foreign_keys,
2487 )
2488 else:
2489 self.primaryjoin = self.primaryjoin_initial
2490 except sa_exc.NoForeignKeysError as nfe:
2491 if self.secondary is not None:
2492 raise sa_exc.NoForeignKeysError(
2493 "Could not determine join "
2494 "condition between parent/child tables on "
2495 "relationship %s - there are no foreign keys "
2496 "linking these tables via secondary table '%s'. "
2497 "Ensure that referencing columns are associated "
2498 "with a ForeignKey or ForeignKeyConstraint, or "
2499 "specify 'primaryjoin' and 'secondaryjoin' "
2500 "expressions." % (self.prop, self.secondary)
2501 ) from nfe
2502 else:
2503 raise sa_exc.NoForeignKeysError(
2504 "Could not determine join "
2505 "condition between parent/child tables on "
2506 "relationship %s - there are no foreign keys "
2507 "linking these tables. "
2508 "Ensure that referencing columns are associated "
2509 "with a ForeignKey or ForeignKeyConstraint, or "
2510 "specify a 'primaryjoin' expression." % self.prop
2511 ) from nfe
2512 except sa_exc.AmbiguousForeignKeysError as afe:
2513 if self.secondary is not None:
2514 raise sa_exc.AmbiguousForeignKeysError(
2515 "Could not determine join "
2516 "condition between parent/child tables on "
2517 "relationship %s - there are multiple foreign key "
2518 "paths linking the tables via secondary table '%s'. "
2519 "Specify the 'foreign_keys' "
2520 "argument, providing a list of those columns which "
2521 "should be counted as containing a foreign key "
2522 "reference from the secondary table to each of the "
2523 "parent and child tables." % (self.prop, self.secondary)
2524 ) from afe
2525 else:
2526 raise sa_exc.AmbiguousForeignKeysError(
2527 "Could not determine join "
2528 "condition between parent/child tables on "
2529 "relationship %s - there are multiple foreign key "
2530 "paths linking the tables. Specify the "
2531 "'foreign_keys' argument, providing a list of those "
2532 "columns which should be counted as containing a "
2533 "foreign key reference to the parent table." % self.prop
2534 ) from afe
2535
2536 @property
2537 def primaryjoin_minus_local(self) -> ColumnElement[bool]:
2538 return _deep_deannotate(self.primaryjoin, values=("local", "remote"))
2539
2540 @property
2541 def secondaryjoin_minus_local(self) -> ColumnElement[bool]:
2542 assert self.secondaryjoin is not None
2543 return _deep_deannotate(self.secondaryjoin, values=("local", "remote"))
2544
2545 @util.memoized_property
2546 def primaryjoin_reverse_remote(self) -> ColumnElement[bool]:
2547 """Return the primaryjoin condition suitable for the
2548 "reverse" direction.
2549
2550 If the primaryjoin was delivered here with pre-existing
2551 "remote" annotations, the local/remote annotations
2552 are reversed. Otherwise, the local/remote annotations
2553 are removed.
2554
2555 """
2556 if self._has_remote_annotations:
2557
2558 def replace(element: _CE, **kw: Any) -> Optional[_CE]:
2559 if "remote" in element._annotations:
2560 v = dict(element._annotations)
2561 del v["remote"]
2562 v["local"] = True
2563 return element._with_annotations(v)
2564 elif "local" in element._annotations:
2565 v = dict(element._annotations)
2566 del v["local"]
2567 v["remote"] = True
2568 return element._with_annotations(v)
2569
2570 return None
2571
2572 return visitors.replacement_traverse(self.primaryjoin, {}, replace)
2573 else:
2574 if self._has_foreign_annotations:
2575 # TODO: coverage
2576 return _deep_deannotate(
2577 self.primaryjoin, values=("local", "remote")
2578 )
2579 else:
2580 return _deep_deannotate(self.primaryjoin)
2581
2582 def _has_annotation(self, clause: ClauseElement, annotation: str) -> bool:
2583 for col in visitors.iterate(clause, {}):
2584 if annotation in col._annotations:
2585 return True
2586 else:
2587 return False
2588
2589 @util.memoized_property
2590 def _has_foreign_annotations(self) -> bool:
2591 return self._has_annotation(self.primaryjoin, "foreign")
2592
2593 @util.memoized_property
2594 def _has_remote_annotations(self) -> bool:
2595 return self._has_annotation(self.primaryjoin, "remote")
2596
2597 def _annotate_fks(self) -> None:
2598 """Annotate the primaryjoin and secondaryjoin
2599 structures with 'foreign' annotations marking columns
2600 considered as foreign.
2601
2602 """
2603 if self._has_foreign_annotations:
2604 return
2605
2606 if self.consider_as_foreign_keys:
2607 self._annotate_from_fk_list()
2608 else:
2609 self._annotate_present_fks()
2610
2611 def _annotate_from_fk_list(self) -> None:
2612 def check_fk(element: _CE, **kw: Any) -> Optional[_CE]:
2613 if element in self.consider_as_foreign_keys:
2614 return element._annotate({"foreign": True})
2615 return None
2616
2617 self.primaryjoin = visitors.replacement_traverse(
2618 self.primaryjoin, {}, check_fk
2619 )
2620 if self.secondaryjoin is not None:
2621 self.secondaryjoin = visitors.replacement_traverse(
2622 self.secondaryjoin, {}, check_fk
2623 )
2624
2625 def _annotate_present_fks(self) -> None:
2626 if self.secondary is not None:
2627 secondarycols = util.column_set(self.secondary.c)
2628 else:
2629 secondarycols = set()
2630
2631 def is_foreign(
2632 a: ColumnElement[Any], b: ColumnElement[Any]
2633 ) -> Optional[ColumnElement[Any]]:
2634 if isinstance(a, schema.Column) and isinstance(b, schema.Column):
2635 if a.references(b):
2636 return a
2637 elif b.references(a):
2638 return b
2639
2640 if secondarycols:
2641 if a in secondarycols and b not in secondarycols:
2642 return a
2643 elif b in secondarycols and a not in secondarycols:
2644 return b
2645
2646 return None
2647
2648 def visit_binary(binary: BinaryExpression[Any]) -> None:
2649 if not isinstance(
2650 binary.left, sql.ColumnElement
2651 ) or not isinstance(binary.right, sql.ColumnElement):
2652 return
2653
2654 if (
2655 "foreign" not in binary.left._annotations
2656 and "foreign" not in binary.right._annotations
2657 ):
2658 col = is_foreign(binary.left, binary.right)
2659 if col is not None:
2660 if col.compare(binary.left):
2661 binary.left = binary.left._annotate({"foreign": True})
2662 elif col.compare(binary.right):
2663 binary.right = binary.right._annotate(
2664 {"foreign": True}
2665 )
2666
2667 self.primaryjoin = visitors.cloned_traverse(
2668 self.primaryjoin, {}, {"binary": visit_binary}
2669 )
2670 if self.secondaryjoin is not None:
2671 self.secondaryjoin = visitors.cloned_traverse(
2672 self.secondaryjoin, {}, {"binary": visit_binary}
2673 )
2674
2675 def _refers_to_parent_table(self) -> bool:
2676 """Return True if the join condition contains column
2677 comparisons where both columns are in both tables.
2678
2679 """
2680 pt = self.parent_persist_selectable
2681 mt = self.child_persist_selectable
2682 result = False
2683
2684 def visit_binary(binary: BinaryExpression[Any]) -> None:
2685 nonlocal result
2686 c, f = binary.left, binary.right
2687 if (
2688 isinstance(c, expression.ColumnClause)
2689 and isinstance(f, expression.ColumnClause)
2690 and pt.is_derived_from(c.table)
2691 and pt.is_derived_from(f.table)
2692 and mt.is_derived_from(c.table)
2693 and mt.is_derived_from(f.table)
2694 ):
2695 result = True
2696
2697 visitors.traverse(self.primaryjoin, {}, {"binary": visit_binary})
2698 return result
2699
2700 def _tables_overlap(self) -> bool:
2701 """Return True if parent/child tables have some overlap."""
2702
2703 return selectables_overlap(
2704 self.parent_persist_selectable, self.child_persist_selectable
2705 )
2706
2707 def _annotate_remote(self) -> None:
2708 """Annotate the primaryjoin and secondaryjoin
2709 structures with 'remote' annotations marking columns
2710 considered as part of the 'remote' side.
2711
2712 """
2713 if self._has_remote_annotations:
2714 return
2715
2716 if self.secondary is not None:
2717 self._annotate_remote_secondary()
2718 elif self._local_remote_pairs or self._remote_side:
2719 self._annotate_remote_from_args()
2720 elif self._refers_to_parent_table():
2721 self._annotate_selfref(
2722 lambda col: "foreign" in col._annotations, False
2723 )
2724 elif self._tables_overlap():
2725 self._annotate_remote_with_overlap()
2726 else:
2727 self._annotate_remote_distinct_selectables()
2728
2729 def _annotate_remote_secondary(self) -> None:
2730 """annotate 'remote' in primaryjoin, secondaryjoin
2731 when 'secondary' is present.
2732
2733 """
2734
2735 assert self.secondary is not None
2736 fixed_secondary = self.secondary
2737
2738 def repl(element: _CE, **kw: Any) -> Optional[_CE]:
2739 if fixed_secondary.c.contains_column(element):
2740 return element._annotate({"remote": True})
2741 return None
2742
2743 self.primaryjoin = visitors.replacement_traverse(
2744 self.primaryjoin, {}, repl
2745 )
2746
2747 assert self.secondaryjoin is not None
2748 self.secondaryjoin = visitors.replacement_traverse(
2749 self.secondaryjoin, {}, repl
2750 )
2751
2752 def _annotate_selfref(
2753 self, fn: Callable[[ColumnElement[Any]], bool], remote_side_given: bool
2754 ) -> None:
2755 """annotate 'remote' in primaryjoin, secondaryjoin
2756 when the relationship is detected as self-referential.
2757
2758 """
2759
2760 def visit_binary(binary: BinaryExpression[Any]) -> None:
2761 equated = binary.left.compare(binary.right)
2762 if isinstance(binary.left, expression.ColumnClause) and isinstance(
2763 binary.right, expression.ColumnClause
2764 ):
2765 # assume one to many - FKs are "remote"
2766 if fn(binary.left):
2767 binary.left = binary.left._annotate({"remote": True})
2768 if fn(binary.right) and not equated:
2769 binary.right = binary.right._annotate({"remote": True})
2770 elif not remote_side_given:
2771 self._warn_non_column_elements()
2772
2773 self.primaryjoin = visitors.cloned_traverse(
2774 self.primaryjoin, {}, {"binary": visit_binary}
2775 )
2776
2777 def _annotate_remote_from_args(self) -> None:
2778 """annotate 'remote' in primaryjoin, secondaryjoin
2779 when the 'remote_side' or '_local_remote_pairs'
2780 arguments are used.
2781
2782 """
2783 if self._local_remote_pairs:
2784 if self._remote_side:
2785 raise sa_exc.ArgumentError(
2786 "remote_side argument is redundant "
2787 "against more detailed _local_remote_side "
2788 "argument."
2789 )
2790
2791 remote_side = [r for (l, r) in self._local_remote_pairs]
2792 else:
2793 remote_side = self._remote_side
2794
2795 if self._refers_to_parent_table():
2796 self._annotate_selfref(lambda col: col in remote_side, True)
2797 else:
2798
2799 def repl(element: _CE, **kw: Any) -> Optional[_CE]:
2800 # use set() to avoid generating ``__eq__()`` expressions
2801 # against each element
2802 if element in set(remote_side):
2803 return element._annotate({"remote": True})
2804 return None
2805
2806 self.primaryjoin = visitors.replacement_traverse(
2807 self.primaryjoin, {}, repl
2808 )
2809
2810 def _annotate_remote_with_overlap(self) -> None:
2811 """annotate 'remote' in primaryjoin, secondaryjoin
2812 when the parent/child tables have some set of
2813 tables in common, though is not a fully self-referential
2814 relationship.
2815
2816 """
2817
2818 def visit_binary(binary: BinaryExpression[Any]) -> None:
2819 binary.left, binary.right = proc_left_right(
2820 binary.left, binary.right
2821 )
2822 binary.right, binary.left = proc_left_right(
2823 binary.right, binary.left
2824 )
2825
2826 check_entities = (
2827 self.prop is not None and self.prop.mapper is not self.prop.parent
2828 )
2829
2830 def proc_left_right(
2831 left: ColumnElement[Any], right: ColumnElement[Any]
2832 ) -> Tuple[ColumnElement[Any], ColumnElement[Any]]:
2833 if isinstance(left, expression.ColumnClause) and isinstance(
2834 right, expression.ColumnClause
2835 ):
2836 if self.child_persist_selectable.c.contains_column(
2837 right
2838 ) and self.parent_persist_selectable.c.contains_column(left):
2839 right = right._annotate({"remote": True})
2840 elif (
2841 check_entities
2842 and right._annotations.get("parentmapper") is self.prop.mapper
2843 ):
2844 right = right._annotate({"remote": True})
2845 elif (
2846 check_entities
2847 and left._annotations.get("parentmapper") is self.prop.mapper
2848 ):
2849 left = left._annotate({"remote": True})
2850 else:
2851 self._warn_non_column_elements()
2852
2853 return left, right
2854
2855 self.primaryjoin = visitors.cloned_traverse(
2856 self.primaryjoin, {}, {"binary": visit_binary}
2857 )
2858
2859 def _annotate_remote_distinct_selectables(self) -> None:
2860 """annotate 'remote' in primaryjoin, secondaryjoin
2861 when the parent/child tables are entirely
2862 separate.
2863
2864 """
2865
2866 def repl(element: _CE, **kw: Any) -> Optional[_CE]:
2867 if self.child_persist_selectable.c.contains_column(element) and (
2868 not self.parent_local_selectable.c.contains_column(element)
2869 or self.child_local_selectable.c.contains_column(element)
2870 ):
2871 return element._annotate({"remote": True})
2872 return None
2873
2874 self.primaryjoin = visitors.replacement_traverse(
2875 self.primaryjoin, {}, repl
2876 )
2877
2878 def _warn_non_column_elements(self) -> None:
2879 util.warn(
2880 "Non-simple column elements in primary "
2881 "join condition for property %s - consider using "
2882 "remote() annotations to mark the remote side." % self.prop
2883 )
2884
2885 def _annotate_local(self) -> None:
2886 """Annotate the primaryjoin and secondaryjoin
2887 structures with 'local' annotations.
2888
2889 This annotates all column elements found
2890 simultaneously in the parent table
2891 and the join condition that don't have a
2892 'remote' annotation set up from
2893 _annotate_remote() or user-defined.
2894
2895 """
2896 if self._has_annotation(self.primaryjoin, "local"):
2897 return
2898
2899 if self._local_remote_pairs:
2900 local_side = util.column_set(
2901 [l for (l, r) in self._local_remote_pairs]
2902 )
2903 else:
2904 local_side = util.column_set(self.parent_persist_selectable.c)
2905
2906 def locals_(element: _CE, **kw: Any) -> Optional[_CE]:
2907 if "remote" not in element._annotations and element in local_side:
2908 return element._annotate({"local": True})
2909 return None
2910
2911 self.primaryjoin = visitors.replacement_traverse(
2912 self.primaryjoin, {}, locals_
2913 )
2914
2915 def _annotate_parentmapper(self) -> None:
2916 def parentmappers_(element: _CE, **kw: Any) -> Optional[_CE]:
2917 if "remote" in element._annotations:
2918 return element._annotate({"parentmapper": self.prop.mapper})
2919 elif "local" in element._annotations:
2920 return element._annotate({"parentmapper": self.prop.parent})
2921 return None
2922
2923 self.primaryjoin = visitors.replacement_traverse(
2924 self.primaryjoin, {}, parentmappers_
2925 )
2926
2927 def _check_remote_side(self) -> None:
2928 if not self.local_remote_pairs:
2929 raise sa_exc.ArgumentError(
2930 "Relationship %s could "
2931 "not determine any unambiguous local/remote column "
2932 "pairs based on join condition and remote_side "
2933 "arguments. "
2934 "Consider using the remote() annotation to "
2935 "accurately mark those elements of the join "
2936 "condition that are on the remote side of "
2937 "the relationship." % (self.prop,)
2938 )
2939 else:
2940 not_target = util.column_set(
2941 self.parent_persist_selectable.c
2942 ).difference(self.child_persist_selectable.c)
2943
2944 for _, rmt in self.local_remote_pairs:
2945 if rmt in not_target:
2946 util.warn(
2947 "Expression %s is marked as 'remote', but these "
2948 "column(s) are local to the local side. The "
2949 "remote() annotation is needed only for a "
2950 "self-referential relationship where both sides "
2951 "of the relationship refer to the same tables."
2952 % (rmt,)
2953 )
2954
2955 def _check_foreign_cols(
2956 self, join_condition: ColumnElement[bool], primary: bool
2957 ) -> None:
2958 """Check the foreign key columns collected and emit error
2959 messages."""
2960 foreign_cols = self._gather_columns_with_annotation(
2961 join_condition, "foreign"
2962 )
2963
2964 has_foreign = bool(foreign_cols)
2965
2966 if primary:
2967 can_sync = bool(self.synchronize_pairs)
2968 else:
2969 can_sync = bool(self.secondary_synchronize_pairs)
2970
2971 if (
2972 self.support_sync
2973 and can_sync
2974 or (not self.support_sync and has_foreign)
2975 ):
2976 return
2977
2978 # from here below is just determining the best error message
2979 # to report. Check for a join condition using any operator
2980 # (not just ==), perhaps they need to turn on "viewonly=True".
2981 if self.support_sync and has_foreign and not can_sync:
2982 err = (
2983 "Could not locate any simple equality expressions "
2984 "involving locally mapped foreign key columns for "
2985 "%s join condition "
2986 "'%s' on relationship %s."
2987 % (
2988 primary and "primary" or "secondary",
2989 join_condition,
2990 self.prop,
2991 )
2992 )
2993 err += (
2994 " Ensure that referencing columns are associated "
2995 "with a ForeignKey or ForeignKeyConstraint, or are "
2996 "annotated in the join condition with the foreign() "
2997 "annotation. To allow comparison operators other than "
2998 "'==', the relationship can be marked as viewonly=True."
2999 )
3000
3001 raise sa_exc.ArgumentError(err)
3002 else:
3003 err = (
3004 "Could not locate any relevant foreign key columns "
3005 "for %s join condition '%s' on relationship %s."
3006 % (
3007 primary and "primary" or "secondary",
3008 join_condition,
3009 self.prop,
3010 )
3011 )
3012 err += (
3013 " Ensure that referencing columns are associated "
3014 "with a ForeignKey or ForeignKeyConstraint, or are "
3015 "annotated in the join condition with the foreign() "
3016 "annotation."
3017 )
3018 raise sa_exc.ArgumentError(err)
3019
3020 def _determine_direction(self) -> None:
3021 """Determine if this relationship is one to many, many to one,
3022 many to many.
3023
3024 """
3025 if self.secondaryjoin is not None:
3026 self.direction = MANYTOMANY
3027 else:
3028 parentcols = util.column_set(self.parent_persist_selectable.c)
3029 targetcols = util.column_set(self.child_persist_selectable.c)
3030
3031 # fk collection which suggests ONETOMANY.
3032 onetomany_fk = targetcols.intersection(self.foreign_key_columns)
3033
3034 # fk collection which suggests MANYTOONE.
3035
3036 manytoone_fk = parentcols.intersection(self.foreign_key_columns)
3037
3038 if onetomany_fk and manytoone_fk:
3039 # fks on both sides. test for overlap of local/remote
3040 # with foreign key.
3041 # we will gather columns directly from their annotations
3042 # without deannotating, so that we can distinguish on a column
3043 # that refers to itself.
3044
3045 # 1. columns that are both remote and FK suggest
3046 # onetomany.
3047 onetomany_local = self._gather_columns_with_annotation(
3048 self.primaryjoin, "remote", "foreign"
3049 )
3050
3051 # 2. columns that are FK but are not remote (e.g. local)
3052 # suggest manytoone.
3053 manytoone_local = {
3054 c
3055 for c in self._gather_columns_with_annotation(
3056 self.primaryjoin, "foreign"
3057 )
3058 if "remote" not in c._annotations
3059 }
3060
3061 # 3. if both collections are present, remove columns that
3062 # refer to themselves. This is for the case of
3063 # and_(Me.id == Me.remote_id, Me.version == Me.version)
3064 if onetomany_local and manytoone_local:
3065 self_equated = self.remote_columns.intersection(
3066 self.local_columns
3067 )
3068 onetomany_local = onetomany_local.difference(self_equated)
3069 manytoone_local = manytoone_local.difference(self_equated)
3070
3071 # at this point, if only one or the other collection is
3072 # present, we know the direction, otherwise it's still
3073 # ambiguous.
3074
3075 if onetomany_local and not manytoone_local:
3076 self.direction = ONETOMANY
3077 elif manytoone_local and not onetomany_local:
3078 self.direction = MANYTOONE
3079 else:
3080 raise sa_exc.ArgumentError(
3081 "Can't determine relationship"
3082 " direction for relationship '%s' - foreign "
3083 "key columns within the join condition are present "
3084 "in both the parent and the child's mapped tables. "
3085 "Ensure that only those columns referring "
3086 "to a parent column are marked as foreign, "
3087 "either via the foreign() annotation or "
3088 "via the foreign_keys argument." % self.prop
3089 )
3090 elif onetomany_fk:
3091 self.direction = ONETOMANY
3092 elif manytoone_fk:
3093 self.direction = MANYTOONE
3094 else:
3095 raise sa_exc.ArgumentError(
3096 "Can't determine relationship "
3097 "direction for relationship '%s' - foreign "
3098 "key columns are present in neither the parent "
3099 "nor the child's mapped tables" % self.prop
3100 )
3101
3102 def _deannotate_pairs(
3103 self, collection: _ColumnPairIterable
3104 ) -> _MutableColumnPairs:
3105 """provide deannotation for the various lists of
3106 pairs, so that using them in hashes doesn't incur
3107 high-overhead __eq__() comparisons against
3108 original columns mapped.
3109
3110 """
3111 return [(x._deannotate(), y._deannotate()) for x, y in collection]
3112
3113 def _setup_pairs(self) -> None:
3114 sync_pairs: _MutableColumnPairs = []
3115 lrp: util.OrderedSet[Tuple[ColumnElement[Any], ColumnElement[Any]]] = (
3116 util.OrderedSet([])
3117 )
3118 secondary_sync_pairs: _MutableColumnPairs = []
3119
3120 def go(
3121 joincond: ColumnElement[bool],
3122 collection: _MutableColumnPairs,
3123 ) -> None:
3124 def visit_binary(
3125 binary: BinaryExpression[Any],
3126 left: ColumnElement[Any],
3127 right: ColumnElement[Any],
3128 ) -> None:
3129 if (
3130 "remote" in right._annotations
3131 and "remote" not in left._annotations
3132 and self.can_be_synced_fn(left)
3133 ):
3134 lrp.add((left, right))
3135 elif (
3136 "remote" in left._annotations
3137 and "remote" not in right._annotations
3138 and self.can_be_synced_fn(right)
3139 ):
3140 lrp.add((right, left))
3141 if binary.operator is operators.eq and self.can_be_synced_fn(
3142 left, right
3143 ):
3144 if "foreign" in right._annotations:
3145 collection.append((left, right))
3146 elif "foreign" in left._annotations:
3147 collection.append((right, left))
3148
3149 visit_binary_product(visit_binary, joincond)
3150
3151 for joincond, collection in [
3152 (self.primaryjoin, sync_pairs),
3153 (self.secondaryjoin, secondary_sync_pairs),
3154 ]:
3155 if joincond is None:
3156 continue
3157 go(joincond, collection)
3158
3159 self.local_remote_pairs = self._deannotate_pairs(lrp)
3160 self.synchronize_pairs = self._deannotate_pairs(sync_pairs)
3161 self.secondary_synchronize_pairs = self._deannotate_pairs(
3162 secondary_sync_pairs
3163 )
3164
3165 _track_overlapping_sync_targets: weakref.WeakKeyDictionary[
3166 ColumnElement[Any],
3167 weakref.WeakKeyDictionary[
3168 RelationshipProperty[Any], ColumnElement[Any]
3169 ],
3170 ] = weakref.WeakKeyDictionary()
3171
3172 def _warn_for_conflicting_sync_targets(self) -> None:
3173 if not self.support_sync:
3174 return
3175
3176 # we would like to detect if we are synchronizing any column
3177 # pairs in conflict with another relationship that wishes to sync
3178 # an entirely different column to the same target. This is a
3179 # very rare edge case so we will try to minimize the memory/overhead
3180 # impact of this check
3181 for from_, to_ in [
3182 (from_, to_) for (from_, to_) in self.synchronize_pairs
3183 ] + [
3184 (from_, to_) for (from_, to_) in self.secondary_synchronize_pairs
3185 ]:
3186 # save ourselves a ton of memory and overhead by only
3187 # considering columns that are subject to a overlapping
3188 # FK constraints at the core level. This condition can arise
3189 # if multiple relationships overlap foreign() directly, but
3190 # we're going to assume it's typically a ForeignKeyConstraint-
3191 # level configuration that benefits from this warning.
3192
3193 if to_ not in self._track_overlapping_sync_targets:
3194 self._track_overlapping_sync_targets[to_] = (
3195 weakref.WeakKeyDictionary({self.prop: from_})
3196 )
3197 else:
3198 other_props = []
3199 prop_to_from = self._track_overlapping_sync_targets[to_]
3200
3201 for pr, fr_ in prop_to_from.items():
3202 if (
3203 not pr.mapper._dispose_called
3204 and pr not in self.prop._reverse_property
3205 and pr.key not in self.prop._overlaps
3206 and self.prop.key not in pr._overlaps
3207 # note: the "__*" symbol is used internally by
3208 # SQLAlchemy as a general means of suppressing the
3209 # overlaps warning for some extension cases, however
3210 # this is not currently
3211 # a publicly supported symbol and may change at
3212 # any time.
3213 and "__*" not in self.prop._overlaps
3214 and "__*" not in pr._overlaps
3215 and not self.prop.parent.is_sibling(pr.parent)
3216 and not self.prop.mapper.is_sibling(pr.mapper)
3217 and not self.prop.parent.is_sibling(pr.mapper)
3218 and not self.prop.mapper.is_sibling(pr.parent)
3219 and (
3220 self.prop.key != pr.key
3221 or not self.prop.parent.common_parent(pr.parent)
3222 )
3223 ):
3224 other_props.append((pr, fr_))
3225
3226 if other_props:
3227 util.warn(
3228 "relationship '%s' will copy column %s to column %s, "
3229 "which conflicts with relationship(s): %s. "
3230 "If this is not the intention, consider if these "
3231 "relationships should be linked with "
3232 "back_populates, or if viewonly=True should be "
3233 "applied to one or more if they are read-only. "
3234 "For the less common case that foreign key "
3235 "constraints are partially overlapping, the "
3236 "orm.foreign() "
3237 "annotation can be used to isolate the columns that "
3238 "should be written towards. To silence this "
3239 "warning, add the parameter 'overlaps=\"%s\"' to the "
3240 "'%s' relationship."
3241 % (
3242 self.prop,
3243 from_,
3244 to_,
3245 ", ".join(
3246 sorted(
3247 "'%s' (copies %s to %s)" % (pr, fr_, to_)
3248 for (pr, fr_) in other_props
3249 )
3250 ),
3251 ",".join(sorted(pr.key for pr, fr in other_props)),
3252 self.prop,
3253 ),
3254 code="qzyx",
3255 )
3256 self._track_overlapping_sync_targets[to_][self.prop] = from_
3257
3258 @util.memoized_property
3259 def remote_columns(self) -> Set[ColumnElement[Any]]:
3260 return self._gather_join_annotations("remote")
3261
3262 @util.memoized_property
3263 def local_columns(self) -> Set[ColumnElement[Any]]:
3264 return self._gather_join_annotations("local")
3265
3266 @util.memoized_property
3267 def foreign_key_columns(self) -> Set[ColumnElement[Any]]:
3268 return self._gather_join_annotations("foreign")
3269
3270 def _gather_join_annotations(
3271 self, annotation: str
3272 ) -> Set[ColumnElement[Any]]:
3273 s = set(
3274 self._gather_columns_with_annotation(self.primaryjoin, annotation)
3275 )
3276 if self.secondaryjoin is not None:
3277 s.update(
3278 self._gather_columns_with_annotation(
3279 self.secondaryjoin, annotation
3280 )
3281 )
3282 return {x._deannotate() for x in s}
3283
3284 def _gather_columns_with_annotation(
3285 self, clause: ColumnElement[Any], *annotation: Iterable[str]
3286 ) -> Set[ColumnElement[Any]]:
3287 annotation_set = set(annotation)
3288 return {
3289 cast(ColumnElement[Any], col)
3290 for col in visitors.iterate(clause, {})
3291 if annotation_set.issubset(col._annotations)
3292 }
3293
3294 @util.memoized_property
3295 def _secondary_lineage_set(self) -> FrozenSet[ColumnElement[Any]]:
3296 if self.secondary is not None:
3297 return frozenset(
3298 itertools.chain(*[c.proxy_set for c in self.secondary.c])
3299 )
3300 else:
3301 return util.EMPTY_SET
3302
3303 def join_targets(
3304 self,
3305 source_selectable: Optional[FromClause],
3306 dest_selectable: FromClause,
3307 aliased: bool,
3308 single_crit: Optional[ColumnElement[bool]] = None,
3309 extra_criteria: Tuple[ColumnElement[bool], ...] = (),
3310 ) -> Tuple[
3311 ColumnElement[bool],
3312 Optional[ColumnElement[bool]],
3313 Optional[FromClause],
3314 Optional[ClauseAdapter],
3315 FromClause,
3316 ]:
3317 """Given a source and destination selectable, create a
3318 join between them.
3319
3320 This takes into account aliasing the join clause
3321 to reference the appropriate corresponding columns
3322 in the target objects, as well as the extra child
3323 criterion, equivalent column sets, etc.
3324
3325 """
3326 # place a barrier on the destination such that
3327 # replacement traversals won't ever dig into it.
3328 # its internal structure remains fixed
3329 # regardless of context.
3330 dest_selectable = _shallow_annotate(
3331 dest_selectable, {"no_replacement_traverse": True}
3332 )
3333
3334 primaryjoin, secondaryjoin, secondary = (
3335 self.primaryjoin,
3336 self.secondaryjoin,
3337 self.secondary,
3338 )
3339
3340 # adjust the join condition for single table inheritance,
3341 # in the case that the join is to a subclass
3342 # this is analogous to the
3343 # "_adjust_for_single_table_inheritance()" method in Query.
3344
3345 if single_crit is not None:
3346 if secondaryjoin is not None:
3347 secondaryjoin = secondaryjoin & single_crit
3348 else:
3349 primaryjoin = primaryjoin & single_crit
3350
3351 if extra_criteria:
3352
3353 def mark_exclude_cols(
3354 elem: SupportsAnnotations, annotations: _AnnotationDict
3355 ) -> SupportsAnnotations:
3356 """note unrelated columns in the "extra criteria" as either
3357 should be adapted or not adapted, even though they are not
3358 part of our "local" or "remote" side.
3359
3360 see #9779 for this case, as well as #11010 for a follow up
3361
3362 """
3363
3364 parentmapper_for_element = elem._annotations.get(
3365 "parentmapper", None
3366 )
3367
3368 if (
3369 parentmapper_for_element is not self.prop.parent
3370 and parentmapper_for_element is not self.prop.mapper
3371 and elem not in self._secondary_lineage_set
3372 ):
3373 return _safe_annotate(elem, annotations)
3374 else:
3375 return elem
3376
3377 extra_criteria = tuple(
3378 _deep_annotate(
3379 elem,
3380 {"should_not_adapt": True},
3381 annotate_callable=mark_exclude_cols,
3382 )
3383 for elem in extra_criteria
3384 )
3385
3386 if secondaryjoin is not None:
3387 secondaryjoin = secondaryjoin & sql.and_(*extra_criteria)
3388 else:
3389 primaryjoin = primaryjoin & sql.and_(*extra_criteria)
3390
3391 if aliased:
3392 if secondary is not None:
3393 secondary = secondary._anonymous_fromclause(flat=True)
3394 primary_aliasizer = ClauseAdapter(
3395 secondary,
3396 exclude_fn=_local_col_exclude,
3397 )
3398 secondary_aliasizer = ClauseAdapter(
3399 dest_selectable, equivalents=self.child_equivalents
3400 ).chain(primary_aliasizer)
3401 if source_selectable is not None:
3402 primary_aliasizer = ClauseAdapter(
3403 secondary,
3404 exclude_fn=_local_col_exclude,
3405 ).chain(
3406 ClauseAdapter(
3407 source_selectable,
3408 equivalents=self.parent_equivalents,
3409 )
3410 )
3411
3412 secondaryjoin = secondary_aliasizer.traverse(secondaryjoin)
3413 else:
3414 primary_aliasizer = ClauseAdapter(
3415 dest_selectable,
3416 exclude_fn=_local_col_exclude,
3417 equivalents=self.child_equivalents,
3418 )
3419 if source_selectable is not None:
3420 primary_aliasizer.chain(
3421 ClauseAdapter(
3422 source_selectable,
3423 exclude_fn=_remote_col_exclude,
3424 equivalents=self.parent_equivalents,
3425 )
3426 )
3427 secondary_aliasizer = None
3428
3429 primaryjoin = primary_aliasizer.traverse(primaryjoin)
3430 target_adapter = secondary_aliasizer or primary_aliasizer
3431 target_adapter.exclude_fn = None
3432 else:
3433 target_adapter = None
3434 return (
3435 primaryjoin,
3436 secondaryjoin,
3437 secondary,
3438 target_adapter,
3439 dest_selectable,
3440 )
3441
3442 def create_lazy_clause(self, reverse_direction: bool = False) -> Tuple[
3443 ColumnElement[bool],
3444 Dict[str, ColumnElement[Any]],
3445 Dict[ColumnElement[Any], ColumnElement[Any]],
3446 ]:
3447 binds: Dict[ColumnElement[Any], BindParameter[Any]] = {}
3448 equated_columns: Dict[ColumnElement[Any], ColumnElement[Any]] = {}
3449
3450 has_secondary = self.secondaryjoin is not None
3451
3452 if has_secondary:
3453 lookup = collections.defaultdict(list)
3454 for l, r in self.local_remote_pairs:
3455 lookup[l].append((l, r))
3456 equated_columns[r] = l
3457 elif not reverse_direction:
3458 for l, r in self.local_remote_pairs:
3459 equated_columns[r] = l
3460 else:
3461 for l, r in self.local_remote_pairs:
3462 equated_columns[l] = r
3463
3464 def col_to_bind(
3465 element: ColumnElement[Any], **kw: Any
3466 ) -> Optional[BindParameter[Any]]:
3467 if (
3468 (not reverse_direction and "local" in element._annotations)
3469 or reverse_direction
3470 and (
3471 (has_secondary and element in lookup)
3472 or (not has_secondary and "remote" in element._annotations)
3473 )
3474 ):
3475 if element not in binds:
3476 binds[element] = sql.bindparam(
3477 None, None, type_=element.type, unique=True
3478 )
3479 return binds[element]
3480 return None
3481
3482 lazywhere = self.primaryjoin
3483 if self.secondaryjoin is None or not reverse_direction:
3484 lazywhere = visitors.replacement_traverse(
3485 lazywhere, {}, col_to_bind
3486 )
3487
3488 if self.secondaryjoin is not None:
3489 secondaryjoin = self.secondaryjoin
3490 if reverse_direction:
3491 secondaryjoin = visitors.replacement_traverse(
3492 secondaryjoin, {}, col_to_bind
3493 )
3494 lazywhere = sql.and_(lazywhere, secondaryjoin)
3495
3496 bind_to_col = {binds[col].key: col for col in binds}
3497
3498 return lazywhere, bind_to_col, equated_columns
3499
3500
3501class _ColInAnnotations:
3502 """Serializable object that tests for names in c._annotations.
3503
3504 TODO: does this need to be serializable anymore? can we find what the
3505 use case was for that?
3506
3507 """
3508
3509 __slots__ = ("names",)
3510
3511 def __init__(self, *names: str):
3512 self.names = frozenset(names)
3513
3514 def __call__(self, c: ClauseElement) -> bool:
3515 return bool(self.names.intersection(c._annotations))
3516
3517
3518_local_col_exclude = _ColInAnnotations("local", "should_not_adapt")
3519_remote_col_exclude = _ColInAnnotations("remote", "should_not_adapt")
3520
3521
3522class Relationship(
3523 RelationshipProperty[_T],
3524 _DeclarativeMapped[_T],
3525):
3526 """Describes an object property that holds a single item or list
3527 of items that correspond to a related database table.
3528
3529 Public constructor is the :func:`_orm.relationship` function.
3530
3531 .. seealso::
3532
3533 :ref:`relationship_config_toplevel`
3534
3535 .. versionchanged:: 2.0 Added :class:`_orm.Relationship` as a Declarative
3536 compatible subclass for :class:`_orm.RelationshipProperty`.
3537
3538 """
3539
3540 inherit_cache = True
3541 """:meta private:"""
3542
3543
3544class _RelationshipDeclared( # type: ignore[misc]
3545 Relationship[_T],
3546 WriteOnlyMapped[_T], # not compatible with Mapped[_T]
3547 DynamicMapped[_T], # not compatible with Mapped[_T]
3548):
3549 """Relationship subclass used implicitly for declarative mapping."""
3550
3551 inherit_cache = True
3552 """:meta private:"""
3553
3554 @classmethod
3555 def _mapper_property_name(cls) -> str:
3556 return "Relationship"