1# orm/relationships.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Heuristics related to join conditions as used in
9:func:`_orm.relationship`.
10
11Provides the :class:`.JoinCondition` object, which encapsulates
12SQL annotation and aliasing behavior focused on the `primaryjoin`
13and `secondaryjoin` aspects of :func:`_orm.relationship`.
14
15"""
16from __future__ import annotations
17
18import collections
19from collections import abc
20import dataclasses
21import inspect as _py_inspect
22import itertools
23import re
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Collection
29from typing import Dict
30from typing import FrozenSet
31from typing import Generic
32from typing import Iterable
33from typing import Iterator
34from typing import List
35from typing import NamedTuple
36from typing import NoReturn
37from typing import Optional
38from typing import Sequence
39from typing import Set
40from typing import Tuple
41from typing import Type
42from typing import TypeVar
43from typing import Union
44import weakref
45
46from . import attributes
47from . import strategy_options
48from ._typing import insp_is_aliased_class
49from ._typing import is_has_collection_adapter
50from .base import _DeclarativeMapped
51from .base import _is_mapped_class
52from .base import class_mapper
53from .base import DynamicMapped
54from .base import LoaderCallableStatus
55from .base import PassiveFlag
56from .base import state_str
57from .base import WriteOnlyMapped
58from .interfaces import _AttributeOptions
59from .interfaces import _IntrospectsAnnotations
60from .interfaces import MANYTOMANY
61from .interfaces import MANYTOONE
62from .interfaces import ONETOMANY
63from .interfaces import PropComparator
64from .interfaces import RelationshipDirection
65from .interfaces import StrategizedProperty
66from .util import _orm_annotate
67from .util import _orm_deannotate
68from .util import CascadeOptions
69from .. import exc as sa_exc
70from .. import Exists
71from .. import log
72from .. import schema
73from .. import sql
74from .. import util
75from ..inspection import inspect
76from ..sql import coercions
77from ..sql import expression
78from ..sql import operators
79from ..sql import roles
80from ..sql import visitors
81from ..sql._typing import _ColumnExpressionArgument
82from ..sql._typing import _HasClauseElement
83from ..sql.annotation import _safe_annotate
84from ..sql.elements import ColumnClause
85from ..sql.elements import ColumnElement
86from ..sql.util import _deep_annotate
87from ..sql.util import _deep_deannotate
88from ..sql.util import _shallow_annotate
89from ..sql.util import adapt_criterion_to_null
90from ..sql.util import ClauseAdapter
91from ..sql.util import join_condition
92from ..sql.util import selectables_overlap
93from ..sql.util import visit_binary_product
94from ..util.typing import de_optionalize_union_types
95from ..util.typing import Literal
96from ..util.typing import resolve_name_to_real_class_name
97
98if typing.TYPE_CHECKING:
99 from ._typing import _EntityType
100 from ._typing import _ExternalEntityType
101 from ._typing import _IdentityKeyType
102 from ._typing import _InstanceDict
103 from ._typing import _InternalEntityType
104 from ._typing import _O
105 from ._typing import _RegistryType
106 from .base import Mapped
107 from .clsregistry import _class_resolver
108 from .clsregistry import _ModNS
109 from .decl_base import _ClassScanMapperConfig
110 from .dependency import DependencyProcessor
111 from .mapper import Mapper
112 from .query import Query
113 from .session import Session
114 from .state import InstanceState
115 from .strategies import LazyLoader
116 from .util import AliasedClass
117 from .util import AliasedInsp
118 from ..sql._typing import _CoreAdapterProto
119 from ..sql._typing import _EquivalentColumnMap
120 from ..sql._typing import _InfoType
121 from ..sql.annotation import _AnnotationDict
122 from ..sql.annotation import SupportsAnnotations
123 from ..sql.elements import BinaryExpression
124 from ..sql.elements import BindParameter
125 from ..sql.elements import ClauseElement
126 from ..sql.schema import Table
127 from ..sql.selectable import FromClause
128 from ..util.typing import _AnnotationScanType
129 from ..util.typing import RODescriptorReference
130
# Generic type variables.  Within the visible portion of this module, _T
# parameterizes RelationshipProperty, _T1 / _T2 parameterize
# _RelationshipArg, and _PT parameterizes RelationshipProperty.Comparator.
_T = TypeVar("_T", bound=Any)
_T1 = TypeVar("_T1", bound=Any)
_T2 = TypeVar("_T2", bound=Any)

_PT = TypeVar("_PT", bound=Any)

_PT2 = TypeVar("_PT2", bound=Any)


# Forms accepted for the ``argument`` parameter of
# RelationshipProperty.__init__: a string, a class, a Mapper, an
# AliasedClass, or a zero-argument callable returning any of the latter
# three.
_RelationshipArgumentType = Union[
    str,
    Type[_T],
    Callable[[], Type[_T]],
    "Mapper[_T]",
    "AliasedClass[_T]",
    Callable[[], "Mapper[_T]"],
    Callable[[], "AliasedClass[_T]"],
]

# Values accepted for the ``lazy`` parameter of
# RelationshipProperty.__init__.
_LazyLoadArgumentType = Literal[
    "select",
    "joined",
    "selectin",
    "subquery",
    "raise",
    "raise_on_sql",
    "noload",
    "immediate",
    "write_only",
    "dynamic",
    True,
    False,
    None,
]


# Type of the ``primaryjoin`` / ``secondaryjoin`` parameters: a string or
# a boolean column expression argument.
_RelationshipJoinConditionArgument = Union[
    str, _ColumnExpressionArgument[bool]
]
# Type of the ``secondary`` parameter: a FromClause, a string, or a
# zero-argument callable returning a FromClause.
_RelationshipSecondaryArgument = Union[
    "FromClause", str, Callable[[], "FromClause"]
]
# Type of the ``order_by`` parameter; the literal ``False`` is the
# default used by RelationshipProperty.__init__.
_ORMOrderByArgument = Union[
    Literal[False],
    str,
    _ColumnExpressionArgument[Any],
    Callable[[], _ColumnExpressionArgument[Any]],
    Callable[[], Iterable[_ColumnExpressionArgument[Any]]],
    Iterable[Union[str, _ColumnExpressionArgument[Any]]],
]
# Type of the ``backref`` parameter: a string, or a 2-tuple of a string
# and a dictionary with string keys.
ORMBackrefArgument = Union[str, Tuple[str, Dict[str, Any]]]

# A single element of a column collection argument.
_ORMColCollectionElement = Union[
    ColumnClause[Any],
    _HasClauseElement[Any],
    roles.DMLColumnRole,
    "Mapped[Any]",
]
# Type of the ``foreign_keys`` / ``remote_side`` parameters: a string, one
# element, a sequence of elements, or a zero-argument callable returning
# either an element or a sequence of elements.
_ORMColCollectionArgument = Union[
    str,
    Sequence[_ORMColCollectionElement],
    Callable[[], Sequence[_ORMColCollectionElement]],
    Callable[[], _ORMColCollectionElement],
    _ORMColCollectionElement,
]


# Input/output type of remote() and foreign(): the expression type passed
# in is the type annotated as returned.
_CEA = TypeVar("_CEA", bound=_ColumnExpressionArgument[Any])

# TypeVar bound to ColumnElement, for helpers that are annotated as
# returning the same column type they were given.
_CE = TypeVar("_CE", bound="ColumnElement[Any]")


# Aliases for collections of 2-tuples of column elements, differing only
# in the container: any iterable, a read-only sequence, or a mutable list.
_ColumnPairIterable = Iterable[Tuple[ColumnElement[Any], ColumnElement[Any]]]

_ColumnPairs = Sequence[Tuple[ColumnElement[Any], ColumnElement[Any]]]

_MutableColumnPairs = List[Tuple[ColumnElement[Any], ColumnElement[Any]]]
208
209
def remote(expr: _CEA) -> _CEA:
    """Mark a portion of a primaryjoin expression as 'remote'.

    The given expression is coerced using ``roles.ColumnArgumentRole`` and
    then handed to ``_annotate_columns`` along with the
    ``{"remote": True}`` annotation.

    Usage is described in the section :ref:`relationship_custom_foreign`.

    .. seealso::

        :ref:`relationship_custom_foreign`

        :func:`.foreign`

    """
    column_expr = coercions.expect(roles.ColumnArgumentRole, expr)
    annotation = {"remote": True}
    return _annotate_columns(column_expr, annotation)  # type: ignore
227
228
def foreign(expr: _CEA) -> _CEA:
    """Mark a portion of a primaryjoin expression as 'foreign'.

    The given expression is coerced using ``roles.ColumnArgumentRole`` and
    then handed to ``_annotate_columns`` along with the
    ``{"foreign": True}`` annotation.

    Usage is described in the section :ref:`relationship_custom_foreign`.

    .. seealso::

        :ref:`relationship_custom_foreign`

        :func:`.remote`

    """
    column_expr = coercions.expect(roles.ColumnArgumentRole, expr)
    annotation = {"foreign": True}
    return _annotate_columns(column_expr, annotation)  # type: ignore
247
248
@dataclasses.dataclass
class _RelationshipArg(Generic[_T1, _T2]):
    """Holder for one user-supplied parameter whose final value is
    worked out later, at mapper configuration time.

    ``argument`` keeps the value exactly as the user passed it;
    ``resolved`` receives the outcome of
    :meth:`._resolve_against_registry`.

    """

    __slots__ = "name", "argument", "resolved"
    name: str
    argument: _T1
    resolved: Optional[_T2]

    def _is_populated(self) -> bool:
        """Return True if a value other than ``None`` was supplied."""
        return not (self.argument is None)

    def _resolve_against_registry(
        self, clsregistry_resolver: Callable[[str, bool], _class_resolver]
    ) -> None:
        """Compute ``resolved`` from ``argument``.

        * a string is passed to ``clsregistry_resolver`` (the second
          argument is True only when this parameter is named
          ``"secondary"``) and the object it returns is then called;
        * a callable that is not a mapped class is called with no
          arguments;
        * anything else is stored unchanged.

        """
        raw = self.argument

        if isinstance(raw, str):
            is_secondary = self.name == "secondary"
            self.resolved = clsregistry_resolver(raw, is_secondary)()
            return

        if callable(raw) and not _is_mapped_class(raw):
            self.resolved = raw()
            return

        self.resolved = raw
277
278
# Resolved form of ``order_by`` as annotated on
# RelationshipProperty.order_by: either the literal False or a tuple of
# column elements.
_RelationshipOrderByArg = Union[Literal[False], Tuple[ColumnElement[Any], ...]]
280
281
class _RelationshipArgs(NamedTuple):
    """stores user-passed parameters that are resolved at mapper configuration
    time.

    Each field is a :class:`._RelationshipArg` whose first type parameter
    is the type of the raw user argument and whose second is the type of
    the resolved value.  RelationshipProperty.__init__ constructs this
    tuple positionally, so the field order below is part of the contract.

    """

    # the ``secondary`` parameter
    secondary: _RelationshipArg[
        Optional[_RelationshipSecondaryArgument],
        Optional[FromClause],
    ]
    # the ``primaryjoin`` parameter
    primaryjoin: _RelationshipArg[
        Optional[_RelationshipJoinConditionArgument],
        Optional[ColumnElement[Any]],
    ]
    # the ``secondaryjoin`` parameter
    secondaryjoin: _RelationshipArg[
        Optional[_RelationshipJoinConditionArgument],
        Optional[ColumnElement[Any]],
    ]
    # the ``order_by`` parameter
    order_by: _RelationshipArg[_ORMOrderByArgument, _RelationshipOrderByArg]
    # the ``foreign_keys`` parameter
    foreign_keys: _RelationshipArg[
        Optional[_ORMColCollectionArgument], Set[ColumnElement[Any]]
    ]
    # the ``remote_side`` parameter
    remote_side: _RelationshipArg[
        Optional[_ORMColCollectionArgument], Set[ColumnElement[Any]]
    ]
307
308
309@log.class_logger
310class RelationshipProperty(
311 _IntrospectsAnnotations, StrategizedProperty[_T], log.Identified
312):
313 """Describes an object property that holds a single item or list
314 of items that correspond to a related database table.
315
316 Public constructor is the :func:`_orm.relationship` function.
317
318 .. seealso::
319
320 :ref:`relationship_config_toplevel`
321
322 """
323
strategy_wildcard_key = strategy_options._RELATIONSHIP_TOKEN
inherit_cache = True
""":meta private:"""

# class-level marker flags
_links_to_entity = True
_is_relationship = True

# set in __init__ from the ``overlaps`` parameter: a set of names when the
# parameter is given, otherwise an empty tuple
_overlaps: Sequence[str]

_lazy_strategy: LazyLoader

# Default values of the persistence-related flags.  When viewonly=True,
# _warn_for_persistence_only_flags() compares the values passed to
# __init__ against these and warns on any difference.
_persistence_only = dict(
    passive_deletes=False,
    passive_updates=True,
    enable_typechecks=True,
    active_history=False,
    cascade_backrefs=False,
)

_dependency_processor: Optional[DependencyProcessor] = None

# Attributes declared here for typing only; none of them is assigned in
# __init__ except ``local_remote_pairs`` and ``_init_args``.
# NOTE(review): presumably the rest are populated during mapper
# configuration, outside the visible portion of this class -- confirm.
primaryjoin: ColumnElement[bool]
secondaryjoin: Optional[ColumnElement[bool]]
secondary: Optional[FromClause]
_join_condition: JoinCondition
order_by: _RelationshipOrderByArg

_user_defined_foreign_keys: Set[ColumnElement[Any]]
_calculated_foreign_keys: Set[ColumnElement[Any]]

remote_side: Set[ColumnElement[Any]]
local_columns: Set[ColumnElement[Any]]

synchronize_pairs: _ColumnPairs
secondary_synchronize_pairs: Optional[_ColumnPairs]

# initialized in __init__ from the private ``_local_remote_pairs`` parameter
local_remote_pairs: Optional[_ColumnPairs]

direction: RelationshipDirection

# raw user arguments wrapped in _RelationshipArg objects; built in __init__
_init_args: _RelationshipArgs
365
def __init__(
    self,
    argument: Optional[_RelationshipArgumentType[_T]] = None,
    secondary: Optional[_RelationshipSecondaryArgument] = None,
    *,
    uselist: Optional[bool] = None,
    collection_class: Optional[
        Union[Type[Collection[Any]], Callable[[], Collection[Any]]]
    ] = None,
    primaryjoin: Optional[_RelationshipJoinConditionArgument] = None,
    secondaryjoin: Optional[_RelationshipJoinConditionArgument] = None,
    back_populates: Optional[str] = None,
    order_by: _ORMOrderByArgument = False,
    backref: Optional[ORMBackrefArgument] = None,
    overlaps: Optional[str] = None,
    post_update: bool = False,
    cascade: str = "save-update, merge",
    viewonly: bool = False,
    attribute_options: Optional[_AttributeOptions] = None,
    lazy: _LazyLoadArgumentType = "select",
    passive_deletes: Union[Literal["all"], bool] = False,
    passive_updates: bool = True,
    active_history: bool = False,
    enable_typechecks: bool = True,
    foreign_keys: Optional[_ORMColCollectionArgument] = None,
    remote_side: Optional[_ORMColCollectionArgument] = None,
    join_depth: Optional[int] = None,
    comparator_factory: Optional[
        Type[RelationshipProperty.Comparator[Any]]
    ] = None,
    single_parent: bool = False,
    innerjoin: bool = False,
    distinct_target_key: Optional[bool] = None,
    load_on_pending: bool = False,
    query_class: Optional[Type[Query[Any]]] = None,
    info: Optional[_InfoType] = None,
    omit_join: Literal[None, False] = None,
    sync_backref: Optional[bool] = None,
    doc: Optional[str] = None,
    bake_queries: Literal[True] = True,
    cascade_backrefs: Literal[False] = False,
    _local_remote_pairs: Optional[_ColumnPairs] = None,
    _legacy_inactive_history_style: bool = False,
):
    """Store the relationship() configuration on this property.

    Most parameters are assigned to same-named attributes without
    further processing.  ``secondary``, ``primaryjoin``,
    ``secondaryjoin``, ``order_by``, ``foreign_keys`` and ``remote_side``
    are not resolved here; their raw values are wrapped in
    :class:`._RelationshipArg` objects and kept in ``_init_args``.

    ``bake_queries`` is accepted but not referenced in this method.

    :raises sqlalchemy.exc.ArgumentError: if ``viewonly`` and
     ``sync_backref`` are both true, if ``cascade_backrefs`` is true, or
     if both ``backref`` and ``back_populates`` are given.

    """
    super().__init__(attribute_options=attribute_options)

    self.uselist = uselist
    self.argument = argument

    # raw values only; each ``resolved`` slot starts out as None
    self._init_args = _RelationshipArgs(
        _RelationshipArg("secondary", secondary, None),
        _RelationshipArg("primaryjoin", primaryjoin, None),
        _RelationshipArg("secondaryjoin", secondaryjoin, None),
        _RelationshipArg("order_by", order_by, None),
        _RelationshipArg("foreign_keys", foreign_keys, None),
        _RelationshipArg("remote_side", remote_side, None),
    )

    self.post_update = post_update
    self.viewonly = viewonly
    if viewonly:
        # warn about persistence-related flags that were changed from
        # their defaults; they are still assigned further below
        self._warn_for_persistence_only_flags(
            passive_deletes=passive_deletes,
            passive_updates=passive_updates,
            enable_typechecks=enable_typechecks,
            active_history=active_history,
            cascade_backrefs=cascade_backrefs,
        )
    if viewonly and sync_backref:
        raise sa_exc.ArgumentError(
            "sync_backref and viewonly cannot both be True"
        )
    self.sync_backref = sync_backref
    self.lazy = lazy
    self.single_parent = single_parent
    self.collection_class = collection_class
    self.passive_deletes = passive_deletes

    # any truthy value is rejected; the value itself is never stored
    if cascade_backrefs:
        raise sa_exc.ArgumentError(
            "The 'cascade_backrefs' parameter passed to "
            "relationship() may only be set to False."
        )

    self.passive_updates = passive_updates
    self.enable_typechecks = enable_typechecks
    self.query_class = query_class
    self.innerjoin = innerjoin
    self.distinct_target_key = distinct_target_key
    self.doc = doc
    self.active_history = active_history
    self._legacy_inactive_history_style = _legacy_inactive_history_style

    self.join_depth = join_depth
    # a truthy omit_join only produces a warning; the value is still
    # stored as given
    if omit_join:
        util.warn(
            "setting omit_join to True is not supported; selectin "
            "loading of this relationship may not work correctly if this "
            "flag is set explicitly. omit_join optimization is "
            "automatically detected for conditions under which it is "
            "supported."
        )

    self.omit_join = omit_join
    self.local_remote_pairs = _local_remote_pairs
    self.load_on_pending = load_on_pending
    # fall back to the nested Comparator class when no factory is given
    self.comparator_factory = (
        comparator_factory or RelationshipProperty.Comparator
    )
    util.set_creation_order(self)

    if info is not None:
        self.info.update(info)

    self.strategy_key = (("lazy", self.lazy),)

    self._reverse_property: Set[RelationshipProperty[Any]] = set()

    # ``overlaps`` is a comma-separated string; whitespace around the
    # commas is consumed by the split pattern
    if overlaps:
        self._overlaps = set(re.split(r"\s*,\s*", overlaps))  # type: ignore  # noqa: E501
    else:
        self._overlaps = ()

    # NOTE(review): ``cascade`` may be a property defined outside the
    # visible portion of this class, in which case this assignment's
    # position after ``viewonly`` is set could matter -- confirm.
    self.cascade = cascade

    self.back_populates = back_populates

    # backref and back_populates are mutually exclusive; ``self.backref``
    # is None whenever back_populates is in use
    if self.back_populates:
        if backref:
            raise sa_exc.ArgumentError(
                "backref and back_populates keyword arguments "
                "are mutually exclusive"
            )
        self.backref = None
    else:
        self.backref = backref
502
def _warn_for_persistence_only_flags(self, **kw: Any) -> None:
    """Warn about persistence flags that were moved off their defaults.

    Each keyword argument is compared with the value stored under the
    same key in ``_persistence_only``; for every one that differs, a
    warning naming that keyword is emitted through ``util.warn``.

    """
    # A plain warning is used instead of a deprecation warning: this is a
    # configuration mistake, and Python surfaces regular warnings more
    # aggressively than deprecation warnings by default.  Unlike
    # viewonly combined with cascade, these settings do not actively do
    # the wrong thing against viewonly=True, so raising an error is not
    # as urgent.
    template = (
        "Setting %s on relationship() while also "
        "setting viewonly=True does not make sense, as a "
        "viewonly=True relationship does not perform persistence "
        "operations. This configuration may raise an error "
        "in a future release."
    )
    for flag_name, given_value in kw.items():
        if given_value != self._persistence_only[flag_name]:
            util.warn(template % (flag_name,))
520
def instrument_class(self, mapper: Mapper[Any]) -> None:
    """Register the descriptor for this relationship on the mapped class.

    A comparator is built by calling ``comparator_factory`` with this
    property and the given mapper, then passed to
    ``attributes.register_descriptor`` together with the mapper's class,
    this property's key and its ``doc`` string.

    """
    descriptor_options = dict(
        comparator=self.comparator_factory(self, mapper),
        parententity=mapper,
        doc=self.doc,
    )
    attributes.register_descriptor(
        mapper.class_, self.key, **descriptor_options
    )
529
530 class Comparator(util.MemoizedSlots, PropComparator[_PT]):
531 """Produce boolean, comparison, and other operators for
532 :class:`.RelationshipProperty` attributes.
533
534 See the documentation for :class:`.PropComparator` for a brief
535 overview of ORM level operator definition.
536
537 .. seealso::
538
539 :class:`.PropComparator`
540
541 :class:`.ColumnProperty.Comparator`
542
543 :class:`.ColumnOperators`
544
545 :ref:`types_operators`
546
547 :attr:`.TypeEngine.comparator_factory`
548
549 """
550
# Slot names declared by this comparator.  "entity" and "mapper" have
# matching ``_memoized_attr_entity`` / ``_memoized_attr_mapper`` methods
# in this class.
# NOTE(review): presumably util.MemoizedSlots fills those slots lazily
# from the ``_memoized_attr_*`` methods -- confirm against
# util.MemoizedSlots.
__slots__ = (
    "entity",
    "mapper",
    "property",
    "_of_type",
    "_extra_criteria",
)

# the RelationshipProperty this comparator was constructed with
prop: RODescriptorReference[RelationshipProperty[_PT]]
# entity given to of_type(), or None
_of_type: Optional[_EntityType[_PT]]
561
def __init__(
    self,
    prop: RelationshipProperty[_PT],
    parentmapper: _InternalEntityType[Any],
    adapt_to_entity: Optional[AliasedInsp[Any]] = None,
    of_type: Optional[_EntityType[_PT]] = None,
    extra_criteria: Tuple[ColumnElement[bool], ...] = (),
):
    """Set up a :class:`.RelationshipProperty.Comparator`.

    Construction is internal to the ORM's attribute mechanics.

    :param prop: the relationship property being compared.
    :param parentmapper: stored as ``_parententity``.
    :param adapt_to_entity: stored as ``_adapt_to_entity``.
    :param of_type: stored as ``_of_type``; any falsy value is
     normalized to ``None``.
    :param extra_criteria: stored as ``_extra_criteria``.

    """
    self.prop = prop
    self._parententity = parentmapper
    self._adapt_to_entity = adapt_to_entity
    self._of_type = of_type if of_type else None
    self._extra_criteria = extra_criteria
582
def adapt_to_entity(
    self, adapt_to_entity: AliasedInsp[Any]
) -> RelationshipProperty.Comparator[Any]:
    """Return a new comparator of the same class bound to the given
    aliased entity.

    The property, parent entity and ``of_type`` setting are carried
    over; ``_extra_criteria`` is not passed along, so the new comparator
    starts with the constructor's default.

    """
    comparator_cls = self.__class__
    return comparator_cls(
        self.prop,
        self._parententity,
        adapt_to_entity=adapt_to_entity,
        of_type=self._of_type,
    )
592
# Declared for typing and documentation; the value is produced by
# ``_memoized_attr_entity`` in this class.
entity: _InternalEntityType[_PT]
"""The target entity referred to by this
:class:`.RelationshipProperty.Comparator`.

