1# orm/relationships.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Heuristics related to join conditions as used in
9:func:`_orm.relationship`.
10
11Provides the :class:`.JoinCondition` object, which encapsulates
12SQL annotation and aliasing behavior focused on the `primaryjoin`
13and `secondaryjoin` aspects of :func:`_orm.relationship`.
14
15"""
16from __future__ import annotations
17
18import collections
19from collections import abc
20import dataclasses
21import inspect as _py_inspect
22import itertools
23import re
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Collection
29from typing import Dict
30from typing import FrozenSet
31from typing import Generic
32from typing import Iterable
33from typing import Iterator
34from typing import List
35from typing import NamedTuple
36from typing import NoReturn
37from typing import Optional
38from typing import Sequence
39from typing import Set
40from typing import Tuple
41from typing import Type
42from typing import TypeVar
43from typing import Union
44import weakref
45
46from . import attributes
47from . import strategy_options
48from ._typing import insp_is_aliased_class
49from ._typing import is_has_collection_adapter
50from .base import _DeclarativeMapped
51from .base import _is_mapped_class
52from .base import class_mapper
53from .base import DynamicMapped
54from .base import LoaderCallableStatus
55from .base import PassiveFlag
56from .base import state_str
57from .base import WriteOnlyMapped
58from .interfaces import _AttributeOptions
59from .interfaces import _IntrospectsAnnotations
60from .interfaces import MANYTOMANY
61from .interfaces import MANYTOONE
62from .interfaces import ONETOMANY
63from .interfaces import PropComparator
64from .interfaces import RelationshipDirection
65from .interfaces import StrategizedProperty
66from .util import _orm_annotate
67from .util import _orm_deannotate
68from .util import CascadeOptions
69from .. import exc as sa_exc
70from .. import Exists
71from .. import log
72from .. import schema
73from .. import sql
74from .. import util
75from ..inspection import inspect
76from ..sql import coercions
77from ..sql import expression
78from ..sql import operators
79from ..sql import roles
80from ..sql import visitors
81from ..sql._typing import _ColumnExpressionArgument
82from ..sql._typing import _HasClauseElement
83from ..sql.annotation import _safe_annotate
84from ..sql.elements import ColumnClause
85from ..sql.elements import ColumnElement
86from ..sql.util import _deep_annotate
87from ..sql.util import _deep_deannotate
88from ..sql.util import _shallow_annotate
89from ..sql.util import adapt_criterion_to_null
90from ..sql.util import ClauseAdapter
91from ..sql.util import join_condition
92from ..sql.util import selectables_overlap
93from ..sql.util import visit_binary_product
94from ..util.typing import de_optionalize_union_types
95from ..util.typing import Literal
96from ..util.typing import resolve_name_to_real_class_name
97
98if typing.TYPE_CHECKING:
99 from ._typing import _EntityType
100 from ._typing import _ExternalEntityType
101 from ._typing import _IdentityKeyType
102 from ._typing import _InstanceDict
103 from ._typing import _InternalEntityType
104 from ._typing import _O
105 from ._typing import _RegistryType
106 from .base import Mapped
107 from .clsregistry import _class_resolver
108 from .clsregistry import _ModNS
109 from .decl_base import _ClassScanMapperConfig
110 from .dependency import DependencyProcessor
111 from .mapper import Mapper
112 from .query import Query
113 from .session import Session
114 from .state import InstanceState
115 from .strategies import LazyLoader
116 from .util import AliasedClass
117 from .util import AliasedInsp
118 from ..sql._typing import _CoreAdapterProto
119 from ..sql._typing import _EquivalentColumnMap
120 from ..sql._typing import _InfoType
121 from ..sql.annotation import _AnnotationDict
122 from ..sql.annotation import SupportsAnnotations
123 from ..sql.elements import BinaryExpression
124 from ..sql.elements import BindParameter
125 from ..sql.elements import ClauseElement
126 from ..sql.schema import Table
127 from ..sql.selectable import FromClause
128 from ..util.typing import _AnnotationScanType
129 from ..util.typing import RODescriptorReference
130
131_T = TypeVar("_T", bound=Any)
132_T1 = TypeVar("_T1", bound=Any)
133_T2 = TypeVar("_T2", bound=Any)
134
135_PT = TypeVar("_PT", bound=Any)
136
137_PT2 = TypeVar("_PT2", bound=Any)
138
139
140_RelationshipArgumentType = Union[
141 str,
142 Type[_T],
143 Callable[[], Type[_T]],
144 "Mapper[_T]",
145 "AliasedClass[_T]",
146 Callable[[], "Mapper[_T]"],
147 Callable[[], "AliasedClass[_T]"],
148]
149
150_LazyLoadArgumentType = Literal[
151 "select",
152 "joined",
153 "selectin",
154 "subquery",
155 "raise",
156 "raise_on_sql",
157 "noload",
158 "immediate",
159 "write_only",
160 "dynamic",
161 True,
162 False,
163 None,
164]
165
166
167_RelationshipJoinConditionArgument = Union[
168 str, _ColumnExpressionArgument[bool]
169]
170_RelationshipSecondaryArgument = Union[
171 "FromClause", str, Callable[[], "FromClause"]
172]
173_ORMOrderByArgument = Union[
174 Literal[False],
175 str,
176 _ColumnExpressionArgument[Any],
177 Callable[[], _ColumnExpressionArgument[Any]],
178 Callable[[], Iterable[_ColumnExpressionArgument[Any]]],
179 Iterable[Union[str, _ColumnExpressionArgument[Any]]],
180]
181ORMBackrefArgument = Union[str, Tuple[str, Dict[str, Any]]]
182
183_ORMColCollectionElement = Union[
184 ColumnClause[Any],
185 _HasClauseElement[Any],
186 roles.DMLColumnRole,
187 "Mapped[Any]",
188]
189_ORMColCollectionArgument = Union[
190 str,
191 Sequence[_ORMColCollectionElement],
192 Callable[[], Sequence[_ORMColCollectionElement]],
193 Callable[[], _ORMColCollectionElement],
194 _ORMColCollectionElement,
195]
196
197
198_CEA = TypeVar("_CEA", bound=_ColumnExpressionArgument[Any])
199
200_CE = TypeVar("_CE", bound="ColumnElement[Any]")
201
202
203_ColumnPairIterable = Iterable[Tuple[ColumnElement[Any], ColumnElement[Any]]]
204
205_ColumnPairs = Sequence[Tuple[ColumnElement[Any], ColumnElement[Any]]]
206
207_MutableColumnPairs = List[Tuple[ColumnElement[Any], ColumnElement[Any]]]
208
209
210def remote(expr: _CEA) -> _CEA:
211 """Annotate a portion of a primaryjoin expression
212 with a 'remote' annotation.
213
214 See the section :ref:`relationship_custom_foreign` for a
215 description of use.
216
217 .. seealso::
218
219 :ref:`relationship_custom_foreign`
220
221 :func:`.foreign`
222
223 """
224 return _annotate_columns( # type: ignore
225 coercions.expect(roles.ColumnArgumentRole, expr), {"remote": True}
226 )
227
228
229def foreign(expr: _CEA) -> _CEA:
230 """Annotate a portion of a primaryjoin expression
231 with a 'foreign' annotation.
232
233 See the section :ref:`relationship_custom_foreign` for a
234 description of use.
235
236 .. seealso::
237
238 :ref:`relationship_custom_foreign`
239
240 :func:`.remote`
241
242 """
243
244 return _annotate_columns( # type: ignore
245 coercions.expect(roles.ColumnArgumentRole, expr), {"foreign": True}
246 )
247
248
249@dataclasses.dataclass
250class _RelationshipArg(Generic[_T1, _T2]):
251 """stores a user-defined parameter value that must be resolved and
252 parsed later at mapper configuration time.
253
254 """
255
256 __slots__ = "name", "argument", "resolved"
257 name: str
258 argument: _T1
259 resolved: Optional[_T2]
260
261 def _is_populated(self) -> bool:
262 return self.argument is not None
263
264 def _resolve_against_registry(
265 self, clsregistry_resolver: Callable[[str, bool], _class_resolver]
266 ) -> None:
267 attr_value = self.argument
268
269 if isinstance(attr_value, str):
270 self.resolved = clsregistry_resolver(
271 attr_value, self.name == "secondary"
272 )()
273 elif callable(attr_value) and not _is_mapped_class(attr_value):
274 self.resolved = attr_value()
275 else:
276 self.resolved = attr_value
277
278
279_RelationshipOrderByArg = Union[Literal[False], Tuple[ColumnElement[Any], ...]]
280
281
282class _RelationshipArgs(NamedTuple):
283 """stores user-passed parameters that are resolved at mapper configuration
284 time.
285
286 """
287
288 secondary: _RelationshipArg[
289 Optional[_RelationshipSecondaryArgument],
290 Optional[FromClause],
291 ]
292 primaryjoin: _RelationshipArg[
293 Optional[_RelationshipJoinConditionArgument],
294 Optional[ColumnElement[Any]],
295 ]
296 secondaryjoin: _RelationshipArg[
297 Optional[_RelationshipJoinConditionArgument],
298 Optional[ColumnElement[Any]],
299 ]
300 order_by: _RelationshipArg[_ORMOrderByArgument, _RelationshipOrderByArg]
301 foreign_keys: _RelationshipArg[
302 Optional[_ORMColCollectionArgument], Set[ColumnElement[Any]]
303 ]
304 remote_side: _RelationshipArg[
305 Optional[_ORMColCollectionArgument], Set[ColumnElement[Any]]
306 ]
307
308
309@log.class_logger
310class RelationshipProperty(
311 _IntrospectsAnnotations, StrategizedProperty[_T], log.Identified
312):
313 """Describes an object property that holds a single item or list
314 of items that correspond to a related database table.
315
316 Public constructor is the :func:`_orm.relationship` function.
317
318 .. seealso::
319
320 :ref:`relationship_config_toplevel`
321
322 """
323
324 strategy_wildcard_key = strategy_options._RELATIONSHIP_TOKEN
325 inherit_cache = True
326 """:meta private:"""
327
328 _links_to_entity = True
329 _is_relationship = True
330
331 _overlaps: Sequence[str]
332
333 _lazy_strategy: LazyLoader
334
335 _persistence_only = dict(
336 passive_deletes=False,
337 passive_updates=True,
338 enable_typechecks=True,
339 active_history=False,
340 cascade_backrefs=False,
341 )
342
343 _dependency_processor: Optional[DependencyProcessor] = None
344
345 primaryjoin: ColumnElement[bool]
346 secondaryjoin: Optional[ColumnElement[bool]]
347 secondary: Optional[FromClause]
348 _join_condition: JoinCondition
349 order_by: _RelationshipOrderByArg
350
351 _user_defined_foreign_keys: Set[ColumnElement[Any]]
352 _calculated_foreign_keys: Set[ColumnElement[Any]]
353
354 remote_side: Set[ColumnElement[Any]]
355 local_columns: Set[ColumnElement[Any]]
356
357 synchronize_pairs: _ColumnPairs
358 secondary_synchronize_pairs: Optional[_ColumnPairs]
359
360 local_remote_pairs: Optional[_ColumnPairs]
361
362 direction: RelationshipDirection
363
364 _init_args: _RelationshipArgs
365
366 def __init__(
367 self,
368 argument: Optional[_RelationshipArgumentType[_T]] = None,
369 secondary: Optional[_RelationshipSecondaryArgument] = None,
370 *,
371 uselist: Optional[bool] = None,
372 collection_class: Optional[
373 Union[Type[Collection[Any]], Callable[[], Collection[Any]]]
374 ] = None,
375 primaryjoin: Optional[_RelationshipJoinConditionArgument] = None,
376 secondaryjoin: Optional[_RelationshipJoinConditionArgument] = None,
377 back_populates: Optional[str] = None,
378 order_by: _ORMOrderByArgument = False,
379 backref: Optional[ORMBackrefArgument] = None,
380 overlaps: Optional[str] = None,
381 post_update: bool = False,
382 cascade: str = "save-update, merge",
383 viewonly: bool = False,
384 attribute_options: Optional[_AttributeOptions] = None,
385 lazy: _LazyLoadArgumentType = "select",
386 passive_deletes: Union[Literal["all"], bool] = False,
387 passive_updates: bool = True,
388 active_history: bool = False,
389 enable_typechecks: bool = True,
390 foreign_keys: Optional[_ORMColCollectionArgument] = None,
391 remote_side: Optional[_ORMColCollectionArgument] = None,
392 join_depth: Optional[int] = None,
393 comparator_factory: Optional[
394 Type[RelationshipProperty.Comparator[Any]]
395 ] = None,
396 single_parent: bool = False,
397 innerjoin: bool = False,
398 distinct_target_key: Optional[bool] = None,
399 load_on_pending: bool = False,
400 query_class: Optional[Type[Query[Any]]] = None,
401 info: Optional[_InfoType] = None,
402 omit_join: Literal[None, False] = None,
403 sync_backref: Optional[bool] = None,
404 doc: Optional[str] = None,
405 bake_queries: Literal[True] = True,
406 cascade_backrefs: Literal[False] = False,
407 _local_remote_pairs: Optional[_ColumnPairs] = None,
408 _legacy_inactive_history_style: bool = False,
409 ):
410 super().__init__(attribute_options=attribute_options)
411
412 self.uselist = uselist
413 self.argument = argument
414
415 self._init_args = _RelationshipArgs(
416 _RelationshipArg("secondary", secondary, None),
417 _RelationshipArg("primaryjoin", primaryjoin, None),
418 _RelationshipArg("secondaryjoin", secondaryjoin, None),
419 _RelationshipArg("order_by", order_by, None),
420 _RelationshipArg("foreign_keys", foreign_keys, None),
421 _RelationshipArg("remote_side", remote_side, None),
422 )
423
424 self.post_update = post_update
425 self.viewonly = viewonly
426 if viewonly:
427 self._warn_for_persistence_only_flags(
428 passive_deletes=passive_deletes,
429 passive_updates=passive_updates,
430 enable_typechecks=enable_typechecks,
431 active_history=active_history,
432 cascade_backrefs=cascade_backrefs,
433 )
434 if viewonly and sync_backref:
435 raise sa_exc.ArgumentError(
436 "sync_backref and viewonly cannot both be True"
437 )
438 self.sync_backref = sync_backref
439 self.lazy = lazy
440 self.single_parent = single_parent
441 self.collection_class = collection_class
442 self.passive_deletes = passive_deletes
443
444 if cascade_backrefs:
445 raise sa_exc.ArgumentError(
446 "The 'cascade_backrefs' parameter passed to "
447 "relationship() may only be set to False."
448 )
449
450 self.passive_updates = passive_updates
451 self.enable_typechecks = enable_typechecks
452 self.query_class = query_class
453 self.innerjoin = innerjoin
454 self.distinct_target_key = distinct_target_key
455 self.doc = doc
456 self.active_history = active_history
457 self._legacy_inactive_history_style = _legacy_inactive_history_style
458
459 self.join_depth = join_depth
460 if omit_join:
461 util.warn(
462 "setting omit_join to True is not supported; selectin "
463 "loading of this relationship may not work correctly if this "
464 "flag is set explicitly. omit_join optimization is "
465 "automatically detected for conditions under which it is "
466 "supported."
467 )
468
469 self.omit_join = omit_join
470 self.local_remote_pairs = _local_remote_pairs
471 self.load_on_pending = load_on_pending
472 self.comparator_factory = (
473 comparator_factory or RelationshipProperty.Comparator
474 )
475 util.set_creation_order(self)
476
477 if info is not None:
478 self.info.update(info)
479
480 self.strategy_key = (("lazy", self.lazy),)
481
482 self._reverse_property: Set[RelationshipProperty[Any]] = set()
483
484 if overlaps:
485 self._overlaps = set(re.split(r"\s*,\s*", overlaps)) # type: ignore # noqa: E501
486 else:
487 self._overlaps = ()
488
489 # mypy ignoring the @property setter
490 self.cascade = cascade # type: ignore
491
492 self.back_populates = back_populates
493
494 if self.back_populates:
495 if backref:
496 raise sa_exc.ArgumentError(
497 "backref and back_populates keyword arguments "
498 "are mutually exclusive"
499 )
500 self.backref = None
501 else:
502 self.backref = backref
503
504 def _warn_for_persistence_only_flags(self, **kw: Any) -> None:
505 for k, v in kw.items():
506 if v != self._persistence_only[k]:
507 # we are warning here rather than warn deprecated as this is a
508 # configuration mistake, and Python shows regular warnings more
509 # aggressively than deprecation warnings by default. Unlike the
510 # case of setting viewonly with cascade, the settings being
511 # warned about here are not actively doing the wrong thing
512 # against viewonly=True, so it is not as urgent to have these
513 # raise an error.
514 util.warn(
515 "Setting %s on relationship() while also "
516 "setting viewonly=True does not make sense, as a "
517 "viewonly=True relationship does not perform persistence "
518 "operations. This configuration may raise an error "
519 "in a future release." % (k,)
520 )
521
522 def instrument_class(self, mapper: Mapper[Any]) -> None:
523 attributes.register_descriptor(
524 mapper.class_,
525 self.key,
526 comparator=self.comparator_factory(self, mapper),
527 parententity=mapper,
528 doc=self.doc,
529 )
530
531 class Comparator(util.MemoizedSlots, PropComparator[_PT]):
532 """Produce boolean, comparison, and other operators for
533 :class:`.RelationshipProperty` attributes.
534
535 See the documentation for :class:`.PropComparator` for a brief
536 overview of ORM level operator definition.
537
538 .. seealso::
539
540 :class:`.PropComparator`
541
542 :class:`.ColumnProperty.Comparator`
543
544 :class:`.ColumnOperators`
545
546 :ref:`types_operators`
547
548 :attr:`.TypeEngine.comparator_factory`
549
550 """
551
552 __slots__ = (
553 "entity",
554 "mapper",
555 "property",
556 "_of_type",
557 "_extra_criteria",
558 )
559
560 prop: RODescriptorReference[RelationshipProperty[_PT]]
561 _of_type: Optional[_EntityType[_PT]]
562
563 def __init__(
564 self,
565 prop: RelationshipProperty[_PT],
566 parentmapper: _InternalEntityType[Any],
567 adapt_to_entity: Optional[AliasedInsp[Any]] = None,
568 of_type: Optional[_EntityType[_PT]] = None,
569 extra_criteria: Tuple[ColumnElement[bool], ...] = (),
570 ):
571 """Construction of :class:`.RelationshipProperty.Comparator`
572 is internal to the ORM's attribute mechanics.
573
574 """
575 self.prop = prop
576 self._parententity = parentmapper
577 self._adapt_to_entity = adapt_to_entity
578 if of_type:
579 self._of_type = of_type
580 else:
581 self._of_type = None
582 self._extra_criteria = extra_criteria
583
584 def adapt_to_entity(
585 self, adapt_to_entity: AliasedInsp[Any]
586 ) -> RelationshipProperty.Comparator[Any]:
587 return self.__class__(
588 self.prop,
589 self._parententity,
590 adapt_to_entity=adapt_to_entity,
591 of_type=self._of_type,
592 )
593
594 entity: _InternalEntityType[_PT]
595 """The target entity referred to by this
596 :class:`.RelationshipProperty.Comparator`.
597
598 This is either a :class:`_orm.Mapper` or :class:`.AliasedInsp`
599 object.
600
601 This is the "target" or "remote" side of the
602 :func:`_orm.relationship`.
603
604 """
605
606 mapper: Mapper[_PT]
607 """The target :class:`_orm.Mapper` referred to by this
608 :class:`.RelationshipProperty.Comparator`.
609
610 This is the "target" or "remote" side of the
611 :func:`_orm.relationship`.
612
613 """
614
615 def _memoized_attr_entity(self) -> _InternalEntityType[_PT]:
616 if self._of_type:
617 return inspect(self._of_type) # type: ignore
618 else:
619 return self.prop.entity
620
621 def _memoized_attr_mapper(self) -> Mapper[_PT]:
622 return self.entity.mapper
623
624 def _source_selectable(self) -> FromClause:
625 if self._adapt_to_entity:
626 return self._adapt_to_entity.selectable
627 else:
628 return self.property.parent._with_polymorphic_selectable
629
630 def __clause_element__(self) -> ColumnElement[bool]:
631 adapt_from = self._source_selectable()
632 if self._of_type:
633 of_type_entity = inspect(self._of_type)
634 else:
635 of_type_entity = None
636
637 (
638 pj,
639 sj,
640 source,
641 dest,
642 secondary,
643 target_adapter,
644 ) = self.prop._create_joins(
645 source_selectable=adapt_from,
646 source_polymorphic=True,
647 of_type_entity=of_type_entity,
648 alias_secondary=True,
649 extra_criteria=self._extra_criteria,
650 )
651 if sj is not None:
652 return pj & sj
653 else:
654 return pj
655
656 def of_type(self, class_: _EntityType[Any]) -> PropComparator[_PT]:
657 r"""Redefine this object in terms of a polymorphic subclass.
658
659 See :meth:`.PropComparator.of_type` for an example.
660
661
662 """
663 return RelationshipProperty.Comparator(
664 self.prop,
665 self._parententity,
666 adapt_to_entity=self._adapt_to_entity,
667 of_type=class_,
668 extra_criteria=self._extra_criteria,
669 )
670
671 def and_(
672 self, *criteria: _ColumnExpressionArgument[bool]
673 ) -> PropComparator[Any]:
674 """Add AND criteria.
675
676 See :meth:`.PropComparator.and_` for an example.
677
678 .. versionadded:: 1.4
679
680 """
681 exprs = tuple(
682 coercions.expect(roles.WhereHavingRole, clause)
683 for clause in util.coerce_generator_arg(criteria)
684 )
685
686 return RelationshipProperty.Comparator(
687 self.prop,
688 self._parententity,
689 adapt_to_entity=self._adapt_to_entity,
690 of_type=self._of_type,
691 extra_criteria=self._extra_criteria + exprs,
692 )
693
694 def in_(self, other: Any) -> NoReturn:
695 """Produce an IN clause - this is not implemented
696 for :func:`_orm.relationship`-based attributes at this time.
697
698 """
699 raise NotImplementedError(
700 "in_() not yet supported for "
701 "relationships. For a simple "
702 "many-to-one, use in_() against "
703 "the set of foreign key values."
704 )
705
706 # https://github.com/python/mypy/issues/4266
707 __hash__ = None # type: ignore
708
709 def __eq__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501
710 """Implement the ``==`` operator.
711
712 In a many-to-one context, such as:
713
714 .. sourcecode:: text
715
716 MyClass.some_prop == <some object>
717
718 this will typically produce a
719 clause such as:
720
721 .. sourcecode:: text
722
723 mytable.related_id == <some id>
724
725 Where ``<some id>`` is the primary key of the given
726 object.
727
728 The ``==`` operator provides partial functionality for non-
729 many-to-one comparisons:
730
731 * Comparisons against collections are not supported.
732 Use :meth:`~.Relationship.Comparator.contains`.
733 * Compared to a scalar one-to-many, will produce a
734 clause that compares the target columns in the parent to
735 the given target.
736 * Compared to a scalar many-to-many, an alias
737 of the association table will be rendered as
738 well, forming a natural join that is part of the
739 main body of the query. This will not work for
740 queries that go beyond simple AND conjunctions of
741 comparisons, such as those which use OR. Use
742 explicit joins, outerjoins, or
743 :meth:`~.Relationship.Comparator.has` for
744 more comprehensive non-many-to-one scalar
745 membership tests.
746 * Comparisons against ``None`` given in a one-to-many
747 or many-to-many context produce a NOT EXISTS clause.
748
749 """
750 if other is None or isinstance(other, expression.Null):
751 if self.property.direction in [ONETOMANY, MANYTOMANY]:
752 return ~self._criterion_exists()
753 else:
754 return _orm_annotate(
755 self.property._optimized_compare(
756 None, adapt_source=self.adapter
757 )
758 )
759 elif self.property.uselist:
760 raise sa_exc.InvalidRequestError(
761 "Can't compare a collection to an object or collection; "
762 "use contains() to test for membership."
763 )
764 else:
765 return _orm_annotate(
766 self.property._optimized_compare(
767 other, adapt_source=self.adapter
768 )
769 )
770
771 def _criterion_exists(
772 self,
773 criterion: Optional[_ColumnExpressionArgument[bool]] = None,
774 **kwargs: Any,
775 ) -> Exists:
776 where_criteria = (
777 coercions.expect(roles.WhereHavingRole, criterion)
778 if criterion is not None
779 else None
780 )
781
782 if getattr(self, "_of_type", None):
783 info: Optional[_InternalEntityType[Any]] = inspect(
784 self._of_type
785 )
786 assert info is not None
787 target_mapper, to_selectable, is_aliased_class = (
788 info.mapper,
789 info.selectable,
790 info.is_aliased_class,
791 )
792 if self.property._is_self_referential and not is_aliased_class:
793 to_selectable = to_selectable._anonymous_fromclause()
794
795 single_crit = target_mapper._single_table_criterion
796 if single_crit is not None:
797 if where_criteria is not None:
798 where_criteria = single_crit & where_criteria
799 else:
800 where_criteria = single_crit
801 else:
802 is_aliased_class = False
803 to_selectable = None
804
805 if self.adapter:
806 source_selectable = self._source_selectable()
807 else:
808 source_selectable = None
809
810 (
811 pj,
812 sj,
813 source,
814 dest,
815 secondary,
816 target_adapter,
817 ) = self.property._create_joins(
818 dest_selectable=to_selectable,
819 source_selectable=source_selectable,
820 )
821
822 for k in kwargs:
823 crit = getattr(self.property.mapper.class_, k) == kwargs[k]
824 if where_criteria is None:
825 where_criteria = crit
826 else:
827 where_criteria = where_criteria & crit
828
829 # annotate the *local* side of the join condition, in the case
830 # of pj + sj this is the full primaryjoin, in the case of just
831 # pj its the local side of the primaryjoin.
832 if sj is not None:
833 j = _orm_annotate(pj) & sj
834 else:
835 j = _orm_annotate(pj, exclude=self.property.remote_side)
836
837 if (
838 where_criteria is not None
839 and target_adapter
840 and not is_aliased_class
841 ):
842 # limit this adapter to annotated only?
843 where_criteria = target_adapter.traverse(where_criteria)
844
845 # only have the "joined left side" of what we
846 # return be subject to Query adaption. The right
847 # side of it is used for an exists() subquery and
848 # should not correlate or otherwise reach out
849 # to anything in the enclosing query.
850 if where_criteria is not None:
851 where_criteria = where_criteria._annotate(
852 {"no_replacement_traverse": True}
853 )
854
855 crit = j & sql.True_._ifnone(where_criteria)
856
857 if secondary is not None:
858 ex = (
859 sql.exists(1)
860 .where(crit)
861 .select_from(dest, secondary)
862 .correlate_except(dest, secondary)
863 )
864 else:
865 ex = (
866 sql.exists(1)
867 .where(crit)
868 .select_from(dest)
869 .correlate_except(dest)
870 )
871 return ex
872
873 def any(
874 self,
875 criterion: Optional[_ColumnExpressionArgument[bool]] = None,
876 **kwargs: Any,
877 ) -> ColumnElement[bool]:
878 """Produce an expression that tests a collection against
879 particular criterion, using EXISTS.
880
881 An expression like::
882
883 session.query(MyClass).filter(
884 MyClass.somereference.any(SomeRelated.x == 2)
885 )
886
887 Will produce a query like:
888
889 .. sourcecode:: sql
890
891 SELECT * FROM my_table WHERE
892 EXISTS (SELECT 1 FROM related WHERE related.my_id=my_table.id
893 AND related.x=2)
894
895 Because :meth:`~.Relationship.Comparator.any` uses
896 a correlated subquery, its performance is not nearly as
897 good when compared against large target tables as that of
898 using a join.
899
900 :meth:`~.Relationship.Comparator.any` is particularly
901 useful for testing for empty collections::
902
903 session.query(MyClass).filter(~MyClass.somereference.any())
904
905 will produce:
906
907 .. sourcecode:: sql
908
909 SELECT * FROM my_table WHERE
910 NOT (EXISTS (SELECT 1 FROM related WHERE
911 related.my_id=my_table.id))
912
913 :meth:`~.Relationship.Comparator.any` is only
914 valid for collections, i.e. a :func:`_orm.relationship`
915 that has ``uselist=True``. For scalar references,
916 use :meth:`~.Relationship.Comparator.has`.
917
918 """
919 if not self.property.uselist:
920 raise sa_exc.InvalidRequestError(
921 "'any()' not implemented for scalar "
922 "attributes. Use has()."
923 )
924
925 return self._criterion_exists(criterion, **kwargs)
926
927 def has(
928 self,
929 criterion: Optional[_ColumnExpressionArgument[bool]] = None,
930 **kwargs: Any,
931 ) -> ColumnElement[bool]:
932 """Produce an expression that tests a scalar reference against
933 particular criterion, using EXISTS.
934
935 An expression like::
936
937 session.query(MyClass).filter(
938 MyClass.somereference.has(SomeRelated.x == 2)
939 )
940
941 Will produce a query like:
942
943 .. sourcecode:: sql
944
945 SELECT * FROM my_table WHERE
946 EXISTS (SELECT 1 FROM related WHERE
947 related.id==my_table.related_id AND related.x=2)
948
949 Because :meth:`~.Relationship.Comparator.has` uses
950 a correlated subquery, its performance is not nearly as
951 good when compared against large target tables as that of
952 using a join.
953
954 :meth:`~.Relationship.Comparator.has` is only
955 valid for scalar references, i.e. a :func:`_orm.relationship`
956 that has ``uselist=False``. For collection references,
957 use :meth:`~.Relationship.Comparator.any`.
958
959 """
960 if self.property.uselist:
961 raise sa_exc.InvalidRequestError(
962 "'has()' not implemented for collections. Use any()."
963 )
964 return self._criterion_exists(criterion, **kwargs)
965
966 def contains(
967 self, other: _ColumnExpressionArgument[Any], **kwargs: Any
968 ) -> ColumnElement[bool]:
969 """Return a simple expression that tests a collection for
970 containment of a particular item.
971
972 :meth:`~.Relationship.Comparator.contains` is
973 only valid for a collection, i.e. a
974 :func:`_orm.relationship` that implements
975 one-to-many or many-to-many with ``uselist=True``.
976
977 When used in a simple one-to-many context, an
978 expression like::
979
980 MyClass.contains(other)
981
982 Produces a clause like:
983
984 .. sourcecode:: sql
985
986 mytable.id == <some id>
987
988 Where ``<some id>`` is the value of the foreign key
989 attribute on ``other`` which refers to the primary
990 key of its parent object. From this it follows that
991 :meth:`~.Relationship.Comparator.contains` is
992 very useful when used with simple one-to-many
993 operations.
994
995 For many-to-many operations, the behavior of
996 :meth:`~.Relationship.Comparator.contains`
997 has more caveats. The association table will be
998 rendered in the statement, producing an "implicit"
999 join, that is, includes multiple tables in the FROM
1000 clause which are equated in the WHERE clause::
1001
1002 query(MyClass).filter(MyClass.contains(other))
1003
1004 Produces a query like:
1005
1006 .. sourcecode:: sql
1007
1008 SELECT * FROM my_table, my_association_table AS
1009 my_association_table_1 WHERE
1010 my_table.id = my_association_table_1.parent_id
1011 AND my_association_table_1.child_id = <some id>
1012
1013 Where ``<some id>`` would be the primary key of
1014 ``other``. From the above, it is clear that
1015 :meth:`~.Relationship.Comparator.contains`
1016 will **not** work with many-to-many collections when
1017 used in queries that move beyond simple AND
1018 conjunctions, such as multiple
1019 :meth:`~.Relationship.Comparator.contains`
1020 expressions joined by OR. In such cases subqueries or
1021 explicit "outer joins" will need to be used instead.
1022 See :meth:`~.Relationship.Comparator.any` for
1023 a less-performant alternative using EXISTS, or refer
1024 to :meth:`_query.Query.outerjoin`
1025 as well as :ref:`orm_queryguide_joins`
1026 for more details on constructing outer joins.
1027
1028 kwargs may be ignored by this operator but are required for API
1029 conformance.
1030 """
1031 if not self.prop.uselist:
1032 raise sa_exc.InvalidRequestError(
1033 "'contains' not implemented for scalar "
1034 "attributes. Use =="
1035 )
1036
1037 clause = self.prop._optimized_compare(
1038 other, adapt_source=self.adapter
1039 )
1040
1041 if self.prop.secondaryjoin is not None:
1042 clause.negation_clause = self.__negated_contains_or_equals(
1043 other
1044 )
1045
1046 return clause
1047
1048 def __negated_contains_or_equals(
1049 self, other: Any
1050 ) -> ColumnElement[bool]:
1051 if self.prop.direction == MANYTOONE:
1052 state = attributes.instance_state(other)
1053
1054 def state_bindparam(
1055 local_col: ColumnElement[Any],
1056 state: InstanceState[Any],
1057 remote_col: ColumnElement[Any],
1058 ) -> BindParameter[Any]:
1059 dict_ = state.dict
1060 return sql.bindparam(
1061 local_col.key,
1062 type_=local_col.type,
1063 unique=True,
1064 callable_=self.prop._get_attr_w_warn_on_none(
1065 self.prop.mapper, state, dict_, remote_col
1066 ),
1067 )
1068
1069 def adapt(col: _CE) -> _CE:
1070 if self.adapter:
1071 return self.adapter(col)
1072 else:
1073 return col
1074
1075 if self.property._use_get:
1076 return sql.and_(
1077 *[
1078 sql.or_(
1079 adapt(x)
1080 != state_bindparam(adapt(x), state, y),
1081 adapt(x) == None,
1082 )
1083 for (x, y) in self.property.local_remote_pairs
1084 ]
1085 )
1086
1087 criterion = sql.and_(
1088 *[
1089 x == y
1090 for (x, y) in zip(
1091 self.property.mapper.primary_key,
1092 self.property.mapper.primary_key_from_instance(other),
1093 )
1094 ]
1095 )
1096
1097 return ~self._criterion_exists(criterion)
1098
1099 def __ne__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501
1100 """Implement the ``!=`` operator.
1101
1102 In a many-to-one context, such as:
1103
1104 .. sourcecode:: text
1105
1106 MyClass.some_prop != <some object>
1107
1108 This will typically produce a clause such as:
1109
1110 .. sourcecode:: sql
1111
1112 mytable.related_id != <some id>
1113
1114 Where ``<some id>`` is the primary key of the
1115 given object.
1116
1117 The ``!=`` operator provides partial functionality for non-
1118 many-to-one comparisons:
1119
1120 * Comparisons against collections are not supported.
1121 Use
1122 :meth:`~.Relationship.Comparator.contains`
1123 in conjunction with :func:`_expression.not_`.
1124 * Compared to a scalar one-to-many, will produce a
1125 clause that compares the target columns in the parent to
1126 the given target.
1127 * Compared to a scalar many-to-many, an alias
1128 of the association table will be rendered as
1129 well, forming a natural join that is part of the
1130 main body of the query. This will not work for
1131 queries that go beyond simple AND conjunctions of
1132 comparisons, such as those which use OR. Use
1133 explicit joins, outerjoins, or
1134 :meth:`~.Relationship.Comparator.has` in
1135 conjunction with :func:`_expression.not_` for
1136 more comprehensive non-many-to-one scalar
1137 membership tests.
1138 * Comparisons against ``None`` given in a one-to-many
1139 or many-to-many context produce an EXISTS clause.
1140
1141 """
1142 if other is None or isinstance(other, expression.Null):
1143 if self.property.direction == MANYTOONE:
1144 return _orm_annotate(
1145 ~self.property._optimized_compare(
1146 None, adapt_source=self.adapter
1147 )
1148 )
1149
1150 else:
1151 return self._criterion_exists()
1152 elif self.property.uselist:
1153 raise sa_exc.InvalidRequestError(
1154 "Can't compare a collection"
1155 " to an object or collection; use "
1156 "contains() to test for membership."
1157 )
1158 else:
1159 return _orm_annotate(self.__negated_contains_or_equals(other))
1160
1161 def _memoized_attr_property(self) -> RelationshipProperty[_PT]:
1162 self.prop.parent._check_configure()
1163 return self.prop
1164
1165 def _with_parent(
1166 self,
1167 instance: object,
1168 alias_secondary: bool = True,
1169 from_entity: Optional[_EntityType[Any]] = None,
1170 ) -> ColumnElement[bool]:
1171 assert instance is not None
1172 adapt_source: Optional[_CoreAdapterProto] = None
1173 if from_entity is not None:
1174 insp: Optional[_InternalEntityType[Any]] = inspect(from_entity)
1175 assert insp is not None
1176 if insp_is_aliased_class(insp):
1177 adapt_source = insp._adapter.adapt_clause
1178 return self._optimized_compare(
1179 instance,
1180 value_is_parent=True,
1181 adapt_source=adapt_source,
1182 alias_secondary=alias_secondary,
1183 )
1184
1185 def _optimized_compare(
1186 self,
1187 state: Any,
1188 value_is_parent: bool = False,
1189 adapt_source: Optional[_CoreAdapterProto] = None,
1190 alias_secondary: bool = True,
1191 ) -> ColumnElement[bool]:
1192 if state is not None:
1193 try:
1194 state = inspect(state)
1195 except sa_exc.NoInspectionAvailable:
1196 state = None
1197
1198 if state is None or not getattr(state, "is_instance", False):
1199 raise sa_exc.ArgumentError(
1200 "Mapped instance expected for relationship "
1201 "comparison to object. Classes, queries and other "
1202 "SQL elements are not accepted in this context; for "
1203 "comparison with a subquery, "
1204 "use %s.has(**criteria)." % self
1205 )
1206 reverse_direction = not value_is_parent
1207
1208 if state is None:
1209 return self._lazy_none_clause(
1210 reverse_direction, adapt_source=adapt_source
1211 )
1212
1213 if not reverse_direction:
1214 criterion, bind_to_col = (
1215 self._lazy_strategy._lazywhere,
1216 self._lazy_strategy._bind_to_col,
1217 )
1218 else:
1219 criterion, bind_to_col = (
1220 self._lazy_strategy._rev_lazywhere,
1221 self._lazy_strategy._rev_bind_to_col,
1222 )
1223
1224 if reverse_direction:
1225 mapper = self.mapper
1226 else:
1227 mapper = self.parent
1228
1229 dict_ = attributes.instance_dict(state.obj())
1230
1231 def visit_bindparam(bindparam: BindParameter[Any]) -> None:
1232 if bindparam._identifying_key in bind_to_col:
1233 bindparam.callable = self._get_attr_w_warn_on_none(
1234 mapper,
1235 state,
1236 dict_,
1237 bind_to_col[bindparam._identifying_key],
1238 )
1239
1240 if self.secondary is not None and alias_secondary:
1241 criterion = ClauseAdapter(
1242 self.secondary._anonymous_fromclause()
1243 ).traverse(criterion)
1244
1245 criterion = visitors.cloned_traverse(
1246 criterion, {}, {"bindparam": visit_bindparam}
1247 )
1248
1249 if adapt_source:
1250 criterion = adapt_source(criterion)
1251 return criterion
1252
1253 def _get_attr_w_warn_on_none(
1254 self,
1255 mapper: Mapper[Any],
1256 state: InstanceState[Any],
1257 dict_: _InstanceDict,
1258 column: ColumnElement[Any],
1259 ) -> Callable[[], Any]:
1260 """Create the callable that is used in a many-to-one expression.
1261
1262 E.g.::
1263
1264 u1 = s.query(User).get(5)
1265
1266 expr = Address.user == u1
1267
1268 Above, the SQL should be "address.user_id = 5". The callable
1269 returned by this method produces the value "5" based on the identity
1270 of ``u1``.
1271
1272 """
1273
1274 # in this callable, we're trying to thread the needle through
1275 # a wide variety of scenarios, including:
1276 #
1277 # * the object hasn't been flushed yet and there's no value for
1278 # the attribute as of yet
1279 #
1280 # * the object hasn't been flushed yet but it has a user-defined
1281 # value
1282 #
1283 # * the object has a value but it's expired and not locally present
1284 #
1285 # * the object has a value but it's expired and not locally present,
1286 # and the object is also detached
1287 #
1288 # * The object hadn't been flushed yet, there was no value, but
1289 # later, the object has been expired and detached, and *now*
1290 # they're trying to evaluate it
1291 #
1292 # * the object had a value, but it was changed to a new value, and
1293 # then expired
1294 #
1295 # * the object had a value, but it was changed to a new value, and
1296 # then expired, then the object was detached
1297 #
1298 # * the object has a user-set value, but it's None and we don't do
1299 # the comparison correctly for that so warn
1300 #
1301
1302 prop = mapper.get_property_by_column(column)
1303
1304 # by invoking this method, InstanceState will track the last known
1305 # value for this key each time the attribute is to be expired.
1306 # this feature was added explicitly for use in this method.
1307 state._track_last_known_value(prop.key)
1308
1309 lkv_fixed = state._last_known_values
1310
1311 def _go() -> Any:
1312 assert lkv_fixed is not None
1313 last_known = to_return = lkv_fixed[prop.key]
1314 existing_is_available = (
1315 last_known is not LoaderCallableStatus.NO_VALUE
1316 )
1317
1318 # we support that the value may have changed. so here we
1319 # try to get the most recent value including re-fetching.
1320 # only if we can't get a value now due to detachment do we return
1321 # the last known value
1322 current_value = mapper._get_state_attr_by_column(
1323 state,
1324 dict_,
1325 column,
1326 passive=(
1327 PassiveFlag.PASSIVE_OFF
1328 if state.persistent
1329 else PassiveFlag.PASSIVE_NO_FETCH ^ PassiveFlag.INIT_OK
1330 ),
1331 )
1332
1333 if current_value is LoaderCallableStatus.NEVER_SET:
1334 if not existing_is_available:
1335 raise sa_exc.InvalidRequestError(
1336 "Can't resolve value for column %s on object "
1337 "%s; no value has been set for this column"
1338 % (column, state_str(state))
1339 )
1340 elif current_value is LoaderCallableStatus.PASSIVE_NO_RESULT:
1341 if not existing_is_available:
1342 raise sa_exc.InvalidRequestError(
1343 "Can't resolve value for column %s on object "
1344 "%s; the object is detached and the value was "
1345 "expired" % (column, state_str(state))
1346 )
1347 else:
1348 to_return = current_value
1349 if to_return is None:
1350 util.warn(
1351 "Got None for value of column %s; this is unsupported "
1352 "for a relationship comparison and will not "
1353 "currently produce an IS comparison "
1354 "(but may in a future release)" % column
1355 )
1356 return to_return
1357
1358 return _go
1359
1360 def _lazy_none_clause(
1361 self,
1362 reverse_direction: bool = False,
1363 adapt_source: Optional[_CoreAdapterProto] = None,
1364 ) -> ColumnElement[bool]:
1365 if not reverse_direction:
1366 criterion, bind_to_col = (
1367 self._lazy_strategy._lazywhere,
1368 self._lazy_strategy._bind_to_col,
1369 )
1370 else:
1371 criterion, bind_to_col = (
1372 self._lazy_strategy._rev_lazywhere,
1373 self._lazy_strategy._rev_bind_to_col,
1374 )
1375
1376 criterion = adapt_criterion_to_null(criterion, bind_to_col)
1377
1378 if adapt_source:
1379 criterion = adapt_source(criterion)
1380 return criterion
1381
1382 def __str__(self) -> str:
1383 return str(self.parent.class_.__name__) + "." + self.key
1384
1385 def merge(
1386 self,
1387 session: Session,
1388 source_state: InstanceState[Any],
1389 source_dict: _InstanceDict,
1390 dest_state: InstanceState[Any],
1391 dest_dict: _InstanceDict,
1392 load: bool,
1393 _recursive: Dict[Any, object],
1394 _resolve_conflict_map: Dict[_IdentityKeyType[Any], object],
1395 ) -> None:
1396 if load:
1397 for r in self._reverse_property:
1398 if (source_state, r) in _recursive:
1399 return
1400
1401 if "merge" not in self._cascade:
1402 return
1403
1404 if self.key not in source_dict:
1405 return
1406
1407 if self.uselist:
1408 impl = source_state.get_impl(self.key)
1409
1410 assert is_has_collection_adapter(impl)
1411 instances_iterable = impl.get_collection(source_state, source_dict)
1412
1413 # if this is a CollectionAttributeImpl, then empty should
1414 # be False, otherwise "self.key in source_dict" should not be
1415 # True
1416 assert not instances_iterable.empty if impl.collection else True
1417
1418 if load:
1419 # for a full merge, pre-load the destination collection,
1420 # so that individual _merge of each item pulls from identity
1421 # map for those already present.
1422 # also assumes CollectionAttributeImpl behavior of loading
1423 # "old" list in any case
1424 dest_state.get_impl(self.key).get(
1425 dest_state, dest_dict, passive=PassiveFlag.PASSIVE_MERGE
1426 )
1427
1428 dest_list = []
1429 for current in instances_iterable:
1430 current_state = attributes.instance_state(current)
1431 current_dict = attributes.instance_dict(current)
1432 _recursive[(current_state, self)] = True
1433 obj = session._merge(
1434 current_state,
1435 current_dict,
1436 load=load,
1437 _recursive=_recursive,
1438 _resolve_conflict_map=_resolve_conflict_map,
1439 )
1440 if obj is not None:
1441 dest_list.append(obj)
1442
1443 if not load:
1444 coll = attributes.init_state_collection(
1445 dest_state, dest_dict, self.key
1446 )
1447 for c in dest_list:
1448 coll.append_without_event(c)
1449 else:
1450 dest_impl = dest_state.get_impl(self.key)
1451 assert is_has_collection_adapter(dest_impl)
1452 dest_impl.set(
1453 dest_state,
1454 dest_dict,
1455 dest_list,
1456 _adapt=False,
1457 passive=PassiveFlag.PASSIVE_MERGE,
1458 )
1459 else:
1460 current = source_dict[self.key]
1461 if current is not None:
1462 current_state = attributes.instance_state(current)
1463 current_dict = attributes.instance_dict(current)
1464 _recursive[(current_state, self)] = True
1465 obj = session._merge(
1466 current_state,
1467 current_dict,
1468 load=load,
1469 _recursive=_recursive,
1470 _resolve_conflict_map=_resolve_conflict_map,
1471 )
1472 else:
1473 obj = None
1474
1475 if not load:
1476 dest_dict[self.key] = obj
1477 else:
1478 dest_state.get_impl(self.key).set(
1479 dest_state, dest_dict, obj, None
1480 )
1481
1482 def _value_as_iterable(
1483 self,
1484 state: InstanceState[_O],
1485 dict_: _InstanceDict,
1486 key: str,
1487 passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
1488 ) -> Sequence[Tuple[InstanceState[_O], _O]]:
1489 """Return a list of tuples (state, obj) for the given
1490 key.
1491
1492 returns an empty list if the value is None/empty/PASSIVE_NO_RESULT
1493 """
1494
1495 impl = state.manager[key].impl
1496 x = impl.get(state, dict_, passive=passive)
1497 if x is LoaderCallableStatus.PASSIVE_NO_RESULT or x is None:
1498 return []
1499 elif is_has_collection_adapter(impl):
1500 return [
1501 (attributes.instance_state(o), o)
1502 for o in impl.get_collection(state, dict_, x, passive=passive)
1503 ]
1504 else:
1505 return [(attributes.instance_state(x), x)]
1506
1507 def cascade_iterator(
1508 self,
1509 type_: str,
1510 state: InstanceState[Any],
1511 dict_: _InstanceDict,
1512 visited_states: Set[InstanceState[Any]],
1513 halt_on: Optional[Callable[[InstanceState[Any]], bool]] = None,
1514 ) -> Iterator[Tuple[Any, Mapper[Any], InstanceState[Any], _InstanceDict]]:
1515 # assert type_ in self._cascade
1516
1517 # only actively lazy load on the 'delete' cascade
1518 if type_ != "delete" or self.passive_deletes:
1519 passive = PassiveFlag.PASSIVE_NO_INITIALIZE
1520 else:
1521 passive = PassiveFlag.PASSIVE_OFF | PassiveFlag.NO_RAISE
1522
1523 if type_ == "save-update":
1524 tuples = state.manager[self.key].impl.get_all_pending(state, dict_)
1525 else:
1526 tuples = self._value_as_iterable(
1527 state, dict_, self.key, passive=passive
1528 )
1529
1530 skip_pending = (
1531 type_ == "refresh-expire" and "delete-orphan" not in self._cascade
1532 )
1533
1534 for instance_state, c in tuples:
1535 if instance_state in visited_states:
1536 continue
1537
1538 if c is None:
1539 # would like to emit a warning here, but
1540 # would not be consistent with collection.append(None)
1541 # current behavior of silently skipping.
1542 # see [ticket:2229]
1543 continue
1544
1545 assert instance_state is not None
1546 instance_dict = attributes.instance_dict(c)
1547
1548 if halt_on and halt_on(instance_state):
1549 continue
1550
1551 if skip_pending and not instance_state.key:
1552 continue
1553
1554 instance_mapper = instance_state.manager.mapper
1555
1556 if not instance_mapper.isa(self.mapper.class_manager.mapper):
1557 raise AssertionError(
1558 "Attribute '%s' on class '%s' "
1559 "doesn't handle objects "
1560 "of type '%s'"
1561 % (self.key, self.parent.class_, c.__class__)
1562 )
1563
1564 visited_states.add(instance_state)
1565
1566 yield c, instance_mapper, instance_state, instance_dict
1567
1568 @property
1569 def _effective_sync_backref(self) -> bool:
1570 if self.viewonly:
1571 return False
1572 else:
1573 return self.sync_backref is not False
1574
1575 @staticmethod
1576 def _check_sync_backref(
1577 rel_a: RelationshipProperty[Any], rel_b: RelationshipProperty[Any]
1578 ) -> None:
1579 if rel_a.viewonly and rel_b.sync_backref:
1580 raise sa_exc.InvalidRequestError(
1581 "Relationship %s cannot specify sync_backref=True since %s "
1582 "includes viewonly=True." % (rel_b, rel_a)
1583 )
1584 if (
1585 rel_a.viewonly
1586 and not rel_b.viewonly
1587 and rel_b.sync_backref is not False
1588 ):
1589 rel_b.sync_backref = False
1590
1591 def _add_reverse_property(self, key: str) -> None:
1592 other = self.mapper.get_property(key, _configure_mappers=False)
1593 if not isinstance(other, RelationshipProperty):
1594 raise sa_exc.InvalidRequestError(
1595 "back_populates on relationship '%s' refers to attribute '%s' "
1596 "that is not a relationship. The back_populates parameter "
1597 "should refer to the name of a relationship on the target "
1598 "class." % (self, other)
1599 )
1600 # viewonly and sync_backref cases
1601 # 1. self.viewonly==True and other.sync_backref==True -> error
1602 # 2. self.viewonly==True and other.viewonly==False and
1603 # other.sync_backref==None -> warn sync_backref=False, set to False
1604 self._check_sync_backref(self, other)
1605 # 3. other.viewonly==True and self.sync_backref==True -> error
1606 # 4. other.viewonly==True and self.viewonly==False and
1607 # self.sync_backref==None -> warn sync_backref=False, set to False
1608 self._check_sync_backref(other, self)
1609
1610 self._reverse_property.add(other)
1611 other._reverse_property.add(self)
1612
1613 other._setup_entity()
1614
1615 if not other.mapper.common_parent(self.parent):
1616 raise sa_exc.ArgumentError(
1617 "reverse_property %r on "
1618 "relationship %s references relationship %s, which "
1619 "does not reference mapper %s"
1620 % (key, self, other, self.parent)
1621 )
1622
1623 if (
1624 other._configure_started
1625 and self.direction in (ONETOMANY, MANYTOONE)
1626 and self.direction == other.direction
1627 ):
1628 raise sa_exc.ArgumentError(
1629 "%s and back-reference %s are "
1630 "both of the same direction %r. Did you mean to "
1631 "set remote_side on the many-to-one side ?"
1632 % (other, self, self.direction)
1633 )
1634
1635 @util.memoized_property
1636 def entity(self) -> _InternalEntityType[_T]:
1637 """Return the target mapped entity, which is an inspect() of the
1638 class or aliased class that is referenced by this
1639 :class:`.RelationshipProperty`.
1640
1641 """
1642 self.parent._check_configure()
1643 return self.entity
1644
1645 @util.memoized_property
1646 def mapper(self) -> Mapper[_T]:
1647 """Return the targeted :class:`_orm.Mapper` for this
1648 :class:`.RelationshipProperty`.
1649
1650 """
1651 return self.entity.mapper
1652
1653 def do_init(self) -> None:
1654 self._check_conflicts()
1655 self._process_dependent_arguments()
1656 self._setup_entity()
1657 self._setup_registry_dependencies()
1658 self._setup_join_conditions()
1659 self._check_cascade_settings(self._cascade)
1660 self._post_init()
1661 self._generate_backref()
1662 self._join_condition._warn_for_conflicting_sync_targets()
1663 super().do_init()
1664 self._lazy_strategy = cast(
1665 "LazyLoader", self._get_strategy((("lazy", "select"),))
1666 )
1667
1668 def _setup_registry_dependencies(self) -> None:
1669 self.parent.mapper.registry._set_depends_on(
1670 self.entity.mapper.registry
1671 )
1672
1673 def _process_dependent_arguments(self) -> None:
1674 """Convert incoming configuration arguments to their
1675 proper form.
1676
1677 Callables are resolved, ORM annotations removed.
1678
1679 """
1680
1681 # accept callables for other attributes which may require
1682 # deferred initialization. This technique is used
1683 # by declarative "string configs" and some recipes.
1684 init_args = self._init_args
1685
1686 for attr in (
1687 "order_by",
1688 "primaryjoin",
1689 "secondaryjoin",
1690 "secondary",
1691 "foreign_keys",
1692 "remote_side",
1693 ):
1694 rel_arg = getattr(init_args, attr)
1695
1696 rel_arg._resolve_against_registry(self._clsregistry_resolvers[1])
1697
1698 # remove "annotations" which are present if mapped class
1699 # descriptors are used to create the join expression.
1700 for attr in "primaryjoin", "secondaryjoin":
1701 rel_arg = getattr(init_args, attr)
1702 val = rel_arg.resolved
1703 if val is not None:
1704 rel_arg.resolved = _orm_deannotate(
1705 coercions.expect(
1706 roles.ColumnArgumentRole, val, argname=attr
1707 )
1708 )
1709
1710 secondary = init_args.secondary.resolved
1711 if secondary is not None and _is_mapped_class(secondary):
1712 raise sa_exc.ArgumentError(
1713 "secondary argument %s passed to to relationship() %s must "
1714 "be a Table object or other FROM clause; can't send a mapped "
1715 "class directly as rows in 'secondary' are persisted "
1716 "independently of a class that is mapped "
1717 "to that same table." % (secondary, self)
1718 )
1719
1720 # ensure expressions in self.order_by, foreign_keys,
1721 # remote_side are all columns, not strings.
1722 if (
1723 init_args.order_by.resolved is not False
1724 and init_args.order_by.resolved is not None
1725 ):
1726 self.order_by = tuple(
1727 coercions.expect(
1728 roles.ColumnArgumentRole, x, argname="order_by"
1729 )
1730 for x in util.to_list(init_args.order_by.resolved)
1731 )
1732 else:
1733 self.order_by = False
1734
1735 self._user_defined_foreign_keys = util.column_set(
1736 coercions.expect(
1737 roles.ColumnArgumentRole, x, argname="foreign_keys"
1738 )
1739 for x in util.to_column_set(init_args.foreign_keys.resolved)
1740 )
1741
1742 self.remote_side = util.column_set(
1743 coercions.expect(
1744 roles.ColumnArgumentRole, x, argname="remote_side"
1745 )
1746 for x in util.to_column_set(init_args.remote_side.resolved)
1747 )
1748
1749 def declarative_scan(
1750 self,
1751 decl_scan: _ClassScanMapperConfig,
1752 registry: _RegistryType,
1753 cls: Type[Any],
1754 originating_module: Optional[str],
1755 key: str,
1756 mapped_container: Optional[Type[Mapped[Any]]],
1757 annotation: Optional[_AnnotationScanType],
1758 extracted_mapped_annotation: Optional[_AnnotationScanType],
1759 is_dataclass_field: bool,
1760 ) -> None:
1761 if extracted_mapped_annotation is None:
1762 if self.argument is None:
1763 self._raise_for_required(key, cls)
1764 else:
1765 return
1766
1767 argument = extracted_mapped_annotation
1768 assert originating_module is not None
1769
1770 if mapped_container is not None:
1771 is_write_only = issubclass(mapped_container, WriteOnlyMapped)
1772 is_dynamic = issubclass(mapped_container, DynamicMapped)
1773 if is_write_only:
1774 self.lazy = "write_only"
1775 self.strategy_key = (("lazy", self.lazy),)
1776 elif is_dynamic:
1777 self.lazy = "dynamic"
1778 self.strategy_key = (("lazy", self.lazy),)
1779 else:
1780 is_write_only = is_dynamic = False
1781
1782 argument = de_optionalize_union_types(argument)
1783
1784 if hasattr(argument, "__origin__"):
1785 arg_origin = argument.__origin__
1786 if isinstance(arg_origin, type) and issubclass(
1787 arg_origin, abc.Collection
1788 ):
1789 if self.collection_class is None:
1790 if _py_inspect.isabstract(arg_origin):
1791 raise sa_exc.ArgumentError(
1792 f"Collection annotation type {arg_origin} cannot "
1793 "be instantiated; please provide an explicit "
1794 "'collection_class' parameter "
1795 "(e.g. list, set, etc.) to the "
1796 "relationship() function to accompany this "
1797 "annotation"
1798 )
1799
1800 self.collection_class = arg_origin
1801
1802 elif not is_write_only and not is_dynamic:
1803 self.uselist = False
1804
1805 if argument.__args__: # type: ignore
1806 if isinstance(arg_origin, type) and issubclass(
1807 arg_origin, typing.Mapping
1808 ):
1809 type_arg = argument.__args__[-1] # type: ignore
1810 else:
1811 type_arg = argument.__args__[0] # type: ignore
1812 if hasattr(type_arg, "__forward_arg__"):
1813 str_argument = type_arg.__forward_arg__
1814
1815 argument = resolve_name_to_real_class_name(
1816 str_argument, originating_module
1817 )
1818 else:
1819 argument = type_arg
1820 else:
1821 raise sa_exc.ArgumentError(
1822 f"Generic alias {argument} requires an argument"
1823 )
1824 elif hasattr(argument, "__forward_arg__"):
1825 argument = argument.__forward_arg__
1826
1827 argument = resolve_name_to_real_class_name(
1828 argument, originating_module
1829 )
1830
1831 if (
1832 self.collection_class is None
1833 and not is_write_only
1834 and not is_dynamic
1835 ):
1836 self.uselist = False
1837
1838 # ticket #8759
1839 # if a lead argument was given to relationship(), like
1840 # `relationship("B")`, use that, don't replace it with class we
1841 # found in the annotation. The declarative_scan() method call here is
1842 # still useful, as we continue to derive collection type and do
1843 # checking of the annotation in any case.
1844 if self.argument is None:
1845 self.argument = cast("_RelationshipArgumentType[_T]", argument)
1846
1847 @util.preload_module("sqlalchemy.orm.mapper")
1848 def _setup_entity(self, __argument: Any = None) -> None:
1849 if "entity" in self.__dict__:
1850 return
1851
1852 mapperlib = util.preloaded.orm_mapper
1853
1854 if __argument:
1855 argument = __argument
1856 else:
1857 argument = self.argument
1858
1859 resolved_argument: _ExternalEntityType[Any]
1860
1861 if isinstance(argument, str):
1862 # we might want to cleanup clsregistry API to make this
1863 # more straightforward
1864 resolved_argument = cast(
1865 "_ExternalEntityType[Any]",
1866 self._clsregistry_resolve_name(argument)(),
1867 )
1868 elif callable(argument) and not isinstance(
1869 argument, (type, mapperlib.Mapper)
1870 ):
1871 resolved_argument = argument()
1872 else:
1873 resolved_argument = argument
1874
1875 entity: _InternalEntityType[Any]
1876
1877 if isinstance(resolved_argument, type):
1878 entity = class_mapper(resolved_argument, configure=False)
1879 else:
1880 try:
1881 entity = inspect(resolved_argument)
1882 except sa_exc.NoInspectionAvailable:
1883 entity = None # type: ignore
1884
1885 if not hasattr(entity, "mapper"):
1886 raise sa_exc.ArgumentError(
1887 "relationship '%s' expects "
1888 "a class or a mapper argument (received: %s)"
1889 % (self.key, type(resolved_argument))
1890 )
1891
1892 self.entity = entity
1893 self.target = self.entity.persist_selectable
1894
1895 def _setup_join_conditions(self) -> None:
1896 self._join_condition = jc = JoinCondition(
1897 parent_persist_selectable=self.parent.persist_selectable,
1898 child_persist_selectable=self.entity.persist_selectable,
1899 parent_local_selectable=self.parent.local_table,
1900 child_local_selectable=self.entity.local_table,
1901 primaryjoin=self._init_args.primaryjoin.resolved,
1902 secondary=self._init_args.secondary.resolved,
1903 secondaryjoin=self._init_args.secondaryjoin.resolved,
1904 parent_equivalents=self.parent._equivalent_columns,
1905 child_equivalents=self.mapper._equivalent_columns,
1906 consider_as_foreign_keys=self._user_defined_foreign_keys,
1907 local_remote_pairs=self.local_remote_pairs,
1908 remote_side=self.remote_side,
1909 self_referential=self._is_self_referential,
1910 prop=self,
1911 support_sync=not self.viewonly,
1912 can_be_synced_fn=self._columns_are_mapped,
1913 )
1914 self.primaryjoin = jc.primaryjoin
1915 self.secondaryjoin = jc.secondaryjoin
1916 self.secondary = jc.secondary
1917 self.direction = jc.direction
1918 self.local_remote_pairs = jc.local_remote_pairs
1919 self.remote_side = jc.remote_columns
1920 self.local_columns = jc.local_columns
1921 self.synchronize_pairs = jc.synchronize_pairs
1922 self._calculated_foreign_keys = jc.foreign_key_columns
1923 self.secondary_synchronize_pairs = jc.secondary_synchronize_pairs
1924
1925 @property
1926 def _clsregistry_resolve_arg(
1927 self,
1928 ) -> Callable[[str, bool], _class_resolver]:
1929 return self._clsregistry_resolvers[1]
1930
1931 @property
1932 def _clsregistry_resolve_name(
1933 self,
1934 ) -> Callable[[str], Callable[[], Union[Type[Any], Table, _ModNS]]]:
1935 return self._clsregistry_resolvers[0]
1936
1937 @util.memoized_property
1938 @util.preload_module("sqlalchemy.orm.clsregistry")
1939 def _clsregistry_resolvers(
1940 self,
1941 ) -> Tuple[
1942 Callable[[str], Callable[[], Union[Type[Any], Table, _ModNS]]],
1943 Callable[[str, bool], _class_resolver],
1944 ]:
1945 _resolver = util.preloaded.orm_clsregistry._resolver
1946
1947 return _resolver(self.parent.class_, self)
1948
1949 def _check_conflicts(self) -> None:
1950 """Test that this relationship is legal, warn about
1951 inheritance conflicts."""
1952 if self.parent.non_primary and not class_mapper(
1953 self.parent.class_, configure=False
1954 ).has_property(self.key):
1955 raise sa_exc.ArgumentError(
1956 "Attempting to assign a new "
1957 "relationship '%s' to a non-primary mapper on "
1958 "class '%s'. New relationships can only be added "
1959 "to the primary mapper, i.e. the very first mapper "
1960 "created for class '%s' "
1961 % (
1962 self.key,
1963 self.parent.class_.__name__,
1964 self.parent.class_.__name__,
1965 )
1966 )
1967
1968 @property
1969 def cascade(self) -> CascadeOptions:
1970 """Return the current cascade setting for this
1971 :class:`.RelationshipProperty`.
1972 """
1973 return self._cascade
1974
1975 @cascade.setter
1976 def cascade(self, cascade: Union[str, CascadeOptions]) -> None:
1977 self._set_cascade(cascade)
1978
1979 def _set_cascade(self, cascade_arg: Union[str, CascadeOptions]) -> None:
1980 cascade = CascadeOptions(cascade_arg)
1981
1982 if self.viewonly:
1983 cascade = CascadeOptions(
1984 cascade.intersection(CascadeOptions._viewonly_cascades)
1985 )
1986
1987 if "mapper" in self.__dict__:
1988 self._check_cascade_settings(cascade)
1989 self._cascade = cascade
1990
1991 if self._dependency_processor:
1992 self._dependency_processor.cascade = cascade
1993
1994 def _check_cascade_settings(self, cascade: CascadeOptions) -> None:
1995 if (
1996 cascade.delete_orphan
1997 and not self.single_parent
1998 and (self.direction is MANYTOMANY or self.direction is MANYTOONE)
1999 ):
2000 raise sa_exc.ArgumentError(
2001 "For %(direction)s relationship %(rel)s, delete-orphan "
2002 "cascade is normally "
2003 'configured only on the "one" side of a one-to-many '
2004 "relationship, "
2005 'and not on the "many" side of a many-to-one or many-to-many '
2006 "relationship. "
2007 "To force this relationship to allow a particular "
2008 '"%(relatedcls)s" object to be referenced by only '
2009 'a single "%(clsname)s" object at a time via the '
2010 "%(rel)s relationship, which "
2011 "would allow "
2012 "delete-orphan cascade to take place in this direction, set "
2013 "the single_parent=True flag."
2014 % {
2015 "rel": self,
2016 "direction": (
2017 "many-to-one"
2018 if self.direction is MANYTOONE
2019 else "many-to-many"
2020 ),
2021 "clsname": self.parent.class_.__name__,
2022 "relatedcls": self.mapper.class_.__name__,
2023 },
2024 code="bbf0",
2025 )
2026
2027 if self.passive_deletes == "all" and (
2028 "delete" in cascade or "delete-orphan" in cascade
2029 ):
2030 raise sa_exc.ArgumentError(
2031 "On %s, can't set passive_deletes='all' in conjunction "
2032 "with 'delete' or 'delete-orphan' cascade" % self
2033 )
2034
2035 if cascade.delete_orphan:
2036 self.mapper.primary_mapper()._delete_orphans.append(
2037 (self.key, self.parent.class_)
2038 )
2039
2040 def _persists_for(self, mapper: Mapper[Any]) -> bool:
2041 """Return True if this property will persist values on behalf
2042 of the given mapper.
2043
2044 """
2045
2046 return (
2047 self.key in mapper.relationships
2048 and mapper.relationships[self.key] is self
2049 )
2050
2051 def _columns_are_mapped(self, *cols: ColumnElement[Any]) -> bool:
2052 """Return True if all columns in the given collection are
2053 mapped by the tables referenced by this :class:`.RelationshipProperty`.
2054
2055 """
2056
2057 secondary = self._init_args.secondary.resolved
2058 for c in cols:
2059 if secondary is not None and secondary.c.contains_column(c):
2060 continue
2061 if not self.parent.persist_selectable.c.contains_column(
2062 c
2063 ) and not self.target.c.contains_column(c):
2064 return False
2065 return True
2066
2067 def _generate_backref(self) -> None:
2068 """Interpret the 'backref' instruction to create a
2069 :func:`_orm.relationship` complementary to this one."""
2070
2071 if self.parent.non_primary:
2072 return
2073 if self.backref is not None and not self.back_populates:
2074 kwargs: Dict[str, Any]
2075 if isinstance(self.backref, str):
2076 backref_key, kwargs = self.backref, {}
2077 else:
2078 backref_key, kwargs = self.backref
2079 mapper = self.mapper.primary_mapper()
2080
2081 if not mapper.concrete:
2082 check = set(mapper.iterate_to_root()).union(
2083 mapper.self_and_descendants
2084 )
2085 for m in check:
2086 if m.has_property(backref_key) and not m.concrete:
2087 raise sa_exc.ArgumentError(
2088 "Error creating backref "
2089 "'%s' on relationship '%s': property of that "
2090 "name exists on mapper '%s'"
2091 % (backref_key, self, m)
2092 )
2093
2094 # determine primaryjoin/secondaryjoin for the
2095 # backref. Use the one we had, so that
2096 # a custom join doesn't have to be specified in
2097 # both directions.
2098 if self.secondary is not None:
2099 # for many to many, just switch primaryjoin/
2100 # secondaryjoin. use the annotated
2101 # pj/sj on the _join_condition.
2102 pj = kwargs.pop(
2103 "primaryjoin",
2104 self._join_condition.secondaryjoin_minus_local,
2105 )
2106 sj = kwargs.pop(
2107 "secondaryjoin",
2108 self._join_condition.primaryjoin_minus_local,
2109 )
2110 else:
2111 pj = kwargs.pop(
2112 "primaryjoin",
2113 self._join_condition.primaryjoin_reverse_remote,
2114 )
2115 sj = kwargs.pop("secondaryjoin", None)
2116 if sj:
2117 raise sa_exc.InvalidRequestError(
2118 "Can't assign 'secondaryjoin' on a backref "
2119 "against a non-secondary relationship."
2120 )
2121
2122 foreign_keys = kwargs.pop(
2123 "foreign_keys", self._user_defined_foreign_keys
2124 )
2125 parent = self.parent.primary_mapper()
2126 kwargs.setdefault("viewonly", self.viewonly)
2127 kwargs.setdefault("post_update", self.post_update)
2128 kwargs.setdefault("passive_updates", self.passive_updates)
2129 kwargs.setdefault("sync_backref", self.sync_backref)
2130 self.back_populates = backref_key
2131 relationship = RelationshipProperty(
2132 parent,
2133 self.secondary,
2134 primaryjoin=pj,
2135 secondaryjoin=sj,
2136 foreign_keys=foreign_keys,
2137 back_populates=self.key,
2138 **kwargs,
2139 )
2140 mapper._configure_property(
2141 backref_key, relationship, warn_for_existing=True
2142 )
2143
2144 if self.back_populates:
2145 self._add_reverse_property(self.back_populates)
2146
2147 @util.preload_module("sqlalchemy.orm.dependency")
2148 def _post_init(self) -> None:
2149 dependency = util.preloaded.orm_dependency
2150
2151 if self.uselist is None:
2152 self.uselist = self.direction is not MANYTOONE
2153 if not self.viewonly:
2154 self._dependency_processor = ( # type: ignore
2155 dependency.DependencyProcessor.from_relationship
2156 )(self)
2157
2158 @util.memoized_property
2159 def _use_get(self) -> bool:
2160 """memoize the 'use_get' attribute of this RelationshipLoader's
2161 lazyloader."""
2162
2163 strategy = self._lazy_strategy
2164 return strategy.use_get
2165
2166 @util.memoized_property
2167 def _is_self_referential(self) -> bool:
2168 return self.mapper.common_parent(self.parent)
2169
2170 def _create_joins(
2171 self,
2172 source_polymorphic: bool = False,
2173 source_selectable: Optional[FromClause] = None,
2174 dest_selectable: Optional[FromClause] = None,
2175 of_type_entity: Optional[_InternalEntityType[Any]] = None,
2176 alias_secondary: bool = False,
2177 extra_criteria: Tuple[ColumnElement[bool], ...] = (),
2178 ) -> Tuple[
2179 ColumnElement[bool],
2180 Optional[ColumnElement[bool]],
2181 FromClause,
2182 FromClause,
2183 Optional[FromClause],
2184 Optional[ClauseAdapter],
2185 ]:
2186 aliased = False
2187
2188 if alias_secondary and self.secondary is not None:
2189 aliased = True
2190
2191 if source_selectable is None:
2192 if source_polymorphic and self.parent.with_polymorphic:
2193 source_selectable = self.parent._with_polymorphic_selectable
2194
2195 if of_type_entity:
2196 dest_mapper = of_type_entity.mapper
2197 if dest_selectable is None:
2198 dest_selectable = of_type_entity.selectable
2199 aliased = True
2200 else:
2201 dest_mapper = self.mapper
2202
2203 if dest_selectable is None:
2204 dest_selectable = self.entity.selectable
2205 if self.mapper.with_polymorphic:
2206 aliased = True
2207
2208 if self._is_self_referential and source_selectable is None:
2209 dest_selectable = dest_selectable._anonymous_fromclause()
2210 aliased = True
2211 elif (
2212 dest_selectable is not self.mapper._with_polymorphic_selectable
2213 or self.mapper.with_polymorphic
2214 ):
2215 aliased = True
2216
2217 single_crit = dest_mapper._single_table_criterion
2218 aliased = aliased or (
2219 source_selectable is not None
2220 and (
2221 source_selectable
2222 is not self.parent._with_polymorphic_selectable
2223 or source_selectable._is_subquery
2224 )
2225 )
2226
2227 (
2228 primaryjoin,
2229 secondaryjoin,
2230 secondary,
2231 target_adapter,
2232 dest_selectable,
2233 ) = self._join_condition.join_targets(
2234 source_selectable,
2235 dest_selectable,
2236 aliased,
2237 single_crit,
2238 extra_criteria,
2239 )
2240 if source_selectable is None:
2241 source_selectable = self.parent.local_table
2242 if dest_selectable is None:
2243 dest_selectable = self.entity.local_table
2244 return (
2245 primaryjoin,
2246 secondaryjoin,
2247 source_selectable,
2248 dest_selectable,
2249 secondary,
2250 target_adapter,
2251 )
2252
2253
2254def _annotate_columns(element: _CE, annotations: _AnnotationDict) -> _CE:
2255 def clone(elem: _CE) -> _CE:
2256 if isinstance(elem, expression.ColumnClause):
2257 elem = elem._annotate(annotations.copy()) # type: ignore
2258 elem._copy_internals(clone=clone)
2259 return elem
2260
2261 if element is not None:
2262 element = clone(element)
2263 clone = None # type: ignore # remove gc cycles
2264 return element
2265
2266
2267class JoinCondition:
2268 primaryjoin_initial: Optional[ColumnElement[bool]]
2269 primaryjoin: ColumnElement[bool]
2270 secondaryjoin: Optional[ColumnElement[bool]]
2271 secondary: Optional[FromClause]
2272 prop: RelationshipProperty[Any]
2273
2274 synchronize_pairs: _ColumnPairs
2275 secondary_synchronize_pairs: _ColumnPairs
2276 direction: RelationshipDirection
2277
2278 parent_persist_selectable: FromClause
2279 child_persist_selectable: FromClause
2280 parent_local_selectable: FromClause
2281 child_local_selectable: FromClause
2282
2283 _local_remote_pairs: Optional[_ColumnPairs]
2284
2285 def __init__(
2286 self,
2287 parent_persist_selectable: FromClause,
2288 child_persist_selectable: FromClause,
2289 parent_local_selectable: FromClause,
2290 child_local_selectable: FromClause,
2291 *,
2292 primaryjoin: Optional[ColumnElement[bool]] = None,
2293 secondary: Optional[FromClause] = None,
2294 secondaryjoin: Optional[ColumnElement[bool]] = None,
2295 parent_equivalents: Optional[_EquivalentColumnMap] = None,
2296 child_equivalents: Optional[_EquivalentColumnMap] = None,
2297 consider_as_foreign_keys: Any = None,
2298 local_remote_pairs: Optional[_ColumnPairs] = None,
2299 remote_side: Any = None,
2300 self_referential: Any = False,
2301 prop: RelationshipProperty[Any],
2302 support_sync: bool = True,
2303 can_be_synced_fn: Callable[..., bool] = lambda *c: True,
2304 ):
2305 self.parent_persist_selectable = parent_persist_selectable
2306 self.parent_local_selectable = parent_local_selectable
2307 self.child_persist_selectable = child_persist_selectable
2308 self.child_local_selectable = child_local_selectable
2309 self.parent_equivalents = parent_equivalents
2310 self.child_equivalents = child_equivalents
2311 self.primaryjoin_initial = primaryjoin
2312 self.secondaryjoin = secondaryjoin
2313 self.secondary = secondary
2314 self.consider_as_foreign_keys = consider_as_foreign_keys
2315 self._local_remote_pairs = local_remote_pairs
2316 self._remote_side = remote_side
2317 self.prop = prop
2318 self.self_referential = self_referential
2319 self.support_sync = support_sync
2320 self.can_be_synced_fn = can_be_synced_fn
2321
2322 self._determine_joins()
2323 assert self.primaryjoin is not None
2324
2325 self._sanitize_joins()
2326 self._annotate_fks()
2327 self._annotate_remote()
2328 self._annotate_local()
2329 self._annotate_parentmapper()
2330 self._setup_pairs()
2331 self._check_foreign_cols(self.primaryjoin, True)
2332 if self.secondaryjoin is not None:
2333 self._check_foreign_cols(self.secondaryjoin, False)
2334 self._determine_direction()
2335 self._check_remote_side()
2336 self._log_joins()
2337
2338 def _log_joins(self) -> None:
2339 log = self.prop.logger
2340 log.info("%s setup primary join %s", self.prop, self.primaryjoin)
2341 log.info("%s setup secondary join %s", self.prop, self.secondaryjoin)
2342 log.info(
2343 "%s synchronize pairs [%s]",
2344 self.prop,
2345 ",".join(
2346 "(%s => %s)" % (l, r) for (l, r) in self.synchronize_pairs
2347 ),
2348 )
2349 log.info(
2350 "%s secondary synchronize pairs [%s]",
2351 self.prop,
2352 ",".join(
2353 "(%s => %s)" % (l, r)
2354 for (l, r) in self.secondary_synchronize_pairs or []
2355 ),
2356 )
2357 log.info(
2358 "%s local/remote pairs [%s]",
2359 self.prop,
2360 ",".join(
2361 "(%s / %s)" % (l, r) for (l, r) in self.local_remote_pairs
2362 ),
2363 )
2364 log.info(
2365 "%s remote columns [%s]",
2366 self.prop,
2367 ",".join("%s" % col for col in self.remote_columns),
2368 )
2369 log.info(
2370 "%s local columns [%s]",
2371 self.prop,
2372 ",".join("%s" % col for col in self.local_columns),
2373 )
2374 log.info("%s relationship direction %s", self.prop, self.direction)
2375
2376 def _sanitize_joins(self) -> None:
2377 """remove the parententity annotation from our join conditions which
2378 can leak in here based on some declarative patterns and maybe others.
2379
2380 "parentmapper" is relied upon both by the ORM evaluator as well as
2381 the use case in _join_fixture_inh_selfref_w_entity
2382 that relies upon it being present, see :ticket:`3364`.
2383
2384 """
2385
2386 self.primaryjoin = _deep_deannotate(
2387 self.primaryjoin, values=("parententity", "proxy_key")
2388 )
2389 if self.secondaryjoin is not None:
2390 self.secondaryjoin = _deep_deannotate(
2391 self.secondaryjoin, values=("parententity", "proxy_key")
2392 )
2393
2394 def _determine_joins(self) -> None:
2395 """Determine the 'primaryjoin' and 'secondaryjoin' attributes,
2396 if not passed to the constructor already.
2397
2398 This is based on analysis of the foreign key relationships
2399 between the parent and target mapped selectables.
2400
2401 """
2402 if self.secondaryjoin is not None and self.secondary is None:
2403 raise sa_exc.ArgumentError(
2404 "Property %s specified with secondary "
2405 "join condition but "
2406 "no secondary argument" % self.prop
2407 )
2408
2409 # find a join between the given mapper's mapped table and
2410 # the given table. will try the mapper's local table first
2411 # for more specificity, then if not found will try the more
2412 # general mapped table, which in the case of inheritance is
2413 # a join.
2414 try:
2415 consider_as_foreign_keys = self.consider_as_foreign_keys or None
2416 if self.secondary is not None:
2417 if self.secondaryjoin is None:
2418 self.secondaryjoin = join_condition(
2419 self.child_persist_selectable,
2420 self.secondary,
2421 a_subset=self.child_local_selectable,
2422 consider_as_foreign_keys=consider_as_foreign_keys,
2423 )
2424 if self.primaryjoin_initial is None:
2425 self.primaryjoin = join_condition(
2426 self.parent_persist_selectable,
2427 self.secondary,
2428 a_subset=self.parent_local_selectable,
2429 consider_as_foreign_keys=consider_as_foreign_keys,
2430 )
2431 else:
2432 self.primaryjoin = self.primaryjoin_initial
2433 else:
2434 if self.primaryjoin_initial is None:
2435 self.primaryjoin = join_condition(
2436 self.parent_persist_selectable,
2437 self.child_persist_selectable,
2438 a_subset=self.parent_local_selectable,
2439 consider_as_foreign_keys=consider_as_foreign_keys,
2440 )
2441 else:
2442 self.primaryjoin = self.primaryjoin_initial
2443 except sa_exc.NoForeignKeysError as nfe:
2444 if self.secondary is not None:
2445 raise sa_exc.NoForeignKeysError(
2446 "Could not determine join "
2447 "condition between parent/child tables on "
2448 "relationship %s - there are no foreign keys "
2449 "linking these tables via secondary table '%s'. "
2450 "Ensure that referencing columns are associated "
2451 "with a ForeignKey or ForeignKeyConstraint, or "
2452 "specify 'primaryjoin' and 'secondaryjoin' "
2453 "expressions." % (self.prop, self.secondary)
2454 ) from nfe
2455 else:
2456 raise sa_exc.NoForeignKeysError(
2457 "Could not determine join "
2458 "condition between parent/child tables on "
2459 "relationship %s - there are no foreign keys "
2460 "linking these tables. "
2461 "Ensure that referencing columns are associated "
2462 "with a ForeignKey or ForeignKeyConstraint, or "
2463 "specify a 'primaryjoin' expression." % self.prop
2464 ) from nfe
2465 except sa_exc.AmbiguousForeignKeysError as afe:
2466 if self.secondary is not None:
2467 raise sa_exc.AmbiguousForeignKeysError(
2468 "Could not determine join "
2469 "condition between parent/child tables on "
2470 "relationship %s - there are multiple foreign key "
2471 "paths linking the tables via secondary table '%s'. "
2472 "Specify the 'foreign_keys' "
2473 "argument, providing a list of those columns which "
2474 "should be counted as containing a foreign key "
2475 "reference from the secondary table to each of the "
2476 "parent and child tables." % (self.prop, self.secondary)
2477 ) from afe
2478 else:
2479 raise sa_exc.AmbiguousForeignKeysError(
2480 "Could not determine join "
2481 "condition between parent/child tables on "
2482 "relationship %s - there are multiple foreign key "
2483 "paths linking the tables. Specify the "
2484 "'foreign_keys' argument, providing a list of those "
2485 "columns which should be counted as containing a "
2486 "foreign key reference to the parent table." % self.prop
2487 ) from afe
2488
2489 @property
2490 def primaryjoin_minus_local(self) -> ColumnElement[bool]:
2491 return _deep_deannotate(self.primaryjoin, values=("local", "remote"))
2492
2493 @property
2494 def secondaryjoin_minus_local(self) -> ColumnElement[bool]:
2495 assert self.secondaryjoin is not None
2496 return _deep_deannotate(self.secondaryjoin, values=("local", "remote"))
2497
2498 @util.memoized_property
2499 def primaryjoin_reverse_remote(self) -> ColumnElement[bool]:
2500 """Return the primaryjoin condition suitable for the
2501 "reverse" direction.
2502
2503 If the primaryjoin was delivered here with pre-existing
2504 "remote" annotations, the local/remote annotations
2505 are reversed. Otherwise, the local/remote annotations
2506 are removed.
2507
2508 """
2509 if self._has_remote_annotations:
2510
2511 def replace(element: _CE, **kw: Any) -> Optional[_CE]:
2512 if "remote" in element._annotations:
2513 v = dict(element._annotations)
2514 del v["remote"]
2515 v["local"] = True
2516 return element._with_annotations(v)
2517 elif "local" in element._annotations:
2518 v = dict(element._annotations)
2519 del v["local"]
2520 v["remote"] = True
2521 return element._with_annotations(v)
2522
2523 return None
2524
2525 return visitors.replacement_traverse(self.primaryjoin, {}, replace)
2526 else:
2527 if self._has_foreign_annotations:
2528 # TODO: coverage
2529 return _deep_deannotate(
2530 self.primaryjoin, values=("local", "remote")
2531 )
2532 else:
2533 return _deep_deannotate(self.primaryjoin)
2534
2535 def _has_annotation(self, clause: ClauseElement, annotation: str) -> bool:
2536 for col in visitors.iterate(clause, {}):
2537 if annotation in col._annotations:
2538 return True
2539 else:
2540 return False
2541
2542 @util.memoized_property
2543 def _has_foreign_annotations(self) -> bool:
2544 return self._has_annotation(self.primaryjoin, "foreign")
2545
2546 @util.memoized_property
2547 def _has_remote_annotations(self) -> bool:
2548 return self._has_annotation(self.primaryjoin, "remote")
2549
2550 def _annotate_fks(self) -> None:
2551 """Annotate the primaryjoin and secondaryjoin
2552 structures with 'foreign' annotations marking columns
2553 considered as foreign.
2554
2555 """
2556 if self._has_foreign_annotations:
2557 return
2558
2559 if self.consider_as_foreign_keys:
2560 self._annotate_from_fk_list()
2561 else:
2562 self._annotate_present_fks()
2563
2564 def _annotate_from_fk_list(self) -> None:
2565 def check_fk(element: _CE, **kw: Any) -> Optional[_CE]:
2566 if element in self.consider_as_foreign_keys:
2567 return element._annotate({"foreign": True})
2568 return None
2569
2570 self.primaryjoin = visitors.replacement_traverse(
2571 self.primaryjoin, {}, check_fk
2572 )
2573 if self.secondaryjoin is not None:
2574 self.secondaryjoin = visitors.replacement_traverse(
2575 self.secondaryjoin, {}, check_fk
2576 )
2577
2578 def _annotate_present_fks(self) -> None:
2579 if self.secondary is not None:
2580 secondarycols = util.column_set(self.secondary.c)
2581 else:
2582 secondarycols = set()
2583
2584 def is_foreign(
2585 a: ColumnElement[Any], b: ColumnElement[Any]
2586 ) -> Optional[ColumnElement[Any]]:
2587 if isinstance(a, schema.Column) and isinstance(b, schema.Column):
2588 if a.references(b):
2589 return a
2590 elif b.references(a):
2591 return b
2592
2593 if secondarycols:
2594 if a in secondarycols and b not in secondarycols:
2595 return a
2596 elif b in secondarycols and a not in secondarycols:
2597 return b
2598
2599 return None
2600
2601 def visit_binary(binary: BinaryExpression[Any]) -> None:
2602 if not isinstance(
2603 binary.left, sql.ColumnElement
2604 ) or not isinstance(binary.right, sql.ColumnElement):
2605 return
2606
2607 if (
2608 "foreign" not in binary.left._annotations
2609 and "foreign" not in binary.right._annotations
2610 ):
2611 col = is_foreign(binary.left, binary.right)
2612 if col is not None:
2613 if col.compare(binary.left):
2614 binary.left = binary.left._annotate({"foreign": True})
2615 elif col.compare(binary.right):
2616 binary.right = binary.right._annotate(
2617 {"foreign": True}
2618 )
2619
2620 self.primaryjoin = visitors.cloned_traverse(
2621 self.primaryjoin, {}, {"binary": visit_binary}
2622 )
2623 if self.secondaryjoin is not None:
2624 self.secondaryjoin = visitors.cloned_traverse(
2625 self.secondaryjoin, {}, {"binary": visit_binary}
2626 )
2627
2628 def _refers_to_parent_table(self) -> bool:
2629 """Return True if the join condition contains column
2630 comparisons where both columns are in both tables.
2631
2632 """
2633 pt = self.parent_persist_selectable
2634 mt = self.child_persist_selectable
2635 result = False
2636
2637 def visit_binary(binary: BinaryExpression[Any]) -> None:
2638 nonlocal result
2639 c, f = binary.left, binary.right
2640 if (
2641 isinstance(c, expression.ColumnClause)
2642 and isinstance(f, expression.ColumnClause)
2643 and pt.is_derived_from(c.table)
2644 and pt.is_derived_from(f.table)
2645 and mt.is_derived_from(c.table)
2646 and mt.is_derived_from(f.table)
2647 ):
2648 result = True
2649
2650 visitors.traverse(self.primaryjoin, {}, {"binary": visit_binary})
2651 return result
2652
2653 def _tables_overlap(self) -> bool:
2654 """Return True if parent/child tables have some overlap."""
2655
2656 return selectables_overlap(
2657 self.parent_persist_selectable, self.child_persist_selectable
2658 )
2659
2660 def _annotate_remote(self) -> None:
2661 """Annotate the primaryjoin and secondaryjoin
2662 structures with 'remote' annotations marking columns
2663 considered as part of the 'remote' side.
2664
2665 """
2666 if self._has_remote_annotations:
2667 return
2668
2669 if self.secondary is not None:
2670 self._annotate_remote_secondary()
2671 elif self._local_remote_pairs or self._remote_side:
2672 self._annotate_remote_from_args()
2673 elif self._refers_to_parent_table():
2674 self._annotate_selfref(
2675 lambda col: "foreign" in col._annotations, False
2676 )
2677 elif self._tables_overlap():
2678 self._annotate_remote_with_overlap()
2679 else:
2680 self._annotate_remote_distinct_selectables()
2681
2682 def _annotate_remote_secondary(self) -> None:
2683 """annotate 'remote' in primaryjoin, secondaryjoin
2684 when 'secondary' is present.
2685
2686 """
2687
2688 assert self.secondary is not None
2689 fixed_secondary = self.secondary
2690
2691 def repl(element: _CE, **kw: Any) -> Optional[_CE]:
2692 if fixed_secondary.c.contains_column(element):
2693 return element._annotate({"remote": True})
2694 return None
2695
2696 self.primaryjoin = visitors.replacement_traverse(
2697 self.primaryjoin, {}, repl
2698 )
2699
2700 assert self.secondaryjoin is not None
2701 self.secondaryjoin = visitors.replacement_traverse(
2702 self.secondaryjoin, {}, repl
2703 )
2704
2705 def _annotate_selfref(
2706 self, fn: Callable[[ColumnElement[Any]], bool], remote_side_given: bool
2707 ) -> None:
2708 """annotate 'remote' in primaryjoin, secondaryjoin
2709 when the relationship is detected as self-referential.
2710
2711 """
2712
2713 def visit_binary(binary: BinaryExpression[Any]) -> None:
2714 equated = binary.left.compare(binary.right)
2715 if isinstance(binary.left, expression.ColumnClause) and isinstance(
2716 binary.right, expression.ColumnClause
2717 ):
2718 # assume one to many - FKs are "remote"
2719 if fn(binary.left):
2720 binary.left = binary.left._annotate({"remote": True})
2721 if fn(binary.right) and not equated:
2722 binary.right = binary.right._annotate({"remote": True})
2723 elif not remote_side_given:
2724 self._warn_non_column_elements()
2725
2726 self.primaryjoin = visitors.cloned_traverse(
2727 self.primaryjoin, {}, {"binary": visit_binary}
2728 )
2729
2730 def _annotate_remote_from_args(self) -> None:
2731 """annotate 'remote' in primaryjoin, secondaryjoin
2732 when the 'remote_side' or '_local_remote_pairs'
2733 arguments are used.
2734
2735 """
2736 if self._local_remote_pairs:
2737 if self._remote_side:
2738 raise sa_exc.ArgumentError(
2739 "remote_side argument is redundant "
2740 "against more detailed _local_remote_side "
2741 "argument."
2742 )
2743
2744 remote_side = [r for (l, r) in self._local_remote_pairs]
2745 else:
2746 remote_side = self._remote_side
2747
2748 if self._refers_to_parent_table():
2749 self._annotate_selfref(lambda col: col in remote_side, True)
2750 else:
2751
2752 def repl(element: _CE, **kw: Any) -> Optional[_CE]:
2753 # use set() to avoid generating ``__eq__()`` expressions
2754 # against each element
2755 if element in set(remote_side):
2756 return element._annotate({"remote": True})
2757 return None
2758
2759 self.primaryjoin = visitors.replacement_traverse(
2760 self.primaryjoin, {}, repl
2761 )
2762
2763 def _annotate_remote_with_overlap(self) -> None:
2764 """annotate 'remote' in primaryjoin, secondaryjoin
2765 when the parent/child tables have some set of
2766 tables in common, though is not a fully self-referential
2767 relationship.
2768
2769 """
2770
2771 def visit_binary(binary: BinaryExpression[Any]) -> None:
2772 binary.left, binary.right = proc_left_right(
2773 binary.left, binary.right
2774 )
2775 binary.right, binary.left = proc_left_right(
2776 binary.right, binary.left
2777 )
2778
2779 check_entities = (
2780 self.prop is not None and self.prop.mapper is not self.prop.parent
2781 )
2782
2783 def proc_left_right(
2784 left: ColumnElement[Any], right: ColumnElement[Any]
2785 ) -> Tuple[ColumnElement[Any], ColumnElement[Any]]:
2786 if isinstance(left, expression.ColumnClause) and isinstance(
2787 right, expression.ColumnClause
2788 ):
2789 if self.child_persist_selectable.c.contains_column(
2790 right
2791 ) and self.parent_persist_selectable.c.contains_column(left):
2792 right = right._annotate({"remote": True})
2793 elif (
2794 check_entities
2795 and right._annotations.get("parentmapper") is self.prop.mapper
2796 ):
2797 right = right._annotate({"remote": True})
2798 elif (
2799 check_entities
2800 and left._annotations.get("parentmapper") is self.prop.mapper
2801 ):
2802 left = left._annotate({"remote": True})
2803 else:
2804 self._warn_non_column_elements()
2805
2806 return left, right
2807
2808 self.primaryjoin = visitors.cloned_traverse(
2809 self.primaryjoin, {}, {"binary": visit_binary}
2810 )
2811
2812 def _annotate_remote_distinct_selectables(self) -> None:
2813 """annotate 'remote' in primaryjoin, secondaryjoin
2814 when the parent/child tables are entirely
2815 separate.
2816
2817 """
2818
2819 def repl(element: _CE, **kw: Any) -> Optional[_CE]:
2820 if self.child_persist_selectable.c.contains_column(element) and (
2821 not self.parent_local_selectable.c.contains_column(element)
2822 or self.child_local_selectable.c.contains_column(element)
2823 ):
2824 return element._annotate({"remote": True})
2825 return None
2826
2827 self.primaryjoin = visitors.replacement_traverse(
2828 self.primaryjoin, {}, repl
2829 )
2830
2831 def _warn_non_column_elements(self) -> None:
2832 util.warn(
2833 "Non-simple column elements in primary "
2834 "join condition for property %s - consider using "
2835 "remote() annotations to mark the remote side." % self.prop
2836 )
2837
2838 def _annotate_local(self) -> None:
2839 """Annotate the primaryjoin and secondaryjoin
2840 structures with 'local' annotations.
2841
2842 This annotates all column elements found
2843 simultaneously in the parent table
2844 and the join condition that don't have a
2845 'remote' annotation set up from
2846 _annotate_remote() or user-defined.
2847
2848 """
2849 if self._has_annotation(self.primaryjoin, "local"):
2850 return
2851
2852 if self._local_remote_pairs:
2853 local_side = util.column_set(
2854 [l for (l, r) in self._local_remote_pairs]
2855 )
2856 else:
2857 local_side = util.column_set(self.parent_persist_selectable.c)
2858
2859 def locals_(element: _CE, **kw: Any) -> Optional[_CE]:
2860 if "remote" not in element._annotations and element in local_side:
2861 return element._annotate({"local": True})
2862 return None
2863
2864 self.primaryjoin = visitors.replacement_traverse(
2865 self.primaryjoin, {}, locals_
2866 )
2867
2868 def _annotate_parentmapper(self) -> None:
2869 def parentmappers_(element: _CE, **kw: Any) -> Optional[_CE]:
2870 if "remote" in element._annotations:
2871 return element._annotate({"parentmapper": self.prop.mapper})
2872 elif "local" in element._annotations:
2873 return element._annotate({"parentmapper": self.prop.parent})
2874 return None
2875
2876 self.primaryjoin = visitors.replacement_traverse(
2877 self.primaryjoin, {}, parentmappers_
2878 )
2879
2880 def _check_remote_side(self) -> None:
2881 if not self.local_remote_pairs:
2882 raise sa_exc.ArgumentError(
2883 "Relationship %s could "
2884 "not determine any unambiguous local/remote column "
2885 "pairs based on join condition and remote_side "
2886 "arguments. "
2887 "Consider using the remote() annotation to "
2888 "accurately mark those elements of the join "
2889 "condition that are on the remote side of "
2890 "the relationship." % (self.prop,)
2891 )
2892 else:
2893 not_target = util.column_set(
2894 self.parent_persist_selectable.c
2895 ).difference(self.child_persist_selectable.c)
2896
2897 for _, rmt in self.local_remote_pairs:
2898 if rmt in not_target:
2899 util.warn(
2900 "Expression %s is marked as 'remote', but these "
2901 "column(s) are local to the local side. The "
2902 "remote() annotation is needed only for a "
2903 "self-referential relationship where both sides "
2904 "of the relationship refer to the same tables."
2905 % (rmt,)
2906 )
2907
2908 def _check_foreign_cols(
2909 self, join_condition: ColumnElement[bool], primary: bool
2910 ) -> None:
2911 """Check the foreign key columns collected and emit error
2912 messages."""
2913 foreign_cols = self._gather_columns_with_annotation(
2914 join_condition, "foreign"
2915 )
2916
2917 has_foreign = bool(foreign_cols)
2918
2919 if primary:
2920 can_sync = bool(self.synchronize_pairs)
2921 else:
2922 can_sync = bool(self.secondary_synchronize_pairs)
2923
2924 if (
2925 self.support_sync
2926 and can_sync
2927 or (not self.support_sync and has_foreign)
2928 ):
2929 return
2930
2931 # from here below is just determining the best error message
2932 # to report. Check for a join condition using any operator
2933 # (not just ==), perhaps they need to turn on "viewonly=True".
2934 if self.support_sync and has_foreign and not can_sync:
2935 err = (
2936 "Could not locate any simple equality expressions "
2937 "involving locally mapped foreign key columns for "
2938 "%s join condition "
2939 "'%s' on relationship %s."
2940 % (
2941 primary and "primary" or "secondary",
2942 join_condition,
2943 self.prop,
2944 )
2945 )
2946 err += (
2947 " Ensure that referencing columns are associated "
2948 "with a ForeignKey or ForeignKeyConstraint, or are "
2949 "annotated in the join condition with the foreign() "
2950 "annotation. To allow comparison operators other than "
2951 "'==', the relationship can be marked as viewonly=True."
2952 )
2953
2954 raise sa_exc.ArgumentError(err)
2955 else:
2956 err = (
2957 "Could not locate any relevant foreign key columns "
2958 "for %s join condition '%s' on relationship %s."
2959 % (
2960 primary and "primary" or "secondary",
2961 join_condition,
2962 self.prop,
2963 )
2964 )
2965 err += (
2966 " Ensure that referencing columns are associated "
2967 "with a ForeignKey or ForeignKeyConstraint, or are "
2968 "annotated in the join condition with the foreign() "
2969 "annotation."
2970 )
2971 raise sa_exc.ArgumentError(err)
2972
2973 def _determine_direction(self) -> None:
2974 """Determine if this relationship is one to many, many to one,
2975 many to many.
2976
2977 """
2978 if self.secondaryjoin is not None:
2979 self.direction = MANYTOMANY
2980 else:
2981 parentcols = util.column_set(self.parent_persist_selectable.c)
2982 targetcols = util.column_set(self.child_persist_selectable.c)
2983
2984 # fk collection which suggests ONETOMANY.
2985 onetomany_fk = targetcols.intersection(self.foreign_key_columns)
2986
2987 # fk collection which suggests MANYTOONE.
2988
2989 manytoone_fk = parentcols.intersection(self.foreign_key_columns)
2990
2991 if onetomany_fk and manytoone_fk:
2992 # fks on both sides. test for overlap of local/remote
2993 # with foreign key.
2994 # we will gather columns directly from their annotations
2995 # without deannotating, so that we can distinguish on a column
2996 # that refers to itself.
2997
2998 # 1. columns that are both remote and FK suggest
2999 # onetomany.
3000 onetomany_local = self._gather_columns_with_annotation(
3001 self.primaryjoin, "remote", "foreign"
3002 )
3003
3004 # 2. columns that are FK but are not remote (e.g. local)
3005 # suggest manytoone.
3006 manytoone_local = {
3007 c
3008 for c in self._gather_columns_with_annotation(
3009 self.primaryjoin, "foreign"
3010 )
3011 if "remote" not in c._annotations
3012 }
3013
3014 # 3. if both collections are present, remove columns that
3015 # refer to themselves. This is for the case of
3016 # and_(Me.id == Me.remote_id, Me.version == Me.version)
3017 if onetomany_local and manytoone_local:
3018 self_equated = self.remote_columns.intersection(
3019 self.local_columns
3020 )
3021 onetomany_local = onetomany_local.difference(self_equated)
3022 manytoone_local = manytoone_local.difference(self_equated)
3023
3024 # at this point, if only one or the other collection is
3025 # present, we know the direction, otherwise it's still
3026 # ambiguous.
3027
3028 if onetomany_local and not manytoone_local:
3029 self.direction = ONETOMANY
3030 elif manytoone_local and not onetomany_local:
3031 self.direction = MANYTOONE
3032 else:
3033 raise sa_exc.ArgumentError(
3034 "Can't determine relationship"
3035 " direction for relationship '%s' - foreign "
3036 "key columns within the join condition are present "
3037 "in both the parent and the child's mapped tables. "
3038 "Ensure that only those columns referring "
3039 "to a parent column are marked as foreign, "
3040 "either via the foreign() annotation or "
3041 "via the foreign_keys argument." % self.prop
3042 )
3043 elif onetomany_fk:
3044 self.direction = ONETOMANY
3045 elif manytoone_fk:
3046 self.direction = MANYTOONE
3047 else:
3048 raise sa_exc.ArgumentError(
3049 "Can't determine relationship "
3050 "direction for relationship '%s' - foreign "
3051 "key columns are present in neither the parent "
3052 "nor the child's mapped tables" % self.prop
3053 )
3054
3055 def _deannotate_pairs(
3056 self, collection: _ColumnPairIterable
3057 ) -> _MutableColumnPairs:
3058 """provide deannotation for the various lists of
3059 pairs, so that using them in hashes doesn't incur
3060 high-overhead __eq__() comparisons against
3061 original columns mapped.
3062
3063 """
3064 return [(x._deannotate(), y._deannotate()) for x, y in collection]
3065
3066 def _setup_pairs(self) -> None:
3067 sync_pairs: _MutableColumnPairs = []
3068 lrp: util.OrderedSet[Tuple[ColumnElement[Any], ColumnElement[Any]]] = (
3069 util.OrderedSet([])
3070 )
3071 secondary_sync_pairs: _MutableColumnPairs = []
3072
3073 def go(
3074 joincond: ColumnElement[bool],
3075 collection: _MutableColumnPairs,
3076 ) -> None:
3077 def visit_binary(
3078 binary: BinaryExpression[Any],
3079 left: ColumnElement[Any],
3080 right: ColumnElement[Any],
3081 ) -> None:
3082 if (
3083 "remote" in right._annotations
3084 and "remote" not in left._annotations
3085 and self.can_be_synced_fn(left)
3086 ):
3087 lrp.add((left, right))
3088 elif (
3089 "remote" in left._annotations
3090 and "remote" not in right._annotations
3091 and self.can_be_synced_fn(right)
3092 ):
3093 lrp.add((right, left))
3094 if binary.operator is operators.eq and self.can_be_synced_fn(
3095 left, right
3096 ):
3097 if "foreign" in right._annotations:
3098 collection.append((left, right))
3099 elif "foreign" in left._annotations:
3100 collection.append((right, left))
3101
3102 visit_binary_product(visit_binary, joincond)
3103
3104 for joincond, collection in [
3105 (self.primaryjoin, sync_pairs),
3106 (self.secondaryjoin, secondary_sync_pairs),
3107 ]:
3108 if joincond is None:
3109 continue
3110 go(joincond, collection)
3111
3112 self.local_remote_pairs = self._deannotate_pairs(lrp)
3113 self.synchronize_pairs = self._deannotate_pairs(sync_pairs)
3114 self.secondary_synchronize_pairs = self._deannotate_pairs(
3115 secondary_sync_pairs
3116 )
3117
3118 _track_overlapping_sync_targets: weakref.WeakKeyDictionary[
3119 ColumnElement[Any],
3120 weakref.WeakKeyDictionary[
3121 RelationshipProperty[Any], ColumnElement[Any]
3122 ],
3123 ] = weakref.WeakKeyDictionary()
3124
3125 def _warn_for_conflicting_sync_targets(self) -> None:
3126 if not self.support_sync:
3127 return
3128
3129 # we would like to detect if we are synchronizing any column
3130 # pairs in conflict with another relationship that wishes to sync
3131 # an entirely different column to the same target. This is a
3132 # very rare edge case so we will try to minimize the memory/overhead
3133 # impact of this check
3134 for from_, to_ in [
3135 (from_, to_) for (from_, to_) in self.synchronize_pairs
3136 ] + [
3137 (from_, to_) for (from_, to_) in self.secondary_synchronize_pairs
3138 ]:
3139 # save ourselves a ton of memory and overhead by only
3140 # considering columns that are subject to a overlapping
3141 # FK constraints at the core level. This condition can arise
3142 # if multiple relationships overlap foreign() directly, but
3143 # we're going to assume it's typically a ForeignKeyConstraint-
3144 # level configuration that benefits from this warning.
3145
3146 if to_ not in self._track_overlapping_sync_targets:
3147 self._track_overlapping_sync_targets[to_] = (
3148 weakref.WeakKeyDictionary({self.prop: from_})
3149 )
3150 else:
3151 other_props = []
3152 prop_to_from = self._track_overlapping_sync_targets[to_]
3153
3154 for pr, fr_ in prop_to_from.items():
3155 if (
3156 not pr.mapper._dispose_called
3157 and pr not in self.prop._reverse_property
3158 and pr.key not in self.prop._overlaps
3159 and self.prop.key not in pr._overlaps
3160 # note: the "__*" symbol is used internally by
3161 # SQLAlchemy as a general means of suppressing the
3162 # overlaps warning for some extension cases, however
3163 # this is not currently
3164 # a publicly supported symbol and may change at
3165 # any time.
3166 and "__*" not in self.prop._overlaps
3167 and "__*" not in pr._overlaps
3168 and not self.prop.parent.is_sibling(pr.parent)
3169 and not self.prop.mapper.is_sibling(pr.mapper)
3170 and not self.prop.parent.is_sibling(pr.mapper)
3171 and not self.prop.mapper.is_sibling(pr.parent)
3172 and (
3173 self.prop.key != pr.key
3174 or not self.prop.parent.common_parent(pr.parent)
3175 )
3176 ):
3177 other_props.append((pr, fr_))
3178
3179 if other_props:
3180 util.warn(
3181 "relationship '%s' will copy column %s to column %s, "
3182 "which conflicts with relationship(s): %s. "
3183 "If this is not the intention, consider if these "
3184 "relationships should be linked with "
3185 "back_populates, or if viewonly=True should be "
3186 "applied to one or more if they are read-only. "
3187 "For the less common case that foreign key "
3188 "constraints are partially overlapping, the "
3189 "orm.foreign() "
3190 "annotation can be used to isolate the columns that "
3191 "should be written towards. To silence this "
3192 "warning, add the parameter 'overlaps=\"%s\"' to the "
3193 "'%s' relationship."
3194 % (
3195 self.prop,
3196 from_,
3197 to_,
3198 ", ".join(
3199 sorted(
3200 "'%s' (copies %s to %s)" % (pr, fr_, to_)
3201 for (pr, fr_) in other_props
3202 )
3203 ),
3204 ",".join(sorted(pr.key for pr, fr in other_props)),
3205 self.prop,
3206 ),
3207 code="qzyx",
3208 )
3209 self._track_overlapping_sync_targets[to_][self.prop] = from_
3210
3211 @util.memoized_property
3212 def remote_columns(self) -> Set[ColumnElement[Any]]:
3213 return self._gather_join_annotations("remote")
3214
3215 @util.memoized_property
3216 def local_columns(self) -> Set[ColumnElement[Any]]:
3217 return self._gather_join_annotations("local")
3218
3219 @util.memoized_property
3220 def foreign_key_columns(self) -> Set[ColumnElement[Any]]:
3221 return self._gather_join_annotations("foreign")
3222
3223 def _gather_join_annotations(
3224 self, annotation: str
3225 ) -> Set[ColumnElement[Any]]:
3226 s = set(
3227 self._gather_columns_with_annotation(self.primaryjoin, annotation)
3228 )
3229 if self.secondaryjoin is not None:
3230 s.update(
3231 self._gather_columns_with_annotation(
3232 self.secondaryjoin, annotation
3233 )
3234 )
3235 return {x._deannotate() for x in s}
3236
3237 def _gather_columns_with_annotation(
3238 self, clause: ColumnElement[Any], *annotation: Iterable[str]
3239 ) -> Set[ColumnElement[Any]]:
3240 annotation_set = set(annotation)
3241 return {
3242 cast(ColumnElement[Any], col)
3243 for col in visitors.iterate(clause, {})
3244 if annotation_set.issubset(col._annotations)
3245 }
3246
3247 @util.memoized_property
3248 def _secondary_lineage_set(self) -> FrozenSet[ColumnElement[Any]]:
3249 if self.secondary is not None:
3250 return frozenset(
3251 itertools.chain(*[c.proxy_set for c in self.secondary.c])
3252 )
3253 else:
3254 return util.EMPTY_SET
3255
3256 def join_targets(
3257 self,
3258 source_selectable: Optional[FromClause],
3259 dest_selectable: FromClause,
3260 aliased: bool,
3261 single_crit: Optional[ColumnElement[bool]] = None,
3262 extra_criteria: Tuple[ColumnElement[bool], ...] = (),
3263 ) -> Tuple[
3264 ColumnElement[bool],
3265 Optional[ColumnElement[bool]],
3266 Optional[FromClause],
3267 Optional[ClauseAdapter],
3268 FromClause,
3269 ]:
3270 """Given a source and destination selectable, create a
3271 join between them.
3272
3273 This takes into account aliasing the join clause
3274 to reference the appropriate corresponding columns
3275 in the target objects, as well as the extra child
3276 criterion, equivalent column sets, etc.
3277
3278 """
3279 # place a barrier on the destination such that
3280 # replacement traversals won't ever dig into it.
3281 # its internal structure remains fixed
3282 # regardless of context.
3283 dest_selectable = _shallow_annotate(
3284 dest_selectable, {"no_replacement_traverse": True}
3285 )
3286
3287 primaryjoin, secondaryjoin, secondary = (
3288 self.primaryjoin,
3289 self.secondaryjoin,
3290 self.secondary,
3291 )
3292
3293 # adjust the join condition for single table inheritance,
3294 # in the case that the join is to a subclass
3295 # this is analogous to the
3296 # "_adjust_for_single_table_inheritance()" method in Query.
3297
3298 if single_crit is not None:
3299 if secondaryjoin is not None:
3300 secondaryjoin = secondaryjoin & single_crit
3301 else:
3302 primaryjoin = primaryjoin & single_crit
3303
3304 if extra_criteria:
3305
3306 def mark_exclude_cols(
3307 elem: SupportsAnnotations, annotations: _AnnotationDict
3308 ) -> SupportsAnnotations:
3309 """note unrelated columns in the "extra criteria" as either
3310 should be adapted or not adapted, even though they are not
3311 part of our "local" or "remote" side.
3312
3313 see #9779 for this case, as well as #11010 for a follow up
3314
3315 """
3316
3317 parentmapper_for_element = elem._annotations.get(
3318 "parentmapper", None
3319 )
3320
3321 if (
3322 parentmapper_for_element is not self.prop.parent
3323 and parentmapper_for_element is not self.prop.mapper
3324 and elem not in self._secondary_lineage_set
3325 ):
3326 return _safe_annotate(elem, annotations)
3327 else:
3328 return elem
3329
3330 extra_criteria = tuple(
3331 _deep_annotate(
3332 elem,
3333 {"should_not_adapt": True},
3334 annotate_callable=mark_exclude_cols,
3335 )
3336 for elem in extra_criteria
3337 )
3338
3339 if secondaryjoin is not None:
3340 secondaryjoin = secondaryjoin & sql.and_(*extra_criteria)
3341 else:
3342 primaryjoin = primaryjoin & sql.and_(*extra_criteria)
3343
3344 if aliased:
3345 if secondary is not None:
3346 secondary = secondary._anonymous_fromclause(flat=True)
3347 primary_aliasizer = ClauseAdapter(
3348 secondary,
3349 exclude_fn=_local_col_exclude,
3350 )
3351 secondary_aliasizer = ClauseAdapter(
3352 dest_selectable, equivalents=self.child_equivalents
3353 ).chain(primary_aliasizer)
3354 if source_selectable is not None:
3355 primary_aliasizer = ClauseAdapter(
3356 secondary,
3357 exclude_fn=_local_col_exclude,
3358 ).chain(
3359 ClauseAdapter(
3360 source_selectable,
3361 equivalents=self.parent_equivalents,
3362 )
3363 )
3364
3365 secondaryjoin = secondary_aliasizer.traverse(secondaryjoin)
3366 else:
3367 primary_aliasizer = ClauseAdapter(
3368 dest_selectable,
3369 exclude_fn=_local_col_exclude,
3370 equivalents=self.child_equivalents,
3371 )
3372 if source_selectable is not None:
3373 primary_aliasizer.chain(
3374 ClauseAdapter(
3375 source_selectable,
3376 exclude_fn=_remote_col_exclude,
3377 equivalents=self.parent_equivalents,
3378 )
3379 )
3380 secondary_aliasizer = None
3381
3382 primaryjoin = primary_aliasizer.traverse(primaryjoin)
3383 target_adapter = secondary_aliasizer or primary_aliasizer
3384 target_adapter.exclude_fn = None
3385 else:
3386 target_adapter = None
3387 return (
3388 primaryjoin,
3389 secondaryjoin,
3390 secondary,
3391 target_adapter,
3392 dest_selectable,
3393 )
3394
3395 def create_lazy_clause(self, reverse_direction: bool = False) -> Tuple[
3396 ColumnElement[bool],
3397 Dict[str, ColumnElement[Any]],
3398 Dict[ColumnElement[Any], ColumnElement[Any]],
3399 ]:
3400 binds: Dict[ColumnElement[Any], BindParameter[Any]] = {}
3401 equated_columns: Dict[ColumnElement[Any], ColumnElement[Any]] = {}
3402
3403 has_secondary = self.secondaryjoin is not None
3404
3405 if has_secondary:
3406 lookup = collections.defaultdict(list)
3407 for l, r in self.local_remote_pairs:
3408 lookup[l].append((l, r))
3409 equated_columns[r] = l
3410 elif not reverse_direction:
3411 for l, r in self.local_remote_pairs:
3412 equated_columns[r] = l
3413 else:
3414 for l, r in self.local_remote_pairs:
3415 equated_columns[l] = r
3416
3417 def col_to_bind(
3418 element: ColumnElement[Any], **kw: Any
3419 ) -> Optional[BindParameter[Any]]:
3420 if (
3421 (not reverse_direction and "local" in element._annotations)
3422 or reverse_direction
3423 and (
3424 (has_secondary and element in lookup)
3425 or (not has_secondary and "remote" in element._annotations)
3426 )
3427 ):
3428 if element not in binds:
3429 binds[element] = sql.bindparam(
3430 None, None, type_=element.type, unique=True
3431 )
3432 return binds[element]
3433 return None
3434
3435 lazywhere = self.primaryjoin
3436 if self.secondaryjoin is None or not reverse_direction:
3437 lazywhere = visitors.replacement_traverse(
3438 lazywhere, {}, col_to_bind
3439 )
3440
3441 if self.secondaryjoin is not None:
3442 secondaryjoin = self.secondaryjoin
3443 if reverse_direction:
3444 secondaryjoin = visitors.replacement_traverse(
3445 secondaryjoin, {}, col_to_bind
3446 )
3447 lazywhere = sql.and_(lazywhere, secondaryjoin)
3448
3449 bind_to_col = {binds[col].key: col for col in binds}
3450
3451 return lazywhere, bind_to_col, equated_columns
3452
3453
3454class _ColInAnnotations:
3455 """Serializable object that tests for names in c._annotations.
3456
3457 TODO: does this need to be serializable anymore? can we find what the
3458 use case was for that?
3459
3460 """
3461
3462 __slots__ = ("names",)
3463
3464 def __init__(self, *names: str):
3465 self.names = frozenset(names)
3466
3467 def __call__(self, c: ClauseElement) -> bool:
3468 return bool(self.names.intersection(c._annotations))
3469
3470
3471_local_col_exclude = _ColInAnnotations("local", "should_not_adapt")
3472_remote_col_exclude = _ColInAnnotations("remote", "should_not_adapt")
3473
3474
3475class Relationship(
3476 RelationshipProperty[_T],
3477 _DeclarativeMapped[_T],
3478):
3479 """Describes an object property that holds a single item or list
3480 of items that correspond to a related database table.
3481
3482 Public constructor is the :func:`_orm.relationship` function.
3483
3484 .. seealso::
3485
3486 :ref:`relationship_config_toplevel`
3487
3488 .. versionchanged:: 2.0 Added :class:`_orm.Relationship` as a Declarative
3489 compatible subclass for :class:`_orm.RelationshipProperty`.
3490
3491 """
3492
3493 inherit_cache = True
3494 """:meta private:"""
3495
3496
3497class _RelationshipDeclared( # type: ignore[misc]
3498 Relationship[_T],
3499 WriteOnlyMapped[_T], # not compatible with Mapped[_T]
3500 DynamicMapped[_T], # not compatible with Mapped[_T]
3501):
3502 """Relationship subclass used implicitly for declarative mapping."""
3503
3504 inherit_cache = True
3505 """:meta private:"""
3506
3507 @classmethod
3508 def _mapper_property_name(cls) -> str:
3509 return "Relationship"