1# orm/attributes.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Defines instrumentation for class attributes and their interaction
10with instances.
11
12This module is usually not directly visible to user applications, but
13defines a large part of the ORM's interactivity.
14
15
16"""
17
18from __future__ import annotations
19
20import dataclasses
21import operator
22from typing import Any
23from typing import Callable
24from typing import cast
25from typing import ClassVar
26from typing import Dict
27from typing import Iterable
28from typing import List
29from typing import NamedTuple
30from typing import Optional
31from typing import overload
32from typing import Sequence
33from typing import Tuple
34from typing import Type
35from typing import TYPE_CHECKING
36from typing import TypeVar
37from typing import Union
38
39from . import collections
40from . import exc as orm_exc
41from . import interfaces
42from ._typing import insp_is_aliased_class
43from .base import _DeclarativeMapped
44from .base import ATTR_EMPTY
45from .base import ATTR_WAS_SET
46from .base import CALLABLES_OK
47from .base import DEFERRED_HISTORY_LOAD
48from .base import DONT_SET
49from .base import INCLUDE_PENDING_MUTATIONS # noqa
50from .base import INIT_OK
51from .base import instance_dict as instance_dict
52from .base import instance_state as instance_state
53from .base import instance_str
54from .base import LOAD_AGAINST_COMMITTED
55from .base import LoaderCallableStatus
56from .base import manager_of_class as manager_of_class
57from .base import Mapped as Mapped # noqa
58from .base import NEVER_SET # noqa
59from .base import NO_AUTOFLUSH
60from .base import NO_CHANGE # noqa
61from .base import NO_KEY
62from .base import NO_RAISE
63from .base import NO_VALUE
64from .base import NON_PERSISTENT_OK # noqa
65from .base import opt_manager_of_class as opt_manager_of_class
66from .base import PASSIVE_CLASS_MISMATCH # noqa
67from .base import PASSIVE_NO_FETCH
68from .base import PASSIVE_NO_FETCH_RELATED # noqa
69from .base import PASSIVE_NO_INITIALIZE
70from .base import PASSIVE_NO_RESULT
71from .base import PASSIVE_OFF
72from .base import PASSIVE_ONLY_PERSISTENT
73from .base import PASSIVE_RETURN_NO_VALUE
74from .base import PassiveFlag
75from .base import RELATED_OBJECT_OK # noqa
76from .base import SQL_OK # noqa
77from .base import SQLORMExpression
78from .base import state_str
79from .. import event
80from .. import exc
81from .. import inspection
82from .. import util
83from ..event import dispatcher
84from ..event import EventTarget
85from ..sql import base as sql_base
86from ..sql import cache_key
87from ..sql import coercions
88from ..sql import roles
89from ..sql import visitors
90from ..sql.cache_key import HasCacheKey
91from ..sql.visitors import _TraverseInternalsType
92from ..sql.visitors import InternalTraversal
93from ..util.typing import Literal
94from ..util.typing import Self
95from ..util.typing import TypeGuard
96
97if TYPE_CHECKING:
98 from ._typing import _EntityType
99 from ._typing import _ExternalEntityType
100 from ._typing import _InstanceDict
101 from ._typing import _InternalEntityType
102 from ._typing import _LoaderCallable
103 from ._typing import _O
104 from .collections import _AdaptedCollectionProtocol
105 from .collections import CollectionAdapter
106 from .interfaces import MapperProperty
107 from .relationships import RelationshipProperty
108 from .state import InstanceState
109 from .util import AliasedInsp
110 from .writeonly import _WriteOnlyAttributeImpl
111 from ..event.base import _Dispatch
112 from ..sql._typing import _ColumnExpressionArgument
113 from ..sql._typing import _DMLColumnArgument
114 from ..sql._typing import _InfoType
115 from ..sql._typing import _PropagateAttrsType
116 from ..sql.annotation import _AnnotationDict
117 from ..sql.elements import ColumnElement
118 from ..sql.elements import Label
119 from ..sql.operators import OperatorType
120 from ..sql.selectable import FromClause
121
122
123_T = TypeVar("_T")
124_T_co = TypeVar("_T_co", bound=Any, covariant=True)
125
126
127_AllPendingType = Sequence[
128 Tuple[Optional["InstanceState[Any]"], Optional[object]]
129]
130
131
132_UNKNOWN_ATTR_KEY = object()
133
134
135@inspection._self_inspects
136class QueryableAttribute(
137 _DeclarativeMapped[_T_co],
138 SQLORMExpression[_T_co],
139 interfaces.InspectionAttr,
140 interfaces.PropComparator[_T_co],
141 roles.JoinTargetRole,
142 roles.OnClauseRole,
143 sql_base.Immutable,
144 cache_key.SlotsMemoizedHasCacheKey,
145 util.MemoizedSlots,
146 EventTarget,
147):
148 """Base class for :term:`descriptor` objects that intercept
149 attribute events on behalf of a :class:`.MapperProperty`
150 object. The actual :class:`.MapperProperty` is accessible
151 via the :attr:`.QueryableAttribute.property`
152 attribute.
153
154
155 .. seealso::
156
157 :class:`.InstrumentedAttribute`
158
159 :class:`.MapperProperty`
160
161 :attr:`_orm.Mapper.all_orm_descriptors`
162
163 :attr:`_orm.Mapper.attrs`
164 """
165
166 __slots__ = (
167 "class_",
168 "key",
169 "impl",
170 "comparator",
171 "property",
172 "parent",
173 "expression",
174 "_of_type",
175 "_extra_criteria",
176 "_slots_dispatch",
177 "_propagate_attrs",
178 "_doc",
179 )
180
181 is_attribute = True
182
183 dispatch: dispatcher[QueryableAttribute[_T_co]]
184
185 class_: _ExternalEntityType[Any]
186 key: str
187 parententity: _InternalEntityType[Any]
188 impl: _AttributeImpl
189 comparator: interfaces.PropComparator[_T_co]
190 _of_type: Optional[_InternalEntityType[Any]]
191 _extra_criteria: Tuple[ColumnElement[bool], ...]
192 _doc: Optional[str]
193
194 # PropComparator has a __visit_name__ to participate within
195 # traversals. Disambiguate the attribute vs. a comparator.
196 __visit_name__ = "orm_instrumented_attribute"
197
198 def __init__(
199 self,
200 class_: _ExternalEntityType[_O],
201 key: str,
202 parententity: _InternalEntityType[_O],
203 comparator: interfaces.PropComparator[_T_co],
204 impl: Optional[_AttributeImpl] = None,
205 of_type: Optional[_InternalEntityType[Any]] = None,
206 extra_criteria: Tuple[ColumnElement[bool], ...] = (),
207 ):
208 self.class_ = class_
209 self.key = key
210
211 self._parententity = self.parent = parententity
212
213 # this attribute is non-None after mappers are set up, however in the
214 # interim class manager setup, there's a check for None to see if it
215 # needs to be populated, so we assign None here leaving the attribute
216 # in a temporarily not-type-correct state
217 self.impl = impl # type: ignore
218
219 assert comparator is not None
220 self.comparator = comparator
221 self._of_type = of_type
222 self._extra_criteria = extra_criteria
223 self._doc = None
224
225 manager = opt_manager_of_class(class_)
226 # manager is None in the case of AliasedClass
227 if manager:
228 # propagate existing event listeners from
229 # immediate superclass
230 for base in manager._bases:
231 if key in base:
232 self.dispatch._update(base[key].dispatch)
233 if base[key].dispatch._active_history:
234 self.dispatch._active_history = True # type: ignore
235
236 _cache_key_traversal = [
237 ("key", visitors.ExtendedInternalTraversal.dp_string),
238 ("_parententity", visitors.ExtendedInternalTraversal.dp_multi),
239 ("_of_type", visitors.ExtendedInternalTraversal.dp_multi),
240 ("_extra_criteria", visitors.InternalTraversal.dp_clauseelement_list),
241 ]
242
243 def __reduce__(self) -> Any:
244 # this method is only used in terms of the
245 # sqlalchemy.ext.serializer extension
246 return (
247 _queryable_attribute_unreduce,
248 (
249 self.key,
250 self._parententity.mapper.class_,
251 self._parententity,
252 self._parententity.entity,
253 ),
254 )
255
256 @property
257 def _impl_uses_objects(self) -> bool:
258 return self.impl.uses_objects
259
260 def get_history(
261 self, instance: Any, passive: PassiveFlag = PASSIVE_OFF
262 ) -> History:
263 return self.impl.get_history(
264 instance_state(instance), instance_dict(instance), passive
265 )
266
267 @property
268 def info(self) -> _InfoType:
269 """Return the 'info' dictionary for the underlying SQL element.
270
271 The behavior here is as follows:
272
273 * If the attribute is a column-mapped property, i.e.
274 :class:`.ColumnProperty`, which is mapped directly
275 to a schema-level :class:`_schema.Column` object, this attribute
276 will return the :attr:`.SchemaItem.info` dictionary associated
277 with the core-level :class:`_schema.Column` object.
278
279 * If the attribute is a :class:`.ColumnProperty` but is mapped to
280 any other kind of SQL expression other than a
281 :class:`_schema.Column`,
282 the attribute will refer to the :attr:`.MapperProperty.info`
283 dictionary associated directly with the :class:`.ColumnProperty`,
284 assuming the SQL expression itself does not have its own ``.info``
285 attribute (which should be the case, unless a user-defined SQL
286 construct has defined one).
287
288 * If the attribute refers to any other kind of
289 :class:`.MapperProperty`, including :class:`.Relationship`,
290 the attribute will refer to the :attr:`.MapperProperty.info`
291 dictionary associated with that :class:`.MapperProperty`.
292
293 * To access the :attr:`.MapperProperty.info` dictionary of the
294 :class:`.MapperProperty` unconditionally, including for a
295 :class:`.ColumnProperty` that's associated directly with a
296 :class:`_schema.Column`, the attribute can be referred to using
297 :attr:`.QueryableAttribute.property` attribute, as
298 ``MyClass.someattribute.property.info``.
299
300 .. seealso::
301
302 :attr:`.SchemaItem.info`
303
304 :attr:`.MapperProperty.info`
305
306 """
307 return self.comparator.info
308
309 parent: _InternalEntityType[Any]
310 """Return an inspection instance representing the parent.
311
312 This will be either an instance of :class:`_orm.Mapper`
313 or :class:`.AliasedInsp`, depending upon the nature
314 of the parent entity which this attribute is associated
315 with.
316
317 """
318
319 expression: ColumnElement[_T_co]
320 """The SQL expression object represented by this
321 :class:`.QueryableAttribute`.
322
323 This will typically be an instance of a :class:`_sql.ColumnElement`
324 subclass representing a column expression.
325
326 """
327
328 def _memoized_attr_expression(self) -> ColumnElement[_T]:
329 annotations: _AnnotationDict
330
331 # applies only to Proxy() as used by hybrid.
332 # currently is an exception to typing rather than feeding through
333 # non-string keys.
334 # ideally Proxy() would have a separate set of methods to deal
335 # with this case.
336 entity_namespace = self._entity_namespace
337 assert isinstance(entity_namespace, HasCacheKey)
338
339 if self.key is _UNKNOWN_ATTR_KEY:
340 annotations = {"entity_namespace": entity_namespace}
341 else:
342 annotations = {
343 "proxy_key": self.key,
344 "proxy_owner": self._parententity,
345 "entity_namespace": entity_namespace,
346 }
347
348 ce = self.comparator.__clause_element__()
349 try:
350 if TYPE_CHECKING:
351 assert isinstance(ce, ColumnElement)
352 anno = ce._annotate
353 except AttributeError as ae:
354 raise exc.InvalidRequestError(
355 'When interpreting attribute "%s" as a SQL expression, '
356 "expected __clause_element__() to return "
357 "a ClauseElement object, got: %r" % (self, ce)
358 ) from ae
359 else:
360 return anno(annotations)
361
362 def _memoized_attr__propagate_attrs(self) -> _PropagateAttrsType:
363 # this suits the case in coercions where we don't actually
364 # call ``__clause_element__()`` but still need to get
365 # resolved._propagate_attrs. See #6558.
366 return util.immutabledict(
367 {
368 "compile_state_plugin": "orm",
369 "plugin_subject": self._parentmapper,
370 }
371 )
372
373 @property
374 def _entity_namespace(self) -> _InternalEntityType[Any]:
375 return self._parententity
376
377 @property
378 def _annotations(self) -> _AnnotationDict:
379 return self.__clause_element__()._annotations
380
381 def __clause_element__(self) -> ColumnElement[_T_co]:
382 return self.expression
383
384 @property
385 def _from_objects(self) -> List[FromClause]:
386 return self.expression._from_objects
387
388 def _bulk_update_tuples(
389 self, value: Any
390 ) -> Sequence[Tuple[_DMLColumnArgument, Any]]:
391 """Return setter tuples for a bulk UPDATE."""
392
393 return self.comparator._bulk_update_tuples(value)
394
395 def adapt_to_entity(self, adapt_to_entity: AliasedInsp[Any]) -> Self:
396 assert not self._of_type
397 return self.__class__(
398 adapt_to_entity.entity,
399 self.key,
400 impl=self.impl,
401 comparator=self.comparator.adapt_to_entity(adapt_to_entity),
402 parententity=adapt_to_entity,
403 )
404
405 def of_type(self, entity: _EntityType[_T]) -> QueryableAttribute[_T]:
406 return QueryableAttribute(
407 self.class_,
408 self.key,
409 self._parententity,
410 impl=self.impl,
411 comparator=self.comparator.of_type(entity),
412 of_type=inspection.inspect(entity),
413 extra_criteria=self._extra_criteria,
414 )
415
416 def and_(
417 self, *clauses: _ColumnExpressionArgument[bool]
418 ) -> QueryableAttribute[bool]:
419 if TYPE_CHECKING:
420 assert isinstance(self.comparator, RelationshipProperty.Comparator)
421
422 exprs = tuple(
423 coercions.expect(roles.WhereHavingRole, clause)
424 for clause in util.coerce_generator_arg(clauses)
425 )
426
427 return QueryableAttribute(
428 self.class_,
429 self.key,
430 self._parententity,
431 impl=self.impl,
432 comparator=self.comparator.and_(*exprs),
433 of_type=self._of_type,
434 extra_criteria=self._extra_criteria + exprs,
435 )
436
437 def _clone(self, **kw: Any) -> QueryableAttribute[_T]:
438 return QueryableAttribute(
439 self.class_,
440 self.key,
441 self._parententity,
442 impl=self.impl,
443 comparator=self.comparator,
444 of_type=self._of_type,
445 extra_criteria=self._extra_criteria,
446 )
447
448 def label(self, name: Optional[str]) -> Label[_T_co]:
449 return self.__clause_element__().label(name)
450
451 def operate(
452 self, op: OperatorType, *other: Any, **kwargs: Any
453 ) -> ColumnElement[Any]:
454 return op(self.comparator, *other, **kwargs) # type: ignore[no-any-return] # noqa: E501
455
456 def reverse_operate(
457 self, op: OperatorType, other: Any, **kwargs: Any
458 ) -> ColumnElement[Any]:
459 return op(other, self.comparator, **kwargs) # type: ignore[no-any-return] # noqa: E501
460
461 def hasparent(
462 self, state: InstanceState[Any], optimistic: bool = False
463 ) -> bool:
464 return self.impl.hasparent(state, optimistic=optimistic) is not False
465
466 def _column_strategy_attrs(self) -> Sequence[QueryableAttribute[Any]]:
467 return (self,)
468
469 def __getattr__(self, key: str) -> Any:
470 try:
471 return util.MemoizedSlots.__getattr__(self, key)
472 except AttributeError:
473 pass
474
475 try:
476 return getattr(self.comparator, key)
477 except AttributeError as err:
478 raise AttributeError(
479 "Neither %r object nor %r object associated with %s "
480 "has an attribute %r"
481 % (
482 type(self).__name__,
483 type(self.comparator).__name__,
484 self,
485 key,
486 )
487 ) from err
488
489 def __str__(self) -> str:
490 return f"{self.class_.__name__}.{self.key}"
491
492 def _memoized_attr_property(self) -> Optional[MapperProperty[Any]]:
493 return self.comparator.property
494
495
496def _queryable_attribute_unreduce(
497 key: str,
498 mapped_class: Type[_O],
499 parententity: _InternalEntityType[_O],
500 entity: _ExternalEntityType[Any],
501) -> Any:
502 # this method is only used in terms of the
503 # sqlalchemy.ext.serializer extension
504 if insp_is_aliased_class(parententity):
505 return entity._get_from_serialized(key, mapped_class, parententity)
506 else:
507 return getattr(entity, key)
508
509
510class InstrumentedAttribute(QueryableAttribute[_T_co]):
511 """Class bound instrumented attribute which adds basic
512 :term:`descriptor` methods.
513
514 See :class:`.QueryableAttribute` for a description of most features.
515
516
517 """
518
519 __slots__ = ()
520
521 inherit_cache = True
522 """:meta private:"""
523
524 # hack to make __doc__ writeable on instances of
525 # InstrumentedAttribute, while still keeping classlevel
526 # __doc__ correct
527
528 @util.rw_hybridproperty
529 def __doc__(self) -> Optional[str]:
530 return self._doc
531
532 @__doc__.setter # type: ignore
533 def __doc__(self, value: Optional[str]) -> None:
534 self._doc = value
535
536 @__doc__.classlevel # type: ignore
537 def __doc__(cls) -> Optional[str]:
538 return super().__doc__
539
540 def __set__(self, instance: object, value: Any) -> None:
541 self.impl.set(
542 instance_state(instance), instance_dict(instance), value, None
543 )
544
545 def __delete__(self, instance: object) -> None:
546 self.impl.delete(instance_state(instance), instance_dict(instance))
547
548 @overload
549 def __get__(
550 self, instance: None, owner: Any
551 ) -> InstrumentedAttribute[_T_co]: ...
552
553 @overload
554 def __get__(self, instance: object, owner: Any) -> _T_co: ...
555
556 def __get__(
557 self, instance: Optional[object], owner: Any
558 ) -> Union[InstrumentedAttribute[_T_co], _T_co]:
559 if instance is None:
560 return self
561
562 dict_ = instance_dict(instance)
563 if self.impl.supports_population and self.key in dict_:
564 return dict_[self.key] # type: ignore[no-any-return]
565 else:
566 try:
567 state = instance_state(instance)
568 except AttributeError as err:
569 raise orm_exc.UnmappedInstanceError(instance) from err
570 return self.impl.get(state, dict_) # type: ignore[no-any-return]
571
572
573@dataclasses.dataclass(frozen=True)
574class _AdHocHasEntityNamespace(HasCacheKey):
575 _traverse_internals: ClassVar[_TraverseInternalsType] = [
576 ("_entity_namespace", InternalTraversal.dp_has_cache_key),
577 ]
578
579 # py37 compat, no slots=True on dataclass
580 __slots__ = ("_entity_namespace",)
581 _entity_namespace: _InternalEntityType[Any]
582 is_mapper: ClassVar[bool] = False
583 is_aliased_class: ClassVar[bool] = False
584
585 @property
586 def entity_namespace(self):
587 return self._entity_namespace.entity_namespace
588
589
590def _create_proxied_attribute(
591 descriptor: Any,
592) -> Callable[..., QueryableAttribute[Any]]:
593 """Create an QueryableAttribute / user descriptor hybrid.
594
595 Returns a new QueryableAttribute type that delegates descriptor
596 behavior and getattr() to the given descriptor.
597 """
598
599 # TODO: can move this to descriptor_props if the need for this
600 # function is removed from ext/hybrid.py
601
602 class Proxy(QueryableAttribute[_T_co]):
603 """Presents the :class:`.QueryableAttribute` interface as a
604 proxy on top of a Python descriptor / :class:`.PropComparator`
605 combination.
606
607 """
608
609 _extra_criteria = ()
610
611 # the attribute error catches inside of __getattr__ basically create a
612 # singularity if you try putting slots on this too
613 # __slots__ = ("descriptor", "original_property", "_comparator")
614
615 def __init__(
616 self,
617 class_: _ExternalEntityType[Any],
618 key: str,
619 descriptor: Any,
620 comparator: interfaces.PropComparator[_T_co],
621 adapt_to_entity: Optional[AliasedInsp[Any]] = None,
622 doc: Optional[str] = None,
623 original_property: Optional[QueryableAttribute[_T_co]] = None,
624 ):
625 self.class_ = class_
626 self.key = key
627 self.descriptor = descriptor
628 self.original_property = original_property
629 self._comparator = comparator
630 self._adapt_to_entity = adapt_to_entity
631 self._doc = self.__doc__ = doc
632
633 @property
634 def _parententity(self): # type: ignore[override]
635 return inspection.inspect(self.class_, raiseerr=False)
636
637 @property
638 def parent(self): # type: ignore[override]
639 return inspection.inspect(self.class_, raiseerr=False)
640
641 _is_internal_proxy = True
642
643 _cache_key_traversal = [
644 ("key", visitors.ExtendedInternalTraversal.dp_string),
645 ("_parententity", visitors.ExtendedInternalTraversal.dp_multi),
646 ]
647
648 def _column_strategy_attrs(self) -> Sequence[QueryableAttribute[Any]]:
649 prop = self.original_property
650 if prop is None:
651 return ()
652 else:
653 return prop._column_strategy_attrs()
654
655 @property
656 def _impl_uses_objects(self):
657 return (
658 self.original_property is not None
659 and getattr(self.class_, self.key).impl.uses_objects
660 )
661
662 @property
663 def _entity_namespace(self):
664 if hasattr(self._comparator, "_parententity"):
665 return self._comparator._parententity
666 else:
667 # used by hybrid attributes which try to remain
668 # agnostic of any ORM concepts like mappers
669 return _AdHocHasEntityNamespace(self._parententity)
670
671 @property
672 def property(self):
673 return self.comparator.property
674
675 @util.memoized_property
676 def comparator(self):
677 if callable(self._comparator):
678 self._comparator = self._comparator()
679 if self._adapt_to_entity:
680 self._comparator = self._comparator.adapt_to_entity(
681 self._adapt_to_entity
682 )
683 return self._comparator
684
685 def adapt_to_entity(self, adapt_to_entity):
686 return self.__class__(
687 adapt_to_entity.entity,
688 self.key,
689 self.descriptor,
690 self._comparator,
691 adapt_to_entity,
692 )
693
694 def _clone(self, **kw):
695 return self.__class__(
696 self.class_,
697 self.key,
698 self.descriptor,
699 self._comparator,
700 adapt_to_entity=self._adapt_to_entity,
701 original_property=self.original_property,
702 )
703
704 def __get__(self, instance, owner):
705 retval = self.descriptor.__get__(instance, owner)
706 # detect if this is a plain Python @property, which just returns
707 # itself for class level access. If so, then return us.
708 # Otherwise, return the object returned by the descriptor.
709 if retval is self.descriptor and instance is None:
710 return self
711 else:
712 return retval
713
714 def __str__(self) -> str:
715 return f"{self.class_.__name__}.{self.key}"
716
717 def __getattr__(self, attribute):
718 """Delegate __getattr__ to the original descriptor and/or
719 comparator."""
720
721 # this is unfortunately very complicated, and is easily prone
722 # to recursion overflows when implementations of related
723 # __getattr__ schemes are changed
724
725 try:
726 return util.MemoizedSlots.__getattr__(self, attribute)
727 except AttributeError:
728 pass
729
730 try:
731 return getattr(descriptor, attribute)
732 except AttributeError as err:
733 if attribute == "comparator":
734 raise AttributeError("comparator") from err
735 try:
736 # comparator itself might be unreachable
737 comparator = self.comparator
738 except AttributeError as err2:
739 raise AttributeError(
740 "Neither %r object nor unconfigured comparator "
741 "object associated with %s has an attribute %r"
742 % (type(descriptor).__name__, self, attribute)
743 ) from err2
744 else:
745 try:
746 return getattr(comparator, attribute)
747 except AttributeError as err3:
748 raise AttributeError(
749 "Neither %r object nor %r object "
750 "associated with %s has an attribute %r"
751 % (
752 type(descriptor).__name__,
753 type(comparator).__name__,
754 self,
755 attribute,
756 )
757 ) from err3
758
759 Proxy.__name__ = type(descriptor).__name__ + "Proxy"
760
761 util.monkeypatch_proxied_specials(
762 Proxy, type(descriptor), name="descriptor", from_instance=descriptor
763 )
764 return Proxy
765
766
767OP_REMOVE = util.symbol("REMOVE")
768OP_APPEND = util.symbol("APPEND")
769OP_REPLACE = util.symbol("REPLACE")
770OP_BULK_REPLACE = util.symbol("BULK_REPLACE")
771OP_MODIFIED = util.symbol("MODIFIED")
772
773
774class AttributeEventToken:
775 """A token propagated throughout the course of a chain of attribute
776 events.
777
778 Serves as an indicator of the source of the event and also provides
779 a means of controlling propagation across a chain of attribute
780 operations.
781
782 The :class:`.Event` object is sent as the ``initiator`` argument
783 when dealing with events such as :meth:`.AttributeEvents.append`,
784 :meth:`.AttributeEvents.set`,
785 and :meth:`.AttributeEvents.remove`.
786
787 The :class:`.Event` object is currently interpreted by the backref
788 event handlers, and is used to control the propagation of operations
789 across two mutually-dependent attributes.
790
791 .. versionchanged:: 2.0 Changed the name from ``AttributeEvent``
792 to ``AttributeEventToken``.
793
794 :attribute impl: The :class:`.AttributeImpl` which is the current event
795 initiator.
796
797 :attribute op: The symbol :attr:`.OP_APPEND`, :attr:`.OP_REMOVE`,
798 :attr:`.OP_REPLACE`, or :attr:`.OP_BULK_REPLACE`, indicating the
799 source operation.
800
801 """
802
803 __slots__ = "impl", "op", "parent_token"
804
805 def __init__(self, attribute_impl: _AttributeImpl, op: util.symbol):
806 self.impl = attribute_impl
807 self.op = op
808 self.parent_token = self.impl.parent_token
809
810 def __eq__(self, other):
811 return (
812 isinstance(other, AttributeEventToken)
813 and other.impl is self.impl
814 and other.op == self.op
815 )
816
817 @property
818 def key(self):
819 return self.impl.key
820
821 def hasparent(self, state):
822 return self.impl.hasparent(state)
823
824
825AttributeEvent = AttributeEventToken # legacy
826Event = AttributeEventToken # legacy
827
828
829class _AttributeImpl:
830 """internal implementation for instrumented attributes."""
831
832 collection: bool
833 default_accepts_scalar_loader: bool
834 uses_objects: bool
835 supports_population: bool
836 dynamic: bool
837
838 _is_has_collection_adapter = False
839
840 _replace_token: AttributeEventToken
841 _remove_token: AttributeEventToken
842 _append_token: AttributeEventToken
843
844 def __init__(
845 self,
846 class_: _ExternalEntityType[_O],
847 key: str,
848 callable_: Optional[_LoaderCallable],
849 dispatch: _Dispatch[QueryableAttribute[Any]],
850 trackparent: bool = False,
851 compare_function: Optional[Callable[..., bool]] = None,
852 active_history: bool = False,
853 parent_token: Optional[AttributeEventToken] = None,
854 load_on_unexpire: bool = True,
855 send_modified_events: bool = True,
856 accepts_scalar_loader: Optional[bool] = None,
857 **kwargs: Any,
858 ):
859 r"""Construct an AttributeImpl.
860
861 :param \class_: associated class
862
863 :param key: string name of the attribute
864
865 :param \callable_:
866 optional function which generates a callable based on a parent
867 instance, which produces the "default" values for a scalar or
868 collection attribute when it's first accessed, if not present
869 already.
870
871 :param trackparent:
872 if True, attempt to track if an instance has a parent attached
873 to it via this attribute.
874
875 :param compare_function:
876 a function that compares two values which are normally
877 assignable to this attribute.
878
879 :param active_history:
880 indicates that get_history() should always return the "old" value,
881 even if it means executing a lazy callable upon attribute change.
882
883 :param parent_token:
884 Usually references the MapperProperty, used as a key for
885 the hasparent() function to identify an "owning" attribute.
886 Allows multiple AttributeImpls to all match a single
887 owner attribute.
888
889 :param load_on_unexpire:
890 if False, don't include this attribute in a load-on-expired
891 operation, i.e. the "expired_attribute_loader" process.
892 The attribute can still be in the "expired" list and be
893 considered to be "expired". Previously, this flag was called
894 "expire_missing" and is only used by a deferred column
895 attribute.
896
897 :param send_modified_events:
898 if False, the InstanceState._modified_event method will have no
899 effect; this means the attribute will never show up as changed in a
900 history entry.
901
902 """
903 self.class_ = class_
904 self.key = key
905 self.callable_ = callable_
906 self.dispatch = dispatch
907 self.trackparent = trackparent
908 self.parent_token = parent_token or self
909 self.send_modified_events = send_modified_events
910 if compare_function is None:
911 self.is_equal = operator.eq
912 else:
913 self.is_equal = compare_function
914
915 if accepts_scalar_loader is not None:
916 self.accepts_scalar_loader = accepts_scalar_loader
917 else:
918 self.accepts_scalar_loader = self.default_accepts_scalar_loader
919
920 _deferred_history = kwargs.pop("_deferred_history", False)
921 self._deferred_history = _deferred_history
922
923 if active_history:
924 self.dispatch._active_history = True
925
926 self.load_on_unexpire = load_on_unexpire
927 self._modified_token = AttributeEventToken(self, OP_MODIFIED)
928
929 __slots__ = (
930 "class_",
931 "key",
932 "callable_",
933 "dispatch",
934 "trackparent",
935 "parent_token",
936 "send_modified_events",
937 "is_equal",
938 "load_on_unexpire",
939 "_modified_token",
940 "accepts_scalar_loader",
941 "_deferred_history",
942 )
943
944 def __str__(self) -> str:
945 return f"{self.class_.__name__}.{self.key}"
946
947 def _get_active_history(self):
948 """Backwards compat for impl.active_history"""
949
950 return self.dispatch._active_history
951
952 def _set_active_history(self, value):
953 self.dispatch._active_history = value
954
955 active_history = property(_get_active_history, _set_active_history)
956
957 def hasparent(
958 self, state: InstanceState[Any], optimistic: bool = False
959 ) -> bool:
960 """Return the boolean value of a `hasparent` flag attached to
961 the given state.
962
963 The `optimistic` flag determines what the default return value
964 should be if no `hasparent` flag can be located.
965
966 As this function is used to determine if an instance is an
967 *orphan*, instances that were loaded from storage should be
968 assumed to not be orphans, until a True/False value for this
969 flag is set.
970
971 An instance attribute that is loaded by a callable function
972 will also not have a `hasparent` flag.
973
974 """
975 msg = "This AttributeImpl is not configured to track parents."
976 assert self.trackparent, msg
977
978 return (
979 state.parents.get(id(self.parent_token), optimistic) is not False
980 )
981
982 def sethasparent(
983 self,
984 state: InstanceState[Any],
985 parent_state: InstanceState[Any],
986 value: bool,
987 ) -> None:
988 """Set a boolean flag on the given item corresponding to
989 whether or not it is attached to a parent object via the
990 attribute represented by this ``InstrumentedAttribute``.
991
992 """
993 msg = "This AttributeImpl is not configured to track parents."
994 assert self.trackparent, msg
995
996 id_ = id(self.parent_token)
997 if value:
998 state.parents[id_] = parent_state
999 else:
1000 if id_ in state.parents:
1001 last_parent = state.parents[id_]
1002
1003 if (
1004 last_parent is not False
1005 and last_parent.key != parent_state.key
1006 ):
1007 if last_parent.obj() is None:
1008 raise orm_exc.StaleDataError(
1009 "Removing state %s from parent "
1010 "state %s along attribute '%s', "
1011 "but the parent record "
1012 "has gone stale, can't be sure this "
1013 "is the most recent parent."
1014 % (
1015 state_str(state),
1016 state_str(parent_state),
1017 self.key,
1018 )
1019 )
1020
1021 return
1022
1023 state.parents[id_] = False
1024
1025 def get_history(
1026 self,
1027 state: InstanceState[Any],
1028 dict_: _InstanceDict,
1029 passive: PassiveFlag = PASSIVE_OFF,
1030 ) -> History:
1031 raise NotImplementedError()
1032
1033 def get_all_pending(
1034 self,
1035 state: InstanceState[Any],
1036 dict_: _InstanceDict,
1037 passive: PassiveFlag = PASSIVE_NO_INITIALIZE,
1038 ) -> _AllPendingType:
1039 """Return a list of tuples of (state, obj)
1040 for all objects in this attribute's current state
1041 + history.
1042
1043 Only applies to object-based attributes.
1044
1045 This is an inlining of existing functionality
1046 which roughly corresponds to:
1047
1048 get_state_history(
1049 state,
1050 key,
1051 passive=PASSIVE_NO_INITIALIZE).sum()
1052
1053 """
1054 raise NotImplementedError()
1055
1056 def _default_value(
1057 self, state: InstanceState[Any], dict_: _InstanceDict
1058 ) -> Any:
1059 """Produce an empty value for an uninitialized attribute."""
1060
1061 raise NotImplementedError()
1062
1063 def get(
1064 self,
1065 state: InstanceState[Any],
1066 dict_: _InstanceDict,
1067 passive: PassiveFlag = PASSIVE_OFF,
1068 ) -> Any:
1069 """Retrieve a value from the given object.
1070 If a callable is assembled on this object's attribute, and
1071 passive is False, the callable will be executed and the
1072 resulting value will be set as the new value for this attribute.
1073 """
1074 if self.key in dict_:
1075 return dict_[self.key]
1076 else:
1077 # if history present, don't load
1078 key = self.key
1079 if (
1080 key not in state.committed_state
1081 or state.committed_state[key] is NO_VALUE
1082 ):
1083 if not passive & CALLABLES_OK:
1084 return PASSIVE_NO_RESULT
1085
1086 value = self._fire_loader_callables(state, key, passive)
1087
1088 if value is PASSIVE_NO_RESULT or value is NO_VALUE:
1089 return value
1090 elif value is ATTR_WAS_SET:
1091 try:
1092 return dict_[key]
1093 except KeyError as err:
1094 # TODO: no test coverage here.
1095 raise KeyError(
1096 "Deferred loader for attribute "
1097 "%r failed to populate "
1098 "correctly" % key
1099 ) from err
1100 elif value is not ATTR_EMPTY:
1101 return self.set_committed_value(state, dict_, value)
1102
1103 if not passive & INIT_OK:
1104 return NO_VALUE
1105 else:
1106 return self._default_value(state, dict_)
1107
1108 def _fire_loader_callables(
1109 self, state: InstanceState[Any], key: str, passive: PassiveFlag
1110 ) -> Any:
1111 if (
1112 self.accepts_scalar_loader
1113 and self.load_on_unexpire
1114 and key in state.expired_attributes
1115 ):
1116 return state._load_expired(state, passive)
1117 elif key in state.callables:
1118 callable_ = state.callables[key]
1119 return callable_(state, passive)
1120 elif self.callable_:
1121 return self.callable_(state, passive)
1122 else:
1123 return ATTR_EMPTY
1124
1125 def append(
1126 self,
1127 state: InstanceState[Any],
1128 dict_: _InstanceDict,
1129 value: Any,
1130 initiator: Optional[AttributeEventToken],
1131 passive: PassiveFlag = PASSIVE_OFF,
1132 ) -> None:
1133 self.set(state, dict_, value, initiator, passive=passive)
1134
1135 def remove(
1136 self,
1137 state: InstanceState[Any],
1138 dict_: _InstanceDict,
1139 value: Any,
1140 initiator: Optional[AttributeEventToken],
1141 passive: PassiveFlag = PASSIVE_OFF,
1142 ) -> None:
1143 self.set(
1144 state, dict_, None, initiator, passive=passive, check_old=value
1145 )
1146
1147 def pop(
1148 self,
1149 state: InstanceState[Any],
1150 dict_: _InstanceDict,
1151 value: Any,
1152 initiator: Optional[AttributeEventToken],
1153 passive: PassiveFlag = PASSIVE_OFF,
1154 ) -> None:
1155 self.set(
1156 state,
1157 dict_,
1158 None,
1159 initiator,
1160 passive=passive,
1161 check_old=value,
1162 pop=True,
1163 )
1164
1165 def set(
1166 self,
1167 state: InstanceState[Any],
1168 dict_: _InstanceDict,
1169 value: Any,
1170 initiator: Optional[AttributeEventToken] = None,
1171 passive: PassiveFlag = PASSIVE_OFF,
1172 check_old: Any = None,
1173 pop: bool = False,
1174 ) -> None:
1175 raise NotImplementedError()
1176
1177 def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
1178 raise NotImplementedError()
1179
1180 def get_committed_value(
1181 self,
1182 state: InstanceState[Any],
1183 dict_: _InstanceDict,
1184 passive: PassiveFlag = PASSIVE_OFF,
1185 ) -> Any:
1186 """return the unchanged value of this attribute"""
1187
1188 if self.key in state.committed_state:
1189 value = state.committed_state[self.key]
1190 if value is NO_VALUE:
1191 return None
1192 else:
1193 return value
1194 else:
1195 return self.get(state, dict_, passive=passive)
1196
1197 def set_committed_value(self, state, dict_, value):
1198 """set an attribute value on the given instance and 'commit' it."""
1199
1200 dict_[self.key] = value
1201 state._commit(dict_, [self.key])
1202 return value
1203
1204
1205class _ScalarAttributeImpl(_AttributeImpl):
1206 """represents a scalar value-holding InstrumentedAttribute."""
1207
1208 default_accepts_scalar_loader = True
1209 uses_objects = False
1210 supports_population = True
1211 collection = False
1212 dynamic = False
1213
1214 __slots__ = (
1215 "_default_scalar_value",
1216 "_replace_token",
1217 "_append_token",
1218 "_remove_token",
1219 )
1220
1221 def __init__(self, *arg, default_scalar_value=None, **kw):
1222 super().__init__(*arg, **kw)
1223 self._default_scalar_value = default_scalar_value
1224 self._replace_token = self._append_token = AttributeEventToken(
1225 self, OP_REPLACE
1226 )
1227 self._remove_token = AttributeEventToken(self, OP_REMOVE)
1228
1229 def _default_value(
1230 self, state: InstanceState[Any], dict_: _InstanceDict
1231 ) -> Any:
1232 """Produce an empty value for an uninitialized scalar attribute."""
1233
1234 assert self.key not in dict_, (
1235 "_default_value should only be invoked for an "
1236 "uninitialized or expired attribute"
1237 )
1238 value = self._default_scalar_value
1239 for fn in self.dispatch.init_scalar:
1240 ret = fn(state, value, dict_)
1241 if ret is not ATTR_EMPTY:
1242 value = ret
1243
1244 return value
1245
1246 def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
1247 if self.dispatch._active_history:
1248 old = self.get(state, dict_, PASSIVE_RETURN_NO_VALUE)
1249 else:
1250 old = dict_.get(self.key, NO_VALUE)
1251
1252 if self.dispatch.remove:
1253 self.fire_remove_event(state, dict_, old, self._remove_token)
1254 state._modified_event(dict_, self, old)
1255
1256 existing = dict_.pop(self.key, NO_VALUE)
1257 if (
1258 existing is NO_VALUE
1259 and old is NO_VALUE
1260 and not state.expired
1261 and self.key not in state.expired_attributes
1262 ):
1263 raise AttributeError("%s object does not have a value" % self)
1264
1265 def get_history(
1266 self,
1267 state: InstanceState[Any],
1268 dict_: Dict[str, Any],
1269 passive: PassiveFlag = PASSIVE_OFF,
1270 ) -> History:
1271 if self.key in dict_:
1272 return History.from_scalar_attribute(self, state, dict_[self.key])
1273 elif self.key in state.committed_state:
1274 return History.from_scalar_attribute(self, state, NO_VALUE)
1275 else:
1276 if passive & INIT_OK:
1277 passive ^= INIT_OK
1278 current = self.get(state, dict_, passive=passive)
1279 if current is PASSIVE_NO_RESULT:
1280 return HISTORY_BLANK
1281 else:
1282 return History.from_scalar_attribute(self, state, current)
1283
1284 def set(
1285 self,
1286 state: InstanceState[Any],
1287 dict_: Dict[str, Any],
1288 value: Any,
1289 initiator: Optional[AttributeEventToken] = None,
1290 passive: PassiveFlag = PASSIVE_OFF,
1291 check_old: Optional[object] = None,
1292 pop: bool = False,
1293 ) -> None:
1294 if value is DONT_SET:
1295 return
1296
1297 if self.dispatch._active_history:
1298 old = self.get(state, dict_, PASSIVE_RETURN_NO_VALUE)
1299 else:
1300 old = dict_.get(self.key, NO_VALUE)
1301
1302 if self.dispatch.set:
1303 value = self.fire_replace_event(
1304 state, dict_, value, old, initiator
1305 )
1306 state._modified_event(dict_, self, old)
1307 dict_[self.key] = value
1308
1309 def fire_replace_event(
1310 self,
1311 state: InstanceState[Any],
1312 dict_: _InstanceDict,
1313 value: _T,
1314 previous: Any,
1315 initiator: Optional[AttributeEventToken],
1316 ) -> _T:
1317 for fn in self.dispatch.set:
1318 value = fn(
1319 state, value, previous, initiator or self._replace_token
1320 )
1321 return value
1322
1323 def fire_remove_event(
1324 self,
1325 state: InstanceState[Any],
1326 dict_: _InstanceDict,
1327 value: Any,
1328 initiator: Optional[AttributeEventToken],
1329 ) -> None:
1330 for fn in self.dispatch.remove:
1331 fn(state, value, initiator or self._remove_token)
1332
1333
1334class _ScalarObjectAttributeImpl(_ScalarAttributeImpl):
1335 """represents a scalar-holding InstrumentedAttribute,
1336 where the target object is also instrumented.
1337
1338 Adds events to delete/set operations.
1339
1340 """
1341
1342 default_accepts_scalar_loader = False
1343 uses_objects = True
1344 supports_population = True
1345 collection = False
1346
1347 __slots__ = ()
1348
1349 def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
1350 if self.dispatch._active_history:
1351 old = self.get(
1352 state,
1353 dict_,
1354 passive=PASSIVE_ONLY_PERSISTENT
1355 | NO_AUTOFLUSH
1356 | LOAD_AGAINST_COMMITTED,
1357 )
1358 else:
1359 old = self.get(
1360 state,
1361 dict_,
1362 passive=PASSIVE_NO_FETCH ^ INIT_OK
1363 | LOAD_AGAINST_COMMITTED
1364 | NO_RAISE,
1365 )
1366
1367 self.fire_remove_event(state, dict_, old, self._remove_token)
1368
1369 existing = dict_.pop(self.key, NO_VALUE)
1370
1371 # if the attribute is expired, we currently have no way to tell
1372 # that an object-attribute was expired vs. not loaded. So
1373 # for this test, we look to see if the object has a DB identity.
1374 if (
1375 existing is NO_VALUE
1376 and old is not PASSIVE_NO_RESULT
1377 and state.key is None
1378 ):
1379 raise AttributeError("%s object does not have a value" % self)
1380
1381 def get_history(
1382 self,
1383 state: InstanceState[Any],
1384 dict_: _InstanceDict,
1385 passive: PassiveFlag = PASSIVE_OFF,
1386 ) -> History:
1387 if self.key in dict_:
1388 current = dict_[self.key]
1389 else:
1390 if passive & INIT_OK:
1391 passive ^= INIT_OK
1392 current = self.get(state, dict_, passive=passive)
1393 if current is PASSIVE_NO_RESULT:
1394 return HISTORY_BLANK
1395
1396 if not self._deferred_history:
1397 return History.from_object_attribute(self, state, current)
1398 else:
1399 original = state.committed_state.get(self.key, _NO_HISTORY)
1400 if original is PASSIVE_NO_RESULT:
1401 loader_passive = passive | (
1402 PASSIVE_ONLY_PERSISTENT
1403 | NO_AUTOFLUSH
1404 | LOAD_AGAINST_COMMITTED
1405 | NO_RAISE
1406 | DEFERRED_HISTORY_LOAD
1407 )
1408 original = self._fire_loader_callables(
1409 state, self.key, loader_passive
1410 )
1411 return History.from_object_attribute(
1412 self, state, current, original=original
1413 )
1414
1415 def get_all_pending(
1416 self,
1417 state: InstanceState[Any],
1418 dict_: _InstanceDict,
1419 passive: PassiveFlag = PASSIVE_NO_INITIALIZE,
1420 ) -> _AllPendingType:
1421 if self.key in dict_:
1422 current = dict_[self.key]
1423 elif passive & CALLABLES_OK:
1424 current = self.get(state, dict_, passive=passive)
1425 else:
1426 return []
1427
1428 ret: _AllPendingType
1429
1430 # can't use __hash__(), can't use __eq__() here
1431 if (
1432 current is not None
1433 and current is not PASSIVE_NO_RESULT
1434 and current is not NO_VALUE
1435 ):
1436 ret = [(instance_state(current), current)]
1437 else:
1438 ret = [(None, None)]
1439
1440 if self.key in state.committed_state:
1441 original = state.committed_state[self.key]
1442 if (
1443 original is not None
1444 and original is not PASSIVE_NO_RESULT
1445 and original is not NO_VALUE
1446 and original is not current
1447 ):
1448 ret.append((instance_state(original), original))
1449 return ret
1450
1451 def set(
1452 self,
1453 state: InstanceState[Any],
1454 dict_: _InstanceDict,
1455 value: Any,
1456 initiator: Optional[AttributeEventToken] = None,
1457 passive: PassiveFlag = PASSIVE_OFF,
1458 check_old: Any = None,
1459 pop: bool = False,
1460 ) -> None:
1461 """Set a value on the given InstanceState."""
1462
1463 if value is DONT_SET:
1464 return
1465
1466 if self.dispatch._active_history:
1467 old = self.get(
1468 state,
1469 dict_,
1470 passive=PASSIVE_ONLY_PERSISTENT
1471 | NO_AUTOFLUSH
1472 | LOAD_AGAINST_COMMITTED,
1473 )
1474 else:
1475 old = self.get(
1476 state,
1477 dict_,
1478 passive=PASSIVE_NO_FETCH ^ INIT_OK
1479 | LOAD_AGAINST_COMMITTED
1480 | NO_RAISE,
1481 )
1482
1483 if (
1484 check_old is not None
1485 and old is not PASSIVE_NO_RESULT
1486 and check_old is not old
1487 ):
1488 if pop:
1489 return
1490 else:
1491 raise ValueError(
1492 "Object %s not associated with %s on attribute '%s'"
1493 % (instance_str(check_old), state_str(state), self.key)
1494 )
1495
1496 value = self.fire_replace_event(state, dict_, value, old, initiator)
1497 dict_[self.key] = value
1498
1499 def fire_remove_event(
1500 self,
1501 state: InstanceState[Any],
1502 dict_: _InstanceDict,
1503 value: Any,
1504 initiator: Optional[AttributeEventToken],
1505 ) -> None:
1506 if self.trackparent and value not in (
1507 None,
1508 PASSIVE_NO_RESULT,
1509 NO_VALUE,
1510 ):
1511 self.sethasparent(instance_state(value), state, False)
1512
1513 for fn in self.dispatch.remove:
1514 fn(state, value, initiator or self._remove_token)
1515
1516 state._modified_event(dict_, self, value)
1517
1518 def fire_replace_event(
1519 self,
1520 state: InstanceState[Any],
1521 dict_: _InstanceDict,
1522 value: _T,
1523 previous: Any,
1524 initiator: Optional[AttributeEventToken],
1525 ) -> _T:
1526 if self.trackparent:
1527 if previous is not value and previous not in (
1528 None,
1529 PASSIVE_NO_RESULT,
1530 NO_VALUE,
1531 ):
1532 self.sethasparent(instance_state(previous), state, False)
1533
1534 for fn in self.dispatch.set:
1535 value = fn(
1536 state, value, previous, initiator or self._replace_token
1537 )
1538
1539 state._modified_event(dict_, self, previous)
1540
1541 if self.trackparent:
1542 if value is not None:
1543 self.sethasparent(instance_state(value), state, True)
1544
1545 return value
1546
1547
1548class _HasCollectionAdapter:
1549 __slots__ = ()
1550
1551 collection: bool
1552 _is_has_collection_adapter = True
1553
1554 def _dispose_previous_collection(
1555 self,
1556 state: InstanceState[Any],
1557 collection: _AdaptedCollectionProtocol,
1558 adapter: CollectionAdapter,
1559 fire_event: bool,
1560 ) -> None:
1561 raise NotImplementedError()
1562
1563 @overload
1564 def get_collection(
1565 self,
1566 state: InstanceState[Any],
1567 dict_: _InstanceDict,
1568 user_data: Literal[None] = ...,
1569 passive: Literal[PassiveFlag.PASSIVE_OFF] = ...,
1570 ) -> CollectionAdapter: ...
1571
1572 @overload
1573 def get_collection(
1574 self,
1575 state: InstanceState[Any],
1576 dict_: _InstanceDict,
1577 user_data: _AdaptedCollectionProtocol = ...,
1578 passive: PassiveFlag = ...,
1579 ) -> CollectionAdapter: ...
1580
1581 @overload
1582 def get_collection(
1583 self,
1584 state: InstanceState[Any],
1585 dict_: _InstanceDict,
1586 user_data: Optional[_AdaptedCollectionProtocol] = ...,
1587 passive: PassiveFlag = ...,
1588 ) -> Union[
1589 Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
1590 ]: ...
1591
1592 def get_collection(
1593 self,
1594 state: InstanceState[Any],
1595 dict_: _InstanceDict,
1596 user_data: Optional[_AdaptedCollectionProtocol] = None,
1597 passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
1598 ) -> Union[
1599 Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
1600 ]:
1601 raise NotImplementedError()
1602
1603 def set(
1604 self,
1605 state: InstanceState[Any],
1606 dict_: _InstanceDict,
1607 value: Any,
1608 initiator: Optional[AttributeEventToken] = None,
1609 passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
1610 check_old: Any = None,
1611 pop: bool = False,
1612 _adapt: bool = True,
1613 ) -> None:
1614 raise NotImplementedError()
1615
1616
1617if TYPE_CHECKING:
1618
1619 def _is_collection_attribute_impl(
1620 impl: _AttributeImpl,
1621 ) -> TypeGuard[_CollectionAttributeImpl]: ...
1622
1623else:
1624 _is_collection_attribute_impl = operator.attrgetter("collection")
1625
1626
1627class _CollectionAttributeImpl(_HasCollectionAdapter, _AttributeImpl):
1628 """A collection-holding attribute that instruments changes in membership.
1629
1630 Only handles collections of instrumented objects.
1631
1632 InstrumentedCollectionAttribute holds an arbitrary, user-specified
1633 container object (defaulting to a list) and brokers access to the
1634 CollectionAdapter, a "view" onto that object that presents consistent bag
1635 semantics to the orm layer independent of the user data implementation.
1636
1637 """
1638
1639 uses_objects = True
1640 collection = True
1641 default_accepts_scalar_loader = False
1642 supports_population = True
1643 dynamic = False
1644
1645 _bulk_replace_token: AttributeEventToken
1646
1647 __slots__ = (
1648 "copy",
1649 "collection_factory",
1650 "_append_token",
1651 "_remove_token",
1652 "_bulk_replace_token",
1653 "_duck_typed_as",
1654 )
1655
1656 def __init__(
1657 self,
1658 class_,
1659 key,
1660 callable_,
1661 dispatch,
1662 typecallable=None,
1663 trackparent=False,
1664 copy_function=None,
1665 compare_function=None,
1666 **kwargs,
1667 ):
1668 super().__init__(
1669 class_,
1670 key,
1671 callable_,
1672 dispatch,
1673 trackparent=trackparent,
1674 compare_function=compare_function,
1675 **kwargs,
1676 )
1677
1678 if copy_function is None:
1679 copy_function = self.__copy
1680 self.copy = copy_function
1681 self.collection_factory = typecallable
1682 self._append_token = AttributeEventToken(self, OP_APPEND)
1683 self._remove_token = AttributeEventToken(self, OP_REMOVE)
1684 self._bulk_replace_token = AttributeEventToken(self, OP_BULK_REPLACE)
1685 self._duck_typed_as = util.duck_type_collection(
1686 self.collection_factory()
1687 )
1688
1689 if getattr(self.collection_factory, "_sa_linker", None):
1690
1691 @event.listens_for(self, "init_collection")
1692 def link(target, collection, collection_adapter):
1693 collection._sa_linker(collection_adapter)
1694
1695 @event.listens_for(self, "dispose_collection")
1696 def unlink(target, collection, collection_adapter):
1697 collection._sa_linker(None)
1698
1699 def __copy(self, item):
1700 return [y for y in collections.collection_adapter(item)]
1701
1702 def get_history(
1703 self,
1704 state: InstanceState[Any],
1705 dict_: _InstanceDict,
1706 passive: PassiveFlag = PASSIVE_OFF,
1707 ) -> History:
1708 current = self.get(state, dict_, passive=passive)
1709
1710 if current is PASSIVE_NO_RESULT:
1711 if (
1712 passive & PassiveFlag.INCLUDE_PENDING_MUTATIONS
1713 and self.key in state._pending_mutations
1714 ):
1715 pending = state._pending_mutations[self.key]
1716 return pending.merge_with_history(HISTORY_BLANK)
1717 else:
1718 return HISTORY_BLANK
1719 else:
1720 if passive & PassiveFlag.INCLUDE_PENDING_MUTATIONS:
1721 # this collection is loaded / present. should not be any
1722 # pending mutations
1723 assert self.key not in state._pending_mutations
1724
1725 return History.from_collection(self, state, current)
1726
1727 def get_all_pending(
1728 self,
1729 state: InstanceState[Any],
1730 dict_: _InstanceDict,
1731 passive: PassiveFlag = PASSIVE_NO_INITIALIZE,
1732 ) -> _AllPendingType:
1733 # NOTE: passive is ignored here at the moment
1734
1735 if self.key not in dict_:
1736 return []
1737
1738 current = dict_[self.key]
1739 current = getattr(current, "_sa_adapter")
1740
1741 if self.key in state.committed_state:
1742 original = state.committed_state[self.key]
1743 if original is not NO_VALUE:
1744 current_states = [
1745 ((c is not None) and instance_state(c) or None, c)
1746 for c in current
1747 ]
1748 original_states = [
1749 ((c is not None) and instance_state(c) or None, c)
1750 for c in original
1751 ]
1752
1753 current_set = dict(current_states)
1754 original_set = dict(original_states)
1755
1756 return (
1757 [
1758 (s, o)
1759 for s, o in current_states
1760 if s not in original_set
1761 ]
1762 + [(s, o) for s, o in current_states if s in original_set]
1763 + [
1764 (s, o)
1765 for s, o in original_states
1766 if s not in current_set
1767 ]
1768 )
1769
1770 return [(instance_state(o), o) for o in current]
1771
1772 def fire_append_event(
1773 self,
1774 state: InstanceState[Any],
1775 dict_: _InstanceDict,
1776 value: _T,
1777 initiator: Optional[AttributeEventToken],
1778 key: Optional[Any],
1779 ) -> _T:
1780 for fn in self.dispatch.append:
1781 value = fn(state, value, initiator or self._append_token, key=key)
1782
1783 state._modified_event(dict_, self, NO_VALUE, True)
1784
1785 if self.trackparent and value is not None:
1786 self.sethasparent(instance_state(value), state, True)
1787
1788 return value
1789
1790 def fire_append_wo_mutation_event(
1791 self,
1792 state: InstanceState[Any],
1793 dict_: _InstanceDict,
1794 value: _T,
1795 initiator: Optional[AttributeEventToken],
1796 key: Optional[Any],
1797 ) -> _T:
1798 for fn in self.dispatch.append_wo_mutation:
1799 value = fn(state, value, initiator or self._append_token, key=key)
1800
1801 return value
1802
1803 def fire_pre_remove_event(
1804 self,
1805 state: InstanceState[Any],
1806 dict_: _InstanceDict,
1807 initiator: Optional[AttributeEventToken],
1808 key: Optional[Any],
1809 ) -> None:
1810 """A special event used for pop() operations.
1811
1812 The "remove" event needs to have the item to be removed passed to
1813 it, which in the case of pop from a set, we don't have a way to access
1814 the item before the operation. the event is used for all pop()
1815 operations (even though set.pop is the one where it is really needed).
1816
1817 """
1818 state._modified_event(dict_, self, NO_VALUE, True)
1819
1820 def fire_remove_event(
1821 self,
1822 state: InstanceState[Any],
1823 dict_: _InstanceDict,
1824 value: Any,
1825 initiator: Optional[AttributeEventToken],
1826 key: Optional[Any],
1827 ) -> None:
1828 if self.trackparent and value is not None:
1829 self.sethasparent(instance_state(value), state, False)
1830
1831 for fn in self.dispatch.remove:
1832 fn(state, value, initiator or self._remove_token, key=key)
1833
1834 state._modified_event(dict_, self, NO_VALUE, True)
1835
1836 def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
1837 if self.key not in dict_:
1838 return
1839
1840 state._modified_event(dict_, self, NO_VALUE, True)
1841
1842 collection = self.get_collection(state, state.dict)
1843 collection.clear_with_event()
1844
1845 # key is always present because we checked above. e.g.
1846 # del is a no-op if collection not present.
1847 del dict_[self.key]
1848
1849 def _default_value(
1850 self, state: InstanceState[Any], dict_: _InstanceDict
1851 ) -> _AdaptedCollectionProtocol:
1852 """Produce an empty collection for an un-initialized attribute"""
1853
1854 assert self.key not in dict_, (
1855 "_default_value should only be invoked for an "
1856 "uninitialized or expired attribute"
1857 )
1858
1859 if self.key in state._empty_collections:
1860 return state._empty_collections[self.key]
1861
1862 adapter, user_data = self._initialize_collection(state)
1863 adapter._set_empty(user_data)
1864 return user_data
1865
1866 def _initialize_collection(
1867 self, state: InstanceState[Any]
1868 ) -> Tuple[CollectionAdapter, _AdaptedCollectionProtocol]:
1869 adapter, collection = state.manager.initialize_collection(
1870 self.key, state, self.collection_factory
1871 )
1872
1873 self.dispatch.init_collection(state, collection, adapter)
1874
1875 return adapter, collection
1876
1877 def append(
1878 self,
1879 state: InstanceState[Any],
1880 dict_: _InstanceDict,
1881 value: Any,
1882 initiator: Optional[AttributeEventToken],
1883 passive: PassiveFlag = PASSIVE_OFF,
1884 ) -> None:
1885 collection = self.get_collection(
1886 state, dict_, user_data=None, passive=passive
1887 )
1888 if collection is PASSIVE_NO_RESULT:
1889 value = self.fire_append_event(
1890 state, dict_, value, initiator, key=NO_KEY
1891 )
1892 assert (
1893 self.key not in dict_
1894 ), "Collection was loaded during event handling."
1895 state._get_pending_mutation(self.key).append(value)
1896 else:
1897 if TYPE_CHECKING:
1898 assert isinstance(collection, CollectionAdapter)
1899 collection.append_with_event(value, initiator)
1900
1901 def remove(
1902 self,
1903 state: InstanceState[Any],
1904 dict_: _InstanceDict,
1905 value: Any,
1906 initiator: Optional[AttributeEventToken],
1907 passive: PassiveFlag = PASSIVE_OFF,
1908 ) -> None:
1909 collection = self.get_collection(
1910 state, state.dict, user_data=None, passive=passive
1911 )
1912 if collection is PASSIVE_NO_RESULT:
1913 self.fire_remove_event(state, dict_, value, initiator, key=NO_KEY)
1914 assert (
1915 self.key not in dict_
1916 ), "Collection was loaded during event handling."
1917 state._get_pending_mutation(self.key).remove(value)
1918 else:
1919 if TYPE_CHECKING:
1920 assert isinstance(collection, CollectionAdapter)
1921 collection.remove_with_event(value, initiator)
1922
1923 def pop(
1924 self,
1925 state: InstanceState[Any],
1926 dict_: _InstanceDict,
1927 value: Any,
1928 initiator: Optional[AttributeEventToken],
1929 passive: PassiveFlag = PASSIVE_OFF,
1930 ) -> None:
1931 try:
1932 # TODO: better solution here would be to add
1933 # a "popper" role to collections.py to complement
1934 # "remover".
1935 self.remove(state, dict_, value, initiator, passive=passive)
1936 except (ValueError, KeyError, IndexError):
1937 pass
1938
1939 def set(
1940 self,
1941 state: InstanceState[Any],
1942 dict_: _InstanceDict,
1943 value: Any,
1944 initiator: Optional[AttributeEventToken] = None,
1945 passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
1946 check_old: Any = None,
1947 pop: bool = False,
1948 _adapt: bool = True,
1949 ) -> None:
1950
1951 if value is DONT_SET:
1952 return
1953
1954 iterable = orig_iterable = value
1955 new_keys = None
1956
1957 # pulling a new collection first so that an adaptation exception does
1958 # not trigger a lazy load of the old collection.
1959 new_collection, user_data = self._initialize_collection(state)
1960 if _adapt:
1961 setting_type = util.duck_type_collection(iterable)
1962 receiving_type = self._duck_typed_as
1963
1964 if setting_type is not receiving_type:
1965 given = (
1966 "None" if iterable is None else iterable.__class__.__name__
1967 )
1968 wanted = (
1969 "None"
1970 if self._duck_typed_as is None
1971 else self._duck_typed_as.__name__
1972 )
1973 raise TypeError(
1974 "Incompatible collection type: %s is not %s-like"
1975 % (given, wanted)
1976 )
1977
1978 # If the object is an adapted collection, return the (iterable)
1979 # adapter.
1980 if hasattr(iterable, "_sa_iterator"):
1981 iterable = iterable._sa_iterator()
1982 elif setting_type is dict:
1983 new_keys = list(iterable)
1984 iterable = iterable.values()
1985 else:
1986 iterable = iter(iterable)
1987 elif util.duck_type_collection(iterable) is dict:
1988 new_keys = list(value)
1989
1990 new_values = list(iterable)
1991
1992 evt = self._bulk_replace_token
1993
1994 self.dispatch.bulk_replace(state, new_values, evt, keys=new_keys)
1995
1996 # propagate NO_RAISE in passive through to the get() for the
1997 # existing object (ticket #8862)
1998 old = self.get(
1999 state,
2000 dict_,
2001 passive=PASSIVE_ONLY_PERSISTENT ^ (passive & PassiveFlag.NO_RAISE),
2002 )
2003 if old is PASSIVE_NO_RESULT:
2004 old = self._default_value(state, dict_)
2005 elif old is orig_iterable:
2006 # ignore re-assignment of the current collection, as happens
2007 # implicitly with in-place operators (foo.collection |= other)
2008 return
2009
2010 # place a copy of "old" in state.committed_state
2011 state._modified_event(dict_, self, old, True)
2012
2013 old_collection = old._sa_adapter
2014
2015 dict_[self.key] = user_data
2016
2017 collections.bulk_replace(
2018 new_values, old_collection, new_collection, initiator=evt
2019 )
2020
2021 self._dispose_previous_collection(state, old, old_collection, True)
2022
2023 def _dispose_previous_collection(
2024 self,
2025 state: InstanceState[Any],
2026 collection: _AdaptedCollectionProtocol,
2027 adapter: CollectionAdapter,
2028 fire_event: bool,
2029 ) -> None:
2030 del collection._sa_adapter
2031
2032 # discarding old collection make sure it is not referenced in empty
2033 # collections.
2034 state._empty_collections.pop(self.key, None)
2035 if fire_event:
2036 self.dispatch.dispose_collection(state, collection, adapter)
2037
2038 def _invalidate_collection(
2039 self, collection: _AdaptedCollectionProtocol
2040 ) -> None:
2041 adapter = getattr(collection, "_sa_adapter")
2042 adapter.invalidated = True
2043
2044 def set_committed_value(
2045 self, state: InstanceState[Any], dict_: _InstanceDict, value: Any
2046 ) -> _AdaptedCollectionProtocol:
2047 """Set an attribute value on the given instance and 'commit' it."""
2048
2049 collection, user_data = self._initialize_collection(state)
2050
2051 if value:
2052 collection.append_multiple_without_event(value)
2053
2054 state.dict[self.key] = user_data
2055
2056 state._commit(dict_, [self.key])
2057
2058 if self.key in state._pending_mutations:
2059 # pending items exist. issue a modified event,
2060 # add/remove new items.
2061 state._modified_event(dict_, self, user_data, True)
2062
2063 pending = state._pending_mutations.pop(self.key)
2064 added = pending.added_items
2065 removed = pending.deleted_items
2066 for item in added:
2067 collection.append_without_event(item)
2068 for item in removed:
2069 collection.remove_without_event(item)
2070
2071 return user_data
2072
2073 @overload
2074 def get_collection(
2075 self,
2076 state: InstanceState[Any],
2077 dict_: _InstanceDict,
2078 user_data: Literal[None] = ...,
2079 passive: Literal[PassiveFlag.PASSIVE_OFF] = ...,
2080 ) -> CollectionAdapter: ...
2081
2082 @overload
2083 def get_collection(
2084 self,
2085 state: InstanceState[Any],
2086 dict_: _InstanceDict,
2087 user_data: _AdaptedCollectionProtocol = ...,
2088 passive: PassiveFlag = ...,
2089 ) -> CollectionAdapter: ...
2090
2091 @overload
2092 def get_collection(
2093 self,
2094 state: InstanceState[Any],
2095 dict_: _InstanceDict,
2096 user_data: Optional[_AdaptedCollectionProtocol] = ...,
2097 passive: PassiveFlag = PASSIVE_OFF,
2098 ) -> Union[
2099 Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
2100 ]: ...
2101
2102 def get_collection(
2103 self,
2104 state: InstanceState[Any],
2105 dict_: _InstanceDict,
2106 user_data: Optional[_AdaptedCollectionProtocol] = None,
2107 passive: PassiveFlag = PASSIVE_OFF,
2108 ) -> Union[
2109 Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
2110 ]:
2111 """Retrieve the CollectionAdapter associated with the given state.
2112
2113 if user_data is None, retrieves it from the state using normal
2114 "get()" rules, which will fire lazy callables or return the "empty"
2115 collection value.
2116
2117 """
2118 if user_data is None:
2119 fetch_user_data = self.get(state, dict_, passive=passive)
2120 if fetch_user_data is LoaderCallableStatus.PASSIVE_NO_RESULT:
2121 return fetch_user_data
2122 else:
2123 user_data = cast("_AdaptedCollectionProtocol", fetch_user_data)
2124
2125 return user_data._sa_adapter
2126
2127
2128def _backref_listeners(
2129 attribute: QueryableAttribute[Any], key: str, uselist: bool
2130) -> None:
2131 """Apply listeners to synchronize a two-way relationship."""
2132
2133 # use easily recognizable names for stack traces.
2134
2135 # in the sections marked "tokens to test for a recursive loop",
2136 # this is somewhat brittle and very performance-sensitive logic
2137 # that is specific to how we might arrive at each event. a marker
2138 # that can target us directly to arguments being invoked against
2139 # the impl might be simpler, but could interfere with other systems.
2140
2141 parent_token = attribute.impl.parent_token
2142 parent_impl = attribute.impl
2143
2144 def _acceptable_key_err(child_state, initiator, child_impl):
2145 raise ValueError(
2146 "Bidirectional attribute conflict detected: "
2147 'Passing object %s to attribute "%s" '
2148 'triggers a modify event on attribute "%s" '
2149 'via the backref "%s".'
2150 % (
2151 state_str(child_state),
2152 initiator.parent_token,
2153 child_impl.parent_token,
2154 attribute.impl.parent_token,
2155 )
2156 )
2157
2158 def emit_backref_from_scalar_set_event(
2159 state, child, oldchild, initiator, **kw
2160 ):
2161 if oldchild is child:
2162 return child
2163 if (
2164 oldchild is not None
2165 and oldchild is not PASSIVE_NO_RESULT
2166 and oldchild is not NO_VALUE
2167 ):
2168 # With lazy=None, there's no guarantee that the full collection is
2169 # present when updating via a backref.
2170 old_state, old_dict = (
2171 instance_state(oldchild),
2172 instance_dict(oldchild),
2173 )
2174 impl = old_state.manager[key].impl
2175
2176 # tokens to test for a recursive loop.
2177 if not impl.collection and not impl.dynamic:
2178 check_recursive_token = impl._replace_token
2179 else:
2180 check_recursive_token = impl._remove_token
2181
2182 if initiator is not check_recursive_token:
2183 impl.pop(
2184 old_state,
2185 old_dict,
2186 state.obj(),
2187 parent_impl._append_token,
2188 passive=PASSIVE_NO_FETCH,
2189 )
2190
2191 if child is not None:
2192 child_state, child_dict = (
2193 instance_state(child),
2194 instance_dict(child),
2195 )
2196 child_impl = child_state.manager[key].impl
2197
2198 if (
2199 initiator.parent_token is not parent_token
2200 and initiator.parent_token is not child_impl.parent_token
2201 ):
2202 _acceptable_key_err(state, initiator, child_impl)
2203
2204 # tokens to test for a recursive loop.
2205 check_append_token = child_impl._append_token
2206 check_bulk_replace_token = (
2207 child_impl._bulk_replace_token
2208 if _is_collection_attribute_impl(child_impl)
2209 else None
2210 )
2211
2212 if (
2213 initiator is not check_append_token
2214 and initiator is not check_bulk_replace_token
2215 ):
2216 child_impl.append(
2217 child_state,
2218 child_dict,
2219 state.obj(),
2220 initiator,
2221 passive=PASSIVE_NO_FETCH,
2222 )
2223 return child
2224
2225 def emit_backref_from_collection_append_event(
2226 state, child, initiator, **kw
2227 ):
2228 if child is None:
2229 return
2230
2231 child_state, child_dict = instance_state(child), instance_dict(child)
2232 child_impl = child_state.manager[key].impl
2233
2234 if (
2235 initiator.parent_token is not parent_token
2236 and initiator.parent_token is not child_impl.parent_token
2237 ):
2238 _acceptable_key_err(state, initiator, child_impl)
2239
2240 # tokens to test for a recursive loop.
2241 check_append_token = child_impl._append_token
2242 check_bulk_replace_token = (
2243 child_impl._bulk_replace_token
2244 if _is_collection_attribute_impl(child_impl)
2245 else None
2246 )
2247
2248 if (
2249 initiator is not check_append_token
2250 and initiator is not check_bulk_replace_token
2251 ):
2252 child_impl.append(
2253 child_state,
2254 child_dict,
2255 state.obj(),
2256 initiator,
2257 passive=PASSIVE_NO_FETCH,
2258 )
2259 return child
2260
2261 def emit_backref_from_collection_remove_event(
2262 state, child, initiator, **kw
2263 ):
2264 if (
2265 child is not None
2266 and child is not PASSIVE_NO_RESULT
2267 and child is not NO_VALUE
2268 ):
2269 child_state, child_dict = (
2270 instance_state(child),
2271 instance_dict(child),
2272 )
2273 child_impl = child_state.manager[key].impl
2274
2275 check_replace_token: Optional[AttributeEventToken]
2276
2277 # tokens to test for a recursive loop.
2278 if not child_impl.collection and not child_impl.dynamic:
2279 check_remove_token = child_impl._remove_token
2280 check_replace_token = child_impl._replace_token
2281 check_for_dupes_on_remove = uselist and not parent_impl.dynamic
2282 else:
2283 check_remove_token = child_impl._remove_token
2284 check_replace_token = (
2285 child_impl._bulk_replace_token
2286 if _is_collection_attribute_impl(child_impl)
2287 else None
2288 )
2289 check_for_dupes_on_remove = False
2290
2291 if (
2292 initiator is not check_remove_token
2293 and initiator is not check_replace_token
2294 ):
2295 if not check_for_dupes_on_remove or not util.has_dupes(
2296 # when this event is called, the item is usually
2297 # present in the list, except for a pop() operation.
2298 state.dict[parent_impl.key],
2299 child,
2300 ):
2301 child_impl.pop(
2302 child_state,
2303 child_dict,
2304 state.obj(),
2305 initiator,
2306 passive=PASSIVE_NO_FETCH,
2307 )
2308
2309 if uselist:
2310 event.listen(
2311 attribute,
2312 "append",
2313 emit_backref_from_collection_append_event,
2314 retval=True,
2315 raw=True,
2316 include_key=True,
2317 )
2318 else:
2319 event.listen(
2320 attribute,
2321 "set",
2322 emit_backref_from_scalar_set_event,
2323 retval=True,
2324 raw=True,
2325 include_key=True,
2326 )
2327 # TODO: need coverage in test/orm/ of remove event
2328 event.listen(
2329 attribute,
2330 "remove",
2331 emit_backref_from_collection_remove_event,
2332 retval=True,
2333 raw=True,
2334 include_key=True,
2335 )
2336
2337
2338_NO_HISTORY = util.symbol("NO_HISTORY")
2339_NO_STATE_SYMBOLS = frozenset([id(PASSIVE_NO_RESULT), id(NO_VALUE)])
2340
2341
2342class History(NamedTuple):
2343 """A 3-tuple of added, unchanged and deleted values,
2344 representing the changes which have occurred on an instrumented
2345 attribute.
2346
2347 The easiest way to get a :class:`.History` object for a particular
2348 attribute on an object is to use the :func:`_sa.inspect` function::
2349
2350 from sqlalchemy import inspect
2351
2352 hist = inspect(myobject).attrs.myattribute.history
2353
2354 Each tuple member is an iterable sequence:
2355
2356 * ``added`` - the collection of items added to the attribute (the first
2357 tuple element).
2358
2359 * ``unchanged`` - the collection of items that have not changed on the
2360 attribute (the second tuple element).
2361
2362 * ``deleted`` - the collection of items that have been removed from the
2363 attribute (the third tuple element).
2364
2365 """
2366
2367 added: Union[Tuple[()], List[Any]]
2368 unchanged: Union[Tuple[()], List[Any]]
2369 deleted: Union[Tuple[()], List[Any]]
2370
2371 def __bool__(self) -> bool:
2372 return self != HISTORY_BLANK
2373
2374 def empty(self) -> bool:
2375 """Return True if this :class:`.History` has no changes
2376 and no existing, unchanged state.
2377
2378 """
2379
2380 return not bool((self.added or self.deleted) or self.unchanged)
2381
2382 def sum(self) -> Sequence[Any]:
2383 """Return a collection of added + unchanged + deleted."""
2384
2385 return (
2386 (self.added or []) + (self.unchanged or []) + (self.deleted or [])
2387 )
2388
2389 def non_deleted(self) -> Sequence[Any]:
2390 """Return a collection of added + unchanged."""
2391
2392 return (self.added or []) + (self.unchanged or [])
2393
2394 def non_added(self) -> Sequence[Any]:
2395 """Return a collection of unchanged + deleted."""
2396
2397 return (self.unchanged or []) + (self.deleted or [])
2398
2399 def has_changes(self) -> bool:
2400 """Return True if this :class:`.History` has changes."""
2401
2402 return bool(self.added or self.deleted)
2403
2404 def _merge(self, added: Iterable[Any], deleted: Iterable[Any]) -> History:
2405 return History(
2406 list(self.added) + list(added),
2407 self.unchanged,
2408 list(self.deleted) + list(deleted),
2409 )
2410
2411 def as_state(self) -> History:
2412 return History(
2413 [
2414 (c is not None) and instance_state(c) or None
2415 for c in self.added
2416 ],
2417 [
2418 (c is not None) and instance_state(c) or None
2419 for c in self.unchanged
2420 ],
2421 [
2422 (c is not None) and instance_state(c) or None
2423 for c in self.deleted
2424 ],
2425 )
2426
2427 @classmethod
2428 def from_scalar_attribute(
2429 cls,
2430 attribute: _ScalarAttributeImpl,
2431 state: InstanceState[Any],
2432 current: Any,
2433 ) -> History:
2434 original = state.committed_state.get(attribute.key, _NO_HISTORY)
2435
2436 deleted: Union[Tuple[()], List[Any]]
2437
2438 if original is _NO_HISTORY:
2439 if current is NO_VALUE:
2440 return cls((), (), ())
2441 else:
2442 return cls((), [current], ())
2443 # don't let ClauseElement expressions here trip things up
2444 elif (
2445 current is not NO_VALUE
2446 and attribute.is_equal(current, original) is True
2447 ):
2448 return cls((), [current], ())
2449 else:
2450 # current convention on native scalars is to not
2451 # include information
2452 # about missing previous value in "deleted", but
2453 # we do include None, which helps in some primary
2454 # key situations
2455 if id(original) in _NO_STATE_SYMBOLS:
2456 deleted = ()
2457 # indicate a "del" operation occurred when we don't have
2458 # the previous value as: ([None], (), ())
2459 if id(current) in _NO_STATE_SYMBOLS:
2460 current = None
2461 else:
2462 deleted = [original]
2463 if current is NO_VALUE:
2464 return cls((), (), deleted)
2465 else:
2466 return cls([current], (), deleted)
2467
2468 @classmethod
2469 def from_object_attribute(
2470 cls,
2471 attribute: _ScalarObjectAttributeImpl,
2472 state: InstanceState[Any],
2473 current: Any,
2474 original: Any = _NO_HISTORY,
2475 ) -> History:
2476 deleted: Union[Tuple[()], List[Any]]
2477
2478 if original is _NO_HISTORY:
2479 original = state.committed_state.get(attribute.key, _NO_HISTORY)
2480
2481 if original is _NO_HISTORY:
2482 if current is NO_VALUE:
2483 return cls((), (), ())
2484 else:
2485 return cls((), [current], ())
2486 elif current is original and current is not NO_VALUE:
2487 return cls((), [current], ())
2488 else:
2489 # current convention on related objects is to not
2490 # include information
2491 # about missing previous value in "deleted", and
2492 # to also not include None - the dependency.py rules
2493 # ignore the None in any case.
2494 if id(original) in _NO_STATE_SYMBOLS or original is None:
2495 deleted = ()
2496 # indicate a "del" operation occurred when we don't have
2497 # the previous value as: ([None], (), ())
2498 if id(current) in _NO_STATE_SYMBOLS:
2499 current = None
2500 else:
2501 deleted = [original]
2502 if current is NO_VALUE:
2503 return cls((), (), deleted)
2504 else:
2505 return cls([current], (), deleted)
2506
2507 @classmethod
2508 def from_collection(
2509 cls,
2510 attribute: _CollectionAttributeImpl,
2511 state: InstanceState[Any],
2512 current: Any,
2513 ) -> History:
2514 original = state.committed_state.get(attribute.key, _NO_HISTORY)
2515 if current is NO_VALUE:
2516 return cls((), (), ())
2517
2518 current = getattr(current, "_sa_adapter")
2519 if original is NO_VALUE:
2520 return cls(list(current), (), ())
2521 elif original is _NO_HISTORY:
2522 return cls((), list(current), ())
2523 else:
2524 current_states = [
2525 ((c is not None) and instance_state(c) or None, c)
2526 for c in current
2527 ]
2528 original_states = [
2529 ((c is not None) and instance_state(c) or None, c)
2530 for c in original
2531 ]
2532
2533 current_set = dict(current_states)
2534 original_set = dict(original_states)
2535
2536 return cls(
2537 [o for s, o in current_states if s not in original_set],
2538 [o for s, o in current_states if s in original_set],
2539 [o for s, o in original_states if s not in current_set],
2540 )
2541
2542
2543HISTORY_BLANK = History((), (), ())
2544
2545
2546def get_history(
2547 obj: object, key: str, passive: PassiveFlag = PASSIVE_OFF
2548) -> History:
2549 """Return a :class:`.History` record for the given object
2550 and attribute key.
2551
2552 This is the **pre-flush** history for a given attribute, which is
2553 reset each time the :class:`.Session` flushes changes to the
2554 current database transaction.
2555
2556 .. note::
2557
2558 Prefer to use the :attr:`.AttributeState.history` and
2559 :meth:`.AttributeState.load_history` accessors to retrieve the
2560 :class:`.History` for instance attributes.
2561
2562
2563 :param obj: an object whose class is instrumented by the
2564 attributes package.
2565
2566 :param key: string attribute name.
2567
2568 :param passive: indicates loading behavior for the attribute
2569 if the value is not already present. This is a
2570 bitflag attribute, which defaults to the symbol
2571 :attr:`.PASSIVE_OFF` indicating all necessary SQL
2572 should be emitted.
2573
2574 .. seealso::
2575
2576 :attr:`.AttributeState.history`
2577
2578 :meth:`.AttributeState.load_history` - retrieve history
2579 using loader callables if the value is not locally present.
2580
2581 """
2582
2583 return get_state_history(instance_state(obj), key, passive)
2584
2585
2586def get_state_history(
2587 state: InstanceState[Any], key: str, passive: PassiveFlag = PASSIVE_OFF
2588) -> History:
2589 return state.get_history(key, passive)
2590
2591
2592def has_parent(
2593 cls: Type[_O], obj: _O, key: str, optimistic: bool = False
2594) -> bool:
2595 """TODO"""
2596 manager = manager_of_class(cls)
2597 state = instance_state(obj)
2598 return manager.has_parent(state, key, optimistic)
2599
2600
2601def _register_attribute(
2602 class_: Type[_O],
2603 key: str,
2604 *,
2605 comparator: interfaces.PropComparator[_T],
2606 parententity: _InternalEntityType[_O],
2607 doc: Optional[str] = None,
2608 **kw: Any,
2609) -> InstrumentedAttribute[_T]:
2610 desc = _register_descriptor(
2611 class_, key, comparator=comparator, parententity=parententity, doc=doc
2612 )
2613 _register_attribute_impl(class_, key, **kw)
2614 return desc
2615
2616
2617def _register_attribute_impl(
2618 class_: Type[_O],
2619 key: str,
2620 uselist: bool = False,
2621 callable_: Optional[_LoaderCallable] = None,
2622 useobject: bool = False,
2623 impl_class: Optional[Type[_AttributeImpl]] = None,
2624 backref: Optional[str] = None,
2625 **kw: Any,
2626) -> QueryableAttribute[Any]:
2627 manager = manager_of_class(class_)
2628 if uselist:
2629 factory = kw.pop("typecallable", None)
2630 typecallable = manager.instrument_collection_class(
2631 key, factory or list
2632 )
2633 else:
2634 typecallable = kw.pop("typecallable", None)
2635
2636 dispatch = cast(
2637 "_Dispatch[QueryableAttribute[Any]]", manager[key].dispatch
2638 ) # noqa: E501
2639
2640 impl: _AttributeImpl
2641
2642 if impl_class:
2643 # TODO: this appears to be the WriteOnlyAttributeImpl /
2644 # DynamicAttributeImpl constructor which is hardcoded
2645 impl = cast("Type[_WriteOnlyAttributeImpl]", impl_class)(
2646 class_, key, dispatch, **kw
2647 )
2648 elif uselist:
2649 impl = _CollectionAttributeImpl(
2650 class_, key, callable_, dispatch, typecallable=typecallable, **kw
2651 )
2652 elif useobject:
2653 impl = _ScalarObjectAttributeImpl(
2654 class_, key, callable_, dispatch, **kw
2655 )
2656 else:
2657 impl = _ScalarAttributeImpl(class_, key, callable_, dispatch, **kw)
2658
2659 manager[key].impl = impl
2660
2661 if backref:
2662 _backref_listeners(manager[key], backref, uselist)
2663
2664 manager.post_configure_attribute(key)
2665 return manager[key]
2666
2667
2668def _register_descriptor(
2669 class_: Type[Any],
2670 key: str,
2671 *,
2672 comparator: interfaces.PropComparator[_T],
2673 parententity: _InternalEntityType[Any],
2674 doc: Optional[str] = None,
2675) -> InstrumentedAttribute[_T]:
2676 manager = manager_of_class(class_)
2677
2678 descriptor = InstrumentedAttribute(
2679 class_, key, comparator=comparator, parententity=parententity
2680 )
2681
2682 descriptor.__doc__ = doc # type: ignore
2683
2684 manager.instrument_attribute(key, descriptor)
2685 return descriptor
2686
2687
2688def _unregister_attribute(class_: Type[Any], key: str) -> None:
2689 manager_of_class(class_).uninstrument_attribute(key)
2690
2691
2692def init_collection(obj: object, key: str) -> CollectionAdapter:
2693 """Initialize a collection attribute and return the collection adapter.
2694
2695 This function is used to provide direct access to collection internals
2696 for a previously unloaded attribute. e.g.::
2697
2698 collection_adapter = init_collection(someobject, "elements")
2699 for elem in values:
2700 collection_adapter.append_without_event(elem)
2701
2702 For an easier way to do the above, see
2703 :func:`~sqlalchemy.orm.attributes.set_committed_value`.
2704
2705 :param obj: a mapped object
2706
2707 :param key: string attribute name where the collection is located.
2708
2709 """
2710 state = instance_state(obj)
2711 dict_ = state.dict
2712 return init_state_collection(state, dict_, key)
2713
2714
2715def init_state_collection(
2716 state: InstanceState[Any], dict_: _InstanceDict, key: str
2717) -> CollectionAdapter:
2718 """Initialize a collection attribute and return the collection adapter.
2719
2720 Discards any existing collection which may be there.
2721
2722 """
2723 attr = state.manager[key].impl
2724
2725 if TYPE_CHECKING:
2726 assert isinstance(attr, _HasCollectionAdapter)
2727
2728 old = dict_.pop(key, None) # discard old collection
2729 if old is not None:
2730 old_collection = old._sa_adapter
2731 attr._dispose_previous_collection(state, old, old_collection, False)
2732
2733 user_data = attr._default_value(state, dict_)
2734 adapter: CollectionAdapter = attr.get_collection(
2735 state, dict_, user_data, passive=PassiveFlag.PASSIVE_NO_FETCH
2736 )
2737 adapter._reset_empty()
2738
2739 return adapter
2740
2741
2742def set_committed_value(instance: object, key: str, value: Any) -> None:
2743 """Set the value of an attribute with no history events.
2744
2745 Cancels any previous history present. The value should be
2746 a scalar value for scalar-holding attributes, or
2747 an iterable for any collection-holding attribute.
2748
2749 This is the same underlying method used when a lazy loader
2750 fires off and loads additional data from the database.
2751 In particular, this method can be used by application code
2752 which has loaded additional attributes or collections through
2753 separate queries, which can then be attached to an instance
2754 as though it were part of its original loaded state.
2755
2756 """
2757 state, dict_ = instance_state(instance), instance_dict(instance)
2758 state.manager[key].impl.set_committed_value(state, dict_, value)
2759
2760
2761def set_attribute(
2762 instance: object,
2763 key: str,
2764 value: Any,
2765 initiator: Optional[AttributeEventToken] = None,
2766) -> None:
2767 """Set the value of an attribute, firing history events.
2768
2769 This function may be used regardless of instrumentation
2770 applied directly to the class, i.e. no descriptors are required.
2771 Custom attribute management schemes will need to make usage
2772 of this method to establish attribute state as understood
2773 by SQLAlchemy.
2774
2775 :param instance: the object that will be modified
2776
2777 :param key: string name of the attribute
2778
2779 :param value: value to assign
2780
2781 :param initiator: an instance of :class:`.Event` that would have
2782 been propagated from a previous event listener. This argument
2783 is used when the :func:`.set_attribute` function is being used within
2784 an existing event listening function where an :class:`.Event` object
2785 is being supplied; the object may be used to track the origin of the
2786 chain of events.
2787
2788 """
2789 state, dict_ = instance_state(instance), instance_dict(instance)
2790 state.manager[key].impl.set(state, dict_, value, initiator)
2791
2792
2793def get_attribute(instance: object, key: str) -> Any:
2794 """Get the value of an attribute, firing any callables required.
2795
2796 This function may be used regardless of instrumentation
2797 applied directly to the class, i.e. no descriptors are required.
2798 Custom attribute management schemes will need to make usage
2799 of this method to make usage of attribute state as understood
2800 by SQLAlchemy.
2801
2802 """
2803 state, dict_ = instance_state(instance), instance_dict(instance)
2804 return state.manager[key].impl.get(state, dict_)
2805
2806
2807def del_attribute(instance: object, key: str) -> None:
2808 """Delete the value of an attribute, firing history events.
2809
2810 This function may be used regardless of instrumentation
2811 applied directly to the class, i.e. no descriptors are required.
2812 Custom attribute management schemes will need to make usage
2813 of this method to establish attribute state as understood
2814 by SQLAlchemy.
2815
2816 """
2817 state, dict_ = instance_state(instance), instance_dict(instance)
2818 state.manager[key].impl.delete(state, dict_)
2819
2820
2821def flag_modified(instance: object, key: str) -> None:
2822 """Mark an attribute on an instance as 'modified'.
2823
2824 This sets the 'modified' flag on the instance and
2825 establishes an unconditional change event for the given attribute.
2826 The attribute must have a value present, else an
2827 :class:`.InvalidRequestError` is raised.
2828
2829 To mark an object "dirty" without referring to any specific attribute
2830 so that it is considered within a flush, use the
2831 :func:`.attributes.flag_dirty` call.
2832
2833 .. seealso::
2834
2835 :func:`.attributes.flag_dirty`
2836
2837 """
2838 state, dict_ = instance_state(instance), instance_dict(instance)
2839 impl = state.manager[key].impl
2840 impl.dispatch.modified(state, impl._modified_token)
2841 state._modified_event(dict_, impl, NO_VALUE, is_userland=True)
2842
2843
2844def flag_dirty(instance: object) -> None:
2845 """Mark an instance as 'dirty' without any specific attribute mentioned.
2846
2847 This is a special operation that will allow the object to travel through
2848 the flush process for interception by events such as
2849 :meth:`.SessionEvents.before_flush`. Note that no SQL will be emitted in
2850 the flush process for an object that has no changes, even if marked dirty
2851 via this method. However, a :meth:`.SessionEvents.before_flush` handler
2852 will be able to see the object in the :attr:`.Session.dirty` collection and
2853 may establish changes on it, which will then be included in the SQL
2854 emitted.
2855
2856 .. seealso::
2857
2858 :func:`.attributes.flag_modified`
2859
2860 """
2861
2862 state, dict_ = instance_state(instance), instance_dict(instance)
2863 state._modified_event(dict_, None, NO_VALUE, is_userland=True)