This is either a :class:`_orm.Mapper` or :class:`.AliasedInsp`
object.

This is the "target" or "remote" side of the
:func:`_orm.relationship`.

"""

# Declared for typing and documentation; the value is produced by
# ``_memoized_attr_mapper`` in this class.
mapper: Mapper[_PT]
"""The target :class:`_orm.Mapper` referred to by this
:class:`.RelationshipProperty.Comparator`.

This is the "target" or "remote" side of the
:func:`_orm.relationship`.

"""
613
def _memoized_attr_entity(self) -> _InternalEntityType[_PT]:
    """Produce the value for the ``entity`` attribute.

    This is the inspection of ``_of_type`` when one is set; otherwise it
    is the entity of the underlying relationship property.

    """
    if not self._of_type:
        return self.prop.entity
    return inspect(self._of_type)  # type: ignore
619
def _memoized_attr_mapper(self) -> Mapper[_PT]:
    """Produce the value for the ``mapper`` attribute: the mapper of
    this comparator's ``entity``.

    """
    target_entity = self.entity
    return target_entity.mapper
622
def _source_selectable(self) -> FromClause:
    """Return the selectable representing the source side.

    When this comparator has been adapted to an entity, that entity's
    selectable is used; otherwise the parent mapper's
    ``_with_polymorphic_selectable`` is returned.

    """
    if not self._adapt_to_entity:
        return self.property.parent._with_polymorphic_selectable
    return self._adapt_to_entity.selectable
628
def __clause_element__(self) -> ColumnElement[bool]:
    """Return the join criteria of this relationship as one expression.

    The joins are created from the source selectable with polymorphic
    source, aliased secondary, the ``of_type`` entity if any, and any
    extra criteria.  The result is the primaryjoin alone, or the
    primaryjoin AND-ed with the secondaryjoin when one is present.

    """
    source_selectable = self._source_selectable()
    of_type_entity = inspect(self._of_type) if self._of_type else None

    (
        primaryjoin,
        secondaryjoin,
        _source,
        _dest,
        _secondary,
        _target_adapter,
    ) = self.prop._create_joins(
        source_selectable=source_selectable,
        source_polymorphic=True,
        of_type_entity=of_type_entity,
        alias_secondary=True,
        extra_criteria=self._extra_criteria,
    )

    if secondaryjoin is None:
        return primaryjoin
    return primaryjoin & secondaryjoin
654
def of_type(self, class_: _EntityType[Any]) -> PropComparator[_PT]:
    r"""Return a new comparator that targets the given entity.

    The property, parent entity, adapted entity and extra criteria of
    this comparator are carried over; only ``of_type`` changes.

    See :meth:`.PropComparator.of_type` for an example.

    """
    comparator_cls = RelationshipProperty.Comparator
    return comparator_cls(
        self.prop,
        self._parententity,
        adapt_to_entity=self._adapt_to_entity,
        of_type=class_,
        extra_criteria=self._extra_criteria,
    )
669
def and_(
    self, *criteria: _ColumnExpressionArgument[bool]
) -> PropComparator[Any]:
    """Return a new comparator with additional AND criteria.

    Each given criterion is coerced using ``roles.WhereHavingRole`` and
    appended after the extra criteria this comparator already holds.

    See :meth:`.PropComparator.and_` for an example.

    .. versionadded:: 1.4

    """
    coerced = tuple(
        [
            coercions.expect(roles.WhereHavingRole, criterion)
            for criterion in util.coerce_generator_arg(criteria)
        ]
    )
    combined_criteria = self._extra_criteria + coerced

    comparator_cls = RelationshipProperty.Comparator
    return comparator_cls(
        self.prop,
        self._parententity,
        adapt_to_entity=self._adapt_to_entity,
        of_type=self._of_type,
        extra_criteria=combined_criteria,
    )
692
def in_(self, other: Any) -> NoReturn:
    """IN operator; not implemented for
    :func:`_orm.relationship`-based attributes at this time.

    :raises NotImplementedError: always.

    """
    message = (
        "in_() not yet supported for "
        "relationships. For a simple "
        "many-to-one, use in_() against "
        "the set of foreign key values."
    )
    raise NotImplementedError(message)
704
# https://github.com/python/mypy/issues/4266
__hash__ = None  # type: ignore

def __eq__(self, other: Any) -> ColumnElement[bool]:  # type: ignore[override]  # noqa: E501
    """Implement the ``==`` operator.

    For a many-to-one relationship, an expression such as:

    .. sourcecode:: text

      MyClass.some_prop == <some object>

    typically renders a clause like:

    .. sourcecode:: text

      mytable.related_id == <some id>

    where ``<some id>`` is the primary key of the given object.

    Outside of many-to-one, ``==`` offers only partial functionality:

    * Comparing a collection to an object or collection is not
      supported and raises; use
      :meth:`~.Relationship.Comparator.contains`.
    * Against a scalar one-to-many, the clause compares the target
      columns in the parent to the given target.
    * Against a scalar many-to-many, an alias of the association
      table is rendered too, forming a natural join within the main
      body of the query.  This does not work for queries beyond
      simple AND conjunctions of comparisons, such as those using OR.
      Use explicit joins, outerjoins, or
      :meth:`~.Relationship.Comparator.has` for more comprehensive
      non-many-to-one scalar membership tests.
    * Comparing to ``None`` in a one-to-many or many-to-many context
      produces a NOT EXISTS clause.

    """
    comparing_to_null = other is None or isinstance(other, expression.Null)

    if comparing_to_null:
        if self.property.direction in [ONETOMANY, MANYTOMANY]:
            return ~self._criterion_exists()
        # a Null construct is treated the same as a plain None
        target = None
    else:
        if self.property.uselist:
            raise sa_exc.InvalidRequestError(
                "Can't compare a collection to an object or collection; "
                "use contains() to test for membership."
            )
        target = other

    comparison = self.property._optimized_compare(
        target, adapt_source=self.adapter
    )
    return _orm_annotate(comparison)
769
def _criterion_exists(
    self,
    criterion: Optional[_ColumnExpressionArgument[bool]] = None,
    **kwargs: Any,
) -> Exists:
    """Build the EXISTS construct shared by ``any()``, ``has()`` and the
    None / negated comparisons of this comparator.

    :param criterion: optional WHERE criteria placed inside the EXISTS;
     coerced using ``roles.WhereHavingRole``.
    :param \\**kwargs: each key names an attribute on the class of the
     relationship's mapper; ``attribute == value`` is AND-ed onto the
     criteria.
    :return: ``EXISTS (SELECT 1 ...)`` selecting from the destination
     (plus the secondary table, when present), whose WHERE clause is the
     join condition AND-ed with the accumulated criteria.

    """
    where_criteria = (
        coercions.expect(roles.WhereHavingRole, criterion)
        if criterion is not None
        else None
    )

    if getattr(self, "_of_type", None):
        # of_type() is in effect: target that entity's selectable
        info: Optional[_InternalEntityType[Any]] = inspect(
            self._of_type
        )
        assert info is not None
        target_mapper, to_selectable, is_aliased_class = (
            info.mapper,
            info.selectable,
            info.is_aliased_class,
        )
        # self-referential and not already aliased: use an anonymous
        # FROM clause of the target selectable as the destination
        if self.property._is_self_referential and not is_aliased_class:
            to_selectable = to_selectable._anonymous_fromclause()

        # the target mapper's single-table criterion, when present, goes
        # in front of any user criteria
        single_crit = target_mapper._single_table_criterion
        if single_crit is not None:
            if where_criteria is not None:
                where_criteria = single_crit & where_criteria
            else:
                where_criteria = single_crit
    else:
        is_aliased_class = False
        to_selectable = None

    # a source selectable is only passed when this comparator has an
    # adapter
    if self.adapter:
        source_selectable = self._source_selectable()
    else:
        source_selectable = None

    (
        pj,
        sj,
        source,
        dest,
        secondary,
        target_adapter,
    ) = self.property._create_joins(
        dest_selectable=to_selectable,
        source_selectable=source_selectable,
    )

    # keyword criteria: equality against attributes of the mapped class
    for k in kwargs:
        crit = getattr(self.property.mapper.class_, k) == kwargs[k]
        if where_criteria is None:
            where_criteria = crit
        else:
            where_criteria = where_criteria & crit

    # annotate the *local* side of the join condition, in the case
    # of pj + sj this is the full primaryjoin, in the case of just
    # pj its the local side of the primaryjoin.
    if sj is not None:
        j = _orm_annotate(pj) & sj
    else:
        j = _orm_annotate(pj, exclude=self.property.remote_side)

    if (
        where_criteria is not None
        and target_adapter
        and not is_aliased_class
    ):
        # limit this adapter to annotated only?
        where_criteria = target_adapter.traverse(where_criteria)

    # only have the "joined left side" of what we
    # return be subject to Query adaption. The right
    # side of it is used for an exists() subquery and
    # should not correlate or otherwise reach out
    # to anything in the enclosing query.
    if where_criteria is not None:
        where_criteria = where_criteria._annotate(
            {"no_replacement_traverse": True}
        )

    # NOTE: ``crit`` is rebound here; it no longer refers to the keyword
    # criteria built in the loop above
    crit = j & sql.True_._ifnone(where_criteria)

    # select from, and exclude from correlation, the destination and
    # also the secondary table when there is one
    if secondary is not None:
        ex = (
            sql.exists(1)
            .where(crit)
            .select_from(dest, secondary)
            .correlate_except(dest, secondary)
        )
    else:
        ex = (
            sql.exists(1)
            .where(crit)
            .select_from(dest)
            .correlate_except(dest)
        )
    return ex
871
def any(
    self,
    criterion: Optional[_ColumnExpressionArgument[bool]] = None,
    **kwargs: Any,
) -> ColumnElement[bool]:
    """Test a collection against the given criterion by way of EXISTS.

    For example::

        session.query(MyClass).filter(
            MyClass.somereference.any(SomeRelated.x == 2)
        )

    renders a query such as:

    .. sourcecode:: sql

        SELECT * FROM my_table WHERE
        EXISTS (SELECT 1 FROM related WHERE related.my_id=my_table.id
        AND related.x=2)

    As :meth:`~.Relationship.Comparator.any` relies on a correlated
    subquery, it does not perform nearly as well against large target
    tables as a join does.

    A common use of :meth:`~.Relationship.Comparator.any` is testing
    for empty collections::

        session.query(MyClass).filter(~MyClass.somereference.any())

    which renders:

    .. sourcecode:: sql

        SELECT * FROM my_table WHERE
        NOT (EXISTS (SELECT 1 FROM related WHERE
        related.my_id=my_table.id))

    :meth:`~.Relationship.Comparator.any` applies to collections only,
    i.e. a :func:`_orm.relationship` with ``uselist=True``; an
    ``InvalidRequestError`` is raised otherwise.  For scalar
    references, use :meth:`~.Relationship.Comparator.has`.

    """
    if self.property.uselist:
        return self._criterion_exists(criterion, **kwargs)

    raise sa_exc.InvalidRequestError(
        "'any()' not implemented for scalar "
        "attributes. Use has()."
    )
925
def has(
    self,
    criterion: Optional[_ColumnExpressionArgument[bool]] = None,
    **kwargs: Any,
) -> ColumnElement[bool]:
    """Test a scalar reference against the given criterion by way of
    EXISTS.

    For example::

        session.query(MyClass).filter(
            MyClass.somereference.has(SomeRelated.x == 2)
        )

    renders a query such as:

    .. sourcecode:: sql

        SELECT * FROM my_table WHERE
        EXISTS (SELECT 1 FROM related WHERE
        related.id==my_table.related_id AND related.x=2)

    As :meth:`~.Relationship.Comparator.has` relies on a correlated
    subquery, it does not perform nearly as well against large target
    tables as a join does.

    :meth:`~.Relationship.Comparator.has` applies to scalar references
    only, i.e. a :func:`_orm.relationship` with ``uselist=False``; an
    ``InvalidRequestError`` is raised otherwise.  For collection
    references, use :meth:`~.Relationship.Comparator.any`.

    """
    if not self.property.uselist:
        return self._criterion_exists(criterion, **kwargs)

    raise sa_exc.InvalidRequestError(
        "'has()' not implemented for collections. Use any()."
    )
964
def contains(
    self, other: _ColumnExpressionArgument[Any], **kwargs: Any
) -> ColumnElement[bool]:
    """Produce a simple expression testing whether a collection
    contains a particular item.

    :meth:`~.Relationship.Comparator.contains` applies to collections
    only, i.e. a :func:`_orm.relationship` implementing one-to-many or
    many-to-many with ``uselist=True``; an ``InvalidRequestError`` is
    raised for scalar attributes.

    In a simple one-to-many context, an expression like::

        MyClass.contains(other)

    renders a clause like:

    .. sourcecode:: sql

        mytable.id == <some id>

    where ``<some id>`` is the value of the foreign key attribute on
    ``other`` that refers to the primary key of its parent object.
    :meth:`~.Relationship.Comparator.contains` is therefore very
    useful with simple one-to-many operations.

    With many-to-many there are more caveats.  The association table
    is rendered in the statement, producing an "implicit" join, i.e.
    multiple tables in the FROM clause that are equated in the WHERE
    clause::

        query(MyClass).filter(MyClass.contains(other))

    renders a query like:

    .. sourcecode:: sql

        SELECT * FROM my_table, my_association_table AS
        my_association_table_1 WHERE
        my_table.id = my_association_table_1.parent_id
        AND my_association_table_1.child_id = <some id>

    where ``<some id>`` is the primary key of ``other``.  It follows
    that :meth:`~.Relationship.Comparator.contains` will **not** work
    with many-to-many collections in queries that go beyond simple AND
    conjunctions, such as several
    :meth:`~.Relationship.Comparator.contains` expressions joined by
    OR.  Subqueries or explicit "outer joins" are needed in those
    cases.  See :meth:`~.Relationship.Comparator.any` for a
    less-performant alternative using EXISTS, or refer to
    :meth:`_query.Query.outerjoin` as well as
    :ref:`orm_queryguide_joins` for details on constructing outer
    joins.

    kwargs may be ignored by this operator but are required for API
    conformance.
    """
    if not self.prop.uselist:
        raise sa_exc.InvalidRequestError(
            "'contains' not implemented for scalar "
            "attributes. Use =="
        )

    comparison = self.prop._optimized_compare(
        other, adapt_source=self.adapter
    )

    # when a secondaryjoin is present, the negated form is built
    # separately and attached to the clause
    has_secondaryjoin = self.prop.secondaryjoin is not None
    if has_secondaryjoin:
        comparison.negation_clause = self.__negated_contains_or_equals(
            other
        )

    return comparison
1046
def __negated_contains_or_equals(
    self, other: Any
) -> ColumnElement[bool]:
    """Build the negated form of a comparison against ``other``.

    Within the visible portion of this class it is used by
    ``contains()`` to populate ``negation_clause`` when the
    relationship has a secondaryjoin.

    For a many-to-one relationship whose property has ``_use_get`` set,
    the result is a conjunction over ``local_remote_pairs``: for each
    pair, the local column either differs from a bound parameter
    derived from ``other`` or compares equal to ``None``.

    In all other cases the result is the negation of
    ``_criterion_exists()`` with criteria equating the target mapper's
    primary key columns to the primary key values of ``other``.

    """
    if self.prop.direction == MANYTOONE:
        state = attributes.instance_state(other)

        def state_bindparam(
            local_col: ColumnElement[Any],
            state: InstanceState[Any],
            remote_col: ColumnElement[Any],
        ) -> BindParameter[Any]:
            # the parameter value is supplied through ``callable_``
            # rather than as a fixed value; the ``state`` parameter
            # shadows the enclosing ``state``
            dict_ = state.dict
            return sql.bindparam(
                local_col.key,
                type_=local_col.type,
                unique=True,
                callable_=self.prop._get_attr_w_warn_on_none(
                    self.prop.mapper, state, dict_, remote_col
                ),
            )

        def adapt(col: _CE) -> _CE:
            # pass the column through this comparator's adapter, if any
            if self.adapter:
                return self.adapter(col)
            else:
                return col

        if self.property._use_get:
            return sql.and_(
                *[
                    sql.or_(
                        adapt(x)
                        != state_bindparam(adapt(x), state, y),
                        adapt(x) == None,
                    )
                    for (x, y) in self.property.local_remote_pairs
                ]
            )

    # reached for non many-to-one relationships, and for many-to-one
    # when ``_use_get`` is false
    criterion = sql.and_(
        *[
            x == y
            for (x, y) in zip(
                self.property.mapper.primary_key,
                self.property.mapper.primary_key_from_instance(other),
            )
        ]
    )

    return ~self._criterion_exists(criterion)
1097
def __ne__(self, other: Any) -> ColumnElement[bool]:  # type: ignore[override]  # noqa: E501
    """Implement the ``!=`` operator.

    For a many-to-one relationship, an expression such as:

    .. sourcecode:: text

        MyClass.some_prop != <some object>

    typically renders as:

    .. sourcecode:: sql

        mytable.related_id != <some id>

    with ``<some id>`` being the primary key of the given object.

    Outside of the many-to-one case the operator is only partially
    supported:

    * A collection cannot be compared; combine
      :meth:`~.Relationship.Comparator.contains` with
      :func:`_expression.not_` instead.
    * Against a scalar one-to-many, the clause compares the target
      columns in the parent to the given target.
    * Against a scalar many-to-many, an alias of the association
      table is rendered too, forming a natural join in the main body
      of the query.  This does not work for queries that go beyond
      simple AND conjunctions of comparisons, such as those using OR;
      use explicit joins, outerjoins, or
      :meth:`~.Relationship.Comparator.has` together with
      :func:`_expression.not_` for more comprehensive
      non-many-to-one scalar membership tests.
    * Comparing to ``None`` in a one-to-many or many-to-many context
      produces an EXISTS clause.

    """
    if not (other is None or isinstance(other, expression.Null)):
        # comparison against an actual object
        if self.property.uselist:
            raise sa_exc.InvalidRequestError(
                "Can't compare a collection"
                " to an object or collection; use "
                "contains() to test for membership."
            )
        return _orm_annotate(self.__negated_contains_or_equals(other))

    # comparison against None / SQL NULL
    if self.property.direction == MANYTOONE:
        is_null_clause = self.property._optimized_compare(
            None, adapt_source=self.adapter
        )
        return _orm_annotate(~is_null_clause)

    return self._criterion_exists()
1159
1160 def _memoized_attr_property(self) -> RelationshipProperty[_PT]:
1161 self.prop.parent._check_configure()
1162 return self.prop
1163
1164 def _with_parent(
1165 self,
1166 instance: object,
1167 alias_secondary: bool = True,
1168 from_entity: Optional[_EntityType[Any]] = None,
1169 ) -> ColumnElement[bool]:
1170 assert instance is not None
1171 adapt_source: Optional[_CoreAdapterProto] = None
1172 if from_entity is not None:
1173 insp: Optional[_InternalEntityType[Any]] = inspect(from_entity)
1174 assert insp is not None
1175 if insp_is_aliased_class(insp):
1176 adapt_source = insp._adapter.adapt_clause
1177 return self._optimized_compare(
1178 instance,
1179 value_is_parent=True,
1180 adapt_source=adapt_source,
1181 alias_secondary=alias_secondary,
1182 )
1183
1184 def _optimized_compare(
1185 self,
1186 state: Any,
1187 value_is_parent: bool = False,
1188 adapt_source: Optional[_CoreAdapterProto] = None,
1189 alias_secondary: bool = True,
1190 ) -> ColumnElement[bool]:
1191 if state is not None:
1192 try:
1193 state = inspect(state)
1194 except sa_exc.NoInspectionAvailable:
1195 state = None
1196
1197 if state is None or not getattr(state, "is_instance", False):
1198 raise sa_exc.ArgumentError(
1199 "Mapped instance expected for relationship "
1200 "comparison to object. Classes, queries and other "
1201 "SQL elements are not accepted in this context; for "
1202 "comparison with a subquery, "
1203 "use %s.has(**criteria)." % self
1204 )
1205 reverse_direction = not value_is_parent
1206
1207 if state is None:
1208 return self._lazy_none_clause(
1209 reverse_direction, adapt_source=adapt_source
1210 )
1211
1212 if not reverse_direction:
1213 criterion, bind_to_col = (
1214 self._lazy_strategy._lazywhere,
1215 self._lazy_strategy._bind_to_col,
1216 )
1217 else:
1218 criterion, bind_to_col = (
1219 self._lazy_strategy._rev_lazywhere,
1220 self._lazy_strategy._rev_bind_to_col,
1221 )
1222
1223 if reverse_direction:
1224 mapper = self.mapper
1225 else:
1226 mapper = self.parent
1227
1228 dict_ = attributes.instance_dict(state.obj())
1229
1230 def visit_bindparam(bindparam: BindParameter[Any]) -> None:
1231 if bindparam._identifying_key in bind_to_col:
1232 bindparam.callable = self._get_attr_w_warn_on_none(
1233 mapper,
1234 state,
1235 dict_,
1236 bind_to_col[bindparam._identifying_key],
1237 )
1238
1239 if self.secondary is not None and alias_secondary:
1240 criterion = ClauseAdapter(
1241 self.secondary._anonymous_fromclause()
1242 ).traverse(criterion)
1243
1244 criterion = visitors.cloned_traverse(
1245 criterion, {}, {"bindparam": visit_bindparam}
1246 )
1247
1248 if adapt_source:
1249 criterion = adapt_source(criterion)
1250 return criterion
1251
1252 def _get_attr_w_warn_on_none(
1253 self,
1254 mapper: Mapper[Any],
1255 state: InstanceState[Any],
1256 dict_: _InstanceDict,
1257 column: ColumnElement[Any],
1258 ) -> Callable[[], Any]:
1259 """Create the callable that is used in a many-to-one expression.
1260
1261 E.g.::
1262
1263 u1 = s.query(User).get(5)
1264
1265 expr = Address.user == u1
1266
1267 Above, the SQL should be "address.user_id = 5". The callable
1268 returned by this method produces the value "5" based on the identity
1269 of ``u1``.
1270
1271 """
1272
1273 # in this callable, we're trying to thread the needle through
1274 # a wide variety of scenarios, including:
1275 #
1276 # * the object hasn't been flushed yet and there's no value for
1277 # the attribute as of yet
1278 #
1279 # * the object hasn't been flushed yet but it has a user-defined
1280 # value
1281 #
1282 # * the object has a value but it's expired and not locally present
1283 #
1284 # * the object has a value but it's expired and not locally present,
1285 # and the object is also detached
1286 #
1287 # * The object hadn't been flushed yet, there was no value, but
1288 # later, the object has been expired and detached, and *now*
1289 # they're trying to evaluate it
1290 #
1291 # * the object had a value, but it was changed to a new value, and
1292 # then expired
1293 #
1294 # * the object had a value, but it was changed to a new value, and
1295 # then expired, then the object was detached
1296 #
1297 # * the object has a user-set value, but it's None and we don't do
1298 # the comparison correctly for that so warn
1299 #
1300
1301 prop = mapper.get_property_by_column(column)
1302
1303 # by invoking this method, InstanceState will track the last known
1304 # value for this key each time the attribute is to be expired.
1305 # this feature was added explicitly for use in this method.
1306 state._track_last_known_value(prop.key)
1307
1308 lkv_fixed = state._last_known_values
1309
1310 def _go() -> Any:
1311 assert lkv_fixed is not None
1312 last_known = to_return = lkv_fixed[prop.key]
1313 existing_is_available = (
1314 last_known is not LoaderCallableStatus.NO_VALUE
1315 )
1316
1317 # we support that the value may have changed. so here we
1318 # try to get the most recent value including re-fetching.
1319 # only if we can't get a value now due to detachment do we return
1320 # the last known value
1321 current_value = mapper._get_state_attr_by_column(
1322 state,
1323 dict_,
1324 column,
1325 passive=(
1326 PassiveFlag.PASSIVE_OFF
1327 if state.persistent
1328 else PassiveFlag.PASSIVE_NO_FETCH ^ PassiveFlag.INIT_OK
1329 ),
1330 )
1331
1332 if current_value is LoaderCallableStatus.NEVER_SET:
1333 if not existing_is_available:
1334 raise sa_exc.InvalidRequestError(
1335 "Can't resolve value for column %s on object "
1336 "%s; no value has been set for this column"
1337 % (column, state_str(state))
1338 )
1339 elif current_value is LoaderCallableStatus.PASSIVE_NO_RESULT:
1340 if not existing_is_available:
1341 raise sa_exc.InvalidRequestError(
1342 "Can't resolve value for column %s on object "
1343 "%s; the object is detached and the value was "
1344 "expired" % (column, state_str(state))
1345 )
1346 else:
1347 to_return = current_value
1348 if to_return is None:
1349 util.warn(
1350 "Got None for value of column %s; this is unsupported "
1351 "for a relationship comparison and will not "
1352 "currently produce an IS comparison "
1353 "(but may in a future release)" % column
1354 )
1355 return to_return
1356
1357 return _go
1358
def _lazy_none_clause(
    self,
    reverse_direction: bool = False,
    adapt_source: Optional[_CoreAdapterProto] = None,
) -> ColumnElement[bool]:
    """Return the lazy-load criterion passed through
    ``adapt_criterion_to_null()``.

    The reversed criterion of the lazy strategy is used when
    ``reverse_direction`` is true, the forward one otherwise.  If
    ``adapt_source`` is given, it is applied to the result.

    """
    lazy_strategy = self._lazy_strategy
    if reverse_direction:
        criterion = lazy_strategy._rev_lazywhere
        bind_to_col = lazy_strategy._rev_bind_to_col
    else:
        criterion = lazy_strategy._lazywhere
        bind_to_col = lazy_strategy._bind_to_col

    null_criterion = adapt_criterion_to_null(criterion, bind_to_col)

    return adapt_source(null_criterion) if adapt_source else null_criterion
1380
1381 def __str__(self) -> str:
1382 return str(self.parent.class_.__name__) + "." + self.key
1383
def merge(
    self,
    session: Session,
    source_state: InstanceState[Any],
    source_dict: _InstanceDict,
    dest_state: InstanceState[Any],
    dest_dict: _InstanceDict,
    load: bool,
    _recursive: Dict[Any, object],
    _resolve_conflict_map: Dict[_IdentityKeyType[Any], object],
) -> None:
    """Merge the value of this relationship from a source object onto a
    destination object.

    Nothing happens when "merge" is not part of this relationship's
    cascade, when the source ``__dict__`` holds no value for this key,
    or, for ``load=True``, when the source state paired with one of the
    reverse properties is already recorded in ``_recursive``.

    Otherwise each related object is merged through
    ``session._merge()`` and the outcome is placed on the destination:
    directly, without attribute events, when ``load`` is False; through
    the attribute implementation's ``set()`` when ``load`` is True.

    """
    if load:
        # stop if this object was already reached by way of one of the
        # reverse relationships
        for r in self._reverse_property:
            if (source_state, r) in _recursive:
                return

    if "merge" not in self._cascade:
        return

    if self.key not in source_dict:
        return

    if self.uselist:
        impl = source_state.get_impl(self.key)

        assert is_has_collection_adapter(impl)
        instances_iterable = impl.get_collection(source_state, source_dict)

        # if this is a CollectionAttributeImpl, then empty should
        # be False, otherwise "self.key in source_dict" should not be
        # True
        assert not instances_iterable.empty if impl.collection else True

        if load:
            # for a full merge, pre-load the destination collection,
            # so that individual _merge of each item pulls from identity
            # map for those already present.
            # also assumes CollectionAttributeImpl behavior of loading
            # "old" list in any case
            dest_state.get_impl(self.key).get(
                dest_state, dest_dict, passive=PassiveFlag.PASSIVE_MERGE
            )

        dest_list = []
        for current in instances_iterable:
            current_state = attributes.instance_state(current)
            current_dict = attributes.instance_dict(current)
            # record that this member was reached through this
            # relationship; consulted by the check at the top of merge()
            _recursive[(current_state, self)] = True
            obj = session._merge(
                current_state,
                current_dict,
                load=load,
                _recursive=_recursive,
                _resolve_conflict_map=_resolve_conflict_map,
            )
            # members for which _merge() returns None are left out
            if obj is not None:
                dest_list.append(obj)

        if not load:
            # populate a new collection on the destination without
            # firing append events
            coll = attributes.init_state_collection(
                dest_state, dest_dict, self.key
            )
            for c in dest_list:
                coll.append_without_event(c)
        else:
            dest_impl = dest_state.get_impl(self.key)
            assert is_has_collection_adapter(dest_impl)
            dest_impl.set(
                dest_state,
                dest_dict,
                dest_list,
                _adapt=False,
                passive=PassiveFlag.PASSIVE_MERGE,
            )
    else:
        # scalar relationship: merge the single related object, if any
        current = source_dict[self.key]
        if current is not None:
            current_state = attributes.instance_state(current)
            current_dict = attributes.instance_dict(current)
            _recursive[(current_state, self)] = True
            obj = session._merge(
                current_state,
                current_dict,
                load=load,
                _recursive=_recursive,
                _resolve_conflict_map=_resolve_conflict_map,
            )
        else:
            obj = None

        if not load:
            # write straight into the destination __dict__
            dest_dict[self.key] = obj
        else:
            dest_state.get_impl(self.key).set(
                dest_state, dest_dict, obj, None
            )
1480
def _value_as_iterable(
    self,
    state: InstanceState[_O],
    dict_: _InstanceDict,
    key: str,
    passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
) -> Sequence[Tuple[InstanceState[_O], _O]]:
    """Return the value stored under ``key`` as a list of
    ``(state, obj)`` tuples.

    The list is empty when the value is ``None`` or
    ``PASSIVE_NO_RESULT``; a scalar value yields a one-element list.
    """

    impl = state.manager[key].impl
    value = impl.get(state, dict_, passive=passive)

    if value is None or value is LoaderCallableStatus.PASSIVE_NO_RESULT:
        return []

    if not is_has_collection_adapter(impl):
        return [(attributes.instance_state(value), value)]

    members = impl.get_collection(state, dict_, value, passive=passive)
    return [(attributes.instance_state(member), member) for member in members]
1505
def cascade_iterator(
    self,
    type_: str,
    state: InstanceState[Any],
    dict_: _InstanceDict,
    visited_states: Set[InstanceState[Any]],
    halt_on: Optional[Callable[[InstanceState[Any]], bool]] = None,
) -> Iterator[Tuple[Any, Mapper[Any], InstanceState[Any], _InstanceDict]]:
    """Yield ``(object, mapper, state, dict)`` for each related object
    that the cascade named by ``type_`` reaches along this relationship.

    States already present in ``visited_states`` are skipped; each state
    that is yielded gets added to that set.  ``halt_on``, when given,
    is called with a state and causes it to be skipped if it returns a
    true value.

    """
    # a lazy load is emitted only for the 'delete' cascade, and only
    # when passive_deletes is not set
    may_load = type_ == "delete" and not self.passive_deletes
    if may_load:
        passive = PassiveFlag.PASSIVE_OFF | PassiveFlag.NO_RAISE
    else:
        passive = PassiveFlag.PASSIVE_NO_INITIALIZE

    if type_ == "save-update":
        related = state.manager[self.key].impl.get_all_pending(state, dict_)
    else:
        related = self._value_as_iterable(
            state, dict_, self.key, passive=passive
        )

    skip_pending = (
        type_ == "refresh-expire" and "delete-orphan" not in self._cascade
    )

    for related_state, related_obj in related:
        if related_state in visited_states or related_obj is None:
            # a None member is skipped silently rather than warned
            # about, to stay consistent with how collection.append(None)
            # currently behaves.  see [ticket:2229]
            continue

        assert related_state is not None
        related_dict = attributes.instance_dict(related_obj)

        if halt_on and halt_on(related_state):
            continue

        if skip_pending and not related_state.key:
            continue

        related_mapper = related_state.manager.mapper

        if not related_mapper.isa(self.mapper.class_manager.mapper):
            raise AssertionError(
                "Attribute '%s' on class '%s' "
                "doesn't handle objects "
                "of type '%s'"
                % (self.key, self.parent.class_, related_obj.__class__)
            )

        visited_states.add(related_state)

        yield related_obj, related_mapper, related_state, related_dict
1566
1567 @property
1568 def _effective_sync_backref(self) -> bool:
1569 if self.viewonly:
1570 return False
1571 else:
1572 return self.sync_backref is not False
1573
1574 @staticmethod
1575 def _check_sync_backref(
1576 rel_a: RelationshipProperty[Any], rel_b: RelationshipProperty[Any]
1577 ) -> None:
1578 if rel_a.viewonly and rel_b.sync_backref:
1579 raise sa_exc.InvalidRequestError(
1580 "Relationship %s cannot specify sync_backref=True since %s "
1581 "includes viewonly=True." % (rel_b, rel_a)
1582 )
1583 if (
1584 rel_a.viewonly
1585 and not rel_b.viewonly
1586 and rel_b.sync_backref is not False
1587 ):
1588 rel_b.sync_backref = False
1589
def _add_reverse_property(self, key: str) -> None:
    """Link this relationship with the relationship named ``key`` on the
    target mapper, registering each as a reverse property of the other.

    Raises :class:`.InvalidRequestError` if ``key`` does not name a
    relationship, and :class:`.ArgumentError` if the other relationship
    does not point back to this relationship's parent mapper or if both
    sides turn out to have the same one-to-many / many-to-one direction.

    """
    other = self.mapper.get_property(key, _configure_mappers=False)
    if not isinstance(other, RelationshipProperty):
        raise sa_exc.InvalidRequestError(
            "back_populates on relationship '%s' refers to attribute '%s' "
            "that is not a relationship. The back_populates parameter "
            "should refer to the name of a relationship on the target "
            "class." % (self, other)
        )

    # viewonly / sync_backref reconciliation, applied in both directions.
    # for each (viewonly_side, synced_side) pair:
    # * viewonly_side.viewonly and synced_side.sync_backref true -> error
    # * viewonly_side.viewonly, synced_side not viewonly and its
    #   sync_backref not already False -> sync_backref is set to False
    for viewonly_side, synced_side in ((self, other), (other, self)):
        self._check_sync_backref(viewonly_side, synced_side)

    self._reverse_property.add(other)
    other._reverse_property.add(self)

    other._setup_entity()

    if not other.mapper.common_parent(self.parent):
        raise sa_exc.ArgumentError(
            "reverse_property %r on "
            "relationship %s references relationship %s, which "
            "does not reference mapper %s"
            % (key, self, other, self.parent)
        )

    same_direction = (
        other._configure_started
        and self.direction in (ONETOMANY, MANYTOONE)
        and self.direction == other.direction
    )
    if same_direction:
        raise sa_exc.ArgumentError(
            "%s and back-reference %s are "
            "both of the same direction %r. Did you mean to "
            "set remote_side on the many-to-one side ?"
            % (other, self, self.direction)
        )
1633
@util.memoized_property
def entity(self) -> _InternalEntityType[_T]:
    """Return the target mapped entity, which is an inspect() of the
    class or aliased class that is referenced by this
    :class:`.RelationshipProperty`.

    """
    # NOTE(review): returning ``self.entity`` from inside the ``entity``
    # property only terminates if the call below causes ``self.entity``
    # to be assigned as an instance attribute (as ``_setup_entity()``
    # does), so that the lookup no longer reaches this function.
    # Presumably ``_check_configure()`` triggers that -- confirm.
    self.parent._check_configure()
    return self.entity
1643
@util.memoized_property
def mapper(self) -> Mapper[_T]:
    """The :class:`_orm.Mapper` that this
    :class:`.RelationshipProperty` targets, taken from
    ``self.entity``.

    """
    target_entity = self.entity
    return target_entity.mapper
1651
def do_init(self) -> None:
    """Run the initialization steps of this relationship in order.

    The order matters: later steps read attributes that earlier steps
    assign.

    """
    self._check_conflicts()
    # resolves the configuration arguments; assigns order_by,
    # _user_defined_foreign_keys and remote_side
    self._process_dependent_arguments()
    # assigns self.entity and self.target
    self._setup_entity()
    self._setup_registry_dependencies()
    # reads the values assigned above; assigns _join_condition,
    # direction, primaryjoin / secondaryjoin and the column collections
    self._setup_join_conditions()
    # reads self.direction
    self._check_cascade_settings(self._cascade)
    # reads self.direction; assigns uselist if it is still None
    self._post_init()
    # reads self._join_condition
    self._generate_backref()
    self._join_condition._warn_for_conflicting_sync_targets()
    super().do_init()
    # the strategy for lazy="select" is kept on the property; it is what
    # _optimized_compare() and _lazy_none_clause() read their criteria from
    self._lazy_strategy = cast(
        "LazyLoader", self._get_strategy((("lazy", "select"),))
    )
1666
1667 def _setup_registry_dependencies(self) -> None:
1668 self.parent.mapper.registry._set_depends_on(
1669 self.entity.mapper.registry
1670 )
1671
def _process_dependent_arguments(self) -> None:
    """Convert incoming configuration arguments to their
    proper form.

    Callables are resolved, ORM annotations removed.

    On completion, ``order_by``, ``_user_defined_foreign_keys`` and
    ``remote_side`` are assigned on this property, and the resolved
    ``primaryjoin`` / ``secondaryjoin`` in ``_init_args`` are
    de-annotated.

    :raises ArgumentError: if the resolved ``secondary`` argument is a
     mapped class.

    """

    # accept callables for other attributes which may require
    # deferred initialization. This technique is used
    # by declarative "string configs" and some recipes.
    init_args = self._init_args

    # the same resolver serves every argument, so look it up once
    resolve_arg = self._clsregistry_resolvers[1]

    for attr in (
        "order_by",
        "primaryjoin",
        "secondaryjoin",
        "secondary",
        "foreign_keys",
        "remote_side",
    ):
        rel_arg = getattr(init_args, attr)

        rel_arg._resolve_against_registry(resolve_arg)

    # remove "annotations" which are present if mapped class
    # descriptors are used to create the join expression.
    for attr in "primaryjoin", "secondaryjoin":
        rel_arg = getattr(init_args, attr)
        val = rel_arg.resolved
        if val is not None:
            rel_arg.resolved = _orm_deannotate(
                coercions.expect(
                    roles.ColumnArgumentRole, val, argname=attr
                )
            )

    secondary = init_args.secondary.resolved
    if secondary is not None and _is_mapped_class(secondary):
        raise sa_exc.ArgumentError(
            "secondary argument %s passed to relationship() %s must "
            "be a Table object or other FROM clause; can't send a mapped "
            "class directly as rows in 'secondary' are persisted "
            "independently of a class that is mapped "
            "to that same table." % (secondary, self)
        )

    # ensure expressions in self.order_by, foreign_keys,
    # remote_side are all columns, not strings.
    if (
        init_args.order_by.resolved is not False
        and init_args.order_by.resolved is not None
    ):
        self.order_by = tuple(
            coercions.expect(
                roles.ColumnArgumentRole, x, argname="order_by"
            )
            for x in util.to_list(init_args.order_by.resolved)
        )
    else:
        # both "not given" (None) and "disabled" (False) end up as False
        self.order_by = False

    self._user_defined_foreign_keys = util.column_set(
        coercions.expect(
            roles.ColumnArgumentRole, x, argname="foreign_keys"
        )
        for x in util.to_column_set(init_args.foreign_keys.resolved)
    )

    self.remote_side = util.column_set(
        coercions.expect(
            roles.ColumnArgumentRole, x, argname="remote_side"
        )
        for x in util.to_column_set(init_args.remote_side.resolved)
    )
1747
def declarative_scan(
    self,
    decl_scan: _ClassScanMapperConfig,
    registry: _RegistryType,
    cls: Type[Any],
    originating_module: Optional[str],
    key: str,
    mapped_container: Optional[Type[Mapped[Any]]],
    annotation: Optional[_AnnotationScanType],
    extracted_mapped_annotation: Optional[_AnnotationScanType],
    is_dataclass_field: bool,
) -> None:
    """Derive relationship settings from the annotation found for this
    attribute.

    Depending on the annotation, this may assign ``lazy`` /
    ``strategy_key``, ``collection_class``, ``uselist`` and, if no
    explicit argument was passed to the relationship, ``argument``.

    Raises :class:`.ArgumentError` for an abstract collection type given
    without an explicit ``collection_class``, and for a generic alias
    that has no arguments.

    """
    if extracted_mapped_annotation is None:
        # no annotation to work with: acceptable only if an explicit
        # argument was given
        if self.argument is None:
            self._raise_for_required(key, cls)
        else:
            return

    argument = extracted_mapped_annotation
    assert originating_module is not None

    if mapped_container is not None:
        # WriteOnlyMapped / DynamicMapped containers select the
        # corresponding loader strategy
        is_write_only = issubclass(mapped_container, WriteOnlyMapped)
        is_dynamic = issubclass(mapped_container, DynamicMapped)
        if is_write_only:
            self.lazy = "write_only"
            self.strategy_key = (("lazy", self.lazy),)
        elif is_dynamic:
            self.lazy = "dynamic"
            self.strategy_key = (("lazy", self.lazy),)
    else:
        is_write_only = is_dynamic = False

    argument = de_optionalize_union_types(argument)

    if hasattr(argument, "__origin__"):
        # a generic alias
        arg_origin = argument.__origin__
        if isinstance(arg_origin, type) and issubclass(
            arg_origin, abc.Collection
        ):
            # the origin becomes the collection class, unless one was
            # configured explicitly
            if self.collection_class is None:
                if _py_inspect.isabstract(arg_origin):
                    raise sa_exc.ArgumentError(
                        f"Collection annotation type {arg_origin} cannot "
                        "be instantiated; please provide an explicit "
                        "'collection_class' parameter "
                        "(e.g. list, set, etc.) to the "
                        "relationship() function to accompany this "
                        "annotation"
                    )

                self.collection_class = arg_origin

        elif not is_write_only and not is_dynamic:
            # generic alias whose origin is not a collection type
            self.uselist = False

        if argument.__args__:  # type: ignore
            # the target type is the last type argument for a Mapping
            # origin, the first one otherwise
            if isinstance(arg_origin, type) and issubclass(
                arg_origin, typing.Mapping
            ):
                type_arg = argument.__args__[-1]  # type: ignore
            else:
                type_arg = argument.__args__[0]  # type: ignore
            if hasattr(type_arg, "__forward_arg__"):
                # forward reference: work with the name it holds
                str_argument = type_arg.__forward_arg__

                argument = resolve_name_to_real_class_name(
                    str_argument, originating_module
                )
            else:
                argument = type_arg
        else:
            raise sa_exc.ArgumentError(
                f"Generic alias {argument} requires an argument"
            )
    elif hasattr(argument, "__forward_arg__"):
        # a plain forward reference, not wrapped in a generic alias
        argument = argument.__forward_arg__

        argument = resolve_name_to_real_class_name(
            argument, originating_module
        )

    if (
        self.collection_class is None
        and not is_write_only
        and not is_dynamic
    ):
        # neither configuration nor annotation supplied a collection
        self.uselist = False

    # ticket #8759
    # if a lead argument was given to relationship(), like
    # `relationship("B")`, use that, don't replace it with class we
    # found in the annotation. The declarative_scan() method call here is
    # still useful, as we continue to derive collection type and do
    # checking of the annotation in any case.
    if self.argument is None:
        self.argument = cast("_RelationshipArgumentType[_T]", argument)
1845
@util.preload_module("sqlalchemy.orm.mapper")
def _setup_entity(self, __argument: Any = None) -> None:
    """Resolve the relationship argument into the target entity and
    assign ``self.entity`` and ``self.target``.

    Does nothing if ``entity`` is already present in the instance
    ``__dict__``.  A string argument is resolved by name through the
    class registry; a callable that is neither a class nor a Mapper is
    called to obtain the actual argument.  Raises
    :class:`.ArgumentError` if the outcome has no ``mapper`` attribute.

    """
    if "entity" in self.__dict__:
        return

    mapperlib = util.preloaded.orm_mapper

    # an explicitly passed argument wins over the configured one
    argument = __argument or self.argument

    resolved_argument: _ExternalEntityType[Any]

    if isinstance(argument, str):
        # we might want to cleanup clsregistry API to make this
        # more straightforward
        name_resolver = self._clsregistry_resolve_name(argument)
        resolved_argument = cast("_ExternalEntityType[Any]", name_resolver())
    elif callable(argument) and not isinstance(
        argument, (type, mapperlib.Mapper)
    ):
        resolved_argument = argument()
    else:
        resolved_argument = argument

    entity: _InternalEntityType[Any]

    if isinstance(resolved_argument, type):
        entity = class_mapper(resolved_argument, configure=False)
    else:
        try:
            entity = inspect(resolved_argument)
        except sa_exc.NoInspectionAvailable:
            entity = None  # type: ignore

        if not hasattr(entity, "mapper"):
            raise sa_exc.ArgumentError(
                "relationship '%s' expects "
                "a class or a mapper argument (received: %s)"
                % (self.key, type(resolved_argument))
            )

    self.entity = entity
    self.target = self.entity.persist_selectable
1893
def _setup_join_conditions(self) -> None:
    """Construct the :class:`.JoinCondition` for this relationship and
    copy the values it worked out back onto the property."""
    join_condition = JoinCondition(
        parent_persist_selectable=self.parent.persist_selectable,
        child_persist_selectable=self.entity.persist_selectable,
        parent_local_selectable=self.parent.local_table,
        child_local_selectable=self.entity.local_table,
        primaryjoin=self._init_args.primaryjoin.resolved,
        secondary=self._init_args.secondary.resolved,
        secondaryjoin=self._init_args.secondaryjoin.resolved,
        parent_equivalents=self.parent._equivalent_columns,
        child_equivalents=self.mapper._equivalent_columns,
        consider_as_foreign_keys=self._user_defined_foreign_keys,
        local_remote_pairs=self.local_remote_pairs,
        remote_side=self.remote_side,
        self_referential=self._is_self_referential,
        prop=self,
        support_sync=not self.viewonly,
        can_be_synced_fn=self._columns_are_mapped,
    )
    self._join_condition = join_condition

    # join expressions, secondary selectable and direction
    self.primaryjoin = join_condition.primaryjoin
    self.secondaryjoin = join_condition.secondaryjoin
    self.secondary = join_condition.secondary
    self.direction = join_condition.direction

    # column collections; local_remote_pairs and remote_side were inputs
    # above and are replaced with the computed values here
    self.local_remote_pairs = join_condition.local_remote_pairs
    self.remote_side = join_condition.remote_columns
    self.local_columns = join_condition.local_columns
    self.synchronize_pairs = join_condition.synchronize_pairs
    self._calculated_foreign_keys = join_condition.foreign_key_columns
    self.secondary_synchronize_pairs = (
        join_condition.secondary_synchronize_pairs
    )
1923
1924 @property
1925 def _clsregistry_resolve_arg(
1926 self,
1927 ) -> Callable[[str, bool], _class_resolver]:
1928 return self._clsregistry_resolvers[1]
1929
1930 @property
1931 def _clsregistry_resolve_name(
1932 self,
1933 ) -> Callable[[str], Callable[[], Union[Type[Any], Table, _ModNS]]]:
1934 return self._clsregistry_resolvers[0]
1935
@util.memoized_property
@util.preload_module("sqlalchemy.orm.clsregistry")
def _clsregistry_resolvers(
    self,
) -> Tuple[
    Callable[[str], Callable[[], Union[Type[Any], Table, _ModNS]]],
    Callable[[str, bool], _class_resolver],
]:
    """The pair of resolvers that ``clsregistry._resolver()`` returns
    for the parent class and this property."""
    clsregistry = util.preloaded.orm_clsregistry
    make_resolvers = clsregistry._resolver

    return make_resolvers(self.parent.class_, self)
1947
1948 def _check_conflicts(self) -> None:
1949 """Test that this relationship is legal, warn about
1950 inheritance conflicts."""
1951 if self.parent.non_primary and not class_mapper(
1952 self.parent.class_, configure=False
1953 ).has_property(self.key):
1954 raise sa_exc.ArgumentError(
1955 "Attempting to assign a new "
1956 "relationship '%s' to a non-primary mapper on "
1957 "class '%s'. New relationships can only be added "
1958 "to the primary mapper, i.e. the very first mapper "
1959 "created for class '%s' "
1960 % (
1961 self.key,
1962 self.parent.class_.__name__,
1963 self.parent.class_.__name__,
1964 )
1965 )
1966
@property
def cascade(self) -> CascadeOptions:
    """The cascade setting currently in effect for this
    :class:`.RelationshipProperty`.
    """
    current = self._cascade
    return current

@cascade.setter
def cascade(self, cascade: Union[str, CascadeOptions]) -> None:
    # assignment is routed through _set_cascade()
    self._set_cascade(cascade)
1977
def _set_cascade(self, cascade_arg: Union[str, CascadeOptions]) -> None:
    """Store a new cascade setting on this relationship.

    The argument is turned into a :class:`.CascadeOptions`; for a
    viewonly relationship it is reduced to the cascades listed in
    ``CascadeOptions._viewonly_cascades``.  The settings are validated
    only if ``mapper`` is already present in the instance ``__dict__``.
    A dependency processor, if there is one, receives the new value too.

    """
    cascade = CascadeOptions(cascade_arg)

    if self.viewonly:
        permitted = cascade.intersection(CascadeOptions._viewonly_cascades)
        cascade = CascadeOptions(permitted)

    mapper_is_set = "mapper" in self.__dict__
    if mapper_is_set:
        self._check_cascade_settings(cascade)
    self._cascade = cascade

    processor = self._dependency_processor
    if processor:
        processor.cascade = cascade
1992
1993 def _check_cascade_settings(self, cascade: CascadeOptions) -> None:
1994 if (
1995 cascade.delete_orphan
1996 and not self.single_parent
1997 and (self.direction is MANYTOMANY or self.direction is MANYTOONE)
1998 ):
1999 raise sa_exc.ArgumentError(
2000 "For %(direction)s relationship %(rel)s, delete-orphan "
2001 "cascade is normally "
2002 'configured only on the "one" side of a one-to-many '
2003 "relationship, "
2004 'and not on the "many" side of a many-to-one or many-to-many '
2005 "relationship. "
2006 "To force this relationship to allow a particular "
2007 '"%(relatedcls)s" object to be referenced by only '
2008 'a single "%(clsname)s" object at a time via the '
2009 "%(rel)s relationship, which "
2010 "would allow "
2011 "delete-orphan cascade to take place in this direction, set "
2012 "the single_parent=True flag."
2013 % {
2014 "rel": self,
2015 "direction": (
2016 "many-to-one"
2017 if self.direction is MANYTOONE
2018 else "many-to-many"
2019 ),
2020 "clsname": self.parent.class_.__name__,
2021 "relatedcls": self.mapper.class_.__name__,
2022 },
2023 code="bbf0",
2024 )
2025
2026 if self.passive_deletes == "all" and (
2027 "delete" in cascade or "delete-orphan" in cascade
2028 ):
2029 raise sa_exc.ArgumentError(
2030 "On %s, can't set passive_deletes='all' in conjunction "
2031 "with 'delete' or 'delete-orphan' cascade" % self
2032 )
2033
2034 if cascade.delete_orphan:
2035 self.mapper.primary_mapper()._delete_orphans.append(
2036 (self.key, self.parent.class_)
2037 )
2038
2039 def _persists_for(self, mapper: Mapper[Any]) -> bool:
2040 """Return True if this property will persist values on behalf
2041 of the given mapper.
2042
2043 """
2044
2045 return (
2046 self.key in mapper.relationships
2047 and mapper.relationships[self.key] is self
2048 )
2049
2050 def _columns_are_mapped(self, *cols: ColumnElement[Any]) -> bool:
2051 """Return True if all columns in the given collection are
2052 mapped by the tables referenced by this :class:`.RelationshipProperty`.
2053
2054 """
2055
2056 secondary = self._init_args.secondary.resolved
2057 for c in cols:
2058 if secondary is not None and secondary.c.contains_column(c):
2059 continue
2060 if not self.parent.persist_selectable.c.contains_column(
2061 c
2062 ) and not self.target.c.contains_column(c):
2063 return False
2064 return True
2065
def _generate_backref(self) -> None:
    """Interpret the 'backref' instruction to create a
    :func:`_orm.relationship` complementary to this one.

    Nothing is done for a non-primary parent mapper.  When ``backref``
    is set and ``back_populates`` is not, a new
    :class:`.RelationshipProperty` is configured on the target's primary
    mapper under the backref key, and ``back_populates`` of this
    relationship is set to that key.  In any case, if ``back_populates``
    ends up set, the two relationships are linked through
    ``_add_reverse_property()``.

    Raises :class:`.ArgumentError` if a property with the backref name
    already exists on a non-concrete mapper in the target's hierarchy,
    and :class:`.InvalidRequestError` if ``secondaryjoin`` is passed for
    a backref of a relationship that has no ``secondary``.

    """

    if self.parent.non_primary:
        return
    if self.backref is not None and not self.back_populates:
        kwargs: Dict[str, Any]
        # backref is either a plain name or a (name, kwargs) pair
        if isinstance(self.backref, str):
            backref_key, kwargs = self.backref, {}
        else:
            backref_key, kwargs = self.backref
        mapper = self.mapper.primary_mapper()

        if not mapper.concrete:
            # look for a name clash among the ancestors and descendants
            # of the target mapper
            check = set(mapper.iterate_to_root()).union(
                mapper.self_and_descendants
            )
            for m in check:
                if m.has_property(backref_key) and not m.concrete:
                    raise sa_exc.ArgumentError(
                        "Error creating backref "
                        "'%s' on relationship '%s': property of that "
                        "name exists on mapper '%s'"
                        % (backref_key, self, m)
                    )

        # determine primaryjoin/secondaryjoin for the
        # backref. Use the one we had, so that
        # a custom join doesn't have to be specified in
        # both directions.
        # NOTE: the pop() calls below remove keys from the kwargs
        # dictionary that was supplied with the backref
        if self.secondary is not None:
            # for many to many, just switch primaryjoin/
            # secondaryjoin. use the annotated
            # pj/sj on the _join_condition.
            pj = kwargs.pop(
                "primaryjoin",
                self._join_condition.secondaryjoin_minus_local,
            )
            sj = kwargs.pop(
                "secondaryjoin",
                self._join_condition.primaryjoin_minus_local,
            )
        else:
            pj = kwargs.pop(
                "primaryjoin",
                self._join_condition.primaryjoin_reverse_remote,
            )
            sj = kwargs.pop("secondaryjoin", None)
            if sj:
                raise sa_exc.InvalidRequestError(
                    "Can't assign 'secondaryjoin' on a backref "
                    "against a non-secondary relationship."
                )

        foreign_keys = kwargs.pop(
            "foreign_keys", self._user_defined_foreign_keys
        )
        parent = self.parent.primary_mapper()
        # settings the backref takes over from this relationship unless
        # given explicitly
        kwargs.setdefault("viewonly", self.viewonly)
        kwargs.setdefault("post_update", self.post_update)
        kwargs.setdefault("passive_updates", self.passive_updates)
        kwargs.setdefault("sync_backref", self.sync_backref)
        # from here on both sides refer to each other via back_populates
        self.back_populates = backref_key
        relationship = RelationshipProperty(
            parent,
            self.secondary,
            primaryjoin=pj,
            secondaryjoin=sj,
            foreign_keys=foreign_keys,
            back_populates=self.key,
            **kwargs,
        )
        mapper._configure_property(
            backref_key, relationship, warn_for_existing=True
        )

    if self.back_populates:
        self._add_reverse_property(self.back_populates)
2145
@util.preload_module("sqlalchemy.orm.dependency")
def _post_init(self) -> None:
    """Finish the setup steps that need ``self.direction``.

    ``uselist``, when still ``None``, becomes True for every direction
    other than MANYTOONE.  Unless the relationship is view-only, a
    dependency processor is built for it and stored on
    ``_dependency_processor``.

    """
    dependency = util.preloaded.orm_dependency

    if self.uselist is None:
        self.uselist = self.direction is not MANYTOONE

    if self.viewonly:
        return

    make_processor = dependency.DependencyProcessor.from_relationship
    self._dependency_processor = make_processor(self)  # type: ignore
2156
@util.memoized_property
def _use_get(self) -> bool:
    """Memoized copy of the ``use_get`` attribute found on this
    relationship's lazy loader strategy (``_lazy_strategy``)."""

    return self._lazy_strategy.use_get
