1# orm/relationships.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Heuristics related to join conditions as used in
9:func:`_orm.relationship`.
10
11Provides the :class:`.JoinCondition` object, which encapsulates
12SQL annotation and aliasing behavior focused on the `primaryjoin`
13and `secondaryjoin` aspects of :func:`_orm.relationship`.
14
15"""
16
17from __future__ import annotations
18
19import collections
20from collections import abc
21import dataclasses
22import inspect as _py_inspect
23import itertools
24import re
25import typing
26from typing import Any
27from typing import Callable
28from typing import cast
29from typing import Collection
30from typing import Dict
31from typing import FrozenSet
32from typing import Generic
33from typing import Iterable
34from typing import Iterator
35from typing import List
36from typing import NamedTuple
37from typing import NoReturn
38from typing import Optional
39from typing import Sequence
40from typing import Set
41from typing import Tuple
42from typing import Type
43from typing import TypeVar
44from typing import Union
45import weakref
46
47from . import attributes
48from . import strategy_options
49from ._typing import insp_is_aliased_class
50from ._typing import is_has_collection_adapter
51from .base import _DeclarativeMapped
52from .base import _is_mapped_class
53from .base import class_mapper
54from .base import DynamicMapped
55from .base import LoaderCallableStatus
56from .base import PassiveFlag
57from .base import state_str
58from .base import WriteOnlyMapped
59from .interfaces import _AttributeOptions
60from .interfaces import _IntrospectsAnnotations
61from .interfaces import MANYTOMANY
62from .interfaces import MANYTOONE
63from .interfaces import ONETOMANY
64from .interfaces import PropComparator
65from .interfaces import RelationshipDirection
66from .interfaces import StrategizedProperty
67from .util import _orm_annotate
68from .util import _orm_deannotate
69from .util import CascadeOptions
70from .. import exc as sa_exc
71from .. import Exists
72from .. import log
73from .. import schema
74from .. import sql
75from .. import util
76from ..inspection import inspect
77from ..sql import coercions
78from ..sql import expression
79from ..sql import operators
80from ..sql import roles
81from ..sql import visitors
82from ..sql._typing import _ColumnExpressionArgument
83from ..sql._typing import _HasClauseElement
84from ..sql.annotation import _safe_annotate
85from ..sql.elements import ColumnClause
86from ..sql.elements import ColumnElement
87from ..sql.util import _deep_annotate
88from ..sql.util import _deep_deannotate
89from ..sql.util import _shallow_annotate
90from ..sql.util import adapt_criterion_to_null
91from ..sql.util import ClauseAdapter
92from ..sql.util import join_condition
93from ..sql.util import selectables_overlap
94from ..sql.util import visit_binary_product
95from ..util.typing import de_optionalize_union_types
96from ..util.typing import Literal
97from ..util.typing import resolve_name_to_real_class_name
98
99if typing.TYPE_CHECKING:
100 from ._typing import _EntityType
101 from ._typing import _ExternalEntityType
102 from ._typing import _IdentityKeyType
103 from ._typing import _InstanceDict
104 from ._typing import _InternalEntityType
105 from ._typing import _O
106 from ._typing import _RegistryType
107 from .base import Mapped
108 from .clsregistry import _class_resolver
109 from .clsregistry import _ModNS
110 from .decl_base import _ClassScanMapperConfig
111 from .dependency import DependencyProcessor
112 from .mapper import Mapper
113 from .query import Query
114 from .session import Session
115 from .state import InstanceState
116 from .strategies import LazyLoader
117 from .util import AliasedClass
118 from .util import AliasedInsp
119 from ..sql._typing import _CoreAdapterProto
120 from ..sql._typing import _EquivalentColumnMap
121 from ..sql._typing import _InfoType
122 from ..sql.annotation import _AnnotationDict
123 from ..sql.annotation import SupportsAnnotations
124 from ..sql.elements import BinaryExpression
125 from ..sql.elements import BindParameter
126 from ..sql.elements import ClauseElement
127 from ..sql.schema import Table
128 from ..sql.selectable import FromClause
129 from ..util.typing import _AnnotationScanType
130 from ..util.typing import RODescriptorReference
131
# General-purpose type variables.  ``_T1`` / ``_T2`` parameterize
# ``_RelationshipArg`` as the types of its ``argument`` and ``resolved``
# fields respectively.
_T = TypeVar("_T", bound=Any)
_T1 = TypeVar("_T1", bound=Any)
_T2 = TypeVar("_T2", bound=Any)

# Type variable that parameterizes ``RelationshipProperty.Comparator``.
_PT = TypeVar("_PT", bound=Any)

_PT2 = TypeVar("_PT2", bound=Any)


# Annotation for the ``argument`` parameter of
# ``RelationshipProperty.__init__``: a string, a class, a Mapper, an
# AliasedClass, or a zero-argument callable returning one of the latter
# three.
_RelationshipArgumentType = Union[
    str,
    Type[_T],
    Callable[[], Type[_T]],
    "Mapper[_T]",
    "AliasedClass[_T]",
    Callable[[], "Mapper[_T]"],
    Callable[[], "AliasedClass[_T]"],
]

# Annotation for the ``lazy`` parameter of ``RelationshipProperty.__init__``.
_LazyLoadArgumentType = Literal[
    "select",
    "joined",
    "selectin",
    "subquery",
    "raise",
    "raise_on_sql",
    "noload",
    "immediate",
    "write_only",
    "dynamic",
    True,
    False,
    None,
]


# Annotation for the ``primaryjoin`` / ``secondaryjoin`` parameters: a
# string or a boolean column expression argument.
_RelationshipJoinConditionArgument = Union[
    str, _ColumnExpressionArgument[bool]
]
# Annotation for the ``secondary`` parameter: a FromClause, a string, or a
# zero-argument callable returning a FromClause.
_RelationshipSecondaryArgument = Union[
    "FromClause", str, Callable[[], "FromClause"]
]
# Annotation for the ``order_by`` parameter; ``False`` is the default used
# by ``RelationshipProperty.__init__``.
_ORMOrderByArgument = Union[
    Literal[False],
    str,
    _ColumnExpressionArgument[Any],
    Callable[[], _ColumnExpressionArgument[Any]],
    Callable[[], Iterable[_ColumnExpressionArgument[Any]]],
    Iterable[Union[str, _ColumnExpressionArgument[Any]]],
]
# Annotation for the ``backref`` parameter: a string, or a
# ``(string, dict)`` tuple.
ORMBackrefArgument = Union[str, Tuple[str, Dict[str, Any]]]

# A single column-like element as accepted within ``foreign_keys`` /
# ``remote_side``.
_ORMColCollectionElement = Union[
    ColumnClause[Any],
    _HasClauseElement[Any],
    roles.DMLColumnRole,
    "Mapped[Any]",
]
# Annotation for the ``foreign_keys`` / ``remote_side`` parameters: a
# string, one element, a sequence of elements, or a zero-argument callable
# returning either.
_ORMColCollectionArgument = Union[
    str,
    Sequence[_ORMColCollectionElement],
    Callable[[], Sequence[_ORMColCollectionElement]],
    Callable[[], _ORMColCollectionElement],
    _ORMColCollectionElement,
]


# Lets ``remote()`` / ``foreign()`` declare that they return the same type
# of column expression argument that they were given.
_CEA = TypeVar("_CEA", bound=_ColumnExpressionArgument[Any])

# Type variable for functions that accept and return a ColumnElement.
_CE = TypeVar("_CE", bound="ColumnElement[Any]")


# Aliases for collections of two-element column tuples, differing only in
# the container type (any iterable, a read-only sequence, a mutable list).
_ColumnPairIterable = Iterable[Tuple[ColumnElement[Any], ColumnElement[Any]]]

_ColumnPairs = Sequence[Tuple[ColumnElement[Any], ColumnElement[Any]]]

_MutableColumnPairs = List[Tuple[ColumnElement[Any], ColumnElement[Any]]]
209
210
def remote(expr: _CEA) -> _CEA:
    """Apply the 'remote' annotation to a portion of a primaryjoin
    expression.

    The given expression is coerced using the column argument role and
    then handed to ``_annotate_columns`` together with the annotation
    ``{"remote": True}``.

    See the section :ref:`relationship_custom_foreign` for a
    description of use.

    .. seealso::

        :ref:`relationship_custom_foreign`

        :func:`.foreign`

    """
    column_expr = coercions.expect(roles.ColumnArgumentRole, expr)
    remote_annotation = {"remote": True}
    return _annotate_columns(  # type: ignore
        column_expr, remote_annotation
    )
228
229
def foreign(expr: _CEA) -> _CEA:
    """Apply the 'foreign' annotation to a portion of a primaryjoin
    expression.

    The given expression is coerced using the column argument role and
    then handed to ``_annotate_columns`` together with the annotation
    ``{"foreign": True}``.

    See the section :ref:`relationship_custom_foreign` for a
    description of use.

    .. seealso::

        :ref:`relationship_custom_foreign`

        :func:`.remote`

    """
    column_expr = coercions.expect(roles.ColumnArgumentRole, expr)
    foreign_annotation = {"foreign": True}
    return _annotate_columns(  # type: ignore
        column_expr, foreign_annotation
    )
248
249
@dataclasses.dataclass
class _RelationshipArg(Generic[_T1, _T2]):
    """Holds one user-supplied relationship parameter together with the
    value it resolves to.

    ``argument`` is the raw value as given by the user; ``resolved`` is
    filled in later by :meth:`._resolve_against_registry`.

    """

    __slots__ = "name", "argument", "resolved"
    name: str
    argument: _T1
    resolved: Optional[_T2]

    def _is_populated(self) -> bool:
        """Return True if a non-None raw argument was given."""
        populated = self.argument is not None
        return populated

    def _resolve_against_registry(
        self, clsregistry_resolver: Callable[[str, bool], _class_resolver]
    ) -> None:
        """Compute ``self.resolved`` from ``self.argument``.

        * a string is passed to ``clsregistry_resolver`` (along with a flag
          that is True only for the parameter named ``"secondary"``) and
          the object it returns is then called to get the value;
        * a callable that is not a mapped class is called with no
          arguments;
        * anything else is used as is.

        """
        raw = self.argument

        if isinstance(raw, str):
            is_secondary = self.name == "secondary"
            name_resolver = clsregistry_resolver(raw, is_secondary)
            self.resolved = name_resolver()
            return

        if callable(raw) and not _is_mapped_class(raw):
            self.resolved = raw()
            return

        self.resolved = raw
278
279
# Type of the resolved ``order_by`` value (see ``_RelationshipArgs.order_by``
# and ``RelationshipProperty.order_by``): the literal ``False`` or a tuple
# of column expressions.
_RelationshipOrderByArg = Union[Literal[False], Tuple[ColumnElement[Any], ...]]
281
282
class _RelationshipArgs(NamedTuple):
    """stores user-passed parameters that are resolved at mapper configuration
    time.

    Each field is a :class:`._RelationshipArg` whose first type parameter is
    the type of the raw value given by the user and whose second is the
    type of the resolved value.  ``RelationshipProperty.__init__`` builds
    this tuple positionally, with every ``resolved`` slot initially None,
    so the field order here must match that call.

    """

    # the ``secondary`` parameter; resolves to a FromClause or None
    secondary: _RelationshipArg[
        Optional[_RelationshipSecondaryArgument],
        Optional[FromClause],
    ]
    # the ``primaryjoin`` parameter; resolves to a column expression or None
    primaryjoin: _RelationshipArg[
        Optional[_RelationshipJoinConditionArgument],
        Optional[ColumnElement[Any]],
    ]
    # the ``secondaryjoin`` parameter; resolves to a column expression or None
    secondaryjoin: _RelationshipArg[
        Optional[_RelationshipJoinConditionArgument],
        Optional[ColumnElement[Any]],
    ]
    # the ``order_by`` parameter; resolves to False or a tuple of columns
    order_by: _RelationshipArg[_ORMOrderByArgument, _RelationshipOrderByArg]
    # the ``foreign_keys`` parameter; resolves to a set of columns
    foreign_keys: _RelationshipArg[
        Optional[_ORMColCollectionArgument], Set[ColumnElement[Any]]
    ]
    # the ``remote_side`` parameter; resolves to a set of columns
    remote_side: _RelationshipArg[
        Optional[_ORMColCollectionArgument], Set[ColumnElement[Any]]
    ]
308
309
310@log.class_logger
311class RelationshipProperty(
312 _IntrospectsAnnotations, StrategizedProperty[_T], log.Identified
313):
314 """Describes an object property that holds a single item or list
315 of items that correspond to a related database table.
316
317 Public constructor is the :func:`_orm.relationship` function.
318
319 .. seealso::
320
321 :ref:`relationship_config_toplevel`
322
323 """
324
325 strategy_wildcard_key = strategy_options._RELATIONSHIP_TOKEN
326 inherit_cache = True
327 """:meta private:"""
328
329 _links_to_entity = True
330 _is_relationship = True
331
332 _overlaps: Sequence[str]
333
334 _lazy_strategy: LazyLoader
335
336 _persistence_only = dict(
337 passive_deletes=False,
338 passive_updates=True,
339 enable_typechecks=True,
340 active_history=False,
341 cascade_backrefs=False,
342 )
343
344 _dependency_processor: Optional[DependencyProcessor] = None
345
346 primaryjoin: ColumnElement[bool]
347 secondaryjoin: Optional[ColumnElement[bool]]
348 secondary: Optional[FromClause]
349 _join_condition: JoinCondition
350 order_by: _RelationshipOrderByArg
351
352 _user_defined_foreign_keys: Set[ColumnElement[Any]]
353 _calculated_foreign_keys: Set[ColumnElement[Any]]
354
355 remote_side: Set[ColumnElement[Any]]
356 local_columns: Set[ColumnElement[Any]]
357
358 synchronize_pairs: _ColumnPairs
359 secondary_synchronize_pairs: Optional[_ColumnPairs]
360
361 local_remote_pairs: Optional[_ColumnPairs]
362
363 direction: RelationshipDirection
364
365 _init_args: _RelationshipArgs
366
    def __init__(
        self,
        argument: Optional[_RelationshipArgumentType[_T]] = None,
        secondary: Optional[_RelationshipSecondaryArgument] = None,
        *,
        uselist: Optional[bool] = None,
        collection_class: Optional[
            Union[Type[Collection[Any]], Callable[[], Collection[Any]]]
        ] = None,
        primaryjoin: Optional[_RelationshipJoinConditionArgument] = None,
        secondaryjoin: Optional[_RelationshipJoinConditionArgument] = None,
        back_populates: Optional[str] = None,
        order_by: _ORMOrderByArgument = False,
        backref: Optional[ORMBackrefArgument] = None,
        overlaps: Optional[str] = None,
        post_update: bool = False,
        cascade: str = "save-update, merge",
        viewonly: bool = False,
        attribute_options: Optional[_AttributeOptions] = None,
        lazy: _LazyLoadArgumentType = "select",
        passive_deletes: Union[Literal["all"], bool] = False,
        passive_updates: bool = True,
        active_history: bool = False,
        enable_typechecks: bool = True,
        foreign_keys: Optional[_ORMColCollectionArgument] = None,
        remote_side: Optional[_ORMColCollectionArgument] = None,
        join_depth: Optional[int] = None,
        comparator_factory: Optional[
            Type[RelationshipProperty.Comparator[Any]]
        ] = None,
        single_parent: bool = False,
        innerjoin: bool = False,
        distinct_target_key: Optional[bool] = None,
        load_on_pending: bool = False,
        query_class: Optional[Type[Query[Any]]] = None,
        info: Optional[_InfoType] = None,
        omit_join: Literal[None, False] = None,
        sync_backref: Optional[bool] = None,
        doc: Optional[str] = None,
        bake_queries: Literal[True] = True,
        cascade_backrefs: Literal[False] = False,
        _local_remote_pairs: Optional[_ColumnPairs] = None,
        _legacy_inactive_history_style: bool = False,
    ):
        """Record the given configuration on this property.

        Most parameters are stored on attributes of the same name.  The
        ``secondary``, ``primaryjoin``, ``secondaryjoin``, ``order_by``,
        ``foreign_keys`` and ``remote_side`` values are not interpreted
        here; they are stored unresolved in ``self._init_args``.

        ``bake_queries`` is accepted but is not referenced in this
        constructor.

        :raises ArgumentError: if ``viewonly`` and ``sync_backref`` are both
         true, if ``cascade_backrefs`` is true, or if both ``backref`` and
         ``back_populates`` are given.

        """
        super().__init__(attribute_options=attribute_options)

        self.uselist = uselist
        self.argument = argument

        # keep the raw values of arguments that need later resolution; the
        # third member of each _RelationshipArg is the not-yet-known
        # resolved value.  Order must match the _RelationshipArgs fields.
        self._init_args = _RelationshipArgs(
            _RelationshipArg("secondary", secondary, None),
            _RelationshipArg("primaryjoin", primaryjoin, None),
            _RelationshipArg("secondaryjoin", secondaryjoin, None),
            _RelationshipArg("order_by", order_by, None),
            _RelationshipArg("foreign_keys", foreign_keys, None),
            _RelationshipArg("remote_side", remote_side, None),
        )

        self.post_update = post_update
        self.viewonly = viewonly
        if viewonly:
            # warn for each persistence-related flag that was changed from
            # its default
            self._warn_for_persistence_only_flags(
                passive_deletes=passive_deletes,
                passive_updates=passive_updates,
                enable_typechecks=enable_typechecks,
                active_history=active_history,
                cascade_backrefs=cascade_backrefs,
            )
        if viewonly and sync_backref:
            raise sa_exc.ArgumentError(
                "sync_backref and viewonly cannot both be True"
            )
        self.sync_backref = sync_backref
        self.lazy = lazy
        self.single_parent = single_parent
        self.collection_class = collection_class
        self.passive_deletes = passive_deletes

        # only the default (False) is accepted for cascade_backrefs
        if cascade_backrefs:
            raise sa_exc.ArgumentError(
                "The 'cascade_backrefs' parameter passed to "
                "relationship() may only be set to False."
            )

        self.passive_updates = passive_updates
        self.enable_typechecks = enable_typechecks
        self.query_class = query_class
        self.innerjoin = innerjoin
        self.distinct_target_key = distinct_target_key
        self.doc = doc
        self.active_history = active_history
        self._legacy_inactive_history_style = _legacy_inactive_history_style

        self.join_depth = join_depth
        # a true omit_join only produces a warning; the value is still
        # stored below
        if omit_join:
            util.warn(
                "setting omit_join to True is not supported; selectin "
                "loading of this relationship may not work correctly if this "
                "flag is set explicitly. omit_join optimization is "
                "automatically detected for conditions under which it is "
                "supported."
            )

        self.omit_join = omit_join
        self.local_remote_pairs = _local_remote_pairs
        self.load_on_pending = load_on_pending
        # fall back to the built-in Comparator when no factory is given
        self.comparator_factory = (
            comparator_factory or RelationshipProperty.Comparator
        )
        util.set_creation_order(self)

        if info is not None:
            self.info.update(info)

        self.strategy_key = (("lazy", self.lazy),)

        self._reverse_property: Set[RelationshipProperty[Any]] = set()

        # ``overlaps`` is a comma-separated string; whitespace around each
        # comma is discarded by the split pattern
        if overlaps:
            self._overlaps = set(re.split(r"\s*,\s*", overlaps))  # type: ignore  # noqa: E501
        else:
            self._overlaps = ()

        self.cascade = cascade

        self.back_populates = back_populates

        # backref and back_populates may not be combined; when
        # back_populates is used, ``self.backref`` is always None
        if self.back_populates:
            if backref:
                raise sa_exc.ArgumentError(
                    "backref and back_populates keyword arguments "
                    "are mutually exclusive"
                )
            self.backref = None
        else:
            self.backref = backref
503
504 def _warn_for_persistence_only_flags(self, **kw: Any) -> None:
505 for k, v in kw.items():
506 if v != self._persistence_only[k]:
507 # we are warning here rather than warn deprecated as this is a
508 # configuration mistake, and Python shows regular warnings more
509 # aggressively than deprecation warnings by default. Unlike the
510 # case of setting viewonly with cascade, the settings being
511 # warned about here are not actively doing the wrong thing
512 # against viewonly=True, so it is not as urgent to have these
513 # raise an error.
514 util.warn(
515 "Setting %s on relationship() while also "
516 "setting viewonly=True does not make sense, as a "
517 "viewonly=True relationship does not perform persistence "
518 "operations. This configuration may raise an error "
519 "in a future release." % (k,)
520 )
521
522 def instrument_class(self, mapper: Mapper[Any]) -> None:
523 attributes.register_descriptor(
524 mapper.class_,
525 self.key,
526 comparator=self.comparator_factory(self, mapper),
527 parententity=mapper,
528 doc=self.doc,
529 )
530
531 class Comparator(util.MemoizedSlots, PropComparator[_PT]):
532 """Produce boolean, comparison, and other operators for
533 :class:`.RelationshipProperty` attributes.
534
535 See the documentation for :class:`.PropComparator` for a brief
536 overview of ORM level operator definition.
537
538 .. seealso::
539
540 :class:`.PropComparator`
541
542 :class:`.ColumnProperty.Comparator`
543
544 :class:`.ColumnOperators`
545
546 :ref:`types_operators`
547
548 :attr:`.TypeEngine.comparator_factory`
549
550 """
551
552 __slots__ = (
553 "entity",
554 "mapper",
555 "property",
556 "_of_type",
557 "_extra_criteria",
558 )
559
560 prop: RODescriptorReference[RelationshipProperty[_PT]]
561 _of_type: Optional[_EntityType[_PT]]
562
563 def __init__(
564 self,
565 prop: RelationshipProperty[_PT],
566 parentmapper: _InternalEntityType[Any],
567 adapt_to_entity: Optional[AliasedInsp[Any]] = None,
568 of_type: Optional[_EntityType[_PT]] = None,
569 extra_criteria: Tuple[ColumnElement[bool], ...] = (),
570 ):
571 """Construction of :class:`.RelationshipProperty.Comparator`
572 is internal to the ORM's attribute mechanics.
573
574 """
575 self.prop = prop
576 self._parententity = parentmapper
577 self._adapt_to_entity = adapt_to_entity
578 if of_type:
579 self._of_type = of_type
580 else:
581 self._of_type = None
582 self._extra_criteria = extra_criteria
583
584 def adapt_to_entity(
585 self, adapt_to_entity: AliasedInsp[Any]
586 ) -> RelationshipProperty.Comparator[Any]:
587 return self.__class__(
588 self.prop,
589 self._parententity,
590 adapt_to_entity=adapt_to_entity,
591 of_type=self._of_type,
592 )
593
594 entity: _InternalEntityType[_PT]
595 """The target entity referred to by this
596 :class:`.RelationshipProperty.Comparator`.
597
598 This is either a :class:`_orm.Mapper` or :class:`.AliasedInsp`
599 object.
600
601 This is the "target" or "remote" side of the
602 :func:`_orm.relationship`.
603
604 """
605
606 mapper: Mapper[_PT]
607 """The target :class:`_orm.Mapper` referred to by this
608 :class:`.RelationshipProperty.Comparator`.
609
610 This is the "target" or "remote" side of the
611 :func:`_orm.relationship`.
612
613 """
614
615 def _memoized_attr_entity(self) -> _InternalEntityType[_PT]:
616 if self._of_type:
617 return inspect(self._of_type) # type: ignore
618 else:
619 return self.prop.entity
620
621 def _memoized_attr_mapper(self) -> Mapper[_PT]:
622 return self.entity.mapper
623
624 def _source_selectable(self) -> FromClause:
625 if self._adapt_to_entity:
626 return self._adapt_to_entity.selectable
627 else:
628 return self.property.parent._with_polymorphic_selectable
629
630 def __clause_element__(self) -> ColumnElement[bool]:
631 adapt_from = self._source_selectable()
632 if self._of_type:
633 of_type_entity = inspect(self._of_type)
634 else:
635 of_type_entity = None
636
637 (
638 pj,
639 sj,
640 source,
641 dest,
642 secondary,
643 target_adapter,
644 ) = self.prop._create_joins(
645 source_selectable=adapt_from,
646 source_polymorphic=True,
647 of_type_entity=of_type_entity,
648 alias_secondary=True,
649 extra_criteria=self._extra_criteria,
650 )
651 if sj is not None:
652 return pj & sj
653 else:
654 return pj
655
656 def of_type(self, class_: _EntityType[Any]) -> PropComparator[_PT]:
657 r"""Redefine this object in terms of a polymorphic subclass.
658
659 See :meth:`.PropComparator.of_type` for an example.
660
661
662 """
663 return RelationshipProperty.Comparator(
664 self.prop,
665 self._parententity,
666 adapt_to_entity=self._adapt_to_entity,
667 of_type=class_,
668 extra_criteria=self._extra_criteria,
669 )
670
671 def and_(
672 self, *criteria: _ColumnExpressionArgument[bool]
673 ) -> PropComparator[Any]:
674 """Add AND criteria.
675
676 See :meth:`.PropComparator.and_` for an example.
677
678 .. versionadded:: 1.4
679
680 """
681 exprs = tuple(
682 coercions.expect(roles.WhereHavingRole, clause)
683 for clause in util.coerce_generator_arg(criteria)
684 )
685
686 return RelationshipProperty.Comparator(
687 self.prop,
688 self._parententity,
689 adapt_to_entity=self._adapt_to_entity,
690 of_type=self._of_type,
691 extra_criteria=self._extra_criteria + exprs,
692 )
693
694 def in_(self, other: Any) -> NoReturn:
695 """Produce an IN clause - this is not implemented
696 for :func:`_orm.relationship`-based attributes at this time.
697
698 """
699 raise NotImplementedError(
700 "in_() not yet supported for "
701 "relationships. For a simple "
702 "many-to-one, use in_() against "
703 "the set of foreign key values."
704 )
705
706 # https://github.com/python/mypy/issues/4266
707 __hash__ = None # type: ignore
708
709 def __eq__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501
710 """Implement the ``==`` operator.
711
712 In a many-to-one context, such as:
713
714 .. sourcecode:: text
715
716 MyClass.some_prop == <some object>
717
718 this will typically produce a
719 clause such as:
720
721 .. sourcecode:: text
722
723 mytable.related_id == <some id>
724
725 Where ``<some id>`` is the primary key of the given
726 object.
727
728 The ``==`` operator provides partial functionality for non-
729 many-to-one comparisons:
730
731 * Comparisons against collections are not supported.
732 Use :meth:`~.RelationshipProperty.Comparator.contains`.
733 * Compared to a scalar one-to-many, will produce a
734 clause that compares the target columns in the parent to
735 the given target.
736 * Compared to a scalar many-to-many, an alias
737 of the association table will be rendered as
738 well, forming a natural join that is part of the
739 main body of the query. This will not work for
740 queries that go beyond simple AND conjunctions of
741 comparisons, such as those which use OR. Use
742 explicit joins, outerjoins, or
743 :meth:`~.RelationshipProperty.Comparator.has` for
744 more comprehensive non-many-to-one scalar
745 membership tests.
746 * Comparisons against ``None`` given in a one-to-many
747 or many-to-many context produce a NOT EXISTS clause.
748
749 """
750 if other is None or isinstance(other, expression.Null):
751 if self.property.direction in [ONETOMANY, MANYTOMANY]:
752 return ~self._criterion_exists()
753 else:
754 return _orm_annotate(
755 self.property._optimized_compare(
756 None, adapt_source=self.adapter
757 )
758 )
759 elif self.property.uselist:
760 raise sa_exc.InvalidRequestError(
761 "Can't compare a collection to an object or collection; "
762 "use contains() to test for membership."
763 )
764 else:
765 return _orm_annotate(
766 self.property._optimized_compare(
767 other, adapt_source=self.adapter
768 )
769 )
770
        def _criterion_exists(
            self,
            criterion: Optional[_ColumnExpressionArgument[bool]] = None,
            **kwargs: Any,
        ) -> Exists:
            """Build the EXISTS construct shared by :meth:`.any`,
            :meth:`.has` and the ``None`` / negated comparisons.

            :param criterion: optional additional WHERE criteria, coerced
             to a WHERE/HAVING expression.
            :param \\**kwargs: attribute name / value pairs; each becomes an
             equality comparison against the same-named attribute of the
             relationship's mapped target class and is ANDed into the
             criteria.
            :return: an ``EXISTS (SELECT 1 ...)`` construct selecting from
             the join target (and the secondary table, if any), with the
             join condition and criteria as its WHERE clause.

            """
            where_criteria = (
                coercions.expect(roles.WhereHavingRole, criterion)
                if criterion is not None
                else None
            )

            if getattr(self, "_of_type", None):
                # an of_type() entity was given: join to its selectable
                # rather than to the relationship's default target
                info: Optional[_InternalEntityType[Any]] = inspect(
                    self._of_type
                )
                assert info is not None
                target_mapper, to_selectable, is_aliased_class = (
                    info.mapper,
                    info.selectable,
                    info.is_aliased_class,
                )
                # for a self-referential relationship to a non-aliased
                # entity, use an anonymous FROM clause derived from the
                # target selectable
                if self.property._is_self_referential and not is_aliased_class:
                    to_selectable = to_selectable._anonymous_fromclause()

                # AND in the target mapper's single-table criterion, if it
                # has one
                single_crit = target_mapper._single_table_criterion
                if single_crit is not None:
                    if where_criteria is not None:
                        where_criteria = single_crit & where_criteria
                    else:
                        where_criteria = single_crit
            else:
                is_aliased_class = False
                to_selectable = None

            # only pass an explicit source selectable when this comparator
            # has an adapter
            if self.adapter:
                source_selectable = self._source_selectable()
            else:
                source_selectable = None

            (
                pj,
                sj,
                source,
                dest,
                secondary,
                target_adapter,
            ) = self.property._create_joins(
                dest_selectable=to_selectable,
                source_selectable=source_selectable,
            )

            # keyword criteria: <target class>.<name> == <value>
            for k in kwargs:
                crit = getattr(self.property.mapper.class_, k) == kwargs[k]
                if where_criteria is None:
                    where_criteria = crit
                else:
                    where_criteria = where_criteria & crit

            # annotate the *local* side of the join condition, in the case
            # of pj + sj this is the full primaryjoin, in the case of just
            # pj it's the local side of the primaryjoin.
            if sj is not None:
                j = _orm_annotate(pj) & sj
            else:
                j = _orm_annotate(pj, exclude=self.property.remote_side)

            if (
                where_criteria is not None
                and target_adapter
                and not is_aliased_class
            ):
                # limit this adapter to annotated only?
                where_criteria = target_adapter.traverse(where_criteria)

            # only have the "joined left side" of what we
            # return be subject to Query adaption. The right
            # side of it is used for an exists() subquery and
            # should not correlate or otherwise reach out
            # to anything in the enclosing query.
            if where_criteria is not None:
                where_criteria = where_criteria._annotate(
                    {"no_replacement_traverse": True}
                )

            # when there are no extra criteria, the join condition is ANDed
            # with the result of True_._ifnone(None)
            crit = j & sql.True_._ifnone(where_criteria)

            # dest (and secondary, when present) are the FROM objects of
            # the subquery and are excluded from correlation
            if secondary is not None:
                ex = (
                    sql.exists(1)
                    .where(crit)
                    .select_from(dest, secondary)
                    .correlate_except(dest, secondary)
                )
            else:
                ex = (
                    sql.exists(1)
                    .where(crit)
                    .select_from(dest)
                    .correlate_except(dest)
                )
            return ex
872
873 def any(
874 self,
875 criterion: Optional[_ColumnExpressionArgument[bool]] = None,
876 **kwargs: Any,
877 ) -> ColumnElement[bool]:
878 """Produce an expression that tests a collection against
879 particular criterion, using EXISTS.
880
881 An expression like::
882
883 session.query(MyClass).filter(
884 MyClass.somereference.any(SomeRelated.x == 2)
885 )
886
887 Will produce a query like:
888
889 .. sourcecode:: sql
890
891 SELECT * FROM my_table WHERE
892 EXISTS (SELECT 1 FROM related WHERE related.my_id=my_table.id
893 AND related.x=2)
894
895 Because :meth:`~.RelationshipProperty.Comparator.any` uses
896 a correlated subquery, its performance is not nearly as
897 good when compared against large target tables as that of
898 using a join.
899
900 :meth:`~.RelationshipProperty.Comparator.any` is particularly
901 useful for testing for empty collections::
902
903 session.query(MyClass).filter(~MyClass.somereference.any())
904
905 will produce:
906
907 .. sourcecode:: sql
908
909 SELECT * FROM my_table WHERE
910 NOT (EXISTS (SELECT 1 FROM related WHERE
911 related.my_id=my_table.id))
912
913 :meth:`~.RelationshipProperty.Comparator.any` is only
914 valid for collections, i.e. a :func:`_orm.relationship`
915 that has ``uselist=True``. For scalar references,
916 use :meth:`~.RelationshipProperty.Comparator.has`.
917
918 """
919 if not self.property.uselist:
920 raise sa_exc.InvalidRequestError(
921 "'any()' not implemented for scalar "
922 "attributes. Use has()."
923 )
924
925 return self._criterion_exists(criterion, **kwargs)
926
927 def has(
928 self,
929 criterion: Optional[_ColumnExpressionArgument[bool]] = None,
930 **kwargs: Any,
931 ) -> ColumnElement[bool]:
932 """Produce an expression that tests a scalar reference against
933 particular criterion, using EXISTS.
934
935 An expression like::
936
937 session.query(MyClass).filter(
938 MyClass.somereference.has(SomeRelated.x == 2)
939 )
940
941 Will produce a query like:
942
943 .. sourcecode:: sql
944
945 SELECT * FROM my_table WHERE
946 EXISTS (SELECT 1 FROM related WHERE
947 related.id==my_table.related_id AND related.x=2)
948
949 Because :meth:`~.RelationshipProperty.Comparator.has` uses
950 a correlated subquery, its performance is not nearly as
951 good when compared against large target tables as that of
952 using a join.
953
954 :meth:`~.RelationshipProperty.Comparator.has` is only
955 valid for scalar references, i.e. a :func:`_orm.relationship`
956 that has ``uselist=False``. For collection references,
957 use :meth:`~.RelationshipProperty.Comparator.any`.
958
959 """
960 if self.property.uselist:
961 raise sa_exc.InvalidRequestError(
962 "'has()' not implemented for collections. Use any()."
963 )
964 return self._criterion_exists(criterion, **kwargs)
965
966 def contains(
967 self, other: _ColumnExpressionArgument[Any], **kwargs: Any
968 ) -> ColumnElement[bool]:
969 """Return a simple expression that tests a collection for
970 containment of a particular item.
971
972 :meth:`~.RelationshipProperty.Comparator.contains` is
973 only valid for a collection, i.e. a
974 :func:`_orm.relationship` that implements
975 one-to-many or many-to-many with ``uselist=True``.
976
977 When used in a simple one-to-many context, an
978 expression like::
979
980 MyClass.contains(other)
981
982 Produces a clause like:
983
984 .. sourcecode:: sql
985
986 mytable.id == <some id>
987
988 Where ``<some id>`` is the value of the foreign key
989 attribute on ``other`` which refers to the primary
990 key of its parent object. From this it follows that
991 :meth:`~.RelationshipProperty.Comparator.contains` is
992 very useful when used with simple one-to-many
993 operations.
994
995 For many-to-many operations, the behavior of
996 :meth:`~.RelationshipProperty.Comparator.contains`
997 has more caveats. The association table will be
998 rendered in the statement, producing an "implicit"
999 join, that is, includes multiple tables in the FROM
1000 clause which are equated in the WHERE clause::
1001
1002 query(MyClass).filter(MyClass.contains(other))
1003
1004 Produces a query like:
1005
1006 .. sourcecode:: sql
1007
1008 SELECT * FROM my_table, my_association_table AS
1009 my_association_table_1 WHERE
1010 my_table.id = my_association_table_1.parent_id
1011 AND my_association_table_1.child_id = <some id>
1012
1013 Where ``<some id>`` would be the primary key of
1014 ``other``. From the above, it is clear that
1015 :meth:`~.RelationshipProperty.Comparator.contains`
1016 will **not** work with many-to-many collections when
1017 used in queries that move beyond simple AND
1018 conjunctions, such as multiple
1019 :meth:`~.RelationshipProperty.Comparator.contains`
1020 expressions joined by OR. In such cases subqueries or
1021 explicit "outer joins" will need to be used instead.
1022 See :meth:`~.RelationshipProperty.Comparator.any` for
1023 a less-performant alternative using EXISTS, or refer
1024 to :meth:`_query.Query.outerjoin`
1025 as well as :ref:`orm_queryguide_joins`
1026 for more details on constructing outer joins.
1027
1028 kwargs may be ignored by this operator but are required for API
1029 conformance.
1030 """
1031 if not self.prop.uselist:
1032 raise sa_exc.InvalidRequestError(
1033 "'contains' not implemented for scalar "
1034 "attributes. Use =="
1035 )
1036
1037 clause = self.prop._optimized_compare(
1038 other, adapt_source=self.adapter
1039 )
1040
1041 if self.prop.secondaryjoin is not None:
1042 clause.negation_clause = self.__negated_contains_or_equals(
1043 other
1044 )
1045
1046 return clause
1047
        def __negated_contains_or_equals(
            self, other: Any
        ) -> ColumnElement[bool]:
            """Return the negated form of a containment / equality test
            against the instance ``other``.

            :meth:`.contains` attaches the result as the
            ``negation_clause`` of its expression when a secondaryjoin is
            present.

            For a many-to-one relationship that uses "get", the result is a
            conjunction over the local/remote column pairs, each comparing
            the local column with ``!=`` to a value taken from ``other``,
            or with ``== None``.  In all other cases the result is the
            negation of an EXISTS whose criterion matches the target
            mapper's primary key columns to the primary key of ``other``.

            """
            if self.prop.direction == MANYTOONE:
                state = attributes.instance_state(other)

                def state_bindparam(
                    local_col: ColumnElement[Any],
                    state: InstanceState[Any],
                    remote_col: ColumnElement[Any],
                ) -> BindParameter[Any]:
                    # a uniquely-named bound parameter typed like the local
                    # column; its value is supplied through a callable
                    # obtained from the property for the remote column of
                    # the given state
                    dict_ = state.dict
                    return sql.bindparam(
                        local_col.key,
                        type_=local_col.type,
                        unique=True,
                        callable_=self.prop._get_attr_w_warn_on_none(
                            self.prop.mapper, state, dict_, remote_col
                        ),
                    )

                def adapt(col: _CE) -> _CE:
                    # apply this comparator's adapter to the column, if
                    # there is one
                    if self.adapter:
                        return self.adapter(col)
                    else:
                        return col

                if self.property._use_get:
                    # ``adapt(x) == None`` builds a SQL expression via the
                    # column's overloaded operator, so it must not be
                    # rewritten as ``is None``
                    return sql.and_(
                        *[
                            sql.or_(
                                adapt(x)
                                != state_bindparam(adapt(x), state, y),
                                adapt(x) == None,
                            )
                            for (x, y) in self.property.local_remote_pairs
                        ]
                    )

            # fallback: NOT EXISTS against the related row identified by
            # the primary key of ``other``
            criterion = sql.and_(
                *[
                    x == y
                    for (x, y) in zip(
                        self.property.mapper.primary_key,
                        self.property.mapper.primary_key_from_instance(other),
                    )
                ]
            )

            return ~self._criterion_exists(criterion)
1098
def __ne__(self, other: Any) -> ColumnElement[bool]:  # type: ignore[override]  # noqa: E501
    """Implement the ``!=`` operator.

    For a many-to-one relationship, an expression such as:

    .. sourcecode:: text

        MyClass.some_prop != <some object>

    usually renders as a comparison against the foreign key column(s):

    .. sourcecode:: sql

        mytable.related_id != <some id>

    with ``<some id>`` being the primary key of the object on the
    right-hand side.

    Outside of the many-to-one case, ``!=`` is only partially
    supported:

    * A collection can't be compared with ``!=``; combine
      :meth:`~.RelationshipProperty.Comparator.contains` with
      :func:`_expression.not_` instead.
    * For a scalar one-to-many, the clause compares the target
      columns in the parent to the given target.
    * For a scalar many-to-many, an alias of the association table
      is rendered as well, forming a natural join that is part of the
      main body of the query.  That form does not work for queries
      that go beyond simple AND conjunctions of comparisons, such as
      those which use OR.  Use explicit joins, outerjoins, or
      :meth:`~.RelationshipProperty.Comparator.has` in conjunction
      with :func:`_expression.not_` for more comprehensive
      non-many-to-one scalar membership tests.
    * Comparing to ``None`` in a one-to-many or many-to-many context
      produces an EXISTS clause.

    """
    if other is None or isinstance(other, expression.Null):
        if self.property.direction == MANYTOONE:
            null_comparison = self.property._optimized_compare(
                None, adapt_source=self.adapter
            )
            return _orm_annotate(~null_comparison)
        return self._criterion_exists()

    if self.property.uselist:
        raise sa_exc.InvalidRequestError(
            "Can't compare a collection"
            " to an object or collection; use "
            "contains() to test for membership."
        )

    negated = self.__negated_contains_or_equals(other)
    return _orm_annotate(negated)
1160
def _memoized_attr_property(self) -> RelationshipProperty[_PT]:
    """Return ``self.prop``, first calling ``_check_configure()`` on
    the property's parent."""
    owner = self.prop.parent
    owner._check_configure()
    return self.prop
