1# orm/relationships.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Heuristics related to join conditions as used in
9:func:`_orm.relationship`.
10
11Provides the :class:`.JoinCondition` object, which encapsulates
12SQL annotation and aliasing behavior focused on the `primaryjoin`
13and `secondaryjoin` aspects of :func:`_orm.relationship`.
14
15"""
16from __future__ import annotations
17
18import collections
19from collections import abc
20import dataclasses
21import inspect as _py_inspect
22import itertools
23import re
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Collection
29from typing import Dict
30from typing import FrozenSet
31from typing import Generic
32from typing import Iterable
33from typing import Iterator
34from typing import List
35from typing import Literal
36from typing import NamedTuple
37from typing import NoReturn
38from typing import Optional
39from typing import Sequence
40from typing import Set
41from typing import Tuple
42from typing import Type
43from typing import TYPE_CHECKING
44from typing import TypeVar
45from typing import Union
46import weakref
47
48from . import attributes
49from . import strategy_options
50from ._typing import insp_is_aliased_class
51from ._typing import is_has_collection_adapter
52from .base import _DeclarativeMapped
53from .base import _is_mapped_class
54from .base import class_mapper
55from .base import DynamicMapped
56from .base import LoaderCallableStatus
57from .base import PassiveFlag
58from .base import state_str
59from .base import WriteOnlyMapped
60from .interfaces import _AttributeOptions
61from .interfaces import _DataclassDefaultsDontSet
62from .interfaces import _IntrospectsAnnotations
63from .interfaces import MANYTOMANY
64from .interfaces import MANYTOONE
65from .interfaces import ONETOMANY
66from .interfaces import PropComparator
67from .interfaces import RelationshipDirection
68from .interfaces import StrategizedProperty
69from .util import CascadeOptions
70from .. import exc as sa_exc
71from .. import Exists
72from .. import log
73from .. import schema
74from .. import sql
75from .. import util
76from ..inspection import inspect
77from ..sql import coercions
78from ..sql import expression
79from ..sql import operators
80from ..sql import roles
81from ..sql import visitors
82from ..sql._typing import _ColumnExpressionArgument
83from ..sql._typing import _HasClauseElement
84from ..sql.annotation import _safe_annotate
85from ..sql.base import _NoArg
86from ..sql.elements import ColumnClause
87from ..sql.elements import ColumnElement
88from ..sql.util import _deep_annotate
89from ..sql.util import _deep_deannotate
90from ..sql.util import _shallow_annotate
91from ..sql.util import adapt_criterion_to_null
92from ..sql.util import ClauseAdapter
93from ..sql.util import join_condition
94from ..sql.util import selectables_overlap
95from ..sql.util import visit_binary_product
96from ..util.typing import de_optionalize_union_types
97from ..util.typing import resolve_name_to_real_class_name
98
99if typing.TYPE_CHECKING:
100 from ._typing import _EntityType
101 from ._typing import _ExternalEntityType
102 from ._typing import _IdentityKeyType
103 from ._typing import _InstanceDict
104 from ._typing import _InternalEntityType
105 from ._typing import _O
106 from ._typing import _RegistryType
107 from .base import Mapped
108 from .clsregistry import _class_resolver
109 from .clsregistry import _ModNS
110 from .decl_base import _DeclarativeMapperConfig
111 from .dependency import _DependencyProcessor
112 from .mapper import Mapper
113 from .query import Query
114 from .session import Session
115 from .state import InstanceState
116 from .strategies import _LazyLoader
117 from .util import AliasedClass
118 from .util import AliasedInsp
119 from ..sql._typing import _CoreAdapterProto
120 from ..sql._typing import _EquivalentColumnMap
121 from ..sql._typing import _InfoType
122 from ..sql.annotation import _AnnotationDict
123 from ..sql.annotation import SupportsAnnotations
124 from ..sql.elements import BinaryExpression
125 from ..sql.elements import BindParameter
126 from ..sql.elements import ClauseElement
127 from ..sql.schema import Table
128 from ..sql.selectable import FromClause
129 from ..util.typing import _AnnotationScanType
130 from ..util.typing import RODescriptorReference
131
132_T = TypeVar("_T", bound=Any)
133_T1 = TypeVar("_T1", bound=Any)
134_T2 = TypeVar("_T2", bound=Any)
135
136_PT = TypeVar("_PT", bound=Any)
137
138_PT2 = TypeVar("_PT2", bound=Any)
139
140
141_RelationshipArgumentType = Union[
142 str,
143 Type[_T],
144 Callable[[], Type[_T]],
145 "Mapper[_T]",
146 "AliasedClass[_T]",
147 Callable[[], "Mapper[_T]"],
148 Callable[[], "AliasedClass[_T]"],
149]
150
151_LazyLoadArgumentType = Literal[
152 "select",
153 "joined",
154 "selectin",
155 "subquery",
156 "raise",
157 "raise_on_sql",
158 "noload",
159 "immediate",
160 "write_only",
161 "dynamic",
162 True,
163 False,
164 None,
165]
166
167
168_RelationshipJoinConditionArgument = Union[
169 str, _ColumnExpressionArgument[bool]
170]
171_RelationshipSecondaryArgument = Union[
172 "FromClause", str, Callable[[], "FromClause"]
173]
174_ORMOrderByArgument = Union[
175 Literal[False],
176 str,
177 _ColumnExpressionArgument[Any],
178 Callable[[], _ColumnExpressionArgument[Any]],
179 Callable[[], Iterable[_ColumnExpressionArgument[Any]]],
180 Iterable[Union[str, _ColumnExpressionArgument[Any]]],
181]
182_RelationshipBackPopulatesArgument = Union[
183 str,
184 PropComparator[Any],
185 Callable[[], Union[str, PropComparator[Any]]],
186]
187
188
189ORMBackrefArgument = Union[str, Tuple[str, Dict[str, Any]]]
190
191_ORMColCollectionElement = Union[
192 ColumnClause[Any],
193 _HasClauseElement[Any],
194 roles.DMLColumnRole,
195 "Mapped[Any]",
196]
197_ORMColCollectionArgument = Union[
198 str,
199 Sequence[_ORMColCollectionElement],
200 Callable[[], Sequence[_ORMColCollectionElement]],
201 Callable[[], _ORMColCollectionElement],
202 _ORMColCollectionElement,
203]
204
205
206_CEA = TypeVar("_CEA", bound=_ColumnExpressionArgument[Any])
207
208_CE = TypeVar("_CE", bound="ColumnElement[Any]")
209
210
211_ColumnPairIterable = Iterable[Tuple[ColumnElement[Any], ColumnElement[Any]]]
212
213_ColumnPairs = Sequence[Tuple[ColumnElement[Any], ColumnElement[Any]]]
214
215_MutableColumnPairs = List[Tuple[ColumnElement[Any], ColumnElement[Any]]]
216
217
218def remote(expr: _CEA) -> _CEA:
219 """Annotate a portion of a primaryjoin expression
220 with a 'remote' annotation.
221
222 See the section :ref:`relationship_custom_foreign` for a
223 description of use.
224
225 .. seealso::
226
227 :ref:`relationship_custom_foreign`
228
229 :func:`.foreign`
230
231 """
232 return _annotate_columns( # type: ignore
233 coercions.expect(roles.ColumnArgumentRole, expr), {"remote": True}
234 )
235
236
237def foreign(expr: _CEA) -> _CEA:
238 """Annotate a portion of a primaryjoin expression
239 with a 'foreign' annotation.
240
241 See the section :ref:`relationship_custom_foreign` for a
242 description of use.
243
244 .. seealso::
245
246 :ref:`relationship_custom_foreign`
247
248 :func:`.remote`
249
250 """
251
252 return _annotate_columns( # type: ignore
253 coercions.expect(roles.ColumnArgumentRole, expr), {"foreign": True}
254 )
255
256
257@dataclasses.dataclass
258class _RelationshipArg(Generic[_T1, _T2]):
259 """stores a user-defined parameter value that must be resolved and
260 parsed later at mapper configuration time.
261
262 """
263
264 __slots__ = "name", "argument", "resolved"
265 name: str
266 argument: _T1
267 resolved: Optional[_T2]
268
269 def _is_populated(self) -> bool:
270 return self.argument is not None
271
272 def _resolve_against_registry(
273 self, clsregistry_resolver: Callable[[str, bool], _class_resolver]
274 ) -> None:
275 attr_value = self.argument
276
277 if isinstance(attr_value, str):
278 self.resolved = clsregistry_resolver(
279 attr_value, self.name == "secondary"
280 )()
281 elif callable(attr_value) and not _is_mapped_class(attr_value):
282 self.resolved = attr_value()
283 else:
284 self.resolved = attr_value
285
286 def effective_value(self) -> Any:
287 if self.resolved is not None:
288 return self.resolved
289 else:
290 return self.argument
291
292
293_RelationshipOrderByArg = Union[Literal[False], Tuple[ColumnElement[Any], ...]]
294
295
296@dataclasses.dataclass
297class _StringRelationshipArg(_RelationshipArg[_T1, _T2]):
298 def _resolve_against_registry(
299 self, clsregistry_resolver: Callable[[str, bool], _class_resolver]
300 ) -> None:
301 attr_value = self.argument
302
303 if callable(attr_value):
304 attr_value = attr_value()
305
306 if isinstance(attr_value, attributes.QueryableAttribute):
307 attr_value = attr_value.key # type: ignore
308
309 self.resolved = attr_value
310
311
312class _RelationshipArgs(NamedTuple):
313 """stores user-passed parameters that are resolved at mapper configuration
314 time.
315
316 """
317
318 secondary: _RelationshipArg[
319 Optional[_RelationshipSecondaryArgument],
320 Optional[FromClause],
321 ]
322 primaryjoin: _RelationshipArg[
323 Optional[_RelationshipJoinConditionArgument],
324 Optional[ColumnElement[Any]],
325 ]
326 secondaryjoin: _RelationshipArg[
327 Optional[_RelationshipJoinConditionArgument],
328 Optional[ColumnElement[Any]],
329 ]
330 order_by: _RelationshipArg[_ORMOrderByArgument, _RelationshipOrderByArg]
331 foreign_keys: _RelationshipArg[
332 Optional[_ORMColCollectionArgument], Set[ColumnElement[Any]]
333 ]
334 remote_side: _RelationshipArg[
335 Optional[_ORMColCollectionArgument], Set[ColumnElement[Any]]
336 ]
337 back_populates: _StringRelationshipArg[
338 Optional[_RelationshipBackPopulatesArgument], str
339 ]
340
341
342@log.class_logger
343class RelationshipProperty(
344 _DataclassDefaultsDontSet,
345 _IntrospectsAnnotations,
346 StrategizedProperty[_T],
347 log.Identified,
348):
349 """Describes an object property that holds a single item or list
350 of items that correspond to a related database table.
351
352 Public constructor is the :func:`_orm.relationship` function.
353
354 .. seealso::
355
356 :ref:`relationship_config_toplevel`
357
358 """
359
360 strategy_wildcard_key = strategy_options._RELATIONSHIP_TOKEN
361 inherit_cache = True
362 """:meta private:"""
363
364 _links_to_entity = True
365 _is_relationship = True
366
367 _overlaps: Sequence[str]
368
369 _lazy_strategy: _LazyLoader
370
371 _persistence_only = dict(
372 passive_deletes=False,
373 passive_updates=True,
374 enable_typechecks=True,
375 active_history=False,
376 cascade_backrefs=False,
377 )
378
379 _dependency_processor: Optional[_DependencyProcessor] = None
380
381 primaryjoin: ColumnElement[bool]
382 secondaryjoin: Optional[ColumnElement[bool]]
383 secondary: Optional[FromClause]
384 _join_condition: _JoinCondition
385 order_by: _RelationshipOrderByArg
386
387 _user_defined_foreign_keys: Set[ColumnElement[Any]]
388 _calculated_foreign_keys: Set[ColumnElement[Any]]
389
390 remote_side: Set[ColumnElement[Any]]
391 local_columns: Set[ColumnElement[Any]]
392
393 synchronize_pairs: _ColumnPairs
394 secondary_synchronize_pairs: Optional[_ColumnPairs]
395
396 local_remote_pairs: _ColumnPairs
397
398 direction: RelationshipDirection
399
400 _init_args: _RelationshipArgs
401 _disable_dataclass_default_factory = True
402
403 def __init__(
404 self,
405 argument: Optional[_RelationshipArgumentType[_T]] = None,
406 secondary: Optional[_RelationshipSecondaryArgument] = None,
407 *,
408 uselist: Optional[bool] = None,
409 collection_class: Optional[
410 Union[Type[Collection[Any]], Callable[[], Collection[Any]]]
411 ] = None,
412 primaryjoin: Optional[_RelationshipJoinConditionArgument] = None,
413 secondaryjoin: Optional[_RelationshipJoinConditionArgument] = None,
414 back_populates: Optional[_RelationshipBackPopulatesArgument] = None,
415 order_by: _ORMOrderByArgument = False,
416 backref: Optional[ORMBackrefArgument] = None,
417 overlaps: Optional[str] = None,
418 post_update: bool = False,
419 cascade: str = "save-update, merge",
420 viewonly: bool = False,
421 attribute_options: Optional[_AttributeOptions] = None,
422 lazy: _LazyLoadArgumentType = "select",
423 passive_deletes: Union[Literal["all"], bool] = False,
424 passive_updates: bool = True,
425 active_history: bool = False,
426 enable_typechecks: bool = True,
427 foreign_keys: Optional[_ORMColCollectionArgument] = None,
428 remote_side: Optional[_ORMColCollectionArgument] = None,
429 join_depth: Optional[int] = None,
430 comparator_factory: Optional[
431 Type[RelationshipProperty.Comparator[Any]]
432 ] = None,
433 single_parent: bool = False,
434 innerjoin: bool = False,
435 distinct_target_key: Optional[bool] = None,
436 load_on_pending: bool = False,
437 query_class: Optional[Type[Query[Any]]] = None,
438 info: Optional[_InfoType] = None,
439 omit_join: Literal[None, False] = None,
440 sync_backref: Optional[bool] = None,
441 doc: Optional[str] = None,
442 bake_queries: Literal[True] = True,
443 cascade_backrefs: Literal[False] = False,
444 _local_remote_pairs: Optional[_ColumnPairs] = None,
445 _legacy_inactive_history_style: bool = False,
446 ):
447 super().__init__(attribute_options=attribute_options)
448
449 self.uselist = uselist
450 self.argument = argument
451
452 self._init_args = _RelationshipArgs(
453 _RelationshipArg("secondary", secondary, None),
454 _RelationshipArg("primaryjoin", primaryjoin, None),
455 _RelationshipArg("secondaryjoin", secondaryjoin, None),
456 _RelationshipArg("order_by", order_by, None),
457 _RelationshipArg("foreign_keys", foreign_keys, None),
458 _RelationshipArg("remote_side", remote_side, None),
459 _StringRelationshipArg("back_populates", back_populates, None),
460 )
461
462 if self._attribute_options.dataclasses_default not in (
463 _NoArg.NO_ARG,
464 None,
465 ):
466 raise sa_exc.ArgumentError(
467 "Only 'None' is accepted as dataclass "
468 "default for a relationship()"
469 )
470
471 self.post_update = post_update
472 self.viewonly = viewonly
473 if viewonly:
474 self._warn_for_persistence_only_flags(
475 passive_deletes=passive_deletes,
476 passive_updates=passive_updates,
477 enable_typechecks=enable_typechecks,
478 active_history=active_history,
479 cascade_backrefs=cascade_backrefs,
480 )
481 if viewonly and sync_backref:
482 raise sa_exc.ArgumentError(
483 "sync_backref and viewonly cannot both be True"
484 )
485 self.sync_backref = sync_backref
486 self.lazy = lazy
487 self.single_parent = single_parent
488 self.collection_class = collection_class
489 self.passive_deletes = passive_deletes
490
491 if cascade_backrefs:
492 raise sa_exc.ArgumentError(
493 "The 'cascade_backrefs' parameter passed to "
494 "relationship() may only be set to False."
495 )
496
497 self.passive_updates = passive_updates
498 self.enable_typechecks = enable_typechecks
499 self.query_class = query_class
500 self.innerjoin = innerjoin
501 self.distinct_target_key = distinct_target_key
502 self.doc = doc
503 self.active_history = active_history
504 self._legacy_inactive_history_style = _legacy_inactive_history_style
505
506 self.join_depth = join_depth
507 if omit_join:
508 util.warn(
509 "setting omit_join to True is not supported; selectin "
510 "loading of this relationship may not work correctly if this "
511 "flag is set explicitly. omit_join optimization is "
512 "automatically detected for conditions under which it is "
513 "supported."
514 )
515
516 self.omit_join = omit_join
517 self.local_remote_pairs = _local_remote_pairs or ()
518 self.load_on_pending = load_on_pending
519 self.comparator_factory = (
520 comparator_factory or RelationshipProperty.Comparator
521 )
522 util.set_creation_order(self)
523
524 if info is not None:
525 self.info.update(info)
526
527 self.strategy_key = (("lazy", self.lazy),)
528
529 self._reverse_property: Set[RelationshipProperty[Any]] = set()
530
531 if overlaps:
532 self._overlaps = set(re.split(r"\s*,\s*", overlaps)) # type: ignore # noqa: E501
533 else:
534 self._overlaps = ()
535
536 self.cascade = cascade
537
538 if back_populates:
539 if backref:
540 raise sa_exc.ArgumentError(
541 "backref and back_populates keyword arguments "
542 "are mutually exclusive"
543 )
544 self.backref = None
545 else:
546 self.backref = backref
547
548 @property
549 def back_populates(self) -> str:
550 return self._init_args.back_populates.effective_value() # type: ignore
551
552 @back_populates.setter
553 def back_populates(self, value: str) -> None:
554 self._init_args.back_populates.argument = value
555
556 def _warn_for_persistence_only_flags(self, **kw: Any) -> None:
557 for k, v in kw.items():
558 if v != self._persistence_only[k]:
559 # we are warning here rather than warn deprecated as this is a
560 # configuration mistake, and Python shows regular warnings more
561 # aggressively than deprecation warnings by default. Unlike the
562 # case of setting viewonly with cascade, the settings being
563 # warned about here are not actively doing the wrong thing
564 # against viewonly=True, so it is not as urgent to have these
565 # raise an error.
566 util.warn(
567 "Setting %s on relationship() while also "
568 "setting viewonly=True does not make sense, as a "
569 "viewonly=True relationship does not perform persistence "
570 "operations. This configuration may raise an error "
571 "in a future release." % (k,)
572 )
573
574 def instrument_class(self, mapper: Mapper[Any]) -> None:
575 attributes._register_descriptor(
576 mapper.class_,
577 self.key,
578 comparator=self.comparator_factory(self, mapper),
579 parententity=mapper,
580 doc=self.doc,
581 )
582
583 class Comparator(util.MemoizedSlots, PropComparator[_PT]):
584 """Produce boolean, comparison, and other operators for
585 :class:`.RelationshipProperty` attributes.
586
587 See the documentation for :class:`.PropComparator` for a brief
588 overview of ORM level operator definition.
589
590 .. seealso::
591
592 :class:`.PropComparator`
593
594 :class:`.ColumnProperty.Comparator`
595
596 :class:`.ColumnOperators`
597
598 :ref:`types_operators`
599
600 :attr:`.TypeEngine.comparator_factory`
601
602 """
603
604 __slots__ = (
605 "entity",
606 "mapper",
607 "property",
608 "_of_type",
609 "_extra_criteria",
610 )
611
612 prop: RODescriptorReference[RelationshipProperty[_PT]]
613 _of_type: Optional[_EntityType[_PT]]
614
615 def __init__(
616 self,
617 prop: RelationshipProperty[_PT],
618 parentmapper: _InternalEntityType[Any],
619 adapt_to_entity: Optional[AliasedInsp[Any]] = None,
620 of_type: Optional[_EntityType[_PT]] = None,
621 extra_criteria: Tuple[ColumnElement[bool], ...] = (),
622 ):
623 """Construction of :class:`.RelationshipProperty.Comparator`
624 is internal to the ORM's attribute mechanics.
625
626 """
627 self.prop = prop
628 self._parententity = parentmapper
629 self._adapt_to_entity = adapt_to_entity
630 if of_type:
631 self._of_type = of_type
632 else:
633 self._of_type = None
634 self._extra_criteria = extra_criteria
635
636 def adapt_to_entity(
637 self, adapt_to_entity: AliasedInsp[Any]
638 ) -> RelationshipProperty.Comparator[Any]:
639 return self.__class__(
640 self.prop,
641 self._parententity,
642 adapt_to_entity=adapt_to_entity,
643 of_type=self._of_type,
644 )
645
646 entity: _InternalEntityType[_PT]
647 """The target entity referred to by this
648 :class:`.RelationshipProperty.Comparator`.
649
650 This is either a :class:`_orm.Mapper` or :class:`.AliasedInsp`
651 object.
652
653 This is the "target" or "remote" side of the
654 :func:`_orm.relationship`.
655
656 """
657
658 mapper: Mapper[_PT]
659 """The target :class:`_orm.Mapper` referred to by this
660 :class:`.RelationshipProperty.Comparator`.
661
662 This is the "target" or "remote" side of the
663 :func:`_orm.relationship`.
664
665 """
666
667 def _memoized_attr_entity(self) -> _InternalEntityType[_PT]:
668 if self._of_type:
669 return inspect(self._of_type) # type: ignore
670 else:
671 return self.prop.entity
672
673 def _memoized_attr_mapper(self) -> Mapper[_PT]:
674 return self.entity.mapper
675
676 def _source_selectable(self) -> FromClause:
677 if self._adapt_to_entity:
678 return self._adapt_to_entity.selectable
679 else:
680 return self.property.parent._with_polymorphic_selectable
681
682 def __clause_element__(self) -> ColumnElement[bool]:
683 adapt_from = self._source_selectable()
684 if self._of_type:
685 of_type_entity = inspect(self._of_type)
686 else:
687 of_type_entity = None
688
689 (
690 pj,
691 sj,
692 source,
693 dest,
694 secondary,
695 target_adapter,
696 ) = self.prop._create_joins(
697 source_selectable=adapt_from,
698 source_polymorphic=True,
699 of_type_entity=of_type_entity,
700 alias_secondary=True,
701 extra_criteria=self._extra_criteria,
702 )
703 if sj is not None:
704 return pj & sj
705 else:
706 return pj
707
708 def of_type(self, class_: _EntityType[Any]) -> PropComparator[_PT]:
709 r"""Redefine this object in terms of a polymorphic subclass.
710
711 See :meth:`.PropComparator.of_type` for an example.
712
713
714 """
715 return RelationshipProperty.Comparator(
716 self.prop,
717 self._parententity,
718 adapt_to_entity=self._adapt_to_entity,
719 of_type=class_,
720 extra_criteria=self._extra_criteria,
721 )
722
723 def and_(
724 self, *criteria: _ColumnExpressionArgument[bool]
725 ) -> PropComparator[Any]:
726 """Add AND criteria.
727
728 See :meth:`.PropComparator.and_` for an example.
729
730 .. versionadded:: 1.4
731
732 """
733 exprs = tuple(
734 coercions.expect(roles.WhereHavingRole, clause)
735 for clause in util.coerce_generator_arg(criteria)
736 )
737
738 return RelationshipProperty.Comparator(
739 self.prop,
740 self._parententity,
741 adapt_to_entity=self._adapt_to_entity,
742 of_type=self._of_type,
743 extra_criteria=self._extra_criteria + exprs,
744 )
745
746 def in_(self, other: Any) -> NoReturn:
747 """Produce an IN clause - this is not implemented
748 for :func:`_orm.relationship`-based attributes at this time.
749
750 """
751 raise NotImplementedError(
752 "in_() not yet supported for "
753 "relationships. For a simple "
754 "many-to-one, use in_() against "
755 "the set of foreign key values."
756 )
757
758 # https://github.com/python/mypy/issues/4266
759 __hash__ = None # type: ignore
760
761 def __eq__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501
762 """Implement the ``==`` operator.
763
764 In a many-to-one context, such as:
765
766 .. sourcecode:: text
767
768 MyClass.some_prop == <some object>
769
770 this will typically produce a
771 clause such as:
772
773 .. sourcecode:: text
774
775 mytable.related_id == <some id>
776
777 Where ``<some id>`` is the primary key of the given
778 object.
779
780 The ``==`` operator provides partial functionality for non-
781 many-to-one comparisons:
782
783 * Comparisons against collections are not supported.
784 Use :meth:`~.Relationship.Comparator.contains`.
785 * Compared to a scalar one-to-many, will produce a
786 clause that compares the target columns in the parent to
787 the given target.
788 * Compared to a scalar many-to-many, an alias
789 of the association table will be rendered as
790 well, forming a natural join that is part of the
791 main body of the query. This will not work for
792 queries that go beyond simple AND conjunctions of
793 comparisons, such as those which use OR. Use
794 explicit joins, outerjoins, or
795 :meth:`~.Relationship.Comparator.has` for
796 more comprehensive non-many-to-one scalar
797 membership tests.
798 * Comparisons against ``None`` given in a one-to-many
799 or many-to-many context produce a NOT EXISTS clause.
800
801 """
802 if other is None or isinstance(other, expression.Null):
803 if self.property.direction in [ONETOMANY, MANYTOMANY]:
804 return ~self._criterion_exists()
805 else:
806 return self.property._optimized_compare(
807 None, adapt_source=self.adapter
808 )
809 elif self.property.uselist:
810 raise sa_exc.InvalidRequestError(
811 "Can't compare a collection to an object or collection; "
812 "use contains() to test for membership."
813 )
814 else:
815 return self.property._optimized_compare(
816 other, adapt_source=self.adapter
817 )
818
819 def _criterion_exists(
820 self,
821 criterion: Optional[_ColumnExpressionArgument[bool]] = None,
822 **kwargs: Any,
823 ) -> Exists:
824 where_criteria = (
825 coercions.expect(roles.WhereHavingRole, criterion)
826 if criterion is not None
827 else None
828 )
829
830 if getattr(self, "_of_type", None):
831 info: Optional[_InternalEntityType[Any]] = inspect(
832 self._of_type
833 )
834 assert info is not None
835 target_mapper, to_selectable, is_aliased_class = (
836 info.mapper,
837 info.selectable,
838 info.is_aliased_class,
839 )
840 if self.property._is_self_referential and not is_aliased_class:
841 to_selectable = to_selectable._anonymous_fromclause()
842
843 single_crit = target_mapper._single_table_criterion
844 if single_crit is not None:
845 if where_criteria is not None:
846 where_criteria = single_crit & where_criteria
847 else:
848 where_criteria = single_crit
849 dest_entity = info
850 else:
851 is_aliased_class = False
852 to_selectable = None
853 dest_entity = self.mapper
854
855 if self.adapter:
856 source_selectable = self._source_selectable()
857 else:
858 source_selectable = None
859
860 (
861 pj,
862 sj,
863 source,
864 dest,
865 secondary,
866 target_adapter,
867 ) = self.property._create_joins(
868 dest_selectable=to_selectable,
869 source_selectable=source_selectable,
870 )
871
872 for k in kwargs:
873 crit = getattr(self.property.mapper.class_, k) == kwargs[k]
874 if where_criteria is None:
875 where_criteria = crit
876 else:
877 where_criteria = where_criteria & crit
878
879 # annotate the *local* side of the join condition, in the case
880 # of pj + sj this is the full primaryjoin, in the case of just
881 # pj its the local side of the primaryjoin.
882 j: ColumnElement[bool]
883 if sj is not None:
884 j = pj & sj
885 else:
886 j = pj
887
888 if (
889 where_criteria is not None
890 and target_adapter
891 and not is_aliased_class
892 ):
893 # limit this adapter to annotated only?
894 where_criteria = target_adapter.traverse(where_criteria)
895
896 # only have the "joined left side" of what we
897 # return be subject to Query adaption. The right
898 # side of it is used for an exists() subquery and
899 # should not correlate or otherwise reach out
900 # to anything in the enclosing query.
901 if where_criteria is not None:
902 where_criteria = where_criteria._annotate(
903 {"no_replacement_traverse": True}
904 )
905
906 crit = j & sql.True_._ifnone(where_criteria)
907
908 # ensure the exists query gets picked up by the ORM
909 # compiler and that it has what we expect as parententity so that
910 # _adjust_for_extra_criteria() gets set up
911 dest = dest._annotate(
912 {
913 "parentmapper": dest_entity.mapper,
914 "entity_namespace": dest_entity,
915 "parententity": dest_entity,
916 }
917 )._set_propagate_attrs(
918 {"compile_state_plugin": "orm", "plugin_subject": dest_entity}
919 )
920 if secondary is not None:
921 ex = (
922 sql.exists(1)
923 .where(crit)
924 .select_from(dest, secondary)
925 .correlate_except(dest, secondary)
926 )
927 else:
928 ex = (
929 sql.exists(1)
930 .where(crit)
931 .select_from(dest)
932 .correlate_except(dest)
933 )
934 return ex
935
936 def any(
937 self,
938 criterion: Optional[_ColumnExpressionArgument[bool]] = None,
939 **kwargs: Any,
940 ) -> ColumnElement[bool]:
941 """Produce an expression that tests a collection against
942 particular criterion, using EXISTS.
943
944 An expression like::
945
946 session.query(MyClass).filter(
947 MyClass.somereference.any(SomeRelated.x == 2)
948 )
949
950 Will produce a query like:
951
952 .. sourcecode:: sql
953
954 SELECT * FROM my_table WHERE
955 EXISTS (SELECT 1 FROM related WHERE related.my_id=my_table.id
956 AND related.x=2)
957
958 Because :meth:`~.Relationship.Comparator.any` uses
959 a correlated subquery, its performance is not nearly as
960 good when compared against large target tables as that of
961 using a join.
962
963 :meth:`~.Relationship.Comparator.any` is particularly
964 useful for testing for empty collections::
965
966 session.query(MyClass).filter(~MyClass.somereference.any())
967
968 will produce:
969
970 .. sourcecode:: sql
971
972 SELECT * FROM my_table WHERE
973 NOT (EXISTS (SELECT 1 FROM related WHERE
974 related.my_id=my_table.id))
975
976 :meth:`~.Relationship.Comparator.any` is only
977 valid for collections, i.e. a :func:`_orm.relationship`
978 that has ``uselist=True``. For scalar references,
979 use :meth:`~.Relationship.Comparator.has`.
980
981 """
982 if not self.property.uselist:
983 raise sa_exc.InvalidRequestError(
984 "'any()' not implemented for scalar "
985 "attributes. Use has()."
986 )
987
988 return self._criterion_exists(criterion, **kwargs)
989
990 def has(
991 self,
992 criterion: Optional[_ColumnExpressionArgument[bool]] = None,
993 **kwargs: Any,
994 ) -> ColumnElement[bool]:
995 """Produce an expression that tests a scalar reference against
996 particular criterion, using EXISTS.
997
998 An expression like::
999
1000 session.query(MyClass).filter(
1001 MyClass.somereference.has(SomeRelated.x == 2)
1002 )
1003
1004 Will produce a query like:
1005
1006 .. sourcecode:: sql
1007
1008 SELECT * FROM my_table WHERE
1009 EXISTS (SELECT 1 FROM related WHERE
1010 related.id==my_table.related_id AND related.x=2)
1011
1012 Because :meth:`~.Relationship.Comparator.has` uses
1013 a correlated subquery, its performance is not nearly as
1014 good when compared against large target tables as that of
1015 using a join.
1016
1017 :meth:`~.Relationship.Comparator.has` is only
1018 valid for scalar references, i.e. a :func:`_orm.relationship`
1019 that has ``uselist=False``. For collection references,
1020 use :meth:`~.Relationship.Comparator.any`.
1021
1022 """
1023 if self.property.uselist:
1024 raise sa_exc.InvalidRequestError(
1025 "'has()' not implemented for collections. Use any()."
1026 )
1027 return self._criterion_exists(criterion, **kwargs)
1028
1029 def contains(
1030 self, other: _ColumnExpressionArgument[Any], **kwargs: Any
1031 ) -> ColumnElement[bool]:
1032 """Return a simple expression that tests a collection for
1033 containment of a particular item.
1034
1035 :meth:`~.Relationship.Comparator.contains` is
1036 only valid for a collection, i.e. a
1037 :func:`_orm.relationship` that implements
1038 one-to-many or many-to-many with ``uselist=True``.
1039
1040 When used in a simple one-to-many context, an
1041 expression like::
1042
1043 MyClass.contains(other)
1044
1045 Produces a clause like:
1046
1047 .. sourcecode:: sql
1048
1049 mytable.id == <some id>
1050
1051 Where ``<some id>`` is the value of the foreign key
1052 attribute on ``other`` which refers to the primary
1053 key of its parent object. From this it follows that
1054 :meth:`~.Relationship.Comparator.contains` is
1055 very useful when used with simple one-to-many
1056 operations.
1057
1058 For many-to-many operations, the behavior of
1059 :meth:`~.Relationship.Comparator.contains`
1060 has more caveats. The association table will be
1061 rendered in the statement, producing an "implicit"
1062 join, that is, includes multiple tables in the FROM
1063 clause which are equated in the WHERE clause::
1064
1065 query(MyClass).filter(MyClass.contains(other))
1066
1067 Produces a query like:
1068
1069 .. sourcecode:: sql
1070
1071 SELECT * FROM my_table, my_association_table AS
1072 my_association_table_1 WHERE
1073 my_table.id = my_association_table_1.parent_id
1074 AND my_association_table_1.child_id = <some id>
1075
1076 Where ``<some id>`` would be the primary key of
1077 ``other``. From the above, it is clear that
1078 :meth:`~.Relationship.Comparator.contains`
1079 will **not** work with many-to-many collections when
1080 used in queries that move beyond simple AND
1081 conjunctions, such as multiple
1082 :meth:`~.Relationship.Comparator.contains`
1083 expressions joined by OR. In such cases subqueries or
1084 explicit "outer joins" will need to be used instead.
1085 See :meth:`~.Relationship.Comparator.any` for
1086 a less-performant alternative using EXISTS, or refer
1087 to :meth:`_query.Query.outerjoin`
1088 as well as :ref:`orm_queryguide_joins`
1089 for more details on constructing outer joins.
1090
1091 kwargs may be ignored by this operator but are required for API
1092 conformance.
1093 """
1094 if not self.prop.uselist:
1095 raise sa_exc.InvalidRequestError(
1096 "'contains' not implemented for scalar "
1097 "attributes. Use =="
1098 )
1099
1100 clause = self.prop._optimized_compare(
1101 other, adapt_source=self.adapter
1102 )
1103
1104 if self.prop.secondaryjoin is not None:
1105 clause.negation_clause = self.__negated_contains_or_equals(
1106 other
1107 )
1108
1109 return clause
1110
1111 def __negated_contains_or_equals(
1112 self, other: Any
1113 ) -> ColumnElement[bool]:
1114 if self.prop.direction == MANYTOONE:
1115 state = attributes.instance_state(other)
1116
1117 def state_bindparam(
1118 local_col: ColumnElement[Any],
1119 state: InstanceState[Any],
1120 remote_col: ColumnElement[Any],
1121 ) -> BindParameter[Any]:
1122 dict_ = state.dict
1123 return sql.bindparam(
1124 local_col.key,
1125 type_=local_col.type,
1126 unique=True,
1127 callable_=self.prop._get_attr_w_warn_on_none(
1128 self.prop.mapper, state, dict_, remote_col
1129 ),
1130 )
1131
1132 def adapt(col: _CE) -> _CE:
1133 if self.adapter:
1134 return self.adapter(col)
1135 else:
1136 return col
1137
1138 if self.property._use_get:
1139 return sql.and_(
1140 *[
1141 sql.or_(
1142 adapt(x)
1143 != state_bindparam(adapt(x), state, y),
1144 adapt(x) == None,
1145 )
1146 for (x, y) in self.property.local_remote_pairs
1147 ]
1148 )
1149
1150 criterion = sql.and_(
1151 *[
1152 x == y
1153 for (x, y) in zip(
1154 self.property.mapper.primary_key,
1155 self.property.mapper.primary_key_from_instance(other),
1156 )
1157 ]
1158 )
1159
1160 return ~self._criterion_exists(criterion)
1161
1162 def __ne__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501
1163 """Implement the ``!=`` operator.
1164
1165 In a many-to-one context, such as:
1166
1167 .. sourcecode:: text
1168
1169 MyClass.some_prop != <some object>
1170
1171 This will typically produce a clause such as:
1172
1173 .. sourcecode:: sql
1174
1175 mytable.related_id != <some id>
1176
1177 Where ``<some id>`` is the primary key of the
1178 given object.
1179
1180 The ``!=`` operator provides partial functionality for non-
1181 many-to-one comparisons:
1182
1183 * Comparisons against collections are not supported.
1184 Use
1185 :meth:`~.Relationship.Comparator.contains`
1186 in conjunction with :func:`_expression.not_`.
1187 * Compared to a scalar one-to-many, will produce a
1188 clause that compares the target columns in the parent to
1189 the given target.
1190 * Compared to a scalar many-to-many, an alias
1191 of the association table will be rendered as
1192 well, forming a natural join that is part of the
1193 main body of the query. This will not work for
1194 queries that go beyond simple AND conjunctions of
1195 comparisons, such as those which use OR. Use
1196 explicit joins, outerjoins, or
1197 :meth:`~.Relationship.Comparator.has` in
1198 conjunction with :func:`_expression.not_` for
1199 more comprehensive non-many-to-one scalar
1200 membership tests.
1201 * Comparisons against ``None`` given in a one-to-many
1202 or many-to-many context produce an EXISTS clause.
1203
1204 """
1205 if other is None or isinstance(other, expression.Null):
1206 if self.property.direction == MANYTOONE:
1207 return ~self.property._optimized_compare(
1208 None, adapt_source=self.adapter
1209 )
1210
1211 else:
1212 return self._criterion_exists()
1213 elif self.property.uselist:
1214 raise sa_exc.InvalidRequestError(
1215 "Can't compare a collection"
1216 " to an object or collection; use "
1217 "contains() to test for membership."
1218 )
1219 else:
1220 return self.__negated_contains_or_equals(other)
1221
1222 if TYPE_CHECKING:
1223 property: RelationshipProperty[_PT] # noqa: A001
1224
1225 def _memoized_attr_property(self) -> RelationshipProperty[_PT]:
1226 self.prop.parent._check_configure()
1227 return self.prop
1228
1229 def _with_parent(
1230 self,
1231 instance: object,
1232 alias_secondary: bool = True,
1233 from_entity: Optional[_EntityType[Any]] = None,
1234 ) -> ColumnElement[bool]:
1235 assert instance is not None
1236 adapt_source: Optional[_CoreAdapterProto] = None
1237 if from_entity is not None:
1238 insp: Optional[_InternalEntityType[Any]] = inspect(from_entity)
1239 assert insp is not None
1240 if insp_is_aliased_class(insp):
1241 adapt_source = insp._adapter.adapt_clause
1242 return self._optimized_compare(
1243 instance,
1244 value_is_parent=True,
1245 adapt_source=adapt_source,
1246 alias_secondary=alias_secondary,
1247 )
1248
1249 def _optimized_compare(
1250 self,
1251 state: Any,
1252 value_is_parent: bool = False,
1253 adapt_source: Optional[_CoreAdapterProto] = None,
1254 alias_secondary: bool = True,
1255 ) -> ColumnElement[bool]:
1256 if state is not None:
1257 try:
1258 state = inspect(state)
1259 except sa_exc.NoInspectionAvailable:
1260 state = None
1261
1262 if state is None or not getattr(state, "is_instance", False):
1263 raise sa_exc.ArgumentError(
1264 "Mapped instance expected for relationship "
1265 "comparison to object. Classes, queries and other "
1266 "SQL elements are not accepted in this context; for "
1267 "comparison with a subquery, "
1268 "use %s.has(**criteria)." % self
1269 )
1270 reverse_direction = not value_is_parent
1271
1272 if state is None:
1273 return self._lazy_none_clause(
1274 reverse_direction, adapt_source=adapt_source
1275 )
1276
1277 if not reverse_direction:
1278 criterion, bind_to_col = (
1279 self._lazy_strategy._lazywhere,
1280 self._lazy_strategy._bind_to_col,
1281 )
1282 else:
1283 criterion, bind_to_col = (
1284 self._lazy_strategy._rev_lazywhere,
1285 self._lazy_strategy._rev_bind_to_col,
1286 )
1287
1288 if reverse_direction:
1289 mapper = self.mapper
1290 else:
1291 mapper = self.parent
1292
1293 dict_ = attributes.instance_dict(state.obj())
1294
1295 def visit_bindparam(bindparam: BindParameter[Any]) -> None:
1296 if bindparam._identifying_key in bind_to_col:
1297 bindparam.callable = self._get_attr_w_warn_on_none(
1298 mapper,
1299 state,
1300 dict_,
1301 bind_to_col[bindparam._identifying_key],
1302 )
1303
1304 if self.secondary is not None and alias_secondary:
1305 criterion = ClauseAdapter(
1306 self.secondary._anonymous_fromclause()
1307 ).traverse(criterion)
1308
1309 criterion = visitors.cloned_traverse(
1310 criterion, {}, {"bindparam": visit_bindparam}
1311 )
1312
1313 if adapt_source:
1314 criterion = adapt_source(criterion)
1315 return criterion
1316
1317 def _get_attr_w_warn_on_none(
1318 self,
1319 mapper: Mapper[Any],
1320 state: InstanceState[Any],
1321 dict_: _InstanceDict,
1322 column: ColumnElement[Any],
1323 ) -> Callable[[], Any]:
1324 """Create the callable that is used in a many-to-one expression.
1325
1326 E.g.::
1327
1328 u1 = s.query(User).get(5)
1329
1330 expr = Address.user == u1
1331
1332 Above, the SQL should be "address.user_id = 5". The callable
1333 returned by this method produces the value "5" based on the identity
1334 of ``u1``.
1335
1336 """
1337
1338 # in this callable, we're trying to thread the needle through
1339 # a wide variety of scenarios, including:
1340 #
1341 # * the object hasn't been flushed yet and there's no value for
1342 # the attribute as of yet
1343 #
1344 # * the object hasn't been flushed yet but it has a user-defined
1345 # value
1346 #
1347 # * the object has a value but it's expired and not locally present
1348 #
1349 # * the object has a value but it's expired and not locally present,
1350 # and the object is also detached
1351 #
1352 # * The object hadn't been flushed yet, there was no value, but
1353 # later, the object has been expired and detached, and *now*
1354 # they're trying to evaluate it
1355 #
1356 # * the object had a value, but it was changed to a new value, and
1357 # then expired
1358 #
1359 # * the object had a value, but it was changed to a new value, and
1360 # then expired, then the object was detached
1361 #
1362 # * the object has a user-set value, but it's None and we don't do
1363 # the comparison correctly for that so warn
1364 #
1365
1366 prop = mapper.get_property_by_column(column)
1367
1368 # by invoking this method, InstanceState will track the last known
1369 # value for this key each time the attribute is to be expired.
1370 # this feature was added explicitly for use in this method.
1371 state._track_last_known_value(prop.key)
1372
1373 lkv_fixed = state._last_known_values
1374
1375 def _go() -> Any:
1376 assert lkv_fixed is not None
1377 last_known = to_return = lkv_fixed[prop.key]
1378 existing_is_available = (
1379 last_known is not LoaderCallableStatus.NO_VALUE
1380 )
1381
1382 # we support that the value may have changed. so here we
1383 # try to get the most recent value including re-fetching.
1384 # only if we can't get a value now due to detachment do we return
1385 # the last known value
1386 current_value = mapper._get_state_attr_by_column(
1387 state,
1388 dict_,
1389 column,
1390 passive=(
1391 PassiveFlag.PASSIVE_OFF
1392 if state.persistent
1393 else PassiveFlag.PASSIVE_NO_FETCH ^ PassiveFlag.INIT_OK
1394 ),
1395 )
1396
1397 if current_value is LoaderCallableStatus.NEVER_SET:
1398 if not existing_is_available:
1399 raise sa_exc.InvalidRequestError(
1400 "Can't resolve value for column %s on object "
1401 "%s; no value has been set for this column"
1402 % (column, state_str(state))
1403 )
1404 elif current_value is LoaderCallableStatus.PASSIVE_NO_RESULT:
1405 if not existing_is_available:
1406 raise sa_exc.InvalidRequestError(
1407 "Can't resolve value for column %s on object "
1408 "%s; the object is detached and the value was "
1409 "expired" % (column, state_str(state))
1410 )
1411 else:
1412 to_return = current_value
1413 if to_return is None:
1414 util.warn(
1415 "Got None for value of column %s; this is unsupported "
1416 "for a relationship comparison and will not "
1417 "currently produce an IS comparison "
1418 "(but may in a future release)" % column
1419 )
1420 return to_return
1421
1422 return _go
1423
1424 def _lazy_none_clause(
1425 self,
1426 reverse_direction: bool = False,
1427 adapt_source: Optional[_CoreAdapterProto] = None,
1428 ) -> ColumnElement[bool]:
1429 if not reverse_direction:
1430 criterion, bind_to_col = (
1431 self._lazy_strategy._lazywhere,
1432 self._lazy_strategy._bind_to_col,
1433 )
1434 else:
1435 criterion, bind_to_col = (
1436 self._lazy_strategy._rev_lazywhere,
1437 self._lazy_strategy._rev_bind_to_col,
1438 )
1439
1440 criterion = adapt_criterion_to_null(criterion, bind_to_col)
1441
1442 if adapt_source:
1443 criterion = adapt_source(criterion)
1444 return criterion
1445
1446 def _format_as_string(self, class_: type, key: str) -> str:
1447 return f"{class_.__name__}.{key}"
1448
1449 def __str__(self) -> str:
1450 return self._format_as_string(self.parent.class_, self.key)
1451
1452 def merge(
1453 self,
1454 session: Session,
1455 source_state: InstanceState[Any],
1456 source_dict: _InstanceDict,
1457 dest_state: InstanceState[Any],
1458 dest_dict: _InstanceDict,
1459 load: bool,
1460 _recursive: Dict[Any, object],
1461 _resolve_conflict_map: Dict[_IdentityKeyType[Any], object],
1462 ) -> None:
1463 if load:
1464 for r in self._reverse_property:
1465 if (source_state, r) in _recursive:
1466 return
1467
1468 if "merge" not in self._cascade:
1469 return
1470
1471 if self.key not in source_dict:
1472 return
1473
1474 if self.uselist:
1475 impl = source_state.get_impl(self.key)
1476
1477 assert is_has_collection_adapter(impl)
1478 instances_iterable = impl.get_collection(source_state, source_dict)
1479
1480 # if this is a CollectionAttributeImpl, then empty should
1481 # be False, otherwise "self.key in source_dict" should not be
1482 # True
1483 assert not instances_iterable.empty if impl.collection else True
1484
1485 if load:
1486 # for a full merge, pre-load the destination collection,
1487 # so that individual _merge of each item pulls from identity
1488 # map for those already present.
1489 # also assumes CollectionAttributeImpl behavior of loading
1490 # "old" list in any case
1491 dest_state.get_impl(self.key).get(
1492 dest_state, dest_dict, passive=PassiveFlag.PASSIVE_MERGE
1493 )
1494
1495 dest_list = []
1496 for current in instances_iterable:
1497 current_state = attributes.instance_state(current)
1498 current_dict = attributes.instance_dict(current)
1499 _recursive[(current_state, self)] = True
1500 obj = session._merge(
1501 current_state,
1502 current_dict,
1503 load=load,
1504 _recursive=_recursive,
1505 _resolve_conflict_map=_resolve_conflict_map,
1506 )
1507 if obj is not None:
1508 dest_list.append(obj)
1509
1510 if not load:
1511 coll = attributes.init_state_collection(
1512 dest_state, dest_dict, self.key
1513 )
1514 for c in dest_list:
1515 coll.append_without_event(c)
1516 else:
1517 dest_impl = dest_state.get_impl(self.key)
1518 assert is_has_collection_adapter(dest_impl)
1519 dest_impl.set(
1520 dest_state,
1521 dest_dict,
1522 dest_list,
1523 _adapt=False,
1524 passive=PassiveFlag.PASSIVE_MERGE,
1525 )
1526 else:
1527 current = source_dict[self.key]
1528 if current is not None:
1529 current_state = attributes.instance_state(current)
1530 current_dict = attributes.instance_dict(current)
1531 _recursive[(current_state, self)] = True
1532 obj = session._merge(
1533 current_state,
1534 current_dict,
1535 load=load,
1536 _recursive=_recursive,
1537 _resolve_conflict_map=_resolve_conflict_map,
1538 )
1539 else:
1540 obj = None
1541
1542 if not load:
1543 dest_dict[self.key] = obj
1544 else:
1545 dest_state.get_impl(self.key).set(
1546 dest_state, dest_dict, obj, None
1547 )
1548
1549 def _value_as_iterable(
1550 self,
1551 state: InstanceState[_O],
1552 dict_: _InstanceDict,
1553 key: str,
1554 passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
1555 ) -> Sequence[Tuple[InstanceState[_O], _O]]:
1556 """Return a list of tuples (state, obj) for the given
1557 key.
1558
1559 returns an empty list if the value is None/empty/PASSIVE_NO_RESULT
1560 """
1561
1562 impl = state.manager[key].impl
1563 x = impl.get(state, dict_, passive=passive)
1564 if x is LoaderCallableStatus.PASSIVE_NO_RESULT or x is None:
1565 return []
1566 elif is_has_collection_adapter(impl):
1567 return [
1568 (attributes.instance_state(o), o)
1569 for o in impl.get_collection(state, dict_, x, passive=passive)
1570 ]
1571 else:
1572 return [(attributes.instance_state(x), x)]
1573
1574 def cascade_iterator(
1575 self,
1576 type_: str,
1577 state: InstanceState[Any],
1578 dict_: _InstanceDict,
1579 visited_states: Set[InstanceState[Any]],
1580 halt_on: Optional[Callable[[InstanceState[Any]], bool]] = None,
1581 ) -> Iterator[Tuple[Any, Mapper[Any], InstanceState[Any], _InstanceDict]]:
1582 # assert type_ in self._cascade
1583
1584 # only actively lazy load on the 'delete' cascade
1585 if type_ != "delete" or self.passive_deletes:
1586 passive = PassiveFlag.PASSIVE_NO_INITIALIZE
1587 else:
1588 passive = PassiveFlag.PASSIVE_OFF | PassiveFlag.NO_RAISE
1589
1590 if type_ == "save-update":
1591 tuples = state.manager[self.key].impl.get_all_pending(state, dict_)
1592 else:
1593 tuples = self._value_as_iterable(
1594 state, dict_, self.key, passive=passive
1595 )
1596
1597 skip_pending = (
1598 type_ == "refresh-expire" and "delete-orphan" not in self._cascade
1599 )
1600
1601 for instance_state, c in tuples:
1602 if instance_state in visited_states:
1603 continue
1604
1605 if c is None:
1606 # would like to emit a warning here, but
1607 # would not be consistent with collection.append(None)
1608 # current behavior of silently skipping.
1609 # see [ticket:2229]
1610 continue
1611
1612 assert instance_state is not None
1613 instance_dict = attributes.instance_dict(c)
1614
1615 if halt_on and halt_on(instance_state):
1616 continue
1617
1618 if skip_pending and not instance_state.key:
1619 continue
1620
1621 instance_mapper = instance_state.manager.mapper
1622
1623 if not instance_mapper.isa(self.mapper.class_manager.mapper):
1624 raise AssertionError(
1625 "Attribute '%s' on class '%s' "
1626 "doesn't handle objects "
1627 "of type '%s'"
1628 % (self.key, self.parent.class_, c.__class__)
1629 )
1630
1631 visited_states.add(instance_state)
1632
1633 yield c, instance_mapper, instance_state, instance_dict
1634
1635 @property
1636 def _effective_sync_backref(self) -> bool:
1637 if self.viewonly:
1638 return False
1639 else:
1640 return self.sync_backref is not False
1641
1642 @staticmethod
1643 def _check_sync_backref(
1644 rel_a: RelationshipProperty[Any], rel_b: RelationshipProperty[Any]
1645 ) -> None:
1646 if rel_a.viewonly and rel_b.sync_backref:
1647 raise sa_exc.InvalidRequestError(
1648 "Relationship %s cannot specify sync_backref=True since %s "
1649 "includes viewonly=True." % (rel_b, rel_a)
1650 )
1651 if (
1652 rel_a.viewonly
1653 and not rel_b.viewonly
1654 and rel_b.sync_backref is not False
1655 ):
1656 rel_b.sync_backref = False
1657
1658 def _add_reverse_property(self, key: str) -> None:
1659 other = self.mapper.get_property(key, _configure_mappers=False)
1660 if not isinstance(other, RelationshipProperty):
1661 raise sa_exc.InvalidRequestError(
1662 "back_populates on relationship '%s' refers to attribute '%s' "
1663 "that is not a relationship. The back_populates parameter "
1664 "should refer to the name of a relationship on the target "
1665 "class." % (self, other)
1666 )
1667 # viewonly and sync_backref cases
1668 # 1. self.viewonly==True and other.sync_backref==True -> error
1669 # 2. self.viewonly==True and other.viewonly==False and
1670 # other.sync_backref==None -> warn sync_backref=False, set to False
1671 self._check_sync_backref(self, other)
1672 # 3. other.viewonly==True and self.sync_backref==True -> error
1673 # 4. other.viewonly==True and self.viewonly==False and
1674 # self.sync_backref==None -> warn sync_backref=False, set to False
1675 self._check_sync_backref(other, self)
1676
1677 self._reverse_property.add(other)
1678 other._reverse_property.add(self)
1679
1680 other._setup_entity()
1681
1682 if not other.mapper.common_parent(self.parent):
1683 raise sa_exc.ArgumentError(
1684 "reverse_property %r on "
1685 "relationship %s references relationship %s, which "
1686 "does not reference mapper %s"
1687 % (key, self, other, self.parent)
1688 )
1689
1690 if (
1691 other._configure_started
1692 and self.direction in (ONETOMANY, MANYTOONE)
1693 and self.direction == other.direction
1694 ):
1695 raise sa_exc.ArgumentError(
1696 "%s and back-reference %s are "
1697 "both of the same direction %r. Did you mean to "
1698 "set remote_side on the many-to-one side ?"
1699 % (other, self, self.direction)
1700 )
1701
1702 @util.memoized_property
1703 def entity(self) -> _InternalEntityType[_T]:
1704 """Return the target mapped entity, which is an inspect() of the
1705 class or aliased class that is referenced by this
1706 :class:`.RelationshipProperty`.
1707
1708 """
1709 self.parent._check_configure()
1710 return self.entity
1711
1712 @util.memoized_property
1713 def mapper(self) -> Mapper[_T]:
1714 """Return the targeted :class:`_orm.Mapper` for this
1715 :class:`.RelationshipProperty`.
1716
1717 """
1718 return self.entity.mapper
1719
1720 def do_init(self) -> None:
1721 self._process_dependent_arguments()
1722 self._setup_entity()
1723 self._setup_registry_dependencies()
1724 self._setup_join_conditions()
1725 self._check_cascade_settings(self._cascade)
1726 self._post_init()
1727 self._generate_backref()
1728 self._join_condition._warn_for_conflicting_sync_targets()
1729 super().do_init()
1730 self._lazy_strategy = cast(
1731 "_LazyLoader", self._get_strategy((("lazy", "select"),))
1732 )
1733
1734 def _setup_registry_dependencies(self) -> None:
1735 self.parent.mapper.registry._set_depends_on(
1736 self.entity.mapper.registry
1737 )
1738
1739 def _process_dependent_arguments(self) -> None:
1740 """Convert incoming configuration arguments to their
1741 proper form.
1742
1743 Callables are resolved, ORM annotations removed.
1744
1745 """
1746
1747 # accept callables for other attributes which may require
1748 # deferred initialization. This technique is used
1749 # by declarative "string configs" and some recipes.
1750 init_args = self._init_args
1751
1752 for attr in (
1753 "order_by",
1754 "primaryjoin",
1755 "secondaryjoin",
1756 "secondary",
1757 "foreign_keys",
1758 "remote_side",
1759 "back_populates",
1760 ):
1761 rel_arg = getattr(init_args, attr)
1762
1763 rel_arg._resolve_against_registry(self._clsregistry_resolvers[1])
1764
1765 # remove "annotations" which are present if mapped class
1766 # descriptors are used to create the join expression.
1767 for attr in "primaryjoin", "secondaryjoin":
1768 rel_arg = getattr(init_args, attr)
1769 val = rel_arg.resolved
1770 if val is not None:
1771 rel_arg.resolved = coercions.expect(
1772 roles.ColumnArgumentRole, val, argname=attr
1773 )
1774
1775 secondary = init_args.secondary.resolved
1776 if secondary is not None and _is_mapped_class(secondary):
1777 raise sa_exc.ArgumentError(
1778 "secondary argument %s passed to to relationship() %s must "
1779 "be a Table object or other FROM clause; can't send a mapped "
1780 "class directly as rows in 'secondary' are persisted "
1781 "independently of a class that is mapped "
1782 "to that same table." % (secondary, self)
1783 )
1784
1785 # ensure expressions in self.order_by, foreign_keys,
1786 # remote_side are all columns, not strings.
1787 if (
1788 init_args.order_by.resolved is not False
1789 and init_args.order_by.resolved is not None
1790 ):
1791 self.order_by = tuple(
1792 coercions.expect(
1793 roles.ColumnArgumentRole, x, argname="order_by"
1794 )
1795 for x in util.to_list(init_args.order_by.resolved)
1796 )
1797 else:
1798 self.order_by = False
1799
1800 self._user_defined_foreign_keys = util.column_set(
1801 coercions.expect(
1802 roles.ColumnArgumentRole, x, argname="foreign_keys"
1803 )
1804 for x in util.to_column_set(init_args.foreign_keys.resolved)
1805 )
1806
1807 self.remote_side = util.column_set(
1808 coercions.expect(
1809 roles.ColumnArgumentRole, x, argname="remote_side"
1810 )
1811 for x in util.to_column_set(init_args.remote_side.resolved)
1812 )
1813
1814 def declarative_scan(
1815 self,
1816 decl_scan: _DeclarativeMapperConfig,
1817 registry: _RegistryType,
1818 cls: Type[Any],
1819 originating_module: Optional[str],
1820 key: str,
1821 mapped_container: Optional[Type[Mapped[Any]]],
1822 annotation: Optional[_AnnotationScanType],
1823 extracted_mapped_annotation: Optional[_AnnotationScanType],
1824 is_dataclass_field: bool,
1825 ) -> None:
1826 if extracted_mapped_annotation is None:
1827 if self.argument is None:
1828 self._raise_for_required(key, cls)
1829 else:
1830 return
1831
1832 argument = extracted_mapped_annotation
1833 assert originating_module is not None
1834
1835 if mapped_container is not None:
1836 is_write_only = issubclass(mapped_container, WriteOnlyMapped)
1837 is_dynamic = issubclass(mapped_container, DynamicMapped)
1838 if is_write_only:
1839 self.lazy = "write_only"
1840 self.strategy_key = (("lazy", self.lazy),)
1841 elif is_dynamic:
1842 self.lazy = "dynamic"
1843 self.strategy_key = (("lazy", self.lazy),)
1844 else:
1845 is_write_only = is_dynamic = False
1846
1847 argument = de_optionalize_union_types(argument)
1848
1849 if hasattr(argument, "__origin__"):
1850 arg_origin = argument.__origin__
1851 if isinstance(arg_origin, type) and issubclass(
1852 arg_origin, abc.Collection
1853 ):
1854 if self.collection_class is None:
1855 if _py_inspect.isabstract(arg_origin):
1856 raise sa_exc.ArgumentError(
1857 f"Collection annotation type {arg_origin} cannot "
1858 "be instantiated; please provide an explicit "
1859 "'collection_class' parameter "
1860 "(e.g. list, set, etc.) to the "
1861 "relationship() function to accompany this "
1862 "annotation"
1863 )
1864
1865 self.collection_class = arg_origin
1866
1867 elif not is_write_only and not is_dynamic:
1868 self.uselist = False
1869
1870 if argument.__args__: # type: ignore
1871 if isinstance(arg_origin, type) and issubclass(
1872 arg_origin, typing.Mapping
1873 ):
1874 type_arg = argument.__args__[-1] # type: ignore
1875 else:
1876 type_arg = argument.__args__[0] # type: ignore
1877 if hasattr(type_arg, "__forward_arg__"):
1878 str_argument = type_arg.__forward_arg__
1879
1880 argument = resolve_name_to_real_class_name(
1881 str_argument, originating_module
1882 )
1883 else:
1884 argument = type_arg
1885 else:
1886 raise sa_exc.ArgumentError(
1887 f"Generic alias {argument} requires an argument"
1888 )
1889 elif hasattr(argument, "__forward_arg__"):
1890 argument = argument.__forward_arg__
1891
1892 argument = resolve_name_to_real_class_name(
1893 argument, originating_module
1894 )
1895
1896 if (
1897 self.collection_class is None
1898 and not is_write_only
1899 and not is_dynamic
1900 ):
1901 self.uselist = False
1902
1903 # ticket #8759
1904 # if a lead argument was given to relationship(), like
1905 # `relationship("B")`, use that, don't replace it with class we
1906 # found in the annotation. The declarative_scan() method call here is
1907 # still useful, as we continue to derive collection type and do
1908 # checking of the annotation in any case.
1909 if self.argument is None:
1910 self.argument = cast("_RelationshipArgumentType[_T]", argument)
1911
1912 if (
1913 self._attribute_options.dataclasses_default_factory
1914 is not _NoArg.NO_ARG
1915 and self._attribute_options.dataclasses_default_factory
1916 is not self.collection_class
1917 ):
1918 raise sa_exc.ArgumentError(
1919 f"For relationship {self._format_as_string(cls, key)} using "
1920 "dataclass options, default_factory must be exactly "
1921 f"{self.collection_class}"
1922 )
1923
1924 @util.preload_module("sqlalchemy.orm.mapper")
1925 def _setup_entity(self, __argument: Any = None, /) -> None:
1926 if "entity" in self.__dict__:
1927 return
1928
1929 mapperlib = util.preloaded.orm_mapper
1930
1931 if __argument:
1932 argument = __argument
1933 else:
1934 argument = self.argument
1935
1936 resolved_argument: _ExternalEntityType[Any]
1937
1938 if isinstance(argument, str):
1939 # we might want to cleanup clsregistry API to make this
1940 # more straightforward
1941 resolved_argument = cast(
1942 "_ExternalEntityType[Any]",
1943 self._clsregistry_resolve_name(argument)(),
1944 )
1945 elif callable(argument) and not isinstance(
1946 argument, (type, mapperlib.Mapper)
1947 ):
1948 resolved_argument = argument()
1949 else:
1950 resolved_argument = argument
1951
1952 entity: _InternalEntityType[Any]
1953
1954 if isinstance(resolved_argument, type):
1955 entity = class_mapper(resolved_argument, configure=False)
1956 else:
1957 try:
1958 entity = inspect(resolved_argument)
1959 except sa_exc.NoInspectionAvailable:
1960 entity = None # type: ignore
1961
1962 if not hasattr(entity, "mapper"):
1963 raise sa_exc.ArgumentError(
1964 "relationship '%s' expects "
1965 "a class or a mapper argument (received: %s)"
1966 % (self.key, type(resolved_argument))
1967 )
1968
1969 self.entity = entity
1970 self.target = self.entity.persist_selectable
1971
1972 def _setup_join_conditions(self) -> None:
1973 self._join_condition = jc = _JoinCondition(
1974 parent_persist_selectable=self.parent.persist_selectable,
1975 child_persist_selectable=self.entity.persist_selectable,
1976 parent_local_selectable=self.parent.local_table,
1977 child_local_selectable=self.entity.local_table,
1978 primaryjoin=self._init_args.primaryjoin.resolved,
1979 secondary=self._init_args.secondary.resolved,
1980 secondaryjoin=self._init_args.secondaryjoin.resolved,
1981 parent_equivalents=self.parent._equivalent_columns,
1982 child_equivalents=self.mapper._equivalent_columns,
1983 consider_as_foreign_keys=self._user_defined_foreign_keys,
1984 local_remote_pairs=self.local_remote_pairs,
1985 remote_side=self.remote_side,
1986 self_referential=self._is_self_referential,
1987 prop=self,
1988 support_sync=not self.viewonly,
1989 can_be_synced_fn=self._columns_are_mapped,
1990 )
1991 self.primaryjoin = jc.primaryjoin
1992 self.secondaryjoin = jc.secondaryjoin
1993 self.secondary = jc.secondary
1994 self.direction = jc.direction
1995 self.local_remote_pairs = jc.local_remote_pairs
1996 self.remote_side = jc.remote_columns
1997 self.local_columns = jc.local_columns
1998 self.synchronize_pairs = jc.synchronize_pairs
1999 self._calculated_foreign_keys = jc.foreign_key_columns
2000 self.secondary_synchronize_pairs = jc.secondary_synchronize_pairs
2001
2002 @property
2003 def _clsregistry_resolve_arg(
2004 self,
2005 ) -> Callable[[str, bool], _class_resolver]:
2006 return self._clsregistry_resolvers[1]
2007
2008 @property
2009 def _clsregistry_resolve_name(
2010 self,
2011 ) -> Callable[[str], Callable[[], Union[Type[Any], Table, _ModNS]]]:
2012 return self._clsregistry_resolvers[0]
2013
2014 @util.memoized_property
2015 @util.preload_module("sqlalchemy.orm.clsregistry")
2016 def _clsregistry_resolvers(
2017 self,
2018 ) -> Tuple[
2019 Callable[[str], Callable[[], Union[Type[Any], Table, _ModNS]]],
2020 Callable[[str, bool], _class_resolver],
2021 ]:
2022 _resolver = util.preloaded.orm_clsregistry._resolver
2023
2024 return _resolver(self.parent.class_, self)
2025
2026 @property
2027 def cascade(self) -> CascadeOptions:
2028 """Return the current cascade setting for this
2029 :class:`.RelationshipProperty`.
2030 """
2031 return self._cascade
2032
2033 @cascade.setter
2034 def cascade(self, cascade: Union[str, CascadeOptions]) -> None:
2035 self._set_cascade(cascade)
2036
2037 def _set_cascade(self, cascade_arg: Union[str, CascadeOptions]) -> None:
2038 cascade = CascadeOptions(cascade_arg)
2039
2040 if self.viewonly:
2041 cascade = CascadeOptions(
2042 cascade.intersection(CascadeOptions._viewonly_cascades)
2043 )
2044
2045 if "mapper" in self.__dict__:
2046 self._check_cascade_settings(cascade)
2047 self._cascade = cascade
2048
2049 if self._dependency_processor:
2050 self._dependency_processor.cascade = cascade
2051
2052 def _check_cascade_settings(self, cascade: CascadeOptions) -> None:
2053 if (
2054 cascade.delete_orphan
2055 and not self.single_parent
2056 and (self.direction is MANYTOMANY or self.direction is MANYTOONE)
2057 ):
2058 raise sa_exc.ArgumentError(
2059 "For %(direction)s relationship %(rel)s, delete-orphan "
2060 "cascade is normally "
2061 'configured only on the "one" side of a one-to-many '
2062 "relationship, "
2063 'and not on the "many" side of a many-to-one or many-to-many '
2064 "relationship. "
2065 "To force this relationship to allow a particular "
2066 '"%(relatedcls)s" object to be referenced by only '
2067 'a single "%(clsname)s" object at a time via the '
2068 "%(rel)s relationship, which "
2069 "would allow "
2070 "delete-orphan cascade to take place in this direction, set "
2071 "the single_parent=True flag."
2072 % {
2073 "rel": self,
2074 "direction": (
2075 "many-to-one"
2076 if self.direction is MANYTOONE
2077 else "many-to-many"
2078 ),
2079 "clsname": self.parent.class_.__name__,
2080 "relatedcls": self.mapper.class_.__name__,
2081 },
2082 code="bbf0",
2083 )
2084
2085 if self.passive_deletes == "all" and (
2086 "delete" in cascade or "delete-orphan" in cascade
2087 ):
2088 raise sa_exc.ArgumentError(
2089 "On %s, can't set passive_deletes='all' in conjunction "
2090 "with 'delete' or 'delete-orphan' cascade" % self
2091 )
2092
2093 if cascade.delete_orphan:
2094 self.mapper.primary_mapper()._delete_orphans.append(
2095 (self.key, self.parent.class_)
2096 )
2097
2098 def _persists_for(self, mapper: Mapper[Any]) -> bool:
2099 """Return True if this property will persist values on behalf
2100 of the given mapper.
2101
2102 """
2103
2104 return (
2105 self.key in mapper.relationships
2106 and mapper.relationships[self.key] is self
2107 )
2108
2109 def _columns_are_mapped(self, *cols: ColumnElement[Any]) -> bool:
2110 """Return True if all columns in the given collection are
2111 mapped by the tables referenced by this :class:`.RelationshipProperty`.
2112
2113 """
2114
2115 secondary = self._init_args.secondary.resolved
2116 for c in cols:
2117 if secondary is not None and secondary.c.contains_column(c):
2118 continue
2119 if not self.parent.persist_selectable.c.contains_column(
2120 c
2121 ) and not self.target.c.contains_column(c):
2122 return False
2123 return True
2124
2125 def _generate_backref(self) -> None:
2126 """Interpret the 'backref' instruction to create a
2127 :func:`_orm.relationship` complementary to this one."""
2128
2129 resolve_back_populates = self._init_args.back_populates.resolved
2130
2131 if self.backref is not None and not resolve_back_populates:
2132 kwargs: Dict[str, Any]
2133 if isinstance(self.backref, str):
2134 backref_key, kwargs = self.backref, {}
2135 else:
2136 backref_key, kwargs = self.backref
2137 mapper = self.mapper.primary_mapper()
2138
2139 if not mapper.concrete:
2140 check = set(mapper.iterate_to_root()).union(
2141 mapper.self_and_descendants
2142 )
2143 for m in check:
2144 if m.has_property(backref_key) and not m.concrete:
2145 raise sa_exc.ArgumentError(
2146 "Error creating backref "
2147 "'%s' on relationship '%s': property of that "
2148 "name exists on mapper '%s'"
2149 % (backref_key, self, m)
2150 )
2151
2152 # determine primaryjoin/secondaryjoin for the
2153 # backref. Use the one we had, so that
2154 # a custom join doesn't have to be specified in
2155 # both directions.
2156 if self.secondary is not None:
2157 # for many to many, just switch primaryjoin/
2158 # secondaryjoin. use the annotated
2159 # pj/sj on the _join_condition.
2160 pj = kwargs.pop(
2161 "primaryjoin",
2162 self._join_condition.secondaryjoin_minus_local,
2163 )
2164 sj = kwargs.pop(
2165 "secondaryjoin",
2166 self._join_condition.primaryjoin_minus_local,
2167 )
2168 else:
2169 pj = kwargs.pop(
2170 "primaryjoin",
2171 self._join_condition.primaryjoin_reverse_remote,
2172 )
2173 sj = kwargs.pop("secondaryjoin", None)
2174 if sj:
2175 raise sa_exc.InvalidRequestError(
2176 "Can't assign 'secondaryjoin' on a backref "
2177 "against a non-secondary relationship."
2178 )
2179
2180 foreign_keys = kwargs.pop(
2181 "foreign_keys", self._user_defined_foreign_keys
2182 )
2183 parent = self.parent.primary_mapper()
2184 kwargs.setdefault("viewonly", self.viewonly)
2185 kwargs.setdefault("post_update", self.post_update)
2186 kwargs.setdefault("passive_updates", self.passive_updates)
2187 kwargs.setdefault("sync_backref", self.sync_backref)
2188 self.back_populates = backref_key
2189 relationship = RelationshipProperty(
2190 parent,
2191 self.secondary,
2192 primaryjoin=pj,
2193 secondaryjoin=sj,
2194 foreign_keys=foreign_keys,
2195 back_populates=self.key,
2196 **kwargs,
2197 )
2198 mapper._configure_property(
2199 backref_key, relationship, warn_for_existing=True
2200 )
2201
2202 if resolve_back_populates:
2203 if isinstance(resolve_back_populates, PropComparator):
2204 back_populates = resolve_back_populates.prop.key
2205 elif isinstance(resolve_back_populates, str):
2206 back_populates = resolve_back_populates
2207 else:
2208 # need test coverage for this case as well
2209 raise sa_exc.ArgumentError(
2210 f"Invalid back_populates value: {resolve_back_populates!r}"
2211 )
2212
2213 self._add_reverse_property(back_populates)
2214
2215 @util.preload_module("sqlalchemy.orm.dependency")
2216 def _post_init(self) -> None:
2217 dependency = util.preloaded.orm_dependency
2218
2219 if self.uselist is None:
2220 self.uselist = self.direction is not MANYTOONE
2221 if not self.viewonly:
2222 self._dependency_processor = ( # type: ignore
2223 dependency._DependencyProcessor.from_relationship
2224 )(self)
2225
2226 if (
2227 self.uselist
2228 and self._attribute_options.dataclasses_default
2229 is not _NoArg.NO_ARG
2230 ):
2231 raise sa_exc.ArgumentError(
2232 f"On relationship {self}, the dataclass default for "
2233 "relationship may only be set for "
2234 "a relationship that references a scalar value, i.e. "
2235 "many-to-one or explicitly uselist=False"
2236 )
2237
2238 @util.memoized_property
2239 def _use_get(self) -> bool:
2240 """memoize the 'use_get' attribute of this RelationshipLoader's
2241 lazyloader."""
2242
2243 strategy = self._lazy_strategy
2244 return strategy.use_get
2245
2246 @util.memoized_property
2247 def _is_self_referential(self) -> bool:
2248 return self.mapper.common_parent(self.parent)
2249
2250 def _create_joins(
2251 self,
2252 source_polymorphic: bool = False,
2253 source_selectable: Optional[FromClause] = None,
2254 dest_selectable: Optional[FromClause] = None,
2255 of_type_entity: Optional[_InternalEntityType[Any]] = None,
2256 alias_secondary: bool = False,
2257 extra_criteria: Tuple[ColumnElement[bool], ...] = (),
2258 ) -> Tuple[
2259 ColumnElement[bool],
2260 Optional[ColumnElement[bool]],
2261 FromClause,
2262 FromClause,
2263 Optional[FromClause],
2264 Optional[ClauseAdapter],
2265 ]:
2266 aliased = False
2267
2268 if alias_secondary and self.secondary is not None:
2269 aliased = True
2270
2271 if source_selectable is None:
2272 if source_polymorphic and self.parent.with_polymorphic:
2273 source_selectable = self.parent._with_polymorphic_selectable
2274
2275 if of_type_entity:
2276 dest_mapper = of_type_entity.mapper
2277 if dest_selectable is None:
2278 dest_selectable = of_type_entity.selectable
2279 aliased = True
2280 else:
2281 dest_mapper = self.mapper
2282
2283 if dest_selectable is None:
2284 dest_selectable = self.entity.selectable
2285 if self.mapper.with_polymorphic:
2286 aliased = True
2287
2288 if self._is_self_referential and source_selectable is None:
2289 dest_selectable = dest_selectable._anonymous_fromclause()
2290 aliased = True
2291 elif (
2292 dest_selectable is not self.mapper._with_polymorphic_selectable
2293 or self.mapper.with_polymorphic
2294 ):
2295 aliased = True
2296
2297 single_crit = dest_mapper._single_table_criterion
2298 aliased = aliased or (
2299 source_selectable is not None
2300 and (
2301 source_selectable
2302 is not self.parent._with_polymorphic_selectable
2303 or source_selectable._is_subquery
2304 )
2305 )
2306
2307 (
2308 primaryjoin,
2309 secondaryjoin,
2310 secondary,
2311 target_adapter,
2312 dest_selectable,
2313 ) = self._join_condition.join_targets(
2314 source_selectable,
2315 dest_selectable,
2316 aliased,
2317 single_crit,
2318 extra_criteria,
2319 )
2320 if source_selectable is None:
2321 source_selectable = self.parent.local_table
2322 if dest_selectable is None:
2323 dest_selectable = self.entity.local_table
2324 return (
2325 primaryjoin,
2326 secondaryjoin,
2327 source_selectable,
2328 dest_selectable,
2329 secondary,
2330 target_adapter,
2331 )
2332
2333
2334def _annotate_columns(element: _CE, annotations: _AnnotationDict) -> _CE:
2335 def clone(elem: _CE) -> _CE:
2336 if isinstance(elem, expression.ColumnClause):
2337 elem = elem._annotate(annotations.copy()) # type: ignore
2338 elem._copy_internals(clone=clone)
2339 return elem
2340
2341 if element is not None:
2342 element = clone(element)
2343 clone = None # type: ignore # remove gc cycles
2344 return element
2345
2346
2347class _JoinCondition:
2348 primaryjoin_initial: Optional[ColumnElement[bool]]
2349 primaryjoin: ColumnElement[bool]
2350 secondaryjoin: Optional[ColumnElement[bool]]
2351 secondary: Optional[FromClause]
2352 prop: RelationshipProperty[Any]
2353
2354 synchronize_pairs: _ColumnPairs
2355 secondary_synchronize_pairs: _ColumnPairs
2356 direction: RelationshipDirection
2357
2358 parent_persist_selectable: FromClause
2359 child_persist_selectable: FromClause
2360 parent_local_selectable: FromClause
2361 child_local_selectable: FromClause
2362
2363 _local_remote_pairs: Optional[_ColumnPairs]
2364
2365 def __init__(
2366 self,
2367 parent_persist_selectable: FromClause,
2368 child_persist_selectable: FromClause,
2369 parent_local_selectable: FromClause,
2370 child_local_selectable: FromClause,
2371 *,
2372 primaryjoin: Optional[ColumnElement[bool]] = None,
2373 secondary: Optional[FromClause] = None,
2374 secondaryjoin: Optional[ColumnElement[bool]] = None,
2375 parent_equivalents: Optional[_EquivalentColumnMap] = None,
2376 child_equivalents: Optional[_EquivalentColumnMap] = None,
2377 consider_as_foreign_keys: Any = None,
2378 local_remote_pairs: Optional[_ColumnPairs] = None,
2379 remote_side: Any = None,
2380 self_referential: Any = False,
2381 prop: RelationshipProperty[Any],
2382 support_sync: bool = True,
2383 can_be_synced_fn: Callable[..., bool] = lambda *c: True,
2384 ):
2385 self.parent_persist_selectable = parent_persist_selectable
2386 self.parent_local_selectable = parent_local_selectable
2387 self.child_persist_selectable = child_persist_selectable
2388 self.child_local_selectable = child_local_selectable
2389 self.parent_equivalents = parent_equivalents
2390 self.child_equivalents = child_equivalents
2391 self.primaryjoin_initial = primaryjoin
2392 self.secondaryjoin = secondaryjoin
2393 self.secondary = secondary
2394 self.consider_as_foreign_keys = consider_as_foreign_keys
2395 self._local_remote_pairs = local_remote_pairs
2396 self._remote_side = remote_side
2397 self.prop = prop
2398 self.self_referential = self_referential
2399 self.support_sync = support_sync
2400 self.can_be_synced_fn = can_be_synced_fn
2401
2402 self._determine_joins()
2403 assert self.primaryjoin is not None
2404
2405 self._annotate_fks()
2406 self._annotate_remote()
2407 self._annotate_local()
2408 self._annotate_parentmapper()
2409 self._setup_pairs()
2410 self._check_foreign_cols(self.primaryjoin, True)
2411 if self.secondaryjoin is not None:
2412 self._check_foreign_cols(self.secondaryjoin, False)
2413 self._determine_direction()
2414 self._check_remote_side()
2415 self._log_joins()
2416
2417 def _log_joins(self) -> None:
2418 log = self.prop.logger
2419 log.info("%s setup primary join %s", self.prop, self.primaryjoin)
2420 log.info("%s setup secondary join %s", self.prop, self.secondaryjoin)
2421 log.info(
2422 "%s synchronize pairs [%s]",
2423 self.prop,
2424 ",".join(
2425 "(%s => %s)" % (l, r) for (l, r) in self.synchronize_pairs
2426 ),
2427 )
2428 log.info(
2429 "%s secondary synchronize pairs [%s]",
2430 self.prop,
2431 ",".join(
2432 "(%s => %s)" % (l, r)
2433 for (l, r) in self.secondary_synchronize_pairs or []
2434 ),
2435 )
2436 log.info(
2437 "%s local/remote pairs [%s]",
2438 self.prop,
2439 ",".join(
2440 "(%s / %s)" % (l, r) for (l, r) in self.local_remote_pairs
2441 ),
2442 )
2443 log.info(
2444 "%s remote columns [%s]",
2445 self.prop,
2446 ",".join("%s" % col for col in self.remote_columns),
2447 )
2448 log.info(
2449 "%s local columns [%s]",
2450 self.prop,
2451 ",".join("%s" % col for col in self.local_columns),
2452 )
2453 log.info("%s relationship direction %s", self.prop, self.direction)
2454
2455 def _determine_joins(self) -> None:
2456 """Determine the 'primaryjoin' and 'secondaryjoin' attributes,
2457 if not passed to the constructor already.
2458
2459 This is based on analysis of the foreign key relationships
2460 between the parent and target mapped selectables.
2461
2462 """
2463 if self.secondaryjoin is not None and self.secondary is None:
2464 raise sa_exc.ArgumentError(
2465 "Property %s specified with secondary "
2466 "join condition but "
2467 "no secondary argument" % self.prop
2468 )
2469
2470 # find a join between the given mapper's mapped table and
2471 # the given table. will try the mapper's local table first
2472 # for more specificity, then if not found will try the more
2473 # general mapped table, which in the case of inheritance is
2474 # a join.
2475 try:
2476 consider_as_foreign_keys = self.consider_as_foreign_keys or None
2477 if self.secondary is not None:
2478 if self.secondaryjoin is None:
2479 self.secondaryjoin = join_condition(
2480 self.child_persist_selectable,
2481 self.secondary,
2482 a_subset=self.child_local_selectable,
2483 consider_as_foreign_keys=consider_as_foreign_keys,
2484 )
2485 if self.primaryjoin_initial is None:
2486 self.primaryjoin = join_condition(
2487 self.parent_persist_selectable,
2488 self.secondary,
2489 a_subset=self.parent_local_selectable,
2490 consider_as_foreign_keys=consider_as_foreign_keys,
2491 )
2492 else:
2493 self.primaryjoin = self.primaryjoin_initial
2494 else:
2495 if self.primaryjoin_initial is None:
2496 self.primaryjoin = join_condition(
2497 self.parent_persist_selectable,
2498 self.child_persist_selectable,
2499 a_subset=self.parent_local_selectable,
2500 consider_as_foreign_keys=consider_as_foreign_keys,
2501 )
2502 else:
2503 self.primaryjoin = self.primaryjoin_initial
2504 except sa_exc.NoForeignKeysError as nfe:
2505 if self.secondary is not None:
2506 raise sa_exc.NoForeignKeysError(
2507 "Could not determine join "
2508 "condition between parent/child tables on "
2509 "relationship %s - there are no foreign keys "
2510 "linking these tables via secondary table '%s'. "
2511 "Ensure that referencing columns are associated "
2512 "with a ForeignKey or ForeignKeyConstraint, or "
2513 "specify 'primaryjoin' and 'secondaryjoin' "
2514 "expressions." % (self.prop, self.secondary)
2515 ) from nfe
2516 else:
2517 raise sa_exc.NoForeignKeysError(
2518 "Could not determine join "
2519 "condition between parent/child tables on "
2520 "relationship %s - there are no foreign keys "
2521 "linking these tables. "
2522 "Ensure that referencing columns are associated "
2523 "with a ForeignKey or ForeignKeyConstraint, or "
2524 "specify a 'primaryjoin' expression." % self.prop
2525 ) from nfe
2526 except sa_exc.AmbiguousForeignKeysError as afe:
2527 if self.secondary is not None:
2528 raise sa_exc.AmbiguousForeignKeysError(
2529 "Could not determine join "
2530 "condition between parent/child tables on "
2531 "relationship %s - there are multiple foreign key "
2532 "paths linking the tables via secondary table '%s'. "
2533 "Specify the 'foreign_keys' "
2534 "argument, providing a list of those columns which "
2535 "should be counted as containing a foreign key "
2536 "reference from the secondary table to each of the "
2537 "parent and child tables." % (self.prop, self.secondary)
2538 ) from afe
2539 else:
2540 raise sa_exc.AmbiguousForeignKeysError(
2541 "Could not determine join "
2542 "condition between parent/child tables on "
2543 "relationship %s - there are multiple foreign key "
2544 "paths linking the tables. Specify the "
2545 "'foreign_keys' argument, providing a list of those "
2546 "columns which should be counted as containing a "
2547 "foreign key reference to the parent table." % self.prop
2548 ) from afe
2549
2550 @property
2551 def primaryjoin_minus_local(self) -> ColumnElement[bool]:
2552 return _deep_deannotate(self.primaryjoin, values=("local", "remote"))
2553
2554 @property
2555 def secondaryjoin_minus_local(self) -> ColumnElement[bool]:
2556 assert self.secondaryjoin is not None
2557 return _deep_deannotate(self.secondaryjoin, values=("local", "remote"))
2558
2559 @util.memoized_property
2560 def primaryjoin_reverse_remote(self) -> ColumnElement[bool]:
2561 """Return the primaryjoin condition suitable for the
2562 "reverse" direction.
2563
2564 If the primaryjoin was delivered here with pre-existing
2565 "remote" annotations, the local/remote annotations
2566 are reversed. Otherwise, the local/remote annotations
2567 are removed.
2568
2569 """
2570 if self._has_remote_annotations:
2571
2572 def replace(element: _CE, **kw: Any) -> Optional[_CE]:
2573 if "remote" in element._annotations:
2574 v = dict(element._annotations)
2575 del v["remote"]
2576 v["local"] = True
2577 return element._with_annotations(v)
2578 elif "local" in element._annotations:
2579 v = dict(element._annotations)
2580 del v["local"]
2581 v["remote"] = True
2582 return element._with_annotations(v)
2583
2584 return None
2585
2586 return visitors.replacement_traverse(self.primaryjoin, {}, replace)
2587 else:
2588 if self._has_foreign_annotations:
2589 # TODO: coverage
2590 return _deep_deannotate(
2591 self.primaryjoin, values=("local", "remote")
2592 )
2593 else:
2594 return _deep_deannotate(self.primaryjoin)
2595
2596 def _has_annotation(self, clause: ClauseElement, annotation: str) -> bool:
2597 for col in visitors.iterate(clause, {}):
2598 if annotation in col._annotations:
2599 return True
2600 else:
2601 return False
2602
2603 @util.memoized_property
2604 def _has_foreign_annotations(self) -> bool:
2605 return self._has_annotation(self.primaryjoin, "foreign")
2606
2607 @util.memoized_property
2608 def _has_remote_annotations(self) -> bool:
2609 return self._has_annotation(self.primaryjoin, "remote")
2610
2611 def _annotate_fks(self) -> None:
2612 """Annotate the primaryjoin and secondaryjoin
2613 structures with 'foreign' annotations marking columns
2614 considered as foreign.
2615
2616 """
2617 if self._has_foreign_annotations:
2618 return
2619
2620 if self.consider_as_foreign_keys:
2621 self._annotate_from_fk_list()
2622 else:
2623 self._annotate_present_fks()
2624
2625 def _annotate_from_fk_list(self) -> None:
2626 def check_fk(element: _CE, **kw: Any) -> Optional[_CE]:
2627 if element in self.consider_as_foreign_keys:
2628 return element._annotate({"foreign": True})
2629 return None
2630
2631 self.primaryjoin = visitors.replacement_traverse(
2632 self.primaryjoin, {}, check_fk
2633 )
2634 if self.secondaryjoin is not None:
2635 self.secondaryjoin = visitors.replacement_traverse(
2636 self.secondaryjoin, {}, check_fk
2637 )
2638
2639 def _annotate_present_fks(self) -> None:
2640 if self.secondary is not None:
2641 secondarycols = util.column_set(self.secondary.c)
2642 else:
2643 secondarycols = set()
2644
2645 def is_foreign(
2646 a: ColumnElement[Any], b: ColumnElement[Any]
2647 ) -> Optional[ColumnElement[Any]]:
2648 if isinstance(a, schema.Column) and isinstance(b, schema.Column):
2649 if a.references(b):
2650 return a
2651 elif b.references(a):
2652 return b
2653
2654 if secondarycols:
2655 if a in secondarycols and b not in secondarycols:
2656 return a
2657 elif b in secondarycols and a not in secondarycols:
2658 return b
2659
2660 return None
2661
2662 def visit_binary(binary: BinaryExpression[Any]) -> None:
2663 if not isinstance(
2664 binary.left, sql.ColumnElement
2665 ) or not isinstance(binary.right, sql.ColumnElement):
2666 return
2667
2668 if (
2669 "foreign" not in binary.left._annotations
2670 and "foreign" not in binary.right._annotations
2671 ):
2672 col = is_foreign(binary.left, binary.right)
2673 if col is not None:
2674 if col.compare(binary.left):
2675 binary.left = binary.left._annotate({"foreign": True})
2676 elif col.compare(binary.right):
2677 binary.right = binary.right._annotate(
2678 {"foreign": True}
2679 )
2680
2681 self.primaryjoin = visitors.cloned_traverse(
2682 self.primaryjoin, {}, {"binary": visit_binary}
2683 )
2684 if self.secondaryjoin is not None:
2685 self.secondaryjoin = visitors.cloned_traverse(
2686 self.secondaryjoin, {}, {"binary": visit_binary}
2687 )
2688
2689 def _refers_to_parent_table(self) -> bool:
2690 """Return True if the join condition contains column
2691 comparisons where both columns are in both tables.
2692
2693 """
2694 pt = self.parent_persist_selectable
2695 mt = self.child_persist_selectable
2696 result = False
2697
2698 def visit_binary(binary: BinaryExpression[Any]) -> None:
2699 nonlocal result
2700 c, f = binary.left, binary.right
2701 if (
2702 isinstance(c, expression.ColumnClause)
2703 and isinstance(f, expression.ColumnClause)
2704 and pt.is_derived_from(c.table)
2705 and pt.is_derived_from(f.table)
2706 and mt.is_derived_from(c.table)
2707 and mt.is_derived_from(f.table)
2708 ):
2709 result = True
2710
2711 visitors.traverse(self.primaryjoin, {}, {"binary": visit_binary})
2712 return result
2713
2714 def _tables_overlap(self) -> bool:
2715 """Return True if parent/child tables have some overlap."""
2716
2717 return selectables_overlap(
2718 self.parent_persist_selectable, self.child_persist_selectable
2719 )
2720
2721 def _annotate_remote(self) -> None:
2722 """Annotate the primaryjoin and secondaryjoin
2723 structures with 'remote' annotations marking columns
2724 considered as part of the 'remote' side.
2725
2726 """
2727 if self._has_remote_annotations:
2728 return
2729
2730 if self.secondary is not None:
2731 self._annotate_remote_secondary()
2732 elif self._local_remote_pairs or self._remote_side:
2733 self._annotate_remote_from_args()
2734 elif self._refers_to_parent_table():
2735 self._annotate_selfref(
2736 lambda col: "foreign" in col._annotations, False
2737 )
2738 elif self._tables_overlap():
2739 self._annotate_remote_with_overlap()
2740 else:
2741 self._annotate_remote_distinct_selectables()
2742
2743 def _annotate_remote_secondary(self) -> None:
2744 """annotate 'remote' in primaryjoin, secondaryjoin
2745 when 'secondary' is present.
2746
2747 """
2748
2749 assert self.secondary is not None
2750 fixed_secondary = self.secondary
2751
2752 def repl(element: _CE, **kw: Any) -> Optional[_CE]:
2753 if fixed_secondary.c.contains_column(element):
2754 return element._annotate({"remote": True})
2755 return None
2756
2757 self.primaryjoin = visitors.replacement_traverse(
2758 self.primaryjoin, {}, repl
2759 )
2760
2761 assert self.secondaryjoin is not None
2762 self.secondaryjoin = visitors.replacement_traverse(
2763 self.secondaryjoin, {}, repl
2764 )
2765
2766 def _annotate_selfref(
2767 self, fn: Callable[[ColumnElement[Any]], bool], remote_side_given: bool
2768 ) -> None:
2769 """annotate 'remote' in primaryjoin, secondaryjoin
2770 when the relationship is detected as self-referential.
2771
2772 """
2773
2774 def visit_binary(binary: BinaryExpression[Any]) -> None:
2775 equated = binary.left.compare(binary.right)
2776 if isinstance(binary.left, expression.ColumnClause) and isinstance(
2777 binary.right, expression.ColumnClause
2778 ):
2779 # assume one to many - FKs are "remote"
2780 if fn(binary.left):
2781 binary.left = binary.left._annotate({"remote": True})
2782 if fn(binary.right) and not equated:
2783 binary.right = binary.right._annotate({"remote": True})
2784 elif not remote_side_given:
2785 self._warn_non_column_elements()
2786
2787 self.primaryjoin = visitors.cloned_traverse(
2788 self.primaryjoin, {}, {"binary": visit_binary}
2789 )
2790
2791 def _annotate_remote_from_args(self) -> None:
2792 """annotate 'remote' in primaryjoin, secondaryjoin
2793 when the 'remote_side' or '_local_remote_pairs'
2794 arguments are used.
2795
2796 """
2797 if self._local_remote_pairs:
2798 if self._remote_side:
2799 raise sa_exc.ArgumentError(
2800 "remote_side argument is redundant "
2801 "against more detailed _local_remote_side "
2802 "argument."
2803 )
2804
2805 remote_side = [r for (l, r) in self._local_remote_pairs]
2806 else:
2807 remote_side = self._remote_side
2808
2809 if self._refers_to_parent_table():
2810 self._annotate_selfref(lambda col: col in remote_side, True)
2811 else:
2812
2813 def repl(element: _CE, **kw: Any) -> Optional[_CE]:
2814 # use set() to avoid generating ``__eq__()`` expressions
2815 # against each element
2816 if element in set(remote_side):
2817 return element._annotate({"remote": True})
2818 return None
2819
2820 self.primaryjoin = visitors.replacement_traverse(
2821 self.primaryjoin, {}, repl
2822 )
2823
2824 def _annotate_remote_with_overlap(self) -> None:
2825 """annotate 'remote' in primaryjoin, secondaryjoin
2826 when the parent/child tables have some set of
2827 tables in common, though is not a fully self-referential
2828 relationship.
2829
2830 """
2831
2832 def visit_binary(binary: BinaryExpression[Any]) -> None:
2833 binary.left, binary.right = proc_left_right(
2834 binary.left, binary.right
2835 )
2836 binary.right, binary.left = proc_left_right(
2837 binary.right, binary.left
2838 )
2839
2840 check_entities = (
2841 self.prop is not None and self.prop.mapper is not self.prop.parent
2842 )
2843
2844 def proc_left_right(
2845 left: ColumnElement[Any], right: ColumnElement[Any]
2846 ) -> Tuple[ColumnElement[Any], ColumnElement[Any]]:
2847 if isinstance(left, expression.ColumnClause) and isinstance(
2848 right, expression.ColumnClause
2849 ):
2850 if self.child_persist_selectable.c.contains_column(
2851 right
2852 ) and self.parent_persist_selectable.c.contains_column(left):
2853 right = right._annotate({"remote": True})
2854 elif (
2855 check_entities
2856 and right._annotations.get("parentmapper") is self.prop.mapper
2857 ):
2858 right = right._annotate({"remote": True})
2859 elif (
2860 check_entities
2861 and left._annotations.get("parentmapper") is self.prop.mapper
2862 ):
2863 left = left._annotate({"remote": True})
2864 else:
2865 self._warn_non_column_elements()
2866
2867 return left, right
2868
2869 self.primaryjoin = visitors.cloned_traverse(
2870 self.primaryjoin, {}, {"binary": visit_binary}
2871 )
2872
2873 def _annotate_remote_distinct_selectables(self) -> None:
2874 """annotate 'remote' in primaryjoin, secondaryjoin
2875 when the parent/child tables are entirely
2876 separate.
2877
2878 """
2879
2880 def repl(element: _CE, **kw: Any) -> Optional[_CE]:
2881 if self.child_persist_selectable.c.contains_column(element) and (
2882 not self.parent_local_selectable.c.contains_column(element)
2883 or self.child_local_selectable.c.contains_column(element)
2884 ):
2885 return element._annotate({"remote": True})
2886 return None
2887
2888 self.primaryjoin = visitors.replacement_traverse(
2889 self.primaryjoin, {}, repl
2890 )
2891
2892 def _warn_non_column_elements(self) -> None:
2893 util.warn(
2894 "Non-simple column elements in primary "
2895 "join condition for property %s - consider using "
2896 "remote() annotations to mark the remote side." % self.prop
2897 )
2898
2899 def _annotate_local(self) -> None:
2900 """Annotate the primaryjoin and secondaryjoin
2901 structures with 'local' annotations.
2902
2903 This annotates all column elements found
2904 simultaneously in the parent table
2905 and the join condition that don't have a
2906 'remote' annotation set up from
2907 _annotate_remote() or user-defined.
2908
2909 """
2910 if self._has_annotation(self.primaryjoin, "local"):
2911 return
2912
2913 if self._local_remote_pairs:
2914 local_side = util.column_set(
2915 [l for (l, r) in self._local_remote_pairs]
2916 )
2917 else:
2918 local_side = util.column_set(self.parent_persist_selectable.c)
2919
2920 def locals_(element: _CE, **kw: Any) -> Optional[_CE]:
2921 if "remote" not in element._annotations and element in local_side:
2922 return element._annotate({"local": True})
2923 return None
2924
2925 self.primaryjoin = visitors.replacement_traverse(
2926 self.primaryjoin, {}, locals_
2927 )
2928
2929 def _annotate_parentmapper(self) -> None:
2930 def parentmappers_(element: _CE, **kw: Any) -> Optional[_CE]:
2931 if "remote" in element._annotations:
2932 return element._annotate({"parentmapper": self.prop.mapper})
2933 elif "local" in element._annotations:
2934 return element._annotate({"parentmapper": self.prop.parent})
2935 return None
2936
2937 self.primaryjoin = visitors.replacement_traverse(
2938 self.primaryjoin, {}, parentmappers_
2939 )
2940
2941 def _check_remote_side(self) -> None:
2942 if not self.local_remote_pairs:
2943 raise sa_exc.ArgumentError(
2944 "Relationship %s could "
2945 "not determine any unambiguous local/remote column "
2946 "pairs based on join condition and remote_side "
2947 "arguments. "
2948 "Consider using the remote() annotation to "
2949 "accurately mark those elements of the join "
2950 "condition that are on the remote side of "
2951 "the relationship." % (self.prop,)
2952 )
2953 else:
2954 not_target = util.column_set(
2955 self.parent_persist_selectable.c
2956 ).difference(self.child_persist_selectable.c)
2957
2958 for _, rmt in self.local_remote_pairs:
2959 if rmt in not_target:
2960 util.warn(
2961 "Expression %s is marked as 'remote', but these "
2962 "column(s) are local to the local side. The "
2963 "remote() annotation is needed only for a "
2964 "self-referential relationship where both sides "
2965 "of the relationship refer to the same tables."
2966 % (rmt,)
2967 )
2968
2969 def _check_foreign_cols(
2970 self, join_condition: ColumnElement[bool], primary: bool
2971 ) -> None:
2972 """Check the foreign key columns collected and emit error
2973 messages."""
2974 foreign_cols = self._gather_columns_with_annotation(
2975 join_condition, "foreign"
2976 )
2977
2978 has_foreign = bool(foreign_cols)
2979
2980 if primary:
2981 can_sync = bool(self.synchronize_pairs)
2982 else:
2983 can_sync = bool(self.secondary_synchronize_pairs)
2984
2985 if (
2986 self.support_sync
2987 and can_sync
2988 or (not self.support_sync and has_foreign)
2989 ):
2990 return
2991
2992 # from here below is just determining the best error message
2993 # to report. Check for a join condition using any operator
2994 # (not just ==), perhaps they need to turn on "viewonly=True".
2995 if self.support_sync and has_foreign and not can_sync:
2996 err = (
2997 "Could not locate any simple equality expressions "
2998 "involving locally mapped foreign key columns for "
2999 "%s join condition "
3000 "'%s' on relationship %s."
3001 % (
3002 primary and "primary" or "secondary",
3003 join_condition,
3004 self.prop,
3005 )
3006 )
3007 err += (
3008 " Ensure that referencing columns are associated "
3009 "with a ForeignKey or ForeignKeyConstraint, or are "
3010 "annotated in the join condition with the foreign() "
3011 "annotation. To allow comparison operators other than "
3012 "'==', the relationship can be marked as viewonly=True."
3013 )
3014
3015 raise sa_exc.ArgumentError(err)
3016 else:
3017 err = (
3018 "Could not locate any relevant foreign key columns "
3019 "for %s join condition '%s' on relationship %s."
3020 % (
3021 primary and "primary" or "secondary",
3022 join_condition,
3023 self.prop,
3024 )
3025 )
3026 err += (
3027 " Ensure that referencing columns are associated "
3028 "with a ForeignKey or ForeignKeyConstraint, or are "
3029 "annotated in the join condition with the foreign() "
3030 "annotation."
3031 )
3032 raise sa_exc.ArgumentError(err)
3033
3034 def _determine_direction(self) -> None:
3035 """Determine if this relationship is one to many, many to one,
3036 many to many.
3037
3038 """
3039 if self.secondaryjoin is not None:
3040 self.direction = MANYTOMANY
3041 else:
3042 parentcols = util.column_set(self.parent_persist_selectable.c)
3043 targetcols = util.column_set(self.child_persist_selectable.c)
3044
3045 # fk collection which suggests ONETOMANY.
3046 onetomany_fk = targetcols.intersection(self.foreign_key_columns)
3047
3048 # fk collection which suggests MANYTOONE.
3049
3050 manytoone_fk = parentcols.intersection(self.foreign_key_columns)
3051
3052 if onetomany_fk and manytoone_fk:
3053 # fks on both sides. test for overlap of local/remote
3054 # with foreign key.
3055 # we will gather columns directly from their annotations
3056 # without deannotating, so that we can distinguish on a column
3057 # that refers to itself.
3058
3059 # 1. columns that are both remote and FK suggest
3060 # onetomany.
3061 onetomany_local = self._gather_columns_with_annotation(
3062 self.primaryjoin, "remote", "foreign"
3063 )
3064
3065 # 2. columns that are FK but are not remote (e.g. local)
3066 # suggest manytoone.
3067 manytoone_local = {
3068 c
3069 for c in self._gather_columns_with_annotation(
3070 self.primaryjoin, "foreign"
3071 )
3072 if "remote" not in c._annotations
3073 }
3074
3075 # 3. if both collections are present, remove columns that
3076 # refer to themselves. This is for the case of
3077 # and_(Me.id == Me.remote_id, Me.version == Me.version)
3078 if onetomany_local and manytoone_local:
3079 self_equated = self.remote_columns.intersection(
3080 self.local_columns
3081 )
3082 onetomany_local = onetomany_local.difference(self_equated)
3083 manytoone_local = manytoone_local.difference(self_equated)
3084
3085 # at this point, if only one or the other collection is
3086 # present, we know the direction, otherwise it's still
3087 # ambiguous.
3088
3089 if onetomany_local and not manytoone_local:
3090 self.direction = ONETOMANY
3091 elif manytoone_local and not onetomany_local:
3092 self.direction = MANYTOONE
3093 else:
3094 raise sa_exc.ArgumentError(
3095 "Can't determine relationship"
3096 " direction for relationship '%s' - foreign "
3097 "key columns within the join condition are present "
3098 "in both the parent and the child's mapped tables. "
3099 "Ensure that only those columns referring "
3100 "to a parent column are marked as foreign, "
3101 "either via the foreign() annotation or "
3102 "via the foreign_keys argument." % self.prop
3103 )
3104 elif onetomany_fk:
3105 self.direction = ONETOMANY
3106 elif manytoone_fk:
3107 self.direction = MANYTOONE
3108 else:
3109 raise sa_exc.ArgumentError(
3110 "Can't determine relationship "
3111 "direction for relationship '%s' - foreign "
3112 "key columns are present in neither the parent "
3113 "nor the child's mapped tables" % self.prop
3114 )
3115
3116 def _deannotate_pairs(
3117 self, collection: _ColumnPairIterable
3118 ) -> _MutableColumnPairs:
3119 """provide deannotation for the various lists of
3120 pairs, so that using them in hashes doesn't incur
3121 high-overhead __eq__() comparisons against
3122 original columns mapped.
3123
3124 """
3125 return [(x._deannotate(), y._deannotate()) for x, y in collection]
3126
3127 def _setup_pairs(self) -> None:
3128 sync_pairs: _MutableColumnPairs = []
3129 lrp: util.OrderedSet[Tuple[ColumnElement[Any], ColumnElement[Any]]] = (
3130 util.OrderedSet([])
3131 )
3132 secondary_sync_pairs: _MutableColumnPairs = []
3133
3134 def go(
3135 joincond: ColumnElement[bool],
3136 collection: _MutableColumnPairs,
3137 ) -> None:
3138 def visit_binary(
3139 binary: BinaryExpression[Any],
3140 left: ColumnElement[Any],
3141 right: ColumnElement[Any],
3142 ) -> None:
3143 if (
3144 "remote" in right._annotations
3145 and "remote" not in left._annotations
3146 and self.can_be_synced_fn(left)
3147 ):
3148 lrp.add((left, right))
3149 elif (
3150 "remote" in left._annotations
3151 and "remote" not in right._annotations
3152 and self.can_be_synced_fn(right)
3153 ):
3154 lrp.add((right, left))
3155 if binary.operator is operators.eq and self.can_be_synced_fn(
3156 left, right
3157 ):
3158 if "foreign" in right._annotations:
3159 collection.append((left, right))
3160 elif "foreign" in left._annotations:
3161 collection.append((right, left))
3162
3163 visit_binary_product(visit_binary, joincond)
3164
3165 for joincond, collection in [
3166 (self.primaryjoin, sync_pairs),
3167 (self.secondaryjoin, secondary_sync_pairs),
3168 ]:
3169 if joincond is None:
3170 continue
3171 go(joincond, collection)
3172
3173 self.local_remote_pairs = self._deannotate_pairs(lrp)
3174 self.synchronize_pairs = self._deannotate_pairs(sync_pairs)
3175 self.secondary_synchronize_pairs = self._deannotate_pairs(
3176 secondary_sync_pairs
3177 )
3178
3179 _track_overlapping_sync_targets: weakref.WeakKeyDictionary[
3180 ColumnElement[Any],
3181 weakref.WeakKeyDictionary[
3182 RelationshipProperty[Any], ColumnElement[Any]
3183 ],
3184 ] = weakref.WeakKeyDictionary()
3185
3186 def _warn_for_conflicting_sync_targets(self) -> None:
3187 if not self.support_sync:
3188 return
3189
3190 # we would like to detect if we are synchronizing any column
3191 # pairs in conflict with another relationship that wishes to sync
3192 # an entirely different column to the same target. This is a
3193 # very rare edge case so we will try to minimize the memory/overhead
3194 # impact of this check
3195 for from_, to_ in [
3196 (from_, to_) for (from_, to_) in self.synchronize_pairs
3197 ] + [
3198 (from_, to_) for (from_, to_) in self.secondary_synchronize_pairs
3199 ]:
3200 # save ourselves a ton of memory and overhead by only
3201 # considering columns that are subject to a overlapping
3202 # FK constraints at the core level. This condition can arise
3203 # if multiple relationships overlap foreign() directly, but
3204 # we're going to assume it's typically a ForeignKeyConstraint-
3205 # level configuration that benefits from this warning.
3206
3207 if to_ not in self._track_overlapping_sync_targets:
3208 self._track_overlapping_sync_targets[to_] = (
3209 weakref.WeakKeyDictionary({self.prop: from_})
3210 )
3211 else:
3212 other_props = []
3213 prop_to_from = self._track_overlapping_sync_targets[to_]
3214
3215 for pr, fr_ in prop_to_from.items():
3216 if (
3217 not pr.mapper._dispose_called
3218 and pr not in self.prop._reverse_property
3219 and pr.key not in self.prop._overlaps
3220 and self.prop.key not in pr._overlaps
3221 # note: the "__*" symbol is used internally by
3222 # SQLAlchemy as a general means of suppressing the
3223 # overlaps warning for some extension cases, however
3224 # this is not currently
3225 # a publicly supported symbol and may change at
3226 # any time.
3227 and "__*" not in self.prop._overlaps
3228 and "__*" not in pr._overlaps
3229 and not self.prop.parent.is_sibling(pr.parent)
3230 and not self.prop.mapper.is_sibling(pr.mapper)
3231 and not self.prop.parent.is_sibling(pr.mapper)
3232 and not self.prop.mapper.is_sibling(pr.parent)
3233 and (
3234 self.prop.key != pr.key
3235 or not self.prop.parent.common_parent(pr.parent)
3236 )
3237 ):
3238 other_props.append((pr, fr_))
3239
3240 if other_props:
3241 util.warn(
3242 "relationship '%s' will copy column %s to column %s, "
3243 "which conflicts with relationship(s): %s. "
3244 "If this is not the intention, consider if these "
3245 "relationships should be linked with "
3246 "back_populates, or if viewonly=True should be "
3247 "applied to one or more if they are read-only. "
3248 "For the less common case that foreign key "
3249 "constraints are partially overlapping, the "
3250 "orm.foreign() "
3251 "annotation can be used to isolate the columns that "
3252 "should be written towards. To silence this "
3253 "warning, add the parameter 'overlaps=\"%s\"' to the "
3254 "'%s' relationship."
3255 % (
3256 self.prop,
3257 from_,
3258 to_,
3259 ", ".join(
3260 sorted(
3261 "'%s' (copies %s to %s)" % (pr, fr_, to_)
3262 for (pr, fr_) in other_props
3263 )
3264 ),
3265 ",".join(sorted(pr.key for pr, fr in other_props)),
3266 self.prop,
3267 ),
3268 code="qzyx",
3269 )
3270 self._track_overlapping_sync_targets[to_][self.prop] = from_
3271
3272 @util.memoized_property
3273 def remote_columns(self) -> Set[ColumnElement[Any]]:
3274 return self._gather_join_annotations("remote")
3275
3276 @util.memoized_property
3277 def local_columns(self) -> Set[ColumnElement[Any]]:
3278 return self._gather_join_annotations("local")
3279
3280 @util.memoized_property
3281 def foreign_key_columns(self) -> Set[ColumnElement[Any]]:
3282 return self._gather_join_annotations("foreign")
3283
3284 def _gather_join_annotations(
3285 self, annotation: str
3286 ) -> Set[ColumnElement[Any]]:
3287 s = set(
3288 self._gather_columns_with_annotation(self.primaryjoin, annotation)
3289 )
3290 if self.secondaryjoin is not None:
3291 s.update(
3292 self._gather_columns_with_annotation(
3293 self.secondaryjoin, annotation
3294 )
3295 )
3296 return {x._deannotate() for x in s}
3297
3298 def _gather_columns_with_annotation(
3299 self, clause: ColumnElement[Any], *annotation: Iterable[str]
3300 ) -> Set[ColumnElement[Any]]:
3301 annotation_set = set(annotation)
3302 return {
3303 cast(ColumnElement[Any], col)
3304 for col in visitors.iterate(clause, {})
3305 if annotation_set.issubset(col._annotations)
3306 }
3307
3308 @util.memoized_property
3309 def _secondary_lineage_set(self) -> FrozenSet[ColumnElement[Any]]:
3310 if self.secondary is not None:
3311 return frozenset(
3312 itertools.chain(*[c.proxy_set for c in self.secondary.c])
3313 )
3314 else:
3315 return util.EMPTY_SET
3316
3317 def join_targets(
3318 self,
3319 source_selectable: Optional[FromClause],
3320 dest_selectable: FromClause,
3321 aliased: bool,
3322 single_crit: Optional[ColumnElement[bool]] = None,
3323 extra_criteria: Tuple[ColumnElement[bool], ...] = (),
3324 ) -> Tuple[
3325 ColumnElement[bool],
3326 Optional[ColumnElement[bool]],
3327 Optional[FromClause],
3328 Optional[ClauseAdapter],
3329 FromClause,
3330 ]:
3331 """Given a source and destination selectable, create a
3332 join between them.
3333
3334 This takes into account aliasing the join clause
3335 to reference the appropriate corresponding columns
3336 in the target objects, as well as the extra child
3337 criterion, equivalent column sets, etc.
3338
3339 """
3340 # place a barrier on the destination such that
3341 # replacement traversals won't ever dig into it.
3342 # its internal structure remains fixed
3343 # regardless of context.
3344 dest_selectable = _shallow_annotate(
3345 dest_selectable, {"no_replacement_traverse": True}
3346 )
3347
3348 primaryjoin, secondaryjoin, secondary = (
3349 self.primaryjoin,
3350 self.secondaryjoin,
3351 self.secondary,
3352 )
3353
3354 # adjust the join condition for single table inheritance,
3355 # in the case that the join is to a subclass
3356 # this is analogous to the
3357 # "_adjust_for_single_table_inheritance()" method in Query.
3358
3359 if single_crit is not None:
3360 if secondaryjoin is not None:
3361 secondaryjoin = secondaryjoin & single_crit
3362 else:
3363 primaryjoin = primaryjoin & single_crit
3364
3365 if extra_criteria:
3366
3367 def mark_exclude_cols(
3368 elem: SupportsAnnotations, annotations: _AnnotationDict
3369 ) -> SupportsAnnotations:
3370 """note unrelated columns in the "extra criteria" as either
3371 should be adapted or not adapted, even though they are not
3372 part of our "local" or "remote" side.
3373
3374 see #9779 for this case, as well as #11010 for a follow up
3375
3376 """
3377
3378 parentmapper_for_element = elem._annotations.get(
3379 "parentmapper", None
3380 )
3381
3382 if (
3383 parentmapper_for_element is not self.prop.parent
3384 and parentmapper_for_element is not self.prop.mapper
3385 and elem not in self._secondary_lineage_set
3386 ):
3387 return _safe_annotate(elem, annotations)
3388 else:
3389 return elem
3390
3391 extra_criteria = tuple(
3392 _deep_annotate(
3393 elem,
3394 {"should_not_adapt": True},
3395 annotate_callable=mark_exclude_cols,
3396 )
3397 for elem in extra_criteria
3398 )
3399
3400 if secondaryjoin is not None:
3401 secondaryjoin = secondaryjoin & sql.and_(*extra_criteria)
3402 else:
3403 primaryjoin = primaryjoin & sql.and_(*extra_criteria)
3404
3405 if aliased:
3406 if secondary is not None:
3407 secondary = secondary._anonymous_fromclause(flat=True)
3408 primary_aliasizer = ClauseAdapter(
3409 secondary,
3410 exclude_fn=_local_col_exclude,
3411 )
3412 secondary_aliasizer = ClauseAdapter(
3413 dest_selectable, equivalents=self.child_equivalents
3414 ).chain(primary_aliasizer)
3415 if source_selectable is not None:
3416 primary_aliasizer = ClauseAdapter(
3417 secondary,
3418 exclude_fn=_local_col_exclude,
3419 ).chain(
3420 ClauseAdapter(
3421 source_selectable,
3422 equivalents=self.parent_equivalents,
3423 )
3424 )
3425
3426 secondaryjoin = secondary_aliasizer.traverse(secondaryjoin)
3427 else:
3428 primary_aliasizer = ClauseAdapter(
3429 dest_selectable,
3430 exclude_fn=_local_col_exclude,
3431 equivalents=self.child_equivalents,
3432 )
3433 if source_selectable is not None:
3434 primary_aliasizer.chain(
3435 ClauseAdapter(
3436 source_selectable,
3437 exclude_fn=_remote_col_exclude,
3438 equivalents=self.parent_equivalents,
3439 )
3440 )
3441 secondary_aliasizer = None
3442
3443 primaryjoin = primary_aliasizer.traverse(primaryjoin)
3444 target_adapter = secondary_aliasizer or primary_aliasizer
3445 target_adapter.exclude_fn = None
3446 else:
3447 target_adapter = None
3448 return (
3449 primaryjoin,
3450 secondaryjoin,
3451 secondary,
3452 target_adapter,
3453 dest_selectable,
3454 )
3455
3456 def create_lazy_clause(self, reverse_direction: bool = False) -> Tuple[
3457 ColumnElement[bool],
3458 Dict[str, ColumnElement[Any]],
3459 Dict[ColumnElement[Any], ColumnElement[Any]],
3460 ]:
3461 binds: Dict[ColumnElement[Any], BindParameter[Any]] = {}
3462 equated_columns: Dict[ColumnElement[Any], ColumnElement[Any]] = {}
3463
3464 has_secondary = self.secondaryjoin is not None
3465
3466 if has_secondary:
3467 lookup = collections.defaultdict(list)
3468 for l, r in self.local_remote_pairs:
3469 lookup[l].append((l, r))
3470 equated_columns[r] = l
3471 elif not reverse_direction:
3472 for l, r in self.local_remote_pairs:
3473 equated_columns[r] = l
3474 else:
3475 for l, r in self.local_remote_pairs:
3476 equated_columns[l] = r
3477
3478 def col_to_bind(
3479 element: ColumnElement[Any], **kw: Any
3480 ) -> Optional[BindParameter[Any]]:
3481 if (
3482 (not reverse_direction and "local" in element._annotations)
3483 or reverse_direction
3484 and (
3485 (has_secondary and element in lookup)
3486 or (not has_secondary and "remote" in element._annotations)
3487 )
3488 ):
3489 if element not in binds:
3490 binds[element] = sql.bindparam(
3491 None, None, type_=element.type, unique=True
3492 )
3493 return binds[element]
3494 return None
3495
3496 lazywhere = self.primaryjoin
3497 if self.secondaryjoin is None or not reverse_direction:
3498 lazywhere = visitors.replacement_traverse(
3499 lazywhere, {}, col_to_bind
3500 )
3501
3502 if self.secondaryjoin is not None:
3503 secondaryjoin = self.secondaryjoin
3504 if reverse_direction:
3505 secondaryjoin = visitors.replacement_traverse(
3506 secondaryjoin, {}, col_to_bind
3507 )
3508 lazywhere = sql.and_(lazywhere, secondaryjoin)
3509
3510 bind_to_col = {binds[col].key: col for col in binds}
3511
3512 return lazywhere, bind_to_col, equated_columns
3513
3514
3515class _ColInAnnotations:
3516 """Serializable object that tests for names in c._annotations.
3517
3518 TODO: does this need to be serializable anymore? can we find what the
3519 use case was for that?
3520
3521 """
3522
3523 __slots__ = ("names",)
3524
3525 def __init__(self, *names: str):
3526 self.names = frozenset(names)
3527
3528 def __call__(self, c: ClauseElement) -> bool:
3529 return bool(self.names.intersection(c._annotations))
3530
3531
3532_local_col_exclude = _ColInAnnotations("local", "should_not_adapt")
3533_remote_col_exclude = _ColInAnnotations("remote", "should_not_adapt")
3534
3535
3536class Relationship(
3537 RelationshipProperty[_T],
3538 _DeclarativeMapped[_T],
3539):
3540 """Describes an object property that holds a single item or list
3541 of items that correspond to a related database table.
3542
3543 Public constructor is the :func:`_orm.relationship` function.
3544
3545 .. seealso::
3546
3547 :ref:`relationship_config_toplevel`
3548
3549 .. versionchanged:: 2.0 Added :class:`_orm.Relationship` as a Declarative
3550 compatible subclass for :class:`_orm.RelationshipProperty`.
3551
3552 """
3553
3554 inherit_cache = True
3555 """:meta private:"""
3556
3557
3558class _RelationshipDeclared( # type: ignore[misc]
3559 Relationship[_T],
3560 WriteOnlyMapped[_T], # not compatible with Mapped[_T]
3561 DynamicMapped[_T], # not compatible with Mapped[_T]
3562):
3563 """Relationship subclass used implicitly for declarative mapping."""
3564
3565 inherit_cache = True
3566 """:meta private:"""
3567
3568 @classmethod
3569 def _mapper_property_name(cls) -> str:
3570 return "Relationship"