2164
@util.memoized_property
def _is_self_referential(self) -> bool:
    """Memoized result of asking the target mapper whether it has a
    common parent with the parent mapper
    (``self.mapper.common_parent(self.parent)``)."""

    target_mapper = self.mapper
    return target_mapper.common_parent(self.parent)
2168
def _create_joins(
    self,
    source_polymorphic: bool = False,
    source_selectable: Optional[FromClause] = None,
    dest_selectable: Optional[FromClause] = None,
    of_type_entity: Optional[_InternalEntityType[Any]] = None,
    alias_secondary: bool = False,
    extra_criteria: Tuple[ColumnElement[bool], ...] = (),
) -> Tuple[
    ColumnElement[bool],
    Optional[ColumnElement[bool]],
    FromClause,
    FromClause,
    Optional[FromClause],
    Optional[ClauseAdapter],
]:
    """Work out the join criteria and the selectables involved in
    joining from the parent to this relationship's target.

    The source / destination selectables are settled first, then
    ``self._join_condition.join_targets()`` produces the criteria.

    :param source_polymorphic: when true, and no ``source_selectable``
     is given, and the parent mapper has ``with_polymorphic`` set, the
     parent's ``_with_polymorphic_selectable`` is used as the source.
    :param source_selectable: selectable to join from.  If it is still
     ``None`` after ``join_targets()``, the returned value falls back
     to ``self.parent.local_table``.
    :param dest_selectable: selectable to join to.  Defaults to
     ``of_type_entity.selectable`` when ``of_type_entity`` is given,
     otherwise to ``self.entity.selectable``.
    :param of_type_entity: when given, supplies the destination mapper
     (whose ``_single_table_criterion`` is passed along) and the
     default destination selectable.
    :param alias_secondary: when true and a ``secondary`` exists, the
     ``aliased`` flag passed to ``join_targets()`` is switched on.
    :param extra_criteria: passed through to ``join_targets()``.
    :return: the 6-tuple ``(primaryjoin, secondaryjoin,
     source_selectable, dest_selectable, secondary, target_adapter)``.

    """
    # ``aliased`` is only ever switched on below; it is handed to
    # join_targets() as its third argument.
    # NOTE(review): presumably it tells join_targets() that the join
    # condition has to be adapted to non-default selectables --
    # join_targets() is not visible here, confirm there.
    aliased = False

    if alias_secondary and self.secondary is not None:
        aliased = True

    if source_selectable is None:
        if source_polymorphic and self.parent.with_polymorphic:
            source_selectable = self.parent._with_polymorphic_selectable

    if of_type_entity:
        dest_mapper = of_type_entity.mapper
        if dest_selectable is None:
            dest_selectable = of_type_entity.selectable
            aliased = True
    else:
        dest_mapper = self.mapper

    if dest_selectable is None:
        # no destination from the caller or from of_type_entity:
        # use the target entity's own selectable
        dest_selectable = self.entity.selectable
        if self.mapper.with_polymorphic:
            aliased = True

        if self._is_self_referential and source_selectable is None:
            # self-referential join with no explicit source: the
            # destination is swapped for an anonymous FROM clause
            dest_selectable = dest_selectable._anonymous_fromclause()
            aliased = True
    elif (
        dest_selectable is not self.mapper._with_polymorphic_selectable
        or self.mapper.with_polymorphic
    ):
        aliased = True

    single_crit = dest_mapper._single_table_criterion
    # an explicit source that is not the parent's with_polymorphic
    # selectable, or that is a subquery, also switches the flag on
    aliased = aliased or (
        source_selectable is not None
        and (
            source_selectable
            is not self.parent._with_polymorphic_selectable
            or source_selectable._is_subquery
        )
    )

    (
        primaryjoin,
        secondaryjoin,
        secondary,
        target_adapter,
        dest_selectable,
    ) = self._join_condition.join_targets(
        source_selectable,
        dest_selectable,
        aliased,
        single_crit,
        extra_criteria,
    )
    # fall back to the mappers' local tables for whatever is still unset
    if source_selectable is None:
        source_selectable = self.parent.local_table
    if dest_selectable is None:
        dest_selectable = self.entity.local_table
    return (
        primaryjoin,
        secondaryjoin,
        source_selectable,
        dest_selectable,
        secondary,
        target_adapter,
    )