1164
def _with_parent(
    self,
    instance: object,
    alias_secondary: bool = True,
    from_entity: Optional[_EntityType[Any]] = None,
) -> ColumnElement[bool]:
    """Return the criterion comparing this relationship to ``instance``,
    where ``instance`` is treated as the parent object.

    :param instance: the parent object; must not be ``None``.
    :param alias_secondary: passed through to
     :meth:`._optimized_compare`.
    :param from_entity: optional entity; when it inspects as an aliased
     class, that entity's ``_adapter.adapt_clause`` is applied to the
     resulting criterion.
    """
    assert instance is not None

    adapt_source: Optional[_CoreAdapterProto]
    if from_entity is None:
        adapt_source = None
    else:
        insp: Optional[_InternalEntityType[Any]] = inspect(from_entity)
        assert insp is not None
        adapt_source = (
            insp._adapter.adapt_clause
            if insp_is_aliased_class(insp)
            else None
        )

    return self._optimized_compare(
        instance,
        value_is_parent=True,
        alias_secondary=alias_secondary,
        adapt_source=adapt_source,
    )
1184
def _optimized_compare(
    self,
    state: Any,
    value_is_parent: bool = False,
    adapt_source: Optional[_CoreAdapterProto] = None,
    alias_secondary: bool = True,
) -> ColumnElement[bool]:
    """Build a comparison of this relationship against a mapped
    instance, or against ``None``.

    The criterion is taken from the lazy loader strategy (its forward
    or "reverse" where clause, depending on ``value_is_parent``), with
    the bound parameters listed in the strategy's bind-to-column map
    given callables that read their value from the instance.

    :param state: a mapped instance, or ``None`` to produce the
     comparison-to-NULL form via :meth:`._lazy_none_clause`.
    :param value_is_parent: when True the given object is treated as
     the parent side; otherwise the reverse direction is used.
    :param adapt_source: optional callable applied to the final
     criterion.
    :param alias_secondary: when True and a ``secondary`` is present,
     the criterion is adapted to an anonymous alias of it.
    :raises ArgumentError: if ``state`` is not ``None`` and can't be
     inspected as a mapped instance.
    """
    if state is not None:
        try:
            state = inspect(state)
        except sa_exc.NoInspectionAvailable:
            state = None

        if state is None or not getattr(state, "is_instance", False):
            raise sa_exc.ArgumentError(
                "Mapped instance expected for relationship "
                "comparison to object. Classes, queries and other "
                "SQL elements are not accepted in this context; for "
                "comparison with a subquery, "
                "use %s.has(**criteria)." % self
            )

    reverse_direction = not value_is_parent

    if state is None:
        return self._lazy_none_clause(
            reverse_direction, adapt_source=adapt_source
        )

    lazy_loader = self._lazy_strategy
    if reverse_direction:
        criterion = lazy_loader._rev_lazywhere
        bind_to_col = lazy_loader._rev_bind_to_col
        mapper = self.mapper
    else:
        criterion = lazy_loader._lazywhere
        bind_to_col = lazy_loader._bind_to_col
        mapper = self.parent

    instance_dict = attributes.instance_dict(state.obj())

    def _bind_instance_value(bindparam: BindParameter[Any]) -> None:
        # only parameters named in the bind-to-column map receive a
        # value callable
        param_key = bindparam._identifying_key
        if param_key in bind_to_col:
            bindparam.callable = self._get_attr_w_warn_on_none(
                mapper, state, instance_dict, bind_to_col[param_key]
            )

    if self.secondary is not None and alias_secondary:
        secondary_adapter = ClauseAdapter(
            self.secondary._anonymous_fromclause()
        )
        criterion = secondary_adapter.traverse(criterion)

    # cloned traversal: the callables are set on a copy of the criterion
    criterion = visitors.cloned_traverse(
        criterion, {}, {"bindparam": _bind_instance_value}
    )

    return adapt_source(criterion) if adapt_source else criterion
