1# orm/relationships.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Heuristics related to join conditions as used in
9:func:`_orm.relationship`.
10
11Provides the :class:`.JoinCondition` object, which encapsulates
12SQL annotation and aliasing behavior focused on the `primaryjoin`
13and `secondaryjoin` aspects of :func:`_orm.relationship`.
14
15"""
16from __future__ import annotations
17
18import collections
19from collections import abc
20import dataclasses
21import inspect as _py_inspect
22import itertools
23import re
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Collection
29from typing import Dict
30from typing import FrozenSet
31from typing import Generic
32from typing import Iterable
33from typing import Iterator
34from typing import List
35from typing import NamedTuple
36from typing import NoReturn
37from typing import Optional
38from typing import Sequence
39from typing import Set
40from typing import Tuple
41from typing import Type
42from typing import TypeVar
43from typing import Union
44import weakref
45
46from . import attributes
47from . import strategy_options
48from ._typing import insp_is_aliased_class
49from ._typing import is_has_collection_adapter
50from .base import _DeclarativeMapped
51from .base import _is_mapped_class
52from .base import class_mapper
53from .base import DynamicMapped
54from .base import LoaderCallableStatus
55from .base import PassiveFlag
56from .base import state_str
57from .base import WriteOnlyMapped
58from .interfaces import _AttributeOptions
59from .interfaces import _IntrospectsAnnotations
60from .interfaces import MANYTOMANY
61from .interfaces import MANYTOONE
62from .interfaces import ONETOMANY
63from .interfaces import PropComparator
64from .interfaces import RelationshipDirection
65from .interfaces import StrategizedProperty
66from .util import _orm_annotate
67from .util import _orm_deannotate
68from .util import CascadeOptions
69from .. import exc as sa_exc
70from .. import Exists
71from .. import log
72from .. import schema
73from .. import sql
74from .. import util
75from ..inspection import inspect
76from ..sql import coercions
77from ..sql import expression
78from ..sql import operators
79from ..sql import roles
80from ..sql import visitors
81from ..sql._typing import _ColumnExpressionArgument
82from ..sql._typing import _HasClauseElement
83from ..sql.annotation import _safe_annotate
84from ..sql.elements import ColumnClause
85from ..sql.elements import ColumnElement
86from ..sql.util import _deep_annotate
87from ..sql.util import _deep_deannotate
88from ..sql.util import _shallow_annotate
89from ..sql.util import adapt_criterion_to_null
90from ..sql.util import ClauseAdapter
91from ..sql.util import join_condition
92from ..sql.util import selectables_overlap
93from ..sql.util import visit_binary_product
94from ..util.typing import de_optionalize_union_types
95from ..util.typing import Literal
96from ..util.typing import resolve_name_to_real_class_name
97
98if typing.TYPE_CHECKING:
99 from ._typing import _EntityType
100 from ._typing import _ExternalEntityType
101 from ._typing import _IdentityKeyType
102 from ._typing import _InstanceDict
103 from ._typing import _InternalEntityType
104 from ._typing import _O
105 from ._typing import _RegistryType
106 from .base import Mapped
107 from .clsregistry import _class_resolver
108 from .clsregistry import _ModNS
109 from .decl_base import _ClassScanMapperConfig
110 from .dependency import DependencyProcessor
111 from .mapper import Mapper
112 from .query import Query
113 from .session import Session
114 from .state import InstanceState
115 from .strategies import LazyLoader
116 from .util import AliasedClass
117 from .util import AliasedInsp
118 from ..sql._typing import _CoreAdapterProto
119 from ..sql._typing import _EquivalentColumnMap
120 from ..sql._typing import _InfoType
121 from ..sql.annotation import _AnnotationDict
122 from ..sql.annotation import SupportsAnnotations
123 from ..sql.elements import BinaryExpression
124 from ..sql.elements import BindParameter
125 from ..sql.elements import ClauseElement
126 from ..sql.schema import Table
127 from ..sql.selectable import FromClause
128 from ..util.typing import _AnnotationScanType
129 from ..util.typing import RODescriptorReference
130
131_T = TypeVar("_T", bound=Any)
132_T1 = TypeVar("_T1", bound=Any)
133_T2 = TypeVar("_T2", bound=Any)
134
135_PT = TypeVar("_PT", bound=Any)
136
137_PT2 = TypeVar("_PT2", bound=Any)
138
139
140_RelationshipArgumentType = Union[
141 str,
142 Type[_T],
143 Callable[[], Type[_T]],
144 "Mapper[_T]",
145 "AliasedClass[_T]",
146 Callable[[], "Mapper[_T]"],
147 Callable[[], "AliasedClass[_T]"],
148]
149
150_LazyLoadArgumentType = Literal[
151 "select",
152 "joined",
153 "selectin",
154 "subquery",
155 "raise",
156 "raise_on_sql",
157 "noload",
158 "immediate",
159 "write_only",
160 "dynamic",
161 True,
162 False,
163 None,
164]
165
166
167_RelationshipJoinConditionArgument = Union[
168 str, _ColumnExpressionArgument[bool]
169]
170_RelationshipSecondaryArgument = Union[
171 "FromClause", str, Callable[[], "FromClause"]
172]
173_ORMOrderByArgument = Union[
174 Literal[False],
175 str,
176 _ColumnExpressionArgument[Any],
177 Callable[[], _ColumnExpressionArgument[Any]],
178 Callable[[], Iterable[_ColumnExpressionArgument[Any]]],
179 Iterable[Union[str, _ColumnExpressionArgument[Any]]],
180]
181_RelationshipBackPopulatesArgument = Union[
182 str,
183 PropComparator[Any],
184 Callable[[], Union[str, PropComparator[Any]]],
185]
186
187
188ORMBackrefArgument = Union[str, Tuple[str, Dict[str, Any]]]
189
190_ORMColCollectionElement = Union[
191 ColumnClause[Any],
192 _HasClauseElement[Any],
193 roles.DMLColumnRole,
194 "Mapped[Any]",
195]
196_ORMColCollectionArgument = Union[
197 str,
198 Sequence[_ORMColCollectionElement],
199 Callable[[], Sequence[_ORMColCollectionElement]],
200 Callable[[], _ORMColCollectionElement],
201 _ORMColCollectionElement,
202]
203
204
205_CEA = TypeVar("_CEA", bound=_ColumnExpressionArgument[Any])
206
207_CE = TypeVar("_CE", bound="ColumnElement[Any]")
208
209
210_ColumnPairIterable = Iterable[Tuple[ColumnElement[Any], ColumnElement[Any]]]
211
212_ColumnPairs = Sequence[Tuple[ColumnElement[Any], ColumnElement[Any]]]
213
214_MutableColumnPairs = List[Tuple[ColumnElement[Any], ColumnElement[Any]]]
215
216
217def remote(expr: _CEA) -> _CEA:
218 """Annotate a portion of a primaryjoin expression
219 with a 'remote' annotation.
220
221 See the section :ref:`relationship_custom_foreign` for a
222 description of use.
223
224 .. seealso::
225
226 :ref:`relationship_custom_foreign`
227
228 :func:`.foreign`
229
230 """
231 return _annotate_columns( # type: ignore
232 coercions.expect(roles.ColumnArgumentRole, expr), {"remote": True}
233 )
234
235
236def foreign(expr: _CEA) -> _CEA:
237 """Annotate a portion of a primaryjoin expression
238 with a 'foreign' annotation.
239
240 See the section :ref:`relationship_custom_foreign` for a
241 description of use.
242
243 .. seealso::
244
245 :ref:`relationship_custom_foreign`
246
247 :func:`.remote`
248
249 """
250
251 return _annotate_columns( # type: ignore
252 coercions.expect(roles.ColumnArgumentRole, expr), {"foreign": True}
253 )
254
255
256@dataclasses.dataclass
257class _RelationshipArg(Generic[_T1, _T2]):
258 """stores a user-defined parameter value that must be resolved and
259 parsed later at mapper configuration time.
260
261 """
262
263 __slots__ = "name", "argument", "resolved"
264 name: str
265 argument: _T1
266 resolved: Optional[_T2]
267
268 def _is_populated(self) -> bool:
269 return self.argument is not None
270
271 def _resolve_against_registry(
272 self, clsregistry_resolver: Callable[[str, bool], _class_resolver]
273 ) -> None:
274 attr_value = self.argument
275
276 if isinstance(attr_value, str):
277 self.resolved = clsregistry_resolver(
278 attr_value, self.name == "secondary"
279 )()
280 elif callable(attr_value) and not _is_mapped_class(attr_value):
281 self.resolved = attr_value()
282 else:
283 self.resolved = attr_value
284
285 def effective_value(self) -> Any:
286 if self.resolved is not None:
287 return self.resolved
288 else:
289 return self.argument
290
291
292_RelationshipOrderByArg = Union[Literal[False], Tuple[ColumnElement[Any], ...]]
293
294
295@dataclasses.dataclass
296class _StringRelationshipArg(_RelationshipArg[_T1, _T2]):
297 def _resolve_against_registry(
298 self, clsregistry_resolver: Callable[[str, bool], _class_resolver]
299 ) -> None:
300 attr_value = self.argument
301
302 if callable(attr_value):
303 attr_value = attr_value()
304
305 if isinstance(attr_value, attributes.QueryableAttribute):
306 attr_value = attr_value.key # type: ignore
307
308 self.resolved = attr_value
309
310
311class _RelationshipArgs(NamedTuple):
312 """stores user-passed parameters that are resolved at mapper configuration
313 time.
314
315 """
316
317 secondary: _RelationshipArg[
318 Optional[_RelationshipSecondaryArgument],
319 Optional[FromClause],
320 ]
321 primaryjoin: _RelationshipArg[
322 Optional[_RelationshipJoinConditionArgument],
323 Optional[ColumnElement[Any]],
324 ]
325 secondaryjoin: _RelationshipArg[
326 Optional[_RelationshipJoinConditionArgument],
327 Optional[ColumnElement[Any]],
328 ]
329 order_by: _RelationshipArg[_ORMOrderByArgument, _RelationshipOrderByArg]
330 foreign_keys: _RelationshipArg[
331 Optional[_ORMColCollectionArgument], Set[ColumnElement[Any]]
332 ]
333 remote_side: _RelationshipArg[
334 Optional[_ORMColCollectionArgument], Set[ColumnElement[Any]]
335 ]
336 back_populates: _StringRelationshipArg[
337 Optional[_RelationshipBackPopulatesArgument], str
338 ]
339
340
341@log.class_logger
342class RelationshipProperty(
343 _IntrospectsAnnotations, StrategizedProperty[_T], log.Identified
344):
345 """Describes an object property that holds a single item or list
346 of items that correspond to a related database table.
347
348 Public constructor is the :func:`_orm.relationship` function.
349
350 .. seealso::
351
352 :ref:`relationship_config_toplevel`
353
354 """
355
356 strategy_wildcard_key = strategy_options._RELATIONSHIP_TOKEN
357 inherit_cache = True
358 """:meta private:"""
359
360 _links_to_entity = True
361 _is_relationship = True
362
363 _overlaps: Sequence[str]
364
365 _lazy_strategy: LazyLoader
366
367 _persistence_only = dict(
368 passive_deletes=False,
369 passive_updates=True,
370 enable_typechecks=True,
371 active_history=False,
372 cascade_backrefs=False,
373 )
374
375 _dependency_processor: Optional[DependencyProcessor] = None
376
377 primaryjoin: ColumnElement[bool]
378 secondaryjoin: Optional[ColumnElement[bool]]
379 secondary: Optional[FromClause]
380 _join_condition: JoinCondition
381 order_by: _RelationshipOrderByArg
382
383 _user_defined_foreign_keys: Set[ColumnElement[Any]]
384 _calculated_foreign_keys: Set[ColumnElement[Any]]
385
386 remote_side: Set[ColumnElement[Any]]
387 local_columns: Set[ColumnElement[Any]]
388
389 synchronize_pairs: _ColumnPairs
390 secondary_synchronize_pairs: Optional[_ColumnPairs]
391
392 local_remote_pairs: Optional[_ColumnPairs]
393
394 direction: RelationshipDirection
395
396 _init_args: _RelationshipArgs
397
398 def __init__(
399 self,
400 argument: Optional[_RelationshipArgumentType[_T]] = None,
401 secondary: Optional[_RelationshipSecondaryArgument] = None,
402 *,
403 uselist: Optional[bool] = None,
404 collection_class: Optional[
405 Union[Type[Collection[Any]], Callable[[], Collection[Any]]]
406 ] = None,
407 primaryjoin: Optional[_RelationshipJoinConditionArgument] = None,
408 secondaryjoin: Optional[_RelationshipJoinConditionArgument] = None,
409 back_populates: Optional[_RelationshipBackPopulatesArgument] = None,
410 order_by: _ORMOrderByArgument = False,
411 backref: Optional[ORMBackrefArgument] = None,
412 overlaps: Optional[str] = None,
413 post_update: bool = False,
414 cascade: str = "save-update, merge",
415 viewonly: bool = False,
416 attribute_options: Optional[_AttributeOptions] = None,
417 lazy: _LazyLoadArgumentType = "select",
418 passive_deletes: Union[Literal["all"], bool] = False,
419 passive_updates: bool = True,
420 active_history: bool = False,
421 enable_typechecks: bool = True,
422 foreign_keys: Optional[_ORMColCollectionArgument] = None,
423 remote_side: Optional[_ORMColCollectionArgument] = None,
424 join_depth: Optional[int] = None,
425 comparator_factory: Optional[
426 Type[RelationshipProperty.Comparator[Any]]
427 ] = None,
428 single_parent: bool = False,
429 innerjoin: bool = False,
430 distinct_target_key: Optional[bool] = None,
431 load_on_pending: bool = False,
432 query_class: Optional[Type[Query[Any]]] = None,
433 info: Optional[_InfoType] = None,
434 omit_join: Literal[None, False] = None,
435 sync_backref: Optional[bool] = None,
436 doc: Optional[str] = None,
437 bake_queries: Literal[True] = True,
438 cascade_backrefs: Literal[False] = False,
439 _local_remote_pairs: Optional[_ColumnPairs] = None,
440 _legacy_inactive_history_style: bool = False,
441 ):
442 super().__init__(attribute_options=attribute_options)
443
444 self.uselist = uselist
445 self.argument = argument
446
447 self._init_args = _RelationshipArgs(
448 _RelationshipArg("secondary", secondary, None),
449 _RelationshipArg("primaryjoin", primaryjoin, None),
450 _RelationshipArg("secondaryjoin", secondaryjoin, None),
451 _RelationshipArg("order_by", order_by, None),
452 _RelationshipArg("foreign_keys", foreign_keys, None),
453 _RelationshipArg("remote_side", remote_side, None),
454 _StringRelationshipArg("back_populates", back_populates, None),
455 )
456
457 self.post_update = post_update
458 self.viewonly = viewonly
459 if viewonly:
460 self._warn_for_persistence_only_flags(
461 passive_deletes=passive_deletes,
462 passive_updates=passive_updates,
463 enable_typechecks=enable_typechecks,
464 active_history=active_history,
465 cascade_backrefs=cascade_backrefs,
466 )
467 if viewonly and sync_backref:
468 raise sa_exc.ArgumentError(
469 "sync_backref and viewonly cannot both be True"
470 )
471 self.sync_backref = sync_backref
472 self.lazy = lazy
473 self.single_parent = single_parent
474 self.collection_class = collection_class
475 self.passive_deletes = passive_deletes
476
477 if cascade_backrefs:
478 raise sa_exc.ArgumentError(
479 "The 'cascade_backrefs' parameter passed to "
480 "relationship() may only be set to False."
481 )
482
483 self.passive_updates = passive_updates
484 self.enable_typechecks = enable_typechecks
485 self.query_class = query_class
486 self.innerjoin = innerjoin
487 self.distinct_target_key = distinct_target_key
488 self.doc = doc
489 self.active_history = active_history
490 self._legacy_inactive_history_style = _legacy_inactive_history_style
491
492 self.join_depth = join_depth
493 if omit_join:
494 util.warn(
495 "setting omit_join to True is not supported; selectin "
496 "loading of this relationship may not work correctly if this "
497 "flag is set explicitly. omit_join optimization is "
498 "automatically detected for conditions under which it is "
499 "supported."
500 )
501
502 self.omit_join = omit_join
503 self.local_remote_pairs = _local_remote_pairs
504 self.load_on_pending = load_on_pending
505 self.comparator_factory = (
506 comparator_factory or RelationshipProperty.Comparator
507 )
508 util.set_creation_order(self)
509
510 if info is not None:
511 self.info.update(info)
512
513 self.strategy_key = (("lazy", self.lazy),)
514
515 self._reverse_property: Set[RelationshipProperty[Any]] = set()
516
517 if overlaps:
518 self._overlaps = set(re.split(r"\s*,\s*", overlaps)) # type: ignore # noqa: E501
519 else:
520 self._overlaps = ()
521
522 # mypy ignoring the @property setter
523 self.cascade = cascade # type: ignore
524
525 if back_populates:
526 if backref:
527 raise sa_exc.ArgumentError(
528 "backref and back_populates keyword arguments "
529 "are mutually exclusive"
530 )
531 self.backref = None
532 else:
533 self.backref = backref
534
535 @property
536 def back_populates(self) -> str:
537 return self._init_args.back_populates.effective_value() # type: ignore
538
539 @back_populates.setter
540 def back_populates(self, value: str) -> None:
541 self._init_args.back_populates.argument = value
542
543 def _warn_for_persistence_only_flags(self, **kw: Any) -> None:
544 for k, v in kw.items():
545 if v != self._persistence_only[k]:
546 # we are warning here rather than warn deprecated as this is a
547 # configuration mistake, and Python shows regular warnings more
548 # aggressively than deprecation warnings by default. Unlike the
549 # case of setting viewonly with cascade, the settings being
550 # warned about here are not actively doing the wrong thing
551 # against viewonly=True, so it is not as urgent to have these
552 # raise an error.
553 util.warn(
554 "Setting %s on relationship() while also "
555 "setting viewonly=True does not make sense, as a "
556 "viewonly=True relationship does not perform persistence "
557 "operations. This configuration may raise an error "
558 "in a future release." % (k,)
559 )
560
561 def instrument_class(self, mapper: Mapper[Any]) -> None:
562 attributes.register_descriptor(
563 mapper.class_,
564 self.key,
565 comparator=self.comparator_factory(self, mapper),
566 parententity=mapper,
567 doc=self.doc,
568 )
569
570 class Comparator(util.MemoizedSlots, PropComparator[_PT]):
571 """Produce boolean, comparison, and other operators for
572 :class:`.RelationshipProperty` attributes.
573
574 See the documentation for :class:`.PropComparator` for a brief
575 overview of ORM level operator definition.
576
577 .. seealso::
578
579 :class:`.PropComparator`
580
581 :class:`.ColumnProperty.Comparator`
582
583 :class:`.ColumnOperators`
584
585 :ref:`types_operators`
586
587 :attr:`.TypeEngine.comparator_factory`
588
589 """
590
591 __slots__ = (
592 "entity",
593 "mapper",
594 "property",
595 "_of_type",
596 "_extra_criteria",
597 )
598
599 prop: RODescriptorReference[RelationshipProperty[_PT]]
600 _of_type: Optional[_EntityType[_PT]]
601
602 def __init__(
603 self,
604 prop: RelationshipProperty[_PT],
605 parentmapper: _InternalEntityType[Any],
606 adapt_to_entity: Optional[AliasedInsp[Any]] = None,
607 of_type: Optional[_EntityType[_PT]] = None,
608 extra_criteria: Tuple[ColumnElement[bool], ...] = (),
609 ):
610 """Construction of :class:`.RelationshipProperty.Comparator`
611 is internal to the ORM's attribute mechanics.
612
613 """
614 self.prop = prop
615 self._parententity = parentmapper
616 self._adapt_to_entity = adapt_to_entity
617 if of_type:
618 self._of_type = of_type
619 else:
620 self._of_type = None
621 self._extra_criteria = extra_criteria
622
623 def adapt_to_entity(
624 self, adapt_to_entity: AliasedInsp[Any]
625 ) -> RelationshipProperty.Comparator[Any]:
626 return self.__class__(
627 self.prop,
628 self._parententity,
629 adapt_to_entity=adapt_to_entity,
630 of_type=self._of_type,
631 )
632
633 entity: _InternalEntityType[_PT]
634 """The target entity referred to by this
635 :class:`.RelationshipProperty.Comparator`.
636
637 This is either a :class:`_orm.Mapper` or :class:`.AliasedInsp`
638 object.
639
640 This is the "target" or "remote" side of the
641 :func:`_orm.relationship`.
642
643 """
644
645 mapper: Mapper[_PT]
646 """The target :class:`_orm.Mapper` referred to by this
647 :class:`.RelationshipProperty.Comparator`.
648
649 This is the "target" or "remote" side of the
650 :func:`_orm.relationship`.
651
652 """
653
654 def _memoized_attr_entity(self) -> _InternalEntityType[_PT]:
655 if self._of_type:
656 return inspect(self._of_type) # type: ignore
657 else:
658 return self.prop.entity
659
660 def _memoized_attr_mapper(self) -> Mapper[_PT]:
661 return self.entity.mapper
662
663 def _source_selectable(self) -> FromClause:
664 if self._adapt_to_entity:
665 return self._adapt_to_entity.selectable
666 else:
667 return self.property.parent._with_polymorphic_selectable
668
669 def __clause_element__(self) -> ColumnElement[bool]:
670 adapt_from = self._source_selectable()
671 if self._of_type:
672 of_type_entity = inspect(self._of_type)
673 else:
674 of_type_entity = None
675
676 (
677 pj,
678 sj,
679 source,
680 dest,
681 secondary,
682 target_adapter,
683 ) = self.prop._create_joins(
684 source_selectable=adapt_from,
685 source_polymorphic=True,
686 of_type_entity=of_type_entity,
687 alias_secondary=True,
688 extra_criteria=self._extra_criteria,
689 )
690 if sj is not None:
691 return pj & sj
692 else:
693 return pj
694
695 def of_type(self, class_: _EntityType[Any]) -> PropComparator[_PT]:
696 r"""Redefine this object in terms of a polymorphic subclass.
697
698 See :meth:`.PropComparator.of_type` for an example.
699
700
701 """
702 return RelationshipProperty.Comparator(
703 self.prop,
704 self._parententity,
705 adapt_to_entity=self._adapt_to_entity,
706 of_type=class_,
707 extra_criteria=self._extra_criteria,
708 )
709
710 def and_(
711 self, *criteria: _ColumnExpressionArgument[bool]
712 ) -> PropComparator[Any]:
713 """Add AND criteria.
714
715 See :meth:`.PropComparator.and_` for an example.
716
717 .. versionadded:: 1.4
718
719 """
720 exprs = tuple(
721 coercions.expect(roles.WhereHavingRole, clause)
722 for clause in util.coerce_generator_arg(criteria)
723 )
724
725 return RelationshipProperty.Comparator(
726 self.prop,
727 self._parententity,
728 adapt_to_entity=self._adapt_to_entity,
729 of_type=self._of_type,
730 extra_criteria=self._extra_criteria + exprs,
731 )
732
733 def in_(self, other: Any) -> NoReturn:
734 """Produce an IN clause - this is not implemented
735 for :func:`_orm.relationship`-based attributes at this time.
736
737 """
738 raise NotImplementedError(
739 "in_() not yet supported for "
740 "relationships. For a simple "
741 "many-to-one, use in_() against "
742 "the set of foreign key values."
743 )
744
745 # https://github.com/python/mypy/issues/4266
746 __hash__ = None # type: ignore
747
748 def __eq__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501
749 """Implement the ``==`` operator.
750
751 In a many-to-one context, such as::
752
753 MyClass.some_prop == <some object>
754
755 this will typically produce a
756 clause such as::
757
758 mytable.related_id == <some id>
759
760 Where ``<some id>`` is the primary key of the given
761 object.
762
763 The ``==`` operator provides partial functionality for non-
764 many-to-one comparisons:
765
766 * Comparisons against collections are not supported.
767 Use :meth:`~.Relationship.Comparator.contains`.
768 * Compared to a scalar one-to-many, will produce a
769 clause that compares the target columns in the parent to
770 the given target.
771 * Compared to a scalar many-to-many, an alias
772 of the association table will be rendered as
773 well, forming a natural join that is part of the
774 main body of the query. This will not work for
775 queries that go beyond simple AND conjunctions of
776 comparisons, such as those which use OR. Use
777 explicit joins, outerjoins, or
778 :meth:`~.Relationship.Comparator.has` for
779 more comprehensive non-many-to-one scalar
780 membership tests.
781 * Comparisons against ``None`` given in a one-to-many
782 or many-to-many context produce a NOT EXISTS clause.
783
784 """
785 if other is None or isinstance(other, expression.Null):
786 if self.property.direction in [ONETOMANY, MANYTOMANY]:
787 return ~self._criterion_exists()
788 else:
789 return _orm_annotate(
790 self.property._optimized_compare(
791 None, adapt_source=self.adapter
792 )
793 )
794 elif self.property.uselist:
795 raise sa_exc.InvalidRequestError(
796 "Can't compare a collection to an object or collection; "
797 "use contains() to test for membership."
798 )
799 else:
800 return _orm_annotate(
801 self.property._optimized_compare(
802 other, adapt_source=self.adapter
803 )
804 )
805
806 def _criterion_exists(
807 self,
808 criterion: Optional[_ColumnExpressionArgument[bool]] = None,
809 **kwargs: Any,
810 ) -> Exists:
811 where_criteria = (
812 coercions.expect(roles.WhereHavingRole, criterion)
813 if criterion is not None
814 else None
815 )
816
817 if getattr(self, "_of_type", None):
818 info: Optional[_InternalEntityType[Any]] = inspect(
819 self._of_type
820 )
821 assert info is not None
822 target_mapper, to_selectable, is_aliased_class = (
823 info.mapper,
824 info.selectable,
825 info.is_aliased_class,
826 )
827 if self.property._is_self_referential and not is_aliased_class:
828 to_selectable = to_selectable._anonymous_fromclause()
829
830 single_crit = target_mapper._single_table_criterion
831 if single_crit is not None:
832 if where_criteria is not None:
833 where_criteria = single_crit & where_criteria
834 else:
835 where_criteria = single_crit
836 else:
837 is_aliased_class = False
838 to_selectable = None
839
840 if self.adapter:
841 source_selectable = self._source_selectable()
842 else:
843 source_selectable = None
844
845 (
846 pj,
847 sj,
848 source,
849 dest,
850 secondary,
851 target_adapter,
852 ) = self.property._create_joins(
853 dest_selectable=to_selectable,
854 source_selectable=source_selectable,
855 )
856
857 for k in kwargs:
858 crit = getattr(self.property.mapper.class_, k) == kwargs[k]
859 if where_criteria is None:
860 where_criteria = crit
861 else:
862 where_criteria = where_criteria & crit
863
864 # annotate the *local* side of the join condition, in the case
865 # of pj + sj this is the full primaryjoin, in the case of just
866 # pj its the local side of the primaryjoin.
867 if sj is not None:
868 j = _orm_annotate(pj) & sj
869 else:
870 j = _orm_annotate(pj, exclude=self.property.remote_side)
871
872 if (
873 where_criteria is not None
874 and target_adapter
875 and not is_aliased_class
876 ):
877 # limit this adapter to annotated only?
878 where_criteria = target_adapter.traverse(where_criteria)
879
880 # only have the "joined left side" of what we
881 # return be subject to Query adaption. The right
882 # side of it is used for an exists() subquery and
883 # should not correlate or otherwise reach out
884 # to anything in the enclosing query.
885 if where_criteria is not None:
886 where_criteria = where_criteria._annotate(
887 {"no_replacement_traverse": True}
888 )
889
890 crit = j & sql.True_._ifnone(where_criteria)
891
892 if secondary is not None:
893 ex = (
894 sql.exists(1)
895 .where(crit)
896 .select_from(dest, secondary)
897 .correlate_except(dest, secondary)
898 )
899 else:
900 ex = (
901 sql.exists(1)
902 .where(crit)
903 .select_from(dest)
904 .correlate_except(dest)
905 )
906 return ex
907
908 def any(
909 self,
910 criterion: Optional[_ColumnExpressionArgument[bool]] = None,
911 **kwargs: Any,
912 ) -> ColumnElement[bool]:
913 """Produce an expression that tests a collection against
914 particular criterion, using EXISTS.
915
916 An expression like::
917
918 session.query(MyClass).filter(
919 MyClass.somereference.any(SomeRelated.x==2)
920 )
921
922
923 Will produce a query like::
924
925 SELECT * FROM my_table WHERE
926 EXISTS (SELECT 1 FROM related WHERE related.my_id=my_table.id
927 AND related.x=2)
928
929 Because :meth:`~.Relationship.Comparator.any` uses
930 a correlated subquery, its performance is not nearly as
931 good when compared against large target tables as that of
932 using a join.
933
934 :meth:`~.Relationship.Comparator.any` is particularly
935 useful for testing for empty collections::
936
937 session.query(MyClass).filter(
938 ~MyClass.somereference.any()
939 )
940
941 will produce::
942
943 SELECT * FROM my_table WHERE
944 NOT (EXISTS (SELECT 1 FROM related WHERE
945 related.my_id=my_table.id))
946
947 :meth:`~.Relationship.Comparator.any` is only
948 valid for collections, i.e. a :func:`_orm.relationship`
949 that has ``uselist=True``. For scalar references,
950 use :meth:`~.Relationship.Comparator.has`.
951
952 """
953 if not self.property.uselist:
954 raise sa_exc.InvalidRequestError(
955 "'any()' not implemented for scalar "
956 "attributes. Use has()."
957 )
958
959 return self._criterion_exists(criterion, **kwargs)
960
961 def has(
962 self,
963 criterion: Optional[_ColumnExpressionArgument[bool]] = None,
964 **kwargs: Any,
965 ) -> ColumnElement[bool]:
966 """Produce an expression that tests a scalar reference against
967 particular criterion, using EXISTS.
968
969 An expression like::
970
971 session.query(MyClass).filter(
972 MyClass.somereference.has(SomeRelated.x==2)
973 )
974
975
976 Will produce a query like::
977
978 SELECT * FROM my_table WHERE
979 EXISTS (SELECT 1 FROM related WHERE
980 related.id==my_table.related_id AND related.x=2)
981
982 Because :meth:`~.Relationship.Comparator.has` uses
983 a correlated subquery, its performance is not nearly as
984 good when compared against large target tables as that of
985 using a join.
986
987 :meth:`~.Relationship.Comparator.has` is only
988 valid for scalar references, i.e. a :func:`_orm.relationship`
989 that has ``uselist=False``. For collection references,
990 use :meth:`~.Relationship.Comparator.any`.
991
992 """
993 if self.property.uselist:
994 raise sa_exc.InvalidRequestError(
995 "'has()' not implemented for collections. Use any()."
996 )
997 return self._criterion_exists(criterion, **kwargs)
998
999 def contains(
1000 self, other: _ColumnExpressionArgument[Any], **kwargs: Any
1001 ) -> ColumnElement[bool]:
1002 """Return a simple expression that tests a collection for
1003 containment of a particular item.
1004
1005 :meth:`~.Relationship.Comparator.contains` is
1006 only valid for a collection, i.e. a
1007 :func:`_orm.relationship` that implements
1008 one-to-many or many-to-many with ``uselist=True``.
1009
1010 When used in a simple one-to-many context, an
1011 expression like::
1012
1013 MyClass.contains(other)
1014
1015 Produces a clause like::
1016
1017 mytable.id == <some id>
1018
1019 Where ``<some id>`` is the value of the foreign key
1020 attribute on ``other`` which refers to the primary
1021 key of its parent object. From this it follows that
1022 :meth:`~.Relationship.Comparator.contains` is
1023 very useful when used with simple one-to-many
1024 operations.
1025
1026 For many-to-many operations, the behavior of
1027 :meth:`~.Relationship.Comparator.contains`
1028 has more caveats. The association table will be
1029 rendered in the statement, producing an "implicit"
1030 join, that is, includes multiple tables in the FROM
1031 clause which are equated in the WHERE clause::
1032
1033 query(MyClass).filter(MyClass.contains(other))
1034
1035 Produces a query like::
1036
1037 SELECT * FROM my_table, my_association_table AS
1038 my_association_table_1 WHERE
1039 my_table.id = my_association_table_1.parent_id
1040 AND my_association_table_1.child_id = <some id>
1041
1042 Where ``<some id>`` would be the primary key of
1043 ``other``. From the above, it is clear that
1044 :meth:`~.Relationship.Comparator.contains`
1045 will **not** work with many-to-many collections when
1046 used in queries that move beyond simple AND
1047 conjunctions, such as multiple
1048 :meth:`~.Relationship.Comparator.contains`
1049 expressions joined by OR. In such cases subqueries or
1050 explicit "outer joins" will need to be used instead.
1051 See :meth:`~.Relationship.Comparator.any` for
1052 a less-performant alternative using EXISTS, or refer
1053 to :meth:`_query.Query.outerjoin`
1054 as well as :ref:`orm_queryguide_joins`
1055 for more details on constructing outer joins.
1056
1057 kwargs may be ignored by this operator but are required for API
1058 conformance.
1059 """
1060 if not self.prop.uselist:
1061 raise sa_exc.InvalidRequestError(
1062 "'contains' not implemented for scalar "
1063 "attributes. Use =="
1064 )
1065
1066 clause = self.prop._optimized_compare(
1067 other, adapt_source=self.adapter
1068 )
1069
1070 if self.prop.secondaryjoin is not None:
1071 clause.negation_clause = self.__negated_contains_or_equals(
1072 other
1073 )
1074
1075 return clause
1076
1077 def __negated_contains_or_equals(
1078 self, other: Any
1079 ) -> ColumnElement[bool]:
1080 if self.prop.direction == MANYTOONE:
1081 state = attributes.instance_state(other)
1082
1083 def state_bindparam(
1084 local_col: ColumnElement[Any],
1085 state: InstanceState[Any],
1086 remote_col: ColumnElement[Any],
1087 ) -> BindParameter[Any]:
1088 dict_ = state.dict
1089 return sql.bindparam(
1090 local_col.key,
1091 type_=local_col.type,
1092 unique=True,
1093 callable_=self.prop._get_attr_w_warn_on_none(
1094 self.prop.mapper, state, dict_, remote_col
1095 ),
1096 )
1097
1098 def adapt(col: _CE) -> _CE:
1099 if self.adapter:
1100 return self.adapter(col)
1101 else:
1102 return col
1103
1104 if self.property._use_get:
1105 return sql.and_(
1106 *[
1107 sql.or_(
1108 adapt(x)
1109 != state_bindparam(adapt(x), state, y),
1110 adapt(x) == None,
1111 )
1112 for (x, y) in self.property.local_remote_pairs
1113 ]
1114 )
1115
1116 criterion = sql.and_(
1117 *[
1118 x == y
1119 for (x, y) in zip(
1120 self.property.mapper.primary_key,
1121 self.property.mapper.primary_key_from_instance(other),
1122 )
1123 ]
1124 )
1125
1126 return ~self._criterion_exists(criterion)
1127
1128 def __ne__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501
1129 """Implement the ``!=`` operator.
1130
1131 In a many-to-one context, such as::
1132
1133 MyClass.some_prop != <some object>
1134
1135 This will typically produce a clause such as::
1136
1137 mytable.related_id != <some id>
1138
1139 Where ``<some id>`` is the primary key of the
1140 given object.
1141
1142 The ``!=`` operator provides partial functionality for non-
1143 many-to-one comparisons:
1144
1145 * Comparisons against collections are not supported.
1146 Use
1147 :meth:`~.Relationship.Comparator.contains`
1148 in conjunction with :func:`_expression.not_`.
1149 * Compared to a scalar one-to-many, will produce a
1150 clause that compares the target columns in the parent to
1151 the given target.
1152 * Compared to a scalar many-to-many, an alias
1153 of the association table will be rendered as
1154 well, forming a natural join that is part of the
1155 main body of the query. This will not work for
1156 queries that go beyond simple AND conjunctions of
1157 comparisons, such as those which use OR. Use
1158 explicit joins, outerjoins, or
1159 :meth:`~.Relationship.Comparator.has` in
1160 conjunction with :func:`_expression.not_` for
1161 more comprehensive non-many-to-one scalar
1162 membership tests.
1163 * Comparisons against ``None`` given in a one-to-many
1164 or many-to-many context produce an EXISTS clause.
1165
1166 """
1167 if other is None or isinstance(other, expression.Null):
1168 if self.property.direction == MANYTOONE:
1169 return _orm_annotate(
1170 ~self.property._optimized_compare(
1171 None, adapt_source=self.adapter
1172 )
1173 )
1174
1175 else:
1176 return self._criterion_exists()
1177 elif self.property.uselist:
1178 raise sa_exc.InvalidRequestError(
1179 "Can't compare a collection"
1180 " to an object or collection; use "
1181 "contains() to test for membership."
1182 )
1183 else:
1184 return _orm_annotate(self.__negated_contains_or_equals(other))
1185
1186 def _memoized_attr_property(self) -> RelationshipProperty[_PT]:
1187 self.prop.parent._check_configure()
1188 return self.prop
1189
1190 def _with_parent(
1191 self,
1192 instance: object,
1193 alias_secondary: bool = True,
1194 from_entity: Optional[_EntityType[Any]] = None,
1195 ) -> ColumnElement[bool]:
1196 assert instance is not None
1197 adapt_source: Optional[_CoreAdapterProto] = None
1198 if from_entity is not None:
1199 insp: Optional[_InternalEntityType[Any]] = inspect(from_entity)
1200 assert insp is not None
1201 if insp_is_aliased_class(insp):
1202 adapt_source = insp._adapter.adapt_clause
1203 return self._optimized_compare(
1204 instance,
1205 value_is_parent=True,
1206 adapt_source=adapt_source,
1207 alias_secondary=alias_secondary,
1208 )
1209
1210 def _optimized_compare(
1211 self,
1212 state: Any,
1213 value_is_parent: bool = False,
1214 adapt_source: Optional[_CoreAdapterProto] = None,
1215 alias_secondary: bool = True,
1216 ) -> ColumnElement[bool]:
1217 if state is not None:
1218 try:
1219 state = inspect(state)
1220 except sa_exc.NoInspectionAvailable:
1221 state = None
1222
1223 if state is None or not getattr(state, "is_instance", False):
1224 raise sa_exc.ArgumentError(
1225 "Mapped instance expected for relationship "
1226 "comparison to object. Classes, queries and other "
1227 "SQL elements are not accepted in this context; for "
1228 "comparison with a subquery, "
1229 "use %s.has(**criteria)." % self
1230 )
1231 reverse_direction = not value_is_parent
1232
1233 if state is None:
1234 return self._lazy_none_clause(
1235 reverse_direction, adapt_source=adapt_source
1236 )
1237
1238 if not reverse_direction:
1239 criterion, bind_to_col = (
1240 self._lazy_strategy._lazywhere,
1241 self._lazy_strategy._bind_to_col,
1242 )
1243 else:
1244 criterion, bind_to_col = (
1245 self._lazy_strategy._rev_lazywhere,
1246 self._lazy_strategy._rev_bind_to_col,
1247 )
1248
1249 if reverse_direction:
1250 mapper = self.mapper
1251 else:
1252 mapper = self.parent
1253
1254 dict_ = attributes.instance_dict(state.obj())
1255
1256 def visit_bindparam(bindparam: BindParameter[Any]) -> None:
1257 if bindparam._identifying_key in bind_to_col:
1258 bindparam.callable = self._get_attr_w_warn_on_none(
1259 mapper,
1260 state,
1261 dict_,
1262 bind_to_col[bindparam._identifying_key],
1263 )
1264
1265 if self.secondary is not None and alias_secondary:
1266 criterion = ClauseAdapter(
1267 self.secondary._anonymous_fromclause()
1268 ).traverse(criterion)
1269
1270 criterion = visitors.cloned_traverse(
1271 criterion, {}, {"bindparam": visit_bindparam}
1272 )
1273
1274 if adapt_source:
1275 criterion = adapt_source(criterion)
1276 return criterion
1277
1278 def _get_attr_w_warn_on_none(
1279 self,
1280 mapper: Mapper[Any],
1281 state: InstanceState[Any],
1282 dict_: _InstanceDict,
1283 column: ColumnElement[Any],
1284 ) -> Callable[[], Any]:
1285 """Create the callable that is used in a many-to-one expression.
1286
1287 E.g.::
1288
1289 u1 = s.query(User).get(5)
1290
1291 expr = Address.user == u1
1292
1293 Above, the SQL should be "address.user_id = 5". The callable
1294 returned by this method produces the value "5" based on the identity
1295 of ``u1``.
1296
1297 """
1298
1299 # in this callable, we're trying to thread the needle through
1300 # a wide variety of scenarios, including:
1301 #
1302 # * the object hasn't been flushed yet and there's no value for
1303 # the attribute as of yet
1304 #
1305 # * the object hasn't been flushed yet but it has a user-defined
1306 # value
1307 #
1308 # * the object has a value but it's expired and not locally present
1309 #
1310 # * the object has a value but it's expired and not locally present,
1311 # and the object is also detached
1312 #
1313 # * The object hadn't been flushed yet, there was no value, but
1314 # later, the object has been expired and detached, and *now*
1315 # they're trying to evaluate it
1316 #
1317 # * the object had a value, but it was changed to a new value, and
1318 # then expired
1319 #
1320 # * the object had a value, but it was changed to a new value, and
1321 # then expired, then the object was detached
1322 #
1323 # * the object has a user-set value, but it's None and we don't do
1324 # the comparison correctly for that so warn
1325 #
1326
1327 prop = mapper.get_property_by_column(column)
1328
1329 # by invoking this method, InstanceState will track the last known
1330 # value for this key each time the attribute is to be expired.
1331 # this feature was added explicitly for use in this method.
1332 state._track_last_known_value(prop.key)
1333
1334 lkv_fixed = state._last_known_values
1335
1336 def _go() -> Any:
1337 assert lkv_fixed is not None
1338 last_known = to_return = lkv_fixed[prop.key]
1339 existing_is_available = (
1340 last_known is not LoaderCallableStatus.NO_VALUE
1341 )
1342
1343 # we support that the value may have changed. so here we
1344 # try to get the most recent value including re-fetching.
1345 # only if we can't get a value now due to detachment do we return
1346 # the last known value
1347 current_value = mapper._get_state_attr_by_column(
1348 state,
1349 dict_,
1350 column,
1351 passive=(
1352 PassiveFlag.PASSIVE_OFF
1353 if state.persistent
1354 else PassiveFlag.PASSIVE_NO_FETCH ^ PassiveFlag.INIT_OK
1355 ),
1356 )
1357
1358 if current_value is LoaderCallableStatus.NEVER_SET:
1359 if not existing_is_available:
1360 raise sa_exc.InvalidRequestError(
1361 "Can't resolve value for column %s on object "
1362 "%s; no value has been set for this column"
1363 % (column, state_str(state))
1364 )
1365 elif current_value is LoaderCallableStatus.PASSIVE_NO_RESULT:
1366 if not existing_is_available:
1367 raise sa_exc.InvalidRequestError(
1368 "Can't resolve value for column %s on object "
1369 "%s; the object is detached and the value was "
1370 "expired" % (column, state_str(state))
1371 )
1372 else:
1373 to_return = current_value
1374 if to_return is None:
1375 util.warn(
1376 "Got None for value of column %s; this is unsupported "
1377 "for a relationship comparison and will not "
1378 "currently produce an IS comparison "
1379 "(but may in a future release)" % column
1380 )
1381 return to_return
1382
1383 return _go
1384
1385 def _lazy_none_clause(
1386 self,
1387 reverse_direction: bool = False,
1388 adapt_source: Optional[_CoreAdapterProto] = None,
1389 ) -> ColumnElement[bool]:
1390 if not reverse_direction:
1391 criterion, bind_to_col = (
1392 self._lazy_strategy._lazywhere,
1393 self._lazy_strategy._bind_to_col,
1394 )
1395 else:
1396 criterion, bind_to_col = (
1397 self._lazy_strategy._rev_lazywhere,
1398 self._lazy_strategy._rev_bind_to_col,
1399 )
1400
1401 criterion = adapt_criterion_to_null(criterion, bind_to_col)
1402
1403 if adapt_source:
1404 criterion = adapt_source(criterion)
1405 return criterion
1406
1407 def __str__(self) -> str:
1408 return str(self.parent.class_.__name__) + "." + self.key
1409
1410 def merge(
1411 self,
1412 session: Session,
1413 source_state: InstanceState[Any],
1414 source_dict: _InstanceDict,
1415 dest_state: InstanceState[Any],
1416 dest_dict: _InstanceDict,
1417 load: bool,
1418 _recursive: Dict[Any, object],
1419 _resolve_conflict_map: Dict[_IdentityKeyType[Any], object],
1420 ) -> None:
1421 if load:
1422 for r in self._reverse_property:
1423 if (source_state, r) in _recursive:
1424 return
1425
1426 if "merge" not in self._cascade:
1427 return
1428
1429 if self.key not in source_dict:
1430 return
1431
1432 if self.uselist:
1433 impl = source_state.get_impl(self.key)
1434
1435 assert is_has_collection_adapter(impl)
1436 instances_iterable = impl.get_collection(source_state, source_dict)
1437
1438 # if this is a CollectionAttributeImpl, then empty should
1439 # be False, otherwise "self.key in source_dict" should not be
1440 # True
1441 assert not instances_iterable.empty if impl.collection else True
1442
1443 if load:
1444 # for a full merge, pre-load the destination collection,
1445 # so that individual _merge of each item pulls from identity
1446 # map for those already present.
1447 # also assumes CollectionAttributeImpl behavior of loading
1448 # "old" list in any case
1449 dest_state.get_impl(self.key).get(
1450 dest_state, dest_dict, passive=PassiveFlag.PASSIVE_MERGE
1451 )
1452
1453 dest_list = []
1454 for current in instances_iterable:
1455 current_state = attributes.instance_state(current)
1456 current_dict = attributes.instance_dict(current)
1457 _recursive[(current_state, self)] = True
1458 obj = session._merge(
1459 current_state,
1460 current_dict,
1461 load=load,
1462 _recursive=_recursive,
1463 _resolve_conflict_map=_resolve_conflict_map,
1464 )
1465 if obj is not None:
1466 dest_list.append(obj)
1467
1468 if not load:
1469 coll = attributes.init_state_collection(
1470 dest_state, dest_dict, self.key
1471 )
1472 for c in dest_list:
1473 coll.append_without_event(c)
1474 else:
1475 dest_impl = dest_state.get_impl(self.key)
1476 assert is_has_collection_adapter(dest_impl)
1477 dest_impl.set(
1478 dest_state,
1479 dest_dict,
1480 dest_list,
1481 _adapt=False,
1482 passive=PassiveFlag.PASSIVE_MERGE,
1483 )
1484 else:
1485 current = source_dict[self.key]
1486 if current is not None:
1487 current_state = attributes.instance_state(current)
1488 current_dict = attributes.instance_dict(current)
1489 _recursive[(current_state, self)] = True
1490 obj = session._merge(
1491 current_state,
1492 current_dict,
1493 load=load,
1494 _recursive=_recursive,
1495 _resolve_conflict_map=_resolve_conflict_map,
1496 )
1497 else:
1498 obj = None
1499
1500 if not load:
1501 dest_dict[self.key] = obj
1502 else:
1503 dest_state.get_impl(self.key).set(
1504 dest_state, dest_dict, obj, None
1505 )
1506
1507 def _value_as_iterable(
1508 self,
1509 state: InstanceState[_O],
1510 dict_: _InstanceDict,
1511 key: str,
1512 passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
1513 ) -> Sequence[Tuple[InstanceState[_O], _O]]:
1514 """Return a list of tuples (state, obj) for the given
1515 key.
1516
1517 returns an empty list if the value is None/empty/PASSIVE_NO_RESULT
1518 """
1519
1520 impl = state.manager[key].impl
1521 x = impl.get(state, dict_, passive=passive)
1522 if x is LoaderCallableStatus.PASSIVE_NO_RESULT or x is None:
1523 return []
1524 elif is_has_collection_adapter(impl):
1525 return [
1526 (attributes.instance_state(o), o)
1527 for o in impl.get_collection(state, dict_, x, passive=passive)
1528 ]
1529 else:
1530 return [(attributes.instance_state(x), x)]
1531
1532 def cascade_iterator(
1533 self,
1534 type_: str,
1535 state: InstanceState[Any],
1536 dict_: _InstanceDict,
1537 visited_states: Set[InstanceState[Any]],
1538 halt_on: Optional[Callable[[InstanceState[Any]], bool]] = None,
1539 ) -> Iterator[Tuple[Any, Mapper[Any], InstanceState[Any], _InstanceDict]]:
1540 # assert type_ in self._cascade
1541
1542 # only actively lazy load on the 'delete' cascade
1543 if type_ != "delete" or self.passive_deletes:
1544 passive = PassiveFlag.PASSIVE_NO_INITIALIZE
1545 else:
1546 passive = PassiveFlag.PASSIVE_OFF | PassiveFlag.NO_RAISE
1547
1548 if type_ == "save-update":
1549 tuples = state.manager[self.key].impl.get_all_pending(state, dict_)
1550 else:
1551 tuples = self._value_as_iterable(
1552 state, dict_, self.key, passive=passive
1553 )
1554
1555 skip_pending = (
1556 type_ == "refresh-expire" and "delete-orphan" not in self._cascade
1557 )
1558
1559 for instance_state, c in tuples:
1560 if instance_state in visited_states:
1561 continue
1562
1563 if c is None:
1564 # would like to emit a warning here, but
1565 # would not be consistent with collection.append(None)
1566 # current behavior of silently skipping.
1567 # see [ticket:2229]
1568 continue
1569
1570 assert instance_state is not None
1571 instance_dict = attributes.instance_dict(c)
1572
1573 if halt_on and halt_on(instance_state):
1574 continue
1575
1576 if skip_pending and not instance_state.key:
1577 continue
1578
1579 instance_mapper = instance_state.manager.mapper
1580
1581 if not instance_mapper.isa(self.mapper.class_manager.mapper):
1582 raise AssertionError(
1583 "Attribute '%s' on class '%s' "
1584 "doesn't handle objects "
1585 "of type '%s'"
1586 % (self.key, self.parent.class_, c.__class__)
1587 )
1588
1589 visited_states.add(instance_state)
1590
1591 yield c, instance_mapper, instance_state, instance_dict
1592
1593 @property
1594 def _effective_sync_backref(self) -> bool:
1595 if self.viewonly:
1596 return False
1597 else:
1598 return self.sync_backref is not False
1599
1600 @staticmethod
1601 def _check_sync_backref(
1602 rel_a: RelationshipProperty[Any], rel_b: RelationshipProperty[Any]
1603 ) -> None:
1604 if rel_a.viewonly and rel_b.sync_backref:
1605 raise sa_exc.InvalidRequestError(
1606 "Relationship %s cannot specify sync_backref=True since %s "
1607 "includes viewonly=True." % (rel_b, rel_a)
1608 )
1609 if (
1610 rel_a.viewonly
1611 and not rel_b.viewonly
1612 and rel_b.sync_backref is not False
1613 ):
1614 rel_b.sync_backref = False
1615
1616 def _add_reverse_property(self, key: str) -> None:
1617 other = self.mapper.get_property(key, _configure_mappers=False)
1618 if not isinstance(other, RelationshipProperty):
1619 raise sa_exc.InvalidRequestError(
1620 "back_populates on relationship '%s' refers to attribute '%s' "
1621 "that is not a relationship. The back_populates parameter "
1622 "should refer to the name of a relationship on the target "
1623 "class." % (self, other)
1624 )
1625 # viewonly and sync_backref cases
1626 # 1. self.viewonly==True and other.sync_backref==True -> error
1627 # 2. self.viewonly==True and other.viewonly==False and
1628 # other.sync_backref==None -> warn sync_backref=False, set to False
1629 self._check_sync_backref(self, other)
1630 # 3. other.viewonly==True and self.sync_backref==True -> error
1631 # 4. other.viewonly==True and self.viewonly==False and
1632 # self.sync_backref==None -> warn sync_backref=False, set to False
1633 self._check_sync_backref(other, self)
1634
1635 self._reverse_property.add(other)
1636 other._reverse_property.add(self)
1637
1638 other._setup_entity()
1639
1640 if not other.mapper.common_parent(self.parent):
1641 raise sa_exc.ArgumentError(
1642 "reverse_property %r on "
1643 "relationship %s references relationship %s, which "
1644 "does not reference mapper %s"
1645 % (key, self, other, self.parent)
1646 )
1647
1648 if (
1649 other._configure_started
1650 and self.direction in (ONETOMANY, MANYTOONE)
1651 and self.direction == other.direction
1652 ):
1653 raise sa_exc.ArgumentError(
1654 "%s and back-reference %s are "
1655 "both of the same direction %r. Did you mean to "
1656 "set remote_side on the many-to-one side ?"
1657 % (other, self, self.direction)
1658 )
1659
1660 @util.memoized_property
1661 def entity(self) -> _InternalEntityType[_T]:
1662 """Return the target mapped entity, which is an inspect() of the
1663 class or aliased class that is referenced by this
1664 :class:`.RelationshipProperty`.
1665
1666 """
1667 self.parent._check_configure()
1668 return self.entity
1669
1670 @util.memoized_property
1671 def mapper(self) -> Mapper[_T]:
1672 """Return the targeted :class:`_orm.Mapper` for this
1673 :class:`.RelationshipProperty`.
1674
1675 """
1676 return self.entity.mapper
1677
1678 def do_init(self) -> None:
1679 self._check_conflicts()
1680 self._process_dependent_arguments()
1681 self._setup_entity()
1682 self._setup_registry_dependencies()
1683 self._setup_join_conditions()
1684 self._check_cascade_settings(self._cascade)
1685 self._post_init()
1686 self._generate_backref()
1687 self._join_condition._warn_for_conflicting_sync_targets()
1688 super().do_init()
1689 self._lazy_strategy = cast(
1690 "LazyLoader", self._get_strategy((("lazy", "select"),))
1691 )
1692
1693 def _setup_registry_dependencies(self) -> None:
1694 self.parent.mapper.registry._set_depends_on(
1695 self.entity.mapper.registry
1696 )
1697
1698 def _process_dependent_arguments(self) -> None:
1699 """Convert incoming configuration arguments to their
1700 proper form.
1701
1702 Callables are resolved, ORM annotations removed.
1703
1704 """
1705
1706 # accept callables for other attributes which may require
1707 # deferred initialization. This technique is used
1708 # by declarative "string configs" and some recipes.
1709 init_args = self._init_args
1710
1711 for attr in (
1712 "order_by",
1713 "primaryjoin",
1714 "secondaryjoin",
1715 "secondary",
1716 "foreign_keys",
1717 "remote_side",
1718 "back_populates",
1719 ):
1720 rel_arg = getattr(init_args, attr)
1721
1722 rel_arg._resolve_against_registry(self._clsregistry_resolvers[1])
1723
1724 # remove "annotations" which are present if mapped class
1725 # descriptors are used to create the join expression.
1726 for attr in "primaryjoin", "secondaryjoin":
1727 rel_arg = getattr(init_args, attr)
1728 val = rel_arg.resolved
1729 if val is not None:
1730 rel_arg.resolved = _orm_deannotate(
1731 coercions.expect(
1732 roles.ColumnArgumentRole, val, argname=attr
1733 )
1734 )
1735
1736 secondary = init_args.secondary.resolved
1737 if secondary is not None and _is_mapped_class(secondary):
1738 raise sa_exc.ArgumentError(
1739 "secondary argument %s passed to to relationship() %s must "
1740 "be a Table object or other FROM clause; can't send a mapped "
1741 "class directly as rows in 'secondary' are persisted "
1742 "independently of a class that is mapped "
1743 "to that same table." % (secondary, self)
1744 )
1745
1746 # ensure expressions in self.order_by, foreign_keys,
1747 # remote_side are all columns, not strings.
1748 if (
1749 init_args.order_by.resolved is not False
1750 and init_args.order_by.resolved is not None
1751 ):
1752 self.order_by = tuple(
1753 coercions.expect(
1754 roles.ColumnArgumentRole, x, argname="order_by"
1755 )
1756 for x in util.to_list(init_args.order_by.resolved)
1757 )
1758 else:
1759 self.order_by = False
1760
1761 self._user_defined_foreign_keys = util.column_set(
1762 coercions.expect(
1763 roles.ColumnArgumentRole, x, argname="foreign_keys"
1764 )
1765 for x in util.to_column_set(init_args.foreign_keys.resolved)
1766 )
1767
1768 self.remote_side = util.column_set(
1769 coercions.expect(
1770 roles.ColumnArgumentRole, x, argname="remote_side"
1771 )
1772 for x in util.to_column_set(init_args.remote_side.resolved)
1773 )
1774
1775 def declarative_scan(
1776 self,
1777 decl_scan: _ClassScanMapperConfig,
1778 registry: _RegistryType,
1779 cls: Type[Any],
1780 originating_module: Optional[str],
1781 key: str,
1782 mapped_container: Optional[Type[Mapped[Any]]],
1783 annotation: Optional[_AnnotationScanType],
1784 extracted_mapped_annotation: Optional[_AnnotationScanType],
1785 is_dataclass_field: bool,
1786 ) -> None:
1787 argument = extracted_mapped_annotation
1788
1789 if extracted_mapped_annotation is None:
1790 if self.argument is None:
1791 self._raise_for_required(key, cls)
1792 else:
1793 return
1794
1795 argument = extracted_mapped_annotation
1796 assert originating_module is not None
1797
1798 if mapped_container is not None:
1799 is_write_only = issubclass(mapped_container, WriteOnlyMapped)
1800 is_dynamic = issubclass(mapped_container, DynamicMapped)
1801 if is_write_only:
1802 self.lazy = "write_only"
1803 self.strategy_key = (("lazy", self.lazy),)
1804 elif is_dynamic:
1805 self.lazy = "dynamic"
1806 self.strategy_key = (("lazy", self.lazy),)
1807 else:
1808 is_write_only = is_dynamic = False
1809
1810 argument = de_optionalize_union_types(argument)
1811
1812 if hasattr(argument, "__origin__"):
1813 arg_origin = argument.__origin__
1814 if isinstance(arg_origin, type) and issubclass(
1815 arg_origin, abc.Collection
1816 ):
1817 if self.collection_class is None:
1818 if _py_inspect.isabstract(arg_origin):
1819 raise sa_exc.ArgumentError(
1820 f"Collection annotation type {arg_origin} cannot "
1821 "be instantiated; please provide an explicit "
1822 "'collection_class' parameter "
1823 "(e.g. list, set, etc.) to the "
1824 "relationship() function to accompany this "
1825 "annotation"
1826 )
1827
1828 self.collection_class = arg_origin
1829
1830 elif not is_write_only and not is_dynamic:
1831 self.uselist = False
1832
1833 if argument.__args__: # type: ignore
1834 if isinstance(arg_origin, type) and issubclass(
1835 arg_origin, typing.Mapping
1836 ):
1837 type_arg = argument.__args__[-1] # type: ignore
1838 else:
1839 type_arg = argument.__args__[0] # type: ignore
1840 if hasattr(type_arg, "__forward_arg__"):
1841 str_argument = type_arg.__forward_arg__
1842
1843 argument = resolve_name_to_real_class_name(
1844 str_argument, originating_module
1845 )
1846 else:
1847 argument = type_arg
1848 else:
1849 raise sa_exc.ArgumentError(
1850 f"Generic alias {argument} requires an argument"
1851 )
1852 elif hasattr(argument, "__forward_arg__"):
1853 argument = argument.__forward_arg__
1854
1855 argument = resolve_name_to_real_class_name(
1856 argument, originating_module
1857 )
1858
1859 if (
1860 self.collection_class is None
1861 and not is_write_only
1862 and not is_dynamic
1863 ):
1864 self.uselist = False
1865
1866 # ticket #8759
1867 # if a lead argument was given to relationship(), like
1868 # `relationship("B")`, use that, don't replace it with class we
1869 # found in the annotation. The declarative_scan() method call here is
1870 # still useful, as we continue to derive collection type and do
1871 # checking of the annotation in any case.
1872 if self.argument is None:
1873 self.argument = cast("_RelationshipArgumentType[_T]", argument)
1874
1875 @util.preload_module("sqlalchemy.orm.mapper")
1876 def _setup_entity(self, __argument: Any = None, /) -> None:
1877 if "entity" in self.__dict__:
1878 return
1879
1880 mapperlib = util.preloaded.orm_mapper
1881
1882 if __argument:
1883 argument = __argument
1884 else:
1885 argument = self.argument
1886
1887 resolved_argument: _ExternalEntityType[Any]
1888
1889 if isinstance(argument, str):
1890 # we might want to cleanup clsregistry API to make this
1891 # more straightforward
1892 resolved_argument = cast(
1893 "_ExternalEntityType[Any]",
1894 self._clsregistry_resolve_name(argument)(),
1895 )
1896 elif callable(argument) and not isinstance(
1897 argument, (type, mapperlib.Mapper)
1898 ):
1899 resolved_argument = argument()
1900 else:
1901 resolved_argument = argument
1902
1903 entity: _InternalEntityType[Any]
1904
1905 if isinstance(resolved_argument, type):
1906 entity = class_mapper(resolved_argument, configure=False)
1907 else:
1908 try:
1909 entity = inspect(resolved_argument)
1910 except sa_exc.NoInspectionAvailable:
1911 entity = None # type: ignore
1912
1913 if not hasattr(entity, "mapper"):
1914 raise sa_exc.ArgumentError(
1915 "relationship '%s' expects "
1916 "a class or a mapper argument (received: %s)"
1917 % (self.key, type(resolved_argument))
1918 )
1919
1920 self.entity = entity
1921 self.target = self.entity.persist_selectable
1922
1923 def _setup_join_conditions(self) -> None:
1924 self._join_condition = jc = JoinCondition(
1925 parent_persist_selectable=self.parent.persist_selectable,
1926 child_persist_selectable=self.entity.persist_selectable,
1927 parent_local_selectable=self.parent.local_table,
1928 child_local_selectable=self.entity.local_table,
1929 primaryjoin=self._init_args.primaryjoin.resolved,
1930 secondary=self._init_args.secondary.resolved,
1931 secondaryjoin=self._init_args.secondaryjoin.resolved,
1932 parent_equivalents=self.parent._equivalent_columns,
1933 child_equivalents=self.mapper._equivalent_columns,
1934 consider_as_foreign_keys=self._user_defined_foreign_keys,
1935 local_remote_pairs=self.local_remote_pairs,
1936 remote_side=self.remote_side,
1937 self_referential=self._is_self_referential,
1938 prop=self,
1939 support_sync=not self.viewonly,
1940 can_be_synced_fn=self._columns_are_mapped,
1941 )
1942 self.primaryjoin = jc.primaryjoin
1943 self.secondaryjoin = jc.secondaryjoin
1944 self.secondary = jc.secondary
1945 self.direction = jc.direction
1946 self.local_remote_pairs = jc.local_remote_pairs
1947 self.remote_side = jc.remote_columns
1948 self.local_columns = jc.local_columns
1949 self.synchronize_pairs = jc.synchronize_pairs
1950 self._calculated_foreign_keys = jc.foreign_key_columns
1951 self.secondary_synchronize_pairs = jc.secondary_synchronize_pairs
1952
1953 @property
1954 def _clsregistry_resolve_arg(
1955 self,
1956 ) -> Callable[[str, bool], _class_resolver]:
1957 return self._clsregistry_resolvers[1]
1958
1959 @property
1960 def _clsregistry_resolve_name(
1961 self,
1962 ) -> Callable[[str], Callable[[], Union[Type[Any], Table, _ModNS]]]:
1963 return self._clsregistry_resolvers[0]
1964
1965 @util.memoized_property
1966 @util.preload_module("sqlalchemy.orm.clsregistry")
1967 def _clsregistry_resolvers(
1968 self,
1969 ) -> Tuple[
1970 Callable[[str], Callable[[], Union[Type[Any], Table, _ModNS]]],
1971 Callable[[str, bool], _class_resolver],
1972 ]:
1973 _resolver = util.preloaded.orm_clsregistry._resolver
1974
1975 return _resolver(self.parent.class_, self)
1976
1977 def _check_conflicts(self) -> None:
1978 """Test that this relationship is legal, warn about
1979 inheritance conflicts."""
1980 if self.parent.non_primary and not class_mapper(
1981 self.parent.class_, configure=False
1982 ).has_property(self.key):
1983 raise sa_exc.ArgumentError(
1984 "Attempting to assign a new "
1985 "relationship '%s' to a non-primary mapper on "
1986 "class '%s'. New relationships can only be added "
1987 "to the primary mapper, i.e. the very first mapper "
1988 "created for class '%s' "
1989 % (
1990 self.key,
1991 self.parent.class_.__name__,
1992 self.parent.class_.__name__,
1993 )
1994 )
1995
1996 @property
1997 def cascade(self) -> CascadeOptions:
1998 """Return the current cascade setting for this
1999 :class:`.RelationshipProperty`.
2000 """
2001 return self._cascade
2002
2003 @cascade.setter
2004 def cascade(self, cascade: Union[str, CascadeOptions]) -> None:
2005 self._set_cascade(cascade)
2006
2007 def _set_cascade(self, cascade_arg: Union[str, CascadeOptions]) -> None:
2008 cascade = CascadeOptions(cascade_arg)
2009
2010 if self.viewonly:
2011 cascade = CascadeOptions(
2012 cascade.intersection(CascadeOptions._viewonly_cascades)
2013 )
2014
2015 if "mapper" in self.__dict__:
2016 self._check_cascade_settings(cascade)
2017 self._cascade = cascade
2018
2019 if self._dependency_processor:
2020 self._dependency_processor.cascade = cascade
2021
2022 def _check_cascade_settings(self, cascade: CascadeOptions) -> None:
2023 if (
2024 cascade.delete_orphan
2025 and not self.single_parent
2026 and (self.direction is MANYTOMANY or self.direction is MANYTOONE)
2027 ):
2028 raise sa_exc.ArgumentError(
2029 "For %(direction)s relationship %(rel)s, delete-orphan "
2030 "cascade is normally "
2031 'configured only on the "one" side of a one-to-many '
2032 "relationship, "
2033 'and not on the "many" side of a many-to-one or many-to-many '
2034 "relationship. "
2035 "To force this relationship to allow a particular "
2036 '"%(relatedcls)s" object to be referenced by only '
2037 'a single "%(clsname)s" object at a time via the '
2038 "%(rel)s relationship, which "
2039 "would allow "
2040 "delete-orphan cascade to take place in this direction, set "
2041 "the single_parent=True flag."
2042 % {
2043 "rel": self,
2044 "direction": (
2045 "many-to-one"
2046 if self.direction is MANYTOONE
2047 else "many-to-many"
2048 ),
2049 "clsname": self.parent.class_.__name__,
2050 "relatedcls": self.mapper.class_.__name__,
2051 },
2052 code="bbf0",
2053 )
2054
2055 if self.passive_deletes == "all" and (
2056 "delete" in cascade or "delete-orphan" in cascade
2057 ):
2058 raise sa_exc.ArgumentError(
2059 "On %s, can't set passive_deletes='all' in conjunction "
2060 "with 'delete' or 'delete-orphan' cascade" % self
2061 )
2062
2063 if cascade.delete_orphan:
2064 self.mapper.primary_mapper()._delete_orphans.append(
2065 (self.key, self.parent.class_)
2066 )
2067
2068 def _persists_for(self, mapper: Mapper[Any]) -> bool:
2069 """Return True if this property will persist values on behalf
2070 of the given mapper.
2071
2072 """
2073
2074 return (
2075 self.key in mapper.relationships
2076 and mapper.relationships[self.key] is self
2077 )
2078
2079 def _columns_are_mapped(self, *cols: ColumnElement[Any]) -> bool:
2080 """Return True if all columns in the given collection are
2081 mapped by the tables referenced by this :class:`.RelationshipProperty`.
2082
2083 """
2084
2085 secondary = self._init_args.secondary.resolved
2086 for c in cols:
2087 if secondary is not None and secondary.c.contains_column(c):
2088 continue
2089 if not self.parent.persist_selectable.c.contains_column(
2090 c
2091 ) and not self.target.c.contains_column(c):
2092 return False
2093 return True
2094
2095 def _generate_backref(self) -> None:
2096 """Interpret the 'backref' instruction to create a
2097 :func:`_orm.relationship` complementary to this one."""
2098
2099 if self.parent.non_primary:
2100 return
2101
2102 resolve_back_populates = self._init_args.back_populates.resolved
2103
2104 if self.backref is not None and not resolve_back_populates:
2105 kwargs: Dict[str, Any]
2106 if isinstance(self.backref, str):
2107 backref_key, kwargs = self.backref, {}
2108 else:
2109 backref_key, kwargs = self.backref
2110 mapper = self.mapper.primary_mapper()
2111
2112 if not mapper.concrete:
2113 check = set(mapper.iterate_to_root()).union(
2114 mapper.self_and_descendants
2115 )
2116 for m in check:
2117 if m.has_property(backref_key) and not m.concrete:
2118 raise sa_exc.ArgumentError(
2119 "Error creating backref "
2120 "'%s' on relationship '%s': property of that "
2121 "name exists on mapper '%s'"
2122 % (backref_key, self, m)
2123 )
2124
2125 # determine primaryjoin/secondaryjoin for the
2126 # backref. Use the one we had, so that
2127 # a custom join doesn't have to be specified in
2128 # both directions.
2129 if self.secondary is not None:
2130 # for many to many, just switch primaryjoin/
2131 # secondaryjoin. use the annotated
2132 # pj/sj on the _join_condition.
2133 pj = kwargs.pop(
2134 "primaryjoin",
2135 self._join_condition.secondaryjoin_minus_local,
2136 )
2137 sj = kwargs.pop(
2138 "secondaryjoin",
2139 self._join_condition.primaryjoin_minus_local,
2140 )
2141 else:
2142 pj = kwargs.pop(
2143 "primaryjoin",
2144 self._join_condition.primaryjoin_reverse_remote,
2145 )
2146 sj = kwargs.pop("secondaryjoin", None)
2147 if sj:
2148 raise sa_exc.InvalidRequestError(
2149 "Can't assign 'secondaryjoin' on a backref "
2150 "against a non-secondary relationship."
2151 )
2152
2153 foreign_keys = kwargs.pop(
2154 "foreign_keys", self._user_defined_foreign_keys
2155 )
2156 parent = self.parent.primary_mapper()
2157 kwargs.setdefault("viewonly", self.viewonly)
2158 kwargs.setdefault("post_update", self.post_update)
2159 kwargs.setdefault("passive_updates", self.passive_updates)
2160 kwargs.setdefault("sync_backref", self.sync_backref)
2161 self.back_populates = backref_key
2162 relationship = RelationshipProperty(
2163 parent,
2164 self.secondary,
2165 primaryjoin=pj,
2166 secondaryjoin=sj,
2167 foreign_keys=foreign_keys,
2168 back_populates=self.key,
2169 **kwargs,
2170 )
2171 mapper._configure_property(
2172 backref_key, relationship, warn_for_existing=True
2173 )
2174
2175 if resolve_back_populates:
2176 if isinstance(resolve_back_populates, PropComparator):
2177 back_populates = resolve_back_populates.prop.key
2178 elif isinstance(resolve_back_populates, str):
2179 back_populates = resolve_back_populates
2180 else:
2181 # need test coverage for this case as well
2182 raise sa_exc.ArgumentError(
2183 f"Invalid back_populates value: {resolve_back_populates!r}"
2184 )
2185
2186 self._add_reverse_property(back_populates)
2187
2188 @util.preload_module("sqlalchemy.orm.dependency")
2189 def _post_init(self) -> None:
2190 dependency = util.preloaded.orm_dependency
2191
2192 if self.uselist is None:
2193 self.uselist = self.direction is not MANYTOONE
2194 if not self.viewonly:
2195 self._dependency_processor = ( # type: ignore
2196 dependency.DependencyProcessor.from_relationship
2197 )(self)
2198
2199 @util.memoized_property
2200 def _use_get(self) -> bool:
2201 """memoize the 'use_get' attribute of this RelationshipLoader's
2202 lazyloader."""
2203
2204 strategy = self._lazy_strategy
2205 return strategy.use_get
2206
2207 @util.memoized_property
2208 def _is_self_referential(self) -> bool:
2209 return self.mapper.common_parent(self.parent)
2210
2211 def _create_joins(
2212 self,
2213 source_polymorphic: bool = False,
2214 source_selectable: Optional[FromClause] = None,
2215 dest_selectable: Optional[FromClause] = None,
2216 of_type_entity: Optional[_InternalEntityType[Any]] = None,
2217 alias_secondary: bool = False,
2218 extra_criteria: Tuple[ColumnElement[bool], ...] = (),
2219 ) -> Tuple[
2220 ColumnElement[bool],
2221 Optional[ColumnElement[bool]],
2222 FromClause,
2223 FromClause,
2224 Optional[FromClause],
2225 Optional[ClauseAdapter],
2226 ]:
2227 aliased = False
2228
2229 if alias_secondary and self.secondary is not None:
2230 aliased = True
2231
2232 if source_selectable is None:
2233 if source_polymorphic and self.parent.with_polymorphic:
2234 source_selectable = self.parent._with_polymorphic_selectable
2235
2236 if of_type_entity:
2237 dest_mapper = of_type_entity.mapper
2238 if dest_selectable is None:
2239 dest_selectable = of_type_entity.selectable
2240 aliased = True
2241 else:
2242 dest_mapper = self.mapper
2243
2244 if dest_selectable is None:
2245 dest_selectable = self.entity.selectable
2246 if self.mapper.with_polymorphic:
2247 aliased = True
2248
2249 if self._is_self_referential and source_selectable is None:
2250 dest_selectable = dest_selectable._anonymous_fromclause()
2251 aliased = True
2252 elif (
2253 dest_selectable is not self.mapper._with_polymorphic_selectable
2254 or self.mapper.with_polymorphic
2255 ):
2256 aliased = True
2257
2258 single_crit = dest_mapper._single_table_criterion
2259 aliased = aliased or (
2260 source_selectable is not None
2261 and (
2262 source_selectable
2263 is not self.parent._with_polymorphic_selectable
2264 or source_selectable._is_subquery
2265 )
2266 )
2267
2268 (
2269 primaryjoin,
2270 secondaryjoin,
2271 secondary,
2272 target_adapter,
2273 dest_selectable,
2274 ) = self._join_condition.join_targets(
2275 source_selectable,
2276 dest_selectable,
2277 aliased,
2278 single_crit,
2279 extra_criteria,
2280 )
2281 if source_selectable is None:
2282 source_selectable = self.parent.local_table
2283 if dest_selectable is None:
2284 dest_selectable = self.entity.local_table
2285 return (
2286 primaryjoin,
2287 secondaryjoin,
2288 source_selectable,
2289 dest_selectable,
2290 secondary,
2291 target_adapter,
2292 )
2293
2294
2295def _annotate_columns(element: _CE, annotations: _AnnotationDict) -> _CE:
2296 def clone(elem: _CE) -> _CE:
2297 if isinstance(elem, expression.ColumnClause):
2298 elem = elem._annotate(annotations.copy()) # type: ignore
2299 elem._copy_internals(clone=clone)
2300 return elem
2301
2302 if element is not None:
2303 element = clone(element)
2304 clone = None # type: ignore # remove gc cycles
2305 return element
2306
2307
2308class JoinCondition:
2309 primaryjoin_initial: Optional[ColumnElement[bool]]
2310 primaryjoin: ColumnElement[bool]
2311 secondaryjoin: Optional[ColumnElement[bool]]
2312 secondary: Optional[FromClause]
2313 prop: RelationshipProperty[Any]
2314
2315 synchronize_pairs: _ColumnPairs
2316 secondary_synchronize_pairs: _ColumnPairs
2317 direction: RelationshipDirection
2318
2319 parent_persist_selectable: FromClause
2320 child_persist_selectable: FromClause
2321 parent_local_selectable: FromClause
2322 child_local_selectable: FromClause
2323
2324 _local_remote_pairs: Optional[_ColumnPairs]
2325
2326 def __init__(
2327 self,
2328 parent_persist_selectable: FromClause,
2329 child_persist_selectable: FromClause,
2330 parent_local_selectable: FromClause,
2331 child_local_selectable: FromClause,
2332 *,
2333 primaryjoin: Optional[ColumnElement[bool]] = None,
2334 secondary: Optional[FromClause] = None,
2335 secondaryjoin: Optional[ColumnElement[bool]] = None,
2336 parent_equivalents: Optional[_EquivalentColumnMap] = None,
2337 child_equivalents: Optional[_EquivalentColumnMap] = None,
2338 consider_as_foreign_keys: Any = None,
2339 local_remote_pairs: Optional[_ColumnPairs] = None,
2340 remote_side: Any = None,
2341 self_referential: Any = False,
2342 prop: RelationshipProperty[Any],
2343 support_sync: bool = True,
2344 can_be_synced_fn: Callable[..., bool] = lambda *c: True,
2345 ):
2346 self.parent_persist_selectable = parent_persist_selectable
2347 self.parent_local_selectable = parent_local_selectable
2348 self.child_persist_selectable = child_persist_selectable
2349 self.child_local_selectable = child_local_selectable
2350 self.parent_equivalents = parent_equivalents
2351 self.child_equivalents = child_equivalents
2352 self.primaryjoin_initial = primaryjoin
2353 self.secondaryjoin = secondaryjoin
2354 self.secondary = secondary
2355 self.consider_as_foreign_keys = consider_as_foreign_keys
2356 self._local_remote_pairs = local_remote_pairs
2357 self._remote_side = remote_side
2358 self.prop = prop
2359 self.self_referential = self_referential
2360 self.support_sync = support_sync
2361 self.can_be_synced_fn = can_be_synced_fn
2362
2363 self._determine_joins()
2364 assert self.primaryjoin is not None
2365
2366 self._sanitize_joins()
2367 self._annotate_fks()
2368 self._annotate_remote()
2369 self._annotate_local()
2370 self._annotate_parentmapper()
2371 self._setup_pairs()
2372 self._check_foreign_cols(self.primaryjoin, True)
2373 if self.secondaryjoin is not None:
2374 self._check_foreign_cols(self.secondaryjoin, False)
2375 self._determine_direction()
2376 self._check_remote_side()
2377 self._log_joins()
2378
2379 def _log_joins(self) -> None:
2380 log = self.prop.logger
2381 log.info("%s setup primary join %s", self.prop, self.primaryjoin)
2382 log.info("%s setup secondary join %s", self.prop, self.secondaryjoin)
2383 log.info(
2384 "%s synchronize pairs [%s]",
2385 self.prop,
2386 ",".join(
2387 "(%s => %s)" % (l, r) for (l, r) in self.synchronize_pairs
2388 ),
2389 )
2390 log.info(
2391 "%s secondary synchronize pairs [%s]",
2392 self.prop,
2393 ",".join(
2394 "(%s => %s)" % (l, r)
2395 for (l, r) in self.secondary_synchronize_pairs or []
2396 ),
2397 )
2398 log.info(
2399 "%s local/remote pairs [%s]",
2400 self.prop,
2401 ",".join(
2402 "(%s / %s)" % (l, r) for (l, r) in self.local_remote_pairs
2403 ),
2404 )
2405 log.info(
2406 "%s remote columns [%s]",
2407 self.prop,
2408 ",".join("%s" % col for col in self.remote_columns),
2409 )
2410 log.info(
2411 "%s local columns [%s]",
2412 self.prop,
2413 ",".join("%s" % col for col in self.local_columns),
2414 )
2415 log.info("%s relationship direction %s", self.prop, self.direction)
2416
2417 def _sanitize_joins(self) -> None:
2418 """remove the parententity annotation from our join conditions which
2419 can leak in here based on some declarative patterns and maybe others.
2420
2421 "parentmapper" is relied upon both by the ORM evaluator as well as
2422 the use case in _join_fixture_inh_selfref_w_entity
2423 that relies upon it being present, see :ticket:`3364`.
2424
2425 """
2426
2427 self.primaryjoin = _deep_deannotate(
2428 self.primaryjoin, values=("parententity", "proxy_key")
2429 )
2430 if self.secondaryjoin is not None:
2431 self.secondaryjoin = _deep_deannotate(
2432 self.secondaryjoin, values=("parententity", "proxy_key")
2433 )
2434
2435 def _determine_joins(self) -> None:
2436 """Determine the 'primaryjoin' and 'secondaryjoin' attributes,
2437 if not passed to the constructor already.
2438
2439 This is based on analysis of the foreign key relationships
2440 between the parent and target mapped selectables.
2441
2442 """
2443 if self.secondaryjoin is not None and self.secondary is None:
2444 raise sa_exc.ArgumentError(
2445 "Property %s specified with secondary "
2446 "join condition but "
2447 "no secondary argument" % self.prop
2448 )
2449
2450 # find a join between the given mapper's mapped table and
2451 # the given table. will try the mapper's local table first
2452 # for more specificity, then if not found will try the more
2453 # general mapped table, which in the case of inheritance is
2454 # a join.
2455 try:
2456 consider_as_foreign_keys = self.consider_as_foreign_keys or None
2457 if self.secondary is not None:
2458 if self.secondaryjoin is None:
2459 self.secondaryjoin = join_condition(
2460 self.child_persist_selectable,
2461 self.secondary,
2462 a_subset=self.child_local_selectable,
2463 consider_as_foreign_keys=consider_as_foreign_keys,
2464 )
2465 if self.primaryjoin_initial is None:
2466 self.primaryjoin = join_condition(
2467 self.parent_persist_selectable,
2468 self.secondary,
2469 a_subset=self.parent_local_selectable,
2470 consider_as_foreign_keys=consider_as_foreign_keys,
2471 )
2472 else:
2473 self.primaryjoin = self.primaryjoin_initial
2474 else:
2475 if self.primaryjoin_initial is None:
2476 self.primaryjoin = join_condition(
2477 self.parent_persist_selectable,
2478 self.child_persist_selectable,
2479 a_subset=self.parent_local_selectable,
2480 consider_as_foreign_keys=consider_as_foreign_keys,
2481 )
2482 else:
2483 self.primaryjoin = self.primaryjoin_initial
2484 except sa_exc.NoForeignKeysError as nfe:
2485 if self.secondary is not None:
2486 raise sa_exc.NoForeignKeysError(
2487 "Could not determine join "
2488 "condition between parent/child tables on "
2489 "relationship %s - there are no foreign keys "
2490 "linking these tables via secondary table '%s'. "
2491 "Ensure that referencing columns are associated "
2492 "with a ForeignKey or ForeignKeyConstraint, or "
2493 "specify 'primaryjoin' and 'secondaryjoin' "
2494 "expressions." % (self.prop, self.secondary)
2495 ) from nfe
2496 else:
2497 raise sa_exc.NoForeignKeysError(
2498 "Could not determine join "
2499 "condition between parent/child tables on "
2500 "relationship %s - there are no foreign keys "
2501 "linking these tables. "
2502 "Ensure that referencing columns are associated "
2503 "with a ForeignKey or ForeignKeyConstraint, or "
2504 "specify a 'primaryjoin' expression." % self.prop
2505 ) from nfe
2506 except sa_exc.AmbiguousForeignKeysError as afe:
2507 if self.secondary is not None:
2508 raise sa_exc.AmbiguousForeignKeysError(
2509 "Could not determine join "
2510 "condition between parent/child tables on "
2511 "relationship %s - there are multiple foreign key "
2512 "paths linking the tables via secondary table '%s'. "
2513 "Specify the 'foreign_keys' "
2514 "argument, providing a list of those columns which "
2515 "should be counted as containing a foreign key "
2516 "reference from the secondary table to each of the "
2517 "parent and child tables." % (self.prop, self.secondary)
2518 ) from afe
2519 else:
2520 raise sa_exc.AmbiguousForeignKeysError(
2521 "Could not determine join "
2522 "condition between parent/child tables on "
2523 "relationship %s - there are multiple foreign key "
2524 "paths linking the tables. Specify the "
2525 "'foreign_keys' argument, providing a list of those "
2526 "columns which should be counted as containing a "
2527 "foreign key reference to the parent table." % self.prop
2528 ) from afe
2529
2530 @property
2531 def primaryjoin_minus_local(self) -> ColumnElement[bool]:
2532 return _deep_deannotate(self.primaryjoin, values=("local", "remote"))
2533
2534 @property
2535 def secondaryjoin_minus_local(self) -> ColumnElement[bool]:
2536 assert self.secondaryjoin is not None
2537 return _deep_deannotate(self.secondaryjoin, values=("local", "remote"))
2538
2539 @util.memoized_property
2540 def primaryjoin_reverse_remote(self) -> ColumnElement[bool]:
2541 """Return the primaryjoin condition suitable for the
2542 "reverse" direction.
2543
2544 If the primaryjoin was delivered here with pre-existing
2545 "remote" annotations, the local/remote annotations
2546 are reversed. Otherwise, the local/remote annotations
2547 are removed.
2548
2549 """
2550 if self._has_remote_annotations:
2551
2552 def replace(element: _CE, **kw: Any) -> Optional[_CE]:
2553 if "remote" in element._annotations:
2554 v = dict(element._annotations)
2555 del v["remote"]
2556 v["local"] = True
2557 return element._with_annotations(v)
2558 elif "local" in element._annotations:
2559 v = dict(element._annotations)
2560 del v["local"]
2561 v["remote"] = True
2562 return element._with_annotations(v)
2563
2564 return None
2565
2566 return visitors.replacement_traverse(self.primaryjoin, {}, replace)
2567 else:
2568 if self._has_foreign_annotations:
2569 # TODO: coverage
2570 return _deep_deannotate(
2571 self.primaryjoin, values=("local", "remote")
2572 )
2573 else:
2574 return _deep_deannotate(self.primaryjoin)
2575
2576 def _has_annotation(self, clause: ClauseElement, annotation: str) -> bool:
2577 for col in visitors.iterate(clause, {}):
2578 if annotation in col._annotations:
2579 return True
2580 else:
2581 return False
2582
2583 @util.memoized_property
2584 def _has_foreign_annotations(self) -> bool:
2585 return self._has_annotation(self.primaryjoin, "foreign")
2586
2587 @util.memoized_property
2588 def _has_remote_annotations(self) -> bool:
2589 return self._has_annotation(self.primaryjoin, "remote")
2590
2591 def _annotate_fks(self) -> None:
2592 """Annotate the primaryjoin and secondaryjoin
2593 structures with 'foreign' annotations marking columns
2594 considered as foreign.
2595
2596 """
2597 if self._has_foreign_annotations:
2598 return
2599
2600 if self.consider_as_foreign_keys:
2601 self._annotate_from_fk_list()
2602 else:
2603 self._annotate_present_fks()
2604
2605 def _annotate_from_fk_list(self) -> None:
2606 def check_fk(element: _CE, **kw: Any) -> Optional[_CE]:
2607 if element in self.consider_as_foreign_keys:
2608 return element._annotate({"foreign": True})
2609 return None
2610
2611 self.primaryjoin = visitors.replacement_traverse(
2612 self.primaryjoin, {}, check_fk
2613 )
2614 if self.secondaryjoin is not None:
2615 self.secondaryjoin = visitors.replacement_traverse(
2616 self.secondaryjoin, {}, check_fk
2617 )
2618
2619 def _annotate_present_fks(self) -> None:
2620 if self.secondary is not None:
2621 secondarycols = util.column_set(self.secondary.c)
2622 else:
2623 secondarycols = set()
2624
2625 def is_foreign(
2626 a: ColumnElement[Any], b: ColumnElement[Any]
2627 ) -> Optional[ColumnElement[Any]]:
2628 if isinstance(a, schema.Column) and isinstance(b, schema.Column):
2629 if a.references(b):
2630 return a
2631 elif b.references(a):
2632 return b
2633
2634 if secondarycols:
2635 if a in secondarycols and b not in secondarycols:
2636 return a
2637 elif b in secondarycols and a not in secondarycols:
2638 return b
2639
2640 return None
2641
2642 def visit_binary(binary: BinaryExpression[Any]) -> None:
2643 if not isinstance(
2644 binary.left, sql.ColumnElement
2645 ) or not isinstance(binary.right, sql.ColumnElement):
2646 return
2647
2648 if (
2649 "foreign" not in binary.left._annotations
2650 and "foreign" not in binary.right._annotations
2651 ):
2652 col = is_foreign(binary.left, binary.right)
2653 if col is not None:
2654 if col.compare(binary.left):
2655 binary.left = binary.left._annotate({"foreign": True})
2656 elif col.compare(binary.right):
2657 binary.right = binary.right._annotate(
2658 {"foreign": True}
2659 )
2660
2661 self.primaryjoin = visitors.cloned_traverse(
2662 self.primaryjoin, {}, {"binary": visit_binary}
2663 )
2664 if self.secondaryjoin is not None:
2665 self.secondaryjoin = visitors.cloned_traverse(
2666 self.secondaryjoin, {}, {"binary": visit_binary}
2667 )
2668
2669 def _refers_to_parent_table(self) -> bool:
2670 """Return True if the join condition contains column
2671 comparisons where both columns are in both tables.
2672
2673 """
2674 pt = self.parent_persist_selectable
2675 mt = self.child_persist_selectable
2676 result = False
2677
2678 def visit_binary(binary: BinaryExpression[Any]) -> None:
2679 nonlocal result
2680 c, f = binary.left, binary.right
2681 if (
2682 isinstance(c, expression.ColumnClause)
2683 and isinstance(f, expression.ColumnClause)
2684 and pt.is_derived_from(c.table)
2685 and pt.is_derived_from(f.table)
2686 and mt.is_derived_from(c.table)
2687 and mt.is_derived_from(f.table)
2688 ):
2689 result = True
2690
2691 visitors.traverse(self.primaryjoin, {}, {"binary": visit_binary})
2692 return result
2693
2694 def _tables_overlap(self) -> bool:
2695 """Return True if parent/child tables have some overlap."""
2696
2697 return selectables_overlap(
2698 self.parent_persist_selectable, self.child_persist_selectable
2699 )
2700
2701 def _annotate_remote(self) -> None:
2702 """Annotate the primaryjoin and secondaryjoin
2703 structures with 'remote' annotations marking columns
2704 considered as part of the 'remote' side.
2705
2706 """
2707 if self._has_remote_annotations:
2708 return
2709
2710 if self.secondary is not None:
2711 self._annotate_remote_secondary()
2712 elif self._local_remote_pairs or self._remote_side:
2713 self._annotate_remote_from_args()
2714 elif self._refers_to_parent_table():
2715 self._annotate_selfref(
2716 lambda col: "foreign" in col._annotations, False
2717 )
2718 elif self._tables_overlap():
2719 self._annotate_remote_with_overlap()
2720 else:
2721 self._annotate_remote_distinct_selectables()
2722
2723 def _annotate_remote_secondary(self) -> None:
2724 """annotate 'remote' in primaryjoin, secondaryjoin
2725 when 'secondary' is present.
2726
2727 """
2728
2729 assert self.secondary is not None
2730 fixed_secondary = self.secondary
2731
2732 def repl(element: _CE, **kw: Any) -> Optional[_CE]:
2733 if fixed_secondary.c.contains_column(element):
2734 return element._annotate({"remote": True})
2735 return None
2736
2737 self.primaryjoin = visitors.replacement_traverse(
2738 self.primaryjoin, {}, repl
2739 )
2740
2741 assert self.secondaryjoin is not None
2742 self.secondaryjoin = visitors.replacement_traverse(
2743 self.secondaryjoin, {}, repl
2744 )
2745
2746 def _annotate_selfref(
2747 self, fn: Callable[[ColumnElement[Any]], bool], remote_side_given: bool
2748 ) -> None:
2749 """annotate 'remote' in primaryjoin, secondaryjoin
2750 when the relationship is detected as self-referential.
2751
2752 """
2753
2754 def visit_binary(binary: BinaryExpression[Any]) -> None:
2755 equated = binary.left.compare(binary.right)
2756 if isinstance(binary.left, expression.ColumnClause) and isinstance(
2757 binary.right, expression.ColumnClause
2758 ):
2759 # assume one to many - FKs are "remote"
2760 if fn(binary.left):
2761 binary.left = binary.left._annotate({"remote": True})
2762 if fn(binary.right) and not equated:
2763 binary.right = binary.right._annotate({"remote": True})
2764 elif not remote_side_given:
2765 self._warn_non_column_elements()
2766
2767 self.primaryjoin = visitors.cloned_traverse(
2768 self.primaryjoin, {}, {"binary": visit_binary}
2769 )
2770
2771 def _annotate_remote_from_args(self) -> None:
2772 """annotate 'remote' in primaryjoin, secondaryjoin
2773 when the 'remote_side' or '_local_remote_pairs'
2774 arguments are used.
2775
2776 """
2777 if self._local_remote_pairs:
2778 if self._remote_side:
2779 raise sa_exc.ArgumentError(
2780 "remote_side argument is redundant "
2781 "against more detailed _local_remote_side "
2782 "argument."
2783 )
2784
2785 remote_side = [r for (l, r) in self._local_remote_pairs]
2786 else:
2787 remote_side = self._remote_side
2788
2789 if self._refers_to_parent_table():
2790 self._annotate_selfref(lambda col: col in remote_side, True)
2791 else:
2792
2793 def repl(element: _CE, **kw: Any) -> Optional[_CE]:
2794 # use set() to avoid generating ``__eq__()`` expressions
2795 # against each element
2796 if element in set(remote_side):
2797 return element._annotate({"remote": True})
2798 return None
2799
2800 self.primaryjoin = visitors.replacement_traverse(
2801 self.primaryjoin, {}, repl
2802 )
2803
2804 def _annotate_remote_with_overlap(self) -> None:
2805 """annotate 'remote' in primaryjoin, secondaryjoin
2806 when the parent/child tables have some set of
2807 tables in common, though is not a fully self-referential
2808 relationship.
2809
2810 """
2811
2812 def visit_binary(binary: BinaryExpression[Any]) -> None:
2813 binary.left, binary.right = proc_left_right(
2814 binary.left, binary.right
2815 )
2816 binary.right, binary.left = proc_left_right(
2817 binary.right, binary.left
2818 )
2819
2820 check_entities = (
2821 self.prop is not None and self.prop.mapper is not self.prop.parent
2822 )
2823
2824 def proc_left_right(
2825 left: ColumnElement[Any], right: ColumnElement[Any]
2826 ) -> Tuple[ColumnElement[Any], ColumnElement[Any]]:
2827 if isinstance(left, expression.ColumnClause) and isinstance(
2828 right, expression.ColumnClause
2829 ):
2830 if self.child_persist_selectable.c.contains_column(
2831 right
2832 ) and self.parent_persist_selectable.c.contains_column(left):
2833 right = right._annotate({"remote": True})
2834 elif (
2835 check_entities
2836 and right._annotations.get("parentmapper") is self.prop.mapper
2837 ):
2838 right = right._annotate({"remote": True})
2839 elif (
2840 check_entities
2841 and left._annotations.get("parentmapper") is self.prop.mapper
2842 ):
2843 left = left._annotate({"remote": True})
2844 else:
2845 self._warn_non_column_elements()
2846
2847 return left, right
2848
2849 self.primaryjoin = visitors.cloned_traverse(
2850 self.primaryjoin, {}, {"binary": visit_binary}
2851 )
2852
2853 def _annotate_remote_distinct_selectables(self) -> None:
2854 """annotate 'remote' in primaryjoin, secondaryjoin
2855 when the parent/child tables are entirely
2856 separate.
2857
2858 """
2859
2860 def repl(element: _CE, **kw: Any) -> Optional[_CE]:
2861 if self.child_persist_selectable.c.contains_column(element) and (
2862 not self.parent_local_selectable.c.contains_column(element)
2863 or self.child_local_selectable.c.contains_column(element)
2864 ):
2865 return element._annotate({"remote": True})
2866 return None
2867
2868 self.primaryjoin = visitors.replacement_traverse(
2869 self.primaryjoin, {}, repl
2870 )
2871
2872 def _warn_non_column_elements(self) -> None:
2873 util.warn(
2874 "Non-simple column elements in primary "
2875 "join condition for property %s - consider using "
2876 "remote() annotations to mark the remote side." % self.prop
2877 )
2878
2879 def _annotate_local(self) -> None:
2880 """Annotate the primaryjoin and secondaryjoin
2881 structures with 'local' annotations.
2882
2883 This annotates all column elements found
2884 simultaneously in the parent table
2885 and the join condition that don't have a
2886 'remote' annotation set up from
2887 _annotate_remote() or user-defined.
2888
2889 """
2890 if self._has_annotation(self.primaryjoin, "local"):
2891 return
2892
2893 if self._local_remote_pairs:
2894 local_side = util.column_set(
2895 [l for (l, r) in self._local_remote_pairs]
2896 )
2897 else:
2898 local_side = util.column_set(self.parent_persist_selectable.c)
2899
2900 def locals_(element: _CE, **kw: Any) -> Optional[_CE]:
2901 if "remote" not in element._annotations and element in local_side:
2902 return element._annotate({"local": True})
2903 return None
2904
2905 self.primaryjoin = visitors.replacement_traverse(
2906 self.primaryjoin, {}, locals_
2907 )
2908
2909 def _annotate_parentmapper(self) -> None:
2910 def parentmappers_(element: _CE, **kw: Any) -> Optional[_CE]:
2911 if "remote" in element._annotations:
2912 return element._annotate({"parentmapper": self.prop.mapper})
2913 elif "local" in element._annotations:
2914 return element._annotate({"parentmapper": self.prop.parent})
2915 return None
2916
2917 self.primaryjoin = visitors.replacement_traverse(
2918 self.primaryjoin, {}, parentmappers_
2919 )
2920
2921 def _check_remote_side(self) -> None:
2922 if not self.local_remote_pairs:
2923 raise sa_exc.ArgumentError(
2924 "Relationship %s could "
2925 "not determine any unambiguous local/remote column "
2926 "pairs based on join condition and remote_side "
2927 "arguments. "
2928 "Consider using the remote() annotation to "
2929 "accurately mark those elements of the join "
2930 "condition that are on the remote side of "
2931 "the relationship." % (self.prop,)
2932 )
2933 else:
2934 not_target = util.column_set(
2935 self.parent_persist_selectable.c
2936 ).difference(self.child_persist_selectable.c)
2937
2938 for _, rmt in self.local_remote_pairs:
2939 if rmt in not_target:
2940 util.warn(
2941 "Expression %s is marked as 'remote', but these "
2942 "column(s) are local to the local side. The "
2943 "remote() annotation is needed only for a "
2944 "self-referential relationship where both sides "
2945 "of the relationship refer to the same tables."
2946 % (rmt,)
2947 )
2948
2949 def _check_foreign_cols(
2950 self, join_condition: ColumnElement[bool], primary: bool
2951 ) -> None:
2952 """Check the foreign key columns collected and emit error
2953 messages."""
2954
2955 can_sync = False
2956
2957 foreign_cols = self._gather_columns_with_annotation(
2958 join_condition, "foreign"
2959 )
2960
2961 has_foreign = bool(foreign_cols)
2962
2963 if primary:
2964 can_sync = bool(self.synchronize_pairs)
2965 else:
2966 can_sync = bool(self.secondary_synchronize_pairs)
2967
2968 if (
2969 self.support_sync
2970 and can_sync
2971 or (not self.support_sync and has_foreign)
2972 ):
2973 return
2974
2975 # from here below is just determining the best error message
2976 # to report. Check for a join condition using any operator
2977 # (not just ==), perhaps they need to turn on "viewonly=True".
2978 if self.support_sync and has_foreign and not can_sync:
2979 err = (
2980 "Could not locate any simple equality expressions "
2981 "involving locally mapped foreign key columns for "
2982 "%s join condition "
2983 "'%s' on relationship %s."
2984 % (
2985 primary and "primary" or "secondary",
2986 join_condition,
2987 self.prop,
2988 )
2989 )
2990 err += (
2991 " Ensure that referencing columns are associated "
2992 "with a ForeignKey or ForeignKeyConstraint, or are "
2993 "annotated in the join condition with the foreign() "
2994 "annotation. To allow comparison operators other than "
2995 "'==', the relationship can be marked as viewonly=True."
2996 )
2997
2998 raise sa_exc.ArgumentError(err)
2999 else:
3000 err = (
3001 "Could not locate any relevant foreign key columns "
3002 "for %s join condition '%s' on relationship %s."
3003 % (
3004 primary and "primary" or "secondary",
3005 join_condition,
3006 self.prop,
3007 )
3008 )
3009 err += (
3010 " Ensure that referencing columns are associated "
3011 "with a ForeignKey or ForeignKeyConstraint, or are "
3012 "annotated in the join condition with the foreign() "
3013 "annotation."
3014 )
3015 raise sa_exc.ArgumentError(err)
3016
3017 def _determine_direction(self) -> None:
3018 """Determine if this relationship is one to many, many to one,
3019 many to many.
3020
3021 """
3022 if self.secondaryjoin is not None:
3023 self.direction = MANYTOMANY
3024 else:
3025 parentcols = util.column_set(self.parent_persist_selectable.c)
3026 targetcols = util.column_set(self.child_persist_selectable.c)
3027
3028 # fk collection which suggests ONETOMANY.
3029 onetomany_fk = targetcols.intersection(self.foreign_key_columns)
3030
3031 # fk collection which suggests MANYTOONE.
3032
3033 manytoone_fk = parentcols.intersection(self.foreign_key_columns)
3034
3035 if onetomany_fk and manytoone_fk:
3036 # fks on both sides. test for overlap of local/remote
3037 # with foreign key.
3038 # we will gather columns directly from their annotations
3039 # without deannotating, so that we can distinguish on a column
3040 # that refers to itself.
3041
3042 # 1. columns that are both remote and FK suggest
3043 # onetomany.
3044 onetomany_local = self._gather_columns_with_annotation(
3045 self.primaryjoin, "remote", "foreign"
3046 )
3047
3048 # 2. columns that are FK but are not remote (e.g. local)
3049 # suggest manytoone.
3050 manytoone_local = {
3051 c
3052 for c in self._gather_columns_with_annotation(
3053 self.primaryjoin, "foreign"
3054 )
3055 if "remote" not in c._annotations
3056 }
3057
3058 # 3. if both collections are present, remove columns that
3059 # refer to themselves. This is for the case of
3060 # and_(Me.id == Me.remote_id, Me.version == Me.version)
3061 if onetomany_local and manytoone_local:
3062 self_equated = self.remote_columns.intersection(
3063 self.local_columns
3064 )
3065 onetomany_local = onetomany_local.difference(self_equated)
3066 manytoone_local = manytoone_local.difference(self_equated)
3067
3068 # at this point, if only one or the other collection is
3069 # present, we know the direction, otherwise it's still
3070 # ambiguous.
3071
3072 if onetomany_local and not manytoone_local:
3073 self.direction = ONETOMANY
3074 elif manytoone_local and not onetomany_local:
3075 self.direction = MANYTOONE
3076 else:
3077 raise sa_exc.ArgumentError(
3078 "Can't determine relationship"
3079 " direction for relationship '%s' - foreign "
3080 "key columns within the join condition are present "
3081 "in both the parent and the child's mapped tables. "
3082 "Ensure that only those columns referring "
3083 "to a parent column are marked as foreign, "
3084 "either via the foreign() annotation or "
3085 "via the foreign_keys argument." % self.prop
3086 )
3087 elif onetomany_fk:
3088 self.direction = ONETOMANY
3089 elif manytoone_fk:
3090 self.direction = MANYTOONE
3091 else:
3092 raise sa_exc.ArgumentError(
3093 "Can't determine relationship "
3094 "direction for relationship '%s' - foreign "
3095 "key columns are present in neither the parent "
3096 "nor the child's mapped tables" % self.prop
3097 )
3098
3099 def _deannotate_pairs(
3100 self, collection: _ColumnPairIterable
3101 ) -> _MutableColumnPairs:
3102 """provide deannotation for the various lists of
3103 pairs, so that using them in hashes doesn't incur
3104 high-overhead __eq__() comparisons against
3105 original columns mapped.
3106
3107 """
3108 return [(x._deannotate(), y._deannotate()) for x, y in collection]
3109
3110 def _setup_pairs(self) -> None:
3111 sync_pairs: _MutableColumnPairs = []
3112 lrp: util.OrderedSet[Tuple[ColumnElement[Any], ColumnElement[Any]]] = (
3113 util.OrderedSet([])
3114 )
3115 secondary_sync_pairs: _MutableColumnPairs = []
3116
3117 def go(
3118 joincond: ColumnElement[bool],
3119 collection: _MutableColumnPairs,
3120 ) -> None:
3121 def visit_binary(
3122 binary: BinaryExpression[Any],
3123 left: ColumnElement[Any],
3124 right: ColumnElement[Any],
3125 ) -> None:
3126 if (
3127 "remote" in right._annotations
3128 and "remote" not in left._annotations
3129 and self.can_be_synced_fn(left)
3130 ):
3131 lrp.add((left, right))
3132 elif (
3133 "remote" in left._annotations
3134 and "remote" not in right._annotations
3135 and self.can_be_synced_fn(right)
3136 ):
3137 lrp.add((right, left))
3138 if binary.operator is operators.eq and self.can_be_synced_fn(
3139 left, right
3140 ):
3141 if "foreign" in right._annotations:
3142 collection.append((left, right))
3143 elif "foreign" in left._annotations:
3144 collection.append((right, left))
3145
3146 visit_binary_product(visit_binary, joincond)
3147
3148 for joincond, collection in [
3149 (self.primaryjoin, sync_pairs),
3150 (self.secondaryjoin, secondary_sync_pairs),
3151 ]:
3152 if joincond is None:
3153 continue
3154 go(joincond, collection)
3155
3156 self.local_remote_pairs = self._deannotate_pairs(lrp)
3157 self.synchronize_pairs = self._deannotate_pairs(sync_pairs)
3158 self.secondary_synchronize_pairs = self._deannotate_pairs(
3159 secondary_sync_pairs
3160 )
3161
3162 _track_overlapping_sync_targets: weakref.WeakKeyDictionary[
3163 ColumnElement[Any],
3164 weakref.WeakKeyDictionary[
3165 RelationshipProperty[Any], ColumnElement[Any]
3166 ],
3167 ] = weakref.WeakKeyDictionary()
3168
3169 def _warn_for_conflicting_sync_targets(self) -> None:
3170 if not self.support_sync:
3171 return
3172
3173 # we would like to detect if we are synchronizing any column
3174 # pairs in conflict with another relationship that wishes to sync
3175 # an entirely different column to the same target. This is a
3176 # very rare edge case so we will try to minimize the memory/overhead
3177 # impact of this check
3178 for from_, to_ in [
3179 (from_, to_) for (from_, to_) in self.synchronize_pairs
3180 ] + [
3181 (from_, to_) for (from_, to_) in self.secondary_synchronize_pairs
3182 ]:
3183 # save ourselves a ton of memory and overhead by only
3184 # considering columns that are subject to a overlapping
3185 # FK constraints at the core level. This condition can arise
3186 # if multiple relationships overlap foreign() directly, but
3187 # we're going to assume it's typically a ForeignKeyConstraint-
3188 # level configuration that benefits from this warning.
3189
3190 if to_ not in self._track_overlapping_sync_targets:
3191 self._track_overlapping_sync_targets[to_] = (
3192 weakref.WeakKeyDictionary({self.prop: from_})
3193 )
3194 else:
3195 other_props = []
3196 prop_to_from = self._track_overlapping_sync_targets[to_]
3197
3198 for pr, fr_ in prop_to_from.items():
3199 if (
3200 not pr.mapper._dispose_called
3201 and pr not in self.prop._reverse_property
3202 and pr.key not in self.prop._overlaps
3203 and self.prop.key not in pr._overlaps
3204 # note: the "__*" symbol is used internally by
3205 # SQLAlchemy as a general means of suppressing the
3206 # overlaps warning for some extension cases, however
3207 # this is not currently
3208 # a publicly supported symbol and may change at
3209 # any time.
3210 and "__*" not in self.prop._overlaps
3211 and "__*" not in pr._overlaps
3212 and not self.prop.parent.is_sibling(pr.parent)
3213 and not self.prop.mapper.is_sibling(pr.mapper)
3214 and not self.prop.parent.is_sibling(pr.mapper)
3215 and not self.prop.mapper.is_sibling(pr.parent)
3216 and (
3217 self.prop.key != pr.key
3218 or not self.prop.parent.common_parent(pr.parent)
3219 )
3220 ):
3221 other_props.append((pr, fr_))
3222
3223 if other_props:
3224 util.warn(
3225 "relationship '%s' will copy column %s to column %s, "
3226 "which conflicts with relationship(s): %s. "
3227 "If this is not the intention, consider if these "
3228 "relationships should be linked with "
3229 "back_populates, or if viewonly=True should be "
3230 "applied to one or more if they are read-only. "
3231 "For the less common case that foreign key "
3232 "constraints are partially overlapping, the "
3233 "orm.foreign() "
3234 "annotation can be used to isolate the columns that "
3235 "should be written towards. To silence this "
3236 "warning, add the parameter 'overlaps=\"%s\"' to the "
3237 "'%s' relationship."
3238 % (
3239 self.prop,
3240 from_,
3241 to_,
3242 ", ".join(
3243 sorted(
3244 "'%s' (copies %s to %s)" % (pr, fr_, to_)
3245 for (pr, fr_) in other_props
3246 )
3247 ),
3248 ",".join(sorted(pr.key for pr, fr in other_props)),
3249 self.prop,
3250 ),
3251 code="qzyx",
3252 )
3253 self._track_overlapping_sync_targets[to_][self.prop] = from_
3254
3255 @util.memoized_property
3256 def remote_columns(self) -> Set[ColumnElement[Any]]:
3257 return self._gather_join_annotations("remote")
3258
3259 @util.memoized_property
3260 def local_columns(self) -> Set[ColumnElement[Any]]:
3261 return self._gather_join_annotations("local")
3262
3263 @util.memoized_property
3264 def foreign_key_columns(self) -> Set[ColumnElement[Any]]:
3265 return self._gather_join_annotations("foreign")
3266
3267 def _gather_join_annotations(
3268 self, annotation: str
3269 ) -> Set[ColumnElement[Any]]:
3270 s = set(
3271 self._gather_columns_with_annotation(self.primaryjoin, annotation)
3272 )
3273 if self.secondaryjoin is not None:
3274 s.update(
3275 self._gather_columns_with_annotation(
3276 self.secondaryjoin, annotation
3277 )
3278 )
3279 return {x._deannotate() for x in s}
3280
3281 def _gather_columns_with_annotation(
3282 self, clause: ColumnElement[Any], *annotation: Iterable[str]
3283 ) -> Set[ColumnElement[Any]]:
3284 annotation_set = set(annotation)
3285 return {
3286 cast(ColumnElement[Any], col)
3287 for col in visitors.iterate(clause, {})
3288 if annotation_set.issubset(col._annotations)
3289 }
3290
3291 @util.memoized_property
3292 def _secondary_lineage_set(self) -> FrozenSet[ColumnElement[Any]]:
3293 if self.secondary is not None:
3294 return frozenset(
3295 itertools.chain(*[c.proxy_set for c in self.secondary.c])
3296 )
3297 else:
3298 return util.EMPTY_SET
3299
3300 def join_targets(
3301 self,
3302 source_selectable: Optional[FromClause],
3303 dest_selectable: FromClause,
3304 aliased: bool,
3305 single_crit: Optional[ColumnElement[bool]] = None,
3306 extra_criteria: Tuple[ColumnElement[bool], ...] = (),
3307 ) -> Tuple[
3308 ColumnElement[bool],
3309 Optional[ColumnElement[bool]],
3310 Optional[FromClause],
3311 Optional[ClauseAdapter],
3312 FromClause,
3313 ]:
3314 """Given a source and destination selectable, create a
3315 join between them.
3316
3317 This takes into account aliasing the join clause
3318 to reference the appropriate corresponding columns
3319 in the target objects, as well as the extra child
3320 criterion, equivalent column sets, etc.
3321
3322 """
3323 # place a barrier on the destination such that
3324 # replacement traversals won't ever dig into it.
3325 # its internal structure remains fixed
3326 # regardless of context.
3327 dest_selectable = _shallow_annotate(
3328 dest_selectable, {"no_replacement_traverse": True}
3329 )
3330
3331 primaryjoin, secondaryjoin, secondary = (
3332 self.primaryjoin,
3333 self.secondaryjoin,
3334 self.secondary,
3335 )
3336
3337 # adjust the join condition for single table inheritance,
3338 # in the case that the join is to a subclass
3339 # this is analogous to the
3340 # "_adjust_for_single_table_inheritance()" method in Query.
3341
3342 if single_crit is not None:
3343 if secondaryjoin is not None:
3344 secondaryjoin = secondaryjoin & single_crit
3345 else:
3346 primaryjoin = primaryjoin & single_crit
3347
3348 if extra_criteria:
3349
3350 def mark_exclude_cols(
3351 elem: SupportsAnnotations, annotations: _AnnotationDict
3352 ) -> SupportsAnnotations:
3353 """note unrelated columns in the "extra criteria" as either
3354 should be adapted or not adapted, even though they are not
3355 part of our "local" or "remote" side.
3356
3357 see #9779 for this case, as well as #11010 for a follow up
3358
3359 """
3360
3361 parentmapper_for_element = elem._annotations.get(
3362 "parentmapper", None
3363 )
3364
3365 if (
3366 parentmapper_for_element is not self.prop.parent
3367 and parentmapper_for_element is not self.prop.mapper
3368 and elem not in self._secondary_lineage_set
3369 ):
3370 return _safe_annotate(elem, annotations)
3371 else:
3372 return elem
3373
3374 extra_criteria = tuple(
3375 _deep_annotate(
3376 elem,
3377 {"should_not_adapt": True},
3378 annotate_callable=mark_exclude_cols,
3379 )
3380 for elem in extra_criteria
3381 )
3382
3383 if secondaryjoin is not None:
3384 secondaryjoin = secondaryjoin & sql.and_(*extra_criteria)
3385 else:
3386 primaryjoin = primaryjoin & sql.and_(*extra_criteria)
3387
3388 if aliased:
3389 if secondary is not None:
3390 secondary = secondary._anonymous_fromclause(flat=True)
3391 primary_aliasizer = ClauseAdapter(
3392 secondary,
3393 exclude_fn=_local_col_exclude,
3394 )
3395 secondary_aliasizer = ClauseAdapter(
3396 dest_selectable, equivalents=self.child_equivalents
3397 ).chain(primary_aliasizer)
3398 if source_selectable is not None:
3399 primary_aliasizer = ClauseAdapter(
3400 secondary,
3401 exclude_fn=_local_col_exclude,
3402 ).chain(
3403 ClauseAdapter(
3404 source_selectable,
3405 equivalents=self.parent_equivalents,
3406 )
3407 )
3408
3409 secondaryjoin = secondary_aliasizer.traverse(secondaryjoin)
3410 else:
3411 primary_aliasizer = ClauseAdapter(
3412 dest_selectable,
3413 exclude_fn=_local_col_exclude,
3414 equivalents=self.child_equivalents,
3415 )
3416 if source_selectable is not None:
3417 primary_aliasizer.chain(
3418 ClauseAdapter(
3419 source_selectable,
3420 exclude_fn=_remote_col_exclude,
3421 equivalents=self.parent_equivalents,
3422 )
3423 )
3424 secondary_aliasizer = None
3425
3426 primaryjoin = primary_aliasizer.traverse(primaryjoin)
3427 target_adapter = secondary_aliasizer or primary_aliasizer
3428 target_adapter.exclude_fn = None
3429 else:
3430 target_adapter = None
3431 return (
3432 primaryjoin,
3433 secondaryjoin,
3434 secondary,
3435 target_adapter,
3436 dest_selectable,
3437 )
3438
3439 def create_lazy_clause(self, reverse_direction: bool = False) -> Tuple[
3440 ColumnElement[bool],
3441 Dict[str, ColumnElement[Any]],
3442 Dict[ColumnElement[Any], ColumnElement[Any]],
3443 ]:
3444 binds: Dict[ColumnElement[Any], BindParameter[Any]] = {}
3445 equated_columns: Dict[ColumnElement[Any], ColumnElement[Any]] = {}
3446
3447 has_secondary = self.secondaryjoin is not None
3448
3449 if has_secondary:
3450 lookup = collections.defaultdict(list)
3451 for l, r in self.local_remote_pairs:
3452 lookup[l].append((l, r))
3453 equated_columns[r] = l
3454 elif not reverse_direction:
3455 for l, r in self.local_remote_pairs:
3456 equated_columns[r] = l
3457 else:
3458 for l, r in self.local_remote_pairs:
3459 equated_columns[l] = r
3460
3461 def col_to_bind(
3462 element: ColumnElement[Any], **kw: Any
3463 ) -> Optional[BindParameter[Any]]:
3464 if (
3465 (not reverse_direction and "local" in element._annotations)
3466 or reverse_direction
3467 and (
3468 (has_secondary and element in lookup)
3469 or (not has_secondary and "remote" in element._annotations)
3470 )
3471 ):
3472 if element not in binds:
3473 binds[element] = sql.bindparam(
3474 None, None, type_=element.type, unique=True
3475 )
3476 return binds[element]
3477 return None
3478
3479 lazywhere = self.primaryjoin
3480 if self.secondaryjoin is None or not reverse_direction:
3481 lazywhere = visitors.replacement_traverse(
3482 lazywhere, {}, col_to_bind
3483 )
3484
3485 if self.secondaryjoin is not None:
3486 secondaryjoin = self.secondaryjoin
3487 if reverse_direction:
3488 secondaryjoin = visitors.replacement_traverse(
3489 secondaryjoin, {}, col_to_bind
3490 )
3491 lazywhere = sql.and_(lazywhere, secondaryjoin)
3492
3493 bind_to_col = {binds[col].key: col for col in binds}
3494
3495 return lazywhere, bind_to_col, equated_columns
3496
3497
3498class _ColInAnnotations:
3499 """Serializable object that tests for names in c._annotations.
3500
3501 TODO: does this need to be serializable anymore? can we find what the
3502 use case was for that?
3503
3504 """
3505
3506 __slots__ = ("names",)
3507
3508 def __init__(self, *names: str):
3509 self.names = frozenset(names)
3510
3511 def __call__(self, c: ClauseElement) -> bool:
3512 return bool(self.names.intersection(c._annotations))
3513
3514
3515_local_col_exclude = _ColInAnnotations("local", "should_not_adapt")
3516_remote_col_exclude = _ColInAnnotations("remote", "should_not_adapt")
3517
3518
3519class Relationship(
3520 RelationshipProperty[_T],
3521 _DeclarativeMapped[_T],
3522):
3523 """Describes an object property that holds a single item or list
3524 of items that correspond to a related database table.
3525
3526 Public constructor is the :func:`_orm.relationship` function.
3527
3528 .. seealso::
3529
3530 :ref:`relationship_config_toplevel`
3531
3532 .. versionchanged:: 2.0 Added :class:`_orm.Relationship` as a Declarative
3533 compatible subclass for :class:`_orm.RelationshipProperty`.
3534
3535 """
3536
3537 inherit_cache = True
3538 """:meta private:"""
3539
3540
3541class _RelationshipDeclared( # type: ignore[misc]
3542 Relationship[_T],
3543 WriteOnlyMapped[_T], # not compatible with Mapped[_T]
3544 DynamicMapped[_T], # not compatible with Mapped[_T]
3545):
3546 """Relationship subclass used implicitly for declarative mapping."""
3547
3548 inherit_cache = True
3549 """:meta private:"""
3550
3551 @classmethod
3552 def _mapper_property_name(cls) -> str:
3553 return "Relationship"