2251
2252
2253def _annotate_columns(element: _CE, annotations: _AnnotationDict) -> _CE:
2254 def clone(elem: _CE) -> _CE:
2255 if isinstance(elem, expression.ColumnClause):
2256 elem = elem._annotate(annotations.copy()) # type: ignore
2257 elem._copy_internals(clone=clone)
2258 return elem
2259
2260 if element is not None:
2261 element = clone(element)
2262 clone = None # type: ignore # remove gc cycles
2263 return element
2264
2265
2266class JoinCondition:
# primaryjoin exactly as handed to the constructor; None when it is to
# be derived in _determine_joins()
primaryjoin_initial: Optional[ColumnElement[bool]]
# working primaryjoin: assigned in _determine_joins(), then replaced by
# the sanitize / annotate steps that __init__ runs
primaryjoin: ColumnElement[bool]
# secondaryjoin as given, or derived in _determine_joins() when a
# secondary is present
secondaryjoin: Optional[ColumnElement[bool]]
# the "secondary" selectable passed to the constructor, if any
secondary: Optional[FromClause]
# the relationship this join condition was built for
prop: RelationshipProperty[Any]

# column pairs collected by _setup_pairs() from primaryjoin and
# secondaryjoin respectively
synchronize_pairs: _ColumnPairs
secondary_synchronize_pairs: _ColumnPairs
# assigned by _determine_direction()
direction: RelationshipDirection