1252
def _get_attr_w_warn_on_none(
    self,
    mapper: Mapper[Any],
    state: InstanceState[Any],
    dict_: _InstanceDict,
    column: ColumnElement[Any],
) -> Callable[[], Any]:
    """Create the callable that is used in a many-to-one expression.

    E.g.::

        u1 = s.query(User).get(5)

        expr = Address.user == u1

    Above, the SQL should be "address.user_id = 5". The callable
    returned by this method produces the value "5" based on the identity
    of ``u1``.

    :param mapper: mapper used to locate the property for ``column`` and
     to read the column's value from ``state``.
    :param state: instance state of the object the value is read from.
    :param dict_: instance dictionary that accompanies ``state``.
    :param column: the column whose value the callable produces.
    :return: a zero-argument callable.  When invoked it returns the
     current value of ``column`` if one can be obtained, otherwise the
     last known value tracked on ``state``; it raises
     ``InvalidRequestError`` when neither is available, and emits a
     warning when the value it returns is ``None``.

    """

    # in this callable, we're trying to thread the needle through
    # a wide variety of scenarios, including:
    #
    # * the object hasn't been flushed yet and there's no value for
    # the attribute as of yet
    #
    # * the object hasn't been flushed yet but it has a user-defined
    # value
    #
    # * the object has a value but it's expired and not locally present
    #
    # * the object has a value but it's expired and not locally present,
    # and the object is also detached
    #
    # * The object hadn't been flushed yet, there was no value, but
    # later, the object has been expired and detached, and *now*
    # they're trying to evaluate it
    #
    # * the object had a value, but it was changed to a new value, and
    # then expired
    #
    # * the object had a value, but it was changed to a new value, and
    # then expired, then the object was detached
    #
    # * the object has a user-set value, but it's None and we don't do
    # the comparison correctly for that so warn
    #

    prop = mapper.get_property_by_column(column)

    # by invoking this method, InstanceState will track the last known
    # value for this key each time the attribute is to be expired.
    # this feature was added explicitly for use in this method.
    state._track_last_known_value(prop.key)

    # captured once here; _go() reads from this same mapping on each call
    lkv_fixed = state._last_known_values

    def _go() -> Any:
        assert lkv_fixed is not None
        # start out assuming the last known value is what we return
        last_known = to_return = lkv_fixed[prop.key]
        existing_is_available = (
            last_known is not LoaderCallableStatus.NO_VALUE
        )

        # we support that the value may have changed. so here we
        # try to get the most recent value including re-fetching.
        # only if we can't get a value now due to detachment do we return
        # the last known value
        current_value = mapper._get_state_attr_by_column(
            state,
            dict_,
            column,
            passive=(
                PassiveFlag.PASSIVE_OFF
                if state.persistent
                else PassiveFlag.PASSIVE_NO_FETCH ^ PassiveFlag.INIT_OK
            ),
        )

        if current_value is LoaderCallableStatus.NEVER_SET:
            # no current value; fall back to the last known value, or
            # fail if there isn't one
            if not existing_is_available:
                raise sa_exc.InvalidRequestError(
                    "Can't resolve value for column %s on object "
                    "%s; no value has been set for this column"
                    % (column, state_str(state))
                )
        elif current_value is LoaderCallableStatus.PASSIVE_NO_RESULT:
            # value could not be obtained; same fallback as above
            if not existing_is_available:
                raise sa_exc.InvalidRequestError(
                    "Can't resolve value for column %s on object "
                    "%s; the object is detached and the value was "
                    "expired" % (column, state_str(state))
                )
        else:
            # a current value was obtained; it supersedes the last
            # known value
            to_return = current_value
        if to_return is None:
            util.warn(
                "Got None for value of column %s; this is unsupported "
                "for a relationship comparison and will not "
                "currently produce an IS comparison "
                "(but may in a future release)" % column
            )
        return to_return

    return _go
1359
def _lazy_none_clause(
    self,
    reverse_direction: bool = False,
    adapt_source: Optional[_CoreAdapterProto] = None,
) -> ColumnElement[bool]:
    """Return the lazy loader's criterion rewritten by
    ``adapt_criterion_to_null()`` for comparison against ``None``.

    :param reverse_direction: selects the strategy's "reverse" where
     clause and bind-to-column map instead of the forward ones.
    :param adapt_source: optional callable applied to the resulting
     criterion before it is returned.
    """
    lazy_loader = self._lazy_strategy
    if reverse_direction:
        where_clause = lazy_loader._rev_lazywhere
        bind_to_col = lazy_loader._rev_bind_to_col
    else:
        where_clause = lazy_loader._lazywhere
        bind_to_col = lazy_loader._bind_to_col

    null_clause = adapt_criterion_to_null(where_clause, bind_to_col)

    return adapt_source(null_clause) if adapt_source else null_clause
1381
def __str__(self) -> str:
    """Return ``"<parent class name>.<key>"`` for this relationship."""
    owner_name = str(self.parent.class_.__name__)
    return ".".join((owner_name, self.key))
