1# orm/attributes.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Defines instrumentation for class attributes and their interaction
10with instances.
11
12This module is usually not directly visible to user applications, but
13defines a large part of the ORM's interactivity.
14
15
16"""
17
18from __future__ import annotations
19
20import dataclasses
21import operator
22from typing import Any
23from typing import Callable
24from typing import cast
25from typing import ClassVar
26from typing import Dict
27from typing import Iterable
28from typing import List
29from typing import NamedTuple
30from typing import Optional
31from typing import overload
32from typing import Sequence
33from typing import Tuple
34from typing import Type
35from typing import TYPE_CHECKING
36from typing import TypeVar
37from typing import Union
38
39from . import collections
40from . import exc as orm_exc
41from . import interfaces
42from ._typing import insp_is_aliased_class
43from .base import _DeclarativeMapped
44from .base import ATTR_EMPTY
45from .base import ATTR_WAS_SET
46from .base import CALLABLES_OK
47from .base import DEFERRED_HISTORY_LOAD
48from .base import INCLUDE_PENDING_MUTATIONS # noqa
49from .base import INIT_OK
50from .base import instance_dict as instance_dict
51from .base import instance_state as instance_state
52from .base import instance_str
53from .base import LOAD_AGAINST_COMMITTED
54from .base import LoaderCallableStatus
55from .base import manager_of_class as manager_of_class
56from .base import Mapped as Mapped # noqa
57from .base import NEVER_SET # noqa
58from .base import NO_AUTOFLUSH
59from .base import NO_CHANGE # noqa
60from .base import NO_KEY
61from .base import NO_RAISE
62from .base import NO_VALUE
63from .base import NON_PERSISTENT_OK # noqa
64from .base import opt_manager_of_class as opt_manager_of_class
65from .base import PASSIVE_CLASS_MISMATCH # noqa
66from .base import PASSIVE_NO_FETCH
67from .base import PASSIVE_NO_FETCH_RELATED # noqa
68from .base import PASSIVE_NO_INITIALIZE
69from .base import PASSIVE_NO_RESULT
70from .base import PASSIVE_OFF
71from .base import PASSIVE_ONLY_PERSISTENT
72from .base import PASSIVE_RETURN_NO_VALUE
73from .base import PassiveFlag
74from .base import RELATED_OBJECT_OK # noqa
75from .base import SQL_OK # noqa
76from .base import SQLORMExpression
77from .base import state_str
78from .. import event
79from .. import exc
80from .. import inspection
81from .. import util
82from ..event import dispatcher
83from ..event import EventTarget
84from ..sql import base as sql_base
85from ..sql import cache_key
86from ..sql import coercions
87from ..sql import roles
88from ..sql import visitors
89from ..sql.cache_key import HasCacheKey
90from ..sql.visitors import _TraverseInternalsType
91from ..sql.visitors import InternalTraversal
92from ..util.typing import Literal
93from ..util.typing import Self
94from ..util.typing import TypeGuard
95
96if TYPE_CHECKING:
97 from ._typing import _EntityType
98 from ._typing import _ExternalEntityType
99 from ._typing import _InstanceDict
100 from ._typing import _InternalEntityType
101 from ._typing import _LoaderCallable
102 from ._typing import _O
103 from .collections import _AdaptedCollectionProtocol
104 from .collections import CollectionAdapter
105 from .interfaces import MapperProperty
106 from .relationships import RelationshipProperty
107 from .state import InstanceState
108 from .util import AliasedInsp
109 from .writeonly import WriteOnlyAttributeImpl
110 from ..event.base import _Dispatch
111 from ..sql._typing import _ColumnExpressionArgument
112 from ..sql._typing import _DMLColumnArgument
113 from ..sql._typing import _InfoType
114 from ..sql._typing import _PropagateAttrsType
115 from ..sql.annotation import _AnnotationDict
116 from ..sql.elements import ColumnElement
117 from ..sql.elements import Label
118 from ..sql.operators import OperatorType
119 from ..sql.selectable import FromClause
120
121
122_T = TypeVar("_T")
123_T_co = TypeVar("_T_co", bound=Any, covariant=True)
124
125
126_AllPendingType = Sequence[
127 Tuple[Optional["InstanceState[Any]"], Optional[object]]
128]
129
130
131_UNKNOWN_ATTR_KEY = object()
132
133
134@inspection._self_inspects
135class QueryableAttribute(
136 _DeclarativeMapped[_T_co],
137 SQLORMExpression[_T_co],
138 interfaces.InspectionAttr,
139 interfaces.PropComparator[_T_co],
140 roles.JoinTargetRole,
141 roles.OnClauseRole,
142 sql_base.Immutable,
143 cache_key.SlotsMemoizedHasCacheKey,
144 util.MemoizedSlots,
145 EventTarget,
146):
147 """Base class for :term:`descriptor` objects that intercept
148 attribute events on behalf of a :class:`.MapperProperty`
149 object. The actual :class:`.MapperProperty` is accessible
150 via the :attr:`.QueryableAttribute.property`
151 attribute.
152
153
154 .. seealso::
155
156 :class:`.InstrumentedAttribute`
157
158 :class:`.MapperProperty`
159
160 :attr:`_orm.Mapper.all_orm_descriptors`
161
162 :attr:`_orm.Mapper.attrs`
163 """
164
165 __slots__ = (
166 "class_",
167 "key",
168 "impl",
169 "comparator",
170 "property",
171 "parent",
172 "expression",
173 "_of_type",
174 "_extra_criteria",
175 "_slots_dispatch",
176 "_propagate_attrs",
177 "_doc",
178 )
179
180 is_attribute = True
181
182 dispatch: dispatcher[QueryableAttribute[_T_co]]
183
184 class_: _ExternalEntityType[Any]
185 key: str
186 parententity: _InternalEntityType[Any]
187 impl: AttributeImpl
188 comparator: interfaces.PropComparator[_T_co]
189 _of_type: Optional[_InternalEntityType[Any]]
190 _extra_criteria: Tuple[ColumnElement[bool], ...]
191 _doc: Optional[str]
192
193 # PropComparator has a __visit_name__ to participate within
194 # traversals. Disambiguate the attribute vs. a comparator.
195 __visit_name__ = "orm_instrumented_attribute"
196
197 def __init__(
198 self,
199 class_: _ExternalEntityType[_O],
200 key: str,
201 parententity: _InternalEntityType[_O],
202 comparator: interfaces.PropComparator[_T_co],
203 impl: Optional[AttributeImpl] = None,
204 of_type: Optional[_InternalEntityType[Any]] = None,
205 extra_criteria: Tuple[ColumnElement[bool], ...] = (),
206 ):
207 self.class_ = class_
208 self.key = key
209
210 self._parententity = self.parent = parententity
211
212 # this attribute is non-None after mappers are set up, however in the
213 # interim class manager setup, there's a check for None to see if it
214 # needs to be populated, so we assign None here leaving the attribute
215 # in a temporarily not-type-correct state
216 self.impl = impl # type: ignore
217
218 assert comparator is not None
219 self.comparator = comparator
220 self._of_type = of_type
221 self._extra_criteria = extra_criteria
222 self._doc = None
223
224 manager = opt_manager_of_class(class_)
225 # manager is None in the case of AliasedClass
226 if manager:
227 # propagate existing event listeners from
228 # immediate superclass
229 for base in manager._bases:
230 if key in base:
231 self.dispatch._update(base[key].dispatch)
232 if base[key].dispatch._active_history:
233 self.dispatch._active_history = True # type: ignore
234
235 _cache_key_traversal = [
236 ("key", visitors.ExtendedInternalTraversal.dp_string),
237 ("_parententity", visitors.ExtendedInternalTraversal.dp_multi),
238 ("_of_type", visitors.ExtendedInternalTraversal.dp_multi),
239 ("_extra_criteria", visitors.InternalTraversal.dp_clauseelement_list),
240 ]
241
242 def __reduce__(self) -> Any:
243 # this method is only used in terms of the
244 # sqlalchemy.ext.serializer extension
245 return (
246 _queryable_attribute_unreduce,
247 (
248 self.key,
249 self._parententity.mapper.class_,
250 self._parententity,
251 self._parententity.entity,
252 ),
253 )
254
255 @property
256 def _impl_uses_objects(self) -> bool:
257 return self.impl.uses_objects
258
259 def get_history(
260 self, instance: Any, passive: PassiveFlag = PASSIVE_OFF
261 ) -> History:
262 return self.impl.get_history(
263 instance_state(instance), instance_dict(instance), passive
264 )
265
266 @property
267 def info(self) -> _InfoType:
268 """Return the 'info' dictionary for the underlying SQL element.
269
270 The behavior here is as follows:
271
272 * If the attribute is a column-mapped property, i.e.
273 :class:`.ColumnProperty`, which is mapped directly
274 to a schema-level :class:`_schema.Column` object, this attribute
275 will return the :attr:`.SchemaItem.info` dictionary associated
276 with the core-level :class:`_schema.Column` object.
277
278 * If the attribute is a :class:`.ColumnProperty` but is mapped to
279 any other kind of SQL expression other than a
280 :class:`_schema.Column`,
281 the attribute will refer to the :attr:`.MapperProperty.info`
282 dictionary associated directly with the :class:`.ColumnProperty`,
283 assuming the SQL expression itself does not have its own ``.info``
284 attribute (which should be the case, unless a user-defined SQL
285 construct has defined one).
286
287 * If the attribute refers to any other kind of
288 :class:`.MapperProperty`, including :class:`.Relationship`,
289 the attribute will refer to the :attr:`.MapperProperty.info`
290 dictionary associated with that :class:`.MapperProperty`.
291
292 * To access the :attr:`.MapperProperty.info` dictionary of the
293 :class:`.MapperProperty` unconditionally, including for a
294 :class:`.ColumnProperty` that's associated directly with a
295 :class:`_schema.Column`, the attribute can be referred to using
296 :attr:`.QueryableAttribute.property` attribute, as
297 ``MyClass.someattribute.property.info``.
298
299 .. seealso::
300
301 :attr:`.SchemaItem.info`
302
303 :attr:`.MapperProperty.info`
304
305 """
306 return self.comparator.info
307
308 parent: _InternalEntityType[Any]
309 """Return an inspection instance representing the parent.
310
311 This will be either an instance of :class:`_orm.Mapper`
312 or :class:`.AliasedInsp`, depending upon the nature
313 of the parent entity which this attribute is associated
314 with.
315
316 """
317
318 expression: ColumnElement[_T_co]
319 """The SQL expression object represented by this
320 :class:`.QueryableAttribute`.
321
322 This will typically be an instance of a :class:`_sql.ColumnElement`
323 subclass representing a column expression.
324
325 """
326
327 def _memoized_attr_expression(self) -> ColumnElement[_T]:
328 annotations: _AnnotationDict
329
330 # applies only to Proxy() as used by hybrid.
331 # currently is an exception to typing rather than feeding through
332 # non-string keys.
333 # ideally Proxy() would have a separate set of methods to deal
334 # with this case.
335 entity_namespace = self._entity_namespace
336 assert isinstance(entity_namespace, HasCacheKey)
337
338 if self.key is _UNKNOWN_ATTR_KEY:
339 annotations = {"entity_namespace": entity_namespace}
340 else:
341 annotations = {
342 "proxy_key": self.key,
343 "proxy_owner": self._parententity,
344 "entity_namespace": entity_namespace,
345 }
346
347 ce = self.comparator.__clause_element__()
348 try:
349 if TYPE_CHECKING:
350 assert isinstance(ce, ColumnElement)
351 anno = ce._annotate
352 except AttributeError as ae:
353 raise exc.InvalidRequestError(
354 'When interpreting attribute "%s" as a SQL expression, '
355 "expected __clause_element__() to return "
356 "a ClauseElement object, got: %r" % (self, ce)
357 ) from ae
358 else:
359 return anno(annotations)
360
361 def _memoized_attr__propagate_attrs(self) -> _PropagateAttrsType:
362 # this suits the case in coercions where we don't actually
363 # call ``__clause_element__()`` but still need to get
364 # resolved._propagate_attrs. See #6558.
365 return util.immutabledict(
366 {
367 "compile_state_plugin": "orm",
368 "plugin_subject": self._parentmapper,
369 }
370 )
371
372 @property
373 def _entity_namespace(self) -> _InternalEntityType[Any]:
374 return self._parententity
375
376 @property
377 def _annotations(self) -> _AnnotationDict:
378 return self.__clause_element__()._annotations
379
380 def __clause_element__(self) -> ColumnElement[_T_co]:
381 return self.expression
382
383 @property
384 def _from_objects(self) -> List[FromClause]:
385 return self.expression._from_objects
386
387 def _bulk_update_tuples(
388 self, value: Any
389 ) -> Sequence[Tuple[_DMLColumnArgument, Any]]:
390 """Return setter tuples for a bulk UPDATE."""
391
392 return self.comparator._bulk_update_tuples(value)
393
394 def adapt_to_entity(self, adapt_to_entity: AliasedInsp[Any]) -> Self:
395 assert not self._of_type
396 return self.__class__(
397 adapt_to_entity.entity,
398 self.key,
399 impl=self.impl,
400 comparator=self.comparator.adapt_to_entity(adapt_to_entity),
401 parententity=adapt_to_entity,
402 )
403
404 def of_type(self, entity: _EntityType[_T]) -> QueryableAttribute[_T]:
405 return QueryableAttribute(
406 self.class_,
407 self.key,
408 self._parententity,
409 impl=self.impl,
410 comparator=self.comparator.of_type(entity),
411 of_type=inspection.inspect(entity),
412 extra_criteria=self._extra_criteria,
413 )
414
415 def and_(
416 self, *clauses: _ColumnExpressionArgument[bool]
417 ) -> QueryableAttribute[bool]:
418 if TYPE_CHECKING:
419 assert isinstance(self.comparator, RelationshipProperty.Comparator)
420
421 exprs = tuple(
422 coercions.expect(roles.WhereHavingRole, clause)
423 for clause in util.coerce_generator_arg(clauses)
424 )
425
426 return QueryableAttribute(
427 self.class_,
428 self.key,
429 self._parententity,
430 impl=self.impl,
431 comparator=self.comparator.and_(*exprs),
432 of_type=self._of_type,
433 extra_criteria=self._extra_criteria + exprs,
434 )
435
436 def _clone(self, **kw: Any) -> QueryableAttribute[_T]:
437 return QueryableAttribute(
438 self.class_,
439 self.key,
440 self._parententity,
441 impl=self.impl,
442 comparator=self.comparator,
443 of_type=self._of_type,
444 extra_criteria=self._extra_criteria,
445 )
446
447 def label(self, name: Optional[str]) -> Label[_T_co]:
448 return self.__clause_element__().label(name)
449
450 def operate(
451 self, op: OperatorType, *other: Any, **kwargs: Any
452 ) -> ColumnElement[Any]:
453 return op(self.comparator, *other, **kwargs) # type: ignore[no-any-return] # noqa: E501
454
455 def reverse_operate(
456 self, op: OperatorType, other: Any, **kwargs: Any
457 ) -> ColumnElement[Any]:
458 return op(other, self.comparator, **kwargs) # type: ignore[no-any-return] # noqa: E501
459
460 def hasparent(
461 self, state: InstanceState[Any], optimistic: bool = False
462 ) -> bool:
463 return self.impl.hasparent(state, optimistic=optimistic) is not False
464
465 def _column_strategy_attrs(self) -> Sequence[QueryableAttribute[Any]]:
466 return (self,)
467
468 def __getattr__(self, key: str) -> Any:
469 try:
470 return util.MemoizedSlots.__getattr__(self, key)
471 except AttributeError:
472 pass
473
474 try:
475 return getattr(self.comparator, key)
476 except AttributeError as err:
477 raise AttributeError(
478 "Neither %r object nor %r object associated with %s "
479 "has an attribute %r"
480 % (
481 type(self).__name__,
482 type(self.comparator).__name__,
483 self,
484 key,
485 )
486 ) from err
487
488 def __str__(self) -> str:
489 return f"{self.class_.__name__}.{self.key}"
490
491 def _memoized_attr_property(self) -> Optional[MapperProperty[Any]]:
492 return self.comparator.property
493
494
495def _queryable_attribute_unreduce(
496 key: str,
497 mapped_class: Type[_O],
498 parententity: _InternalEntityType[_O],
499 entity: _ExternalEntityType[Any],
500) -> Any:
501 # this method is only used in terms of the
502 # sqlalchemy.ext.serializer extension
503 if insp_is_aliased_class(parententity):
504 return entity._get_from_serialized(key, mapped_class, parententity)
505 else:
506 return getattr(entity, key)
507
508
509class InstrumentedAttribute(QueryableAttribute[_T_co]):
510 """Class bound instrumented attribute which adds basic
511 :term:`descriptor` methods.
512
513 See :class:`.QueryableAttribute` for a description of most features.
514
515
516 """
517
518 __slots__ = ()
519
520 inherit_cache = True
521 """:meta private:"""
522
523 # hack to make __doc__ writeable on instances of
524 # InstrumentedAttribute, while still keeping classlevel
525 # __doc__ correct
526
527 @util.rw_hybridproperty
528 def __doc__(self) -> Optional[str]:
529 return self._doc
530
531 @__doc__.setter # type: ignore
532 def __doc__(self, value: Optional[str]) -> None:
533 self._doc = value
534
535 @__doc__.classlevel # type: ignore
536 def __doc__(cls) -> Optional[str]:
537 return super().__doc__
538
539 def __set__(self, instance: object, value: Any) -> None:
540 self.impl.set(
541 instance_state(instance), instance_dict(instance), value, None
542 )
543
544 def __delete__(self, instance: object) -> None:
545 self.impl.delete(instance_state(instance), instance_dict(instance))
546
547 @overload
548 def __get__(
549 self, instance: None, owner: Any
550 ) -> InstrumentedAttribute[_T_co]: ...
551
552 @overload
553 def __get__(self, instance: object, owner: Any) -> _T_co: ...
554
555 def __get__(
556 self, instance: Optional[object], owner: Any
557 ) -> Union[InstrumentedAttribute[_T_co], _T_co]:
558 if instance is None:
559 return self
560
561 dict_ = instance_dict(instance)
562 if self.impl.supports_population and self.key in dict_:
563 return dict_[self.key] # type: ignore[no-any-return]
564 else:
565 try:
566 state = instance_state(instance)
567 except AttributeError as err:
568 raise orm_exc.UnmappedInstanceError(instance) from err
569 return self.impl.get(state, dict_) # type: ignore[no-any-return]
570
571
572@dataclasses.dataclass(frozen=True)
573class AdHocHasEntityNamespace(HasCacheKey):
574 _traverse_internals: ClassVar[_TraverseInternalsType] = [
575 ("_entity_namespace", InternalTraversal.dp_has_cache_key),
576 ]
577
578 # py37 compat, no slots=True on dataclass
579 __slots__ = ("_entity_namespace",)
580 _entity_namespace: _InternalEntityType[Any]
581 is_mapper: ClassVar[bool] = False
582 is_aliased_class: ClassVar[bool] = False
583
584 @property
585 def entity_namespace(self):
586 return self._entity_namespace.entity_namespace
587
588
589def create_proxied_attribute(
590 descriptor: Any,
591) -> Callable[..., QueryableAttribute[Any]]:
592 """Create an QueryableAttribute / user descriptor hybrid.
593
594 Returns a new QueryableAttribute type that delegates descriptor
595 behavior and getattr() to the given descriptor.
596 """
597
598 # TODO: can move this to descriptor_props if the need for this
599 # function is removed from ext/hybrid.py
600
601 class Proxy(QueryableAttribute[_T_co]):
602 """Presents the :class:`.QueryableAttribute` interface as a
603 proxy on top of a Python descriptor / :class:`.PropComparator`
604 combination.
605
606 """
607
608 _extra_criteria = ()
609
610 # the attribute error catches inside of __getattr__ basically create a
611 # singularity if you try putting slots on this too
612 # __slots__ = ("descriptor", "original_property", "_comparator")
613
614 def __init__(
615 self,
616 class_: _ExternalEntityType[Any],
617 key: str,
618 descriptor: Any,
619 comparator: interfaces.PropComparator[_T_co],
620 adapt_to_entity: Optional[AliasedInsp[Any]] = None,
621 doc: Optional[str] = None,
622 original_property: Optional[QueryableAttribute[_T_co]] = None,
623 ):
624 self.class_ = class_
625 self.key = key
626 self.descriptor = descriptor
627 self.original_property = original_property
628 self._comparator = comparator
629 self._adapt_to_entity = adapt_to_entity
630 self._doc = self.__doc__ = doc
631
632 @property
633 def _parententity(self): # type: ignore[override]
634 return inspection.inspect(self.class_, raiseerr=False)
635
636 @property
637 def parent(self): # type: ignore[override]
638 return inspection.inspect(self.class_, raiseerr=False)
639
640 _is_internal_proxy = True
641
642 _cache_key_traversal = [
643 ("key", visitors.ExtendedInternalTraversal.dp_string),
644 ("_parententity", visitors.ExtendedInternalTraversal.dp_multi),
645 ]
646
647 def _column_strategy_attrs(self) -> Sequence[QueryableAttribute[Any]]:
648 prop = self.original_property
649 if prop is None:
650 return ()
651 else:
652 return prop._column_strategy_attrs()
653
654 @property
655 def _impl_uses_objects(self):
656 return (
657 self.original_property is not None
658 and getattr(self.class_, self.key).impl.uses_objects
659 )
660
661 @property
662 def _entity_namespace(self):
663 if hasattr(self._comparator, "_parententity"):
664 return self._comparator._parententity
665 else:
666 # used by hybrid attributes which try to remain
667 # agnostic of any ORM concepts like mappers
668 return AdHocHasEntityNamespace(self._parententity)
669
670 @property
671 def property(self):
672 return self.comparator.property
673
674 @util.memoized_property
675 def comparator(self):
676 if callable(self._comparator):
677 self._comparator = self._comparator()
678 if self._adapt_to_entity:
679 self._comparator = self._comparator.adapt_to_entity(
680 self._adapt_to_entity
681 )
682 return self._comparator
683
684 def adapt_to_entity(self, adapt_to_entity):
685 return self.__class__(
686 adapt_to_entity.entity,
687 self.key,
688 self.descriptor,
689 self._comparator,
690 adapt_to_entity,
691 )
692
693 def _clone(self, **kw):
694 return self.__class__(
695 self.class_,
696 self.key,
697 self.descriptor,
698 self._comparator,
699 adapt_to_entity=self._adapt_to_entity,
700 original_property=self.original_property,
701 )
702
703 def __get__(self, instance, owner):
704 retval = self.descriptor.__get__(instance, owner)
705 # detect if this is a plain Python @property, which just returns
706 # itself for class level access. If so, then return us.
707 # Otherwise, return the object returned by the descriptor.
708 if retval is self.descriptor and instance is None:
709 return self
710 else:
711 return retval
712
713 def __str__(self) -> str:
714 return f"{self.class_.__name__}.{self.key}"
715
716 def __getattr__(self, attribute):
717 """Delegate __getattr__ to the original descriptor and/or
718 comparator."""
719
720 # this is unfortunately very complicated, and is easily prone
721 # to recursion overflows when implementations of related
722 # __getattr__ schemes are changed
723
724 try:
725 return util.MemoizedSlots.__getattr__(self, attribute)
726 except AttributeError:
727 pass
728
729 try:
730 return getattr(descriptor, attribute)
731 except AttributeError as err:
732 if attribute == "comparator":
733 raise AttributeError("comparator") from err
734 try:
735 # comparator itself might be unreachable
736 comparator = self.comparator
737 except AttributeError as err2:
738 raise AttributeError(
739 "Neither %r object nor unconfigured comparator "
740 "object associated with %s has an attribute %r"
741 % (type(descriptor).__name__, self, attribute)
742 ) from err2
743 else:
744 try:
745 return getattr(comparator, attribute)
746 except AttributeError as err3:
747 raise AttributeError(
748 "Neither %r object nor %r object "
749 "associated with %s has an attribute %r"
750 % (
751 type(descriptor).__name__,
752 type(comparator).__name__,
753 self,
754 attribute,
755 )
756 ) from err3
757
758 Proxy.__name__ = type(descriptor).__name__ + "Proxy"
759
760 util.monkeypatch_proxied_specials(
761 Proxy, type(descriptor), name="descriptor", from_instance=descriptor
762 )
763 return Proxy
764
765
766OP_REMOVE = util.symbol("REMOVE")
767OP_APPEND = util.symbol("APPEND")
768OP_REPLACE = util.symbol("REPLACE")
769OP_BULK_REPLACE = util.symbol("BULK_REPLACE")
770OP_MODIFIED = util.symbol("MODIFIED")
771
772
773class AttributeEventToken:
774 """A token propagated throughout the course of a chain of attribute
775 events.
776
777 Serves as an indicator of the source of the event and also provides
778 a means of controlling propagation across a chain of attribute
779 operations.
780
781 The :class:`.Event` object is sent as the ``initiator`` argument
782 when dealing with events such as :meth:`.AttributeEvents.append`,
783 :meth:`.AttributeEvents.set`,
784 and :meth:`.AttributeEvents.remove`.
785
786 The :class:`.Event` object is currently interpreted by the backref
787 event handlers, and is used to control the propagation of operations
788 across two mutually-dependent attributes.
789
790 .. versionchanged:: 2.0 Changed the name from ``AttributeEvent``
791 to ``AttributeEventToken``.
792
793 :attribute impl: The :class:`.AttributeImpl` which is the current event
794 initiator.
795
796 :attribute op: The symbol :attr:`.OP_APPEND`, :attr:`.OP_REMOVE`,
797 :attr:`.OP_REPLACE`, or :attr:`.OP_BULK_REPLACE`, indicating the
798 source operation.
799
800 """
801
802 __slots__ = "impl", "op", "parent_token"
803
804 def __init__(self, attribute_impl: AttributeImpl, op: util.symbol):
805 self.impl = attribute_impl
806 self.op = op
807 self.parent_token = self.impl.parent_token
808
809 def __eq__(self, other):
810 return (
811 isinstance(other, AttributeEventToken)
812 and other.impl is self.impl
813 and other.op == self.op
814 )
815
816 @property
817 def key(self):
818 return self.impl.key
819
820 def hasparent(self, state):
821 return self.impl.hasparent(state)
822
823
824AttributeEvent = AttributeEventToken # legacy
825Event = AttributeEventToken # legacy
826
827
828class AttributeImpl:
829 """internal implementation for instrumented attributes."""
830
831 collection: bool
832 default_accepts_scalar_loader: bool
833 uses_objects: bool
834 supports_population: bool
835 dynamic: bool
836
837 _is_has_collection_adapter = False
838
839 _replace_token: AttributeEventToken
840 _remove_token: AttributeEventToken
841 _append_token: AttributeEventToken
842
843 def __init__(
844 self,
845 class_: _ExternalEntityType[_O],
846 key: str,
847 callable_: Optional[_LoaderCallable],
848 dispatch: _Dispatch[QueryableAttribute[Any]],
849 trackparent: bool = False,
850 compare_function: Optional[Callable[..., bool]] = None,
851 active_history: bool = False,
852 parent_token: Optional[AttributeEventToken] = None,
853 load_on_unexpire: bool = True,
854 send_modified_events: bool = True,
855 accepts_scalar_loader: Optional[bool] = None,
856 **kwargs: Any,
857 ):
858 r"""Construct an AttributeImpl.
859
860 :param \class_: associated class
861
862 :param key: string name of the attribute
863
864 :param \callable_:
865 optional function which generates a callable based on a parent
866 instance, which produces the "default" values for a scalar or
867 collection attribute when it's first accessed, if not present
868 already.
869
870 :param trackparent:
871 if True, attempt to track if an instance has a parent attached
872 to it via this attribute.
873
874 :param compare_function:
875 a function that compares two values which are normally
876 assignable to this attribute.
877
878 :param active_history:
879 indicates that get_history() should always return the "old" value,
880 even if it means executing a lazy callable upon attribute change.
881
882 :param parent_token:
883 Usually references the MapperProperty, used as a key for
884 the hasparent() function to identify an "owning" attribute.
885 Allows multiple AttributeImpls to all match a single
886 owner attribute.
887
888 :param load_on_unexpire:
889 if False, don't include this attribute in a load-on-expired
890 operation, i.e. the "expired_attribute_loader" process.
891 The attribute can still be in the "expired" list and be
892 considered to be "expired". Previously, this flag was called
893 "expire_missing" and is only used by a deferred column
894 attribute.
895
896 :param send_modified_events:
897 if False, the InstanceState._modified_event method will have no
898 effect; this means the attribute will never show up as changed in a
899 history entry.
900
901 """
902 self.class_ = class_
903 self.key = key
904 self.callable_ = callable_
905 self.dispatch = dispatch
906 self.trackparent = trackparent
907 self.parent_token = parent_token or self
908 self.send_modified_events = send_modified_events
909 if compare_function is None:
910 self.is_equal = operator.eq
911 else:
912 self.is_equal = compare_function
913
914 if accepts_scalar_loader is not None:
915 self.accepts_scalar_loader = accepts_scalar_loader
916 else:
917 self.accepts_scalar_loader = self.default_accepts_scalar_loader
918
919 _deferred_history = kwargs.pop("_deferred_history", False)
920 self._deferred_history = _deferred_history
921
922 if active_history:
923 self.dispatch._active_history = True
924
925 self.load_on_unexpire = load_on_unexpire
926 self._modified_token = AttributeEventToken(self, OP_MODIFIED)
927
928 __slots__ = (
929 "class_",
930 "key",
931 "callable_",
932 "dispatch",
933 "trackparent",
934 "parent_token",
935 "send_modified_events",
936 "is_equal",
937 "load_on_unexpire",
938 "_modified_token",
939 "accepts_scalar_loader",
940 "_deferred_history",
941 )
942
943 def __str__(self) -> str:
944 return f"{self.class_.__name__}.{self.key}"
945
946 def _get_active_history(self):
947 """Backwards compat for impl.active_history"""
948
949 return self.dispatch._active_history
950
951 def _set_active_history(self, value):
952 self.dispatch._active_history = value
953
954 active_history = property(_get_active_history, _set_active_history)
955
956 def hasparent(
957 self, state: InstanceState[Any], optimistic: bool = False
958 ) -> bool:
959 """Return the boolean value of a `hasparent` flag attached to
960 the given state.
961
962 The `optimistic` flag determines what the default return value
963 should be if no `hasparent` flag can be located.
964
965 As this function is used to determine if an instance is an
966 *orphan*, instances that were loaded from storage should be
967 assumed to not be orphans, until a True/False value for this
968 flag is set.
969
970 An instance attribute that is loaded by a callable function
971 will also not have a `hasparent` flag.
972
973 """
974 msg = "This AttributeImpl is not configured to track parents."
975 assert self.trackparent, msg
976
977 return (
978 state.parents.get(id(self.parent_token), optimistic) is not False
979 )
980
981 def sethasparent(
982 self,
983 state: InstanceState[Any],
984 parent_state: InstanceState[Any],
985 value: bool,
986 ) -> None:
987 """Set a boolean flag on the given item corresponding to
988 whether or not it is attached to a parent object via the
989 attribute represented by this ``InstrumentedAttribute``.
990
991 """
992 msg = "This AttributeImpl is not configured to track parents."
993 assert self.trackparent, msg
994
995 id_ = id(self.parent_token)
996 if value:
997 state.parents[id_] = parent_state
998 else:
999 if id_ in state.parents:
1000 last_parent = state.parents[id_]
1001
1002 if (
1003 last_parent is not False
1004 and last_parent.key != parent_state.key
1005 ):
1006 if last_parent.obj() is None:
1007 raise orm_exc.StaleDataError(
1008 "Removing state %s from parent "
1009 "state %s along attribute '%s', "
1010 "but the parent record "
1011 "has gone stale, can't be sure this "
1012 "is the most recent parent."
1013 % (
1014 state_str(state),
1015 state_str(parent_state),
1016 self.key,
1017 )
1018 )
1019
1020 return
1021
1022 state.parents[id_] = False
1023
1024 def get_history(
1025 self,
1026 state: InstanceState[Any],
1027 dict_: _InstanceDict,
1028 passive: PassiveFlag = PASSIVE_OFF,
1029 ) -> History:
1030 raise NotImplementedError()
1031
1032 def get_all_pending(
1033 self,
1034 state: InstanceState[Any],
1035 dict_: _InstanceDict,
1036 passive: PassiveFlag = PASSIVE_NO_INITIALIZE,
1037 ) -> _AllPendingType:
1038 """Return a list of tuples of (state, obj)
1039 for all objects in this attribute's current state
1040 + history.
1041
1042 Only applies to object-based attributes.
1043
1044 This is an inlining of existing functionality
1045 which roughly corresponds to:
1046
1047 get_state_history(
1048 state,
1049 key,
1050 passive=PASSIVE_NO_INITIALIZE).sum()
1051
1052 """
1053 raise NotImplementedError()
1054
1055 def _default_value(
1056 self, state: InstanceState[Any], dict_: _InstanceDict
1057 ) -> Any:
1058 """Produce an empty value for an uninitialized scalar attribute."""
1059
1060 assert self.key not in dict_, (
1061 "_default_value should only be invoked for an "
1062 "uninitialized or expired attribute"
1063 )
1064
1065 value = None
1066 for fn in self.dispatch.init_scalar:
1067 ret = fn(state, value, dict_)
1068 if ret is not ATTR_EMPTY:
1069 value = ret
1070
1071 return value
1072
1073 def get(
1074 self,
1075 state: InstanceState[Any],
1076 dict_: _InstanceDict,
1077 passive: PassiveFlag = PASSIVE_OFF,
1078 ) -> Any:
1079 """Retrieve a value from the given object.
1080 If a callable is assembled on this object's attribute, and
1081 passive is False, the callable will be executed and the
1082 resulting value will be set as the new value for this attribute.
1083 """
1084 if self.key in dict_:
1085 return dict_[self.key]
1086 else:
1087 # if history present, don't load
1088 key = self.key
1089 if (
1090 key not in state.committed_state
1091 or state.committed_state[key] is NO_VALUE
1092 ):
1093 if not passive & CALLABLES_OK:
1094 return PASSIVE_NO_RESULT
1095
1096 value = self._fire_loader_callables(state, key, passive)
1097
1098 if value is PASSIVE_NO_RESULT or value is NO_VALUE:
1099 return value
1100 elif value is ATTR_WAS_SET:
1101 try:
1102 return dict_[key]
1103 except KeyError as err:
1104 # TODO: no test coverage here.
1105 raise KeyError(
1106 "Deferred loader for attribute "
1107 "%r failed to populate "
1108 "correctly" % key
1109 ) from err
1110 elif value is not ATTR_EMPTY:
1111 return self.set_committed_value(state, dict_, value)
1112
1113 if not passive & INIT_OK:
1114 return NO_VALUE
1115 else:
1116 return self._default_value(state, dict_)
1117
1118 def _fire_loader_callables(
1119 self, state: InstanceState[Any], key: str, passive: PassiveFlag
1120 ) -> Any:
1121 if (
1122 self.accepts_scalar_loader
1123 and self.load_on_unexpire
1124 and key in state.expired_attributes
1125 ):
1126 return state._load_expired(state, passive)
1127 elif key in state.callables:
1128 callable_ = state.callables[key]
1129 return callable_(state, passive)
1130 elif self.callable_:
1131 return self.callable_(state, passive)
1132 else:
1133 return ATTR_EMPTY
1134
1135 def append(
1136 self,
1137 state: InstanceState[Any],
1138 dict_: _InstanceDict,
1139 value: Any,
1140 initiator: Optional[AttributeEventToken],
1141 passive: PassiveFlag = PASSIVE_OFF,
1142 ) -> None:
1143 self.set(state, dict_, value, initiator, passive=passive)
1144
1145 def remove(
1146 self,
1147 state: InstanceState[Any],
1148 dict_: _InstanceDict,
1149 value: Any,
1150 initiator: Optional[AttributeEventToken],
1151 passive: PassiveFlag = PASSIVE_OFF,
1152 ) -> None:
1153 self.set(
1154 state, dict_, None, initiator, passive=passive, check_old=value
1155 )
1156
1157 def pop(
1158 self,
1159 state: InstanceState[Any],
1160 dict_: _InstanceDict,
1161 value: Any,
1162 initiator: Optional[AttributeEventToken],
1163 passive: PassiveFlag = PASSIVE_OFF,
1164 ) -> None:
1165 self.set(
1166 state,
1167 dict_,
1168 None,
1169 initiator,
1170 passive=passive,
1171 check_old=value,
1172 pop=True,
1173 )
1174
1175 def set(
1176 self,
1177 state: InstanceState[Any],
1178 dict_: _InstanceDict,
1179 value: Any,
1180 initiator: Optional[AttributeEventToken] = None,
1181 passive: PassiveFlag = PASSIVE_OFF,
1182 check_old: Any = None,
1183 pop: bool = False,
1184 ) -> None:
1185 raise NotImplementedError()
1186
1187 def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
1188 raise NotImplementedError()
1189
1190 def get_committed_value(
1191 self,
1192 state: InstanceState[Any],
1193 dict_: _InstanceDict,
1194 passive: PassiveFlag = PASSIVE_OFF,
1195 ) -> Any:
1196 """return the unchanged value of this attribute"""
1197
1198 if self.key in state.committed_state:
1199 value = state.committed_state[self.key]
1200 if value is NO_VALUE:
1201 return None
1202 else:
1203 return value
1204 else:
1205 return self.get(state, dict_, passive=passive)
1206
1207 def set_committed_value(self, state, dict_, value):
1208 """set an attribute value on the given instance and 'commit' it."""
1209
1210 dict_[self.key] = value
1211 state._commit(dict_, [self.key])
1212 return value
1213
1214
1215class ScalarAttributeImpl(AttributeImpl):
1216 """represents a scalar value-holding InstrumentedAttribute."""
1217
1218 default_accepts_scalar_loader = True
1219 uses_objects = False
1220 supports_population = True
1221 collection = False
1222 dynamic = False
1223
1224 __slots__ = "_replace_token", "_append_token", "_remove_token"
1225
1226 def __init__(self, *arg, **kw):
1227 super().__init__(*arg, **kw)
1228 self._replace_token = self._append_token = AttributeEventToken(
1229 self, OP_REPLACE
1230 )
1231 self._remove_token = AttributeEventToken(self, OP_REMOVE)
1232
1233 def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
1234 if self.dispatch._active_history:
1235 old = self.get(state, dict_, PASSIVE_RETURN_NO_VALUE)
1236 else:
1237 old = dict_.get(self.key, NO_VALUE)
1238
1239 if self.dispatch.remove:
1240 self.fire_remove_event(state, dict_, old, self._remove_token)
1241 state._modified_event(dict_, self, old)
1242
1243 existing = dict_.pop(self.key, NO_VALUE)
1244 if (
1245 existing is NO_VALUE
1246 and old is NO_VALUE
1247 and not state.expired
1248 and self.key not in state.expired_attributes
1249 ):
1250 raise AttributeError("%s object does not have a value" % self)
1251
1252 def get_history(
1253 self,
1254 state: InstanceState[Any],
1255 dict_: Dict[str, Any],
1256 passive: PassiveFlag = PASSIVE_OFF,
1257 ) -> History:
1258 if self.key in dict_:
1259 return History.from_scalar_attribute(self, state, dict_[self.key])
1260 elif self.key in state.committed_state:
1261 return History.from_scalar_attribute(self, state, NO_VALUE)
1262 else:
1263 if passive & INIT_OK:
1264 passive ^= INIT_OK
1265 current = self.get(state, dict_, passive=passive)
1266 if current is PASSIVE_NO_RESULT:
1267 return HISTORY_BLANK
1268 else:
1269 return History.from_scalar_attribute(self, state, current)
1270
1271 def set(
1272 self,
1273 state: InstanceState[Any],
1274 dict_: Dict[str, Any],
1275 value: Any,
1276 initiator: Optional[AttributeEventToken] = None,
1277 passive: PassiveFlag = PASSIVE_OFF,
1278 check_old: Optional[object] = None,
1279 pop: bool = False,
1280 ) -> None:
1281 if self.dispatch._active_history:
1282 old = self.get(state, dict_, PASSIVE_RETURN_NO_VALUE)
1283 else:
1284 old = dict_.get(self.key, NO_VALUE)
1285
1286 if self.dispatch.set:
1287 value = self.fire_replace_event(
1288 state, dict_, value, old, initiator
1289 )
1290 state._modified_event(dict_, self, old)
1291 dict_[self.key] = value
1292
1293 def fire_replace_event(
1294 self,
1295 state: InstanceState[Any],
1296 dict_: _InstanceDict,
1297 value: _T,
1298 previous: Any,
1299 initiator: Optional[AttributeEventToken],
1300 ) -> _T:
1301 for fn in self.dispatch.set:
1302 value = fn(
1303 state, value, previous, initiator or self._replace_token
1304 )
1305 return value
1306
1307 def fire_remove_event(
1308 self,
1309 state: InstanceState[Any],
1310 dict_: _InstanceDict,
1311 value: Any,
1312 initiator: Optional[AttributeEventToken],
1313 ) -> None:
1314 for fn in self.dispatch.remove:
1315 fn(state, value, initiator or self._remove_token)
1316
1317
1318class ScalarObjectAttributeImpl(ScalarAttributeImpl):
1319 """represents a scalar-holding InstrumentedAttribute,
1320 where the target object is also instrumented.
1321
1322 Adds events to delete/set operations.
1323
1324 """
1325
1326 default_accepts_scalar_loader = False
1327 uses_objects = True
1328 supports_population = True
1329 collection = False
1330
1331 __slots__ = ()
1332
1333 def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
1334 if self.dispatch._active_history:
1335 old = self.get(
1336 state,
1337 dict_,
1338 passive=PASSIVE_ONLY_PERSISTENT
1339 | NO_AUTOFLUSH
1340 | LOAD_AGAINST_COMMITTED,
1341 )
1342 else:
1343 old = self.get(
1344 state,
1345 dict_,
1346 passive=PASSIVE_NO_FETCH ^ INIT_OK
1347 | LOAD_AGAINST_COMMITTED
1348 | NO_RAISE,
1349 )
1350
1351 self.fire_remove_event(state, dict_, old, self._remove_token)
1352
1353 existing = dict_.pop(self.key, NO_VALUE)
1354
1355 # if the attribute is expired, we currently have no way to tell
1356 # that an object-attribute was expired vs. not loaded. So
1357 # for this test, we look to see if the object has a DB identity.
1358 if (
1359 existing is NO_VALUE
1360 and old is not PASSIVE_NO_RESULT
1361 and state.key is None
1362 ):
1363 raise AttributeError("%s object does not have a value" % self)
1364
1365 def get_history(
1366 self,
1367 state: InstanceState[Any],
1368 dict_: _InstanceDict,
1369 passive: PassiveFlag = PASSIVE_OFF,
1370 ) -> History:
1371 if self.key in dict_:
1372 current = dict_[self.key]
1373 else:
1374 if passive & INIT_OK:
1375 passive ^= INIT_OK
1376 current = self.get(state, dict_, passive=passive)
1377 if current is PASSIVE_NO_RESULT:
1378 return HISTORY_BLANK
1379
1380 if not self._deferred_history:
1381 return History.from_object_attribute(self, state, current)
1382 else:
1383 original = state.committed_state.get(self.key, _NO_HISTORY)
1384 if original is PASSIVE_NO_RESULT:
1385 loader_passive = passive | (
1386 PASSIVE_ONLY_PERSISTENT
1387 | NO_AUTOFLUSH
1388 | LOAD_AGAINST_COMMITTED
1389 | NO_RAISE
1390 | DEFERRED_HISTORY_LOAD
1391 )
1392 original = self._fire_loader_callables(
1393 state, self.key, loader_passive
1394 )
1395 return History.from_object_attribute(
1396 self, state, current, original=original
1397 )
1398
1399 def get_all_pending(
1400 self,
1401 state: InstanceState[Any],
1402 dict_: _InstanceDict,
1403 passive: PassiveFlag = PASSIVE_NO_INITIALIZE,
1404 ) -> _AllPendingType:
1405 if self.key in dict_:
1406 current = dict_[self.key]
1407 elif passive & CALLABLES_OK:
1408 current = self.get(state, dict_, passive=passive)
1409 else:
1410 return []
1411
1412 ret: _AllPendingType
1413
1414 # can't use __hash__(), can't use __eq__() here
1415 if (
1416 current is not None
1417 and current is not PASSIVE_NO_RESULT
1418 and current is not NO_VALUE
1419 ):
1420 ret = [(instance_state(current), current)]
1421 else:
1422 ret = [(None, None)]
1423
1424 if self.key in state.committed_state:
1425 original = state.committed_state[self.key]
1426 if (
1427 original is not None
1428 and original is not PASSIVE_NO_RESULT
1429 and original is not NO_VALUE
1430 and original is not current
1431 ):
1432 ret.append((instance_state(original), original))
1433 return ret
1434
1435 def set(
1436 self,
1437 state: InstanceState[Any],
1438 dict_: _InstanceDict,
1439 value: Any,
1440 initiator: Optional[AttributeEventToken] = None,
1441 passive: PassiveFlag = PASSIVE_OFF,
1442 check_old: Any = None,
1443 pop: bool = False,
1444 ) -> None:
1445 """Set a value on the given InstanceState."""
1446
1447 if self.dispatch._active_history:
1448 old = self.get(
1449 state,
1450 dict_,
1451 passive=PASSIVE_ONLY_PERSISTENT
1452 | NO_AUTOFLUSH
1453 | LOAD_AGAINST_COMMITTED,
1454 )
1455 else:
1456 old = self.get(
1457 state,
1458 dict_,
1459 passive=PASSIVE_NO_FETCH ^ INIT_OK
1460 | LOAD_AGAINST_COMMITTED
1461 | NO_RAISE,
1462 )
1463
1464 if (
1465 check_old is not None
1466 and old is not PASSIVE_NO_RESULT
1467 and check_old is not old
1468 ):
1469 if pop:
1470 return
1471 else:
1472 raise ValueError(
1473 "Object %s not associated with %s on attribute '%s'"
1474 % (instance_str(check_old), state_str(state), self.key)
1475 )
1476
1477 value = self.fire_replace_event(state, dict_, value, old, initiator)
1478 dict_[self.key] = value
1479
1480 def fire_remove_event(
1481 self,
1482 state: InstanceState[Any],
1483 dict_: _InstanceDict,
1484 value: Any,
1485 initiator: Optional[AttributeEventToken],
1486 ) -> None:
1487 if self.trackparent and value not in (
1488 None,
1489 PASSIVE_NO_RESULT,
1490 NO_VALUE,
1491 ):
1492 self.sethasparent(instance_state(value), state, False)
1493
1494 for fn in self.dispatch.remove:
1495 fn(state, value, initiator or self._remove_token)
1496
1497 state._modified_event(dict_, self, value)
1498
1499 def fire_replace_event(
1500 self,
1501 state: InstanceState[Any],
1502 dict_: _InstanceDict,
1503 value: _T,
1504 previous: Any,
1505 initiator: Optional[AttributeEventToken],
1506 ) -> _T:
1507 if self.trackparent:
1508 if previous is not value and previous not in (
1509 None,
1510 PASSIVE_NO_RESULT,
1511 NO_VALUE,
1512 ):
1513 self.sethasparent(instance_state(previous), state, False)
1514
1515 for fn in self.dispatch.set:
1516 value = fn(
1517 state, value, previous, initiator or self._replace_token
1518 )
1519
1520 state._modified_event(dict_, self, previous)
1521
1522 if self.trackparent:
1523 if value is not None:
1524 self.sethasparent(instance_state(value), state, True)
1525
1526 return value
1527
1528
1529class HasCollectionAdapter:
1530 __slots__ = ()
1531
1532 collection: bool
1533 _is_has_collection_adapter = True
1534
1535 def _dispose_previous_collection(
1536 self,
1537 state: InstanceState[Any],
1538 collection: _AdaptedCollectionProtocol,
1539 adapter: CollectionAdapter,
1540 fire_event: bool,
1541 ) -> None:
1542 raise NotImplementedError()
1543
1544 @overload
1545 def get_collection(
1546 self,
1547 state: InstanceState[Any],
1548 dict_: _InstanceDict,
1549 user_data: Literal[None] = ...,
1550 passive: Literal[PassiveFlag.PASSIVE_OFF] = ...,
1551 ) -> CollectionAdapter: ...
1552
1553 @overload
1554 def get_collection(
1555 self,
1556 state: InstanceState[Any],
1557 dict_: _InstanceDict,
1558 user_data: _AdaptedCollectionProtocol = ...,
1559 passive: PassiveFlag = ...,
1560 ) -> CollectionAdapter: ...
1561
1562 @overload
1563 def get_collection(
1564 self,
1565 state: InstanceState[Any],
1566 dict_: _InstanceDict,
1567 user_data: Optional[_AdaptedCollectionProtocol] = ...,
1568 passive: PassiveFlag = ...,
1569 ) -> Union[
1570 Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
1571 ]: ...
1572
1573 def get_collection(
1574 self,
1575 state: InstanceState[Any],
1576 dict_: _InstanceDict,
1577 user_data: Optional[_AdaptedCollectionProtocol] = None,
1578 passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
1579 ) -> Union[
1580 Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
1581 ]:
1582 raise NotImplementedError()
1583
1584 def set(
1585 self,
1586 state: InstanceState[Any],
1587 dict_: _InstanceDict,
1588 value: Any,
1589 initiator: Optional[AttributeEventToken] = None,
1590 passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
1591 check_old: Any = None,
1592 pop: bool = False,
1593 _adapt: bool = True,
1594 ) -> None:
1595 raise NotImplementedError()
1596
1597
1598if TYPE_CHECKING:
1599
1600 def _is_collection_attribute_impl(
1601 impl: AttributeImpl,
1602 ) -> TypeGuard[CollectionAttributeImpl]: ...
1603
1604else:
1605 _is_collection_attribute_impl = operator.attrgetter("collection")
1606
1607
1608class CollectionAttributeImpl(HasCollectionAdapter, AttributeImpl):
1609 """A collection-holding attribute that instruments changes in membership.
1610
1611 Only handles collections of instrumented objects.
1612
1613 InstrumentedCollectionAttribute holds an arbitrary, user-specified
1614 container object (defaulting to a list) and brokers access to the
1615 CollectionAdapter, a "view" onto that object that presents consistent bag
1616 semantics to the orm layer independent of the user data implementation.
1617
1618 """
1619
1620 uses_objects = True
1621 collection = True
1622 default_accepts_scalar_loader = False
1623 supports_population = True
1624 dynamic = False
1625
1626 _bulk_replace_token: AttributeEventToken
1627
1628 __slots__ = (
1629 "copy",
1630 "collection_factory",
1631 "_append_token",
1632 "_remove_token",
1633 "_bulk_replace_token",
1634 "_duck_typed_as",
1635 )
1636
1637 def __init__(
1638 self,
1639 class_,
1640 key,
1641 callable_,
1642 dispatch,
1643 typecallable=None,
1644 trackparent=False,
1645 copy_function=None,
1646 compare_function=None,
1647 **kwargs,
1648 ):
1649 super().__init__(
1650 class_,
1651 key,
1652 callable_,
1653 dispatch,
1654 trackparent=trackparent,
1655 compare_function=compare_function,
1656 **kwargs,
1657 )
1658
1659 if copy_function is None:
1660 copy_function = self.__copy
1661 self.copy = copy_function
1662 self.collection_factory = typecallable
1663 self._append_token = AttributeEventToken(self, OP_APPEND)
1664 self._remove_token = AttributeEventToken(self, OP_REMOVE)
1665 self._bulk_replace_token = AttributeEventToken(self, OP_BULK_REPLACE)
1666 self._duck_typed_as = util.duck_type_collection(
1667 self.collection_factory()
1668 )
1669
1670 if getattr(self.collection_factory, "_sa_linker", None):
1671
1672 @event.listens_for(self, "init_collection")
1673 def link(target, collection, collection_adapter):
1674 collection._sa_linker(collection_adapter)
1675
1676 @event.listens_for(self, "dispose_collection")
1677 def unlink(target, collection, collection_adapter):
1678 collection._sa_linker(None)
1679
1680 def __copy(self, item):
1681 return [y for y in collections.collection_adapter(item)]
1682
1683 def get_history(
1684 self,
1685 state: InstanceState[Any],
1686 dict_: _InstanceDict,
1687 passive: PassiveFlag = PASSIVE_OFF,
1688 ) -> History:
1689 current = self.get(state, dict_, passive=passive)
1690
1691 if current is PASSIVE_NO_RESULT:
1692 if (
1693 passive & PassiveFlag.INCLUDE_PENDING_MUTATIONS
1694 and self.key in state._pending_mutations
1695 ):
1696 pending = state._pending_mutations[self.key]
1697 return pending.merge_with_history(HISTORY_BLANK)
1698 else:
1699 return HISTORY_BLANK
1700 else:
1701 if passive & PassiveFlag.INCLUDE_PENDING_MUTATIONS:
1702 # this collection is loaded / present. should not be any
1703 # pending mutations
1704 assert self.key not in state._pending_mutations
1705
1706 return History.from_collection(self, state, current)
1707
1708 def get_all_pending(
1709 self,
1710 state: InstanceState[Any],
1711 dict_: _InstanceDict,
1712 passive: PassiveFlag = PASSIVE_NO_INITIALIZE,
1713 ) -> _AllPendingType:
1714 # NOTE: passive is ignored here at the moment
1715
1716 if self.key not in dict_:
1717 return []
1718
1719 current = dict_[self.key]
1720 current = getattr(current, "_sa_adapter")
1721
1722 if self.key in state.committed_state:
1723 original = state.committed_state[self.key]
1724 if original is not NO_VALUE:
1725 current_states = [
1726 ((c is not None) and instance_state(c) or None, c)
1727 for c in current
1728 ]
1729 original_states = [
1730 ((c is not None) and instance_state(c) or None, c)
1731 for c in original
1732 ]
1733
1734 current_set = dict(current_states)
1735 original_set = dict(original_states)
1736
1737 return (
1738 [
1739 (s, o)
1740 for s, o in current_states
1741 if s not in original_set
1742 ]
1743 + [(s, o) for s, o in current_states if s in original_set]
1744 + [
1745 (s, o)
1746 for s, o in original_states
1747 if s not in current_set
1748 ]
1749 )
1750
1751 return [(instance_state(o), o) for o in current]
1752
1753 def fire_append_event(
1754 self,
1755 state: InstanceState[Any],
1756 dict_: _InstanceDict,
1757 value: _T,
1758 initiator: Optional[AttributeEventToken],
1759 key: Optional[Any],
1760 ) -> _T:
1761 for fn in self.dispatch.append:
1762 value = fn(state, value, initiator or self._append_token, key=key)
1763
1764 state._modified_event(dict_, self, NO_VALUE, True)
1765
1766 if self.trackparent and value is not None:
1767 self.sethasparent(instance_state(value), state, True)
1768
1769 return value
1770
1771 def fire_append_wo_mutation_event(
1772 self,
1773 state: InstanceState[Any],
1774 dict_: _InstanceDict,
1775 value: _T,
1776 initiator: Optional[AttributeEventToken],
1777 key: Optional[Any],
1778 ) -> _T:
1779 for fn in self.dispatch.append_wo_mutation:
1780 value = fn(state, value, initiator or self._append_token, key=key)
1781
1782 return value
1783
1784 def fire_pre_remove_event(
1785 self,
1786 state: InstanceState[Any],
1787 dict_: _InstanceDict,
1788 initiator: Optional[AttributeEventToken],
1789 key: Optional[Any],
1790 ) -> None:
1791 """A special event used for pop() operations.
1792
1793 The "remove" event needs to have the item to be removed passed to
1794 it, which in the case of pop from a set, we don't have a way to access
1795 the item before the operation. the event is used for all pop()
1796 operations (even though set.pop is the one where it is really needed).
1797
1798 """
1799 state._modified_event(dict_, self, NO_VALUE, True)
1800
1801 def fire_remove_event(
1802 self,
1803 state: InstanceState[Any],
1804 dict_: _InstanceDict,
1805 value: Any,
1806 initiator: Optional[AttributeEventToken],
1807 key: Optional[Any],
1808 ) -> None:
1809 if self.trackparent and value is not None:
1810 self.sethasparent(instance_state(value), state, False)
1811
1812 for fn in self.dispatch.remove:
1813 fn(state, value, initiator or self._remove_token, key=key)
1814
1815 state._modified_event(dict_, self, NO_VALUE, True)
1816
1817 def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
1818 if self.key not in dict_:
1819 return
1820
1821 state._modified_event(dict_, self, NO_VALUE, True)
1822
1823 collection = self.get_collection(state, state.dict)
1824 collection.clear_with_event()
1825
1826 # key is always present because we checked above. e.g.
1827 # del is a no-op if collection not present.
1828 del dict_[self.key]
1829
1830 def _default_value(
1831 self, state: InstanceState[Any], dict_: _InstanceDict
1832 ) -> _AdaptedCollectionProtocol:
1833 """Produce an empty collection for an un-initialized attribute"""
1834
1835 assert self.key not in dict_, (
1836 "_default_value should only be invoked for an "
1837 "uninitialized or expired attribute"
1838 )
1839
1840 if self.key in state._empty_collections:
1841 return state._empty_collections[self.key]
1842
1843 adapter, user_data = self._initialize_collection(state)
1844 adapter._set_empty(user_data)
1845 return user_data
1846
1847 def _initialize_collection(
1848 self, state: InstanceState[Any]
1849 ) -> Tuple[CollectionAdapter, _AdaptedCollectionProtocol]:
1850 adapter, collection = state.manager.initialize_collection(
1851 self.key, state, self.collection_factory
1852 )
1853
1854 self.dispatch.init_collection(state, collection, adapter)
1855
1856 return adapter, collection
1857
1858 def append(
1859 self,
1860 state: InstanceState[Any],
1861 dict_: _InstanceDict,
1862 value: Any,
1863 initiator: Optional[AttributeEventToken],
1864 passive: PassiveFlag = PASSIVE_OFF,
1865 ) -> None:
1866 collection = self.get_collection(
1867 state, dict_, user_data=None, passive=passive
1868 )
1869 if collection is PASSIVE_NO_RESULT:
1870 value = self.fire_append_event(
1871 state, dict_, value, initiator, key=NO_KEY
1872 )
1873 assert (
1874 self.key not in dict_
1875 ), "Collection was loaded during event handling."
1876 state._get_pending_mutation(self.key).append(value)
1877 else:
1878 if TYPE_CHECKING:
1879 assert isinstance(collection, CollectionAdapter)
1880 collection.append_with_event(value, initiator)
1881
1882 def remove(
1883 self,
1884 state: InstanceState[Any],
1885 dict_: _InstanceDict,
1886 value: Any,
1887 initiator: Optional[AttributeEventToken],
1888 passive: PassiveFlag = PASSIVE_OFF,
1889 ) -> None:
1890 collection = self.get_collection(
1891 state, state.dict, user_data=None, passive=passive
1892 )
1893 if collection is PASSIVE_NO_RESULT:
1894 self.fire_remove_event(state, dict_, value, initiator, key=NO_KEY)
1895 assert (
1896 self.key not in dict_
1897 ), "Collection was loaded during event handling."
1898 state._get_pending_mutation(self.key).remove(value)
1899 else:
1900 if TYPE_CHECKING:
1901 assert isinstance(collection, CollectionAdapter)
1902 collection.remove_with_event(value, initiator)
1903
1904 def pop(
1905 self,
1906 state: InstanceState[Any],
1907 dict_: _InstanceDict,
1908 value: Any,
1909 initiator: Optional[AttributeEventToken],
1910 passive: PassiveFlag = PASSIVE_OFF,
1911 ) -> None:
1912 try:
1913 # TODO: better solution here would be to add
1914 # a "popper" role to collections.py to complement
1915 # "remover".
1916 self.remove(state, dict_, value, initiator, passive=passive)
1917 except (ValueError, KeyError, IndexError):
1918 pass
1919
1920 def set(
1921 self,
1922 state: InstanceState[Any],
1923 dict_: _InstanceDict,
1924 value: Any,
1925 initiator: Optional[AttributeEventToken] = None,
1926 passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
1927 check_old: Any = None,
1928 pop: bool = False,
1929 _adapt: bool = True,
1930 ) -> None:
1931 iterable = orig_iterable = value
1932 new_keys = None
1933
1934 # pulling a new collection first so that an adaptation exception does
1935 # not trigger a lazy load of the old collection.
1936 new_collection, user_data = self._initialize_collection(state)
1937 if _adapt:
1938 if new_collection._converter is not None:
1939 iterable = new_collection._converter(iterable)
1940 else:
1941 setting_type = util.duck_type_collection(iterable)
1942 receiving_type = self._duck_typed_as
1943
1944 if setting_type is not receiving_type:
1945 given = (
1946 iterable is None
1947 and "None"
1948 or iterable.__class__.__name__
1949 )
1950 wanted = self._duck_typed_as.__name__
1951 raise TypeError(
1952 "Incompatible collection type: %s is not %s-like"
1953 % (given, wanted)
1954 )
1955
1956 # If the object is an adapted collection, return the (iterable)
1957 # adapter.
1958 if hasattr(iterable, "_sa_iterator"):
1959 iterable = iterable._sa_iterator()
1960 elif setting_type is dict:
1961 new_keys = list(iterable)
1962 iterable = iterable.values()
1963 else:
1964 iterable = iter(iterable)
1965 elif util.duck_type_collection(iterable) is dict:
1966 new_keys = list(value)
1967
1968 new_values = list(iterable)
1969
1970 evt = self._bulk_replace_token
1971
1972 self.dispatch.bulk_replace(state, new_values, evt, keys=new_keys)
1973
1974 # propagate NO_RAISE in passive through to the get() for the
1975 # existing object (ticket #8862)
1976 old = self.get(
1977 state,
1978 dict_,
1979 passive=PASSIVE_ONLY_PERSISTENT ^ (passive & PassiveFlag.NO_RAISE),
1980 )
1981 if old is PASSIVE_NO_RESULT:
1982 old = self._default_value(state, dict_)
1983 elif old is orig_iterable:
1984 # ignore re-assignment of the current collection, as happens
1985 # implicitly with in-place operators (foo.collection |= other)
1986 return
1987
1988 # place a copy of "old" in state.committed_state
1989 state._modified_event(dict_, self, old, True)
1990
1991 old_collection = old._sa_adapter
1992
1993 dict_[self.key] = user_data
1994
1995 collections.bulk_replace(
1996 new_values, old_collection, new_collection, initiator=evt
1997 )
1998
1999 self._dispose_previous_collection(state, old, old_collection, True)
2000
2001 def _dispose_previous_collection(
2002 self,
2003 state: InstanceState[Any],
2004 collection: _AdaptedCollectionProtocol,
2005 adapter: CollectionAdapter,
2006 fire_event: bool,
2007 ) -> None:
2008 del collection._sa_adapter
2009
2010 # discarding old collection make sure it is not referenced in empty
2011 # collections.
2012 state._empty_collections.pop(self.key, None)
2013 if fire_event:
2014 self.dispatch.dispose_collection(state, collection, adapter)
2015
2016 def _invalidate_collection(
2017 self, collection: _AdaptedCollectionProtocol
2018 ) -> None:
2019 adapter = getattr(collection, "_sa_adapter")
2020 adapter.invalidated = True
2021
2022 def set_committed_value(
2023 self, state: InstanceState[Any], dict_: _InstanceDict, value: Any
2024 ) -> _AdaptedCollectionProtocol:
2025 """Set an attribute value on the given instance and 'commit' it."""
2026
2027 collection, user_data = self._initialize_collection(state)
2028
2029 if value:
2030 collection.append_multiple_without_event(value)
2031
2032 state.dict[self.key] = user_data
2033
2034 state._commit(dict_, [self.key])
2035
2036 if self.key in state._pending_mutations:
2037 # pending items exist. issue a modified event,
2038 # add/remove new items.
2039 state._modified_event(dict_, self, user_data, True)
2040
2041 pending = state._pending_mutations.pop(self.key)
2042 added = pending.added_items
2043 removed = pending.deleted_items
2044 for item in added:
2045 collection.append_without_event(item)
2046 for item in removed:
2047 collection.remove_without_event(item)
2048
2049 return user_data
2050
2051 @overload
2052 def get_collection(
2053 self,
2054 state: InstanceState[Any],
2055 dict_: _InstanceDict,
2056 user_data: Literal[None] = ...,
2057 passive: Literal[PassiveFlag.PASSIVE_OFF] = ...,
2058 ) -> CollectionAdapter: ...
2059
2060 @overload
2061 def get_collection(
2062 self,
2063 state: InstanceState[Any],
2064 dict_: _InstanceDict,
2065 user_data: _AdaptedCollectionProtocol = ...,
2066 passive: PassiveFlag = ...,
2067 ) -> CollectionAdapter: ...
2068
2069 @overload
2070 def get_collection(
2071 self,
2072 state: InstanceState[Any],
2073 dict_: _InstanceDict,
2074 user_data: Optional[_AdaptedCollectionProtocol] = ...,
2075 passive: PassiveFlag = PASSIVE_OFF,
2076 ) -> Union[
2077 Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
2078 ]: ...
2079
2080 def get_collection(
2081 self,
2082 state: InstanceState[Any],
2083 dict_: _InstanceDict,
2084 user_data: Optional[_AdaptedCollectionProtocol] = None,
2085 passive: PassiveFlag = PASSIVE_OFF,
2086 ) -> Union[
2087 Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
2088 ]:
2089 """Retrieve the CollectionAdapter associated with the given state.
2090
2091 if user_data is None, retrieves it from the state using normal
2092 "get()" rules, which will fire lazy callables or return the "empty"
2093 collection value.
2094
2095 """
2096 if user_data is None:
2097 fetch_user_data = self.get(state, dict_, passive=passive)
2098 if fetch_user_data is LoaderCallableStatus.PASSIVE_NO_RESULT:
2099 return fetch_user_data
2100 else:
2101 user_data = cast("_AdaptedCollectionProtocol", fetch_user_data)
2102
2103 return user_data._sa_adapter
2104
2105
2106def backref_listeners(
2107 attribute: QueryableAttribute[Any], key: str, uselist: bool
2108) -> None:
2109 """Apply listeners to synchronize a two-way relationship."""
2110
2111 # use easily recognizable names for stack traces.
2112
2113 # in the sections marked "tokens to test for a recursive loop",
2114 # this is somewhat brittle and very performance-sensitive logic
2115 # that is specific to how we might arrive at each event. a marker
2116 # that can target us directly to arguments being invoked against
2117 # the impl might be simpler, but could interfere with other systems.
2118
2119 parent_token = attribute.impl.parent_token
2120 parent_impl = attribute.impl
2121
2122 def _acceptable_key_err(child_state, initiator, child_impl):
2123 raise ValueError(
2124 "Bidirectional attribute conflict detected: "
2125 'Passing object %s to attribute "%s" '
2126 'triggers a modify event on attribute "%s" '
2127 'via the backref "%s".'
2128 % (
2129 state_str(child_state),
2130 initiator.parent_token,
2131 child_impl.parent_token,
2132 attribute.impl.parent_token,
2133 )
2134 )
2135
2136 def emit_backref_from_scalar_set_event(
2137 state, child, oldchild, initiator, **kw
2138 ):
2139 if oldchild is child:
2140 return child
2141 if (
2142 oldchild is not None
2143 and oldchild is not PASSIVE_NO_RESULT
2144 and oldchild is not NO_VALUE
2145 ):
2146 # With lazy=None, there's no guarantee that the full collection is
2147 # present when updating via a backref.
2148 old_state, old_dict = (
2149 instance_state(oldchild),
2150 instance_dict(oldchild),
2151 )
2152 impl = old_state.manager[key].impl
2153
2154 # tokens to test for a recursive loop.
2155 if not impl.collection and not impl.dynamic:
2156 check_recursive_token = impl._replace_token
2157 else:
2158 check_recursive_token = impl._remove_token
2159
2160 if initiator is not check_recursive_token:
2161 impl.pop(
2162 old_state,
2163 old_dict,
2164 state.obj(),
2165 parent_impl._append_token,
2166 passive=PASSIVE_NO_FETCH,
2167 )
2168
2169 if child is not None:
2170 child_state, child_dict = (
2171 instance_state(child),
2172 instance_dict(child),
2173 )
2174 child_impl = child_state.manager[key].impl
2175
2176 if (
2177 initiator.parent_token is not parent_token
2178 and initiator.parent_token is not child_impl.parent_token
2179 ):
2180 _acceptable_key_err(state, initiator, child_impl)
2181
2182 # tokens to test for a recursive loop.
2183 check_append_token = child_impl._append_token
2184 check_bulk_replace_token = (
2185 child_impl._bulk_replace_token
2186 if _is_collection_attribute_impl(child_impl)
2187 else None
2188 )
2189
2190 if (
2191 initiator is not check_append_token
2192 and initiator is not check_bulk_replace_token
2193 ):
2194 child_impl.append(
2195 child_state,
2196 child_dict,
2197 state.obj(),
2198 initiator,
2199 passive=PASSIVE_NO_FETCH,
2200 )
2201 return child
2202
2203 def emit_backref_from_collection_append_event(
2204 state, child, initiator, **kw
2205 ):
2206 if child is None:
2207 return
2208
2209 child_state, child_dict = instance_state(child), instance_dict(child)
2210 child_impl = child_state.manager[key].impl
2211
2212 if (
2213 initiator.parent_token is not parent_token
2214 and initiator.parent_token is not child_impl.parent_token
2215 ):
2216 _acceptable_key_err(state, initiator, child_impl)
2217
2218 # tokens to test for a recursive loop.
2219 check_append_token = child_impl._append_token
2220 check_bulk_replace_token = (
2221 child_impl._bulk_replace_token
2222 if _is_collection_attribute_impl(child_impl)
2223 else None
2224 )
2225
2226 if (
2227 initiator is not check_append_token
2228 and initiator is not check_bulk_replace_token
2229 ):
2230 child_impl.append(
2231 child_state,
2232 child_dict,
2233 state.obj(),
2234 initiator,
2235 passive=PASSIVE_NO_FETCH,
2236 )
2237 return child
2238
2239 def emit_backref_from_collection_remove_event(
2240 state, child, initiator, **kw
2241 ):
2242 if (
2243 child is not None
2244 and child is not PASSIVE_NO_RESULT
2245 and child is not NO_VALUE
2246 ):
2247 child_state, child_dict = (
2248 instance_state(child),
2249 instance_dict(child),
2250 )
2251 child_impl = child_state.manager[key].impl
2252
2253 check_replace_token: Optional[AttributeEventToken]
2254
2255 # tokens to test for a recursive loop.
2256 if not child_impl.collection and not child_impl.dynamic:
2257 check_remove_token = child_impl._remove_token
2258 check_replace_token = child_impl._replace_token
2259 check_for_dupes_on_remove = uselist and not parent_impl.dynamic
2260 else:
2261 check_remove_token = child_impl._remove_token
2262 check_replace_token = (
2263 child_impl._bulk_replace_token
2264 if _is_collection_attribute_impl(child_impl)
2265 else None
2266 )
2267 check_for_dupes_on_remove = False
2268
2269 if (
2270 initiator is not check_remove_token
2271 and initiator is not check_replace_token
2272 ):
2273 if not check_for_dupes_on_remove or not util.has_dupes(
2274 # when this event is called, the item is usually
2275 # present in the list, except for a pop() operation.
2276 state.dict[parent_impl.key],
2277 child,
2278 ):
2279 child_impl.pop(
2280 child_state,
2281 child_dict,
2282 state.obj(),
2283 initiator,
2284 passive=PASSIVE_NO_FETCH,
2285 )
2286
2287 if uselist:
2288 event.listen(
2289 attribute,
2290 "append",
2291 emit_backref_from_collection_append_event,
2292 retval=True,
2293 raw=True,
2294 include_key=True,
2295 )
2296 else:
2297 event.listen(
2298 attribute,
2299 "set",
2300 emit_backref_from_scalar_set_event,
2301 retval=True,
2302 raw=True,
2303 include_key=True,
2304 )
2305 # TODO: need coverage in test/orm/ of remove event
2306 event.listen(
2307 attribute,
2308 "remove",
2309 emit_backref_from_collection_remove_event,
2310 retval=True,
2311 raw=True,
2312 include_key=True,
2313 )
2314
2315
2316_NO_HISTORY = util.symbol("NO_HISTORY")
2317_NO_STATE_SYMBOLS = frozenset([id(PASSIVE_NO_RESULT), id(NO_VALUE)])
2318
2319
2320class History(NamedTuple):
2321 """A 3-tuple of added, unchanged and deleted values,
2322 representing the changes which have occurred on an instrumented
2323 attribute.
2324
2325 The easiest way to get a :class:`.History` object for a particular
2326 attribute on an object is to use the :func:`_sa.inspect` function::
2327
2328 from sqlalchemy import inspect
2329
2330 hist = inspect(myobject).attrs.myattribute.history
2331
2332 Each tuple member is an iterable sequence:
2333
2334 * ``added`` - the collection of items added to the attribute (the first
2335 tuple element).
2336
2337 * ``unchanged`` - the collection of items that have not changed on the
2338 attribute (the second tuple element).
2339
2340 * ``deleted`` - the collection of items that have been removed from the
2341 attribute (the third tuple element).
2342
2343 """
2344
2345 added: Union[Tuple[()], List[Any]]
2346 unchanged: Union[Tuple[()], List[Any]]
2347 deleted: Union[Tuple[()], List[Any]]
2348
2349 def __bool__(self) -> bool:
2350 return self != HISTORY_BLANK
2351
2352 def empty(self) -> bool:
2353 """Return True if this :class:`.History` has no changes
2354 and no existing, unchanged state.
2355
2356 """
2357
2358 return not bool((self.added or self.deleted) or self.unchanged)
2359
2360 def sum(self) -> Sequence[Any]:
2361 """Return a collection of added + unchanged + deleted."""
2362
2363 return (
2364 (self.added or []) + (self.unchanged or []) + (self.deleted or [])
2365 )
2366
2367 def non_deleted(self) -> Sequence[Any]:
2368 """Return a collection of added + unchanged."""
2369
2370 return (self.added or []) + (self.unchanged or [])
2371
2372 def non_added(self) -> Sequence[Any]:
2373 """Return a collection of unchanged + deleted."""
2374
2375 return (self.unchanged or []) + (self.deleted or [])
2376
2377 def has_changes(self) -> bool:
2378 """Return True if this :class:`.History` has changes."""
2379
2380 return bool(self.added or self.deleted)
2381
2382 def _merge(self, added: Iterable[Any], deleted: Iterable[Any]) -> History:
2383 return History(
2384 list(self.added) + list(added),
2385 self.unchanged,
2386 list(self.deleted) + list(deleted),
2387 )
2388
2389 def as_state(self) -> History:
2390 return History(
2391 [
2392 (c is not None) and instance_state(c) or None
2393 for c in self.added
2394 ],
2395 [
2396 (c is not None) and instance_state(c) or None
2397 for c in self.unchanged
2398 ],
2399 [
2400 (c is not None) and instance_state(c) or None
2401 for c in self.deleted
2402 ],
2403 )
2404
2405 @classmethod
2406 def from_scalar_attribute(
2407 cls,
2408 attribute: ScalarAttributeImpl,
2409 state: InstanceState[Any],
2410 current: Any,
2411 ) -> History:
2412 original = state.committed_state.get(attribute.key, _NO_HISTORY)
2413
2414 deleted: Union[Tuple[()], List[Any]]
2415
2416 if original is _NO_HISTORY:
2417 if current is NO_VALUE:
2418 return cls((), (), ())
2419 else:
2420 return cls((), [current], ())
2421 # don't let ClauseElement expressions here trip things up
2422 elif (
2423 current is not NO_VALUE
2424 and attribute.is_equal(current, original) is True
2425 ):
2426 return cls((), [current], ())
2427 else:
2428 # current convention on native scalars is to not
2429 # include information
2430 # about missing previous value in "deleted", but
2431 # we do include None, which helps in some primary
2432 # key situations
2433 if id(original) in _NO_STATE_SYMBOLS:
2434 deleted = ()
2435 # indicate a "del" operation occurred when we don't have
2436 # the previous value as: ([None], (), ())
2437 if id(current) in _NO_STATE_SYMBOLS:
2438 current = None
2439 else:
2440 deleted = [original]
2441 if current is NO_VALUE:
2442 return cls((), (), deleted)
2443 else:
2444 return cls([current], (), deleted)
2445
2446 @classmethod
2447 def from_object_attribute(
2448 cls,
2449 attribute: ScalarObjectAttributeImpl,
2450 state: InstanceState[Any],
2451 current: Any,
2452 original: Any = _NO_HISTORY,
2453 ) -> History:
2454 deleted: Union[Tuple[()], List[Any]]
2455
2456 if original is _NO_HISTORY:
2457 original = state.committed_state.get(attribute.key, _NO_HISTORY)
2458
2459 if original is _NO_HISTORY:
2460 if current is NO_VALUE:
2461 return cls((), (), ())
2462 else:
2463 return cls((), [current], ())
2464 elif current is original and current is not NO_VALUE:
2465 return cls((), [current], ())
2466 else:
2467 # current convention on related objects is to not
2468 # include information
2469 # about missing previous value in "deleted", and
2470 # to also not include None - the dependency.py rules
2471 # ignore the None in any case.
2472 if id(original) in _NO_STATE_SYMBOLS or original is None:
2473 deleted = ()
2474 # indicate a "del" operation occurred when we don't have
2475 # the previous value as: ([None], (), ())
2476 if id(current) in _NO_STATE_SYMBOLS:
2477 current = None
2478 else:
2479 deleted = [original]
2480 if current is NO_VALUE:
2481 return cls((), (), deleted)
2482 else:
2483 return cls([current], (), deleted)
2484
2485 @classmethod
2486 def from_collection(
2487 cls,
2488 attribute: CollectionAttributeImpl,
2489 state: InstanceState[Any],
2490 current: Any,
2491 ) -> History:
2492 original = state.committed_state.get(attribute.key, _NO_HISTORY)
2493 if current is NO_VALUE:
2494 return cls((), (), ())
2495
2496 current = getattr(current, "_sa_adapter")
2497 if original is NO_VALUE:
2498 return cls(list(current), (), ())
2499 elif original is _NO_HISTORY:
2500 return cls((), list(current), ())
2501 else:
2502 current_states = [
2503 ((c is not None) and instance_state(c) or None, c)
2504 for c in current
2505 ]
2506 original_states = [
2507 ((c is not None) and instance_state(c) or None, c)
2508 for c in original
2509 ]
2510
2511 current_set = dict(current_states)
2512 original_set = dict(original_states)
2513
2514 return cls(
2515 [o for s, o in current_states if s not in original_set],
2516 [o for s, o in current_states if s in original_set],
2517 [o for s, o in original_states if s not in current_set],
2518 )
2519
2520
2521HISTORY_BLANK = History((), (), ())
2522
2523
2524def get_history(
2525 obj: object, key: str, passive: PassiveFlag = PASSIVE_OFF
2526) -> History:
2527 """Return a :class:`.History` record for the given object
2528 and attribute key.
2529
2530 This is the **pre-flush** history for a given attribute, which is
2531 reset each time the :class:`.Session` flushes changes to the
2532 current database transaction.
2533
2534 .. note::
2535
2536 Prefer to use the :attr:`.AttributeState.history` and
2537 :meth:`.AttributeState.load_history` accessors to retrieve the
2538 :class:`.History` for instance attributes.
2539
2540
2541 :param obj: an object whose class is instrumented by the
2542 attributes package.
2543
2544 :param key: string attribute name.
2545
2546 :param passive: indicates loading behavior for the attribute
2547 if the value is not already present. This is a
2548 bitflag attribute, which defaults to the symbol
2549 :attr:`.PASSIVE_OFF` indicating all necessary SQL
2550 should be emitted.
2551
2552 .. seealso::
2553
2554 :attr:`.AttributeState.history`
2555
2556 :meth:`.AttributeState.load_history` - retrieve history
2557 using loader callables if the value is not locally present.
2558
2559 """
2560
2561 return get_state_history(instance_state(obj), key, passive)
2562
2563
2564def get_state_history(
2565 state: InstanceState[Any], key: str, passive: PassiveFlag = PASSIVE_OFF
2566) -> History:
2567 return state.get_history(key, passive)
2568
2569
2570def has_parent(
2571 cls: Type[_O], obj: _O, key: str, optimistic: bool = False
2572) -> bool:
2573 """TODO"""
2574 manager = manager_of_class(cls)
2575 state = instance_state(obj)
2576 return manager.has_parent(state, key, optimistic)
2577
2578
2579def register_attribute(
2580 class_: Type[_O],
2581 key: str,
2582 *,
2583 comparator: interfaces.PropComparator[_T],
2584 parententity: _InternalEntityType[_O],
2585 doc: Optional[str] = None,
2586 **kw: Any,
2587) -> InstrumentedAttribute[_T]:
2588 desc = register_descriptor(
2589 class_, key, comparator=comparator, parententity=parententity, doc=doc
2590 )
2591 register_attribute_impl(class_, key, **kw)
2592 return desc
2593
2594
2595def register_attribute_impl(
2596 class_: Type[_O],
2597 key: str,
2598 uselist: bool = False,
2599 callable_: Optional[_LoaderCallable] = None,
2600 useobject: bool = False,
2601 impl_class: Optional[Type[AttributeImpl]] = None,
2602 backref: Optional[str] = None,
2603 **kw: Any,
2604) -> QueryableAttribute[Any]:
2605 manager = manager_of_class(class_)
2606 if uselist:
2607 factory = kw.pop("typecallable", None)
2608 typecallable = manager.instrument_collection_class(
2609 key, factory or list
2610 )
2611 else:
2612 typecallable = kw.pop("typecallable", None)
2613
2614 dispatch = cast(
2615 "_Dispatch[QueryableAttribute[Any]]", manager[key].dispatch
2616 ) # noqa: E501
2617
2618 impl: AttributeImpl
2619
2620 if impl_class:
2621 # TODO: this appears to be the WriteOnlyAttributeImpl /
2622 # DynamicAttributeImpl constructor which is hardcoded
2623 impl = cast("Type[WriteOnlyAttributeImpl]", impl_class)(
2624 class_, key, dispatch, **kw
2625 )
2626 elif uselist:
2627 impl = CollectionAttributeImpl(
2628 class_, key, callable_, dispatch, typecallable=typecallable, **kw
2629 )
2630 elif useobject:
2631 impl = ScalarObjectAttributeImpl(
2632 class_, key, callable_, dispatch, **kw
2633 )
2634 else:
2635 impl = ScalarAttributeImpl(class_, key, callable_, dispatch, **kw)
2636
2637 manager[key].impl = impl
2638
2639 if backref:
2640 backref_listeners(manager[key], backref, uselist)
2641
2642 manager.post_configure_attribute(key)
2643 return manager[key]
2644
2645
2646def register_descriptor(
2647 class_: Type[Any],
2648 key: str,
2649 *,
2650 comparator: interfaces.PropComparator[_T],
2651 parententity: _InternalEntityType[Any],
2652 doc: Optional[str] = None,
2653) -> InstrumentedAttribute[_T]:
2654 manager = manager_of_class(class_)
2655
2656 descriptor = InstrumentedAttribute(
2657 class_, key, comparator=comparator, parententity=parententity
2658 )
2659
2660 descriptor.__doc__ = doc # type: ignore
2661
2662 manager.instrument_attribute(key, descriptor)
2663 return descriptor
2664
2665
2666def unregister_attribute(class_: Type[Any], key: str) -> None:
2667 manager_of_class(class_).uninstrument_attribute(key)
2668
2669
2670def init_collection(obj: object, key: str) -> CollectionAdapter:
2671 """Initialize a collection attribute and return the collection adapter.
2672
2673 This function is used to provide direct access to collection internals
2674 for a previously unloaded attribute. e.g.::
2675
2676 collection_adapter = init_collection(someobject, "elements")
2677 for elem in values:
2678 collection_adapter.append_without_event(elem)
2679
2680 For an easier way to do the above, see
2681 :func:`~sqlalchemy.orm.attributes.set_committed_value`.
2682
2683 :param obj: a mapped object
2684
2685 :param key: string attribute name where the collection is located.
2686
2687 """
2688 state = instance_state(obj)
2689 dict_ = state.dict
2690 return init_state_collection(state, dict_, key)
2691
2692
2693def init_state_collection(
2694 state: InstanceState[Any], dict_: _InstanceDict, key: str
2695) -> CollectionAdapter:
2696 """Initialize a collection attribute and return the collection adapter.
2697
2698 Discards any existing collection which may be there.
2699
2700 """
2701 attr = state.manager[key].impl
2702
2703 if TYPE_CHECKING:
2704 assert isinstance(attr, HasCollectionAdapter)
2705
2706 old = dict_.pop(key, None) # discard old collection
2707 if old is not None:
2708 old_collection = old._sa_adapter
2709 attr._dispose_previous_collection(state, old, old_collection, False)
2710
2711 user_data = attr._default_value(state, dict_)
2712 adapter: CollectionAdapter = attr.get_collection(
2713 state, dict_, user_data, passive=PassiveFlag.PASSIVE_NO_FETCH
2714 )
2715 adapter._reset_empty()
2716
2717 return adapter
2718
2719
2720def set_committed_value(instance: object, key: str, value: Any) -> None:
2721 """Set the value of an attribute with no history events.
2722
2723 Cancels any previous history present. The value should be
2724 a scalar value for scalar-holding attributes, or
2725 an iterable for any collection-holding attribute.
2726
2727 This is the same underlying method used when a lazy loader
2728 fires off and loads additional data from the database.
2729 In particular, this method can be used by application code
2730 which has loaded additional attributes or collections through
2731 separate queries, which can then be attached to an instance
2732 as though it were part of its original loaded state.
2733
2734 """
2735 state, dict_ = instance_state(instance), instance_dict(instance)
2736 state.manager[key].impl.set_committed_value(state, dict_, value)
2737
2738
2739def set_attribute(
2740 instance: object,
2741 key: str,
2742 value: Any,
2743 initiator: Optional[AttributeEventToken] = None,
2744) -> None:
2745 """Set the value of an attribute, firing history events.
2746
2747 This function may be used regardless of instrumentation
2748 applied directly to the class, i.e. no descriptors are required.
2749 Custom attribute management schemes will need to make usage
2750 of this method to establish attribute state as understood
2751 by SQLAlchemy.
2752
2753 :param instance: the object that will be modified
2754
2755 :param key: string name of the attribute
2756
2757 :param value: value to assign
2758
2759 :param initiator: an instance of :class:`.Event` that would have
2760 been propagated from a previous event listener. This argument
2761 is used when the :func:`.set_attribute` function is being used within
2762 an existing event listening function where an :class:`.Event` object
2763 is being supplied; the object may be used to track the origin of the
2764 chain of events.
2765
2766 .. versionadded:: 1.2.3
2767
2768 """
2769 state, dict_ = instance_state(instance), instance_dict(instance)
2770 state.manager[key].impl.set(state, dict_, value, initiator)
2771
2772
2773def get_attribute(instance: object, key: str) -> Any:
2774 """Get the value of an attribute, firing any callables required.
2775
2776 This function may be used regardless of instrumentation
2777 applied directly to the class, i.e. no descriptors are required.
2778 Custom attribute management schemes will need to make usage
2779 of this method to make usage of attribute state as understood
2780 by SQLAlchemy.
2781
2782 """
2783 state, dict_ = instance_state(instance), instance_dict(instance)
2784 return state.manager[key].impl.get(state, dict_)
2785
2786
2787def del_attribute(instance: object, key: str) -> None:
2788 """Delete the value of an attribute, firing history events.
2789
2790 This function may be used regardless of instrumentation
2791 applied directly to the class, i.e. no descriptors are required.
2792 Custom attribute management schemes will need to make usage
2793 of this method to establish attribute state as understood
2794 by SQLAlchemy.
2795
2796 """
2797 state, dict_ = instance_state(instance), instance_dict(instance)
2798 state.manager[key].impl.delete(state, dict_)
2799
2800
2801def flag_modified(instance: object, key: str) -> None:
2802 """Mark an attribute on an instance as 'modified'.
2803
2804 This sets the 'modified' flag on the instance and
2805 establishes an unconditional change event for the given attribute.
2806 The attribute must have a value present, else an
2807 :class:`.InvalidRequestError` is raised.
2808
2809 To mark an object "dirty" without referring to any specific attribute
2810 so that it is considered within a flush, use the
2811 :func:`.attributes.flag_dirty` call.
2812
2813 .. seealso::
2814
2815 :func:`.attributes.flag_dirty`
2816
2817 """
2818 state, dict_ = instance_state(instance), instance_dict(instance)
2819 impl = state.manager[key].impl
2820 impl.dispatch.modified(state, impl._modified_token)
2821 state._modified_event(dict_, impl, NO_VALUE, is_userland=True)
2822
2823
2824def flag_dirty(instance: object) -> None:
2825 """Mark an instance as 'dirty' without any specific attribute mentioned.
2826
2827 This is a special operation that will allow the object to travel through
2828 the flush process for interception by events such as
2829 :meth:`.SessionEvents.before_flush`. Note that no SQL will be emitted in
2830 the flush process for an object that has no changes, even if marked dirty
2831 via this method. However, a :meth:`.SessionEvents.before_flush` handler
2832 will be able to see the object in the :attr:`.Session.dirty` collection and
2833 may establish changes on it, which will then be included in the SQL
2834 emitted.
2835
2836 .. versionadded:: 1.2
2837
2838 .. seealso::
2839
2840 :func:`.attributes.flag_modified`
2841
2842 """
2843
2844 state, dict_ = instance_state(instance), instance_dict(instance)
2845 state._modified_event(dict_, None, NO_VALUE, is_userland=True)