# the four selectables passed to the constructor
parent_persist_selectable: FromClause
child_persist_selectable: FromClause
parent_local_selectable: FromClause
child_local_selectable: FromClause

# the constructor's ``local_remote_pairs`` argument, if any
_local_remote_pairs: Optional[_ColumnPairs]
2283
def __init__(
    self,
    parent_persist_selectable: FromClause,
    child_persist_selectable: FromClause,
    parent_local_selectable: FromClause,
    child_local_selectable: FromClause,
    *,
    primaryjoin: Optional[ColumnElement[bool]] = None,
    secondary: Optional[FromClause] = None,
    secondaryjoin: Optional[ColumnElement[bool]] = None,
    parent_equivalents: Optional[_EquivalentColumnMap] = None,
    child_equivalents: Optional[_EquivalentColumnMap] = None,
    consider_as_foreign_keys: Any = None,
    local_remote_pairs: Optional[_ColumnPairs] = None,
    remote_side: Any = None,
    self_referential: Any = False,
    prop: RelationshipProperty[Any],
    support_sync: bool = True,
    can_be_synced_fn: Callable[..., bool] = lambda *c: True,
):
    """Store the configuration, then derive, annotate and validate the
    join condition for relationship ``prop``.

    :param parent_persist_selectable: parent-side selectable used when
     deriving joins and when determining the direction.
    :param child_persist_selectable: child-side counterpart.
    :param parent_local_selectable: passed as ``a_subset`` when the
     primaryjoin is derived.
    :param child_local_selectable: passed as ``a_subset`` when the
     secondaryjoin is derived.
    :param primaryjoin: explicit primaryjoin; ``None`` means derive it.
    :param secondary: "secondary" selectable, if any.
    :param secondaryjoin: explicit secondaryjoin; requires
     ``secondary``.
    :param parent_equivalents: stored on the instance as is.
    :param child_equivalents: stored on the instance as is.
    :param consider_as_foreign_keys: columns to treat as foreign keys
     when deriving joins and when adding "foreign" annotations.
    :param local_remote_pairs: explicit (local, remote) column pairs.
    :param remote_side: columns to be considered remote.
    :param self_referential: stored on the instance as is.
    :param prop: the owning relationship; used for its mappers, its
     logger and in messages.
    :param support_sync: when true, ``_check_foreign_cols()`` requires
     that synchronize pairs were found.
    :param can_be_synced_fn: predicate called with one or two columns
     by ``_setup_pairs()`` before a pair is recorded.

    """
    self.parent_persist_selectable = parent_persist_selectable
    self.parent_local_selectable = parent_local_selectable
    self.child_persist_selectable = child_persist_selectable
    self.child_local_selectable = child_local_selectable
    self.parent_equivalents = parent_equivalents
    self.child_equivalents = child_equivalents
    self.primaryjoin_initial = primaryjoin
    self.secondaryjoin = secondaryjoin
    self.secondary = secondary
    self.consider_as_foreign_keys = consider_as_foreign_keys
    self._local_remote_pairs = local_remote_pairs
    self._remote_side = remote_side
    self.prop = prop
    self.self_referential = self_referential
    self.support_sync = support_sync
    self.can_be_synced_fn = can_be_synced_fn

    # the steps below run in a fixed order: each one works on the
    # primaryjoin / secondaryjoin as left by the previous ones.
    self._determine_joins()
    assert self.primaryjoin is not None

    self._sanitize_joins()
    # annotations: foreign, then remote, then local (which skips
    # anything already remote), then parentmapper (which keys off
    # remote / local)
    self._annotate_fks()
    self._annotate_remote()
    self._annotate_local()
    self._annotate_parentmapper()
    # the pair collections are read by the checks that follow
    self._setup_pairs()
    self._check_foreign_cols(self.primaryjoin, True)
    if self.secondaryjoin is not None:
        self._check_foreign_cols(self.secondaryjoin, False)
    self._determine_direction()
    self._check_remote_side()
    self._log_joins()