1384
def merge(
    self,
    session: Session,
    source_state: InstanceState[Any],
    source_dict: _InstanceDict,
    dest_state: InstanceState[Any],
    dest_dict: _InstanceDict,
    load: bool,
    _recursive: Dict[Any, object],
    _resolve_conflict_map: Dict[_IdentityKeyType[Any], object],
) -> None:
    """Merge the value of this relationship from a source object onto
    a destination object.

    Nothing happens when the "merge" cascade is not configured or when
    the attribute is not present in ``source_dict``.  Otherwise each
    related object is passed to ``session._merge()`` and the merged
    result is placed on the destination.

    :param session: the session whose ``_merge()`` is invoked for each
     related object.
    :param source_state: state of the object being merged from.
    :param source_dict: instance dictionary of the source object.
    :param dest_state: state of the object being merged into.
    :param dest_dict: instance dictionary of the destination object.
    :param load: when False, merged values are written into the
     destination without going through the attribute implementation's
     ``set()``.
    :param _recursive: tracks ``(state, relationship)`` pairs that have
     already been visited during this merge.
    :param _resolve_conflict_map: passed through to
     ``session._merge()``.
    """
    if load:
        # source_state was already reached through one of this
        # relationship's reverse properties; don't merge it again
        for r in self._reverse_property:
            if (source_state, r) in _recursive:
                return

    if "merge" not in self._cascade:
        return

    if self.key not in source_dict:
        return

    if self.uselist:
        impl = source_state.get_impl(self.key)

        assert is_has_collection_adapter(impl)
        instances_iterable = impl.get_collection(source_state, source_dict)

        # if this is a CollectionAttributeImpl, then empty should
        # be False, otherwise "self.key in source_dict" should not be
        # True
        assert not instances_iterable.empty if impl.collection else True

        if load:
            # for a full merge, pre-load the destination collection,
            # so that individual _merge of each item pulls from identity
            # map for those already present.
            # also assumes CollectionAttributeImpl behavior of loading
            # "old" list in any case
            dest_state.get_impl(self.key).get(
                dest_state, dest_dict, passive=PassiveFlag.PASSIVE_MERGE
            )

        dest_list = []
        for current in instances_iterable:
            current_state = attributes.instance_state(current)
            current_dict = attributes.instance_dict(current)
            _recursive[(current_state, self)] = True
            obj = session._merge(
                current_state,
                current_dict,
                load=load,
                _recursive=_recursive,
                _resolve_conflict_map=_resolve_conflict_map,
            )
            if obj is not None:
                dest_list.append(obj)

        if not load:
            coll = attributes.init_state_collection(
                dest_state, dest_dict, self.key
            )
            # bulk form: hands the whole list to the collection adapter
            # in one call instead of one Python-level call per item
            coll.append_multiple_without_event(dest_list)
        else:
            dest_impl = dest_state.get_impl(self.key)
            assert is_has_collection_adapter(dest_impl)
            dest_impl.set(
                dest_state,
                dest_dict,
                dest_list,
                _adapt=False,
                passive=PassiveFlag.PASSIVE_MERGE,
            )
    else:
        current = source_dict[self.key]
        if current is not None:
            current_state = attributes.instance_state(current)
            current_dict = attributes.instance_dict(current)
            _recursive[(current_state, self)] = True
            obj = session._merge(
                current_state,
                current_dict,
                load=load,
                _recursive=_recursive,
                _resolve_conflict_map=_resolve_conflict_map,
            )
        else:
            obj = None

        if not load:
            # write straight into the destination dict
            dest_dict[self.key] = obj
        else:
            dest_state.get_impl(self.key).set(
                dest_state, dest_dict, obj, None
            )
1481
def _value_as_iterable(
    self,
    state: InstanceState[_O],
    dict_: _InstanceDict,
    key: str,
    passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
) -> Sequence[Tuple[InstanceState[_O], _O]]:
    """Return the value of attribute ``key`` as a list of
    ``(state, obj)`` tuples.

    The list is empty when the attribute value is ``None`` or
    ``PASSIVE_NO_RESULT``.  A collection attribute yields one tuple per
    member; a scalar attribute yields a single tuple.
    """

    impl = state.manager[key].impl
    value = impl.get(state, dict_, passive=passive)

    if value is None or value is LoaderCallableStatus.PASSIVE_NO_RESULT:
        return []

    if not is_has_collection_adapter(impl):
        return [(attributes.instance_state(value), value)]

    members = impl.get_collection(state, dict_, value, passive=passive)
    return [(attributes.instance_state(member), member) for member in members]
1506
def cascade_iterator(
    self,
    type_: str,
    state: InstanceState[Any],
    dict_: _InstanceDict,
    visited_states: Set[InstanceState[Any]],
    halt_on: Optional[Callable[[InstanceState[Any]], bool]] = None,
) -> Iterator[Tuple[Any, Mapper[Any], InstanceState[Any], _InstanceDict]]:
    """Iterate the objects related to ``state`` along this relationship
    for the cascade named ``type_``.

    Yields ``(object, mapper, state, dict)`` tuples.  States already in
    ``visited_states`` are skipped, and each yielded state is added to
    that set.  ``None`` members are skipped, as are states for which
    ``halt_on`` returns true.

    :raises AssertionError: if a related object's mapper is not
     compatible with this relationship's target mapper.
    """
    # only the 'delete' cascade may actively lazy load, and only when
    # passive_deletes is not set
    if type_ == "delete" and not self.passive_deletes:
        passive = PassiveFlag.PASSIVE_OFF | PassiveFlag.NO_RAISE
    else:
        passive = PassiveFlag.PASSIVE_NO_INITIALIZE

    if type_ == "save-update":
        related = state.manager[self.key].impl.get_all_pending(state, dict_)
    else:
        related = self._value_as_iterable(
            state, dict_, self.key, passive=passive
        )

    skip_pending = (
        type_ == "refresh-expire" and "delete-orphan" not in self._cascade
    )

    for related_state, related_obj in related:
        # a None member is skipped silently: a warning would be
        # preferable, but would not be consistent with the current
        # collection.append(None) behavior of silently skipping.
        # see [ticket:2229]
        if related_state in visited_states or related_obj is None:
            continue

        assert related_state is not None
        related_dict = attributes.instance_dict(related_obj)

        if (halt_on and halt_on(related_state)) or (
            skip_pending and not related_state.key
        ):
            continue

        related_mapper = related_state.manager.mapper

        if not related_mapper.isa(self.mapper.class_manager.mapper):
            raise AssertionError(
                "Attribute '%s' on class '%s' "
                "doesn't handle objects "
                "of type '%s'"
                % (self.key, self.parent.class_, related_obj.__class__)
            )

        visited_states.add(related_state)

        yield related_obj, related_mapper, related_state, related_dict
1567
@property
def _effective_sync_backref(self) -> bool:
    """True unless this relationship is ``viewonly`` or its
    ``sync_backref`` is explicitly ``False``."""
    return not self.viewonly and self.sync_backref is not False
1574
@staticmethod
def _check_sync_backref(
    rel_a: RelationshipProperty[Any], rel_b: RelationshipProperty[Any]
) -> None:
    """Reconcile ``rel_b.sync_backref`` with a ``viewonly`` ``rel_a``.

    Does nothing unless ``rel_a`` is viewonly.  When it is, a true
    ``rel_b.sync_backref`` is an error; otherwise, if ``rel_b`` is not
    itself viewonly and its ``sync_backref`` is not already ``False``,
    it is set to ``False``.

    :raises InvalidRequestError: if ``rel_a`` is viewonly and
     ``rel_b.sync_backref`` is true.
    """
    if not rel_a.viewonly:
        return

    if rel_b.sync_backref:
        raise sa_exc.InvalidRequestError(
            "Relationship %s cannot specify sync_backref=True since %s "
            "includes viewonly=True." % (rel_b, rel_a)
        )

    if not rel_b.viewonly and rel_b.sync_backref is not False:
        rel_b.sync_backref = False
1590
def _add_reverse_property(self, key: str) -> None:
    """Link this relationship with the relationship named ``key`` on
    the target mapper as mutual reverse properties.

    :param key: attribute name on the target mapper.
    :raises InvalidRequestError: if ``key`` does not refer to a
     relationship, or if viewonly / sync_backref settings conflict.
    :raises ArgumentError: if the other relationship does not refer
     back to this relationship's parent mapper, or if both sides have
     the same one-to-many / many-to-one direction.
    """
    other = self.mapper.get_property(key, _configure_mappers=False)
    if not isinstance(other, RelationshipProperty):
        raise sa_exc.InvalidRequestError(
            "back_populates on relationship '%s' refers to attribute '%s' "
            "that is not a relationship. The back_populates parameter "
            "should refer to the name of a relationship on the target "
            "class." % (self, other)
        )

    # viewonly / sync_backref combinations, checked in both directions:
    #   * one side viewonly, the other side sync_backref=True -> error
    #   * one side viewonly, the other side neither viewonly nor
    #     sync_backref=False -> the other side's sync_backref becomes
    #     False
    self._check_sync_backref(self, other)
    self._check_sync_backref(other, self)

    self._reverse_property.add(other)
    other._reverse_property.add(self)

    other._setup_entity()

    if not other.mapper.common_parent(self.parent):
        raise sa_exc.ArgumentError(
            "reverse_property %r on "
            "relationship %s references relationship %s, which "
            "does not reference mapper %s"
            % (key, self, other, self.parent)
        )

    same_direction = (
        other._configure_started
        and self.direction in (ONETOMANY, MANYTOONE)
        and self.direction == other.direction
    )
    if same_direction:
        raise sa_exc.ArgumentError(
            "%s and back-reference %s are "
            "both of the same direction %r. Did you mean to "
            "set remote_side on the many-to-one side ?"
            % (other, self, self.direction)
        )
1634
@util.memoized_property
def entity(self) -> _InternalEntityType[_T]:
    """Return the target mapped entity, which is an inspect() of the
    class or aliased class that is referenced by this
    :class:`.RelationshipProperty`.

    """
    # NOTE(review): the attribute access below is not meant to recurse.
    # ``_setup_entity()`` stores ``entity`` directly in the instance
    # ``__dict__``; presumably ``_check_configure()`` leads to that call
    # so the stored value is found here -- confirm against the mapper's
    # ``_check_configure()``.
    self.parent._check_configure()
    return self.entity
1644
@util.memoized_property
def mapper(self) -> Mapper[_T]:
    """The :class:`_orm.Mapper` that this
    :class:`.RelationshipProperty` targets, taken from
    :attr:`.entity`.

    """
    target_entity = self.entity
    return target_entity.mapper
1652
def do_init(self) -> None:
    """Run the initialization steps of this relationship, in order."""
    self._check_conflicts()
    self._process_dependent_arguments()
    self._setup_entity()
    self._setup_registry_dependencies()
    self._setup_join_conditions()
    self._check_cascade_settings(self._cascade)
    self._post_init()
    self._generate_backref()
    self._join_condition._warn_for_conflicting_sync_targets()
    super().do_init()
    # the lazy="select" strategy is kept on hand; _optimized_compare()
    # and _lazy_none_clause() read their criteria from it
    self._lazy_strategy = cast(
        "LazyLoader", self._get_strategy((("lazy", "select"),))
    )
1667
def _setup_registry_dependencies(self) -> None:
    """Record on the parent mapper's registry that it depends on the
    registry of the target entity's mapper."""
    parent_registry = self.parent.mapper.registry
    parent_registry._set_depends_on(self.entity.mapper.registry)
1672
def _process_dependent_arguments(self) -> None:
    """Bring the configuration arguments given to the relationship
    into their final form.

    Deferred arguments are resolved against the class registry, ORM
    annotations are stripped from the join conditions, and
    ``order_by``, ``foreign_keys`` and ``remote_side`` are coerced into
    column expressions.

    :raises ArgumentError: if ``secondary`` resolves to a mapped class.
    """

    init_args = self._init_args

    # any of these may have been passed in a deferred form (declarative
    # "string configs", some recipes); resolve them first
    deferred_names = (
        "order_by",
        "primaryjoin",
        "secondaryjoin",
        "secondary",
        "foreign_keys",
        "remote_side",
    )
    for name in deferred_names:
        getattr(init_args, name)._resolve_against_registry(
            self._clsregistry_resolvers[1]
        )

    # join expressions built from mapped class descriptors carry
    # "annotations"; strip those
    for name in ("primaryjoin", "secondaryjoin"):
        join_arg = getattr(init_args, name)
        join_expr = join_arg.resolved
        if join_expr is None:
            continue
        join_arg.resolved = _orm_deannotate(
            coercions.expect(
                roles.ColumnArgumentRole, join_expr, argname=name
            )
        )

    secondary = init_args.secondary.resolved
    if secondary is not None and _is_mapped_class(secondary):
        raise sa_exc.ArgumentError(
            "secondary argument %s passed to to relationship() %s must "
            "be a Table object or other FROM clause; can't send a mapped "
            "class directly as rows in 'secondary' are persisted "
            "independently of a class that is mapped "
            "to that same table." % (secondary, self)
        )

    def _as_column_set(raw_columns: Any, argname: str) -> Any:
        # coerce each given element to a column expression and collect
        # the results in a column set
        return util.column_set(
            coercions.expect(
                roles.ColumnArgumentRole, col, argname=argname
            )
            for col in util.to_column_set(raw_columns)
        )

    # order_by, foreign_keys and remote_side must all end up as
    # columns, not strings
    order_by = init_args.order_by.resolved
    if order_by is False or order_by is None:
        self.order_by = False
    else:
        self.order_by = tuple(
            coercions.expect(
                roles.ColumnArgumentRole, col, argname="order_by"
            )
            for col in util.to_list(order_by)
        )

    self._user_defined_foreign_keys = _as_column_set(
        init_args.foreign_keys.resolved, "foreign_keys"
    )

    self.remote_side = _as_column_set(
        init_args.remote_side.resolved, "remote_side"
    )
1748
def declarative_scan(
    self,
    decl_scan: _ClassScanMapperConfig,
    registry: _RegistryType,
    cls: Type[Any],
    originating_module: Optional[str],
    key: str,
    mapped_container: Optional[Type[Mapped[Any]]],
    annotation: Optional[_AnnotationScanType],
    extracted_mapped_annotation: Optional[_AnnotationScanType],
    is_dataclass_field: bool,
) -> None:
    """Derive relationship settings from the annotation found on the
    attribute.

    Depending on the annotation, this may set ``lazy`` /
    ``strategy_key`` (write-only and dynamic containers),
    ``collection_class``, ``uselist`` and, when no lead argument was
    given to the relationship, ``argument``.

    :param cls: the class being scanned; used for error reporting when
     an annotation is required but absent.
    :param originating_module: module name used to resolve forward
     references; must not be ``None`` when an annotation is present.
    :param key: attribute name of this relationship on ``cls``.
    :param mapped_container: the container type the annotation was
     wrapped in, if any.
    :param extracted_mapped_annotation: the annotation taken from inside
     the container, or ``None`` if there is none.
    :raises ArgumentError: for an abstract collection type given without
     an explicit ``collection_class``, or for a generic alias that has
     no type arguments.

    ``decl_scan``, ``registry``, ``annotation`` and
    ``is_dataclass_field`` are not used by this implementation.
    """
    if extracted_mapped_annotation is None:
        # no annotation: acceptable only if the relationship was given
        # an explicit argument
        if self.argument is None:
            self._raise_for_required(key, cls)
        else:
            return

    argument = extracted_mapped_annotation
    assert originating_module is not None

    if mapped_container is not None:
        # WriteOnlyMapped / DynamicMapped containers select the
        # corresponding loader strategy
        is_write_only = issubclass(mapped_container, WriteOnlyMapped)
        is_dynamic = issubclass(mapped_container, DynamicMapped)
        if is_write_only:
            self.lazy = "write_only"
            self.strategy_key = (("lazy", self.lazy),)
        elif is_dynamic:
            self.lazy = "dynamic"
            self.strategy_key = (("lazy", self.lazy),)
    else:
        is_write_only = is_dynamic = False

    # NOTE(review): presumably strips Optional[] / "| None" from the
    # annotation -- confirm against de_optionalize_union_types()
    argument = de_optionalize_union_types(argument)

    if hasattr(argument, "__origin__"):
        # generic alias, e.g. List["B"]
        arg_origin = argument.__origin__
        if isinstance(arg_origin, type) and issubclass(
            arg_origin, abc.Collection
        ):
            # the annotated collection type becomes the collection
            # class unless one was configured explicitly
            if self.collection_class is None:
                if _py_inspect.isabstract(arg_origin):
                    raise sa_exc.ArgumentError(
                        f"Collection annotation type {arg_origin} cannot "
                        "be instantiated; please provide an explicit "
                        "'collection_class' parameter "
                        "(e.g. list, set, etc.) to the "
                        "relationship() function to accompany this "
                        "annotation"
                    )

                self.collection_class = arg_origin

        elif not is_write_only and not is_dynamic:
            # generic, but not a collection: scalar relationship
            self.uselist = False

        if argument.__args__:  # type: ignore
            # for mappings the target is the last type argument (the
            # value type); otherwise it is the first
            if isinstance(arg_origin, type) and issubclass(
                arg_origin, typing.Mapping
            ):
                type_arg = argument.__args__[-1]  # type: ignore
            else:
                type_arg = argument.__args__[0]  # type: ignore
            if hasattr(type_arg, "__forward_arg__"):
                str_argument = type_arg.__forward_arg__

                argument = resolve_name_to_real_class_name(
                    str_argument, originating_module
                )
            else:
                argument = type_arg
        else:
            raise sa_exc.ArgumentError(
                f"Generic alias {argument} requires an argument"
            )
    elif hasattr(argument, "__forward_arg__"):
        # plain forward reference
        argument = argument.__forward_arg__

        argument = resolve_name_to_real_class_name(
            argument, originating_module
        )

    if (
        self.collection_class is None
        and not is_write_only
        and not is_dynamic
    ):
        # no collection class was given or derived: scalar relationship
        self.uselist = False

    # ticket #8759
    # if a lead argument was given to relationship(), like
    # `relationship("B")`, use that, don't replace it with class we
    # found in the annotation. The declarative_scan() method call here is
    # still useful, as we continue to derive collection type and do
    # checking of the annotation in any case.
    if self.argument is None:
        self.argument = cast("_RelationshipArgumentType[_T]", argument)
