1# orm/relationships.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Heuristics related to join conditions as used in
9:func:`_orm.relationship`.
10
11Provides the :class:`.JoinCondition` object, which encapsulates
12SQL annotation and aliasing behavior focused on the `primaryjoin`
13and `secondaryjoin` aspects of :func:`_orm.relationship`.
14
15"""
16from __future__ import annotations
17
18import collections
19from collections import abc
20import dataclasses
21import inspect as _py_inspect
22import itertools
23import re
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Collection
29from typing import Dict
30from typing import FrozenSet
31from typing import Generic
32from typing import Iterable
33from typing import Iterator
34from typing import List
35from typing import NamedTuple
36from typing import NoReturn
37from typing import Optional
38from typing import Sequence
39from typing import Set
40from typing import Tuple
41from typing import Type
42from typing import TypeVar
43from typing import Union
44import weakref
45
46from . import attributes
47from . import strategy_options
48from ._typing import insp_is_aliased_class
49from ._typing import is_has_collection_adapter
50from .base import _DeclarativeMapped
51from .base import _is_mapped_class
52from .base import class_mapper
53from .base import DynamicMapped
54from .base import LoaderCallableStatus
55from .base import PassiveFlag
56from .base import state_str
57from .base import WriteOnlyMapped
58from .interfaces import _AttributeOptions
59from .interfaces import _DataclassDefaultsDontSet
60from .interfaces import _IntrospectsAnnotations
61from .interfaces import MANYTOMANY
62from .interfaces import MANYTOONE
63from .interfaces import ONETOMANY
64from .interfaces import PropComparator
65from .interfaces import RelationshipDirection
66from .interfaces import StrategizedProperty
67from .util import _orm_annotate
68from .util import _orm_deannotate
69from .util import CascadeOptions
70from .. import exc as sa_exc
71from .. import Exists
72from .. import log
73from .. import schema
74from .. import sql
75from .. import util
76from ..inspection import inspect
77from ..sql import coercions
78from ..sql import expression
79from ..sql import operators
80from ..sql import roles
81from ..sql import visitors
82from ..sql._typing import _ColumnExpressionArgument
83from ..sql._typing import _HasClauseElement
84from ..sql.annotation import _safe_annotate
85from ..sql.base import _NoArg
86from ..sql.elements import ColumnClause
87from ..sql.elements import ColumnElement
88from ..sql.util import _deep_annotate
89from ..sql.util import _deep_deannotate
90from ..sql.util import _shallow_annotate
91from ..sql.util import adapt_criterion_to_null
92from ..sql.util import ClauseAdapter
93from ..sql.util import join_condition
94from ..sql.util import selectables_overlap
95from ..sql.util import visit_binary_product
96from ..util.typing import de_optionalize_union_types
97from ..util.typing import Literal
98from ..util.typing import resolve_name_to_real_class_name
99
100if typing.TYPE_CHECKING:
101 from ._typing import _EntityType
102 from ._typing import _ExternalEntityType
103 from ._typing import _IdentityKeyType
104 from ._typing import _InstanceDict
105 from ._typing import _InternalEntityType
106 from ._typing import _O
107 from ._typing import _RegistryType
108 from .base import Mapped
109 from .clsregistry import _class_resolver
110 from .clsregistry import _ModNS
111 from .decl_base import _ClassScanMapperConfig
112 from .dependency import _DependencyProcessor
113 from .mapper import Mapper
114 from .query import Query
115 from .session import Session
116 from .state import InstanceState
117 from .strategies import _LazyLoader
118 from .util import AliasedClass
119 from .util import AliasedInsp
120 from ..sql._typing import _CoreAdapterProto
121 from ..sql._typing import _EquivalentColumnMap
122 from ..sql._typing import _InfoType
123 from ..sql.annotation import _AnnotationDict
124 from ..sql.annotation import SupportsAnnotations
125 from ..sql.elements import BinaryExpression
126 from ..sql.elements import BindParameter
127 from ..sql.elements import ClauseElement
128 from ..sql.schema import Table
129 from ..sql.selectable import FromClause
130 from ..util.typing import _AnnotationScanType
131 from ..util.typing import RODescriptorReference
132
133_T = TypeVar("_T", bound=Any)
134_T1 = TypeVar("_T1", bound=Any)
135_T2 = TypeVar("_T2", bound=Any)
136
137_PT = TypeVar("_PT", bound=Any)
138
139_PT2 = TypeVar("_PT2", bound=Any)
140
141
142_RelationshipArgumentType = Union[
143 str,
144 Type[_T],
145 Callable[[], Type[_T]],
146 "Mapper[_T]",
147 "AliasedClass[_T]",
148 Callable[[], "Mapper[_T]"],
149 Callable[[], "AliasedClass[_T]"],
150]
151
152_LazyLoadArgumentType = Literal[
153 "select",
154 "joined",
155 "selectin",
156 "subquery",
157 "raise",
158 "raise_on_sql",
159 "noload",
160 "immediate",
161 "write_only",
162 "dynamic",
163 True,
164 False,
165 None,
166]
167
168
169_RelationshipJoinConditionArgument = Union[
170 str, _ColumnExpressionArgument[bool]
171]
172_RelationshipSecondaryArgument = Union[
173 "FromClause", str, Callable[[], "FromClause"]
174]
175_ORMOrderByArgument = Union[
176 Literal[False],
177 str,
178 _ColumnExpressionArgument[Any],
179 Callable[[], _ColumnExpressionArgument[Any]],
180 Callable[[], Iterable[_ColumnExpressionArgument[Any]]],
181 Iterable[Union[str, _ColumnExpressionArgument[Any]]],
182]
183_RelationshipBackPopulatesArgument = Union[
184 str,
185 PropComparator[Any],
186 Callable[[], Union[str, PropComparator[Any]]],
187]
188
189
190ORMBackrefArgument = Union[str, Tuple[str, Dict[str, Any]]]
191
192_ORMColCollectionElement = Union[
193 ColumnClause[Any],
194 _HasClauseElement[Any],
195 roles.DMLColumnRole,
196 "Mapped[Any]",
197]
198_ORMColCollectionArgument = Union[
199 str,
200 Sequence[_ORMColCollectionElement],
201 Callable[[], Sequence[_ORMColCollectionElement]],
202 Callable[[], _ORMColCollectionElement],
203 _ORMColCollectionElement,
204]
205
206
207_CEA = TypeVar("_CEA", bound=_ColumnExpressionArgument[Any])
208
209_CE = TypeVar("_CE", bound="ColumnElement[Any]")
210
211
212_ColumnPairIterable = Iterable[Tuple[ColumnElement[Any], ColumnElement[Any]]]
213
214_ColumnPairs = Sequence[Tuple[ColumnElement[Any], ColumnElement[Any]]]
215
216_MutableColumnPairs = List[Tuple[ColumnElement[Any], ColumnElement[Any]]]
217
218
219def remote(expr: _CEA) -> _CEA:
220 """Annotate a portion of a primaryjoin expression
221 with a 'remote' annotation.
222
223 See the section :ref:`relationship_custom_foreign` for a
224 description of use.
225
226 .. seealso::
227
228 :ref:`relationship_custom_foreign`
229
230 :func:`.foreign`
231
232 """
233 return _annotate_columns( # type: ignore
234 coercions.expect(roles.ColumnArgumentRole, expr), {"remote": True}
235 )
236
237
238def foreign(expr: _CEA) -> _CEA:
239 """Annotate a portion of a primaryjoin expression
240 with a 'foreign' annotation.
241
242 See the section :ref:`relationship_custom_foreign` for a
243 description of use.
244
245 .. seealso::
246
247 :ref:`relationship_custom_foreign`
248
249 :func:`.remote`
250
251 """
252
253 return _annotate_columns( # type: ignore
254 coercions.expect(roles.ColumnArgumentRole, expr), {"foreign": True}
255 )
256
257
258@dataclasses.dataclass
259class _RelationshipArg(Generic[_T1, _T2]):
260 """stores a user-defined parameter value that must be resolved and
261 parsed later at mapper configuration time.
262
263 """
264
265 __slots__ = "name", "argument", "resolved"
266 name: str
267 argument: _T1
268 resolved: Optional[_T2]
269
270 def _is_populated(self) -> bool:
271 return self.argument is not None
272
273 def _resolve_against_registry(
274 self, clsregistry_resolver: Callable[[str, bool], _class_resolver]
275 ) -> None:
276 attr_value = self.argument
277
278 if isinstance(attr_value, str):
279 self.resolved = clsregistry_resolver(
280 attr_value, self.name == "secondary"
281 )()
282 elif callable(attr_value) and not _is_mapped_class(attr_value):
283 self.resolved = attr_value()
284 else:
285 self.resolved = attr_value
286
287 def effective_value(self) -> Any:
288 if self.resolved is not None:
289 return self.resolved
290 else:
291 return self.argument
292
293
294_RelationshipOrderByArg = Union[Literal[False], Tuple[ColumnElement[Any], ...]]
295
296
297@dataclasses.dataclass
298class _StringRelationshipArg(_RelationshipArg[_T1, _T2]):
299 def _resolve_against_registry(
300 self, clsregistry_resolver: Callable[[str, bool], _class_resolver]
301 ) -> None:
302 attr_value = self.argument
303
304 if callable(attr_value):
305 attr_value = attr_value()
306
307 if isinstance(attr_value, attributes.QueryableAttribute):
308 attr_value = attr_value.key # type: ignore
309
310 self.resolved = attr_value
311
312
313class _RelationshipArgs(NamedTuple):
314 """stores user-passed parameters that are resolved at mapper configuration
315 time.
316
317 """
318
319 secondary: _RelationshipArg[
320 Optional[_RelationshipSecondaryArgument],
321 Optional[FromClause],
322 ]
323 primaryjoin: _RelationshipArg[
324 Optional[_RelationshipJoinConditionArgument],
325 Optional[ColumnElement[Any]],
326 ]
327 secondaryjoin: _RelationshipArg[
328 Optional[_RelationshipJoinConditionArgument],
329 Optional[ColumnElement[Any]],
330 ]
331 order_by: _RelationshipArg[_ORMOrderByArgument, _RelationshipOrderByArg]
332 foreign_keys: _RelationshipArg[
333 Optional[_ORMColCollectionArgument], Set[ColumnElement[Any]]
334 ]
335 remote_side: _RelationshipArg[
336 Optional[_ORMColCollectionArgument], Set[ColumnElement[Any]]
337 ]
338 back_populates: _StringRelationshipArg[
339 Optional[_RelationshipBackPopulatesArgument], str
340 ]
341
342
343@log.class_logger
344class RelationshipProperty(
345 _DataclassDefaultsDontSet,
346 _IntrospectsAnnotations,
347 StrategizedProperty[_T],
348 log.Identified,
349):
350 """Describes an object property that holds a single item or list
351 of items that correspond to a related database table.
352
353 Public constructor is the :func:`_orm.relationship` function.
354
355 .. seealso::
356
357 :ref:`relationship_config_toplevel`
358
359 """
360
361 strategy_wildcard_key = strategy_options._RELATIONSHIP_TOKEN
362 inherit_cache = True
363 """:meta private:"""
364
365 _links_to_entity = True
366 _is_relationship = True
367
368 _overlaps: Sequence[str]
369
370 _lazy_strategy: _LazyLoader
371
372 _persistence_only = dict(
373 passive_deletes=False,
374 passive_updates=True,
375 enable_typechecks=True,
376 active_history=False,
377 cascade_backrefs=False,
378 )
379
380 _dependency_processor: Optional[_DependencyProcessor] = None
381
382 primaryjoin: ColumnElement[bool]
383 secondaryjoin: Optional[ColumnElement[bool]]
384 secondary: Optional[FromClause]
385 _join_condition: _JoinCondition
386 order_by: _RelationshipOrderByArg
387
388 _user_defined_foreign_keys: Set[ColumnElement[Any]]
389 _calculated_foreign_keys: Set[ColumnElement[Any]]
390
391 remote_side: Set[ColumnElement[Any]]
392 local_columns: Set[ColumnElement[Any]]
393
394 synchronize_pairs: _ColumnPairs
395 secondary_synchronize_pairs: Optional[_ColumnPairs]
396
397 local_remote_pairs: Optional[_ColumnPairs]
398
399 direction: RelationshipDirection
400
401 _init_args: _RelationshipArgs
402 _disable_dataclass_default_factory = True
403
404 def __init__(
405 self,
406 argument: Optional[_RelationshipArgumentType[_T]] = None,
407 secondary: Optional[_RelationshipSecondaryArgument] = None,
408 *,
409 uselist: Optional[bool] = None,
410 collection_class: Optional[
411 Union[Type[Collection[Any]], Callable[[], Collection[Any]]]
412 ] = None,
413 primaryjoin: Optional[_RelationshipJoinConditionArgument] = None,
414 secondaryjoin: Optional[_RelationshipJoinConditionArgument] = None,
415 back_populates: Optional[_RelationshipBackPopulatesArgument] = None,
416 order_by: _ORMOrderByArgument = False,
417 backref: Optional[ORMBackrefArgument] = None,
418 overlaps: Optional[str] = None,
419 post_update: bool = False,
420 cascade: str = "save-update, merge",
421 viewonly: bool = False,
422 attribute_options: Optional[_AttributeOptions] = None,
423 lazy: _LazyLoadArgumentType = "select",
424 passive_deletes: Union[Literal["all"], bool] = False,
425 passive_updates: bool = True,
426 active_history: bool = False,
427 enable_typechecks: bool = True,
428 foreign_keys: Optional[_ORMColCollectionArgument] = None,
429 remote_side: Optional[_ORMColCollectionArgument] = None,
430 join_depth: Optional[int] = None,
431 comparator_factory: Optional[
432 Type[RelationshipProperty.Comparator[Any]]
433 ] = None,
434 single_parent: bool = False,
435 innerjoin: bool = False,
436 distinct_target_key: Optional[bool] = None,
437 load_on_pending: bool = False,
438 query_class: Optional[Type[Query[Any]]] = None,
439 info: Optional[_InfoType] = None,
440 omit_join: Literal[None, False] = None,
441 sync_backref: Optional[bool] = None,
442 doc: Optional[str] = None,
443 bake_queries: Literal[True] = True,
444 cascade_backrefs: Literal[False] = False,
445 _local_remote_pairs: Optional[_ColumnPairs] = None,
446 _legacy_inactive_history_style: bool = False,
447 ):
448 super().__init__(attribute_options=attribute_options)
449
450 self.uselist = uselist
451 self.argument = argument
452
453 self._init_args = _RelationshipArgs(
454 _RelationshipArg("secondary", secondary, None),
455 _RelationshipArg("primaryjoin", primaryjoin, None),
456 _RelationshipArg("secondaryjoin", secondaryjoin, None),
457 _RelationshipArg("order_by", order_by, None),
458 _RelationshipArg("foreign_keys", foreign_keys, None),
459 _RelationshipArg("remote_side", remote_side, None),
460 _StringRelationshipArg("back_populates", back_populates, None),
461 )
462
463 if self._attribute_options.dataclasses_default not in (
464 _NoArg.NO_ARG,
465 None,
466 ):
467 raise sa_exc.ArgumentError(
468 "Only 'None' is accepted as dataclass "
469 "default for a relationship()"
470 )
471
472 self.post_update = post_update
473 self.viewonly = viewonly
474 if viewonly:
475 self._warn_for_persistence_only_flags(
476 passive_deletes=passive_deletes,
477 passive_updates=passive_updates,
478 enable_typechecks=enable_typechecks,
479 active_history=active_history,
480 cascade_backrefs=cascade_backrefs,
481 )
482 if viewonly and sync_backref:
483 raise sa_exc.ArgumentError(
484 "sync_backref and viewonly cannot both be True"
485 )
486 self.sync_backref = sync_backref
487 self.lazy = lazy
488 self.single_parent = single_parent
489 self.collection_class = collection_class
490 self.passive_deletes = passive_deletes
491
492 if cascade_backrefs:
493 raise sa_exc.ArgumentError(
494 "The 'cascade_backrefs' parameter passed to "
495 "relationship() may only be set to False."
496 )
497
498 self.passive_updates = passive_updates
499 self.enable_typechecks = enable_typechecks
500 self.query_class = query_class
501 self.innerjoin = innerjoin
502 self.distinct_target_key = distinct_target_key
503 self.doc = doc
504 self.active_history = active_history
505 self._legacy_inactive_history_style = _legacy_inactive_history_style
506
507 self.join_depth = join_depth
508 if omit_join:
509 util.warn(
510 "setting omit_join to True is not supported; selectin "
511 "loading of this relationship may not work correctly if this "
512 "flag is set explicitly. omit_join optimization is "
513 "automatically detected for conditions under which it is "
514 "supported."
515 )
516
517 self.omit_join = omit_join
518 self.local_remote_pairs = _local_remote_pairs
519 self.load_on_pending = load_on_pending
520 self.comparator_factory = (
521 comparator_factory or RelationshipProperty.Comparator
522 )
523 util.set_creation_order(self)
524
525 if info is not None:
526 self.info.update(info)
527
528 self.strategy_key = (("lazy", self.lazy),)
529
530 self._reverse_property: Set[RelationshipProperty[Any]] = set()
531
532 if overlaps:
533 self._overlaps = set(re.split(r"\s*,\s*", overlaps)) # type: ignore # noqa: E501
534 else:
535 self._overlaps = ()
536
537 self.cascade = cascade
538
539 if back_populates:
540 if backref:
541 raise sa_exc.ArgumentError(
542 "backref and back_populates keyword arguments "
543 "are mutually exclusive"
544 )
545 self.backref = None
546 else:
547 self.backref = backref
548
549 @property
550 def back_populates(self) -> str:
551 return self._init_args.back_populates.effective_value() # type: ignore
552
553 @back_populates.setter
554 def back_populates(self, value: str) -> None:
555 self._init_args.back_populates.argument = value
556
557 def _warn_for_persistence_only_flags(self, **kw: Any) -> None:
558 for k, v in kw.items():
559 if v != self._persistence_only[k]:
560 # we are warning here rather than warn deprecated as this is a
561 # configuration mistake, and Python shows regular warnings more
562 # aggressively than deprecation warnings by default. Unlike the
563 # case of setting viewonly with cascade, the settings being
564 # warned about here are not actively doing the wrong thing
565 # against viewonly=True, so it is not as urgent to have these
566 # raise an error.
567 util.warn(
568 "Setting %s on relationship() while also "
569 "setting viewonly=True does not make sense, as a "
570 "viewonly=True relationship does not perform persistence "
571 "operations. This configuration may raise an error "
572 "in a future release." % (k,)
573 )
574
575 def instrument_class(self, mapper: Mapper[Any]) -> None:
576 attributes._register_descriptor(
577 mapper.class_,
578 self.key,
579 comparator=self.comparator_factory(self, mapper),
580 parententity=mapper,
581 doc=self.doc,
582 )
583
584 class Comparator(util.MemoizedSlots, PropComparator[_PT]):
585 """Produce boolean, comparison, and other operators for
586 :class:`.RelationshipProperty` attributes.
587
588 See the documentation for :class:`.PropComparator` for a brief
589 overview of ORM level operator definition.
590
591 .. seealso::
592
593 :class:`.PropComparator`
594
595 :class:`.ColumnProperty.Comparator`
596
597 :class:`.ColumnOperators`
598
599 :ref:`types_operators`
600
601 :attr:`.TypeEngine.comparator_factory`
602
603 """
604
605 __slots__ = (
606 "entity",
607 "mapper",
608 "property",
609 "_of_type",
610 "_extra_criteria",
611 )
612
613 prop: RODescriptorReference[RelationshipProperty[_PT]]
614 _of_type: Optional[_EntityType[_PT]]
615
616 def __init__(
617 self,
618 prop: RelationshipProperty[_PT],
619 parentmapper: _InternalEntityType[Any],
620 adapt_to_entity: Optional[AliasedInsp[Any]] = None,
621 of_type: Optional[_EntityType[_PT]] = None,
622 extra_criteria: Tuple[ColumnElement[bool], ...] = (),
623 ):
624 """Construction of :class:`.RelationshipProperty.Comparator`
625 is internal to the ORM's attribute mechanics.
626
627 """
628 self.prop = prop
629 self._parententity = parentmapper
630 self._adapt_to_entity = adapt_to_entity
631 if of_type:
632 self._of_type = of_type
633 else:
634 self._of_type = None
635 self._extra_criteria = extra_criteria
636
637 def adapt_to_entity(
638 self, adapt_to_entity: AliasedInsp[Any]
639 ) -> RelationshipProperty.Comparator[Any]:
640 return self.__class__(
641 self.prop,
642 self._parententity,
643 adapt_to_entity=adapt_to_entity,
644 of_type=self._of_type,
645 )
646
647 entity: _InternalEntityType[_PT]
648 """The target entity referred to by this
649 :class:`.RelationshipProperty.Comparator`.
650
651 This is either a :class:`_orm.Mapper` or :class:`.AliasedInsp`
652 object.
653
654 This is the "target" or "remote" side of the
655 :func:`_orm.relationship`.
656
657 """
658
659 mapper: Mapper[_PT]
660 """The target :class:`_orm.Mapper` referred to by this
661 :class:`.RelationshipProperty.Comparator`.
662
663 This is the "target" or "remote" side of the
664 :func:`_orm.relationship`.
665
666 """
667
668 def _memoized_attr_entity(self) -> _InternalEntityType[_PT]:
669 if self._of_type:
670 return inspect(self._of_type) # type: ignore
671 else:
672 return self.prop.entity
673
674 def _memoized_attr_mapper(self) -> Mapper[_PT]:
675 return self.entity.mapper
676
677 def _source_selectable(self) -> FromClause:
678 if self._adapt_to_entity:
679 return self._adapt_to_entity.selectable
680 else:
681 return self.property.parent._with_polymorphic_selectable
682
683 def __clause_element__(self) -> ColumnElement[bool]:
684 adapt_from = self._source_selectable()
685 if self._of_type:
686 of_type_entity = inspect(self._of_type)
687 else:
688 of_type_entity = None
689
690 (
691 pj,
692 sj,
693 source,
694 dest,
695 secondary,
696 target_adapter,
697 ) = self.prop._create_joins(
698 source_selectable=adapt_from,
699 source_polymorphic=True,
700 of_type_entity=of_type_entity,
701 alias_secondary=True,
702 extra_criteria=self._extra_criteria,
703 )
704 if sj is not None:
705 return pj & sj
706 else:
707 return pj
708
709 def of_type(self, class_: _EntityType[Any]) -> PropComparator[_PT]:
710 r"""Redefine this object in terms of a polymorphic subclass.
711
712 See :meth:`.PropComparator.of_type` for an example.
713
714
715 """
716 return RelationshipProperty.Comparator(
717 self.prop,
718 self._parententity,
719 adapt_to_entity=self._adapt_to_entity,
720 of_type=class_,
721 extra_criteria=self._extra_criteria,
722 )
723
724 def and_(
725 self, *criteria: _ColumnExpressionArgument[bool]
726 ) -> PropComparator[Any]:
727 """Add AND criteria.
728
729 See :meth:`.PropComparator.and_` for an example.
730
731 .. versionadded:: 1.4
732
733 """
734 exprs = tuple(
735 coercions.expect(roles.WhereHavingRole, clause)
736 for clause in util.coerce_generator_arg(criteria)
737 )
738
739 return RelationshipProperty.Comparator(
740 self.prop,
741 self._parententity,
742 adapt_to_entity=self._adapt_to_entity,
743 of_type=self._of_type,
744 extra_criteria=self._extra_criteria + exprs,
745 )
746
747 def in_(self, other: Any) -> NoReturn:
748 """Produce an IN clause - this is not implemented
749 for :func:`_orm.relationship`-based attributes at this time.
750
751 """
752 raise NotImplementedError(
753 "in_() not yet supported for "
754 "relationships. For a simple "
755 "many-to-one, use in_() against "
756 "the set of foreign key values."
757 )
758
759 # https://github.com/python/mypy/issues/4266
760 __hash__ = None # type: ignore
761
762 def __eq__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501
763 """Implement the ``==`` operator.
764
765 In a many-to-one context, such as:
766
767 .. sourcecode:: text
768
769 MyClass.some_prop == <some object>
770
771 this will typically produce a
772 clause such as:
773
774 .. sourcecode:: text
775
776 mytable.related_id == <some id>
777
778 Where ``<some id>`` is the primary key of the given
779 object.
780
781 The ``==`` operator provides partial functionality for non-
782 many-to-one comparisons:
783
784 * Comparisons against collections are not supported.
785 Use :meth:`~.Relationship.Comparator.contains`.
786 * Compared to a scalar one-to-many, will produce a
787 clause that compares the target columns in the parent to
788 the given target.
789 * Compared to a scalar many-to-many, an alias
790 of the association table will be rendered as
791 well, forming a natural join that is part of the
792 main body of the query. This will not work for
793 queries that go beyond simple AND conjunctions of
794 comparisons, such as those which use OR. Use
795 explicit joins, outerjoins, or
796 :meth:`~.Relationship.Comparator.has` for
797 more comprehensive non-many-to-one scalar
798 membership tests.
799 * Comparisons against ``None`` given in a one-to-many
800 or many-to-many context produce a NOT EXISTS clause.
801
802 """
803 if other is None or isinstance(other, expression.Null):
804 if self.property.direction in [ONETOMANY, MANYTOMANY]:
805 return ~self._criterion_exists()
806 else:
807 return _orm_annotate(
808 self.property._optimized_compare(
809 None, adapt_source=self.adapter
810 )
811 )
812 elif self.property.uselist:
813 raise sa_exc.InvalidRequestError(
814 "Can't compare a collection to an object or collection; "
815 "use contains() to test for membership."
816 )
817 else:
818 return _orm_annotate(
819 self.property._optimized_compare(
820 other, adapt_source=self.adapter
821 )
822 )
823
824 def _criterion_exists(
825 self,
826 criterion: Optional[_ColumnExpressionArgument[bool]] = None,
827 **kwargs: Any,
828 ) -> Exists:
829 where_criteria = (
830 coercions.expect(roles.WhereHavingRole, criterion)
831 if criterion is not None
832 else None
833 )
834
835 if getattr(self, "_of_type", None):
836 info: Optional[_InternalEntityType[Any]] = inspect(
837 self._of_type
838 )
839 assert info is not None
840 target_mapper, to_selectable, is_aliased_class = (
841 info.mapper,
842 info.selectable,
843 info.is_aliased_class,
844 )
845 if self.property._is_self_referential and not is_aliased_class:
846 to_selectable = to_selectable._anonymous_fromclause()
847
848 single_crit = target_mapper._single_table_criterion
849 if single_crit is not None:
850 if where_criteria is not None:
851 where_criteria = single_crit & where_criteria
852 else:
853 where_criteria = single_crit
854 else:
855 is_aliased_class = False
856 to_selectable = None
857
858 if self.adapter:
859 source_selectable = self._source_selectable()
860 else:
861 source_selectable = None
862
863 (
864 pj,
865 sj,
866 source,
867 dest,
868 secondary,
869 target_adapter,
870 ) = self.property._create_joins(
871 dest_selectable=to_selectable,
872 source_selectable=source_selectable,
873 )
874
875 for k in kwargs:
876 crit = getattr(self.property.mapper.class_, k) == kwargs[k]
877 if where_criteria is None:
878 where_criteria = crit
879 else:
880 where_criteria = where_criteria & crit
881
882 # annotate the *local* side of the join condition, in the case
883 # of pj + sj this is the full primaryjoin, in the case of just
884 # pj its the local side of the primaryjoin.
885 if sj is not None:
886 j = _orm_annotate(pj) & sj
887 else:
888 j = _orm_annotate(pj, exclude=self.property.remote_side)
889
890 if (
891 where_criteria is not None
892 and target_adapter
893 and not is_aliased_class
894 ):
895 # limit this adapter to annotated only?
896 where_criteria = target_adapter.traverse(where_criteria)
897
898 # only have the "joined left side" of what we
899 # return be subject to Query adaption. The right
900 # side of it is used for an exists() subquery and
901 # should not correlate or otherwise reach out
902 # to anything in the enclosing query.
903 if where_criteria is not None:
904 where_criteria = where_criteria._annotate(
905 {"no_replacement_traverse": True}
906 )
907
908 crit = j & sql.True_._ifnone(where_criteria)
909
910 if secondary is not None:
911 ex = (
912 sql.exists(1)
913 .where(crit)
914 .select_from(dest, secondary)
915 .correlate_except(dest, secondary)
916 )
917 else:
918 ex = (
919 sql.exists(1)
920 .where(crit)
921 .select_from(dest)
922 .correlate_except(dest)
923 )
924 return ex
925
926 def any(
927 self,
928 criterion: Optional[_ColumnExpressionArgument[bool]] = None,
929 **kwargs: Any,
930 ) -> ColumnElement[bool]:
931 """Produce an expression that tests a collection against
932 particular criterion, using EXISTS.
933
934 An expression like::
935
936 session.query(MyClass).filter(
937 MyClass.somereference.any(SomeRelated.x == 2)
938 )
939
940 Will produce a query like:
941
942 .. sourcecode:: sql
943
944 SELECT * FROM my_table WHERE
945 EXISTS (SELECT 1 FROM related WHERE related.my_id=my_table.id
946 AND related.x=2)
947
948 Because :meth:`~.Relationship.Comparator.any` uses
949 a correlated subquery, its performance is not nearly as
950 good when compared against large target tables as that of
951 using a join.
952
953 :meth:`~.Relationship.Comparator.any` is particularly
954 useful for testing for empty collections::
955
956 session.query(MyClass).filter(~MyClass.somereference.any())
957
958 will produce:
959
960 .. sourcecode:: sql
961
962 SELECT * FROM my_table WHERE
963 NOT (EXISTS (SELECT 1 FROM related WHERE
964 related.my_id=my_table.id))
965
966 :meth:`~.Relationship.Comparator.any` is only
967 valid for collections, i.e. a :func:`_orm.relationship`
968 that has ``uselist=True``. For scalar references,
969 use :meth:`~.Relationship.Comparator.has`.
970
971 """
972 if not self.property.uselist:
973 raise sa_exc.InvalidRequestError(
974 "'any()' not implemented for scalar "
975 "attributes. Use has()."
976 )
977
978 return self._criterion_exists(criterion, **kwargs)
979
980 def has(
981 self,
982 criterion: Optional[_ColumnExpressionArgument[bool]] = None,
983 **kwargs: Any,
984 ) -> ColumnElement[bool]:
985 """Produce an expression that tests a scalar reference against
986 particular criterion, using EXISTS.
987
988 An expression like::
989
990 session.query(MyClass).filter(
991 MyClass.somereference.has(SomeRelated.x == 2)
992 )
993
994 Will produce a query like:
995
996 .. sourcecode:: sql
997
998 SELECT * FROM my_table WHERE
999 EXISTS (SELECT 1 FROM related WHERE
1000 related.id==my_table.related_id AND related.x=2)
1001
1002 Because :meth:`~.Relationship.Comparator.has` uses
1003 a correlated subquery, its performance is not nearly as
1004 good when compared against large target tables as that of
1005 using a join.
1006
1007 :meth:`~.Relationship.Comparator.has` is only
1008 valid for scalar references, i.e. a :func:`_orm.relationship`
1009 that has ``uselist=False``. For collection references,
1010 use :meth:`~.Relationship.Comparator.any`.
1011
1012 """
1013 if self.property.uselist:
1014 raise sa_exc.InvalidRequestError(
1015 "'has()' not implemented for collections. Use any()."
1016 )
1017 return self._criterion_exists(criterion, **kwargs)
1018
1019 def contains(
1020 self, other: _ColumnExpressionArgument[Any], **kwargs: Any
1021 ) -> ColumnElement[bool]:
1022 """Return a simple expression that tests a collection for
1023 containment of a particular item.
1024
1025 :meth:`~.Relationship.Comparator.contains` is
1026 only valid for a collection, i.e. a
1027 :func:`_orm.relationship` that implements
1028 one-to-many or many-to-many with ``uselist=True``.
1029
1030 When used in a simple one-to-many context, an
1031 expression like::
1032
1033 MyClass.contains(other)
1034
1035 Produces a clause like:
1036
1037 .. sourcecode:: sql
1038
1039 mytable.id == <some id>
1040
1041 Where ``<some id>`` is the value of the foreign key
1042 attribute on ``other`` which refers to the primary
1043 key of its parent object. From this it follows that
1044 :meth:`~.Relationship.Comparator.contains` is
1045 very useful when used with simple one-to-many
1046 operations.
1047
1048 For many-to-many operations, the behavior of
1049 :meth:`~.Relationship.Comparator.contains`
1050 has more caveats. The association table will be
1051 rendered in the statement, producing an "implicit"
1052 join, that is, includes multiple tables in the FROM
1053 clause which are equated in the WHERE clause::
1054
1055 query(MyClass).filter(MyClass.contains(other))
1056
1057 Produces a query like:
1058
1059 .. sourcecode:: sql
1060
1061 SELECT * FROM my_table, my_association_table AS
1062 my_association_table_1 WHERE
1063 my_table.id = my_association_table_1.parent_id
1064 AND my_association_table_1.child_id = <some id>
1065
1066 Where ``<some id>`` would be the primary key of
1067 ``other``. From the above, it is clear that
1068 :meth:`~.Relationship.Comparator.contains`
1069 will **not** work with many-to-many collections when
1070 used in queries that move beyond simple AND
1071 conjunctions, such as multiple
1072 :meth:`~.Relationship.Comparator.contains`
1073 expressions joined by OR. In such cases subqueries or
1074 explicit "outer joins" will need to be used instead.
1075 See :meth:`~.Relationship.Comparator.any` for
1076 a less-performant alternative using EXISTS, or refer
1077 to :meth:`_query.Query.outerjoin`
1078 as well as :ref:`orm_queryguide_joins`
1079 for more details on constructing outer joins.
1080
1081 kwargs may be ignored by this operator but are required for API
1082 conformance.
1083 """
1084 if not self.prop.uselist:
1085 raise sa_exc.InvalidRequestError(
1086 "'contains' not implemented for scalar "
1087 "attributes. Use =="
1088 )
1089
1090 clause = self.prop._optimized_compare(
1091 other, adapt_source=self.adapter
1092 )
1093
1094 if self.prop.secondaryjoin is not None:
1095 clause.negation_clause = self.__negated_contains_or_equals(
1096 other
1097 )
1098
1099 return clause
1100
1101 def __negated_contains_or_equals(
1102 self, other: Any
1103 ) -> ColumnElement[bool]:
1104 if self.prop.direction == MANYTOONE:
1105 state = attributes.instance_state(other)
1106
1107 def state_bindparam(
1108 local_col: ColumnElement[Any],
1109 state: InstanceState[Any],
1110 remote_col: ColumnElement[Any],
1111 ) -> BindParameter[Any]:
1112 dict_ = state.dict
1113 return sql.bindparam(
1114 local_col.key,
1115 type_=local_col.type,
1116 unique=True,
1117 callable_=self.prop._get_attr_w_warn_on_none(
1118 self.prop.mapper, state, dict_, remote_col
1119 ),
1120 )
1121
1122 def adapt(col: _CE) -> _CE:
1123 if self.adapter:
1124 return self.adapter(col)
1125 else:
1126 return col
1127
1128 if self.property._use_get:
1129 return sql.and_(
1130 *[
1131 sql.or_(
1132 adapt(x)
1133 != state_bindparam(adapt(x), state, y),
1134 adapt(x) == None,
1135 )
1136 for (x, y) in self.property.local_remote_pairs
1137 ]
1138 )
1139
1140 criterion = sql.and_(
1141 *[
1142 x == y
1143 for (x, y) in zip(
1144 self.property.mapper.primary_key,
1145 self.property.mapper.primary_key_from_instance(other),
1146 )
1147 ]
1148 )
1149
1150 return ~self._criterion_exists(criterion)
1151
1152 def __ne__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501
1153 """Implement the ``!=`` operator.
1154
1155 In a many-to-one context, such as:
1156
1157 .. sourcecode:: text
1158
1159 MyClass.some_prop != <some object>
1160
1161 This will typically produce a clause such as:
1162
1163 .. sourcecode:: sql
1164
1165 mytable.related_id != <some id>
1166
1167 Where ``<some id>`` is the primary key of the
1168 given object.
1169
1170 The ``!=`` operator provides partial functionality for non-
1171 many-to-one comparisons:
1172
1173 * Comparisons against collections are not supported.
1174 Use
1175 :meth:`~.Relationship.Comparator.contains`
1176 in conjunction with :func:`_expression.not_`.
1177 * Compared to a scalar one-to-many, will produce a
1178 clause that compares the target columns in the parent to
1179 the given target.
1180 * Compared to a scalar many-to-many, an alias
1181 of the association table will be rendered as
1182 well, forming a natural join that is part of the
1183 main body of the query. This will not work for
1184 queries that go beyond simple AND conjunctions of
1185 comparisons, such as those which use OR. Use
1186 explicit joins, outerjoins, or
1187 :meth:`~.Relationship.Comparator.has` in
1188 conjunction with :func:`_expression.not_` for
1189 more comprehensive non-many-to-one scalar
1190 membership tests.
1191 * Comparisons against ``None`` given in a one-to-many
1192 or many-to-many context produce an EXISTS clause.
1193
1194 """
1195 if other is None or isinstance(other, expression.Null):
1196 if self.property.direction == MANYTOONE:
1197 return _orm_annotate(
1198 ~self.property._optimized_compare(
1199 None, adapt_source=self.adapter
1200 )
1201 )
1202
1203 else:
1204 return self._criterion_exists()
1205 elif self.property.uselist:
1206 raise sa_exc.InvalidRequestError(
1207 "Can't compare a collection"
1208 " to an object or collection; use "
1209 "contains() to test for membership."
1210 )
1211 else:
1212 return _orm_annotate(self.__negated_contains_or_equals(other))
1213
1214 def _memoized_attr_property(self) -> RelationshipProperty[_PT]:
1215 self.prop.parent._check_configure()
1216 return self.prop
1217
1218 def _with_parent(
1219 self,
1220 instance: object,
1221 alias_secondary: bool = True,
1222 from_entity: Optional[_EntityType[Any]] = None,
1223 ) -> ColumnElement[bool]:
1224 assert instance is not None
1225 adapt_source: Optional[_CoreAdapterProto] = None
1226 if from_entity is not None:
1227 insp: Optional[_InternalEntityType[Any]] = inspect(from_entity)
1228 assert insp is not None
1229 if insp_is_aliased_class(insp):
1230 adapt_source = insp._adapter.adapt_clause
1231 return self._optimized_compare(
1232 instance,
1233 value_is_parent=True,
1234 adapt_source=adapt_source,
1235 alias_secondary=alias_secondary,
1236 )
1237
1238 def _optimized_compare(
1239 self,
1240 state: Any,
1241 value_is_parent: bool = False,
1242 adapt_source: Optional[_CoreAdapterProto] = None,
1243 alias_secondary: bool = True,
1244 ) -> ColumnElement[bool]:
1245 if state is not None:
1246 try:
1247 state = inspect(state)
1248 except sa_exc.NoInspectionAvailable:
1249 state = None
1250
1251 if state is None or not getattr(state, "is_instance", False):
1252 raise sa_exc.ArgumentError(
1253 "Mapped instance expected for relationship "
1254 "comparison to object. Classes, queries and other "
1255 "SQL elements are not accepted in this context; for "
1256 "comparison with a subquery, "
1257 "use %s.has(**criteria)." % self
1258 )
1259 reverse_direction = not value_is_parent
1260
1261 if state is None:
1262 return self._lazy_none_clause(
1263 reverse_direction, adapt_source=adapt_source
1264 )
1265
1266 if not reverse_direction:
1267 criterion, bind_to_col = (
1268 self._lazy_strategy._lazywhere,
1269 self._lazy_strategy._bind_to_col,
1270 )
1271 else:
1272 criterion, bind_to_col = (
1273 self._lazy_strategy._rev_lazywhere,
1274 self._lazy_strategy._rev_bind_to_col,
1275 )
1276
1277 if reverse_direction:
1278 mapper = self.mapper
1279 else:
1280 mapper = self.parent
1281
1282 dict_ = attributes.instance_dict(state.obj())
1283
1284 def visit_bindparam(bindparam: BindParameter[Any]) -> None:
1285 if bindparam._identifying_key in bind_to_col:
1286 bindparam.callable = self._get_attr_w_warn_on_none(
1287 mapper,
1288 state,
1289 dict_,
1290 bind_to_col[bindparam._identifying_key],
1291 )
1292
1293 if self.secondary is not None and alias_secondary:
1294 criterion = ClauseAdapter(
1295 self.secondary._anonymous_fromclause()
1296 ).traverse(criterion)
1297
1298 criterion = visitors.cloned_traverse(
1299 criterion, {}, {"bindparam": visit_bindparam}
1300 )
1301
1302 if adapt_source:
1303 criterion = adapt_source(criterion)
1304 return criterion
1305
1306 def _get_attr_w_warn_on_none(
1307 self,
1308 mapper: Mapper[Any],
1309 state: InstanceState[Any],
1310 dict_: _InstanceDict,
1311 column: ColumnElement[Any],
1312 ) -> Callable[[], Any]:
1313 """Create the callable that is used in a many-to-one expression.
1314
1315 E.g.::
1316
1317 u1 = s.query(User).get(5)
1318
1319 expr = Address.user == u1
1320
1321 Above, the SQL should be "address.user_id = 5". The callable
1322 returned by this method produces the value "5" based on the identity
1323 of ``u1``.
1324
1325 """
1326
1327 # in this callable, we're trying to thread the needle through
1328 # a wide variety of scenarios, including:
1329 #
1330 # * the object hasn't been flushed yet and there's no value for
1331 # the attribute as of yet
1332 #
1333 # * the object hasn't been flushed yet but it has a user-defined
1334 # value
1335 #
1336 # * the object has a value but it's expired and not locally present
1337 #
1338 # * the object has a value but it's expired and not locally present,
1339 # and the object is also detached
1340 #
1341 # * The object hadn't been flushed yet, there was no value, but
1342 # later, the object has been expired and detached, and *now*
1343 # they're trying to evaluate it
1344 #
1345 # * the object had a value, but it was changed to a new value, and
1346 # then expired
1347 #
1348 # * the object had a value, but it was changed to a new value, and
1349 # then expired, then the object was detached
1350 #
1351 # * the object has a user-set value, but it's None and we don't do
1352 # the comparison correctly for that so warn
1353 #
1354
1355 prop = mapper.get_property_by_column(column)
1356
1357 # by invoking this method, InstanceState will track the last known
1358 # value for this key each time the attribute is to be expired.
1359 # this feature was added explicitly for use in this method.
1360 state._track_last_known_value(prop.key)
1361
1362 lkv_fixed = state._last_known_values
1363
1364 def _go() -> Any:
1365 assert lkv_fixed is not None
1366 last_known = to_return = lkv_fixed[prop.key]
1367 existing_is_available = (
1368 last_known is not LoaderCallableStatus.NO_VALUE
1369 )
1370
1371 # we support that the value may have changed. so here we
1372 # try to get the most recent value including re-fetching.
1373 # only if we can't get a value now due to detachment do we return
1374 # the last known value
1375 current_value = mapper._get_state_attr_by_column(
1376 state,
1377 dict_,
1378 column,
1379 passive=(
1380 PassiveFlag.PASSIVE_OFF
1381 if state.persistent
1382 else PassiveFlag.PASSIVE_NO_FETCH ^ PassiveFlag.INIT_OK
1383 ),
1384 )
1385
1386 if current_value is LoaderCallableStatus.NEVER_SET:
1387 if not existing_is_available:
1388 raise sa_exc.InvalidRequestError(
1389 "Can't resolve value for column %s on object "
1390 "%s; no value has been set for this column"
1391 % (column, state_str(state))
1392 )
1393 elif current_value is LoaderCallableStatus.PASSIVE_NO_RESULT:
1394 if not existing_is_available:
1395 raise sa_exc.InvalidRequestError(
1396 "Can't resolve value for column %s on object "
1397 "%s; the object is detached and the value was "
1398 "expired" % (column, state_str(state))
1399 )
1400 else:
1401 to_return = current_value
1402 if to_return is None:
1403 util.warn(
1404 "Got None for value of column %s; this is unsupported "
1405 "for a relationship comparison and will not "
1406 "currently produce an IS comparison "
1407 "(but may in a future release)" % column
1408 )
1409 return to_return
1410
1411 return _go
1412
1413 def _lazy_none_clause(
1414 self,
1415 reverse_direction: bool = False,
1416 adapt_source: Optional[_CoreAdapterProto] = None,
1417 ) -> ColumnElement[bool]:
1418 if not reverse_direction:
1419 criterion, bind_to_col = (
1420 self._lazy_strategy._lazywhere,
1421 self._lazy_strategy._bind_to_col,
1422 )
1423 else:
1424 criterion, bind_to_col = (
1425 self._lazy_strategy._rev_lazywhere,
1426 self._lazy_strategy._rev_bind_to_col,
1427 )
1428
1429 criterion = adapt_criterion_to_null(criterion, bind_to_col)
1430
1431 if adapt_source:
1432 criterion = adapt_source(criterion)
1433 return criterion
1434
1435 def _format_as_string(self, class_: type, key: str) -> str:
1436 return f"{class_.__name__}.{key}"
1437
1438 def __str__(self) -> str:
1439 return self._format_as_string(self.parent.class_, self.key)
1440
1441 def merge(
1442 self,
1443 session: Session,
1444 source_state: InstanceState[Any],
1445 source_dict: _InstanceDict,
1446 dest_state: InstanceState[Any],
1447 dest_dict: _InstanceDict,
1448 load: bool,
1449 _recursive: Dict[Any, object],
1450 _resolve_conflict_map: Dict[_IdentityKeyType[Any], object],
1451 ) -> None:
1452 if load:
1453 for r in self._reverse_property:
1454 if (source_state, r) in _recursive:
1455 return
1456
1457 if "merge" not in self._cascade:
1458 return
1459
1460 if self.key not in source_dict:
1461 return
1462
1463 if self.uselist:
1464 impl = source_state.get_impl(self.key)
1465
1466 assert is_has_collection_adapter(impl)
1467 instances_iterable = impl.get_collection(source_state, source_dict)
1468
1469 # if this is a CollectionAttributeImpl, then empty should
1470 # be False, otherwise "self.key in source_dict" should not be
1471 # True
1472 assert not instances_iterable.empty if impl.collection else True
1473
1474 if load:
1475 # for a full merge, pre-load the destination collection,
1476 # so that individual _merge of each item pulls from identity
1477 # map for those already present.
1478 # also assumes CollectionAttributeImpl behavior of loading
1479 # "old" list in any case
1480 dest_state.get_impl(self.key).get(
1481 dest_state, dest_dict, passive=PassiveFlag.PASSIVE_MERGE
1482 )
1483
1484 dest_list = []
1485 for current in instances_iterable:
1486 current_state = attributes.instance_state(current)
1487 current_dict = attributes.instance_dict(current)
1488 _recursive[(current_state, self)] = True
1489 obj = session._merge(
1490 current_state,
1491 current_dict,
1492 load=load,
1493 _recursive=_recursive,
1494 _resolve_conflict_map=_resolve_conflict_map,
1495 )
1496 if obj is not None:
1497 dest_list.append(obj)
1498
1499 if not load:
1500 coll = attributes.init_state_collection(
1501 dest_state, dest_dict, self.key
1502 )
1503 for c in dest_list:
1504 coll.append_without_event(c)
1505 else:
1506 dest_impl = dest_state.get_impl(self.key)
1507 assert is_has_collection_adapter(dest_impl)
1508 dest_impl.set(
1509 dest_state,
1510 dest_dict,
1511 dest_list,
1512 _adapt=False,
1513 passive=PassiveFlag.PASSIVE_MERGE,
1514 )
1515 else:
1516 current = source_dict[self.key]
1517 if current is not None:
1518 current_state = attributes.instance_state(current)
1519 current_dict = attributes.instance_dict(current)
1520 _recursive[(current_state, self)] = True
1521 obj = session._merge(
1522 current_state,
1523 current_dict,
1524 load=load,
1525 _recursive=_recursive,
1526 _resolve_conflict_map=_resolve_conflict_map,
1527 )
1528 else:
1529 obj = None
1530
1531 if not load:
1532 dest_dict[self.key] = obj
1533 else:
1534 dest_state.get_impl(self.key).set(
1535 dest_state, dest_dict, obj, None
1536 )
1537
1538 def _value_as_iterable(
1539 self,
1540 state: InstanceState[_O],
1541 dict_: _InstanceDict,
1542 key: str,
1543 passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
1544 ) -> Sequence[Tuple[InstanceState[_O], _O]]:
1545 """Return a list of tuples (state, obj) for the given
1546 key.
1547
1548 returns an empty list if the value is None/empty/PASSIVE_NO_RESULT
1549 """
1550
1551 impl = state.manager[key].impl
1552 x = impl.get(state, dict_, passive=passive)
1553 if x is LoaderCallableStatus.PASSIVE_NO_RESULT or x is None:
1554 return []
1555 elif is_has_collection_adapter(impl):
1556 return [
1557 (attributes.instance_state(o), o)
1558 for o in impl.get_collection(state, dict_, x, passive=passive)
1559 ]
1560 else:
1561 return [(attributes.instance_state(x), x)]
1562
1563 def cascade_iterator(
1564 self,
1565 type_: str,
1566 state: InstanceState[Any],
1567 dict_: _InstanceDict,
1568 visited_states: Set[InstanceState[Any]],
1569 halt_on: Optional[Callable[[InstanceState[Any]], bool]] = None,
1570 ) -> Iterator[Tuple[Any, Mapper[Any], InstanceState[Any], _InstanceDict]]:
1571 # assert type_ in self._cascade
1572
1573 # only actively lazy load on the 'delete' cascade
1574 if type_ != "delete" or self.passive_deletes:
1575 passive = PassiveFlag.PASSIVE_NO_INITIALIZE
1576 else:
1577 passive = PassiveFlag.PASSIVE_OFF | PassiveFlag.NO_RAISE
1578
1579 if type_ == "save-update":
1580 tuples = state.manager[self.key].impl.get_all_pending(state, dict_)
1581 else:
1582 tuples = self._value_as_iterable(
1583 state, dict_, self.key, passive=passive
1584 )
1585
1586 skip_pending = (
1587 type_ == "refresh-expire" and "delete-orphan" not in self._cascade
1588 )
1589
1590 for instance_state, c in tuples:
1591 if instance_state in visited_states:
1592 continue
1593
1594 if c is None:
1595 # would like to emit a warning here, but
1596 # would not be consistent with collection.append(None)
1597 # current behavior of silently skipping.
1598 # see [ticket:2229]
1599 continue
1600
1601 assert instance_state is not None
1602 instance_dict = attributes.instance_dict(c)
1603
1604 if halt_on and halt_on(instance_state):
1605 continue
1606
1607 if skip_pending and not instance_state.key:
1608 continue
1609
1610 instance_mapper = instance_state.manager.mapper
1611
1612 if not instance_mapper.isa(self.mapper.class_manager.mapper):
1613 raise AssertionError(
1614 "Attribute '%s' on class '%s' "
1615 "doesn't handle objects "
1616 "of type '%s'"
1617 % (self.key, self.parent.class_, c.__class__)
1618 )
1619
1620 visited_states.add(instance_state)
1621
1622 yield c, instance_mapper, instance_state, instance_dict
1623
1624 @property
1625 def _effective_sync_backref(self) -> bool:
1626 if self.viewonly:
1627 return False
1628 else:
1629 return self.sync_backref is not False
1630
1631 @staticmethod
1632 def _check_sync_backref(
1633 rel_a: RelationshipProperty[Any], rel_b: RelationshipProperty[Any]
1634 ) -> None:
1635 if rel_a.viewonly and rel_b.sync_backref:
1636 raise sa_exc.InvalidRequestError(
1637 "Relationship %s cannot specify sync_backref=True since %s "
1638 "includes viewonly=True." % (rel_b, rel_a)
1639 )
1640 if (
1641 rel_a.viewonly
1642 and not rel_b.viewonly
1643 and rel_b.sync_backref is not False
1644 ):
1645 rel_b.sync_backref = False
1646
1647 def _add_reverse_property(self, key: str) -> None:
1648 other = self.mapper.get_property(key, _configure_mappers=False)
1649 if not isinstance(other, RelationshipProperty):
1650 raise sa_exc.InvalidRequestError(
1651 "back_populates on relationship '%s' refers to attribute '%s' "
1652 "that is not a relationship. The back_populates parameter "
1653 "should refer to the name of a relationship on the target "
1654 "class." % (self, other)
1655 )
1656 # viewonly and sync_backref cases
1657 # 1. self.viewonly==True and other.sync_backref==True -> error
1658 # 2. self.viewonly==True and other.viewonly==False and
1659 # other.sync_backref==None -> warn sync_backref=False, set to False
1660 self._check_sync_backref(self, other)
1661 # 3. other.viewonly==True and self.sync_backref==True -> error
1662 # 4. other.viewonly==True and self.viewonly==False and
1663 # self.sync_backref==None -> warn sync_backref=False, set to False
1664 self._check_sync_backref(other, self)
1665
1666 self._reverse_property.add(other)
1667 other._reverse_property.add(self)
1668
1669 other._setup_entity()
1670
1671 if not other.mapper.common_parent(self.parent):
1672 raise sa_exc.ArgumentError(
1673 "reverse_property %r on "
1674 "relationship %s references relationship %s, which "
1675 "does not reference mapper %s"
1676 % (key, self, other, self.parent)
1677 )
1678
1679 if (
1680 other._configure_started
1681 and self.direction in (ONETOMANY, MANYTOONE)
1682 and self.direction == other.direction
1683 ):
1684 raise sa_exc.ArgumentError(
1685 "%s and back-reference %s are "
1686 "both of the same direction %r. Did you mean to "
1687 "set remote_side on the many-to-one side ?"
1688 % (other, self, self.direction)
1689 )
1690
1691 @util.memoized_property
1692 def entity(self) -> _InternalEntityType[_T]:
1693 """Return the target mapped entity, which is an inspect() of the
1694 class or aliased class that is referenced by this
1695 :class:`.RelationshipProperty`.
1696
1697 """
1698 self.parent._check_configure()
1699 return self.entity
1700
1701 @util.memoized_property
1702 def mapper(self) -> Mapper[_T]:
1703 """Return the targeted :class:`_orm.Mapper` for this
1704 :class:`.RelationshipProperty`.
1705
1706 """
1707 return self.entity.mapper
1708
1709 def do_init(self) -> None:
1710 self._process_dependent_arguments()
1711 self._setup_entity()
1712 self._setup_registry_dependencies()
1713 self._setup_join_conditions()
1714 self._check_cascade_settings(self._cascade)
1715 self._post_init()
1716 self._generate_backref()
1717 self._join_condition._warn_for_conflicting_sync_targets()
1718 super().do_init()
1719 self._lazy_strategy = cast(
1720 "_LazyLoader", self._get_strategy((("lazy", "select"),))
1721 )
1722
1723 def _setup_registry_dependencies(self) -> None:
1724 self.parent.mapper.registry._set_depends_on(
1725 self.entity.mapper.registry
1726 )
1727
1728 def _process_dependent_arguments(self) -> None:
1729 """Convert incoming configuration arguments to their
1730 proper form.
1731
1732 Callables are resolved, ORM annotations removed.
1733
1734 """
1735
1736 # accept callables for other attributes which may require
1737 # deferred initialization. This technique is used
1738 # by declarative "string configs" and some recipes.
1739 init_args = self._init_args
1740
1741 for attr in (
1742 "order_by",
1743 "primaryjoin",
1744 "secondaryjoin",
1745 "secondary",
1746 "foreign_keys",
1747 "remote_side",
1748 "back_populates",
1749 ):
1750 rel_arg = getattr(init_args, attr)
1751
1752 rel_arg._resolve_against_registry(self._clsregistry_resolvers[1])
1753
1754 # remove "annotations" which are present if mapped class
1755 # descriptors are used to create the join expression.
1756 for attr in "primaryjoin", "secondaryjoin":
1757 rel_arg = getattr(init_args, attr)
1758 val = rel_arg.resolved
1759 if val is not None:
1760 rel_arg.resolved = _orm_deannotate(
1761 coercions.expect(
1762 roles.ColumnArgumentRole, val, argname=attr
1763 )
1764 )
1765
1766 secondary = init_args.secondary.resolved
1767 if secondary is not None and _is_mapped_class(secondary):
1768 raise sa_exc.ArgumentError(
1769 "secondary argument %s passed to to relationship() %s must "
1770 "be a Table object or other FROM clause; can't send a mapped "
1771 "class directly as rows in 'secondary' are persisted "
1772 "independently of a class that is mapped "
1773 "to that same table." % (secondary, self)
1774 )
1775
1776 # ensure expressions in self.order_by, foreign_keys,
1777 # remote_side are all columns, not strings.
1778 if (
1779 init_args.order_by.resolved is not False
1780 and init_args.order_by.resolved is not None
1781 ):
1782 self.order_by = tuple(
1783 coercions.expect(
1784 roles.ColumnArgumentRole, x, argname="order_by"
1785 )
1786 for x in util.to_list(init_args.order_by.resolved)
1787 )
1788 else:
1789 self.order_by = False
1790
1791 self._user_defined_foreign_keys = util.column_set(
1792 coercions.expect(
1793 roles.ColumnArgumentRole, x, argname="foreign_keys"
1794 )
1795 for x in util.to_column_set(init_args.foreign_keys.resolved)
1796 )
1797
1798 self.remote_side = util.column_set(
1799 coercions.expect(
1800 roles.ColumnArgumentRole, x, argname="remote_side"
1801 )
1802 for x in util.to_column_set(init_args.remote_side.resolved)
1803 )
1804
1805 def declarative_scan(
1806 self,
1807 decl_scan: _ClassScanMapperConfig,
1808 registry: _RegistryType,
1809 cls: Type[Any],
1810 originating_module: Optional[str],
1811 key: str,
1812 mapped_container: Optional[Type[Mapped[Any]]],
1813 annotation: Optional[_AnnotationScanType],
1814 extracted_mapped_annotation: Optional[_AnnotationScanType],
1815 is_dataclass_field: bool,
1816 ) -> None:
1817 if extracted_mapped_annotation is None:
1818 if self.argument is None:
1819 self._raise_for_required(key, cls)
1820 else:
1821 return
1822
1823 argument = extracted_mapped_annotation
1824 assert originating_module is not None
1825
1826 if mapped_container is not None:
1827 is_write_only = issubclass(mapped_container, WriteOnlyMapped)
1828 is_dynamic = issubclass(mapped_container, DynamicMapped)
1829 if is_write_only:
1830 self.lazy = "write_only"
1831 self.strategy_key = (("lazy", self.lazy),)
1832 elif is_dynamic:
1833 self.lazy = "dynamic"
1834 self.strategy_key = (("lazy", self.lazy),)
1835 else:
1836 is_write_only = is_dynamic = False
1837
1838 argument = de_optionalize_union_types(argument)
1839
1840 if hasattr(argument, "__origin__"):
1841 arg_origin = argument.__origin__
1842 if isinstance(arg_origin, type) and issubclass(
1843 arg_origin, abc.Collection
1844 ):
1845 if self.collection_class is None:
1846 if _py_inspect.isabstract(arg_origin):
1847 raise sa_exc.ArgumentError(
1848 f"Collection annotation type {arg_origin} cannot "
1849 "be instantiated; please provide an explicit "
1850 "'collection_class' parameter "
1851 "(e.g. list, set, etc.) to the "
1852 "relationship() function to accompany this "
1853 "annotation"
1854 )
1855
1856 self.collection_class = arg_origin
1857
1858 elif not is_write_only and not is_dynamic:
1859 self.uselist = False
1860
1861 if argument.__args__: # type: ignore
1862 if isinstance(arg_origin, type) and issubclass(
1863 arg_origin, typing.Mapping
1864 ):
1865 type_arg = argument.__args__[-1] # type: ignore
1866 else:
1867 type_arg = argument.__args__[0] # type: ignore
1868 if hasattr(type_arg, "__forward_arg__"):
1869 str_argument = type_arg.__forward_arg__
1870
1871 argument = resolve_name_to_real_class_name(
1872 str_argument, originating_module
1873 )
1874 else:
1875 argument = type_arg
1876 else:
1877 raise sa_exc.ArgumentError(
1878 f"Generic alias {argument} requires an argument"
1879 )
1880 elif hasattr(argument, "__forward_arg__"):
1881 argument = argument.__forward_arg__
1882
1883 argument = resolve_name_to_real_class_name(
1884 argument, originating_module
1885 )
1886
1887 if (
1888 self.collection_class is None
1889 and not is_write_only
1890 and not is_dynamic
1891 ):
1892 self.uselist = False
1893
1894 # ticket #8759
1895 # if a lead argument was given to relationship(), like
1896 # `relationship("B")`, use that, don't replace it with class we
1897 # found in the annotation. The declarative_scan() method call here is
1898 # still useful, as we continue to derive collection type and do
1899 # checking of the annotation in any case.
1900 if self.argument is None:
1901 self.argument = cast("_RelationshipArgumentType[_T]", argument)
1902
1903 if (
1904 self._attribute_options.dataclasses_default_factory
1905 is not _NoArg.NO_ARG
1906 and self._attribute_options.dataclasses_default_factory
1907 is not self.collection_class
1908 ):
1909 raise sa_exc.ArgumentError(
1910 f"For relationship {self._format_as_string(cls, key)} using "
1911 "dataclass options, default_factory must be exactly "
1912 f"{self.collection_class}"
1913 )
1914
1915 @util.preload_module("sqlalchemy.orm.mapper")
1916 def _setup_entity(self, __argument: Any = None, /) -> None:
1917 if "entity" in self.__dict__:
1918 return
1919
1920 mapperlib = util.preloaded.orm_mapper
1921
1922 if __argument:
1923 argument = __argument
1924 else:
1925 argument = self.argument
1926
1927 resolved_argument: _ExternalEntityType[Any]
1928
1929 if isinstance(argument, str):
1930 # we might want to cleanup clsregistry API to make this
1931 # more straightforward
1932 resolved_argument = cast(
1933 "_ExternalEntityType[Any]",
1934 self._clsregistry_resolve_name(argument)(),
1935 )
1936 elif callable(argument) and not isinstance(
1937 argument, (type, mapperlib.Mapper)
1938 ):
1939 resolved_argument = argument()
1940 else:
1941 resolved_argument = argument
1942
1943 entity: _InternalEntityType[Any]
1944
1945 if isinstance(resolved_argument, type):
1946 entity = class_mapper(resolved_argument, configure=False)
1947 else:
1948 try:
1949 entity = inspect(resolved_argument)
1950 except sa_exc.NoInspectionAvailable:
1951 entity = None # type: ignore
1952
1953 if not hasattr(entity, "mapper"):
1954 raise sa_exc.ArgumentError(
1955 "relationship '%s' expects "
1956 "a class or a mapper argument (received: %s)"
1957 % (self.key, type(resolved_argument))
1958 )
1959
1960 self.entity = entity
1961 self.target = self.entity.persist_selectable
1962
1963 def _setup_join_conditions(self) -> None:
1964 self._join_condition = jc = _JoinCondition(
1965 parent_persist_selectable=self.parent.persist_selectable,
1966 child_persist_selectable=self.entity.persist_selectable,
1967 parent_local_selectable=self.parent.local_table,
1968 child_local_selectable=self.entity.local_table,
1969 primaryjoin=self._init_args.primaryjoin.resolved,
1970 secondary=self._init_args.secondary.resolved,
1971 secondaryjoin=self._init_args.secondaryjoin.resolved,
1972 parent_equivalents=self.parent._equivalent_columns,
1973 child_equivalents=self.mapper._equivalent_columns,
1974 consider_as_foreign_keys=self._user_defined_foreign_keys,
1975 local_remote_pairs=self.local_remote_pairs,
1976 remote_side=self.remote_side,
1977 self_referential=self._is_self_referential,
1978 prop=self,
1979 support_sync=not self.viewonly,
1980 can_be_synced_fn=self._columns_are_mapped,
1981 )
1982 self.primaryjoin = jc.primaryjoin
1983 self.secondaryjoin = jc.secondaryjoin
1984 self.secondary = jc.secondary
1985 self.direction = jc.direction
1986 self.local_remote_pairs = jc.local_remote_pairs
1987 self.remote_side = jc.remote_columns
1988 self.local_columns = jc.local_columns
1989 self.synchronize_pairs = jc.synchronize_pairs
1990 self._calculated_foreign_keys = jc.foreign_key_columns
1991 self.secondary_synchronize_pairs = jc.secondary_synchronize_pairs
1992
1993 @property
1994 def _clsregistry_resolve_arg(
1995 self,
1996 ) -> Callable[[str, bool], _class_resolver]:
1997 return self._clsregistry_resolvers[1]
1998
1999 @property
2000 def _clsregistry_resolve_name(
2001 self,
2002 ) -> Callable[[str], Callable[[], Union[Type[Any], Table, _ModNS]]]:
2003 return self._clsregistry_resolvers[0]
2004
2005 @util.memoized_property
2006 @util.preload_module("sqlalchemy.orm.clsregistry")
2007 def _clsregistry_resolvers(
2008 self,
2009 ) -> Tuple[
2010 Callable[[str], Callable[[], Union[Type[Any], Table, _ModNS]]],
2011 Callable[[str, bool], _class_resolver],
2012 ]:
2013 _resolver = util.preloaded.orm_clsregistry._resolver
2014
2015 return _resolver(self.parent.class_, self)
2016
2017 @property
2018 def cascade(self) -> CascadeOptions:
2019 """Return the current cascade setting for this
2020 :class:`.RelationshipProperty`.
2021 """
2022 return self._cascade
2023
2024 @cascade.setter
2025 def cascade(self, cascade: Union[str, CascadeOptions]) -> None:
2026 self._set_cascade(cascade)
2027
2028 def _set_cascade(self, cascade_arg: Union[str, CascadeOptions]) -> None:
2029 cascade = CascadeOptions(cascade_arg)
2030
2031 if self.viewonly:
2032 cascade = CascadeOptions(
2033 cascade.intersection(CascadeOptions._viewonly_cascades)
2034 )
2035
2036 if "mapper" in self.__dict__:
2037 self._check_cascade_settings(cascade)
2038 self._cascade = cascade
2039
2040 if self._dependency_processor:
2041 self._dependency_processor.cascade = cascade
2042
2043 def _check_cascade_settings(self, cascade: CascadeOptions) -> None:
2044 if (
2045 cascade.delete_orphan
2046 and not self.single_parent
2047 and (self.direction is MANYTOMANY or self.direction is MANYTOONE)
2048 ):
2049 raise sa_exc.ArgumentError(
2050 "For %(direction)s relationship %(rel)s, delete-orphan "
2051 "cascade is normally "
2052 'configured only on the "one" side of a one-to-many '
2053 "relationship, "
2054 'and not on the "many" side of a many-to-one or many-to-many '
2055 "relationship. "
2056 "To force this relationship to allow a particular "
2057 '"%(relatedcls)s" object to be referenced by only '
2058 'a single "%(clsname)s" object at a time via the '
2059 "%(rel)s relationship, which "
2060 "would allow "
2061 "delete-orphan cascade to take place in this direction, set "
2062 "the single_parent=True flag."
2063 % {
2064 "rel": self,
2065 "direction": (
2066 "many-to-one"
2067 if self.direction is MANYTOONE
2068 else "many-to-many"
2069 ),
2070 "clsname": self.parent.class_.__name__,
2071 "relatedcls": self.mapper.class_.__name__,
2072 },
2073 code="bbf0",
2074 )
2075
2076 if self.passive_deletes == "all" and (
2077 "delete" in cascade or "delete-orphan" in cascade
2078 ):
2079 raise sa_exc.ArgumentError(
2080 "On %s, can't set passive_deletes='all' in conjunction "
2081 "with 'delete' or 'delete-orphan' cascade" % self
2082 )
2083
2084 if cascade.delete_orphan:
2085 self.mapper.primary_mapper()._delete_orphans.append(
2086 (self.key, self.parent.class_)
2087 )
2088
2089 def _persists_for(self, mapper: Mapper[Any]) -> bool:
2090 """Return True if this property will persist values on behalf
2091 of the given mapper.
2092
2093 """
2094
2095 return (
2096 self.key in mapper.relationships
2097 and mapper.relationships[self.key] is self
2098 )
2099
2100 def _columns_are_mapped(self, *cols: ColumnElement[Any]) -> bool:
2101 """Return True if all columns in the given collection are
2102 mapped by the tables referenced by this :class:`.RelationshipProperty`.
2103
2104 """
2105
2106 secondary = self._init_args.secondary.resolved
2107 for c in cols:
2108 if secondary is not None and secondary.c.contains_column(c):
2109 continue
2110 if not self.parent.persist_selectable.c.contains_column(
2111 c
2112 ) and not self.target.c.contains_column(c):
2113 return False
2114 return True
2115
2116 def _generate_backref(self) -> None:
2117 """Interpret the 'backref' instruction to create a
2118 :func:`_orm.relationship` complementary to this one."""
2119
2120 resolve_back_populates = self._init_args.back_populates.resolved
2121
2122 if self.backref is not None and not resolve_back_populates:
2123 kwargs: Dict[str, Any]
2124 if isinstance(self.backref, str):
2125 backref_key, kwargs = self.backref, {}
2126 else:
2127 backref_key, kwargs = self.backref
2128 mapper = self.mapper.primary_mapper()
2129
2130 if not mapper.concrete:
2131 check = set(mapper.iterate_to_root()).union(
2132 mapper.self_and_descendants
2133 )
2134 for m in check:
2135 if m.has_property(backref_key) and not m.concrete:
2136 raise sa_exc.ArgumentError(
2137 "Error creating backref "
2138 "'%s' on relationship '%s': property of that "
2139 "name exists on mapper '%s'"
2140 % (backref_key, self, m)
2141 )
2142
2143 # determine primaryjoin/secondaryjoin for the
2144 # backref. Use the one we had, so that
2145 # a custom join doesn't have to be specified in
2146 # both directions.
2147 if self.secondary is not None:
2148 # for many to many, just switch primaryjoin/
2149 # secondaryjoin. use the annotated
2150 # pj/sj on the _join_condition.
2151 pj = kwargs.pop(
2152 "primaryjoin",
2153 self._join_condition.secondaryjoin_minus_local,
2154 )
2155 sj = kwargs.pop(
2156 "secondaryjoin",
2157 self._join_condition.primaryjoin_minus_local,
2158 )
2159 else:
2160 pj = kwargs.pop(
2161 "primaryjoin",
2162 self._join_condition.primaryjoin_reverse_remote,
2163 )
2164 sj = kwargs.pop("secondaryjoin", None)
2165 if sj:
2166 raise sa_exc.InvalidRequestError(
2167 "Can't assign 'secondaryjoin' on a backref "
2168 "against a non-secondary relationship."
2169 )
2170
2171 foreign_keys = kwargs.pop(
2172 "foreign_keys", self._user_defined_foreign_keys
2173 )
2174 parent = self.parent.primary_mapper()
2175 kwargs.setdefault("viewonly", self.viewonly)
2176 kwargs.setdefault("post_update", self.post_update)
2177 kwargs.setdefault("passive_updates", self.passive_updates)
2178 kwargs.setdefault("sync_backref", self.sync_backref)
2179 self.back_populates = backref_key
2180 relationship = RelationshipProperty(
2181 parent,
2182 self.secondary,
2183 primaryjoin=pj,
2184 secondaryjoin=sj,
2185 foreign_keys=foreign_keys,
2186 back_populates=self.key,
2187 **kwargs,
2188 )
2189 mapper._configure_property(
2190 backref_key, relationship, warn_for_existing=True
2191 )
2192
2193 if resolve_back_populates:
2194 if isinstance(resolve_back_populates, PropComparator):
2195 back_populates = resolve_back_populates.prop.key
2196 elif isinstance(resolve_back_populates, str):
2197 back_populates = resolve_back_populates
2198 else:
2199 # need test coverage for this case as well
2200 raise sa_exc.ArgumentError(
2201 f"Invalid back_populates value: {resolve_back_populates!r}"
2202 )
2203
2204 self._add_reverse_property(back_populates)
2205
2206 @util.preload_module("sqlalchemy.orm.dependency")
2207 def _post_init(self) -> None:
2208 dependency = util.preloaded.orm_dependency
2209
2210 if self.uselist is None:
2211 self.uselist = self.direction is not MANYTOONE
2212 if not self.viewonly:
2213 self._dependency_processor = ( # type: ignore
2214 dependency._DependencyProcessor.from_relationship
2215 )(self)
2216
2217 if (
2218 self.uselist
2219 and self._attribute_options.dataclasses_default
2220 is not _NoArg.NO_ARG
2221 ):
2222 raise sa_exc.ArgumentError(
2223 f"On relationship {self}, the dataclass default for "
2224 "relationship may only be set for "
2225 "a relationship that references a scalar value, i.e. "
2226 "many-to-one or explicitly uselist=False"
2227 )
2228
2229 @util.memoized_property
2230 def _use_get(self) -> bool:
2231 """memoize the 'use_get' attribute of this RelationshipLoader's
2232 lazyloader."""
2233
2234 strategy = self._lazy_strategy
2235 return strategy.use_get
2236
2237 @util.memoized_property
2238 def _is_self_referential(self) -> bool:
2239 return self.mapper.common_parent(self.parent)
2240
2241 def _create_joins(
2242 self,
2243 source_polymorphic: bool = False,
2244 source_selectable: Optional[FromClause] = None,
2245 dest_selectable: Optional[FromClause] = None,
2246 of_type_entity: Optional[_InternalEntityType[Any]] = None,
2247 alias_secondary: bool = False,
2248 extra_criteria: Tuple[ColumnElement[bool], ...] = (),
2249 ) -> Tuple[
2250 ColumnElement[bool],
2251 Optional[ColumnElement[bool]],
2252 FromClause,
2253 FromClause,
2254 Optional[FromClause],
2255 Optional[ClauseAdapter],
2256 ]:
2257 aliased = False
2258
2259 if alias_secondary and self.secondary is not None:
2260 aliased = True
2261
2262 if source_selectable is None:
2263 if source_polymorphic and self.parent.with_polymorphic:
2264 source_selectable = self.parent._with_polymorphic_selectable
2265
2266 if of_type_entity:
2267 dest_mapper = of_type_entity.mapper
2268 if dest_selectable is None:
2269 dest_selectable = of_type_entity.selectable
2270 aliased = True
2271 else:
2272 dest_mapper = self.mapper
2273
2274 if dest_selectable is None:
2275 dest_selectable = self.entity.selectable
2276 if self.mapper.with_polymorphic:
2277 aliased = True
2278
2279 if self._is_self_referential and source_selectable is None:
2280 dest_selectable = dest_selectable._anonymous_fromclause()
2281 aliased = True
2282 elif (
2283 dest_selectable is not self.mapper._with_polymorphic_selectable
2284 or self.mapper.with_polymorphic
2285 ):
2286 aliased = True
2287
2288 single_crit = dest_mapper._single_table_criterion
2289 aliased = aliased or (
2290 source_selectable is not None
2291 and (
2292 source_selectable
2293 is not self.parent._with_polymorphic_selectable
2294 or source_selectable._is_subquery
2295 )
2296 )
2297
2298 (
2299 primaryjoin,
2300 secondaryjoin,
2301 secondary,
2302 target_adapter,
2303 dest_selectable,
2304 ) = self._join_condition.join_targets(
2305 source_selectable,
2306 dest_selectable,
2307 aliased,
2308 single_crit,
2309 extra_criteria,
2310 )
2311 if source_selectable is None:
2312 source_selectable = self.parent.local_table
2313 if dest_selectable is None:
2314 dest_selectable = self.entity.local_table
2315 return (
2316 primaryjoin,
2317 secondaryjoin,
2318 source_selectable,
2319 dest_selectable,
2320 secondary,
2321 target_adapter,
2322 )
2323
2324
2325def _annotate_columns(element: _CE, annotations: _AnnotationDict) -> _CE:
2326 def clone(elem: _CE) -> _CE:
2327 if isinstance(elem, expression.ColumnClause):
2328 elem = elem._annotate(annotations.copy()) # type: ignore
2329 elem._copy_internals(clone=clone)
2330 return elem
2331
2332 if element is not None:
2333 element = clone(element)
2334 clone = None # type: ignore # remove gc cycles
2335 return element
2336
2337
2338class _JoinCondition:
2339 primaryjoin_initial: Optional[ColumnElement[bool]]
2340 primaryjoin: ColumnElement[bool]
2341 secondaryjoin: Optional[ColumnElement[bool]]
2342 secondary: Optional[FromClause]
2343 prop: RelationshipProperty[Any]
2344
2345 synchronize_pairs: _ColumnPairs
2346 secondary_synchronize_pairs: _ColumnPairs
2347 direction: RelationshipDirection
2348
2349 parent_persist_selectable: FromClause
2350 child_persist_selectable: FromClause
2351 parent_local_selectable: FromClause
2352 child_local_selectable: FromClause
2353
2354 _local_remote_pairs: Optional[_ColumnPairs]
2355
2356 def __init__(
2357 self,
2358 parent_persist_selectable: FromClause,
2359 child_persist_selectable: FromClause,
2360 parent_local_selectable: FromClause,
2361 child_local_selectable: FromClause,
2362 *,
2363 primaryjoin: Optional[ColumnElement[bool]] = None,
2364 secondary: Optional[FromClause] = None,
2365 secondaryjoin: Optional[ColumnElement[bool]] = None,
2366 parent_equivalents: Optional[_EquivalentColumnMap] = None,
2367 child_equivalents: Optional[_EquivalentColumnMap] = None,
2368 consider_as_foreign_keys: Any = None,
2369 local_remote_pairs: Optional[_ColumnPairs] = None,
2370 remote_side: Any = None,
2371 self_referential: Any = False,
2372 prop: RelationshipProperty[Any],
2373 support_sync: bool = True,
2374 can_be_synced_fn: Callable[..., bool] = lambda *c: True,
2375 ):
2376 self.parent_persist_selectable = parent_persist_selectable
2377 self.parent_local_selectable = parent_local_selectable
2378 self.child_persist_selectable = child_persist_selectable
2379 self.child_local_selectable = child_local_selectable
2380 self.parent_equivalents = parent_equivalents
2381 self.child_equivalents = child_equivalents
2382 self.primaryjoin_initial = primaryjoin
2383 self.secondaryjoin = secondaryjoin
2384 self.secondary = secondary
2385 self.consider_as_foreign_keys = consider_as_foreign_keys
2386 self._local_remote_pairs = local_remote_pairs
2387 self._remote_side = remote_side
2388 self.prop = prop
2389 self.self_referential = self_referential
2390 self.support_sync = support_sync
2391 self.can_be_synced_fn = can_be_synced_fn
2392
2393 self._determine_joins()
2394 assert self.primaryjoin is not None
2395
2396 self._sanitize_joins()
2397 self._annotate_fks()
2398 self._annotate_remote()
2399 self._annotate_local()
2400 self._annotate_parentmapper()
2401 self._setup_pairs()
2402 self._check_foreign_cols(self.primaryjoin, True)
2403 if self.secondaryjoin is not None:
2404 self._check_foreign_cols(self.secondaryjoin, False)
2405 self._determine_direction()
2406 self._check_remote_side()
2407 self._log_joins()
2408
2409 def _log_joins(self) -> None:
2410 log = self.prop.logger
2411 log.info("%s setup primary join %s", self.prop, self.primaryjoin)
2412 log.info("%s setup secondary join %s", self.prop, self.secondaryjoin)
2413 log.info(
2414 "%s synchronize pairs [%s]",
2415 self.prop,
2416 ",".join(
2417 "(%s => %s)" % (l, r) for (l, r) in self.synchronize_pairs
2418 ),
2419 )
2420 log.info(
2421 "%s secondary synchronize pairs [%s]",
2422 self.prop,
2423 ",".join(
2424 "(%s => %s)" % (l, r)
2425 for (l, r) in self.secondary_synchronize_pairs or []
2426 ),
2427 )
2428 log.info(
2429 "%s local/remote pairs [%s]",
2430 self.prop,
2431 ",".join(
2432 "(%s / %s)" % (l, r) for (l, r) in self.local_remote_pairs
2433 ),
2434 )
2435 log.info(
2436 "%s remote columns [%s]",
2437 self.prop,
2438 ",".join("%s" % col for col in self.remote_columns),
2439 )
2440 log.info(
2441 "%s local columns [%s]",
2442 self.prop,
2443 ",".join("%s" % col for col in self.local_columns),
2444 )
2445 log.info("%s relationship direction %s", self.prop, self.direction)
2446
2447 def _sanitize_joins(self) -> None:
2448 """remove the parententity annotation from our join conditions which
2449 can leak in here based on some declarative patterns and maybe others.
2450
2451 "parentmapper" is relied upon both by the ORM evaluator as well as
2452 the use case in _join_fixture_inh_selfref_w_entity
2453 that relies upon it being present, see :ticket:`3364`.
2454
2455 """
2456
2457 self.primaryjoin = _deep_deannotate(
2458 self.primaryjoin, values=("parententity", "proxy_key")
2459 )
2460 if self.secondaryjoin is not None:
2461 self.secondaryjoin = _deep_deannotate(
2462 self.secondaryjoin, values=("parententity", "proxy_key")
2463 )
2464
2465 def _determine_joins(self) -> None:
2466 """Determine the 'primaryjoin' and 'secondaryjoin' attributes,
2467 if not passed to the constructor already.
2468
2469 This is based on analysis of the foreign key relationships
2470 between the parent and target mapped selectables.
2471
2472 """
2473 if self.secondaryjoin is not None and self.secondary is None:
2474 raise sa_exc.ArgumentError(
2475 "Property %s specified with secondary "
2476 "join condition but "
2477 "no secondary argument" % self.prop
2478 )
2479
2480 # find a join between the given mapper's mapped table and
2481 # the given table. will try the mapper's local table first
2482 # for more specificity, then if not found will try the more
2483 # general mapped table, which in the case of inheritance is
2484 # a join.
2485 try:
2486 consider_as_foreign_keys = self.consider_as_foreign_keys or None
2487 if self.secondary is not None:
2488 if self.secondaryjoin is None:
2489 self.secondaryjoin = join_condition(
2490 self.child_persist_selectable,
2491 self.secondary,
2492 a_subset=self.child_local_selectable,
2493 consider_as_foreign_keys=consider_as_foreign_keys,
2494 )
2495 if self.primaryjoin_initial is None:
2496 self.primaryjoin = join_condition(
2497 self.parent_persist_selectable,
2498 self.secondary,
2499 a_subset=self.parent_local_selectable,
2500 consider_as_foreign_keys=consider_as_foreign_keys,
2501 )
2502 else:
2503 self.primaryjoin = self.primaryjoin_initial
2504 else:
2505 if self.primaryjoin_initial is None:
2506 self.primaryjoin = join_condition(
2507 self.parent_persist_selectable,
2508 self.child_persist_selectable,
2509 a_subset=self.parent_local_selectable,
2510 consider_as_foreign_keys=consider_as_foreign_keys,
2511 )
2512 else:
2513 self.primaryjoin = self.primaryjoin_initial
2514 except sa_exc.NoForeignKeysError as nfe:
2515 if self.secondary is not None:
2516 raise sa_exc.NoForeignKeysError(
2517 "Could not determine join "
2518 "condition between parent/child tables on "
2519 "relationship %s - there are no foreign keys "
2520 "linking these tables via secondary table '%s'. "
2521 "Ensure that referencing columns are associated "
2522 "with a ForeignKey or ForeignKeyConstraint, or "
2523 "specify 'primaryjoin' and 'secondaryjoin' "
2524 "expressions." % (self.prop, self.secondary)
2525 ) from nfe
2526 else:
2527 raise sa_exc.NoForeignKeysError(
2528 "Could not determine join "
2529 "condition between parent/child tables on "
2530 "relationship %s - there are no foreign keys "
2531 "linking these tables. "
2532 "Ensure that referencing columns are associated "
2533 "with a ForeignKey or ForeignKeyConstraint, or "
2534 "specify a 'primaryjoin' expression." % self.prop
2535 ) from nfe
2536 except sa_exc.AmbiguousForeignKeysError as afe:
2537 if self.secondary is not None:
2538 raise sa_exc.AmbiguousForeignKeysError(
2539 "Could not determine join "
2540 "condition between parent/child tables on "
2541 "relationship %s - there are multiple foreign key "
2542 "paths linking the tables via secondary table '%s'. "
2543 "Specify the 'foreign_keys' "
2544 "argument, providing a list of those columns which "
2545 "should be counted as containing a foreign key "
2546 "reference from the secondary table to each of the "
2547 "parent and child tables." % (self.prop, self.secondary)
2548 ) from afe
2549 else:
2550 raise sa_exc.AmbiguousForeignKeysError(
2551 "Could not determine join "
2552 "condition between parent/child tables on "
2553 "relationship %s - there are multiple foreign key "
2554 "paths linking the tables. Specify the "
2555 "'foreign_keys' argument, providing a list of those "
2556 "columns which should be counted as containing a "
2557 "foreign key reference to the parent table." % self.prop
2558 ) from afe
2559
2560 @property
2561 def primaryjoin_minus_local(self) -> ColumnElement[bool]:
2562 return _deep_deannotate(self.primaryjoin, values=("local", "remote"))
2563
2564 @property
2565 def secondaryjoin_minus_local(self) -> ColumnElement[bool]:
2566 assert self.secondaryjoin is not None
2567 return _deep_deannotate(self.secondaryjoin, values=("local", "remote"))
2568
2569 @util.memoized_property
2570 def primaryjoin_reverse_remote(self) -> ColumnElement[bool]:
2571 """Return the primaryjoin condition suitable for the
2572 "reverse" direction.
2573
2574 If the primaryjoin was delivered here with pre-existing
2575 "remote" annotations, the local/remote annotations
2576 are reversed. Otherwise, the local/remote annotations
2577 are removed.
2578
2579 """
2580 if self._has_remote_annotations:
2581
2582 def replace(element: _CE, **kw: Any) -> Optional[_CE]:
2583 if "remote" in element._annotations:
2584 v = dict(element._annotations)
2585 del v["remote"]
2586 v["local"] = True
2587 return element._with_annotations(v)
2588 elif "local" in element._annotations:
2589 v = dict(element._annotations)
2590 del v["local"]
2591 v["remote"] = True
2592 return element._with_annotations(v)
2593
2594 return None
2595
2596 return visitors.replacement_traverse(self.primaryjoin, {}, replace)
2597 else:
2598 if self._has_foreign_annotations:
2599 # TODO: coverage
2600 return _deep_deannotate(
2601 self.primaryjoin, values=("local", "remote")
2602 )
2603 else:
2604 return _deep_deannotate(self.primaryjoin)
2605
2606 def _has_annotation(self, clause: ClauseElement, annotation: str) -> bool:
2607 for col in visitors.iterate(clause, {}):
2608 if annotation in col._annotations:
2609 return True
2610 else:
2611 return False
2612
2613 @util.memoized_property
2614 def _has_foreign_annotations(self) -> bool:
2615 return self._has_annotation(self.primaryjoin, "foreign")
2616
2617 @util.memoized_property
2618 def _has_remote_annotations(self) -> bool:
2619 return self._has_annotation(self.primaryjoin, "remote")
2620
2621 def _annotate_fks(self) -> None:
2622 """Annotate the primaryjoin and secondaryjoin
2623 structures with 'foreign' annotations marking columns
2624 considered as foreign.
2625
2626 """
2627 if self._has_foreign_annotations:
2628 return
2629
2630 if self.consider_as_foreign_keys:
2631 self._annotate_from_fk_list()
2632 else:
2633 self._annotate_present_fks()
2634
2635 def _annotate_from_fk_list(self) -> None:
2636 def check_fk(element: _CE, **kw: Any) -> Optional[_CE]:
2637 if element in self.consider_as_foreign_keys:
2638 return element._annotate({"foreign": True})
2639 return None
2640
2641 self.primaryjoin = visitors.replacement_traverse(
2642 self.primaryjoin, {}, check_fk
2643 )
2644 if self.secondaryjoin is not None:
2645 self.secondaryjoin = visitors.replacement_traverse(
2646 self.secondaryjoin, {}, check_fk
2647 )
2648
2649 def _annotate_present_fks(self) -> None:
2650 if self.secondary is not None:
2651 secondarycols = util.column_set(self.secondary.c)
2652 else:
2653 secondarycols = set()
2654
2655 def is_foreign(
2656 a: ColumnElement[Any], b: ColumnElement[Any]
2657 ) -> Optional[ColumnElement[Any]]:
2658 if isinstance(a, schema.Column) and isinstance(b, schema.Column):
2659 if a.references(b):
2660 return a
2661 elif b.references(a):
2662 return b
2663
2664 if secondarycols:
2665 if a in secondarycols and b not in secondarycols:
2666 return a
2667 elif b in secondarycols and a not in secondarycols:
2668 return b
2669
2670 return None
2671
2672 def visit_binary(binary: BinaryExpression[Any]) -> None:
2673 if not isinstance(
2674 binary.left, sql.ColumnElement
2675 ) or not isinstance(binary.right, sql.ColumnElement):
2676 return
2677
2678 if (
2679 "foreign" not in binary.left._annotations
2680 and "foreign" not in binary.right._annotations
2681 ):
2682 col = is_foreign(binary.left, binary.right)
2683 if col is not None:
2684 if col.compare(binary.left):
2685 binary.left = binary.left._annotate({"foreign": True})
2686 elif col.compare(binary.right):
2687 binary.right = binary.right._annotate(
2688 {"foreign": True}
2689 )
2690
2691 self.primaryjoin = visitors.cloned_traverse(
2692 self.primaryjoin, {}, {"binary": visit_binary}
2693 )
2694 if self.secondaryjoin is not None:
2695 self.secondaryjoin = visitors.cloned_traverse(
2696 self.secondaryjoin, {}, {"binary": visit_binary}
2697 )
2698
2699 def _refers_to_parent_table(self) -> bool:
2700 """Return True if the join condition contains column
2701 comparisons where both columns are in both tables.
2702
2703 """
2704 pt = self.parent_persist_selectable
2705 mt = self.child_persist_selectable
2706 result = False
2707
2708 def visit_binary(binary: BinaryExpression[Any]) -> None:
2709 nonlocal result
2710 c, f = binary.left, binary.right
2711 if (
2712 isinstance(c, expression.ColumnClause)
2713 and isinstance(f, expression.ColumnClause)
2714 and pt.is_derived_from(c.table)
2715 and pt.is_derived_from(f.table)
2716 and mt.is_derived_from(c.table)
2717 and mt.is_derived_from(f.table)
2718 ):
2719 result = True
2720
2721 visitors.traverse(self.primaryjoin, {}, {"binary": visit_binary})
2722 return result
2723
2724 def _tables_overlap(self) -> bool:
2725 """Return True if parent/child tables have some overlap."""
2726
2727 return selectables_overlap(
2728 self.parent_persist_selectable, self.child_persist_selectable
2729 )
2730
2731 def _annotate_remote(self) -> None:
2732 """Annotate the primaryjoin and secondaryjoin
2733 structures with 'remote' annotations marking columns
2734 considered as part of the 'remote' side.
2735
2736 """
2737 if self._has_remote_annotations:
2738 return
2739
2740 if self.secondary is not None:
2741 self._annotate_remote_secondary()
2742 elif self._local_remote_pairs or self._remote_side:
2743 self._annotate_remote_from_args()
2744 elif self._refers_to_parent_table():
2745 self._annotate_selfref(
2746 lambda col: "foreign" in col._annotations, False
2747 )
2748 elif self._tables_overlap():
2749 self._annotate_remote_with_overlap()
2750 else:
2751 self._annotate_remote_distinct_selectables()
2752
2753 def _annotate_remote_secondary(self) -> None:
2754 """annotate 'remote' in primaryjoin, secondaryjoin
2755 when 'secondary' is present.
2756
2757 """
2758
2759 assert self.secondary is not None
2760 fixed_secondary = self.secondary
2761
2762 def repl(element: _CE, **kw: Any) -> Optional[_CE]:
2763 if fixed_secondary.c.contains_column(element):
2764 return element._annotate({"remote": True})
2765 return None
2766
2767 self.primaryjoin = visitors.replacement_traverse(
2768 self.primaryjoin, {}, repl
2769 )
2770
2771 assert self.secondaryjoin is not None
2772 self.secondaryjoin = visitors.replacement_traverse(
2773 self.secondaryjoin, {}, repl
2774 )
2775
2776 def _annotate_selfref(
2777 self, fn: Callable[[ColumnElement[Any]], bool], remote_side_given: bool
2778 ) -> None:
2779 """annotate 'remote' in primaryjoin, secondaryjoin
2780 when the relationship is detected as self-referential.
2781
2782 """
2783
2784 def visit_binary(binary: BinaryExpression[Any]) -> None:
2785 equated = binary.left.compare(binary.right)
2786 if isinstance(binary.left, expression.ColumnClause) and isinstance(
2787 binary.right, expression.ColumnClause
2788 ):
2789 # assume one to many - FKs are "remote"
2790 if fn(binary.left):
2791 binary.left = binary.left._annotate({"remote": True})
2792 if fn(binary.right) and not equated:
2793 binary.right = binary.right._annotate({"remote": True})
2794 elif not remote_side_given:
2795 self._warn_non_column_elements()
2796
2797 self.primaryjoin = visitors.cloned_traverse(
2798 self.primaryjoin, {}, {"binary": visit_binary}
2799 )
2800
2801 def _annotate_remote_from_args(self) -> None:
2802 """annotate 'remote' in primaryjoin, secondaryjoin
2803 when the 'remote_side' or '_local_remote_pairs'
2804 arguments are used.
2805
2806 """
2807 if self._local_remote_pairs:
2808 if self._remote_side:
2809 raise sa_exc.ArgumentError(
2810 "remote_side argument is redundant "
2811 "against more detailed _local_remote_side "
2812 "argument."
2813 )
2814
2815 remote_side = [r for (l, r) in self._local_remote_pairs]
2816 else:
2817 remote_side = self._remote_side
2818
2819 if self._refers_to_parent_table():
2820 self._annotate_selfref(lambda col: col in remote_side, True)
2821 else:
2822
2823 def repl(element: _CE, **kw: Any) -> Optional[_CE]:
2824 # use set() to avoid generating ``__eq__()`` expressions
2825 # against each element
2826 if element in set(remote_side):
2827 return element._annotate({"remote": True})
2828 return None
2829
2830 self.primaryjoin = visitors.replacement_traverse(
2831 self.primaryjoin, {}, repl
2832 )
2833
2834 def _annotate_remote_with_overlap(self) -> None:
2835 """annotate 'remote' in primaryjoin, secondaryjoin
2836 when the parent/child tables have some set of
2837 tables in common, though is not a fully self-referential
2838 relationship.
2839
2840 """
2841
2842 def visit_binary(binary: BinaryExpression[Any]) -> None:
2843 binary.left, binary.right = proc_left_right(
2844 binary.left, binary.right
2845 )
2846 binary.right, binary.left = proc_left_right(
2847 binary.right, binary.left
2848 )
2849
2850 check_entities = (
2851 self.prop is not None and self.prop.mapper is not self.prop.parent
2852 )
2853
2854 def proc_left_right(
2855 left: ColumnElement[Any], right: ColumnElement[Any]
2856 ) -> Tuple[ColumnElement[Any], ColumnElement[Any]]:
2857 if isinstance(left, expression.ColumnClause) and isinstance(
2858 right, expression.ColumnClause
2859 ):
2860 if self.child_persist_selectable.c.contains_column(
2861 right
2862 ) and self.parent_persist_selectable.c.contains_column(left):
2863 right = right._annotate({"remote": True})
2864 elif (
2865 check_entities
2866 and right._annotations.get("parentmapper") is self.prop.mapper
2867 ):
2868 right = right._annotate({"remote": True})
2869 elif (
2870 check_entities
2871 and left._annotations.get("parentmapper") is self.prop.mapper
2872 ):
2873 left = left._annotate({"remote": True})
2874 else:
2875 self._warn_non_column_elements()
2876
2877 return left, right
2878
2879 self.primaryjoin = visitors.cloned_traverse(
2880 self.primaryjoin, {}, {"binary": visit_binary}
2881 )
2882
2883 def _annotate_remote_distinct_selectables(self) -> None:
2884 """annotate 'remote' in primaryjoin, secondaryjoin
2885 when the parent/child tables are entirely
2886 separate.
2887
2888 """
2889
2890 def repl(element: _CE, **kw: Any) -> Optional[_CE]:
2891 if self.child_persist_selectable.c.contains_column(element) and (
2892 not self.parent_local_selectable.c.contains_column(element)
2893 or self.child_local_selectable.c.contains_column(element)
2894 ):
2895 return element._annotate({"remote": True})
2896 return None
2897
2898 self.primaryjoin = visitors.replacement_traverse(
2899 self.primaryjoin, {}, repl
2900 )
2901
2902 def _warn_non_column_elements(self) -> None:
2903 util.warn(
2904 "Non-simple column elements in primary "
2905 "join condition for property %s - consider using "
2906 "remote() annotations to mark the remote side." % self.prop
2907 )
2908
2909 def _annotate_local(self) -> None:
2910 """Annotate the primaryjoin and secondaryjoin
2911 structures with 'local' annotations.
2912
2913 This annotates all column elements found
2914 simultaneously in the parent table
2915 and the join condition that don't have a
2916 'remote' annotation set up from
2917 _annotate_remote() or user-defined.
2918
2919 """
2920 if self._has_annotation(self.primaryjoin, "local"):
2921 return
2922
2923 if self._local_remote_pairs:
2924 local_side = util.column_set(
2925 [l for (l, r) in self._local_remote_pairs]
2926 )
2927 else:
2928 local_side = util.column_set(self.parent_persist_selectable.c)
2929
2930 def locals_(element: _CE, **kw: Any) -> Optional[_CE]:
2931 if "remote" not in element._annotations and element in local_side:
2932 return element._annotate({"local": True})
2933 return None
2934
2935 self.primaryjoin = visitors.replacement_traverse(
2936 self.primaryjoin, {}, locals_
2937 )
2938
2939 def _annotate_parentmapper(self) -> None:
2940 def parentmappers_(element: _CE, **kw: Any) -> Optional[_CE]:
2941 if "remote" in element._annotations:
2942 return element._annotate({"parentmapper": self.prop.mapper})
2943 elif "local" in element._annotations:
2944 return element._annotate({"parentmapper": self.prop.parent})
2945 return None
2946
2947 self.primaryjoin = visitors.replacement_traverse(
2948 self.primaryjoin, {}, parentmappers_
2949 )
2950
2951 def _check_remote_side(self) -> None:
2952 if not self.local_remote_pairs:
2953 raise sa_exc.ArgumentError(
2954 "Relationship %s could "
2955 "not determine any unambiguous local/remote column "
2956 "pairs based on join condition and remote_side "
2957 "arguments. "
2958 "Consider using the remote() annotation to "
2959 "accurately mark those elements of the join "
2960 "condition that are on the remote side of "
2961 "the relationship." % (self.prop,)
2962 )
2963 else:
2964 not_target = util.column_set(
2965 self.parent_persist_selectable.c
2966 ).difference(self.child_persist_selectable.c)
2967
2968 for _, rmt in self.local_remote_pairs:
2969 if rmt in not_target:
2970 util.warn(
2971 "Expression %s is marked as 'remote', but these "
2972 "column(s) are local to the local side. The "
2973 "remote() annotation is needed only for a "
2974 "self-referential relationship where both sides "
2975 "of the relationship refer to the same tables."
2976 % (rmt,)
2977 )
2978
2979 def _check_foreign_cols(
2980 self, join_condition: ColumnElement[bool], primary: bool
2981 ) -> None:
2982 """Check the foreign key columns collected and emit error
2983 messages."""
2984 foreign_cols = self._gather_columns_with_annotation(
2985 join_condition, "foreign"
2986 )
2987
2988 has_foreign = bool(foreign_cols)
2989
2990 if primary:
2991 can_sync = bool(self.synchronize_pairs)
2992 else:
2993 can_sync = bool(self.secondary_synchronize_pairs)
2994
2995 if (
2996 self.support_sync
2997 and can_sync
2998 or (not self.support_sync and has_foreign)
2999 ):
3000 return
3001
3002 # from here below is just determining the best error message
3003 # to report. Check for a join condition using any operator
3004 # (not just ==), perhaps they need to turn on "viewonly=True".
3005 if self.support_sync and has_foreign and not can_sync:
3006 err = (
3007 "Could not locate any simple equality expressions "
3008 "involving locally mapped foreign key columns for "
3009 "%s join condition "
3010 "'%s' on relationship %s."
3011 % (
3012 primary and "primary" or "secondary",
3013 join_condition,
3014 self.prop,
3015 )
3016 )
3017 err += (
3018 " Ensure that referencing columns are associated "
3019 "with a ForeignKey or ForeignKeyConstraint, or are "
3020 "annotated in the join condition with the foreign() "
3021 "annotation. To allow comparison operators other than "
3022 "'==', the relationship can be marked as viewonly=True."
3023 )
3024
3025 raise sa_exc.ArgumentError(err)
3026 else:
3027 err = (
3028 "Could not locate any relevant foreign key columns "
3029 "for %s join condition '%s' on relationship %s."
3030 % (
3031 primary and "primary" or "secondary",
3032 join_condition,
3033 self.prop,
3034 )
3035 )
3036 err += (
3037 " Ensure that referencing columns are associated "
3038 "with a ForeignKey or ForeignKeyConstraint, or are "
3039 "annotated in the join condition with the foreign() "
3040 "annotation."
3041 )
3042 raise sa_exc.ArgumentError(err)
3043
3044 def _determine_direction(self) -> None:
3045 """Determine if this relationship is one to many, many to one,
3046 many to many.
3047
3048 """
3049 if self.secondaryjoin is not None:
3050 self.direction = MANYTOMANY
3051 else:
3052 parentcols = util.column_set(self.parent_persist_selectable.c)
3053 targetcols = util.column_set(self.child_persist_selectable.c)
3054
3055 # fk collection which suggests ONETOMANY.
3056 onetomany_fk = targetcols.intersection(self.foreign_key_columns)
3057
3058 # fk collection which suggests MANYTOONE.
3059
3060 manytoone_fk = parentcols.intersection(self.foreign_key_columns)
3061
3062 if onetomany_fk and manytoone_fk:
3063 # fks on both sides. test for overlap of local/remote
3064 # with foreign key.
3065 # we will gather columns directly from their annotations
3066 # without deannotating, so that we can distinguish on a column
3067 # that refers to itself.
3068
3069 # 1. columns that are both remote and FK suggest
3070 # onetomany.
3071 onetomany_local = self._gather_columns_with_annotation(
3072 self.primaryjoin, "remote", "foreign"
3073 )
3074
3075 # 2. columns that are FK but are not remote (e.g. local)
3076 # suggest manytoone.
3077 manytoone_local = {
3078 c
3079 for c in self._gather_columns_with_annotation(
3080 self.primaryjoin, "foreign"
3081 )
3082 if "remote" not in c._annotations
3083 }
3084
3085 # 3. if both collections are present, remove columns that
3086 # refer to themselves. This is for the case of
3087 # and_(Me.id == Me.remote_id, Me.version == Me.version)
3088 if onetomany_local and manytoone_local:
3089 self_equated = self.remote_columns.intersection(
3090 self.local_columns
3091 )
3092 onetomany_local = onetomany_local.difference(self_equated)
3093 manytoone_local = manytoone_local.difference(self_equated)
3094
3095 # at this point, if only one or the other collection is
3096 # present, we know the direction, otherwise it's still
3097 # ambiguous.
3098
3099 if onetomany_local and not manytoone_local:
3100 self.direction = ONETOMANY
3101 elif manytoone_local and not onetomany_local:
3102 self.direction = MANYTOONE
3103 else:
3104 raise sa_exc.ArgumentError(
3105 "Can't determine relationship"
3106 " direction for relationship '%s' - foreign "
3107 "key columns within the join condition are present "
3108 "in both the parent and the child's mapped tables. "
3109 "Ensure that only those columns referring "
3110 "to a parent column are marked as foreign, "
3111 "either via the foreign() annotation or "
3112 "via the foreign_keys argument." % self.prop
3113 )
3114 elif onetomany_fk:
3115 self.direction = ONETOMANY
3116 elif manytoone_fk:
3117 self.direction = MANYTOONE
3118 else:
3119 raise sa_exc.ArgumentError(
3120 "Can't determine relationship "
3121 "direction for relationship '%s' - foreign "
3122 "key columns are present in neither the parent "
3123 "nor the child's mapped tables" % self.prop
3124 )
3125
3126 def _deannotate_pairs(
3127 self, collection: _ColumnPairIterable
3128 ) -> _MutableColumnPairs:
3129 """provide deannotation for the various lists of
3130 pairs, so that using them in hashes doesn't incur
3131 high-overhead __eq__() comparisons against
3132 original columns mapped.
3133
3134 """
3135 return [(x._deannotate(), y._deannotate()) for x, y in collection]
3136
3137 def _setup_pairs(self) -> None:
3138 sync_pairs: _MutableColumnPairs = []
3139 lrp: util.OrderedSet[Tuple[ColumnElement[Any], ColumnElement[Any]]] = (
3140 util.OrderedSet([])
3141 )
3142 secondary_sync_pairs: _MutableColumnPairs = []
3143
3144 def go(
3145 joincond: ColumnElement[bool],
3146 collection: _MutableColumnPairs,
3147 ) -> None:
3148 def visit_binary(
3149 binary: BinaryExpression[Any],
3150 left: ColumnElement[Any],
3151 right: ColumnElement[Any],
3152 ) -> None:
3153 if (
3154 "remote" in right._annotations
3155 and "remote" not in left._annotations
3156 and self.can_be_synced_fn(left)
3157 ):
3158 lrp.add((left, right))
3159 elif (
3160 "remote" in left._annotations
3161 and "remote" not in right._annotations
3162 and self.can_be_synced_fn(right)
3163 ):
3164 lrp.add((right, left))
3165 if binary.operator is operators.eq and self.can_be_synced_fn(
3166 left, right
3167 ):
3168 if "foreign" in right._annotations:
3169 collection.append((left, right))
3170 elif "foreign" in left._annotations:
3171 collection.append((right, left))
3172
3173 visit_binary_product(visit_binary, joincond)
3174
3175 for joincond, collection in [
3176 (self.primaryjoin, sync_pairs),
3177 (self.secondaryjoin, secondary_sync_pairs),
3178 ]:
3179 if joincond is None:
3180 continue
3181 go(joincond, collection)
3182
3183 self.local_remote_pairs = self._deannotate_pairs(lrp)
3184 self.synchronize_pairs = self._deannotate_pairs(sync_pairs)
3185 self.secondary_synchronize_pairs = self._deannotate_pairs(
3186 secondary_sync_pairs
3187 )
3188
3189 _track_overlapping_sync_targets: weakref.WeakKeyDictionary[
3190 ColumnElement[Any],
3191 weakref.WeakKeyDictionary[
3192 RelationshipProperty[Any], ColumnElement[Any]
3193 ],
3194 ] = weakref.WeakKeyDictionary()
3195
3196 def _warn_for_conflicting_sync_targets(self) -> None:
3197 if not self.support_sync:
3198 return
3199
3200 # we would like to detect if we are synchronizing any column
3201 # pairs in conflict with another relationship that wishes to sync
3202 # an entirely different column to the same target. This is a
3203 # very rare edge case so we will try to minimize the memory/overhead
3204 # impact of this check
3205 for from_, to_ in [
3206 (from_, to_) for (from_, to_) in self.synchronize_pairs
3207 ] + [
3208 (from_, to_) for (from_, to_) in self.secondary_synchronize_pairs
3209 ]:
3210 # save ourselves a ton of memory and overhead by only
3211 # considering columns that are subject to a overlapping
3212 # FK constraints at the core level. This condition can arise
3213 # if multiple relationships overlap foreign() directly, but
3214 # we're going to assume it's typically a ForeignKeyConstraint-
3215 # level configuration that benefits from this warning.
3216
3217 if to_ not in self._track_overlapping_sync_targets:
3218 self._track_overlapping_sync_targets[to_] = (
3219 weakref.WeakKeyDictionary({self.prop: from_})
3220 )
3221 else:
3222 other_props = []
3223 prop_to_from = self._track_overlapping_sync_targets[to_]
3224
3225 for pr, fr_ in prop_to_from.items():
3226 if (
3227 not pr.mapper._dispose_called
3228 and pr not in self.prop._reverse_property
3229 and pr.key not in self.prop._overlaps
3230 and self.prop.key not in pr._overlaps
3231 # note: the "__*" symbol is used internally by
3232 # SQLAlchemy as a general means of suppressing the
3233 # overlaps warning for some extension cases, however
3234 # this is not currently
3235 # a publicly supported symbol and may change at
3236 # any time.
3237 and "__*" not in self.prop._overlaps
3238 and "__*" not in pr._overlaps
3239 and not self.prop.parent.is_sibling(pr.parent)
3240 and not self.prop.mapper.is_sibling(pr.mapper)
3241 and not self.prop.parent.is_sibling(pr.mapper)
3242 and not self.prop.mapper.is_sibling(pr.parent)
3243 and (
3244 self.prop.key != pr.key
3245 or not self.prop.parent.common_parent(pr.parent)
3246 )
3247 ):
3248 other_props.append((pr, fr_))
3249
3250 if other_props:
3251 util.warn(
3252 "relationship '%s' will copy column %s to column %s, "
3253 "which conflicts with relationship(s): %s. "
3254 "If this is not the intention, consider if these "
3255 "relationships should be linked with "
3256 "back_populates, or if viewonly=True should be "
3257 "applied to one or more if they are read-only. "
3258 "For the less common case that foreign key "
3259 "constraints are partially overlapping, the "
3260 "orm.foreign() "
3261 "annotation can be used to isolate the columns that "
3262 "should be written towards. To silence this "
3263 "warning, add the parameter 'overlaps=\"%s\"' to the "
3264 "'%s' relationship."
3265 % (
3266 self.prop,
3267 from_,
3268 to_,
3269 ", ".join(
3270 sorted(
3271 "'%s' (copies %s to %s)" % (pr, fr_, to_)
3272 for (pr, fr_) in other_props
3273 )
3274 ),
3275 ",".join(sorted(pr.key for pr, fr in other_props)),
3276 self.prop,
3277 ),
3278 code="qzyx",
3279 )
3280 self._track_overlapping_sync_targets[to_][self.prop] = from_
3281
3282 @util.memoized_property
3283 def remote_columns(self) -> Set[ColumnElement[Any]]:
3284 return self._gather_join_annotations("remote")
3285
3286 @util.memoized_property
3287 def local_columns(self) -> Set[ColumnElement[Any]]:
3288 return self._gather_join_annotations("local")
3289
3290 @util.memoized_property
3291 def foreign_key_columns(self) -> Set[ColumnElement[Any]]:
3292 return self._gather_join_annotations("foreign")
3293
3294 def _gather_join_annotations(
3295 self, annotation: str
3296 ) -> Set[ColumnElement[Any]]:
3297 s = set(
3298 self._gather_columns_with_annotation(self.primaryjoin, annotation)
3299 )
3300 if self.secondaryjoin is not None:
3301 s.update(
3302 self._gather_columns_with_annotation(
3303 self.secondaryjoin, annotation
3304 )
3305 )
3306 return {x._deannotate() for x in s}
3307
3308 def _gather_columns_with_annotation(
3309 self, clause: ColumnElement[Any], *annotation: Iterable[str]
3310 ) -> Set[ColumnElement[Any]]:
3311 annotation_set = set(annotation)
3312 return {
3313 cast(ColumnElement[Any], col)
3314 for col in visitors.iterate(clause, {})
3315 if annotation_set.issubset(col._annotations)
3316 }
3317
3318 @util.memoized_property
3319 def _secondary_lineage_set(self) -> FrozenSet[ColumnElement[Any]]:
3320 if self.secondary is not None:
3321 return frozenset(
3322 itertools.chain(*[c.proxy_set for c in self.secondary.c])
3323 )
3324 else:
3325 return util.EMPTY_SET
3326
3327 def join_targets(
3328 self,
3329 source_selectable: Optional[FromClause],
3330 dest_selectable: FromClause,
3331 aliased: bool,
3332 single_crit: Optional[ColumnElement[bool]] = None,
3333 extra_criteria: Tuple[ColumnElement[bool], ...] = (),
3334 ) -> Tuple[
3335 ColumnElement[bool],
3336 Optional[ColumnElement[bool]],
3337 Optional[FromClause],
3338 Optional[ClauseAdapter],
3339 FromClause,
3340 ]:
3341 """Given a source and destination selectable, create a
3342 join between them.
3343
3344 This takes into account aliasing the join clause
3345 to reference the appropriate corresponding columns
3346 in the target objects, as well as the extra child
3347 criterion, equivalent column sets, etc.
3348
3349 """
3350 # place a barrier on the destination such that
3351 # replacement traversals won't ever dig into it.
3352 # its internal structure remains fixed
3353 # regardless of context.
3354 dest_selectable = _shallow_annotate(
3355 dest_selectable, {"no_replacement_traverse": True}
3356 )
3357
3358 primaryjoin, secondaryjoin, secondary = (
3359 self.primaryjoin,
3360 self.secondaryjoin,
3361 self.secondary,
3362 )
3363
3364 # adjust the join condition for single table inheritance,
3365 # in the case that the join is to a subclass
3366 # this is analogous to the
3367 # "_adjust_for_single_table_inheritance()" method in Query.
3368
3369 if single_crit is not None:
3370 if secondaryjoin is not None:
3371 secondaryjoin = secondaryjoin & single_crit
3372 else:
3373 primaryjoin = primaryjoin & single_crit
3374
3375 if extra_criteria:
3376
3377 def mark_exclude_cols(
3378 elem: SupportsAnnotations, annotations: _AnnotationDict
3379 ) -> SupportsAnnotations:
3380 """note unrelated columns in the "extra criteria" as either
3381 should be adapted or not adapted, even though they are not
3382 part of our "local" or "remote" side.
3383
3384 see #9779 for this case, as well as #11010 for a follow up
3385
3386 """
3387
3388 parentmapper_for_element = elem._annotations.get(
3389 "parentmapper", None
3390 )
3391
3392 if (
3393 parentmapper_for_element is not self.prop.parent
3394 and parentmapper_for_element is not self.prop.mapper
3395 and elem not in self._secondary_lineage_set
3396 ):
3397 return _safe_annotate(elem, annotations)
3398 else:
3399 return elem
3400
3401 extra_criteria = tuple(
3402 _deep_annotate(
3403 elem,
3404 {"should_not_adapt": True},
3405 annotate_callable=mark_exclude_cols,
3406 )
3407 for elem in extra_criteria
3408 )
3409
3410 if secondaryjoin is not None:
3411 secondaryjoin = secondaryjoin & sql.and_(*extra_criteria)
3412 else:
3413 primaryjoin = primaryjoin & sql.and_(*extra_criteria)
3414
3415 if aliased:
3416 if secondary is not None:
3417 secondary = secondary._anonymous_fromclause(flat=True)
3418 primary_aliasizer = ClauseAdapter(
3419 secondary,
3420 exclude_fn=_local_col_exclude,
3421 )
3422 secondary_aliasizer = ClauseAdapter(
3423 dest_selectable, equivalents=self.child_equivalents
3424 ).chain(primary_aliasizer)
3425 if source_selectable is not None:
3426 primary_aliasizer = ClauseAdapter(
3427 secondary,
3428 exclude_fn=_local_col_exclude,
3429 ).chain(
3430 ClauseAdapter(
3431 source_selectable,
3432 equivalents=self.parent_equivalents,
3433 )
3434 )
3435
3436 secondaryjoin = secondary_aliasizer.traverse(secondaryjoin)
3437 else:
3438 primary_aliasizer = ClauseAdapter(
3439 dest_selectable,
3440 exclude_fn=_local_col_exclude,
3441 equivalents=self.child_equivalents,
3442 )
3443 if source_selectable is not None:
3444 primary_aliasizer.chain(
3445 ClauseAdapter(
3446 source_selectable,
3447 exclude_fn=_remote_col_exclude,
3448 equivalents=self.parent_equivalents,
3449 )
3450 )
3451 secondary_aliasizer = None
3452
3453 primaryjoin = primary_aliasizer.traverse(primaryjoin)
3454 target_adapter = secondary_aliasizer or primary_aliasizer
3455 target_adapter.exclude_fn = None
3456 else:
3457 target_adapter = None
3458 return (
3459 primaryjoin,
3460 secondaryjoin,
3461 secondary,
3462 target_adapter,
3463 dest_selectable,
3464 )
3465
3466 def create_lazy_clause(self, reverse_direction: bool = False) -> Tuple[
3467 ColumnElement[bool],
3468 Dict[str, ColumnElement[Any]],
3469 Dict[ColumnElement[Any], ColumnElement[Any]],
3470 ]:
3471 binds: Dict[ColumnElement[Any], BindParameter[Any]] = {}
3472 equated_columns: Dict[ColumnElement[Any], ColumnElement[Any]] = {}
3473
3474 has_secondary = self.secondaryjoin is not None
3475
3476 if has_secondary:
3477 lookup = collections.defaultdict(list)
3478 for l, r in self.local_remote_pairs:
3479 lookup[l].append((l, r))
3480 equated_columns[r] = l
3481 elif not reverse_direction:
3482 for l, r in self.local_remote_pairs:
3483 equated_columns[r] = l
3484 else:
3485 for l, r in self.local_remote_pairs:
3486 equated_columns[l] = r
3487
3488 def col_to_bind(
3489 element: ColumnElement[Any], **kw: Any
3490 ) -> Optional[BindParameter[Any]]:
3491 if (
3492 (not reverse_direction and "local" in element._annotations)
3493 or reverse_direction
3494 and (
3495 (has_secondary and element in lookup)
3496 or (not has_secondary and "remote" in element._annotations)
3497 )
3498 ):
3499 if element not in binds:
3500 binds[element] = sql.bindparam(
3501 None, None, type_=element.type, unique=True
3502 )
3503 return binds[element]
3504 return None
3505
3506 lazywhere = self.primaryjoin
3507 if self.secondaryjoin is None or not reverse_direction:
3508 lazywhere = visitors.replacement_traverse(
3509 lazywhere, {}, col_to_bind
3510 )
3511
3512 if self.secondaryjoin is not None:
3513 secondaryjoin = self.secondaryjoin
3514 if reverse_direction:
3515 secondaryjoin = visitors.replacement_traverse(
3516 secondaryjoin, {}, col_to_bind
3517 )
3518 lazywhere = sql.and_(lazywhere, secondaryjoin)
3519
3520 bind_to_col = {binds[col].key: col for col in binds}
3521
3522 return lazywhere, bind_to_col, equated_columns
3523
3524
3525class _ColInAnnotations:
3526 """Serializable object that tests for names in c._annotations.
3527
3528 TODO: does this need to be serializable anymore? can we find what the
3529 use case was for that?
3530
3531 """
3532
3533 __slots__ = ("names",)
3534
3535 def __init__(self, *names: str):
3536 self.names = frozenset(names)
3537
3538 def __call__(self, c: ClauseElement) -> bool:
3539 return bool(self.names.intersection(c._annotations))
3540
3541
3542_local_col_exclude = _ColInAnnotations("local", "should_not_adapt")
3543_remote_col_exclude = _ColInAnnotations("remote", "should_not_adapt")
3544
3545
3546class Relationship(
3547 RelationshipProperty[_T],
3548 _DeclarativeMapped[_T],
3549):
3550 """Describes an object property that holds a single item or list
3551 of items that correspond to a related database table.
3552
3553 Public constructor is the :func:`_orm.relationship` function.
3554
3555 .. seealso::
3556
3557 :ref:`relationship_config_toplevel`
3558
3559 .. versionchanged:: 2.0 Added :class:`_orm.Relationship` as a Declarative
3560 compatible subclass for :class:`_orm.RelationshipProperty`.
3561
3562 """
3563
3564 inherit_cache = True
3565 """:meta private:"""
3566
3567
3568class _RelationshipDeclared( # type: ignore[misc]
3569 Relationship[_T],
3570 WriteOnlyMapped[_T], # not compatible with Mapped[_T]
3571 DynamicMapped[_T], # not compatible with Mapped[_T]
3572):
3573 """Relationship subclass used implicitly for declarative mapping."""
3574
3575 inherit_cache = True
3576 """:meta private:"""
3577
3578 @classmethod
3579 def _mapper_property_name(cls) -> str:
3580 return "Relationship"