2336
2337 def _log_joins(self) -> None:
2338 log = self.prop.logger
2339 log.info("%s setup primary join %s", self.prop, self.primaryjoin)
2340 log.info("%s setup secondary join %s", self.prop, self.secondaryjoin)
2341 log.info(
2342 "%s synchronize pairs [%s]",
2343 self.prop,
2344 ",".join(
2345 "(%s => %s)" % (l, r) for (l, r) in self.synchronize_pairs
2346 ),
2347 )
2348 log.info(
2349 "%s secondary synchronize pairs [%s]",
2350 self.prop,
2351 ",".join(
2352 "(%s => %s)" % (l, r)
2353 for (l, r) in self.secondary_synchronize_pairs or []
2354 ),
2355 )
2356 log.info(
2357 "%s local/remote pairs [%s]",
2358 self.prop,
2359 ",".join(
2360 "(%s / %s)" % (l, r) for (l, r) in self.local_remote_pairs
2361 ),
2362 )
2363 log.info(
2364 "%s remote columns [%s]",
2365 self.prop,
2366 ",".join("%s" % col for col in self.remote_columns),
2367 )
2368 log.info(
2369 "%s local columns [%s]",
2370 self.prop,
2371 ",".join("%s" % col for col in self.local_columns),
2372 )
2373 log.info("%s relationship direction %s", self.prop, self.direction)
2374
def _sanitize_joins(self) -> None:
    """Strip the "parententity" and "proxy_key" annotations from the
    join conditions; these can leak in here based on some declarative
    patterns and maybe others.

    "parentmapper" is not removed: it is relied upon both by the ORM
    evaluator as well as the use case in
    _join_fixture_inh_selfref_w_entity that relies upon it being
    present, see :ticket:`3364`.

    """
    unwanted = ("parententity", "proxy_key")

    self.primaryjoin = _deep_deannotate(self.primaryjoin, values=unwanted)

    if self.secondaryjoin is None:
        return
    self.secondaryjoin = _deep_deannotate(
        self.secondaryjoin, values=unwanted
    )
2392
2393 def _determine_joins(self) -> None:
2394 """Determine the 'primaryjoin' and 'secondaryjoin' attributes,
2395 if not passed to the constructor already.
2396
2397 This is based on analysis of the foreign key relationships
2398 between the parent and target mapped selectables.
2399
2400 """
2401 if self.secondaryjoin is not None and self.secondary is None:
2402 raise sa_exc.ArgumentError(
2403 "Property %s specified with secondary "
2404 "join condition but "
2405 "no secondary argument" % self.prop
2406 )
2407
2408 # find a join between the given mapper's mapped table and
2409 # the given table. will try the mapper's local table first
2410 # for more specificity, then if not found will try the more
2411 # general mapped table, which in the case of inheritance is
2412 # a join.
2413 try:
2414 consider_as_foreign_keys = self.consider_as_foreign_keys or None
2415 if self.secondary is not None:
2416 if self.secondaryjoin is None:
2417 self.secondaryjoin = join_condition(
2418 self.child_persist_selectable,
2419 self.secondary,
2420 a_subset=self.child_local_selectable,
2421 consider_as_foreign_keys=consider_as_foreign_keys,
2422 )
2423 if self.primaryjoin_initial is None:
2424 self.primaryjoin = join_condition(
2425 self.parent_persist_selectable,
2426 self.secondary,
2427 a_subset=self.parent_local_selectable,
2428 consider_as_foreign_keys=consider_as_foreign_keys,
2429 )
2430 else:
2431 self.primaryjoin = self.primaryjoin_initial
2432 else:
2433 if self.primaryjoin_initial is None:
2434 self.primaryjoin = join_condition(
2435 self.parent_persist_selectable,
2436 self.child_persist_selectable,
2437 a_subset=self.parent_local_selectable,
2438 consider_as_foreign_keys=consider_as_foreign_keys,
2439 )
2440 else:
2441 self.primaryjoin = self.primaryjoin_initial
2442 except sa_exc.NoForeignKeysError as nfe:
2443 if self.secondary is not None:
2444 raise sa_exc.NoForeignKeysError(
2445 "Could not determine join "
2446 "condition between parent/child tables on "
2447 "relationship %s - there are no foreign keys "
2448 "linking these tables via secondary table '%s'. "
2449 "Ensure that referencing columns are associated "
2450 "with a ForeignKey or ForeignKeyConstraint, or "
2451 "specify 'primaryjoin' and 'secondaryjoin' "
2452 "expressions." % (self.prop, self.secondary)
2453 ) from nfe
2454 else:
2455 raise sa_exc.NoForeignKeysError(
2456 "Could not determine join "
2457 "condition between parent/child tables on "
2458 "relationship %s - there are no foreign keys "
2459 "linking these tables. "
2460 "Ensure that referencing columns are associated "
2461 "with a ForeignKey or ForeignKeyConstraint, or "
2462 "specify a 'primaryjoin' expression." % self.prop
2463 ) from nfe
2464 except sa_exc.AmbiguousForeignKeysError as afe:
2465 if self.secondary is not None:
2466 raise sa_exc.AmbiguousForeignKeysError(
2467 "Could not determine join "
2468 "condition between parent/child tables on "
2469 "relationship %s - there are multiple foreign key "
2470 "paths linking the tables via secondary table '%s'. "
2471 "Specify the 'foreign_keys' "
2472 "argument, providing a list of those columns which "
2473 "should be counted as containing a foreign key "
2474 "reference from the secondary table to each of the "
2475 "parent and child tables." % (self.prop, self.secondary)
2476 ) from afe
2477 else:
2478 raise sa_exc.AmbiguousForeignKeysError(
2479 "Could not determine join "
2480 "condition between parent/child tables on "
2481 "relationship %s - there are multiple foreign key "
2482 "paths linking the tables. Specify the "
2483 "'foreign_keys' argument, providing a list of those "
2484 "columns which should be counted as containing a "
2485 "foreign key reference to the parent table." % self.prop
2486 ) from afe
2487
@property
def primaryjoin_minus_local(self) -> ColumnElement[bool]:
    """The primaryjoin with its "local" and "remote" annotations
    removed."""

    stripped = ("local", "remote")
    return _deep_deannotate(self.primaryjoin, values=stripped)
2491
@property
def secondaryjoin_minus_local(self) -> ColumnElement[bool]:
    """The secondaryjoin with its "local" and "remote" annotations
    removed.  A secondaryjoin must be present."""

    secondaryjoin = self.secondaryjoin
    assert secondaryjoin is not None
    return _deep_deannotate(secondaryjoin, values=("local", "remote"))
2496
@util.memoized_property
def primaryjoin_reverse_remote(self) -> ColumnElement[bool]:
    """Return the primaryjoin condition suitable for the
    "reverse" direction.

    If the primaryjoin carries "remote" annotations, every element
    annotated "remote" becomes "local" and every element annotated
    "local" becomes "remote".  Otherwise the annotations are removed:
    only "local" / "remote" when "foreign" annotations are present,
    all of them when not.

    """
    if not self._has_remote_annotations:
        if self._has_foreign_annotations:
            # TODO: coverage
            return _deep_deannotate(
                self.primaryjoin, values=("local", "remote")
            )
        return _deep_deannotate(self.primaryjoin)

    def swap_sides(element: _CE, **kw: Any) -> Optional[_CE]:
        current = element._annotations
        if "remote" in current:
            dropped, added = "remote", "local"
        elif "local" in current:
            dropped, added = "local", "remote"
        else:
            return None

        swapped = dict(current)
        del swapped[dropped]
        swapped[added] = True
        return element._with_annotations(swapped)

    return visitors.replacement_traverse(self.primaryjoin, {}, swap_sides)
2533
def _has_annotation(self, clause: ClauseElement, annotation: str) -> bool:
    """Return True if any element yielded by ``visitors.iterate()``
    for ``clause`` has ``annotation`` among its annotations."""

    return any(
        annotation in col._annotations
        for col in visitors.iterate(clause, {})
    )
2540
@util.memoized_property
def _has_foreign_annotations(self) -> bool:
    """Memoized: whether the primaryjoin contains an element carrying
    the "foreign" annotation."""

    primaryjoin = self.primaryjoin
    return self._has_annotation(primaryjoin, "foreign")
2544
@util.memoized_property
def _has_remote_annotations(self) -> bool:
    """Memoized: whether the primaryjoin contains an element carrying
    the "remote" annotation."""

    primaryjoin = self.primaryjoin
    return self._has_annotation(primaryjoin, "remote")
2548
2549 def _annotate_fks(self) -> None:
2550 """Annotate the primaryjoin and secondaryjoin
2551 structures with 'foreign' annotations marking columns
2552 considered as foreign.
2553
2554 """
2555 if self._has_foreign_annotations:
2556 return
2557
2558 if self.consider_as_foreign_keys:
2559 self._annotate_from_fk_list()
2560 else:
2561 self._annotate_present_fks()
2562
def _annotate_from_fk_list(self) -> None:
    """Add the 'foreign' annotation to every element of the
    primaryjoin and secondaryjoin that is a member of
    ``consider_as_foreign_keys``."""

    fk_columns = self.consider_as_foreign_keys

    def mark_foreign(element: _CE, **kw: Any) -> Optional[_CE]:
        if element not in fk_columns:
            return None
        return element._annotate({"foreign": True})

    self.primaryjoin = visitors.replacement_traverse(
        self.primaryjoin, {}, mark_foreign
    )

    if self.secondaryjoin is None:
        return
    self.secondaryjoin = visitors.replacement_traverse(
        self.secondaryjoin, {}, mark_foreign
    )
2576
def _annotate_present_fks(self) -> None:
    """Add 'foreign' annotations to the primaryjoin and secondaryjoin
    based on what the compared columns themselves reveal.

    Within each binary comparison of two column elements where neither
    side is 'foreign' yet, one side is chosen: a :class:`.Column` that
    ``references()`` the other one, or else the side that belongs to
    the secondary table while the other side does not.

    """
    secondarycols = (
        util.column_set(self.secondary.c)
        if self.secondary is not None
        else set()
    )

    def pick_foreign(
        a: ColumnElement[Any], b: ColumnElement[Any]
    ) -> Optional[ColumnElement[Any]]:
        both_table_columns = isinstance(a, schema.Column) and isinstance(
            b, schema.Column
        )
        if both_table_columns:
            if a.references(b):
                return a
            if b.references(a):
                return b

        if secondarycols:
            if a in secondarycols and b not in secondarycols:
                return a
            if b in secondarycols and a not in secondarycols:
                return b

        return None

    def visit_binary(binary: BinaryExpression[Any]) -> None:
        left, right = binary.left, binary.right

        if not isinstance(left, sql.ColumnElement) or not isinstance(
            right, sql.ColumnElement
        ):
            return

        # leave comparisons alone that already have a foreign side
        if "foreign" in left._annotations or "foreign" in right._annotations:
            return

        col = pick_foreign(left, right)
        if col is None:
            return

        if col.compare(left):
            binary.left = left._annotate({"foreign": True})
        elif col.compare(right):
            binary.right = right._annotate({"foreign": True})

    self.primaryjoin = visitors.cloned_traverse(
        self.primaryjoin, {}, {"binary": visit_binary}
    )

    if self.secondaryjoin is None:
        return
    self.secondaryjoin = visitors.cloned_traverse(
        self.secondaryjoin, {}, {"binary": visit_binary}
    )