1846
@util.preload_module("sqlalchemy.orm.mapper")
def _setup_entity(self, __argument: Any = None) -> None:
    """Resolve the relationship argument into its target entity and
    store it as ``self.entity``, along with ``self.target``.

    Does nothing if ``entity`` is already present in the instance
    ``__dict__``.  A string argument is resolved by name through the
    class registry; a callable that is neither a class nor a ``Mapper``
    is invoked to obtain the target.

    :param __argument: optional argument used in place of
     ``self.argument`` when it is truthy.
    :raises ArgumentError: if the resolved argument does not yield an
     object with a ``mapper`` attribute.
    """
    if "entity" in self.__dict__:
        return

    mapperlib = util.preloaded.orm_mapper

    argument = __argument or self.argument

    resolved_argument: _ExternalEntityType[Any]

    if isinstance(argument, str):
        # we might want to cleanup clsregistry API to make this
        # more straightforward
        name_resolver = self._clsregistry_resolve_name(argument)
        resolved_argument = cast(
            "_ExternalEntityType[Any]", name_resolver()
        )
    elif callable(argument) and not isinstance(
        argument, (type, mapperlib.Mapper)
    ):
        resolved_argument = argument()
    else:
        resolved_argument = argument

    entity: _InternalEntityType[Any]

    if isinstance(resolved_argument, type):
        entity = class_mapper(resolved_argument, configure=False)
    else:
        try:
            entity = inspect(resolved_argument)
        except sa_exc.NoInspectionAvailable:
            entity = None  # type: ignore

        if not hasattr(entity, "mapper"):
            raise sa_exc.ArgumentError(
                "relationship '%s' expects "
                "a class or a mapper argument (received: %s)"
                % (self.key, type(resolved_argument))
            )

    self.entity = entity
    self.target = self.entity.persist_selectable
1894
def _setup_join_conditions(self) -> None:
    """Construct the :class:`.JoinCondition` for this relationship and
    copy what it computed onto the relationship itself."""
    init_args = self._init_args

    join_condition = JoinCondition(
        parent_persist_selectable=self.parent.persist_selectable,
        child_persist_selectable=self.entity.persist_selectable,
        parent_local_selectable=self.parent.local_table,
        child_local_selectable=self.entity.local_table,
        primaryjoin=init_args.primaryjoin.resolved,
        secondary=init_args.secondary.resolved,
        secondaryjoin=init_args.secondaryjoin.resolved,
        parent_equivalents=self.parent._equivalent_columns,
        child_equivalents=self.mapper._equivalent_columns,
        consider_as_foreign_keys=self._user_defined_foreign_keys,
        local_remote_pairs=self.local_remote_pairs,
        remote_side=self.remote_side,
        self_referential=self._is_self_referential,
        prop=self,
        support_sync=not self.viewonly,
        can_be_synced_fn=self._columns_are_mapped,
    )
    self._join_condition = join_condition

    # join expressions and the secondary table
    self.primaryjoin = join_condition.primaryjoin
    self.secondaryjoin = join_condition.secondaryjoin
    self.secondary = join_condition.secondary

    # direction and column collections
    self.direction = join_condition.direction
    self.local_remote_pairs = join_condition.local_remote_pairs
    self.remote_side = join_condition.remote_columns
    self.local_columns = join_condition.local_columns
    self.synchronize_pairs = join_condition.synchronize_pairs
    self._calculated_foreign_keys = join_condition.foreign_key_columns
    self.secondary_synchronize_pairs = (
        join_condition.secondary_synchronize_pairs
    )
1924
@property
def _clsregistry_resolve_arg(
    self,
) -> Callable[[str, bool], _class_resolver]:
    """The second element of :attr:`._clsregistry_resolvers`."""
    resolvers = self._clsregistry_resolvers
    return resolvers[1]
1930
@property
def _clsregistry_resolve_name(
    self,
) -> Callable[[str], Callable[[], Union[Type[Any], Table, _ModNS]]]:
    """The first element of :attr:`._clsregistry_resolvers`."""
    resolvers = self._clsregistry_resolvers
    return resolvers[0]
1936
@util.memoized_property
@util.preload_module("sqlalchemy.orm.clsregistry")
def _clsregistry_resolvers(
    self,
) -> Tuple[
    Callable[[str], Callable[[], Union[Type[Any], Table, _ModNS]]],
    Callable[[str, bool], _class_resolver],
]:
    """The pair of resolvers returned by the class registry's
    ``_resolver()`` for the parent class and this property."""
    clsregistry = util.preloaded.orm_clsregistry
    return clsregistry._resolver(self.parent.class_, self)
1948
def _check_conflicts(self) -> None:
    """Verify that this relationship may be assigned to its parent
    mapper.

    :raises ArgumentError: if the parent is a non-primary mapper and
     the class's primary mapper has no property under this
     relationship's key.
    """
    if not self.parent.non_primary:
        return

    primary = class_mapper(self.parent.class_, configure=False)
    if primary.has_property(self.key):
        return

    raise sa_exc.ArgumentError(
        "Attempting to assign a new "
        "relationship '%s' to a non-primary mapper on "
        "class '%s'. New relationships can only be added "
        "to the primary mapper, i.e. the very first mapper "
        "created for class '%s' "
        % (
            self.key,
            self.parent.class_.__name__,
            self.parent.class_.__name__,
        )
    )
1967
@property
def cascade(self) -> CascadeOptions:
    """The cascade options currently in effect for this
    :class:`.RelationshipProperty`.

    Assigning to this attribute hands the new value to
    ``_set_cascade()``.
    """
    return self._cascade

@cascade.setter
def cascade(self, cascade: Union[str, CascadeOptions]) -> None:
    # assignment is delegated entirely to _set_cascade()
    self._set_cascade(cascade)
1978
def _set_cascade(self, cascade_arg: Union[str, CascadeOptions]) -> None:
    """Install a new cascade setting on this relationship.

    For a viewonly relationship the options are reduced to those in
    ``CascadeOptions._viewonly_cascades``.  The settings are validated
    only once ``mapper`` is present in the instance ``__dict__``, and
    are also copied to the dependency processor when one is set.

    :param cascade_arg: cascade names as a string, or a
     ``CascadeOptions`` object.
    """
    options = CascadeOptions(cascade_arg)

    if self.viewonly:
        options = CascadeOptions(
            options.intersection(CascadeOptions._viewonly_cascades)
        )

    if "mapper" in self.__dict__:
        self._check_cascade_settings(options)
    self._cascade = options

    processor = self._dependency_processor
    if processor:
        processor.cascade = options
1993
def _check_cascade_settings(self, cascade: CascadeOptions) -> None:
    """Validate ``cascade`` against this relationship's configuration.

    When ``delete_orphan`` is set, ``(key, parent class)`` is also
    appended to the ``_delete_orphans`` list of the target's primary
    mapper.

    :raises ArgumentError: if delete-orphan is requested on a
     many-to-one or many-to-many relationship without
     ``single_parent``, or if ``passive_deletes='all'`` is combined
     with the "delete" or "delete-orphan" cascade.
    """
    if cascade.delete_orphan and not self.single_parent:
        direction = self.direction
        if direction is MANYTOMANY or direction is MANYTOONE:
            raise sa_exc.ArgumentError(
                "For %(direction)s relationship %(rel)s, delete-orphan "
                "cascade is normally "
                'configured only on the "one" side of a one-to-many '
                "relationship, "
                'and not on the "many" side of a many-to-one or many-to-many '
                "relationship. "
                "To force this relationship to allow a particular "
                '"%(relatedcls)s" object to be referenced by only '
                'a single "%(clsname)s" object at a time via the '
                "%(rel)s relationship, which "
                "would allow "
                "delete-orphan cascade to take place in this direction, set "
                "the single_parent=True flag."
                % {
                    "rel": self,
                    "direction": (
                        "many-to-one"
                        if direction is MANYTOONE
                        else "many-to-many"
                    ),
                    "clsname": self.parent.class_.__name__,
                    "relatedcls": self.mapper.class_.__name__,
                },
                code="bbf0",
            )

    if self.passive_deletes == "all":
        if "delete" in cascade or "delete-orphan" in cascade:
            raise sa_exc.ArgumentError(
                "On %s, can't set passive_deletes='all' in conjunction "
                "with 'delete' or 'delete-orphan' cascade" % self
            )

    if cascade.delete_orphan:
        orphan_registry = self.mapper.primary_mapper()._delete_orphans
        orphan_registry.append((self.key, self.parent.class_))
2039
def _persists_for(self, mapper: Mapper[Any]) -> bool:
    """Tell whether this property persists values on behalf of
    ``mapper``: true when ``mapper.relationships`` holds this very
    object under this property's key.

    """

    relationships = mapper.relationships
    return self.key in relationships and relationships[self.key] is self
2050
def _columns_are_mapped(self, *cols: ColumnElement[Any]) -> bool:
    """Tell whether every given column belongs to one of the tables
    referenced by this :class:`.RelationshipProperty`: the resolved
    ``secondary`` (if any), the parent's persist selectable, or the
    target.

    """

    secondary = self._init_args.secondary.resolved

    def _is_mapped(column: ColumnElement[Any]) -> bool:
        if secondary is not None and secondary.c.contains_column(column):
            return True
        return bool(
            self.parent.persist_selectable.c.contains_column(column)
            or self.target.c.contains_column(column)
        )

    return all(_is_mapped(column) for column in cols)
2066
def _generate_backref(self) -> None:
    """Interpret the 'backref' instruction to create a
    :func:`_orm.relationship` complementary to this one.

    Nothing is done for a non-primary parent mapper.  When ``backref``
    is set and ``back_populates`` is not, a new
    :class:`.RelationshipProperty` is configured on the target's
    primary mapper and the two sides are linked through
    ``back_populates``.  Finally, whenever ``back_populates`` is set,
    the reverse property is registered.

    :raises ArgumentError: if a property with the backref name already
     exists on a non-concrete mapper in the target's hierarchy.
    :raises InvalidRequestError: if ``secondaryjoin`` is given in the
     backref arguments of a relationship that has no ``secondary``.
    """

    if self.parent.non_primary:
        return
    if self.backref is not None and not self.back_populates:
        kwargs: Dict[str, Any]
        # backref is either a plain name or a (name, kwargs) pair
        if isinstance(self.backref, str):
            backref_key, kwargs = self.backref, {}
        else:
            backref_key, kwargs = self.backref
        mapper = self.mapper.primary_mapper()

        if not mapper.concrete:
            # the name must be unused across the target's ancestors
            # and descendants, concrete mappers excepted
            check = set(mapper.iterate_to_root()).union(
                mapper.self_and_descendants
            )
            for m in check:
                if m.has_property(backref_key) and not m.concrete:
                    raise sa_exc.ArgumentError(
                        "Error creating backref "
                        "'%s' on relationship '%s': property of that "
                        "name exists on mapper '%s'"
                        % (backref_key, self, m)
                    )

        # determine primaryjoin/secondaryjoin for the
        # backref. Use the one we had, so that
        # a custom join doesn't have to be specified in
        # both directions.
        if self.secondary is not None:
            # for many to many, just switch primaryjoin/
            # secondaryjoin. use the annotated
            # pj/sj on the _join_condition.
            pj = kwargs.pop(
                "primaryjoin",
                self._join_condition.secondaryjoin_minus_local,
            )
            sj = kwargs.pop(
                "secondaryjoin",
                self._join_condition.primaryjoin_minus_local,
            )
        else:
            pj = kwargs.pop(
                "primaryjoin",
                self._join_condition.primaryjoin_reverse_remote,
            )
            sj = kwargs.pop("secondaryjoin", None)
            if sj:
                raise sa_exc.InvalidRequestError(
                    "Can't assign 'secondaryjoin' on a backref "
                    "against a non-secondary relationship."
                )

        foreign_keys = kwargs.pop(
            "foreign_keys", self._user_defined_foreign_keys
        )
        parent = self.parent.primary_mapper()
        # these settings default to this relationship's own values
        # unless given explicitly in the backref arguments
        kwargs.setdefault("viewonly", self.viewonly)
        kwargs.setdefault("post_update", self.post_update)
        kwargs.setdefault("passive_updates", self.passive_updates)
        kwargs.setdefault("sync_backref", self.sync_backref)
        # from here on the pair is linked via back_populates in both
        # directions
        self.back_populates = backref_key
        relationship = RelationshipProperty(
            parent,
            self.secondary,
            primaryjoin=pj,
            secondaryjoin=sj,
            foreign_keys=foreign_keys,
            back_populates=self.key,
            **kwargs,
        )
        mapper._configure_property(
            backref_key, relationship, warn_for_existing=True
        )

    if self.back_populates:
        self._add_reverse_property(self.back_populates)
2146
@util.preload_module("sqlalchemy.orm.dependency")
def _post_init(self) -> None:
    """Finish setup once the relationship direction is available.

    Defaults ``uselist`` to True for every direction except MANYTOONE
    when it was left as None, and, unless the relationship is
    viewonly, creates its dependency processor.

    """
    dependency = util.preloaded.orm_dependency

    if self.uselist is None:
        self.uselist = self.direction is not MANYTOONE

    if self.viewonly:
        return

    make_processor = dependency.DependencyProcessor.from_relationship
    self._dependency_processor = make_processor(self)  # type: ignore
2157
@util.memoized_property
def _use_get(self) -> bool:
    """Memoized copy of the ``use_get`` flag found on this
    relationship's lazy loader strategy (``self._lazy_strategy``)."""

    return self._lazy_strategy.use_get
2165
@util.memoized_property
def _is_self_referential(self) -> bool:
    """Memoized result of ``self.mapper.common_parent(self.parent)``."""
    target_mapper = self.mapper
    return target_mapper.common_parent(self.parent)
2169
def _create_joins(
    self,
    source_polymorphic: bool = False,
    source_selectable: Optional[FromClause] = None,
    dest_selectable: Optional[FromClause] = None,
    of_type_entity: Optional[_InternalEntityType[Any]] = None,
    alias_secondary: bool = False,
    extra_criteria: Tuple[ColumnElement[bool], ...] = (),
) -> Tuple[
    ColumnElement[bool],
    Optional[ColumnElement[bool]],
    FromClause,
    FromClause,
    Optional[FromClause],
    Optional[ClauseAdapter],
]:
    """Work out the selectables and join criteria for joining from the
    parent of this relationship to its target.

    Missing source / destination selectables are filled in from the
    parent and target mappers, an ``aliased`` flag is computed from the
    arguments, and the criteria themselves are obtained from
    ``self._join_condition.join_targets()``.

    :param source_polymorphic: when True and no ``source_selectable``
     is given, the parent mapper's ``_with_polymorphic_selectable`` is
     used if the parent has ``with_polymorphic`` set.
    :param source_selectable: selectable to join from, if any.
    :param dest_selectable: selectable to join to, if any.
    :param of_type_entity: when given, supplies the destination mapper
     and, if ``dest_selectable`` is None, the destination selectable.
    :param alias_secondary: turns ``aliased`` on when this relationship
     has a ``secondary``.
    :param extra_criteria: passed through to ``join_targets()``.
    :return: ``(primaryjoin, secondaryjoin, source_selectable,
     dest_selectable, secondary, target_adapter)``.

    """
    # ``aliased`` is only ever switched on below; it is passed to
    # join_targets() as its third argument
    aliased = False

    if alias_secondary and self.secondary is not None:
        aliased = True

    if source_selectable is None:
        if source_polymorphic and self.parent.with_polymorphic:
            source_selectable = self.parent._with_polymorphic_selectable

    if of_type_entity:
        dest_mapper = of_type_entity.mapper
        if dest_selectable is None:
            dest_selectable = of_type_entity.selectable
            aliased = True
    else:
        dest_mapper = self.mapper

    if dest_selectable is None:
        # no destination given and none taken from of_type_entity:
        # fall back to the target entity's selectable
        dest_selectable = self.entity.selectable
        if self.mapper.with_polymorphic:
            aliased = True

        if self._is_self_referential and source_selectable is None:
            # self-referential with no explicit source: join against
            # an anonymous FROM clause of the destination
            dest_selectable = dest_selectable._anonymous_fromclause()
            aliased = True
    elif (
        dest_selectable is not self.mapper._with_polymorphic_selectable
        or self.mapper.with_polymorphic
    ):
        aliased = True

    single_crit = dest_mapper._single_table_criterion
    # a source that is not the parent's own polymorphic selectable,
    # or that is a subquery, also turns ``aliased`` on
    aliased = aliased or (
        source_selectable is not None
        and (
            source_selectable
            is not self.parent._with_polymorphic_selectable
            or source_selectable._is_subquery
        )
    )

    (
        primaryjoin,
        secondaryjoin,
        secondary,
        target_adapter,
        dest_selectable,
    ) = self._join_condition.join_targets(
        source_selectable,
        dest_selectable,
        aliased,
        single_crit,
        extra_criteria,
    )
    # final fallbacks so that both selectables in the result are set
    if source_selectable is None:
        source_selectable = self.parent.local_table
    if dest_selectable is None:
        dest_selectable = self.entity.local_table
    return (
        primaryjoin,
        secondaryjoin,
        source_selectable,
        dest_selectable,
        secondary,
        target_adapter,
    )
2252
2253
2254def _annotate_columns(element: _CE, annotations: _AnnotationDict) -> _CE:
2255 def clone(elem: _CE) -> _CE:
2256 if isinstance(elem, expression.ColumnClause):
2257 elem = elem._annotate(annotations.copy()) # type: ignore
2258 elem._copy_internals(clone=clone)
2259 return elem
2260
2261 if element is not None:
2262 element = clone(element)
2263 clone = None # type: ignore # remove gc cycles
2264 return element
2265
2266
class JoinCondition:
    """Encapsulates SQL annotation and aliasing behavior focused on the
    ``primaryjoin`` and ``secondaryjoin`` aspects of
    :func:`_orm.relationship`.

    """

    # the primaryjoin exactly as passed to the constructor, if any
    primaryjoin_initial: Optional[ColumnElement[bool]]
    # the primaryjoin in effect; assigned by _determine_joins() and
    # re-assigned by the sanitizing / annotation steps
    primaryjoin: ColumnElement[bool]
    secondaryjoin: Optional[ColumnElement[bool]]
    secondary: Optional[FromClause]
    # the relationship this join condition belongs to
    prop: RelationshipProperty[Any]

    # both pair collections are populated by _setup_pairs()
    synchronize_pairs: _ColumnPairs
    secondary_synchronize_pairs: _ColumnPairs
    # assigned by _determine_direction()
    direction: RelationshipDirection

    parent_persist_selectable: FromClause
    child_persist_selectable: FromClause
    parent_local_selectable: FromClause
    child_local_selectable: FromClause

    # the ``local_remote_pairs`` constructor argument, as given
    _local_remote_pairs: Optional[_ColumnPairs]
2284
def __init__(
    self,
    parent_persist_selectable: FromClause,
    child_persist_selectable: FromClause,
    parent_local_selectable: FromClause,
    child_local_selectable: FromClause,
    *,
    primaryjoin: Optional[ColumnElement[bool]] = None,
    secondary: Optional[FromClause] = None,
    secondaryjoin: Optional[ColumnElement[bool]] = None,
    parent_equivalents: Optional[_EquivalentColumnMap] = None,
    child_equivalents: Optional[_EquivalentColumnMap] = None,
    consider_as_foreign_keys: Any = None,
    local_remote_pairs: Optional[_ColumnPairs] = None,
    remote_side: Any = None,
    self_referential: Any = False,
    prop: RelationshipProperty[Any],
    support_sync: bool = True,
    can_be_synced_fn: Callable[..., bool] = lambda *c: True,
):
    """Store the given configuration, then derive, annotate and
    validate the join condition.

    All arguments are stored on the instance (``primaryjoin`` as
    ``primaryjoin_initial``, ``local_remote_pairs`` and ``remote_side``
    under underscore-prefixed names).  The steps that follow run in a
    fixed order: the joins are determined, sanitized and annotated,
    the column pairs are collected, and the result is checked and
    logged.  Each step either raises or leaves its results on the
    instance.

    """
    self.parent_persist_selectable = parent_persist_selectable
    self.parent_local_selectable = parent_local_selectable
    self.child_persist_selectable = child_persist_selectable
    self.child_local_selectable = child_local_selectable
    self.parent_equivalents = parent_equivalents
    self.child_equivalents = child_equivalents
    self.primaryjoin_initial = primaryjoin
    self.secondaryjoin = secondaryjoin
    self.secondary = secondary
    self.consider_as_foreign_keys = consider_as_foreign_keys
    self._local_remote_pairs = local_remote_pairs
    self._remote_side = remote_side
    self.prop = prop
    self.self_referential = self_referential
    self.support_sync = support_sync
    self.can_be_synced_fn = can_be_synced_fn

    # sets self.primaryjoin (and self.secondaryjoin when a secondary
    # is present and no secondaryjoin was given)
    self._determine_joins()
    assert self.primaryjoin is not None

    # annotation passes; each one re-assigns self.primaryjoin and/or
    # self.secondaryjoin
    self._sanitize_joins()
    self._annotate_fks()
    self._annotate_remote()
    self._annotate_local()
    self._annotate_parentmapper()
    # collects local_remote_pairs / synchronize_pairs /
    # secondary_synchronize_pairs, which the checks below read
    self._setup_pairs()
    self._check_foreign_cols(self.primaryjoin, True)
    if self.secondaryjoin is not None:
        self._check_foreign_cols(self.secondaryjoin, False)
    self._determine_direction()
    self._check_remote_side()
    self._log_joins()
2337
2338 def _log_joins(self) -> None:
2339 log = self.prop.logger
2340 log.info("%s setup primary join %s", self.prop, self.primaryjoin)
2341 log.info("%s setup secondary join %s", self.prop, self.secondaryjoin)
2342 log.info(
2343 "%s synchronize pairs [%s]",
2344 self.prop,
2345 ",".join(
2346 "(%s => %s)" % (l, r) for (l, r) in self.synchronize_pairs
2347 ),
2348 )
2349 log.info(
2350 "%s secondary synchronize pairs [%s]",
2351 self.prop,
2352 ",".join(
2353 "(%s => %s)" % (l, r)
2354 for (l, r) in self.secondary_synchronize_pairs or []
2355 ),
2356 )
2357 log.info(
2358 "%s local/remote pairs [%s]",
2359 self.prop,
2360 ",".join(
2361 "(%s / %s)" % (l, r) for (l, r) in self.local_remote_pairs
2362 ),
2363 )
2364 log.info(
2365 "%s remote columns [%s]",
2366 self.prop,
2367 ",".join("%s" % col for col in self.remote_columns),
2368 )
2369 log.info(
2370 "%s local columns [%s]",
2371 self.prop,
2372 ",".join("%s" % col for col in self.local_columns),
2373 )
2374 log.info("%s relationship direction %s", self.prop, self.direction)
2375
def _sanitize_joins(self) -> None:
    """Strip the "parententity" and "proxy_key" annotations from the
    join conditions; they can leak in based on some declarative
    patterns and maybe others.

    The "parentmapper" annotation is left alone: it is relied upon
    both by the ORM evaluator and by the use case in
    _join_fixture_inh_selfref_w_entity, see :ticket:`3364`.

    """
    unwanted = ("parententity", "proxy_key")

    self.primaryjoin = _deep_deannotate(self.primaryjoin, values=unwanted)
    if self.secondaryjoin is None:
        return
    self.secondaryjoin = _deep_deannotate(
        self.secondaryjoin, values=unwanted
    )
2393
2394 def _determine_joins(self) -> None:
2395 """Determine the 'primaryjoin' and 'secondaryjoin' attributes,
2396 if not passed to the constructor already.
2397
2398 This is based on analysis of the foreign key relationships
2399 between the parent and target mapped selectables.
2400
2401 """
2402 if self.secondaryjoin is not None and self.secondary is None:
2403 raise sa_exc.ArgumentError(
2404 "Property %s specified with secondary "
2405 "join condition but "
2406 "no secondary argument" % self.prop
2407 )
2408
2409 # find a join between the given mapper's mapped table and
2410 # the given table. will try the mapper's local table first
2411 # for more specificity, then if not found will try the more
2412 # general mapped table, which in the case of inheritance is
2413 # a join.
2414 try:
2415 consider_as_foreign_keys = self.consider_as_foreign_keys or None
2416 if self.secondary is not None:
2417 if self.secondaryjoin is None:
2418 self.secondaryjoin = join_condition(
2419 self.child_persist_selectable,
2420 self.secondary,
2421 a_subset=self.child_local_selectable,
2422 consider_as_foreign_keys=consider_as_foreign_keys,
2423 )
2424 if self.primaryjoin_initial is None:
2425 self.primaryjoin = join_condition(
2426 self.parent_persist_selectable,
2427 self.secondary,
2428 a_subset=self.parent_local_selectable,
2429 consider_as_foreign_keys=consider_as_foreign_keys,
2430 )
2431 else:
2432 self.primaryjoin = self.primaryjoin_initial
2433 else:
2434 if self.primaryjoin_initial is None:
2435 self.primaryjoin = join_condition(
2436 self.parent_persist_selectable,
2437 self.child_persist_selectable,
2438 a_subset=self.parent_local_selectable,
2439 consider_as_foreign_keys=consider_as_foreign_keys,
2440 )
2441 else:
2442 self.primaryjoin = self.primaryjoin_initial
2443 except sa_exc.NoForeignKeysError as nfe:
2444 if self.secondary is not None:
2445 raise sa_exc.NoForeignKeysError(
2446 "Could not determine join "
2447 "condition between parent/child tables on "
2448 "relationship %s - there are no foreign keys "
2449 "linking these tables via secondary table '%s'. "
2450 "Ensure that referencing columns are associated "
2451 "with a ForeignKey or ForeignKeyConstraint, or "
2452 "specify 'primaryjoin' and 'secondaryjoin' "
2453 "expressions." % (self.prop, self.secondary)
2454 ) from nfe
2455 else:
2456 raise sa_exc.NoForeignKeysError(
2457 "Could not determine join "
2458 "condition between parent/child tables on "
2459 "relationship %s - there are no foreign keys "
2460 "linking these tables. "
2461 "Ensure that referencing columns are associated "
2462 "with a ForeignKey or ForeignKeyConstraint, or "
2463 "specify a 'primaryjoin' expression." % self.prop
2464 ) from nfe
2465 except sa_exc.AmbiguousForeignKeysError as afe:
2466 if self.secondary is not None:
2467 raise sa_exc.AmbiguousForeignKeysError(
2468 "Could not determine join "
2469 "condition between parent/child tables on "
2470 "relationship %s - there are multiple foreign key "
2471 "paths linking the tables via secondary table '%s'. "
2472 "Specify the 'foreign_keys' "
2473 "argument, providing a list of those columns which "
2474 "should be counted as containing a foreign key "
2475 "reference from the secondary table to each of the "
2476 "parent and child tables." % (self.prop, self.secondary)
2477 ) from afe
2478 else:
2479 raise sa_exc.AmbiguousForeignKeysError(
2480 "Could not determine join "
2481 "condition between parent/child tables on "
2482 "relationship %s - there are multiple foreign key "
2483 "paths linking the tables. Specify the "
2484 "'foreign_keys' argument, providing a list of those "
2485 "columns which should be counted as containing a "
2486 "foreign key reference to the parent table." % self.prop
2487 ) from afe
2488
@property
def primaryjoin_minus_local(self) -> ColumnElement[bool]:
    """The primaryjoin, deannotated of its "local" and "remote"
    annotations."""
    stripped = ("local", "remote")
    return _deep_deannotate(self.primaryjoin, values=stripped)
2492
@property
def secondaryjoin_minus_local(self) -> ColumnElement[bool]:
    """The secondaryjoin, deannotated of its "local" and "remote"
    annotations.  A secondaryjoin must be present."""
    secondaryjoin = self.secondaryjoin
    assert secondaryjoin is not None
    stripped = ("local", "remote")
    return _deep_deannotate(secondaryjoin, values=stripped)
2497
@util.memoized_property
def primaryjoin_reverse_remote(self) -> ColumnElement[bool]:
    """Produce the primaryjoin as it should look when used for the
    "reverse" direction.

    When the primaryjoin arrived here already carrying "remote"
    annotations, each "remote" annotation becomes "local" and each
    "local" becomes "remote".  Otherwise the local/remote annotations
    are removed.

    """
    if not self._has_remote_annotations:
        if self._has_foreign_annotations:
            # TODO: coverage
            return _deep_deannotate(
                self.primaryjoin, values=("local", "remote")
            )
        return _deep_deannotate(self.primaryjoin)

    def swap_sides(element: _CE, **kw: Any) -> Optional[_CE]:
        current = element._annotations
        if "remote" in current:
            drop, add = "remote", "local"
        elif "local" in current:
            drop, add = "local", "remote"
        else:
            # neither annotation present; leave the element as it is
            return None

        flipped = dict(current)
        del flipped[drop]
        flipped[add] = True
        return element._with_annotations(flipped)

    return visitors.replacement_traverse(self.primaryjoin, {}, swap_sides)
2534
def _has_annotation(self, clause: ClauseElement, annotation: str) -> bool:
    """Return True if any element reached by iterating ``clause``
    carries ``annotation`` in its ``_annotations``."""
    return any(
        annotation in elem._annotations
        for elem in visitors.iterate(clause, {})
    )
2541
@util.memoized_property
def _has_foreign_annotations(self) -> bool:
    """Memoized: whether the primaryjoin carries a "foreign"
    annotation anywhere."""
    join = self.primaryjoin
    return self._has_annotation(join, "foreign")
2545
@util.memoized_property
def _has_remote_annotations(self) -> bool:
    """Memoized: whether the primaryjoin carries a "remote"
    annotation anywhere."""
    join = self.primaryjoin
    return self._has_annotation(join, "remote")
2549
2550 def _annotate_fks(self) -> None:
2551 """Annotate the primaryjoin and secondaryjoin
2552 structures with 'foreign' annotations marking columns
2553 considered as foreign.
2554
2555 """
2556 if self._has_foreign_annotations:
2557 return
2558
2559 if self.consider_as_foreign_keys:
2560 self._annotate_from_fk_list()
2561 else:
2562 self._annotate_present_fks()
2563
def _annotate_from_fk_list(self) -> None:
    """Add the 'foreign' annotation to every element of the
    primaryjoin (and the secondaryjoin, when present) that is a member
    of ``consider_as_foreign_keys``."""

    def mark_foreign(element: _CE, **kw: Any) -> Optional[_CE]:
        if element not in self.consider_as_foreign_keys:
            return None
        return element._annotate({"foreign": True})

    self.primaryjoin = visitors.replacement_traverse(
        self.primaryjoin, {}, mark_foreign
    )
    if self.secondaryjoin is None:
        return
    self.secondaryjoin = visitors.replacement_traverse(
        self.secondaryjoin, {}, mark_foreign
    )
2577
def _annotate_present_fks(self) -> None:
    """Add the 'foreign' annotation to one side of each binary
    comparison whose foreign side can be worked out from the columns
    themselves.

    For a pair of ``Column`` objects, the one that references the
    other is foreign.  Failing that, when a ``secondary`` table is
    present, the side that belongs to it (with the other side outside
    of it) is foreign.  Comparisons where either side already carries
    'foreign' are left untouched.

    """
    secondarycols = (
        util.column_set(self.secondary.c)
        if self.secondary is not None
        else set()
    )

    def is_foreign(
        a: ColumnElement[Any], b: ColumnElement[Any]
    ) -> Optional[ColumnElement[Any]]:
        if isinstance(a, schema.Column) and isinstance(b, schema.Column):
            if a.references(b):
                return a
            if b.references(a):
                return b

        if secondarycols:
            if a in secondarycols and b not in secondarycols:
                return a
            if b in secondarycols and a not in secondarycols:
                return b

        return None

    def visit_binary(binary: BinaryExpression[Any]) -> None:
        left, right = binary.left, binary.right
        if not (
            isinstance(left, sql.ColumnElement)
            and isinstance(right, sql.ColumnElement)
        ):
            return
        if "foreign" in left._annotations or "foreign" in right._annotations:
            return

        col = is_foreign(left, right)
        if col is None:
            return

        if col.compare(left):
            binary.left = left._annotate({"foreign": True})
        elif col.compare(right):
            binary.right = right._annotate({"foreign": True})

    self.primaryjoin = visitors.cloned_traverse(
        self.primaryjoin, {}, {"binary": visit_binary}
    )
    if self.secondaryjoin is not None:
        self.secondaryjoin = visitors.cloned_traverse(
            self.secondaryjoin, {}, {"binary": visit_binary}
        )
2627
def _refers_to_parent_table(self) -> bool:
    """Tell whether the primaryjoin contains a comparison between two
    columns whose tables both the parent and the child selectable are
    derived from.

    """
    parent_sel = self.parent_persist_selectable
    child_sel = self.child_persist_selectable
    found = False

    def visit_binary(binary: BinaryExpression[Any]) -> None:
        nonlocal found
        left, right = binary.left, binary.right
        if not (
            isinstance(left, expression.ColumnClause)
            and isinstance(right, expression.ColumnClause)
        ):
            return
        # parent is checked against both sides first, then the child
        if all(
            sel.is_derived_from(col.table)
            for sel in (parent_sel, child_sel)
            for col in (left, right)
        ):
            found = True

    visitors.traverse(self.primaryjoin, {}, {"binary": visit_binary})
    return found