2626
def _refers_to_parent_table(self) -> bool:
    """Return True if the primaryjoin contains a comparison of two
    column clauses whose tables are both derived from by the parent
    selectable as well as by the child selectable.

    """
    parent_sel = self.parent_persist_selectable
    child_sel = self.child_persist_selectable
    found = False

    def visit_binary(binary: BinaryExpression[Any]) -> None:
        nonlocal found
        left, right = binary.left, binary.right

        if not (
            isinstance(left, expression.ColumnClause)
            and isinstance(right, expression.ColumnClause)
        ):
            return

        # both columns have to be reachable from both selectables
        if all(
            selectable.is_derived_from(column.table)
            for selectable in (parent_sel, child_sel)
            for column in (left, right)
        ):
            found = True

    visitors.traverse(self.primaryjoin, {}, {"binary": visit_binary})
    return found
2651
def _tables_overlap(self) -> bool:
    """Return the result of ``selectables_overlap()`` for the parent
    and child persist selectables."""

    parent_sel = self.parent_persist_selectable
    child_sel = self.child_persist_selectable
    return selectables_overlap(parent_sel, child_sel)
2658
2659 def _annotate_remote(self) -> None:
2660 """Annotate the primaryjoin and secondaryjoin
2661 structures with 'remote' annotations marking columns
2662 considered as part of the 'remote' side.
2663
2664 """
2665 if self._has_remote_annotations:
2666 return
2667
2668 if self.secondary is not None:
2669 self._annotate_remote_secondary()
2670 elif self._local_remote_pairs or self._remote_side:
2671 self._annotate_remote_from_args()
2672 elif self._refers_to_parent_table():
2673 self._annotate_selfref(
2674 lambda col: "foreign" in col._annotations, False
2675 )
2676 elif self._tables_overlap():
2677 self._annotate_remote_with_overlap()
2678 else:
2679 self._annotate_remote_distinct_selectables()
2680
def _annotate_remote_secondary(self) -> None:
    """Add the 'remote' annotation when a 'secondary' is present:
    every element of the primaryjoin and secondaryjoin that is a
    column of the secondary is marked.

    """
    assert self.secondary is not None
    fixed_secondary = self.secondary

    def mark_secondary_columns(element: _CE, **kw: Any) -> Optional[_CE]:
        if not fixed_secondary.c.contains_column(element):
            return None
        return element._annotate({"remote": True})

    self.primaryjoin = visitors.replacement_traverse(
        self.primaryjoin, {}, mark_secondary_columns
    )

    assert self.secondaryjoin is not None
    self.secondaryjoin = visitors.replacement_traverse(
        self.secondaryjoin, {}, mark_secondary_columns
    )
2703
def _annotate_selfref(
    self, fn: Callable[[ColumnElement[Any]], bool], remote_side_given: bool
) -> None:
    """Add the 'remote' annotation to the primaryjoin of a
    relationship detected as self-referential.

    :param fn: predicate deciding whether a column is remote.
    :param remote_side_given: when false, a comparison involving
     something other than two column clauses triggers the
     "non-simple column elements" warning.

    """

    def visit_binary(binary: BinaryExpression[Any]) -> None:
        left, right = binary.left, binary.right
        equated = left.compare(right)

        both_columns = isinstance(
            left, expression.ColumnClause
        ) and isinstance(right, expression.ColumnClause)
        if not both_columns:
            if not remote_side_given:
                self._warn_non_column_elements()
            return

        # assume one to many - FKs are "remote"
        if fn(left):
            binary.left = left._annotate({"remote": True})
        # a column compared to itself is marked on the left only
        if fn(right) and not equated:
            binary.right = right._annotate({"remote": True})

    self.primaryjoin = visitors.cloned_traverse(
        self.primaryjoin, {}, {"binary": visit_binary}
    )
2728
2729 def _annotate_remote_from_args(self) -> None:
2730 """annotate 'remote' in primaryjoin, secondaryjoin
2731 when the 'remote_side' or '_local_remote_pairs'
2732 arguments are used.
2733
2734 """
2735 if self._local_remote_pairs:
2736 if self._remote_side:
2737 raise sa_exc.ArgumentError(
2738 "remote_side argument is redundant "
2739 "against more detailed _local_remote_side "
2740 "argument."
2741 )
2742
2743 remote_side = [r for (l, r) in self._local_remote_pairs]
2744 else:
2745 remote_side = self._remote_side
2746
2747 if self._refers_to_parent_table():
2748 self._annotate_selfref(lambda col: col in remote_side, True)
2749 else:
2750
2751 def repl(element: _CE, **kw: Any) -> Optional[_CE]:
2752 # use set() to avoid generating ``__eq__()`` expressions
2753 # against each element
2754 if element in set(remote_side):
2755 return element._annotate({"remote": True})
2756 return None
2757
2758 self.primaryjoin = visitors.replacement_traverse(
2759 self.primaryjoin, {}, repl
2760 )
2761
def _annotate_remote_with_overlap(self) -> None:
    """annotate 'remote' in primaryjoin, secondaryjoin
    when the parent/child tables have some set of
    tables in common, though is not a fully self-referential
    relationship.

    Only the primaryjoin is rewritten here.

    """

    def visit_binary(binary: BinaryExpression[Any]) -> None:
        # run the check twice, the second time with the operands
        # swapped, so that a remote column is found on either side
        binary.left, binary.right = proc_left_right(
            binary.left, binary.right
        )
        binary.right, binary.left = proc_left_right(
            binary.right, binary.left
        )

    # the "parentmapper" annotation is consulted only when the target
    # mapper is a different mapper than the parent
    check_entities = (
        self.prop is not None and self.prop.mapper is not self.prop.parent
    )

    def proc_left_right(
        left: ColumnElement[Any], right: ColumnElement[Any]
    ) -> Tuple[ColumnElement[Any], ColumnElement[Any]]:
        if isinstance(left, expression.ColumnClause) and isinstance(
            right, expression.ColumnClause
        ):
            # plain column comparison: ``right`` is remote when it is a
            # child column compared against a parent column
            if self.child_persist_selectable.c.contains_column(
                right
            ) and self.parent_persist_selectable.c.contains_column(left):
                right = right._annotate({"remote": True})
        elif (
            check_entities
            and right._annotations.get("parentmapper") is self.prop.mapper
        ):
            # not two plain columns: fall back to the "parentmapper"
            # annotation to recognize an element of the target mapper
            right = right._annotate({"remote": True})
        elif (
            check_entities
            and left._annotations.get("parentmapper") is self.prop.mapper
        ):
            left = left._annotate({"remote": True})
        else:
            self._warn_non_column_elements()

        return left, right

    self.primaryjoin = visitors.cloned_traverse(
        self.primaryjoin, {}, {"binary": visit_binary}
    )
2810
def _annotate_remote_distinct_selectables(self) -> None:
    """Add the 'remote' annotation to the primaryjoin when the
    parent/child tables are entirely separate.

    An element is marked when it is a column of the child persist
    selectable and, in addition, either is not a column of the parent
    local selectable or is a column of the child local selectable.

    """

    def mark_remote(element: _CE, **kw: Any) -> Optional[_CE]:
        if not self.child_persist_selectable.c.contains_column(element):
            return None

        in_parent_local = self.parent_local_selectable.c.contains_column(
            element
        )
        if in_parent_local and not (
            self.child_local_selectable.c.contains_column(element)
        ):
            return None

        return element._annotate({"remote": True})

    self.primaryjoin = visitors.replacement_traverse(
        self.primaryjoin, {}, mark_remote
    )
2829
def _warn_non_column_elements(self) -> None:
    """Warn that the primary join condition of this relationship has
    non-simple column elements and suggest remote() annotations."""

    message = (
        "Non-simple column elements in primary "
        "join condition for property %s - consider using "
        "remote() annotations to mark the remote side." % self.prop
    )
    util.warn(message)
2836
def _annotate_local(self) -> None:
    """Add the 'local' annotation to the primaryjoin.

    Nothing is done when the primaryjoin already has 'local'
    annotations.  Otherwise every element is marked that has no
    'remote' annotation (set up by _annotate_remote() or user-defined)
    and is on the local side: the first members of
    ``_local_remote_pairs`` when those are given, else the columns of
    the parent persist selectable.

    """
    if self._has_annotation(self.primaryjoin, "local"):
        return

    pairs = self._local_remote_pairs
    if pairs:
        local_side = util.column_set([l for (l, r) in pairs])
    else:
        local_side = util.column_set(self.parent_persist_selectable.c)

    def mark_local(element: _CE, **kw: Any) -> Optional[_CE]:
        if "remote" in element._annotations or element not in local_side:
            return None
        return element._annotate({"local": True})

    self.primaryjoin = visitors.replacement_traverse(
        self.primaryjoin, {}, mark_local
    )
2866
def _annotate_parentmapper(self) -> None:
    """Add a "parentmapper" annotation to the annotated elements of
    the primaryjoin: 'remote' elements get the relationship's target
    mapper, 'local' elements get its parent mapper."""

    def tag_with_mapper(element: _CE, **kw: Any) -> Optional[_CE]:
        marks = element._annotations
        if "remote" in marks:
            owner = self.prop.mapper
        elif "local" in marks:
            owner = self.prop.parent
        else:
            return None
        return element._annotate({"parentmapper": owner})

    self.primaryjoin = visitors.replacement_traverse(
        self.primaryjoin, {}, tag_with_mapper
    )
2878
def _check_remote_side(self) -> None:
    """Validate the collected local/remote pairs.

    A warning is emitted for each remote column that is a column of
    the parent persist selectable but not of the child persist
    selectable.

    :raises ArgumentError: no local/remote pairs were found.

    """
    pairs = self.local_remote_pairs
    if not pairs:
        raise sa_exc.ArgumentError(
            "Relationship %s could "
            "not determine any unambiguous local/remote column "
            "pairs based on join condition and remote_side "
            "arguments.  "
            "Consider using the remote() annotation to "
            "accurately mark those elements of the join "
            "condition that are on the remote side of "
            "the relationship." % (self.prop,)
        )

    # columns found on the parent side only
    not_target = util.column_set(self.parent_persist_selectable.c).difference(
        self.child_persist_selectable.c
    )

    for _, remote_col in pairs:
        if remote_col not in not_target:
            continue
        util.warn(
            "Expression %s is marked as 'remote', but these "
            "column(s) are local to the local side.  The "
            "remote() annotation is needed only for a "
            "self-referential relationship where both sides "
            "of the relationship refer to the same tables."
            % (remote_col,)
        )
2906
2907 def _check_foreign_cols(
2908 self, join_condition: ColumnElement[bool], primary: bool
2909 ) -> None:
2910 """Check the foreign key columns collected and emit error
2911 messages."""
2912 foreign_cols = self._gather_columns_with_annotation(
2913 join_condition, "foreign"
2914 )
2915
2916 has_foreign = bool(foreign_cols)
2917
2918 if primary:
2919 can_sync = bool(self.synchronize_pairs)
2920 else:
2921 can_sync = bool(self.secondary_synchronize_pairs)
2922
2923 if (
2924 self.support_sync
2925 and can_sync
2926 or (not self.support_sync and has_foreign)
2927 ):
2928 return
2929
2930 # from here below is just determining the best error message
2931 # to report. Check for a join condition using any operator
2932 # (not just ==), perhaps they need to turn on "viewonly=True".
2933 if self.support_sync and has_foreign and not can_sync:
2934 err = (
2935 "Could not locate any simple equality expressions "
2936 "involving locally mapped foreign key columns for "
2937 "%s join condition "
2938 "'%s' on relationship %s."
2939 % (
2940 primary and "primary" or "secondary",
2941 join_condition,
2942 self.prop,
2943 )
2944 )
2945 err += (
2946 " Ensure that referencing columns are associated "
2947 "with a ForeignKey or ForeignKeyConstraint, or are "
2948 "annotated in the join condition with the foreign() "
2949 "annotation. To allow comparison operators other than "
2950 "'==', the relationship can be marked as viewonly=True."
2951 )
2952
2953 raise sa_exc.ArgumentError(err)
2954 else:
2955 err = (
2956 "Could not locate any relevant foreign key columns "
2957 "for %s join condition '%s' on relationship %s."
2958 % (
2959 primary and "primary" or "secondary",
2960 join_condition,
2961 self.prop,
2962 )
2963 )
2964 err += (
2965 " Ensure that referencing columns are associated "
2966 "with a ForeignKey or ForeignKeyConstraint, or are "
2967 "annotated in the join condition with the foreign() "
2968 "annotation."
2969 )
2970 raise sa_exc.ArgumentError(err)
2971
def _determine_direction(self) -> None:
    """Determine if this relationship is one to many, many to one,
    many to many.

    The outcome is stored in ``self.direction``.

    :raises ArgumentError: foreign key columns are present on both
     sides and the local/remote annotations do not settle the
     direction, or foreign key columns are present on neither side.

    """
    if self.secondaryjoin is not None:
        # a secondaryjoin always means many to many
        self.direction = MANYTOMANY
    else:
        parentcols = util.column_set(self.parent_persist_selectable.c)
        targetcols = util.column_set(self.child_persist_selectable.c)

        # fk collection which suggests ONETOMANY.
        onetomany_fk = targetcols.intersection(self.foreign_key_columns)

        # fk collection which suggests MANYTOONE.

        manytoone_fk = parentcols.intersection(self.foreign_key_columns)

        if onetomany_fk and manytoone_fk:
            # fks on both sides.       test for overlap of local/remote
            # with foreign key.
            # we will gather columns directly from their annotations
            # without deannotating, so that we can distinguish on a column
            # that refers to itself.

            # 1. columns that are both remote and FK suggest
            # onetomany.
            onetomany_local = self._gather_columns_with_annotation(
                self.primaryjoin, "remote", "foreign"
            )

            # 2. columns that are FK but are not remote (e.g. local)
            # suggest manytoone.
            manytoone_local = {
                c
                for c in self._gather_columns_with_annotation(
                    self.primaryjoin, "foreign"
                )
                if "remote" not in c._annotations
            }

            # 3. if both collections are present, remove columns that
            # refer to themselves.  This is for the case of
            # and_(Me.id == Me.remote_id, Me.version == Me.version)
            if onetomany_local and manytoone_local:
                self_equated = self.remote_columns.intersection(
                    self.local_columns
                )
                onetomany_local = onetomany_local.difference(self_equated)
                manytoone_local = manytoone_local.difference(self_equated)

            # at this point, if only one or the other collection is
            # present, we know the direction, otherwise it's still
            # ambiguous.

            if onetomany_local and not manytoone_local:
                self.direction = ONETOMANY
            elif manytoone_local and not onetomany_local:
                self.direction = MANYTOONE
            else:
                raise sa_exc.ArgumentError(
                    "Can't determine relationship"
                    " direction for relationship '%s' - foreign "
                    "key columns within the join condition are present "
                    "in both the parent and the child's mapped tables.  "
                    "Ensure that only those columns referring "
                    "to a parent column are marked as foreign, "
                    "either via the foreign() annotation or "
                    "via the foreign_keys argument." % self.prop
                )
        elif onetomany_fk:
            # foreign keys on the target side only
            self.direction = ONETOMANY
        elif manytoone_fk:
            # foreign keys on the parent side only
            self.direction = MANYTOONE
        else:
            raise sa_exc.ArgumentError(
                "Can't determine relationship "
                "direction for relationship '%s' - foreign "
                "key columns are present in neither the parent "
                "nor the child's mapped tables" % self.prop
            )
3053
3054 def _deannotate_pairs(
3055 self, collection: _ColumnPairIterable
3056 ) -> _MutableColumnPairs:
3057 """provide deannotation for the various lists of
3058 pairs, so that using them in hashes doesn't incur
3059 high-overhead __eq__() comparisons against
3060 original columns mapped.
3061
3062 """
3063 return [(x._deannotate(), y._deannotate()) for x, y in collection]
3064
def _setup_pairs(self) -> None:
    """Collect the column pairs of the join conditions.

    The primaryjoin and, if present, the secondaryjoin are walked
    with ``visit_binary_product()``.  The deannotated results are
    stored in ``local_remote_pairs`` (both joins combined),
    ``synchronize_pairs`` (primaryjoin) and
    ``secondary_synchronize_pairs`` (secondaryjoin).

    """
    sync_pairs: _MutableColumnPairs = []
    # ordered set: each (local, remote) pair is recorded only once
    lrp: util.OrderedSet[Tuple[ColumnElement[Any], ColumnElement[Any]]] = (
        util.OrderedSet([])
    )
    secondary_sync_pairs: _MutableColumnPairs = []

    def go(
        joincond: ColumnElement[bool],
        collection: _MutableColumnPairs,
    ) -> None:
        def visit_binary(
            binary: BinaryExpression[Any],
            left: ColumnElement[Any],
            right: ColumnElement[Any],
        ) -> None:
            # local/remote pair: exactly one side is "remote" and the
            # other side passes can_be_synced_fn; stored (local, remote)
            if (
                "remote" in right._annotations
                and "remote" not in left._annotations
                and self.can_be_synced_fn(left)
            ):
                lrp.add((left, right))
            elif (
                "remote" in left._annotations
                and "remote" not in right._annotations
                and self.can_be_synced_fn(right)
            ):
                lrp.add((right, left))
            # sync pair: "==" comparisons only, with the "foreign"
            # column stored second
            if binary.operator is operators.eq and self.can_be_synced_fn(
                left, right
            ):
                if "foreign" in right._annotations:
                    collection.append((left, right))
                elif "foreign" in left._annotations:
                    collection.append((right, left))

        visit_binary_product(visit_binary, joincond)

    for joincond, collection in [
        (self.primaryjoin, sync_pairs),
        (self.secondaryjoin, secondary_sync_pairs),
    ]:
        if joincond is None:
            continue
        go(joincond, collection)

    self.local_remote_pairs = self._deannotate_pairs(lrp)
    self.synchronize_pairs = self._deannotate_pairs(sync_pairs)
    self.secondary_synchronize_pairs = self._deannotate_pairs(
        secondary_sync_pairs
    )