2652
def _tables_overlap(self) -> bool:
    """Tell whether the parent and child persist selectables overlap,
    as reported by ``selectables_overlap()``."""

    parent_sel = self.parent_persist_selectable
    child_sel = self.child_persist_selectable
    return selectables_overlap(parent_sel, child_sel)
2659
2660 def _annotate_remote(self) -> None:
2661 """Annotate the primaryjoin and secondaryjoin
2662 structures with 'remote' annotations marking columns
2663 considered as part of the 'remote' side.
2664
2665 """
2666 if self._has_remote_annotations:
2667 return
2668
2669 if self.secondary is not None:
2670 self._annotate_remote_secondary()
2671 elif self._local_remote_pairs or self._remote_side:
2672 self._annotate_remote_from_args()
2673 elif self._refers_to_parent_table():
2674 self._annotate_selfref(
2675 lambda col: "foreign" in col._annotations, False
2676 )
2677 elif self._tables_overlap():
2678 self._annotate_remote_with_overlap()
2679 else:
2680 self._annotate_remote_distinct_selectables()
2681
def _annotate_remote_secondary(self) -> None:
    """Apply the 'remote' annotation for a relationship that has a
    'secondary' table: every element of the primaryjoin and
    secondaryjoin that is a column of the secondary table is marked.

    """

    assert self.secondary is not None
    secondary_table = self.secondary

    def mark_remote(element: _CE, **kw: Any) -> Optional[_CE]:
        if not secondary_table.c.contains_column(element):
            return None
        return element._annotate({"remote": True})

    self.primaryjoin = visitors.replacement_traverse(
        self.primaryjoin, {}, mark_remote
    )

    assert self.secondaryjoin is not None
    self.secondaryjoin = visitors.replacement_traverse(
        self.secondaryjoin, {}, mark_remote
    )
2704
def _annotate_selfref(
    self, fn: Callable[[ColumnElement[Any]], bool], remote_side_given: bool
) -> None:
    """Apply the 'remote' annotation to the primaryjoin of a
    relationship detected as self-referential.

    :param fn: predicate deciding whether a column is remote.
    :param remote_side_given: when False, a comparison that is not
     between two plain columns triggers the "non-simple column
     elements" warning.

    """

    def visit_binary(binary: BinaryExpression[Any]) -> None:
        left, right = binary.left, binary.right
        equated = left.compare(right)
        both_plain_columns = isinstance(
            left, expression.ColumnClause
        ) and isinstance(right, expression.ColumnClause)

        if both_plain_columns:
            # assume one to many - FKs are "remote"
            if fn(left):
                binary.left = left._annotate({"remote": True})
            # a column compared to itself is only marked on the left
            if fn(right) and not equated:
                binary.right = right._annotate({"remote": True})
        elif not remote_side_given:
            self._warn_non_column_elements()

    self.primaryjoin = visitors.cloned_traverse(
        self.primaryjoin, {}, {"binary": visit_binary}
    )
2729
2730 def _annotate_remote_from_args(self) -> None:
2731 """annotate 'remote' in primaryjoin, secondaryjoin
2732 when the 'remote_side' or '_local_remote_pairs'
2733 arguments are used.
2734
2735 """
2736 if self._local_remote_pairs:
2737 if self._remote_side:
2738 raise sa_exc.ArgumentError(
2739 "remote_side argument is redundant "
2740 "against more detailed _local_remote_side "
2741 "argument."
2742 )
2743
2744 remote_side = [r for (l, r) in self._local_remote_pairs]
2745 else:
2746 remote_side = self._remote_side
2747
2748 if self._refers_to_parent_table():
2749 self._annotate_selfref(lambda col: col in remote_side, True)
2750 else:
2751
2752 def repl(element: _CE, **kw: Any) -> Optional[_CE]:
2753 # use set() to avoid generating ``__eq__()`` expressions
2754 # against each element
2755 if element in set(remote_side):
2756 return element._annotate({"remote": True})
2757 return None
2758
2759 self.primaryjoin = visitors.replacement_traverse(
2760 self.primaryjoin, {}, repl
2761 )
2762
def _annotate_remote_with_overlap(self) -> None:
    """annotate 'remote' in primaryjoin, secondaryjoin
    when the parent/child tables have some set of
    tables in common, though is not a fully self-referential
    relationship.

    """

    def visit_binary(binary: BinaryExpression[Any]) -> None:
        # run the check in both orientations: first with the
        # operands as written, then with left and right swapped
        binary.left, binary.right = proc_left_right(
            binary.left, binary.right
        )
        binary.right, binary.left = proc_left_right(
            binary.right, binary.left
        )

    # the "parentmapper" based checks below only apply when the
    # target mapper is a different object than the parent mapper
    check_entities = (
        self.prop is not None and self.prop.mapper is not self.prop.parent
    )

    def proc_left_right(
        left: ColumnElement[Any], right: ColumnElement[Any]
    ) -> Tuple[ColumnElement[Any], ColumnElement[Any]]:
        # returns the (left, right) pair, with at most one of the two
        # replaced by a copy carrying the "remote" annotation
        if isinstance(left, expression.ColumnClause) and isinstance(
            right, expression.ColumnClause
        ):
            # plain column pair: "right" is remote when it is a child
            # column and "left" is a parent column
            if self.child_persist_selectable.c.contains_column(
                right
            ) and self.parent_persist_selectable.c.contains_column(left):
                right = right._annotate({"remote": True})
        elif (
            check_entities
            and right._annotations.get("parentmapper") is self.prop.mapper
        ):
            right = right._annotate({"remote": True})
        elif (
            check_entities
            and left._annotations.get("parentmapper") is self.prop.mapper
        ):
            left = left._annotate({"remote": True})
        else:
            self._warn_non_column_elements()

        return left, right

    self.primaryjoin = visitors.cloned_traverse(
        self.primaryjoin, {}, {"binary": visit_binary}
    )
2811
def _annotate_remote_distinct_selectables(self) -> None:
    """Apply the 'remote' annotation to the primaryjoin when the
    parent and child tables are entirely separate.

    An element is remote when it is a column of the child persist
    selectable, unless it is a column of the parent local selectable
    without also being a column of the child local selectable.

    """

    def mark_remote(element: _CE, **kw: Any) -> Optional[_CE]:
        if not self.child_persist_selectable.c.contains_column(element):
            return None
        if self.parent_local_selectable.c.contains_column(
            element
        ) and not self.child_local_selectable.c.contains_column(element):
            return None
        return element._annotate({"remote": True})

    self.primaryjoin = visitors.replacement_traverse(
        self.primaryjoin, {}, mark_remote
    )
2830
def _warn_non_column_elements(self) -> None:
    """Warn that the primaryjoin of this property contains elements
    other than simple columns."""
    message = (
        "Non-simple column elements in primary "
        "join condition for property %s - consider using "
        "remote() annotations to mark the remote side." % self.prop
    )
    util.warn(message)
2837
2838 def _annotate_local(self) -> None:
2839 """Annotate the primaryjoin and secondaryjoin
2840 structures with 'local' annotations.
2841
2842 This annotates all column elements found
2843 simultaneously in the parent table
2844 and the join condition that don't have a
2845 'remote' annotation set up from
2846 _annotate_remote() or user-defined.
2847
2848 """
2849 if self._has_annotation(self.primaryjoin, "local"):
2850 return
2851
2852 if self._local_remote_pairs:
2853 local_side = util.column_set(
2854 [l for (l, r) in self._local_remote_pairs]
2855 )
2856 else:
2857 local_side = util.column_set(self.parent_persist_selectable.c)
2858
2859 def locals_(element: _CE, **kw: Any) -> Optional[_CE]:
2860 if "remote" not in element._annotations and element in local_side:
2861 return element._annotate({"local": True})
2862 return None
2863
2864 self.primaryjoin = visitors.replacement_traverse(
2865 self.primaryjoin, {}, locals_
2866 )
2867
def _annotate_parentmapper(self) -> None:
    """Add a "parentmapper" annotation to the primaryjoin elements
    that are annotated 'remote' or 'local': remote elements get the
    property's target mapper, local elements its parent mapper."""

    def tag_mapper(element: _CE, **kw: Any) -> Optional[_CE]:
        current = element._annotations
        if "remote" in current:
            owner = self.prop.mapper
        elif "local" in current:
            owner = self.prop.parent
        else:
            return None
        return element._annotate({"parentmapper": owner})

    self.primaryjoin = visitors.replacement_traverse(
        self.primaryjoin, {}, tag_mapper
    )
2879
def _check_remote_side(self) -> None:
    """Validate the collected local/remote pairs.

    Raises ``ArgumentError`` when there are none.  Otherwise a warning
    is emitted for each remote column that belongs to the parent
    persist selectable but not to the child persist selectable.

    """
    if not self.local_remote_pairs:
        raise sa_exc.ArgumentError(
            "Relationship %s could "
            "not determine any unambiguous local/remote column "
            "pairs based on join condition and remote_side "
            "arguments. "
            "Consider using the remote() annotation to "
            "accurately mark those elements of the join "
            "condition that are on the remote side of "
            "the relationship." % (self.prop,)
        )

    parent_cols = util.column_set(self.parent_persist_selectable.c)
    parent_only = parent_cols.difference(self.child_persist_selectable.c)

    for _, remote_col in self.local_remote_pairs:
        if remote_col not in parent_only:
            continue
        util.warn(
            "Expression %s is marked as 'remote', but these "
            "column(s) are local to the local side. The "
            "remote() annotation is needed only for a "
            "self-referential relationship where both sides "
            "of the relationship refer to the same tables."
            % (remote_col,)
        )
2907
2908 def _check_foreign_cols(
2909 self, join_condition: ColumnElement[bool], primary: bool
2910 ) -> None:
2911 """Check the foreign key columns collected and emit error
2912 messages."""
2913 foreign_cols = self._gather_columns_with_annotation(
2914 join_condition, "foreign"
2915 )
2916
2917 has_foreign = bool(foreign_cols)
2918
2919 if primary:
2920 can_sync = bool(self.synchronize_pairs)
2921 else:
2922 can_sync = bool(self.secondary_synchronize_pairs)
2923
2924 if (
2925 self.support_sync
2926 and can_sync
2927 or (not self.support_sync and has_foreign)
2928 ):
2929 return
2930
2931 # from here below is just determining the best error message
2932 # to report. Check for a join condition using any operator
2933 # (not just ==), perhaps they need to turn on "viewonly=True".
2934 if self.support_sync and has_foreign and not can_sync:
2935 err = (
2936 "Could not locate any simple equality expressions "
2937 "involving locally mapped foreign key columns for "
2938 "%s join condition "
2939 "'%s' on relationship %s."
2940 % (
2941 primary and "primary" or "secondary",
2942 join_condition,
2943 self.prop,
2944 )
2945 )
2946 err += (
2947 " Ensure that referencing columns are associated "
2948 "with a ForeignKey or ForeignKeyConstraint, or are "
2949 "annotated in the join condition with the foreign() "
2950 "annotation. To allow comparison operators other than "
2951 "'==', the relationship can be marked as viewonly=True."
2952 )
2953
2954 raise sa_exc.ArgumentError(err)
2955 else:
2956 err = (
2957 "Could not locate any relevant foreign key columns "
2958 "for %s join condition '%s' on relationship %s."
2959 % (
2960 primary and "primary" or "secondary",
2961 join_condition,
2962 self.prop,
2963 )
2964 )
2965 err += (
2966 " Ensure that referencing columns are associated "
2967 "with a ForeignKey or ForeignKeyConstraint, or are "
2968 "annotated in the join condition with the foreign() "
2969 "annotation."
2970 )
2971 raise sa_exc.ArgumentError(err)
2972
def _determine_direction(self) -> None:
    """Determine if this relationship is one to many, many to one,
    many to many.

    The result is stored in ``self.direction``.  A secondaryjoin
    always means MANYTOMANY.  Otherwise the decision is based on which
    side (parent and/or child persist selectable) the foreign key
    columns belong to; ``ArgumentError`` is raised when they are on
    neither side, or on both sides in a way that the local/remote
    annotations cannot disambiguate.

    """
    if self.secondaryjoin is not None:
        self.direction = MANYTOMANY
    else:
        parentcols = util.column_set(self.parent_persist_selectable.c)
        targetcols = util.column_set(self.child_persist_selectable.c)

        # fk collection which suggests ONETOMANY.
        onetomany_fk = targetcols.intersection(self.foreign_key_columns)

        # fk collection which suggests MANYTOONE.

        manytoone_fk = parentcols.intersection(self.foreign_key_columns)

        if onetomany_fk and manytoone_fk:
            # fks on both sides.   test for overlap of local/remote
            # with foreign key.
            # we will gather columns directly from their annotations
            # without deannotating, so that we can distinguish on a column
            # that refers to itself.

            # 1. columns that are both remote and FK suggest
            # onetomany.
            onetomany_local = self._gather_columns_with_annotation(
                self.primaryjoin, "remote", "foreign"
            )

            # 2. columns that are FK but are not remote (e.g. local)
            # suggest manytoone.
            manytoone_local = {
                c
                for c in self._gather_columns_with_annotation(
                    self.primaryjoin, "foreign"
                )
                if "remote" not in c._annotations
            }

            # 3. if both collections are present, remove columns that
            # refer to themselves.  This is for the case of
            # and_(Me.id == Me.remote_id, Me.version == Me.version)
            if onetomany_local and manytoone_local:
                self_equated = self.remote_columns.intersection(
                    self.local_columns
                )
                onetomany_local = onetomany_local.difference(self_equated)
                manytoone_local = manytoone_local.difference(self_equated)

            # at this point, if only one or the other collection is
            # present, we know the direction, otherwise it's still
            # ambiguous.

            if onetomany_local and not manytoone_local:
                self.direction = ONETOMANY
            elif manytoone_local and not onetomany_local:
                self.direction = MANYTOONE
            else:
                raise sa_exc.ArgumentError(
                    "Can't determine relationship"
                    " direction for relationship '%s' - foreign "
                    "key columns within the join condition are present "
                    "in both the parent and the child's mapped tables. "
                    "Ensure that only those columns referring "
                    "to a parent column are marked as foreign, "
                    "either via the foreign() annotation or "
                    "via the foreign_keys argument." % self.prop
                )
        elif onetomany_fk:
            # foreign keys only on the child side
            self.direction = ONETOMANY
        elif manytoone_fk:
            # foreign keys only on the parent side
            self.direction = MANYTOONE
        else:
            raise sa_exc.ArgumentError(
                "Can't determine relationship "
                "direction for relationship '%s' - foreign "
                "key columns are present in neither the parent "
                "nor the child's mapped tables" % self.prop
            )
3054
3055 def _deannotate_pairs(
3056 self, collection: _ColumnPairIterable
3057 ) -> _MutableColumnPairs:
3058 """provide deannotation for the various lists of
3059 pairs, so that using them in hashes doesn't incur
3060 high-overhead __eq__() comparisons against
3061 original columns mapped.
3062
3063 """
3064 return [(x._deannotate(), y._deannotate()) for x, y in collection]
3065
def _setup_pairs(self) -> None:
    """Collect the column pairs implied by the annotated join
    conditions.

    Sets ``self.local_remote_pairs``, ``self.synchronize_pairs`` and
    ``self.secondary_synchronize_pairs``, each as a list of
    deannotated ``(column, column)`` tuples.

    """
    sync_pairs: _MutableColumnPairs = []
    # ordered set: a (local, remote) pair is recorded only once even
    # if it is encountered repeatedly
    lrp: util.OrderedSet[Tuple[ColumnElement[Any], ColumnElement[Any]]] = (
        util.OrderedSet([])
    )
    secondary_sync_pairs: _MutableColumnPairs = []

    def go(
        joincond: ColumnElement[bool],
        collection: _MutableColumnPairs,
    ) -> None:
        # local/remote pairs from both join conditions go into the
        # shared ``lrp``; sync pairs go into the given ``collection``
        def visit_binary(
            binary: BinaryExpression[Any],
            left: ColumnElement[Any],
            right: ColumnElement[Any],
        ) -> None:
            # exactly one side annotated "remote": record the pair
            # with the non-remote side first
            if (
                "remote" in right._annotations
                and "remote" not in left._annotations
                and self.can_be_synced_fn(left)
            ):
                lrp.add((left, right))
            elif (
                "remote" in left._annotations
                and "remote" not in right._annotations
                and self.can_be_synced_fn(right)
            ):
                lrp.add((right, left))
            # only "==" comparisons produce sync pairs; the column
            # annotated "foreign" is placed second
            if binary.operator is operators.eq and self.can_be_synced_fn(
                left, right
            ):
                if "foreign" in right._annotations:
                    collection.append((left, right))
                elif "foreign" in left._annotations:
                    collection.append((right, left))

        visit_binary_product(visit_binary, joincond)

    for joincond, collection in [
        (self.primaryjoin, sync_pairs),
        (self.secondaryjoin, secondary_sync_pairs),
    ]:
        if joincond is None:
            continue
        go(joincond, collection)

    self.local_remote_pairs = self._deannotate_pairs(lrp)
    self.synchronize_pairs = self._deannotate_pairs(sync_pairs)
    self.secondary_synchronize_pairs = self._deannotate_pairs(
        secondary_sync_pairs
    )
3117
3118 _track_overlapping_sync_targets: weakref.WeakKeyDictionary[
3119 ColumnElement[Any],
3120 weakref.WeakKeyDictionary[
3121 RelationshipProperty[Any], ColumnElement[Any]
3122 ],
3123 ] = weakref.WeakKeyDictionary()
3124
def _warn_for_conflicting_sync_targets(self) -> None:
    """Warn when this relationship writes to a column that another
    relationship also writes to.

    Each ``(from_, to_)`` pair in ``synchronize_pairs`` and
    ``secondary_synchronize_pairs`` is checked against, and then recorded
    in, the ``_track_overlapping_sync_targets`` registry.  A warning with
    code ``qzyx`` is emitted listing every previously recorded
    relationship for the same ``to_`` column that is not excluded by the
    conditions in the loop below.  Nothing happens when ``support_sync``
    is false.

    """
    if not self.support_sync:
        return

    registry = self._track_overlapping_sync_targets

    # we would like to detect if we are synchronizing any column
    # pairs in conflict with another relationship that wishes to sync
    # an entirely different column to the same target.  This is a
    # very rare edge case so we will try to minimize the memory/overhead
    # impact of this check
    for from_, to_ in itertools.chain(
        self.synchronize_pairs, self.secondary_synchronize_pairs
    ):
        # save ourselves a ton of memory and overhead by only
        # considering columns that are subject to overlapping
        # FK constraints at the core level.  This condition can arise
        # if multiple relationships overlap foreign() directly, but
        # we're going to assume it's typically a ForeignKeyConstraint-
        # level configuration that benefits from this warning.
        # NOTE(review): no FK-constraint-level filter is visible in this
        # loop; every synchronize pair is tracked - confirm the comment
        # above still describes the intent.

        if to_ not in registry:
            # first relationship recorded for this target column; there
            # is nothing to compare against yet
            registry[to_] = weakref.WeakKeyDictionary({self.prop: from_})
            continue

        prop_to_from = registry[to_]

        other_props = [
            (pr, fr_)
            for pr, fr_ in prop_to_from.items()
            if (
                not pr.mapper._dispose_called
                and pr not in self.prop._reverse_property
                and pr.key not in self.prop._overlaps
                and self.prop.key not in pr._overlaps
                # note: the "__*" symbol is used internally by
                # SQLAlchemy as a general means of suppressing the
                # overlaps warning for some extension cases, however
                # this is not currently
                # a publicly supported symbol and may change at
                # any time.
                and "__*" not in self.prop._overlaps
                and "__*" not in pr._overlaps
                and not self.prop.parent.is_sibling(pr.parent)
                and not self.prop.mapper.is_sibling(pr.mapper)
                and not self.prop.parent.is_sibling(pr.mapper)
                and not self.prop.mapper.is_sibling(pr.parent)
                and (
                    self.prop.key != pr.key
                    or not self.prop.parent.common_parent(pr.parent)
                )
            )
        ]

        if other_props:
            util.warn(
                "relationship '%s' will copy column %s to column %s, "
                "which conflicts with relationship(s): %s. "
                "If this is not the intention, consider if these "
                "relationships should be linked with "
                "back_populates, or if viewonly=True should be "
                "applied to one or more if they are read-only. "
                "For the less common case that foreign key "
                "constraints are partially overlapping, the "
                "orm.foreign() "
                "annotation can be used to isolate the columns that "
                "should be written towards. To silence this "
                "warning, add the parameter 'overlaps=\"%s\"' to the "
                "'%s' relationship."
                % (
                    self.prop,
                    from_,
                    to_,
                    ", ".join(
                        sorted(
                            "'%s' (copies %s to %s)" % (pr, fr_, to_)
                            for (pr, fr_) in other_props
                        )
                    ),
                    ",".join(sorted(pr.key for pr, _ in other_props)),
                    self.prop,
                ),
                code="qzyx",
            )

        # record this relationship as a writer of the target column
        prop_to_from[self.prop] = from_
3210
@util.memoized_property
def remote_columns(self) -> Set[ColumnElement[Any]]:
    """Deannotated set of the columns carrying the "remote" annotation
    in ``primaryjoin`` and, when present, ``secondaryjoin``.

    """
    return self._gather_join_annotations("remote")
3214
@util.memoized_property
def local_columns(self) -> Set[ColumnElement[Any]]:
    """Deannotated set of the columns carrying the "local" annotation
    in ``primaryjoin`` and, when present, ``secondaryjoin``.

    """
    return self._gather_join_annotations("local")
3218
@util.memoized_property
def foreign_key_columns(self) -> Set[ColumnElement[Any]]:
    """Deannotated set of the columns carrying the "foreign" annotation
    in ``primaryjoin`` and, when present, ``secondaryjoin``.

    """
    return self._gather_join_annotations("foreign")
3222
def _gather_join_annotations(
    self, annotation: str
) -> Set[ColumnElement[Any]]:
    """Return, in deannotated form, the columns that carry
    ``annotation`` within ``primaryjoin`` and, when one is present,
    ``secondaryjoin``.

    """
    joinconds = [self.primaryjoin]
    if self.secondaryjoin is not None:
        joinconds.append(self.secondaryjoin)

    return {
        col._deannotate()
        for joincond in joinconds
        for col in self._gather_columns_with_annotation(
            joincond, annotation
        )
    }
3236
def _gather_columns_with_annotation(
    self, clause: ColumnElement[Any], *annotation: str
) -> Set[ColumnElement[Any]]:
    """Return the elements within ``clause`` that carry every one of
    the given annotation names.

    :param clause: expression traversed with ``visitors.iterate()``.
    :param annotation: annotation names, passed positionally, e.g.
     ``"remote", "foreign"``.  An element is included only when all of
     the names are present in its ``_annotations``; with no names given,
     every element produced by the traversal is included.
    :return: a new set of the matching elements.  The elements are
     returned as found, i.e. they are not deannotated here.

    """
    # each positional argument is a single annotation name, hence
    # ``*annotation: str`` rather than an iterable of names
    required = set(annotation)
    return {
        cast(ColumnElement[Any], col)
        for col in visitors.iterate(clause, {})
        if required.issubset(col._annotations)
    }
3246
@util.memoized_property
def _secondary_lineage_set(self) -> FrozenSet[ColumnElement[Any]]:
    """Frozen set of every member of the ``proxy_set`` of each column
    of ``secondary``; ``util.EMPTY_SET`` when there is no ``secondary``.

    """
    if self.secondary is None:
        return util.EMPTY_SET

    return frozenset(
        itertools.chain.from_iterable(
            col.proxy_set for col in self.secondary.c
        )
    )
3255
def join_targets(
    self,
    source_selectable: Optional[FromClause],
    dest_selectable: FromClause,
    aliased: bool,
    single_crit: Optional[ColumnElement[bool]] = None,
    extra_criteria: Tuple[ColumnElement[bool], ...] = (),
) -> Tuple[
    ColumnElement[bool],
    Optional[ColumnElement[bool]],
    Optional[FromClause],
    Optional[ClauseAdapter],
    FromClause,
]:
    """Given a source and destination selectable, produce the criteria
    for a join between them.

    This takes into account aliasing the join clause to reference the
    appropriate corresponding columns in the target objects, as well as
    the extra child criterion, equivalent column sets, etc.

    :param source_selectable: optional selectable for the source side;
     only consulted when ``aliased`` is true.
    :param dest_selectable: selectable for the destination side.  It is
     annotated with ``no_replacement_traverse`` and the annotated form
     is what gets used and returned.
    :param aliased: when true, the join conditions are adapted with
     :class:`.ClauseAdapter` objects and a target adapter is returned.
    :param single_crit: optional criterion AND-ed onto ``secondaryjoin``
     when there is one, otherwise onto ``primaryjoin``.
    :param extra_criteria: additional criteria, AND-ed on in the same
     way after being annotated with ``should_not_adapt`` where
     applicable.
    :return: a tuple ``(primaryjoin, secondaryjoin, secondary,
     target_adapter, dest_selectable)``; ``target_adapter`` is ``None``
     when ``aliased`` is false.

    """
    # place a barrier on the destination such that
    # replacement traversals won't ever dig into it.
    # its internal structure remains fixed
    # regardless of context.
    dest_selectable = _shallow_annotate(
        dest_selectable, {"no_replacement_traverse": True}
    )

    primaryjoin = self.primaryjoin
    secondaryjoin = self.secondaryjoin
    secondary = self.secondary

    # adjust the join condition for single table inheritance,
    # in the case that the join is to a subclass
    # this is analogous to the
    # "_adjust_for_single_table_inheritance()" method in Query.
    if single_crit is not None:
        if secondaryjoin is None:
            primaryjoin = primaryjoin & single_crit
        else:
            secondaryjoin = secondaryjoin & single_crit

    if extra_criteria:

        def mark_exclude_cols(
            elem: SupportsAnnotations, annotations: _AnnotationDict
        ) -> SupportsAnnotations:
            """note unrelated columns in the "extra criteria" as either
            should be adapted or not adapted, even though they are not
            part of our "local" or "remote" side.

            see #9779 for this case, as well as #11010 for a follow up

            """
            elem_mapper = elem._annotations.get("parentmapper", None)

            # NOTE: it's not clear yet if this needs to test for
            # elem_mapper.isa(self.prop.parent). so far
            # we have not come up with a test.
            if elem_mapper is self.prop.parent:
                return elem
            if elem_mapper is not None and elem_mapper.isa(
                self.prop.mapper
            ):
                return elem
            if elem in self._secondary_lineage_set:
                return elem

            return _safe_annotate(elem, annotations)

        annotated_criteria = tuple(
            _deep_annotate(
                criterion,
                {"should_not_adapt": True},
                annotate_callable=mark_exclude_cols,
            )
            for criterion in extra_criteria
        )
        combined_criteria = sql.and_(*annotated_criteria)

        if secondaryjoin is None:
            primaryjoin = primaryjoin & combined_criteria
        else:
            secondaryjoin = secondaryjoin & combined_criteria

    if not aliased:
        return (
            primaryjoin,
            secondaryjoin,
            secondary,
            None,
            dest_selectable,
        )

    if secondary is None:
        primary_aliasizer = ClauseAdapter(
            dest_selectable,
            exclude_fn=_local_col_exclude,
            equivalents=self.child_equivalents,
        )
        if source_selectable is not None:
            # NOTE(review): the return value of chain() is discarded
            # here, so this relies on chain() linking the second adapter
            # onto primary_aliasizer in place - confirm.
            primary_aliasizer.chain(
                ClauseAdapter(
                    source_selectable,
                    exclude_fn=_remote_col_exclude,
                    equivalents=self.parent_equivalents,
                )
            )
        secondary_aliasizer = None
    else:
        # presumably an anonymous alias of the secondary table - confirm
        secondary = secondary._anonymous_fromclause(flat=True)
        primary_aliasizer = ClauseAdapter(
            secondary,
            exclude_fn=_local_col_exclude,
        )
        # the secondary adapter chains the adapter built just above,
        # not the replacement built below when a source selectable
        # is given
        secondary_aliasizer = ClauseAdapter(
            dest_selectable, equivalents=self.child_equivalents
        ).chain(primary_aliasizer)
        if source_selectable is not None:
            primary_aliasizer = ClauseAdapter(
                secondary,
                exclude_fn=_local_col_exclude,
            ).chain(
                ClauseAdapter(
                    source_selectable,
                    equivalents=self.parent_equivalents,
                )
            )

        secondaryjoin = secondary_aliasizer.traverse(secondaryjoin)

    primaryjoin = primary_aliasizer.traverse(primaryjoin)

    # the adapter handed back has its exclude_fn cleared, which happens
    # only after the join conditions above were traversed
    target_adapter = secondary_aliasizer or primary_aliasizer
    target_adapter.exclude_fn = None

    return (
        primaryjoin,
        secondaryjoin,
        secondary,
        target_adapter,
        dest_selectable,
    )
3400
def create_lazy_clause(self, reverse_direction: bool = False) -> Tuple[
    ColumnElement[bool],
    Dict[str, ColumnElement[Any]],
    Dict[ColumnElement[Any], ColumnElement[Any]],
]:
    """Build criteria from the join conditions in which selected
    columns are replaced by bound parameters.

    :param reverse_direction: when false, elements annotated "local"
     are replaced with bound parameters in ``primaryjoin``.  When true,
     the replaced elements are those annotated "remote" in
     ``primaryjoin`` if there is no ``secondaryjoin``, or those present
     as the local member of ``local_remote_pairs``, replaced within
     ``secondaryjoin``, if there is one.
    :return: a tuple ``(lazywhere, bind_to_col, equated_columns)``:
     the resulting criteria (``primaryjoin`` AND-ed with
     ``secondaryjoin`` when present); a dictionary mapping each bound
     parameter's key to the element it replaced; and a dictionary built
     from ``local_remote_pairs`` mapping remote to local columns, or
     local to remote when ``reverse_direction`` is true and there is no
     ``secondaryjoin``.

    """
    binds: Dict[ColumnElement[Any], BindParameter[Any]] = {}
    equated_columns: Dict[ColumnElement[Any], ColumnElement[Any]] = {}

    has_secondary = self.secondaryjoin is not None

    # only populated and consulted when a secondaryjoin is present
    lookup = collections.defaultdict(list)

    for local_col, remote_col in self.local_remote_pairs:
        if has_secondary:
            lookup[local_col].append((local_col, remote_col))
            equated_columns[remote_col] = local_col
        elif reverse_direction:
            equated_columns[local_col] = remote_col
        else:
            equated_columns[remote_col] = local_col

    def col_to_bind(
        element: ColumnElement[Any], **kw: Any
    ) -> Optional[BindParameter[Any]]:
        if not reverse_direction:
            wants_bind = "local" in element._annotations
        elif has_secondary:
            wants_bind = element in lookup
        else:
            wants_bind = "remote" in element._annotations

        if not wants_bind:
            return None

        # one bound parameter per element, reused on repeat visits
        bind = binds.get(element)
        if bind is None:
            bind = binds[element] = sql.bindparam(
                None, None, type_=element.type, unique=True
            )
        return bind

    def with_binds(clause: ColumnElement[bool]) -> ColumnElement[bool]:
        return visitors.replacement_traverse(clause, {}, col_to_bind)

    if not has_secondary:
        lazywhere = with_binds(self.primaryjoin)
    elif reverse_direction:
        lazywhere = sql.and_(
            self.primaryjoin, with_binds(self.secondaryjoin)
        )
    else:
        lazywhere = sql.and_(
            with_binds(self.primaryjoin), self.secondaryjoin
        )

    bind_to_col = {bind.key: col for col, bind in binds.items()}

    return lazywhere, bind_to_col, equated_columns
3458
3459
3460class _ColInAnnotations:
3461 """Serializable object that tests for names in c._annotations.
3462
3463 TODO: does this need to be serializable anymore? can we find what the
3464 use case was for that?
3465
3466 """
3467
3468 __slots__ = ("names",)
3469
3470 def __init__(self, *names: str):
3471 self.names = frozenset(names)
3472
3473 def __call__(self, c: ClauseElement) -> bool:
3474 return bool(self.names.intersection(c._annotations))
3475
3476
# predicates passed as ``exclude_fn`` to ClauseAdapter: each matches
# elements annotated "should_not_adapt", plus those annotated "local" or
# "remote" respectively
_local_col_exclude = _ColInAnnotations("local", "should_not_adapt")
_remote_col_exclude = _ColInAnnotations("remote", "should_not_adapt")
3479
3480
class Relationship(RelationshipProperty[_T], _DeclarativeMapped[_T]):
    """An object property holding either a single item or a list of
    items that correspond to a related database table.

    Instances are created through the public constructor function
    :func:`_orm.relationship`.

    .. seealso::

        :ref:`relationship_config_toplevel`

    .. versionchanged:: 2.0 Added :class:`_orm.Relationship` as a Declarative
       compatible subclass for :class:`_orm.RelationshipProperty`.

    """

    inherit_cache = True
    """:meta private:"""
3501
3502
class _RelationshipDeclared(  # type: ignore[misc]
    Relationship[_T],
    WriteOnlyMapped[_T],  # not compatible with Mapped[_T]
    DynamicMapped[_T],  # not compatible with Mapped[_T]
):
    """:class:`.Relationship` subclass that is used implicitly for
    declarative mapping."""

    inherit_cache = True
    """:meta private:"""

    @classmethod
    def _mapper_property_name(cls) -> str:
        """Return ``"Relationship"``, the name of the public parent
        class, rather than the name of this subclass."""
        return "Relationship"