3116
# Registry consulted by _warn_for_conflicting_sync_targets(): maps a
# synchronization target column to a mapping of
# {relationship property: source column copied into that target}.
# Both levels are WeakKeyDictionary objects, so an entry does not keep
# its key alive.
_track_overlapping_sync_targets: weakref.WeakKeyDictionary[
    ColumnElement[Any],
    weakref.WeakKeyDictionary[RelationshipProperty[Any], ColumnElement[Any]],
] = weakref.WeakKeyDictionary()
3123
def _warn_for_conflicting_sync_targets(self) -> None:
    """Warn if another relationship already copies a column into one of
    this relationship's synchronization targets.

    Does nothing when ``self.support_sync`` is false.  Otherwise every
    ``(from_, to_)`` pair in ``synchronize_pairs`` and
    ``secondary_synchronize_pairs`` is recorded in
    ``_track_overlapping_sync_targets``; if other relationships were
    already recorded for the same ``to_`` column and none of the
    exemption checks below applies to them, a warning is emitted via
    ``util.warn()``.

    """
    if not self.support_sync:
        return

    this_prop = self.prop
    tracker = self._track_overlapping_sync_targets

    # we would like to detect if we are synchronizing any column
    # pairs in conflict with another relationship that wishes to sync
    # an entirely different column to the same target.  This is a
    # very rare edge case so we will try to minimize the memory/overhead
    # impact of this check
    for from_, to_ in [
        *self.synchronize_pairs,
        *self.secondary_synchronize_pairs,
    ]:
        # save ourselves a ton of memory and overhead by only
        # considering columns that are subject to overlapping
        # FK constraints at the core level.  This condition can arise
        # if multiple relationships overlap foreign() directly, but
        # we're going to assume it's typically a ForeignKeyConstraint-
        # level configuration that benefits from this warning.

        if to_ not in tracker:
            # first relationship seen for this target; nothing to
            # compare against yet
            tracker[to_] = weakref.WeakKeyDictionary({this_prop: from_})
            continue

        prop_to_from = tracker[to_]

        other_props = [
            (pr, fr_)
            for pr, fr_ in prop_to_from.items()
            if (
                not pr.mapper._dispose_called
                and pr not in this_prop._reverse_property
                and pr.key not in this_prop._overlaps
                and this_prop.key not in pr._overlaps
                # note: the "__*" symbol is used internally by
                # SQLAlchemy as a general means of suppressing the
                # overlaps warning for some extension cases, however
                # this is not currently
                # a publicly supported symbol and may change at
                # any time.
                and "__*" not in this_prop._overlaps
                and "__*" not in pr._overlaps
                and not this_prop.parent.is_sibling(pr.parent)
                and not this_prop.mapper.is_sibling(pr.mapper)
                and not this_prop.parent.is_sibling(pr.mapper)
                and not this_prop.mapper.is_sibling(pr.parent)
                and (
                    this_prop.key != pr.key
                    or not this_prop.parent.common_parent(pr.parent)
                )
            )
        ]

        if other_props:
            conflict_descriptions = ", ".join(
                sorted(
                    "'%s' (copies %s to %s)" % (pr, fr_, to_)
                    for (pr, fr_) in other_props
                )
            )
            overlaps_value = ",".join(
                sorted(pr.key for pr, fr in other_props)
            )
            util.warn(
                "relationship '%s' will copy column %s to column %s, "
                "which conflicts with relationship(s): %s. "
                "If this is not the intention, consider if these "
                "relationships should be linked with "
                "back_populates, or if viewonly=True should be "
                "applied to one or more if they are read-only. "
                "For the less common case that foreign key "
                "constraints are partially overlapping, the "
                "orm.foreign() "
                "annotation can be used to isolate the columns that "
                "should be written towards. To silence this "
                "warning, add the parameter 'overlaps=\"%s\"' to the "
                "'%s' relationship."
                % (
                    this_prop,
                    from_,
                    to_,
                    conflict_descriptions,
                    overlaps_value,
                    this_prop,
                ),
                code="qzyx",
            )

        # record (or refresh) what this relationship copies into to_
        tracker[to_][this_prop] = from_
3209
@util.memoized_property
def remote_columns(self) -> Set[ColumnElement[Any]]:
    """Deannotated columns that carry the "remote" annotation.

    Gathered by ``_gather_join_annotations()`` from ``primaryjoin`` and,
    when present, ``secondaryjoin``.

    """
    return self._gather_join_annotations("remote")
3213
@util.memoized_property
def local_columns(self) -> Set[ColumnElement[Any]]:
    """Deannotated columns that carry the "local" annotation.

    Gathered by ``_gather_join_annotations()`` from ``primaryjoin`` and,
    when present, ``secondaryjoin``.

    """
    return self._gather_join_annotations("local")
3217
@util.memoized_property
def foreign_key_columns(self) -> Set[ColumnElement[Any]]:
    """Deannotated columns that carry the "foreign" annotation.

    Gathered by ``_gather_join_annotations()`` from ``primaryjoin`` and,
    when present, ``secondaryjoin``.

    """
    return self._gather_join_annotations("foreign")
3221
def _gather_join_annotations(
    self, annotation: str
) -> Set[ColumnElement[Any]]:
    """Collect the columns bearing ``annotation`` in the join conditions.

    ``primaryjoin`` is always searched; ``secondaryjoin`` is searched as
    well when it is not None.  The columns found are returned as a set of
    their ``_deannotate()`` results.

    """
    gather = self._gather_columns_with_annotation

    found = set(gather(self.primaryjoin, annotation))

    secondaryjoin = self.secondaryjoin
    if secondaryjoin is not None:
        found.update(gather(secondaryjoin, annotation))

    return {col._deannotate() for col in found}
3235
def _gather_columns_with_annotation(
    self, clause: ColumnElement[Any], *annotation: str
) -> Set[ColumnElement[Any]]:
    """Return the elements of ``clause`` that carry every given annotation.

    :param clause: expression that is walked with ``visitors.iterate()``.
    :param annotation: one or more annotation names; an element is
     included only if all of them are present in its ``_annotations``.
     Each positional argument is a single name, hence the ``*annotation:
     str`` signature.
    :return: set of the matching elements, still annotated.

    """
    required = set(annotation)
    return {
        cast(ColumnElement[Any], col)
        for col in visitors.iterate(clause, {})
        if required.issubset(col._annotations)
    }
3245
@util.memoized_property
def _secondary_lineage_set(self) -> FrozenSet[ColumnElement[Any]]:
    """Frozen set of every member of the ``proxy_set`` of each column in
    ``self.secondary.c``.

    Evaluates to ``util.EMPTY_SET`` when there is no ``secondary``.

    """
    if self.secondary is None:
        return util.EMPTY_SET

    return frozenset(
        itertools.chain.from_iterable(
            col.proxy_set for col in self.secondary.c
        )
    )
3254
def join_targets(
    self,
    source_selectable: Optional[FromClause],
    dest_selectable: FromClause,
    aliased: bool,
    single_crit: Optional[ColumnElement[bool]] = None,
    extra_criteria: Tuple[ColumnElement[bool], ...] = (),
) -> Tuple[
    ColumnElement[bool],
    Optional[ColumnElement[bool]],
    Optional[FromClause],
    Optional[ClauseAdapter],
    FromClause,
]:
    """Given a source and destination selectable, create a
    join between them.

    This takes into account aliasing the join clause
    to reference the appropriate corresponding columns
    in the target objects, as well as the extra child
    criterion, equivalent column sets, etc.

    The return value is the tuple ``(primaryjoin, secondaryjoin,
    secondary, target_adapter, dest_selectable)``; ``target_adapter`` is
    None unless ``aliased`` is true.

    """
    # place a barrier on the destination such that
    # replacement traversals won't ever dig into it.
    # its internal structure remains fixed
    # regardless of context.
    dest_selectable = _shallow_annotate(
        dest_selectable, {"no_replacement_traverse": True}
    )

    primaryjoin = self.primaryjoin
    secondaryjoin = self.secondaryjoin
    secondary = self.secondary

    # adjust the join condition for single table inheritance,
    # in the case that the join is to a subclass
    # this is analogous to the
    # "_adjust_for_single_table_inheritance()" method in Query.
    # Additional criteria always attach to the secondaryjoin when there
    # is one, otherwise to the primaryjoin.
    if single_crit is not None:
        if secondaryjoin is None:
            primaryjoin = primaryjoin & single_crit
        else:
            secondaryjoin = secondaryjoin & single_crit

    if extra_criteria:

        def mark_exclude_cols(
            elem: SupportsAnnotations, annotations: _AnnotationDict
        ) -> SupportsAnnotations:
            """note unrelated columns in the "extra criteria" as either
            should be adapted or not adapted, even though they are not
            part of our "local" or "remote" side.

            see #9779 for this case, as well as #11010 for a follow up

            """
            parentmapper_for_element = elem._annotations.get(
                "parentmapper", None
            )

            # elements belonging to either side of this relationship,
            # or to the secondary table's lineage, are left untouched
            if (
                parentmapper_for_element is self.prop.parent
                or parentmapper_for_element is self.prop.mapper
                or elem in self._secondary_lineage_set
            ):
                return elem

            return _safe_annotate(elem, annotations)

        extra_criteria = tuple(
            _deep_annotate(
                criterion,
                {"should_not_adapt": True},
                annotate_callable=mark_exclude_cols,
            )
            for criterion in extra_criteria
        )
        extra_clause = sql.and_(*extra_criteria)

        if secondaryjoin is None:
            primaryjoin = primaryjoin & extra_clause
        else:
            secondaryjoin = secondaryjoin & extra_clause

    if not aliased:
        return (
            primaryjoin,
            secondaryjoin,
            secondary,
            None,
            dest_selectable,
        )

    if secondary is None:
        primary_aliasizer = ClauseAdapter(
            dest_selectable,
            exclude_fn=_local_col_exclude,
            equivalents=self.child_equivalents,
        )
        if source_selectable is not None:
            # NOTE(review): the return value of chain() is not used
            # here; presumably chain() links the adapters in place -
            # confirm against ClauseAdapter.chain
            primary_aliasizer.chain(
                ClauseAdapter(
                    source_selectable,
                    exclude_fn=_remote_col_exclude,
                    equivalents=self.parent_equivalents,
                )
            )
        secondary_aliasizer = None
    else:
        secondary = secondary._anonymous_fromclause(flat=True)
        primary_aliasizer = ClauseAdapter(
            secondary,
            exclude_fn=_local_col_exclude,
        )
        # the secondary adapter is chained to the adapter built just
        # above, before primary_aliasizer is possibly rebuilt below
        secondary_aliasizer = ClauseAdapter(
            dest_selectable, equivalents=self.child_equivalents
        ).chain(primary_aliasizer)
        if source_selectable is not None:
            primary_aliasizer = ClauseAdapter(
                secondary,
                exclude_fn=_local_col_exclude,
            ).chain(
                ClauseAdapter(
                    source_selectable,
                    equivalents=self.parent_equivalents,
                )
            )

        secondaryjoin = secondary_aliasizer.traverse(secondaryjoin)

    primaryjoin = primary_aliasizer.traverse(primaryjoin)

    # the adapter handed back has its exclude_fn cleared; this happens
    # only after primaryjoin has been traversed above
    target_adapter = secondary_aliasizer or primary_aliasizer
    target_adapter.exclude_fn = None

    return (
        primaryjoin,
        secondaryjoin,
        secondary,
        target_adapter,
        dest_selectable,
    )
3393
def create_lazy_clause(self, reverse_direction: bool = False) -> Tuple[
    ColumnElement[bool],
    Dict[str, ColumnElement[Any]],
    Dict[ColumnElement[Any], ColumnElement[Any]],
]:
    """Build the join criteria with one side replaced by bind parameters.

    :param reverse_direction: when false, columns annotated "local" are
     replaced with bind parameters.  When true, the replaced columns are
     those annotated "remote", or, if a ``secondaryjoin`` is present, the
     local-side columns of ``local_remote_pairs``, substituted within the
     ``secondaryjoin`` only.

    :return: a tuple ``(lazywhere, bind_to_col, equated_columns)``:
     the resulting criteria, a dictionary mapping each generated bind
     parameter's key to the column it replaced, and a dictionary built
     from ``local_remote_pairs`` (remote -> local, or local -> remote
     when ``reverse_direction`` is true and there is no
     ``secondaryjoin``).

    """
    binds: Dict[ColumnElement[Any], BindParameter[Any]] = {}
    equated_columns: Dict[ColumnElement[Any], ColumnElement[Any]] = {}

    has_secondary = self.secondaryjoin is not None

    if has_secondary:
        # only needed (and only consulted) when a secondaryjoin exists
        lookup = collections.defaultdict(list)

    keyed_on_remote = has_secondary or not reverse_direction

    for local_col, remote_col in self.local_remote_pairs:
        if has_secondary:
            lookup[local_col].append((local_col, remote_col))
        if keyed_on_remote:
            equated_columns[remote_col] = local_col
        else:
            equated_columns[local_col] = remote_col

    def col_to_bind(
        element: ColumnElement[Any], **kw: Any
    ) -> Optional[BindParameter[Any]]:
        if not reverse_direction:
            replace = "local" in element._annotations
        elif has_secondary:
            replace = element in lookup
        else:
            replace = "remote" in element._annotations

        if not replace:
            return None

        # one bind parameter per distinct column, reused on repeat visits
        bind = binds.get(element)
        if bind is None:
            bind = binds[element] = sql.bindparam(
                None, None, type_=element.type, unique=True
            )
        return bind

    lazywhere = self.primaryjoin
    if not (has_secondary and reverse_direction):
        lazywhere = visitors.replacement_traverse(
            lazywhere, {}, col_to_bind
        )

    if has_secondary:
        secondaryjoin = self.secondaryjoin
        if reverse_direction:
            secondaryjoin = visitors.replacement_traverse(
                secondaryjoin, {}, col_to_bind
            )
        lazywhere = sql.and_(lazywhere, secondaryjoin)

    bind_to_col = {bind.key: col for col, bind in binds.items()}

    return lazywhere, bind_to_col, equated_columns
3451
3452
class _ColInAnnotations:
    """Serializable object that tests for names in c._annotations.

    Calling an instance with an element returns True when at least one
    of the names given at construction is present in that element's
    ``_annotations``, and False otherwise.

    TODO: does this need to be serializable anymore? can we find what the
    use case was for that?

    """

    __slots__ = ("names",)

    def __init__(self, *names: str):
        self.names = frozenset(names)

    def __call__(self, c: ClauseElement) -> bool:
        return not self.names.isdisjoint(c._annotations)


# predicates matching elements annotated "local" / "remote" respectively,
# or annotated "should_not_adapt"
_local_col_exclude = _ColInAnnotations("local", "should_not_adapt")
_remote_col_exclude = _ColInAnnotations("remote", "should_not_adapt")
3472
3473
class Relationship(
    RelationshipProperty[_T],
    _DeclarativeMapped[_T],
):
    """Describes an object property that holds either a single item or a
    list of items corresponding to a related database table.

    Instances are created through the public :func:`_orm.relationship`
    function.

    .. seealso::

        :ref:`relationship_config_toplevel`

    .. versionchanged:: 2.0 Added :class:`_orm.Relationship` as a Declarative
       compatible subclass for :class:`_orm.RelationshipProperty`.

    """

    inherit_cache = True
    """:meta private:"""
3494
3495
class _RelationshipDeclared(  # type: ignore[misc]
    Relationship[_T],
    WriteOnlyMapped[_T],  # not compatible with Mapped[_T]
    DynamicMapped[_T],  # not compatible with Mapped[_T]
):
    """Relationship subclass used implicitly for declarative mapping.

    Combines :class:`.Relationship` with the ``WriteOnlyMapped`` and
    ``DynamicMapped`` bases.

    """

    inherit_cache = True
    """:meta private:"""

    @classmethod
    def _mapper_property_name(cls) -> str:
        """Return the string ``"Relationship"`` as this class's mapper
        property name."""
        return "Relationship"