1# orm/attributes.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Defines instrumentation for class attributes and their interaction
10with instances.
11
12This module is usually not directly visible to user applications, but
13defines a large part of the ORM's interactivity.
14
15
16"""
17
18from __future__ import annotations
19
20import dataclasses
21import operator
22from typing import Any
23from typing import Callable
24from typing import cast
25from typing import ClassVar
26from typing import Dict
27from typing import Iterable
28from typing import List
29from typing import NamedTuple
30from typing import Optional
31from typing import overload
32from typing import Sequence
33from typing import Tuple
34from typing import Type
35from typing import TYPE_CHECKING
36from typing import TypeVar
37from typing import Union
38
39from . import collections
40from . import exc as orm_exc
41from . import interfaces
42from ._typing import insp_is_aliased_class
43from .base import _DeclarativeMapped
44from .base import ATTR_EMPTY
45from .base import ATTR_WAS_SET
46from .base import CALLABLES_OK
47from .base import DEFERRED_HISTORY_LOAD
48from .base import DONT_SET
49from .base import INCLUDE_PENDING_MUTATIONS # noqa
50from .base import INIT_OK
51from .base import instance_dict as instance_dict
52from .base import instance_state as instance_state
53from .base import instance_str
54from .base import LOAD_AGAINST_COMMITTED
55from .base import LoaderCallableStatus
56from .base import manager_of_class as manager_of_class
57from .base import Mapped as Mapped # noqa
58from .base import NEVER_SET # noqa
59from .base import NO_AUTOFLUSH
60from .base import NO_CHANGE # noqa
61from .base import NO_KEY
62from .base import NO_RAISE
63from .base import NO_VALUE
64from .base import NON_PERSISTENT_OK # noqa
65from .base import opt_manager_of_class as opt_manager_of_class
66from .base import PASSIVE_CLASS_MISMATCH # noqa
67from .base import PASSIVE_NO_FETCH
68from .base import PASSIVE_NO_FETCH_RELATED # noqa
69from .base import PASSIVE_NO_INITIALIZE
70from .base import PASSIVE_NO_RESULT
71from .base import PASSIVE_OFF
72from .base import PASSIVE_ONLY_PERSISTENT
73from .base import PASSIVE_RETURN_NO_VALUE
74from .base import PassiveFlag
75from .base import RELATED_OBJECT_OK # noqa
76from .base import SQL_OK # noqa
77from .base import SQLORMExpression
78from .base import state_str
79from .. import event
80from .. import exc
81from .. import inspection
82from .. import util
83from ..event import dispatcher
84from ..event import EventTarget
85from ..sql import base as sql_base
86from ..sql import cache_key
87from ..sql import coercions
88from ..sql import roles
89from ..sql import visitors
90from ..sql.cache_key import HasCacheKey
91from ..sql.visitors import _TraverseInternalsType
92from ..sql.visitors import InternalTraversal
93from ..util.typing import Literal
94from ..util.typing import Self
95from ..util.typing import TypeGuard
96
97if TYPE_CHECKING:
98 from ._typing import _EntityType
99 from ._typing import _ExternalEntityType
100 from ._typing import _InstanceDict
101 from ._typing import _InternalEntityType
102 from ._typing import _LoaderCallable
103 from ._typing import _O
104 from .collections import _AdaptedCollectionProtocol
105 from .collections import CollectionAdapter
106 from .interfaces import MapperProperty
107 from .relationships import RelationshipProperty
108 from .state import InstanceState
109 from .util import AliasedInsp
110 from .writeonly import _WriteOnlyAttributeImpl
111 from ..event.base import _Dispatch
112 from ..sql._typing import _ColumnExpressionArgument
113 from ..sql._typing import _DMLColumnArgument
114 from ..sql._typing import _InfoType
115 from ..sql._typing import _PropagateAttrsType
116 from ..sql.annotation import _AnnotationDict
117 from ..sql.elements import ColumnElement
118 from ..sql.elements import Label
119 from ..sql.operators import OperatorType
120 from ..sql.selectable import FromClause
121
122
# general-purpose type variable used in signatures throughout this module
_T = TypeVar("_T")
# covariant type variable; QueryableAttribute and its subclasses are
# parameterized on it
_T_co = TypeVar("_T_co", bound=Any, covariant=True)


# a sequence of (state, object) pairs in which either member may be None;
# used as the return annotation of _AttributeImpl.get_all_pending()
_AllPendingType = Sequence[
    Tuple[Optional["InstanceState[Any]"], Optional[object]]
]


# sentinel that is compared by identity against QueryableAttribute.key;
# when the key is this object, the memoized ``expression`` is annotated
# with "entity_namespace" only, omitting "proxy_key" / "proxy_owner"
_UNKNOWN_ATTR_KEY = object()
133
134
@inspection._self_inspects
class QueryableAttribute(
    _DeclarativeMapped[_T_co],
    SQLORMExpression[_T_co],
    interfaces.InspectionAttr,
    interfaces.PropComparator[_T_co],
    roles.JoinTargetRole,
    roles.OnClauseRole,
    sql_base.Immutable,
    cache_key.SlotsMemoizedHasCacheKey,
    util.MemoizedSlots,
    EventTarget,
):
    """Base class for :term:`descriptor` objects that intercept
    attribute events on behalf of a :class:`.MapperProperty`
    object. The actual :class:`.MapperProperty` is accessible
    via the :attr:`.QueryableAttribute.property`
    attribute.


    .. seealso::

        :class:`.InstrumentedAttribute`

        :class:`.MapperProperty`

        :attr:`_orm.Mapper.all_orm_descriptors`

        :attr:`_orm.Mapper.attrs`
    """

    # "property", "expression" and "_propagate_attrs" are never assigned in
    # __init__; each has a matching _memoized_attr_<name>() method further
    # down.  NOTE(review): presumably util.MemoizedSlots.__getattr__ calls
    # those methods and stores the result in the slot - confirm against
    # util.MemoizedSlots.
    __slots__ = (
        "class_",
        "key",
        "impl",
        "comparator",
        "property",
        "parent",
        "expression",
        "_of_type",
        "_extra_criteria",
        "_slots_dispatch",
        "_propagate_attrs",
        "_doc",
    )

    is_attribute = True

    dispatch: dispatcher[QueryableAttribute[_T_co]]

    class_: _ExternalEntityType[Any]
    key: str
    parententity: _InternalEntityType[Any]
    impl: _AttributeImpl
    comparator: interfaces.PropComparator[_T_co]
    _of_type: Optional[_InternalEntityType[Any]]
    _extra_criteria: Tuple[ColumnElement[bool], ...]
    _doc: Optional[str]

    # PropComparator has a __visit_name__ to participate within
    # traversals. Disambiguate the attribute vs. a comparator.
    __visit_name__ = "orm_instrumented_attribute"

    def __init__(
        self,
        class_: _ExternalEntityType[_O],
        key: str,
        parententity: _InternalEntityType[_O],
        comparator: interfaces.PropComparator[_T_co],
        impl: Optional[_AttributeImpl] = None,
        of_type: Optional[_InternalEntityType[Any]] = None,
        extra_criteria: Tuple[ColumnElement[bool], ...] = (),
    ):
        self.class_ = class_
        self.key = key

        # the same entity object is stored under both the private and the
        # public name
        self._parententity = self.parent = parententity

        # this attribute is non-None after mappers are set up, however in the
        # interim class manager setup, there's a check for None to see if it
        # needs to be populated, so we assign None here leaving the attribute
        # in a temporarily not-type-correct state
        self.impl = impl  # type: ignore

        assert comparator is not None
        self.comparator = comparator
        self._of_type = of_type
        self._extra_criteria = extra_criteria
        self._doc = None

        manager = opt_manager_of_class(class_)
        # manager is None in the case of AliasedClass
        if manager:
            # propagate existing event listeners from
            # immediate superclass
            for base in manager._bases:
                if key in base:
                    self.dispatch._update(base[key].dispatch)
                    # active history is "sticky": once a same-named base
                    # attribute has it enabled, this attribute enables it too
                    if base[key].dispatch._active_history:
                        self.dispatch._active_history = True  # type: ignore

    # (attribute name, traversal symbol) pairs.  NOTE(review): presumably
    # consumed by the HasCacheKey machinery to build this object's cache
    # key - confirm against sql/cache_key.py
    _cache_key_traversal = [
        ("key", visitors.ExtendedInternalTraversal.dp_string),
        ("_parententity", visitors.ExtendedInternalTraversal.dp_multi),
        ("_of_type", visitors.ExtendedInternalTraversal.dp_multi),
        ("_extra_criteria", visitors.InternalTraversal.dp_clauseelement_list),
    ]

    def __reduce__(self) -> Any:
        # this method is only used in terms of the
        # sqlalchemy.ext.serializer extension

        # the argument tuple lines up positionally with the parameters of
        # _queryable_attribute_unreduce(): key, mapped_class, parententity,
        # entity
        return (
            _queryable_attribute_unreduce,
            (
                self.key,
                self._parententity.mapper.class_,
                self._parententity,
                self._parententity.entity,
            ),
        )

    @property
    def _impl_uses_objects(self) -> bool:
        # straight passthrough of the impl's ``uses_objects`` flag
        return self.impl.uses_objects

    def get_history(
        self, instance: Any, passive: PassiveFlag = PASSIVE_OFF
    ) -> History:
        # resolves the instance to its (state, dict) pair and delegates to
        # the impl's get_history()
        return self.impl.get_history(
            instance_state(instance), instance_dict(instance), passive
        )

    @property
    def info(self) -> _InfoType:
        """Return the 'info' dictionary for the underlying SQL element.

        The behavior here is as follows:

        * If the attribute is a column-mapped property, i.e.
          :class:`.ColumnProperty`, which is mapped directly
          to a schema-level :class:`_schema.Column` object, this attribute
          will return the :attr:`.SchemaItem.info` dictionary associated
          with the core-level :class:`_schema.Column` object.

        * If the attribute is a :class:`.ColumnProperty` but is mapped to
          any other kind of SQL expression other than a
          :class:`_schema.Column`,
          the attribute will refer to the :attr:`.MapperProperty.info`
          dictionary associated directly with the :class:`.ColumnProperty`,
          assuming the SQL expression itself does not have its own ``.info``
          attribute (which should be the case, unless a user-defined SQL
          construct has defined one).

        * If the attribute refers to any other kind of
          :class:`.MapperProperty`, including :class:`.Relationship`,
          the attribute will refer to the :attr:`.MapperProperty.info`
          dictionary associated with that :class:`.MapperProperty`.

        * To access the :attr:`.MapperProperty.info` dictionary of the
          :class:`.MapperProperty` unconditionally, including for a
          :class:`.ColumnProperty` that's associated directly with a
          :class:`_schema.Column`, the attribute can be referred to using
          :attr:`.QueryableAttribute.property` attribute, as
          ``MyClass.someattribute.property.info``.

        .. seealso::

            :attr:`.SchemaItem.info`

            :attr:`.MapperProperty.info`

        """
        return self.comparator.info

    parent: _InternalEntityType[Any]
    """Return an inspection instance representing the parent.

    This will be either an instance of :class:`_orm.Mapper`
    or :class:`.AliasedInsp`, depending upon the nature
    of the parent entity which this attribute is associated
    with.

    """

    expression: ColumnElement[_T_co]
    """The SQL expression object represented by this
    :class:`.QueryableAttribute`.

    This will typically be an instance of a :class:`_sql.ColumnElement`
    subclass representing a column expression.

    """

    def _memoized_attr_expression(self) -> ColumnElement[_T]:
        annotations: _AnnotationDict

        # applies only to Proxy() as used by hybrid.
        # currently is an exception to typing rather than feeding through
        # non-string keys.
        # ideally Proxy() would have a separate set of methods to deal
        # with this case.
        entity_namespace = self._entity_namespace
        assert isinstance(entity_namespace, HasCacheKey)

        if self.key is _UNKNOWN_ATTR_KEY:
            # no usable key: annotate with the namespace only
            annotations = {"entity_namespace": entity_namespace}
        else:
            annotations = {
                "proxy_key": self.key,
                "proxy_owner": self._parententity,
                "entity_namespace": entity_namespace,
            }

        ce = self.comparator.__clause_element__()
        try:
            if TYPE_CHECKING:
                assert isinstance(ce, ColumnElement)
            # only the attribute lookup is inside the try; the call itself
            # happens in the ``else`` so that an AttributeError raised
            # while annotating is not reported as a wrong return type
            anno = ce._annotate
        except AttributeError as ae:
            raise exc.InvalidRequestError(
                'When interpreting attribute "%s" as a SQL expression, '
                "expected __clause_element__() to return "
                "a ClauseElement object, got: %r" % (self, ce)
            ) from ae
        else:
            return anno(annotations)

    def _memoized_attr__propagate_attrs(self) -> _PropagateAttrsType:
        # this suits the case in coercions where we don't actually
        # call ``__clause_element__()`` but still need to get
        # resolved._propagate_attrs.  See #6558.
        return util.immutabledict(
            {
                "compile_state_plugin": "orm",
                "plugin_subject": self._parentmapper,
            }
        )

    @property
    def _entity_namespace(self) -> _InternalEntityType[Any]:
        # for a plain QueryableAttribute the namespace is the parent entity
        return self._parententity

    @property
    def _annotations(self) -> _AnnotationDict:
        # annotations of the (memoized) SQL expression
        return self.__clause_element__()._annotations

    def __clause_element__(self) -> ColumnElement[_T_co]:
        return self.expression

    @property
    def _from_objects(self) -> List[FromClause]:
        return self.expression._from_objects

    def _bulk_update_tuples(
        self, value: Any
    ) -> Sequence[Tuple[_DMLColumnArgument, Any]]:
        """Return setter tuples for a bulk UPDATE."""

        return self.comparator._bulk_update_tuples(value)

    def _bulk_dml_setter(self, key: str) -> Optional[Callable[..., Any]]:
        """return a callable that will process a bulk INSERT value"""

        return self.comparator._bulk_dml_setter(key)

    def adapt_to_entity(self, adapt_to_entity: AliasedInsp[Any]) -> Self:
        # builds a new attribute of the same class against the given
        # aliased entity; ``of_type`` and ``extra_criteria`` are not passed
        # along, and the assertion rules out an existing of_type
        assert not self._of_type
        return self.__class__(
            adapt_to_entity.entity,
            self.key,
            impl=self.impl,
            comparator=self.comparator.adapt_to_entity(adapt_to_entity),
            parententity=adapt_to_entity,
        )

    def of_type(self, entity: _EntityType[_T]) -> QueryableAttribute[_T]:
        # note the result is always a plain QueryableAttribute, not
        # type(self); existing extra criteria are retained
        return QueryableAttribute(
            self.class_,
            self.key,
            self._parententity,
            impl=self.impl,
            comparator=self.comparator.of_type(entity),
            of_type=inspection.inspect(entity),
            extra_criteria=self._extra_criteria,
        )

    def and_(
        self, *clauses: _ColumnExpressionArgument[bool]
    ) -> QueryableAttribute[bool]:
        if TYPE_CHECKING:
            assert isinstance(self.comparator, RelationshipProperty.Comparator)

        # coerce every given clause once; the same coerced tuple is handed
        # to the comparator and appended to this attribute's criteria
        exprs = tuple(
            coercions.expect(roles.WhereHavingRole, clause)
            for clause in util.coerce_generator_arg(clauses)
        )

        return QueryableAttribute(
            self.class_,
            self.key,
            self._parententity,
            impl=self.impl,
            comparator=self.comparator.and_(*exprs),
            of_type=self._of_type,
            extra_criteria=self._extra_criteria + exprs,
        )

    def _clone(self, **kw: Any) -> QueryableAttribute[_T]:
        # keyword arguments are accepted but not used; the copy shares the
        # impl and comparator of the original
        return QueryableAttribute(
            self.class_,
            self.key,
            self._parententity,
            impl=self.impl,
            comparator=self.comparator,
            of_type=self._of_type,
            extra_criteria=self._extra_criteria,
        )

    def label(self, name: Optional[str]) -> Label[_T_co]:
        return self.__clause_element__().label(name)

    def operate(
        self, op: OperatorType, *other: Any, **kwargs: Any
    ) -> ColumnElement[Any]:
        # operators are applied to the comparator, not to this attribute
        return op(self.comparator, *other, **kwargs)  # type: ignore[no-any-return]  # noqa: E501

    def reverse_operate(
        self, op: OperatorType, other: Any, **kwargs: Any
    ) -> ColumnElement[Any]:
        # same as operate() with the comparator on the right-hand side
        return op(other, self.comparator, **kwargs)  # type: ignore[no-any-return]  # noqa: E501

    def hasparent(
        self, state: InstanceState[Any], optimistic: bool = False
    ) -> bool:
        # delegates to the impl; anything other than the literal False
        # counts as "has a parent"
        return self.impl.hasparent(state, optimistic=optimistic) is not False

    def _column_strategy_attrs(self) -> Sequence[QueryableAttribute[Any]]:
        return (self,)

    def __getattr__(self, key: str) -> Any:
        # lookup order: first the memoized-slot mechanism, then the
        # comparator; if both fail, raise an AttributeError naming both
        try:
            return util.MemoizedSlots.__getattr__(self, key)
        except AttributeError:
            pass

        try:
            return getattr(self.comparator, key)
        except AttributeError as err:
            raise AttributeError(
                "Neither %r object nor %r object associated with %s "
                "has an attribute %r"
                % (
                    type(self).__name__,
                    type(self.comparator).__name__,
                    self,
                    key,
                )
            ) from err

    def __str__(self) -> str:
        return f"{self.class_.__name__}.{self.key}"

    def _memoized_attr_property(self) -> Optional[MapperProperty[Any]]:
        # backs the "property" slot; the value comes from the comparator
        return self.comparator.property
499
500
def _queryable_attribute_unreduce(
    key: str,
    mapped_class: Type[_O],
    parententity: _InternalEntityType[_O],
    entity: _ExternalEntityType[Any],
) -> Any:
    """Rebuild an attribute from the arguments emitted by
    ``QueryableAttribute.__reduce__()``.

    This function is only used in terms of the
    sqlalchemy.ext.serializer extension.

    :param key: name of the attribute.
    :param mapped_class: the mapped class recorded at reduce time; only
     forwarded when ``parententity`` is an aliased class.
    :param parententity: inspection object for the owning entity; it
     selects which lookup is performed.
    :param entity: object on which the attribute is looked up.
    :return: ``getattr(entity, key)`` when ``parententity`` is not an
     aliased class, otherwise the result of
     ``entity._get_from_serialized()``.
    """
    if not insp_is_aliased_class(parententity):
        return getattr(entity, key)

    return entity._get_from_serialized(key, mapped_class, parententity)
513
514
class InstrumentedAttribute(QueryableAttribute[_T_co]):
    """A :class:`.QueryableAttribute` bound to a class, adding the
    :term:`descriptor` methods ``__get__()``, ``__set__()`` and
    ``__delete__()``.

    See :class:`.QueryableAttribute` for a description of most features.

    """

    __slots__ = ()

    inherit_cache = True
    """:meta private:"""

    # hack: instances get a writeable __doc__ (stored in the ``_doc`` slot)
    # while access on the class itself still yields the inherited
    # class-level docstring

    @util.rw_hybridproperty
    def __doc__(self) -> Optional[str]:
        return self._doc

    @__doc__.setter  # type: ignore
    def __doc__(self, value: Optional[str]) -> None:
        self._doc = value

    @__doc__.classlevel  # type: ignore
    def __doc__(cls) -> Optional[str]:
        return super().__doc__

    def __set__(self, instance: object, value: Any) -> None:
        """Assign ``value`` on ``instance`` through the attribute impl,
        passing ``None`` as the initiator."""
        setter = self.impl.set
        state = instance_state(instance)
        dict_ = instance_dict(instance)
        setter(state, dict_, value, None)

    def __delete__(self, instance: object) -> None:
        """Delete the attribute value on ``instance`` through the impl."""
        deleter = self.impl.delete
        state = instance_state(instance)
        dict_ = instance_dict(instance)
        deleter(state, dict_)

    @overload
    def __get__(
        self, instance: None, owner: Any
    ) -> InstrumentedAttribute[_T_co]: ...

    @overload
    def __get__(self, instance: object, owner: Any) -> _T_co: ...

    def __get__(
        self, instance: Optional[object], owner: Any
    ) -> Union[InstrumentedAttribute[_T_co], _T_co]:
        """Return this attribute for class-level access, else the value
        for ``instance``.

        A value already present in the instance dictionary is returned
        directly when the impl supports population; otherwise the impl's
        ``get()`` is invoked.

        :raises UnmappedInstanceError: if looking up the instance state
         raises ``AttributeError``.
        """
        # class-level access
        if instance is None:
            return self

        dict_ = instance_dict(instance)

        # fast path, value is already in the instance dictionary
        if self.impl.supports_population and self.key in dict_:
            return dict_[self.key]  # type: ignore[no-any-return]

        try:
            state = instance_state(instance)
        except AttributeError as err:
            raise orm_exc.UnmappedInstanceError(instance) from err

        return self.impl.get(state, dict_)  # type: ignore[no-any-return]
576
577
@dataclasses.dataclass(frozen=True)
class _AdHocHasEntityNamespace(HasCacheKey):
    """Immutable wrapper that holds an entity and exposes that entity's
    ``entity_namespace``.

    Constructed by the proxied attribute's ``_entity_namespace`` when its
    comparator has no ``_parententity`` attribute.
    """

    # py37 compat, no slots=True on dataclass
    __slots__ = ("_entity_namespace",)

    # class-level flags; this wrapper is neither kind of entity
    is_mapper: ClassVar[bool] = False
    is_aliased_class: ClassVar[bool] = False

    _traverse_internals: ClassVar[_TraverseInternalsType] = [
        ("_entity_namespace", InternalTraversal.dp_has_cache_key),
    ]

    # the single dataclass field: the wrapped entity
    _entity_namespace: _InternalEntityType[Any]

    @property
    def entity_namespace(self):
        """Return the ``entity_namespace`` of the wrapped entity."""
        wrapped = self._entity_namespace
        return wrapped.entity_namespace
593
594
def _create_proxied_attribute(
    descriptor: Any,
) -> Callable[..., QueryableAttribute[Any]]:
    """Create a QueryableAttribute / user descriptor hybrid.

    Returns a new QueryableAttribute type that delegates descriptor
    behavior and getattr() to the given descriptor.

    :param descriptor: the Python descriptor being proxied.  It is
     captured by the generated class; its type supplies the class name
     (``<TypeName>Proxy``).
    :return: the generated ``Proxy`` class (not an instance).
    """

    # TODO: can move this to descriptor_props if the need for this
    # function is removed from ext/hybrid.py

    class Proxy(QueryableAttribute[_T_co]):
        """Presents the :class:`.QueryableAttribute` interface as a
        proxy on top of a Python descriptor / :class:`.PropComparator`
        combination.

        """

        # class-level value; Proxy.__init__ accepts no extra criteria
        _extra_criteria = ()

        # the attribute error catches inside of __getattr__ basically create a
        # singularity if you try putting slots on this too
        # __slots__ = ("descriptor", "original_property", "_comparator")

        def __init__(
            self,
            class_: _ExternalEntityType[Any],
            key: str,
            descriptor: Any,
            comparator: interfaces.PropComparator[_T_co],
            adapt_to_entity: Optional[AliasedInsp[Any]] = None,
            doc: Optional[str] = None,
            original_property: Optional[QueryableAttribute[_T_co]] = None,
        ):
            # QueryableAttribute.__init__ is intentionally not invoked here;
            # ``_parententity`` and ``parent`` are computed properties on
            # this class rather than stored values
            self.class_ = class_
            self.key = key
            self.descriptor = descriptor
            self.original_property = original_property
            # may be a comparator or a callable producing one; resolved
            # lazily by the ``comparator`` property below
            self._comparator = comparator
            self._adapt_to_entity = adapt_to_entity
            self._doc = self.__doc__ = doc

        @property
        def _parententity(self):  # type: ignore[override]
            # raiseerr=False: yields None rather than raising when the
            # class is not inspectable
            return inspection.inspect(self.class_, raiseerr=False)

        @property
        def parent(self):  # type: ignore[override]
            return inspection.inspect(self.class_, raiseerr=False)

        _is_internal_proxy = True

        # unlike the base class, only the key and parent entity are listed
        _cache_key_traversal = [
            ("key", visitors.ExtendedInternalTraversal.dp_string),
            ("_parententity", visitors.ExtendedInternalTraversal.dp_multi),
        ]

        def _column_strategy_attrs(self) -> Sequence[QueryableAttribute[Any]]:
            # defer to the original property when there is one, otherwise
            # report no attributes
            prop = self.original_property
            if prop is None:
                return ()
            else:
                return prop._column_strategy_attrs()

        @property
        def _impl_uses_objects(self):
            # False whenever no original property was given; otherwise the
            # flag is read from the attribute currently found on the class
            # under this key
            return (
                self.original_property is not None
                and getattr(self.class_, self.key).impl.uses_objects
            )

        @property
        def _entity_namespace(self):
            # checks the raw ``_comparator`` (which may still be an
            # unresolved callable), not the resolved ``comparator``
            if hasattr(self._comparator, "_parententity"):
                return self._comparator._parententity
            else:
                # used by hybrid attributes which try to remain
                # agnostic of any ORM concepts like mappers
                return _AdHocHasEntityNamespace(self._parententity)

        @property
        def property(self):
            return self.comparator.property

        @util.memoized_property
        def comparator(self):
            # resolve a comparator factory on first access, then apply the
            # entity adaption if one was requested; the resolved object
            # replaces ``_comparator``
            if callable(self._comparator):
                self._comparator = self._comparator()
            if self._adapt_to_entity:
                self._comparator = self._comparator.adapt_to_entity(
                    self._adapt_to_entity
                )
            return self._comparator

        def adapt_to_entity(self, adapt_to_entity):
            # note: ``doc`` and ``original_property`` are not passed to the
            # adapted proxy, so they take their defaults of None
            return self.__class__(
                adapt_to_entity.entity,
                self.key,
                self.descriptor,
                self._comparator,
                adapt_to_entity,
            )

        def _clone(self, **kw):
            # keyword arguments are accepted but not used; ``doc`` is not
            # passed to the clone
            return self.__class__(
                self.class_,
                self.key,
                self.descriptor,
                self._comparator,
                adapt_to_entity=self._adapt_to_entity,
                original_property=self.original_property,
            )

        def __get__(self, instance, owner):
            retval = self.descriptor.__get__(instance, owner)
            # detect if this is a plain Python @property, which just returns
            # itself for class level access.  If so, then return us.
            # Otherwise, return the object returned by the descriptor.
            if retval is self.descriptor and instance is None:
                return self
            else:
                return retval

        def __str__(self) -> str:
            return f"{self.class_.__name__}.{self.key}"

        def __getattr__(self, attribute):
            """Delegate __getattr__ to the original descriptor and/or
            comparator."""

            # this is unfortunately very complicated, and is easily prone
            # to recursion overflows when implementations of related
            # __getattr__ schemes are changed

            # step 1: the memoized-slot mechanism
            try:
                return util.MemoizedSlots.__getattr__(self, attribute)
            except AttributeError:
                pass

            # step 2: the descriptor.  This is the ``descriptor`` argument
            # of the enclosing function, not ``self.descriptor``
            try:
                return getattr(descriptor, attribute)
            except AttributeError as err:
                # a lookup of "comparator" itself must fail right here;
                # step 3 reads self.comparator
                if attribute == "comparator":
                    raise AttributeError("comparator") from err
                # step 3: the comparator
                try:
                    # comparator itself might be unreachable
                    comparator = self.comparator
                except AttributeError as err2:
                    raise AttributeError(
                        "Neither %r object nor unconfigured comparator "
                        "object associated with %s has an attribute %r"
                        % (type(descriptor).__name__, self, attribute)
                    ) from err2
                else:
                    try:
                        return getattr(comparator, attribute)
                    except AttributeError as err3:
                        raise AttributeError(
                            "Neither %r object nor %r object "
                            "associated with %s has an attribute %r"
                            % (
                                type(descriptor).__name__,
                                type(comparator).__name__,
                                self,
                                attribute,
                            )
                        ) from err3

    # name the generated class after the type of the proxied descriptor
    Proxy.__name__ = type(descriptor).__name__ + "Proxy"

    util.monkeypatch_proxied_specials(
        Proxy, type(descriptor), name="descriptor", from_instance=descriptor
    )
    return Proxy
770
771
# symbols naming the operation behind an attribute event; one of them is
# stored as AttributeEventToken.op
OP_REMOVE = util.symbol("REMOVE")
OP_APPEND = util.symbol("APPEND")
OP_REPLACE = util.symbol("REPLACE")
OP_BULK_REPLACE = util.symbol("BULK_REPLACE")
# used for the per-impl ``_modified_token`` built in _AttributeImpl.__init__
OP_MODIFIED = util.symbol("MODIFIED")
777
778
class AttributeEventToken:
    """A token propagated throughout the course of a chain of attribute
    events.

    Serves as an indicator of the source of the event and also provides
    a means of controlling propagation across a chain of attribute
    operations.

    The :class:`.AttributeEventToken` object is sent as the ``initiator``
    argument when dealing with events such as
    :meth:`.AttributeEvents.append`, :meth:`.AttributeEvents.set`,
    and :meth:`.AttributeEvents.remove`.

    The :class:`.AttributeEventToken` object is currently interpreted by
    the backref event handlers, and is used to control the propagation of
    operations across two mutually-dependent attributes.

    Two tokens compare equal when they refer to the identical impl object
    and carry equal ``op`` values; ``parent_token`` takes no part in the
    comparison.

    .. versionchanged:: 2.0  Changed the name from ``AttributeEvent``
       to ``AttributeEventToken``.

    :attribute impl: The :class:`.AttributeImpl` which is the current event
      initiator.

    :attribute op: The symbol :attr:`.OP_APPEND`, :attr:`.OP_REMOVE`,
      :attr:`.OP_REPLACE`, or :attr:`.OP_BULK_REPLACE`, indicating the
      source operation.

    :attribute parent_token: the ``parent_token`` of ``impl``, copied at
      construction time.

    """

    __slots__ = ("impl", "op", "parent_token")

    def __init__(self, attribute_impl: _AttributeImpl, op: util.symbol):
        self.impl = attribute_impl
        self.op = op
        # snapshot of the impl's parent token taken at construction time
        self.parent_token = attribute_impl.parent_token

    def __eq__(self, other):
        if not isinstance(other, AttributeEventToken):
            return False
        return other.impl is self.impl and other.op == self.op

    @property
    def key(self):
        """The ``key`` of the initiating impl."""
        impl = self.impl
        return impl.key

    def hasparent(self, state):
        """Return the result of the initiating impl's ``hasparent()`` for
        the given state."""
        impl = self.impl
        return impl.hasparent(state)
828
829
# legacy names, both bound to the very same class object as
# AttributeEventToken
AttributeEvent = AttributeEventToken  # legacy
Event = AttributeEventToken  # legacy
832
833
834class _AttributeImpl:
835 """internal implementation for instrumented attributes."""
836
837 collection: bool
838 default_accepts_scalar_loader: bool
839 uses_objects: bool
840 supports_population: bool
841 dynamic: bool
842
843 _is_has_collection_adapter = False
844
845 _replace_token: AttributeEventToken
846 _remove_token: AttributeEventToken
847 _append_token: AttributeEventToken
848
849 def __init__(
850 self,
851 class_: _ExternalEntityType[_O],
852 key: str,
853 callable_: Optional[_LoaderCallable],
854 dispatch: _Dispatch[QueryableAttribute[Any]],
855 trackparent: bool = False,
856 compare_function: Optional[Callable[..., bool]] = None,
857 active_history: bool = False,
858 parent_token: Optional[AttributeEventToken] = None,
859 load_on_unexpire: bool = True,
860 send_modified_events: bool = True,
861 accepts_scalar_loader: Optional[bool] = None,
862 **kwargs: Any,
863 ):
864 r"""Construct an AttributeImpl.
865
866 :param \class_: associated class
867
868 :param key: string name of the attribute
869
870 :param \callable_:
871 optional function which generates a callable based on a parent
872 instance, which produces the "default" values for a scalar or
873 collection attribute when it's first accessed, if not present
874 already.
875
876 :param trackparent:
877 if True, attempt to track if an instance has a parent attached
878 to it via this attribute.
879
880 :param compare_function:
881 a function that compares two values which are normally
882 assignable to this attribute.
883
884 :param active_history:
885 indicates that get_history() should always return the "old" value,
886 even if it means executing a lazy callable upon attribute change.
887
888 :param parent_token:
889 Usually references the MapperProperty, used as a key for
890 the hasparent() function to identify an "owning" attribute.
891 Allows multiple AttributeImpls to all match a single
892 owner attribute.
893
894 :param load_on_unexpire:
895 if False, don't include this attribute in a load-on-expired
896 operation, i.e. the "expired_attribute_loader" process.
897 The attribute can still be in the "expired" list and be
898 considered to be "expired". Previously, this flag was called
899 "expire_missing" and is only used by a deferred column
900 attribute.
901
902 :param send_modified_events:
903 if False, the InstanceState._modified_event method will have no
904 effect; this means the attribute will never show up as changed in a
905 history entry.
906
907 """
908 self.class_ = class_
909 self.key = key
910 self.callable_ = callable_
911 self.dispatch = dispatch
912 self.trackparent = trackparent
913 self.parent_token = parent_token or self
914 self.send_modified_events = send_modified_events
915 if compare_function is None:
916 self.is_equal = operator.eq
917 else:
918 self.is_equal = compare_function
919
920 if accepts_scalar_loader is not None:
921 self.accepts_scalar_loader = accepts_scalar_loader
922 else:
923 self.accepts_scalar_loader = self.default_accepts_scalar_loader
924
925 _deferred_history = kwargs.pop("_deferred_history", False)
926 self._deferred_history = _deferred_history
927
928 if active_history:
929 self.dispatch._active_history = True
930
931 self.load_on_unexpire = load_on_unexpire
932 self._modified_token = AttributeEventToken(self, OP_MODIFIED)
933
    # one slot for each instance attribute assigned in __init__
    __slots__ = (
        "class_",
        "key",
        "callable_",
        "dispatch",
        "trackparent",
        "parent_token",
        "send_modified_events",
        "is_equal",
        "load_on_unexpire",
        "_modified_token",
        "accepts_scalar_loader",
        "_deferred_history",
    )
948
949 def __str__(self) -> str:
950 return f"{self.class_.__name__}.{self.key}"
951
952 def _get_active_history(self):
953 """Backwards compat for impl.active_history"""
954
955 return self.dispatch._active_history
956
957 def _set_active_history(self, value):
958 self.dispatch._active_history = value
959
960 active_history = property(_get_active_history, _set_active_history)
961
962 def hasparent(
963 self, state: InstanceState[Any], optimistic: bool = False
964 ) -> bool:
965 """Return the boolean value of a `hasparent` flag attached to
966 the given state.
967
968 The `optimistic` flag determines what the default return value
969 should be if no `hasparent` flag can be located.
970
971 As this function is used to determine if an instance is an
972 *orphan*, instances that were loaded from storage should be
973 assumed to not be orphans, until a True/False value for this
974 flag is set.
975
976 An instance attribute that is loaded by a callable function
977 will also not have a `hasparent` flag.
978
979 """
980 msg = "This AttributeImpl is not configured to track parents."
981 assert self.trackparent, msg
982
983 return (
984 state.parents.get(id(self.parent_token), optimistic) is not False
985 )
986
987 def sethasparent(
988 self,
989 state: InstanceState[Any],
990 parent_state: InstanceState[Any],
991 value: bool,
992 ) -> None:
993 """Set a boolean flag on the given item corresponding to
994 whether or not it is attached to a parent object via the
995 attribute represented by this ``InstrumentedAttribute``.
996
997 """
998 msg = "This AttributeImpl is not configured to track parents."
999 assert self.trackparent, msg
1000
1001 id_ = id(self.parent_token)
1002 if value:
1003 state.parents[id_] = parent_state
1004 else:
1005 if id_ in state.parents:
1006 last_parent = state.parents[id_]
1007
1008 if (
1009 last_parent is not False
1010 and last_parent.key != parent_state.key
1011 ):
1012 if last_parent.obj() is None:
1013 raise orm_exc.StaleDataError(
1014 "Removing state %s from parent "
1015 "state %s along attribute '%s', "
1016 "but the parent record "
1017 "has gone stale, can't be sure this "
1018 "is the most recent parent."
1019 % (
1020 state_str(state),
1021 state_str(parent_state),
1022 self.key,
1023 )
1024 )
1025
1026 return
1027
1028 state.parents[id_] = False
1029
1030 def get_history(
1031 self,
1032 state: InstanceState[Any],
1033 dict_: _InstanceDict,
1034 passive: PassiveFlag = PASSIVE_OFF,
1035 ) -> History:
1036 raise NotImplementedError()
1037
    def get_all_pending(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        passive: PassiveFlag = PASSIVE_NO_INITIALIZE,
    ) -> _AllPendingType:
        """Return a list of tuples of (state, obj)
        for all objects in this attribute's current state
        + history.

        Only applies to object-based attributes.

        This is an inlining of existing functionality
        which roughly corresponds to::

            get_state_history(
                state,
                key,
                passive=PASSIVE_NO_INITIALIZE,
            ).sum()

        Not implemented at this level; this method always raises
        ``NotImplementedError``.

        """
        raise NotImplementedError()
1060
    def _default_value(
        self, state: InstanceState[Any], dict_: _InstanceDict
    ) -> Any:
        """Produce an empty value for an uninitialized attribute.

        Called by :meth:`get` when nothing could be loaded and the
        ``INIT_OK`` bit is set in ``passive``.  Not implemented at this
        level; always raises ``NotImplementedError``.

        """

        raise NotImplementedError()
1067
1068 def get(
1069 self,
1070 state: InstanceState[Any],
1071 dict_: _InstanceDict,
1072 passive: PassiveFlag = PASSIVE_OFF,
1073 ) -> Any:
1074 """Retrieve a value from the given object.
1075 If a callable is assembled on this object's attribute, and
1076 passive is False, the callable will be executed and the
1077 resulting value will be set as the new value for this attribute.
1078 """
1079 if self.key in dict_:
1080 return dict_[self.key]
1081 else:
1082 # if history present, don't load
1083 key = self.key
1084 if (
1085 key not in state.committed_state
1086 or state.committed_state[key] is NO_VALUE
1087 ):
1088 if not passive & CALLABLES_OK:
1089 return PASSIVE_NO_RESULT
1090
1091 value = self._fire_loader_callables(state, key, passive)
1092
1093 if value is PASSIVE_NO_RESULT or value is NO_VALUE:
1094 return value
1095 elif value is ATTR_WAS_SET:
1096 try:
1097 return dict_[key]
1098 except KeyError as err:
1099 # TODO: no test coverage here.
1100 raise KeyError(
1101 "Deferred loader for attribute "
1102 "%r failed to populate "
1103 "correctly" % key
1104 ) from err
1105 elif value is not ATTR_EMPTY:
1106 return self.set_committed_value(state, dict_, value)
1107
1108 if not passive & INIT_OK:
1109 return NO_VALUE
1110 else:
1111 return self._default_value(state, dict_)
1112
1113 def _fire_loader_callables(
1114 self, state: InstanceState[Any], key: str, passive: PassiveFlag
1115 ) -> Any:
1116 if (
1117 self.accepts_scalar_loader
1118 and self.load_on_unexpire
1119 and key in state.expired_attributes
1120 ):
1121 return state._load_expired(state, passive)
1122 elif key in state.callables:
1123 callable_ = state.callables[key]
1124 return callable_(state, passive)
1125 elif self.callable_:
1126 return self.callable_(state, passive)
1127 else:
1128 return ATTR_EMPTY
1129
    def append(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        passive: PassiveFlag = PASSIVE_OFF,
    ) -> None:
        """Append ``value`` to this attribute.

        At this level an append is simply a :meth:`set` of ``value`` with
        the same ``initiator`` and ``passive`` flags.

        """
        self.set(state, dict_, value, initiator, passive=passive)
1139
    def remove(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        passive: PassiveFlag = PASSIVE_OFF,
    ) -> None:
        """Remove ``value`` from this attribute.

        Implemented as a :meth:`set` to ``None``, passing ``value`` as
        ``check_old`` so that the ``set`` implementation can compare it
        against the value currently in place.

        """
        self.set(
            state, dict_, None, initiator, passive=passive, check_old=value
        )
1151
    def pop(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        passive: PassiveFlag = PASSIVE_OFF,
    ) -> None:
        """Remove ``value`` from this attribute, in "pop" mode.

        Same delegation as :meth:`remove` (a :meth:`set` to ``None`` with
        ``check_old=value``), with ``pop=True`` additionally passed through
        to the ``set`` implementation.

        """
        self.set(
            state,
            dict_,
            None,
            initiator,
            passive=passive,
            check_old=value,
            pop=True,
        )
1169
    def set(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken] = None,
        passive: PassiveFlag = PASSIVE_OFF,
        check_old: Any = None,
        pop: bool = False,
    ) -> None:
        """Set ``value`` for this attribute on the given state.

        Not implemented at this level; always raises
        ``NotImplementedError``.  :meth:`append`, :meth:`remove` and
        :meth:`pop` on this class all route through this method.

        """
        raise NotImplementedError()
1181
    def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
        """Delete this attribute's value from the given state.

        Not implemented at this level; always raises
        ``NotImplementedError``.

        """
        raise NotImplementedError()
1184
1185 def get_committed_value(
1186 self,
1187 state: InstanceState[Any],
1188 dict_: _InstanceDict,
1189 passive: PassiveFlag = PASSIVE_OFF,
1190 ) -> Any:
1191 """return the unchanged value of this attribute"""
1192
1193 if self.key in state.committed_state:
1194 value = state.committed_state[self.key]
1195 if value is NO_VALUE:
1196 return None
1197 else:
1198 return value
1199 else:
1200 return self.get(state, dict_, passive=passive)
1201
    def set_committed_value(
        self, state: InstanceState[Any], dict_: _InstanceDict, value: Any
    ) -> Any:
        """Set an attribute value on the given instance and 'commit' it.

        Stores ``value`` in ``dict_`` under this attribute's key, asks the
        state to commit that key, and returns ``value`` unchanged.

        """

        dict_[self.key] = value
        state._commit(dict_, [self.key])
        return value
1208
1209
class _ScalarAttributeImpl(_AttributeImpl):
    """represents a scalar value-holding InstrumentedAttribute."""

    default_accepts_scalar_loader = True
    uses_objects = False
    supports_population = True
    collection = False
    dynamic = False

    __slots__ = (
        "_default_scalar_value",
        "_replace_token",
        "_append_token",
        "_remove_token",
    )

    def __init__(self, *arg, default_scalar_value=None, **kw):
        """Construct the impl, recording the default scalar value and
        creating the event tokens used as default initiators."""
        super().__init__(*arg, **kw)
        self._default_scalar_value = default_scalar_value

        # the replace and append tokens are one and the same object
        replace_token = AttributeEventToken(self, OP_REPLACE)
        self._replace_token = replace_token
        self._append_token = replace_token
        self._remove_token = AttributeEventToken(self, OP_REMOVE)

    def _default_value(
        self, state: InstanceState[Any], dict_: _InstanceDict
    ) -> Any:
        """Produce an empty value for an uninitialized scalar attribute.

        Starts from the configured default scalar value and passes it
        through each ``init_scalar`` listener; a listener result other
        than ``ATTR_EMPTY`` replaces the running value.

        """

        assert self.key not in dict_, (
            "_default_value should only be invoked for an "
            "uninitialized or expired attribute"
        )

        result = self._default_scalar_value
        for listener in self.dispatch.init_scalar:
            candidate = listener(state, result, dict_)
            if candidate is not ATTR_EMPTY:
                result = candidate

        return result

    def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
        """Delete the scalar value, firing remove events if any are
        registered.

        :raises AttributeError: if there was no value to delete and the
         attribute is not expired.

        """
        previous = (
            self.get(state, dict_, PASSIVE_RETURN_NO_VALUE)
            if self.dispatch._active_history
            else dict_.get(self.key, NO_VALUE)
        )

        if self.dispatch.remove:
            self.fire_remove_event(
                state, dict_, previous, self._remove_token
            )
        state._modified_event(dict_, self, previous)

        popped = dict_.pop(self.key, NO_VALUE)

        if not (popped is NO_VALUE and previous is NO_VALUE):
            # something was actually there
            return
        if state.expired or self.key in state.expired_attributes:
            # value is merely unloaded, not absent
            return

        raise AttributeError("%s object does not have a value" % self)

    def get_history(
        self,
        state: InstanceState[Any],
        dict_: Dict[str, Any],
        passive: PassiveFlag = PASSIVE_OFF,
    ) -> History:
        """Return the ``History`` of this scalar attribute.

        Uses the value in ``dict_`` if present; else ``NO_VALUE`` if the
        key has committed state; else whatever :meth:`get` yields with
        ``INIT_OK`` removed from ``passive`` (``HISTORY_BLANK`` on
        ``PASSIVE_NO_RESULT``).

        """
        key = self.key

        if key in dict_:
            return History.from_scalar_attribute(self, state, dict_[key])

        if key in state.committed_state:
            return History.from_scalar_attribute(self, state, NO_VALUE)

        if passive & INIT_OK:
            passive ^= INIT_OK

        current = self.get(state, dict_, passive=passive)
        if current is PASSIVE_NO_RESULT:
            return HISTORY_BLANK

        return History.from_scalar_attribute(self, state, current)

    def set(
        self,
        state: InstanceState[Any],
        dict_: Dict[str, Any],
        value: Any,
        initiator: Optional[AttributeEventToken] = None,
        passive: PassiveFlag = PASSIVE_OFF,
        check_old: Optional[object] = None,
        pop: bool = False,
    ) -> None:
        """Set the scalar value, firing set events if any are registered.

        ``DONT_SET`` is a no-op.  ``passive``, ``check_old`` and ``pop``
        are accepted for interface compatibility and not used here.

        """
        if value is DONT_SET:
            return

        previous = (
            self.get(state, dict_, PASSIVE_RETURN_NO_VALUE)
            if self.dispatch._active_history
            else dict_.get(self.key, NO_VALUE)
        )

        if self.dispatch.set:
            value = self.fire_replace_event(
                state, dict_, value, previous, initiator
            )

        state._modified_event(dict_, self, previous)
        dict_[self.key] = value

    def fire_replace_event(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: _T,
        previous: Any,
        initiator: Optional[AttributeEventToken],
    ) -> _T:
        """Run every ``set`` listener in turn, threading the value
        through, and return the final value."""
        token = initiator or self._replace_token

        for listener in self.dispatch.set:
            value = listener(state, value, previous, token)

        return value

    def fire_remove_event(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
    ) -> None:
        """Run every ``remove`` listener for the value being removed."""
        token = initiator or self._remove_token

        for listener in self.dispatch.remove:
            listener(state, value, token)
1337
1338
class _ScalarObjectAttributeImpl(_ScalarAttributeImpl):
    """represents a scalar-holding InstrumentedAttribute,
    where the target object is also instrumented.

    Adds events to delete/set operations.

    Values handled here are user-mapped objects, so comparisons against
    ``None`` and the loader status symbols are made by identity only;
    ``==`` / ``in`` would invoke a user-defined ``__eq__()``.

    """

    default_accepts_scalar_loader = False
    uses_objects = True
    supports_population = True
    collection = False

    __slots__ = ()

    def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
        """Delete the referenced object from the attribute, firing the
        remove event.

        :raises AttributeError: if nothing was present in ``dict_``, the
         old value was not ``PASSIVE_NO_RESULT``, and the state has no
         ``key``.

        """
        if self.dispatch._active_history:
            old = self.get(
                state,
                dict_,
                passive=PASSIVE_ONLY_PERSISTENT
                | NO_AUTOFLUSH
                | LOAD_AGAINST_COMMITTED,
            )
        else:
            old = self.get(
                state,
                dict_,
                passive=PASSIVE_NO_FETCH ^ INIT_OK
                | LOAD_AGAINST_COMMITTED
                | NO_RAISE,
            )

        self.fire_remove_event(state, dict_, old, self._remove_token)

        existing = dict_.pop(self.key, NO_VALUE)

        # if the attribute is expired, we currently have no way to tell
        # that an object-attribute was expired vs. not loaded.   So
        # for this test, we look to see if the object has a DB identity.
        if (
            existing is NO_VALUE
            and old is not PASSIVE_NO_RESULT
            and state.key is None
        ):
            raise AttributeError("%s object does not have a value" % self)

    def get_history(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        passive: PassiveFlag = PASSIVE_OFF,
    ) -> History:
        """Return the ``History`` of this object attribute.

        The current value comes from ``dict_`` or, failing that, from
        :meth:`get` with ``INIT_OK`` removed from ``passive``
        (``HISTORY_BLANK`` on ``PASSIVE_NO_RESULT``).  With deferred
        history enabled, an original value recorded as
        ``PASSIVE_NO_RESULT`` is loaded via the loader callables before
        the history is built.

        """
        if self.key in dict_:
            current = dict_[self.key]
        else:
            if passive & INIT_OK:
                passive ^= INIT_OK
            current = self.get(state, dict_, passive=passive)
            if current is PASSIVE_NO_RESULT:
                return HISTORY_BLANK

        if not self._deferred_history:
            return History.from_object_attribute(self, state, current)
        else:
            original = state.committed_state.get(self.key, _NO_HISTORY)
            if original is PASSIVE_NO_RESULT:
                loader_passive = passive | (
                    PASSIVE_ONLY_PERSISTENT
                    | NO_AUTOFLUSH
                    | LOAD_AGAINST_COMMITTED
                    | NO_RAISE
                    | DEFERRED_HISTORY_LOAD
                )
                original = self._fire_loader_callables(
                    state, self.key, loader_passive
                )
            return History.from_object_attribute(
                self, state, current, original=original
            )

    def get_all_pending(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        passive: PassiveFlag = PASSIVE_NO_INITIALIZE,
    ) -> _AllPendingType:
        """Return ``(state, obj)`` tuples for the current value plus the
        committed original, if different.

        Returns an empty list when the value is not in ``dict_`` and
        ``passive`` does not include ``CALLABLES_OK``.  A current value of
        ``None`` or a status symbol is represented as ``(None, None)``.

        """
        if self.key in dict_:
            current = dict_[self.key]
        elif passive & CALLABLES_OK:
            current = self.get(state, dict_, passive=passive)
        else:
            return []

        ret: _AllPendingType

        # can't use __hash__(), can't use __eq__() here
        if (
            current is not None
            and current is not PASSIVE_NO_RESULT
            and current is not NO_VALUE
        ):
            ret = [(instance_state(current), current)]
        else:
            ret = [(None, None)]

        if self.key in state.committed_state:
            original = state.committed_state[self.key]
            if (
                original is not None
                and original is not PASSIVE_NO_RESULT
                and original is not NO_VALUE
                and original is not current
            ):
                ret.append((instance_state(original), original))
        return ret

    def set(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken] = None,
        passive: PassiveFlag = PASSIVE_OFF,
        check_old: Any = None,
        pop: bool = False,
    ) -> None:
        """Set a value on the given InstanceState.

        ``DONT_SET`` is a no-op.  When ``check_old`` is given and is not
        the value currently in place (and that value could be determined),
        the call returns silently if ``pop`` is true and raises
        ``ValueError`` otherwise.

        """

        if value is DONT_SET:
            return

        if self.dispatch._active_history:
            old = self.get(
                state,
                dict_,
                passive=PASSIVE_ONLY_PERSISTENT
                | NO_AUTOFLUSH
                | LOAD_AGAINST_COMMITTED,
            )
        else:
            old = self.get(
                state,
                dict_,
                passive=PASSIVE_NO_FETCH ^ INIT_OK
                | LOAD_AGAINST_COMMITTED
                | NO_RAISE,
            )

        if (
            check_old is not None
            and old is not PASSIVE_NO_RESULT
            and check_old is not old
        ):
            if pop:
                return
            else:
                raise ValueError(
                    "Object %s not associated with %s on attribute '%s'"
                    % (instance_str(check_old), state_str(state), self.key)
                )

        value = self.fire_replace_event(state, dict_, value, old, initiator)
        dict_[self.key] = value

    def fire_remove_event(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
    ) -> None:
        """Clear the parent flag on ``value`` (when tracking parents),
        run the ``remove`` listeners, then record the modification."""
        # identity checks only: ``value`` may be a mapped object whose
        # __eq__() must not be invoked here
        if (
            self.trackparent
            and value is not None
            and value is not PASSIVE_NO_RESULT
            and value is not NO_VALUE
        ):
            self.sethasparent(instance_state(value), state, False)

        for fn in self.dispatch.remove:
            fn(state, value, initiator or self._remove_token)

        state._modified_event(dict_, self, value)

    def fire_replace_event(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: _T,
        previous: Any,
        initiator: Optional[AttributeEventToken],
    ) -> _T:
        """Detach ``previous`` from its parent (when tracking parents),
        run the ``set`` listeners, record the modification, attach the
        resulting value to its parent, and return that value."""
        if self.trackparent:
            # identity checks only: ``previous`` may be a mapped object
            # whose __eq__() must not be invoked here
            if (
                previous is not value
                and previous is not None
                and previous is not PASSIVE_NO_RESULT
                and previous is not NO_VALUE
            ):
                self.sethasparent(instance_state(previous), state, False)

        for fn in self.dispatch.set:
            value = fn(
                state, value, previous, initiator or self._replace_token
            )

        state._modified_event(dict_, self, previous)

        if self.trackparent:
            if value is not None:
                self.sethasparent(instance_state(value), state, True)

        return value
1551
1552
class _HasCollectionAdapter:
    """Mixin declaring the collection-adapter facing interface of an
    attribute implementation.

    Every method here raises ``NotImplementedError``; the concrete
    behavior is supplied by the class that mixes this in.

    """

    __slots__ = ()

    # whether the attribute holds a collection; set by the concrete class
    collection: bool
    # marker flag identifying implementors of this interface
    _is_has_collection_adapter = True

    def _dispose_previous_collection(
        self,
        state: InstanceState[Any],
        collection: _AdaptedCollectionProtocol,
        adapter: CollectionAdapter,
        fire_event: bool,
    ) -> None:
        """Dispose of a collection that has been replaced on ``state``.

        Not implemented at this level.

        """
        raise NotImplementedError()

    # typing overloads: with no user_data and PASSIVE_OFF, or with
    # explicit user_data, the result is always a CollectionAdapter;
    # in the general case PASSIVE_NO_RESULT may be returned instead.
    @overload
    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: Literal[None] = ...,
        passive: Literal[PassiveFlag.PASSIVE_OFF] = ...,
    ) -> CollectionAdapter: ...

    @overload
    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: _AdaptedCollectionProtocol = ...,
        passive: PassiveFlag = ...,
    ) -> CollectionAdapter: ...

    @overload
    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: Optional[_AdaptedCollectionProtocol] = ...,
        passive: PassiveFlag = ...,
    ) -> Union[
        Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
    ]: ...

    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: Optional[_AdaptedCollectionProtocol] = None,
        passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
    ) -> Union[
        Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
    ]:
        """Retrieve the ``CollectionAdapter`` for the given state.

        Not implemented at this level.

        """
        raise NotImplementedError()

    def set(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken] = None,
        passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
        check_old: Any = None,
        pop: bool = False,
        _adapt: bool = True,
    ) -> None:
        """Set the collection value for the given state.

        Not implemented at this level.

        """
        raise NotImplementedError()
1620
1621
# ``_is_collection_attribute_impl(impl)`` reports whether ``impl`` is a
# collection-holding attribute implementation.  Type checkers see a
# ``TypeGuard`` function; at runtime it is simply an attribute getter for
# the ``collection`` flag of the impl.
if TYPE_CHECKING:

    def _is_collection_attribute_impl(
        impl: _AttributeImpl,
    ) -> TypeGuard[_CollectionAttributeImpl]: ...

else:
    _is_collection_attribute_impl = operator.attrgetter("collection")
1630
1631
class _CollectionAttributeImpl(_HasCollectionAdapter, _AttributeImpl):
    """A collection-holding attribute that instruments changes in membership.

    Only handles collections of instrumented objects.

    InstrumentedCollectionAttribute holds an arbitrary, user-specified
    container object (defaulting to a list) and brokers access to the
    CollectionAdapter, a "view" onto that object that presents consistent bag
    semantics to the orm layer independent of the user data implementation.

    """

    uses_objects = True
    collection = True
    default_accepts_scalar_loader = False
    supports_population = True
    dynamic = False

    _bulk_replace_token: AttributeEventToken

    __slots__ = (
        "copy",
        "collection_factory",
        "_append_token",
        "_remove_token",
        "_bulk_replace_token",
        "_duck_typed_as",
    )

    def __init__(
        self,
        class_,
        key,
        callable_,
        dispatch,
        typecallable=None,
        trackparent=False,
        copy_function=None,
        compare_function=None,
        **kwargs,
    ):
        """Construct the collection impl.

        ``typecallable`` becomes the collection factory; it is invoked
        once here so its result can be duck-typed.  ``copy_function``
        defaults to a private method that copies a collection into a list
        via its adapter.  Remaining arguments are passed to the parent
        constructor.

        """
        super().__init__(
            class_,
            key,
            callable_,
            dispatch,
            trackparent=trackparent,
            compare_function=compare_function,
            **kwargs,
        )

        if copy_function is None:
            copy_function = self.__copy
        self.copy = copy_function
        self.collection_factory = typecallable
        # one default initiator token per kind of collection operation
        self._append_token = AttributeEventToken(self, OP_APPEND)
        self._remove_token = AttributeEventToken(self, OP_REMOVE)
        self._bulk_replace_token = AttributeEventToken(self, OP_BULK_REPLACE)
        self._duck_typed_as = util.duck_type_collection(
            self.collection_factory()
        )

        # a factory exposing ``_sa_linker`` gets told about its adapter when
        # a collection is initialized, and gets None when it is disposed
        if getattr(self.collection_factory, "_sa_linker", None):

            @event.listens_for(self, "init_collection")
            def link(target, collection, collection_adapter):
                collection._sa_linker(collection_adapter)

            @event.listens_for(self, "dispose_collection")
            def unlink(target, collection, collection_adapter):
                collection._sa_linker(None)

    def __copy(self, item):
        """Default copy function: list the members of ``item`` as iterated
        through its collection adapter."""
        return [y for y in collections.collection_adapter(item)]

    def get_history(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        passive: PassiveFlag = PASSIVE_OFF,
    ) -> History:
        """Return the ``History`` of this collection.

        When :meth:`get` yields ``PASSIVE_NO_RESULT``, the result is
        ``HISTORY_BLANK``, merged with any pending mutations for this key
        if ``INCLUDE_PENDING_MUTATIONS`` is set in ``passive``.  Otherwise
        the history is built from the present collection.

        """
        current = self.get(state, dict_, passive=passive)

        if current is PASSIVE_NO_RESULT:
            if (
                passive & PassiveFlag.INCLUDE_PENDING_MUTATIONS
                and self.key in state._pending_mutations
            ):
                pending = state._pending_mutations[self.key]
                return pending.merge_with_history(HISTORY_BLANK)
            else:
                return HISTORY_BLANK
        else:
            if passive & PassiveFlag.INCLUDE_PENDING_MUTATIONS:
                # this collection is loaded / present.  should not be any
                # pending mutations
                assert self.key not in state._pending_mutations

            return History.from_collection(self, state, current)

    def get_all_pending(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        passive: PassiveFlag = PASSIVE_NO_INITIALIZE,
    ) -> _AllPendingType:
        """Return ``(state, obj)`` tuples for the collection's current
        members plus committed members no longer present.

        Returns an empty list when the collection is not in ``dict_``.
        With committed state present, the order is: current members not in
        the original, current members also in the original, then original
        members not in the current collection.

        """
        # NOTE: passive is ignored here at the moment

        if self.key not in dict_:
            return []

        current = dict_[self.key]
        current = getattr(current, "_sa_adapter")

        if self.key in state.committed_state:
            original = state.committed_state[self.key]
            if original is not NO_VALUE:
                # None members are paired with a None state
                current_states = [
                    ((c is not None) and instance_state(c) or None, c)
                    for c in current
                ]
                original_states = [
                    ((c is not None) and instance_state(c) or None, c)
                    for c in original
                ]

                # keyed by state, for membership tests below
                current_set = dict(current_states)
                original_set = dict(original_states)

                return (
                    [
                        (s, o)
                        for s, o in current_states
                        if s not in original_set
                    ]
                    + [(s, o) for s, o in current_states if s in original_set]
                    + [
                        (s, o)
                        for s, o in original_states
                        if s not in current_set
                    ]
                )

        # NOTE(review): unlike the branch above, this path calls
        # instance_state() on every member without a None check; confirm
        # that None members cannot reach here.
        return [(instance_state(o), o) for o in current]

    def fire_append_event(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: _T,
        initiator: Optional[AttributeEventToken],
        key: Optional[Any],
    ) -> _T:
        """Run the ``append`` listeners, record the modification, flag the
        resulting value as having this parent (when tracking parents), and
        return that value."""
        for fn in self.dispatch.append:
            value = fn(state, value, initiator or self._append_token, key=key)

        state._modified_event(dict_, self, NO_VALUE, True)

        if self.trackparent and value is not None:
            self.sethasparent(instance_state(value), state, True)

        return value

    def fire_append_wo_mutation_event(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: _T,
        initiator: Optional[AttributeEventToken],
        key: Optional[Any],
    ) -> _T:
        """Run the ``append_wo_mutation`` listeners and return the
        resulting value.

        Unlike :meth:`fire_append_event`, no modified event is recorded
        and no parent flag is set.

        """
        for fn in self.dispatch.append_wo_mutation:
            value = fn(state, value, initiator or self._append_token, key=key)

        return value

    def fire_pre_remove_event(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        initiator: Optional[AttributeEventToken],
        key: Optional[Any],
    ) -> None:
        """A special event used for pop() operations.

        The "remove" event needs to have the item to be removed passed to
        it, which in the case of pop from a set, we don't have a way to access
        the item before the operation.   the event is used for all pop()
        operations (even though set.pop is the one where it is really needed).

        """
        state._modified_event(dict_, self, NO_VALUE, True)

    def fire_remove_event(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        key: Optional[Any],
    ) -> None:
        """Clear the parent flag on ``value`` (when tracking parents), run
        the ``remove`` listeners, then record the modification."""
        if self.trackparent and value is not None:
            self.sethasparent(instance_state(value), state, False)

        for fn in self.dispatch.remove:
            fn(state, value, initiator or self._remove_token, key=key)

        state._modified_event(dict_, self, NO_VALUE, True)

    def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
        """Delete the collection: record the modification, clear the
        collection with events, and drop it from ``dict_``.

        A no-op when the collection is not present in ``dict_``.

        """
        if self.key not in dict_:
            return

        state._modified_event(dict_, self, NO_VALUE, True)

        collection = self.get_collection(state, state.dict)
        collection.clear_with_event()

        # key is always present because we checked above.  e.g.
        # del is a no-op if collection not present.
        del dict_[self.key]

    def _default_value(
        self, state: InstanceState[Any], dict_: _InstanceDict
    ) -> _AdaptedCollectionProtocol:
        """Produce an empty collection for an un-initialized attribute"""

        assert self.key not in dict_, (
            "_default_value should only be invoked for an "
            "uninitialized or expired attribute"
        )

        # reuse an empty collection already handed out for this key
        if self.key in state._empty_collections:
            return state._empty_collections[self.key]

        adapter, user_data = self._initialize_collection(state)
        adapter._set_empty(user_data)
        return user_data

    def _initialize_collection(
        self, state: InstanceState[Any]
    ) -> Tuple[CollectionAdapter, _AdaptedCollectionProtocol]:
        """Create a new collection and its adapter for ``state`` via the
        state's manager, fire ``init_collection``, and return
        ``(adapter, collection)``."""
        adapter, collection = state.manager.initialize_collection(
            self.key, state, self.collection_factory
        )

        self.dispatch.init_collection(state, collection, adapter)

        return adapter, collection

    def append(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        passive: PassiveFlag = PASSIVE_OFF,
    ) -> None:
        """Append ``value`` to the collection.

        If the collection cannot be obtained under ``passive``
        (``PASSIVE_NO_RESULT``), the append event is fired and the value
        is queued as a pending mutation; otherwise the value is appended
        through the adapter, with events.

        """
        collection = self.get_collection(
            state, dict_, user_data=None, passive=passive
        )
        if collection is PASSIVE_NO_RESULT:
            value = self.fire_append_event(
                state, dict_, value, initiator, key=NO_KEY
            )
            assert (
                self.key not in dict_
            ), "Collection was loaded during event handling."
            state._get_pending_mutation(self.key).append(value)
        else:
            if TYPE_CHECKING:
                assert isinstance(collection, CollectionAdapter)
            collection.append_with_event(value, initiator)

    def remove(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        passive: PassiveFlag = PASSIVE_OFF,
    ) -> None:
        """Remove ``value`` from the collection.

        If the collection cannot be obtained under ``passive``
        (``PASSIVE_NO_RESULT``), the remove event is fired and the removal
        is queued as a pending mutation; otherwise the value is removed
        through the adapter, with events.

        """
        collection = self.get_collection(
            state, state.dict, user_data=None, passive=passive
        )
        if collection is PASSIVE_NO_RESULT:
            self.fire_remove_event(state, dict_, value, initiator, key=NO_KEY)
            assert (
                self.key not in dict_
            ), "Collection was loaded during event handling."
            state._get_pending_mutation(self.key).remove(value)
        else:
            if TYPE_CHECKING:
                assert isinstance(collection, CollectionAdapter)
            collection.remove_with_event(value, initiator)

    def pop(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        passive: PassiveFlag = PASSIVE_OFF,
    ) -> None:
        """Remove ``value`` like :meth:`remove`, but silently ignore
        ``ValueError``, ``KeyError`` and ``IndexError`` raised by the
        removal."""
        try:
            # TODO: better solution here would be to add
            # a "popper" role to collections.py to complement
            # "remover".
            self.remove(state, dict_, value, initiator, passive=passive)
        except (ValueError, KeyError, IndexError):
            pass

    def set(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken] = None,
        passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
        check_old: Any = None,
        pop: bool = False,
        _adapt: bool = True,
    ) -> None:
        """Replace the whole collection with the members of ``value``.

        ``DONT_SET`` is a no-op, as is re-assigning the collection object
        that is already in place.  With ``_adapt`` true, ``value`` must
        duck-type to the same collection type as this attribute's factory.

        :raises TypeError: if ``_adapt`` is true and the duck-typed
         collection type of ``value`` differs from the expected one.

        """

        if value is DONT_SET:
            return

        iterable = orig_iterable = value
        new_keys = None

        # pulling a new collection first so that an adaptation exception does
        # not trigger a lazy load of the old collection.
        new_collection, user_data = self._initialize_collection(state)
        if _adapt:
            setting_type = util.duck_type_collection(iterable)
            receiving_type = self._duck_typed_as

            if setting_type is not receiving_type:
                given = (
                    "None" if iterable is None else iterable.__class__.__name__
                )
                wanted = (
                    "None"
                    if self._duck_typed_as is None
                    else self._duck_typed_as.__name__
                )
                raise TypeError(
                    "Incompatible collection type: %s is not %s-like"
                    % (given, wanted)
                )

            # If the object is an adapted collection, return the (iterable)
            # adapter.
            if hasattr(iterable, "_sa_iterator"):
                iterable = iterable._sa_iterator()
            elif setting_type is dict:
                new_keys = list(iterable)
                iterable = iterable.values()
            else:
                iterable = iter(iterable)
        elif util.duck_type_collection(iterable) is dict:
            new_keys = list(value)

        new_values = list(iterable)

        evt = self._bulk_replace_token

        # listeners receive the new_values list itself, together with the
        # dict keys when the incoming value was dict-like
        self.dispatch.bulk_replace(state, new_values, evt, keys=new_keys)

        # propagate NO_RAISE in passive through to the get() for the
        # existing object (ticket #8862)
        old = self.get(
            state,
            dict_,
            passive=PASSIVE_ONLY_PERSISTENT ^ (passive & PassiveFlag.NO_RAISE),
        )
        if old is PASSIVE_NO_RESULT:
            old = self._default_value(state, dict_)
        elif old is orig_iterable:
            # ignore re-assignment of the current collection, as happens
            # implicitly with in-place operators (foo.collection |= other)
            return

        # place a copy of "old" in state.committed_state
        state._modified_event(dict_, self, old, True)

        old_collection = old._sa_adapter

        # install the new collection before it is populated
        dict_[self.key] = user_data

        collections.bulk_replace(
            new_values, old_collection, new_collection, initiator=evt
        )

        self._dispose_previous_collection(state, old, old_collection, True)

    def _dispose_previous_collection(
        self,
        state: InstanceState[Any],
        collection: _AdaptedCollectionProtocol,
        adapter: CollectionAdapter,
        fire_event: bool,
    ) -> None:
        """Detach ``collection`` from its adapter and forget it.

        Deletes the collection's ``_sa_adapter`` attribute, removes this
        key from ``state._empty_collections``, and fires
        ``dispose_collection`` when ``fire_event`` is true.

        """
        del collection._sa_adapter

        # discarding old collection make sure it is not referenced in empty
        # collections.
        state._empty_collections.pop(self.key, None)
        if fire_event:
            self.dispatch.dispose_collection(state, collection, adapter)

    def _invalidate_collection(
        self, collection: _AdaptedCollectionProtocol
    ) -> None:
        """Mark the adapter of ``collection`` as invalidated."""
        adapter = getattr(collection, "_sa_adapter")
        adapter.invalidated = True

    def set_committed_value(
        self, state: InstanceState[Any], dict_: _InstanceDict, value: Any
    ) -> _AdaptedCollectionProtocol:
        """Set an attribute value on the given instance and 'commit' it.

        A fresh collection is created and filled from ``value`` without
        events, installed on the state and committed.  Any pending
        mutations queued for this key are then applied on top, without
        events, after a modified event is recorded.  Returns the new
        user-facing collection.

        """

        collection, user_data = self._initialize_collection(state)

        if value:
            collection.append_multiple_without_event(value)

        state.dict[self.key] = user_data

        state._commit(dict_, [self.key])

        if self.key in state._pending_mutations:
            # pending items exist.  issue a modified event,
            # add/remove new items.
            state._modified_event(dict_, self, user_data, True)

            pending = state._pending_mutations.pop(self.key)
            added = pending.added_items
            removed = pending.deleted_items
            for item in added:
                collection.append_without_event(item)
            for item in removed:
                collection.remove_without_event(item)

        return user_data

    # typing overloads: with no user_data and PASSIVE_OFF, or with
    # explicit user_data, the result is always a CollectionAdapter;
    # in the general case PASSIVE_NO_RESULT may be returned instead.
    @overload
    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: Literal[None] = ...,
        passive: Literal[PassiveFlag.PASSIVE_OFF] = ...,
    ) -> CollectionAdapter: ...

    @overload
    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: _AdaptedCollectionProtocol = ...,
        passive: PassiveFlag = ...,
    ) -> CollectionAdapter: ...

    @overload
    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: Optional[_AdaptedCollectionProtocol] = ...,
        passive: PassiveFlag = PASSIVE_OFF,
    ) -> Union[
        Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
    ]: ...

    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: Optional[_AdaptedCollectionProtocol] = None,
        passive: PassiveFlag = PASSIVE_OFF,
    ) -> Union[
        Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
    ]:
        """Retrieve the CollectionAdapter associated with the given state.

        if user_data is None, retrieves it from the state using normal
        "get()" rules, which will fire lazy callables or return the "empty"
        collection value.

        Returns ``PASSIVE_NO_RESULT`` unchanged when that is what ``get()``
        produced.

        """
        if user_data is None:
            fetch_user_data = self.get(state, dict_, passive=passive)
            if fetch_user_data is LoaderCallableStatus.PASSIVE_NO_RESULT:
                return fetch_user_data
            else:
                user_data = cast("_AdaptedCollectionProtocol", fetch_user_data)

        return user_data._sa_adapter
2131
2132
def _backref_listeners(
    attribute: QueryableAttribute[Any], key: str, uselist: bool
) -> None:
    """Apply listeners to synchronize a two-way relationship.

    Registers event listeners on ``attribute`` which mirror each change
    onto the attribute named ``key`` of the object on the other side of
    the relationship:

    * ``"append"`` when ``uselist`` is true, otherwise ``"set"``
    * ``"remove"``, unconditionally

    All listeners are registered with ``retval=True``, ``raw=True`` and
    ``include_key=True``.

    :param attribute: the attribute which receives the listeners; its
     ``impl`` supplies the parent token and the event tokens used by the
     listeners.

    :param key: name under which the reverse attribute is looked up in
     the manager of the related object's state.

    :param uselist: selects the "append" listener rather than the "set"
     listener, and takes part in deciding whether the duplicate check
     is performed on remove.

    """

    # use easily recognizable names for stack traces.

    # in the sections marked "tokens to test for a recursive loop",
    # this is somewhat brittle and very performance-sensitive logic
    # that is specific to how we might arrive at each event.  a marker
    # that can target us directly to arguments being invoked against
    # the impl might be simpler, but could interfere with other systems.

    parent_token = attribute.impl.parent_token
    parent_impl = attribute.impl

    # Raises ValueError describing a conflict between the attribute that
    # initiated the event and the attribute on the reverse side.  Note that
    # the callers below pass the *parent* state as the first argument.
    def _acceptable_key_err(child_state, initiator, child_impl):
        raise ValueError(
            "Bidirectional attribute conflict detected: "
            'Passing object %s to attribute "%s" '
            'triggers a modify event on attribute "%s" '
            'via the backref "%s".'
            % (
                state_str(child_state),
                initiator.parent_token,
                child_impl.parent_token,
                attribute.impl.parent_token,
            )
        )

    # "set" listener: detaches the parent object from the previous child's
    # reverse attribute, then attaches it to the new child's reverse
    # attribute.  Always returns ``child`` unchanged.
    def emit_backref_from_scalar_set_event(
        state, child, oldchild, initiator, **kw
    ):
        # same object assigned again; nothing to synchronize
        if oldchild is child:
            return child
        if (
            oldchild is not None
            and oldchild is not PASSIVE_NO_RESULT
            and oldchild is not NO_VALUE
        ):
            # With lazy=None, there's no guarantee that the full collection is
            # present when updating via a backref.
            old_state, old_dict = (
                instance_state(oldchild),
                instance_dict(oldchild),
            )
            impl = old_state.manager[key].impl

            # tokens to test for a recursive loop.
            if not impl.collection and not impl.dynamic:
                check_recursive_token = impl._replace_token
            else:
                check_recursive_token = impl._remove_token

            # skip the pop when this event was itself initiated by the
            # reverse attribute's own replace / remove operation
            if initiator is not check_recursive_token:
                impl.pop(
                    old_state,
                    old_dict,
                    state.obj(),
                    parent_impl._append_token,
                    passive=PASSIVE_NO_FETCH,
                )

        if child is not None:
            child_state, child_dict = (
                instance_state(child),
                instance_dict(child),
            )
            child_impl = child_state.manager[key].impl

            # the initiating attribute must be either this attribute or
            # the reverse attribute; anything else is reported as a
            # conflict
            if (
                initiator.parent_token is not parent_token
                and initiator.parent_token is not child_impl.parent_token
            ):
                _acceptable_key_err(state, initiator, child_impl)

            # tokens to test for a recursive loop.
            check_append_token = child_impl._append_token
            check_bulk_replace_token = (
                child_impl._bulk_replace_token
                if _is_collection_attribute_impl(child_impl)
                else None
            )

            # skip the append when this event was itself initiated by the
            # reverse attribute's own append / bulk replace operation
            if (
                initiator is not check_append_token
                and initiator is not check_bulk_replace_token
            ):
                child_impl.append(
                    child_state,
                    child_dict,
                    state.obj(),
                    initiator,
                    passive=PASSIVE_NO_FETCH,
                )
        return child

    # "append" listener: attaches the parent object to the appended child's
    # reverse attribute.  Returns ``child``, or None when ``child`` is None.
    def emit_backref_from_collection_append_event(
        state, child, initiator, **kw
    ):
        if child is None:
            return

        child_state, child_dict = instance_state(child), instance_dict(child)
        child_impl = child_state.manager[key].impl

        # same conflict check as performed by the "set" listener
        if (
            initiator.parent_token is not parent_token
            and initiator.parent_token is not child_impl.parent_token
        ):
            _acceptable_key_err(state, initiator, child_impl)

        # tokens to test for a recursive loop.
        check_append_token = child_impl._append_token
        check_bulk_replace_token = (
            child_impl._bulk_replace_token
            if _is_collection_attribute_impl(child_impl)
            else None
        )

        if (
            initiator is not check_append_token
            and initiator is not check_bulk_replace_token
        ):
            child_impl.append(
                child_state,
                child_dict,
                state.obj(),
                initiator,
                passive=PASSIVE_NO_FETCH,
            )
        return child

    # "remove" listener: detaches the parent object from the removed
    # child's reverse attribute.  Has no explicit return value.
    def emit_backref_from_collection_remove_event(
        state, child, initiator, **kw
    ):
        if (
            child is not None
            and child is not PASSIVE_NO_RESULT
            and child is not NO_VALUE
        ):
            child_state, child_dict = (
                instance_state(child),
                instance_dict(child),
            )
            child_impl = child_state.manager[key].impl

            check_replace_token: Optional[AttributeEventToken]

            # tokens to test for a recursive loop.
            if not child_impl.collection and not child_impl.dynamic:
                # reverse side is neither a collection nor dynamic
                check_remove_token = child_impl._remove_token
                check_replace_token = child_impl._replace_token
                check_for_dupes_on_remove = uselist and not parent_impl.dynamic
            else:
                check_remove_token = child_impl._remove_token
                check_replace_token = (
                    child_impl._bulk_replace_token
                    if _is_collection_attribute_impl(child_impl)
                    else None
                )
                check_for_dupes_on_remove = False

            if (
                initiator is not check_remove_token
                and initiator is not check_replace_token
            ):
                # NOTE(review): presumably util.has_dupes() reports that
                # ``child`` occurs more than once in the parent collection,
                # in which case the reverse reference is left in place --
                # confirm against util.has_dupes.
                if not check_for_dupes_on_remove or not util.has_dupes(
                    # when this event is called, the item is usually
                    # present in the list, except for a pop() operation.
                    state.dict[parent_impl.key],
                    child,
                ):
                    child_impl.pop(
                        child_state,
                        child_dict,
                        state.obj(),
                        initiator,
                        passive=PASSIVE_NO_FETCH,
                    )

    if uselist:
        event.listen(
            attribute,
            "append",
            emit_backref_from_collection_append_event,
            retval=True,
            raw=True,
            include_key=True,
        )
    else:
        event.listen(
            attribute,
            "set",
            emit_backref_from_scalar_set_event,
            retval=True,
            raw=True,
            include_key=True,
        )
    # TODO: need coverage in test/orm/ of remove event
    event.listen(
        attribute,
        "remove",
        emit_backref_from_collection_remove_event,
        retval=True,
        raw=True,
        include_key=True,
    )
2341
2342
# Sentinel used as the "not found" default when an attribute key is looked
# up in ``state.committed_state``.
_NO_HISTORY = util.symbol("NO_HISTORY")

# id() values of the symbols that stand in for "no value"; membership is
# tested as ``id(value) in _NO_STATE_SYMBOLS``.
_NO_STATE_SYMBOLS = frozenset(
    id(symbol) for symbol in (PASSIVE_NO_RESULT, NO_VALUE)
)
2345
2346
class History(NamedTuple):
    """A 3-tuple of added, unchanged and deleted values,
    representing the changes which have occurred on an instrumented
    attribute.

    The easiest way to get a :class:`.History` object for a particular
    attribute on an object is to use the :func:`_sa.inspect` function::

        from sqlalchemy import inspect

        hist = inspect(myobject).attrs.myattribute.history

    Each tuple member is an iterable sequence:

    * ``added`` - the collection of items added to the attribute (the first
      tuple element).

    * ``unchanged`` - the collection of items that have not changed on the
      attribute (the second tuple element).

    * ``deleted`` - the collection of items that have been removed from the
      attribute (the third tuple element).

    """

    added: Union[Tuple[()], List[Any]]
    unchanged: Union[Tuple[()], List[Any]]
    deleted: Union[Tuple[()], List[Any]]

    def __bool__(self) -> bool:
        """Return True unless this object compares equal to
        ``HISTORY_BLANK``.

        The comparison is plain tuple equality, so a :class:`.History`
        made of three empty *lists* is not equal to ``HISTORY_BLANK``
        (three empty tuples) and is therefore truthy; use :meth:`.empty`
        to test for the absence of values.

        """
        return self != HISTORY_BLANK

    def empty(self) -> bool:
        """Return True if this :class:`.History` has no changes
        and no existing, unchanged state.

        """

        return not bool((self.added or self.deleted) or self.unchanged)

    def sum(self) -> Sequence[Any]:
        """Return a collection of added + unchanged + deleted."""

        # empty members may be tuples; ``or []`` keeps the concatenation
        # list-based
        return (
            (self.added or []) + (self.unchanged or []) + (self.deleted or [])
        )

    def non_deleted(self) -> Sequence[Any]:
        """Return a collection of added + unchanged."""

        return (self.added or []) + (self.unchanged or [])

    def non_added(self) -> Sequence[Any]:
        """Return a collection of unchanged + deleted."""

        return (self.unchanged or []) + (self.deleted or [])

    def has_changes(self) -> bool:
        """Return True if this :class:`.History` has changes."""

        return bool(self.added or self.deleted)

    def _merge(self, added: Iterable[Any], deleted: Iterable[Any]) -> History:
        """Return a new :class:`.History` with the given items appended to
        ``added`` and ``deleted``.

        ``unchanged`` is carried over as is; ``added`` and ``deleted`` are
        always lists in the result.

        """
        return History(
            list(self.added) + list(added),
            self.unchanged,
            list(self.deleted) + list(deleted),
        )

    def as_state(self) -> History:
        """Return a new :class:`.History` in which every member that is not
        None is replaced by the result of ``instance_state()`` for it.

        None members are kept as None.

        """
        return History(
            [
                instance_state(c) if c is not None else None
                for c in self.added
            ],
            [
                instance_state(c) if c is not None else None
                for c in self.unchanged
            ],
            [
                instance_state(c) if c is not None else None
                for c in self.deleted
            ],
        )

    @classmethod
    def from_scalar_attribute(
        cls,
        attribute: _ScalarAttributeImpl,
        state: InstanceState[Any],
        current: Any,
    ) -> History:
        """Build the :class:`.History` of a scalar attribute.

        :param attribute: the attribute implementation; supplies ``key``
         and the ``is_equal()`` comparison.

        :param state: the state whose ``committed_state`` is consulted for
         the previously recorded value.

        :param current: the present value, or ``NO_VALUE``.

        :return: a :class:`.History` where the value is reported as
         unchanged when no previous value is recorded or when it compares
         equal to the recorded one, and as added otherwise.

        """
        original = state.committed_state.get(attribute.key, _NO_HISTORY)

        # nothing recorded in committed_state: no change has occurred
        if original is _NO_HISTORY:
            if current is NO_VALUE:
                return cls((), (), ())
            return cls((), [current], ())

        # don't let ClauseElement expressions here trip things up
        if (
            current is not NO_VALUE
            and attribute.is_equal(current, original) is True
        ):
            return cls((), [current], ())

        deleted: Union[Tuple[()], List[Any]]

        # current convention on native scalars is to not
        # include information
        # about missing previous value in "deleted", but
        # we do include None, which helps in some primary
        # key situations
        if id(original) in _NO_STATE_SYMBOLS:
            deleted = ()
            # indicate a "del" operation occurred when we don't have
            # the previous value as: ([None], (), ())
            if id(current) in _NO_STATE_SYMBOLS:
                current = None
        else:
            deleted = [original]

        if current is NO_VALUE:
            return cls((), (), deleted)
        return cls([current], (), deleted)

    @classmethod
    def from_object_attribute(
        cls,
        attribute: _ScalarObjectAttributeImpl,
        state: InstanceState[Any],
        current: Any,
        original: Any = _NO_HISTORY,
    ) -> History:
        """Build the :class:`.History` of a scalar object attribute.

        :param attribute: the attribute implementation; supplies ``key``.

        :param state: the state whose ``committed_state`` is consulted when
         ``original`` is not passed.

        :param current: the present value, or ``NO_VALUE``.

        :param original: the previous value; when omitted it is looked up
         in ``state.committed_state``.

        :return: a :class:`.History`; values are compared by identity, and
         a previous value of None is not reported in ``deleted``.

        """
        if original is _NO_HISTORY:
            original = state.committed_state.get(attribute.key, _NO_HISTORY)

        # still nothing recorded: no change has occurred
        if original is _NO_HISTORY:
            if current is NO_VALUE:
                return cls((), (), ())
            return cls((), [current], ())

        if current is original and current is not NO_VALUE:
            return cls((), [current], ())

        deleted: Union[Tuple[()], List[Any]]

        # current convention on related objects is to not
        # include information
        # about missing previous value in "deleted", and
        # to also not include None - the dependency.py rules
        # ignore the None in any case.
        if id(original) in _NO_STATE_SYMBOLS or original is None:
            deleted = ()
            # indicate a "del" operation occurred when we don't have
            # the previous value as: ([None], (), ())
            if id(current) in _NO_STATE_SYMBOLS:
                current = None
        else:
            deleted = [original]

        if current is NO_VALUE:
            return cls((), (), deleted)
        return cls([current], (), deleted)

    @classmethod
    def from_collection(
        cls,
        attribute: _CollectionAttributeImpl,
        state: InstanceState[Any],
        current: Any,
    ) -> History:
        """Build the :class:`.History` of a collection attribute.

        :param attribute: the attribute implementation; supplies ``key``.

        :param state: the state whose ``committed_state`` is consulted for
         the previously recorded collection.

        :param current: the present collection, or ``NO_VALUE``; its
         ``_sa_adapter`` attribute is what gets iterated.

        :return: a :class:`.History` whose members are computed by
         comparing the instance states of the current and the previously
         recorded members.

        """
        original = state.committed_state.get(attribute.key, _NO_HISTORY)
        if current is NO_VALUE:
            return cls((), (), ())

        current = getattr(current, "_sa_adapter")
        if original is NO_VALUE:
            # the recorded previous value is NO_VALUE: everything is new
            return cls(list(current), (), ())
        if original is _NO_HISTORY:
            # nothing recorded: everything is unchanged
            return cls((), list(current), ())

        # pair each member with its state (None stays None) so that
        # membership can be tested by state
        current_states = [
            (instance_state(c) if c is not None else None, c)
            for c in current
        ]
        original_states = [
            (instance_state(c) if c is not None else None, c)
            for c in original
        ]

        current_set = dict(current_states)
        original_set = dict(original_states)

        return cls(
            [o for s, o in current_states if s not in original_set],
            [o for s, o in current_states if s in original_set],
            [o for s, o in original_states if s not in current_set],
        )
2546
2547
# History whose three members are all the empty tuple; History.__bool__
# compares against this value.
HISTORY_BLANK = History(added=(), unchanged=(), deleted=())
2549
2550
def get_history(
    obj: object, key: str, passive: PassiveFlag = PASSIVE_OFF
) -> History:
    """Return a :class:`.History` record for the given object
    and attribute key.

    The record describes the **pre-flush** history of the attribute; it
    is reset each time the :class:`.Session` flushes changes to the
    current database transaction.

    .. note::

        Prefer to use the :attr:`.AttributeState.history` and
        :meth:`.AttributeState.load_history` accessors to retrieve the
        :class:`.History` for instance attributes.


    :param obj: an object whose class is instrumented by the
      attributes package.

    :param key: string attribute name.

    :param passive: indicates loading behavior for the attribute
       if the value is not already present.   This is a
       bitflag attribute, which defaults to the symbol
       :attr:`.PASSIVE_OFF` indicating all necessary SQL
       should be emitted.

    .. seealso::

        :attr:`.AttributeState.history`

        :meth:`.AttributeState.load_history` - retrieve history
        using loader callables if the value is not locally present.

    """

    state = instance_state(obj)
    return get_state_history(state, key, passive)
2589
2590
def get_state_history(
    state: InstanceState[Any], key: str, passive: PassiveFlag = PASSIVE_OFF
) -> History:
    """Return the :class:`.History` of attribute ``key`` for the given
    :class:`.InstanceState`.

    Delegates to ``state.get_history()``, passing ``key`` and ``passive``
    through unchanged.

    """
    history = state.get_history(key, passive)
    return history
2595
2596
def has_parent(
    cls: Type[_O], obj: _O, key: str, optimistic: bool = False
) -> bool:
    """Return the result of the ``has_parent()`` check made by the class
    manager of ``cls`` for ``obj`` and the attribute named ``key``.

    :param cls: class whose manager is consulted.

    :param obj: instance whose state is passed to the manager.

    :param key: string attribute name.

    :param optimistic: passed through to the manager's ``has_parent()``;
     NOTE(review): presumably the answer to give when nothing is known
     about the parent -- confirm against that method.

    """
    class_manager = manager_of_class(cls)
    obj_state = instance_state(obj)
    return class_manager.has_parent(obj_state, key, optimistic)
2604
2605
def _register_attribute(
    class_: Type[_O],
    key: str,
    *,
    comparator: interfaces.PropComparator[_T],
    parententity: _InternalEntityType[_O],
    doc: Optional[str] = None,
    **kw: Any,
) -> InstrumentedAttribute[_T]:
    """Register both the descriptor and the attribute implementation for
    ``key`` on ``class_``.

    ``comparator``, ``parententity`` and ``doc`` go to
    :func:`._register_descriptor`; all remaining keyword arguments go to
    :func:`._register_attribute_impl`.

    :return: the descriptor created by :func:`._register_descriptor`.

    """
    descriptor = _register_descriptor(
        class_,
        key,
        comparator=comparator,
        parententity=parententity,
        doc=doc,
    )
    _register_attribute_impl(class_, key, **kw)
    return descriptor
2620
2621
def _register_attribute_impl(
    class_: Type[_O],
    key: str,
    uselist: bool = False,
    callable_: Optional[_LoaderCallable] = None,
    useobject: bool = False,
    impl_class: Optional[Type[_AttributeImpl]] = None,
    backref: Optional[str] = None,
    **kw: Any,
) -> QueryableAttribute[Any]:
    """Create the attribute implementation for ``key`` and attach it to
    the attribute already present in the class manager of ``class_``.

    The implementation class is chosen in this order: ``impl_class`` if
    given, a collection implementation if ``uselist``, a scalar object
    implementation if ``useobject``, else a plain scalar implementation.

    :param class_: the instrumented class.

    :param key: string attribute name; must already be present in the
     class manager.

    :param uselist: build a collection implementation; the
     ``typecallable`` keyword (default ``list``) is passed through the
     manager's ``instrument_collection_class()`` first.

    :param callable_: loader callable handed to the built-in
     implementations; not passed to ``impl_class``.

    :param useobject: build a scalar object implementation.

    :param impl_class: explicit implementation class, constructed as
     ``impl_class(class_, key, dispatch, **kw)``.

    :param backref: when given, backref listeners for this name are
     applied to the attribute.

    :param kw: additional keyword arguments for the implementation
     constructor; ``typecallable`` is always removed from them.

    :return: the attribute stored in the class manager under ``key``.

    """
    class_manager = manager_of_class(class_)

    # "typecallable" is consumed here in every case; only the collection
    # implementation receives it, after instrumentation
    typecallable = kw.pop("typecallable", None)
    if uselist:
        typecallable = class_manager.instrument_collection_class(
            key, typecallable or list
        )

    dispatch = cast(
        "_Dispatch[QueryableAttribute[Any]]", class_manager[key].dispatch
    )

    impl: _AttributeImpl

    if impl_class:
        # TODO: this appears to be the WriteOnlyAttributeImpl /
        # DynamicAttributeImpl constructor which is hardcoded
        custom_impl_class = cast("Type[_WriteOnlyAttributeImpl]", impl_class)
        impl = custom_impl_class(class_, key, dispatch, **kw)
    elif uselist:
        impl = _CollectionAttributeImpl(
            class_,
            key,
            callable_,
            dispatch,
            typecallable=typecallable,
            **kw,
        )
    elif useobject:
        impl = _ScalarObjectAttributeImpl(
            class_, key, callable_, dispatch, **kw
        )
    else:
        impl = _ScalarAttributeImpl(class_, key, callable_, dispatch, **kw)

    class_manager[key].impl = impl

    if backref:
        _backref_listeners(class_manager[key], backref, uselist)

    class_manager.post_configure_attribute(key)
    return class_manager[key]
2671
2672
def _register_descriptor(
    class_: Type[Any],
    key: str,
    *,
    comparator: interfaces.PropComparator[_T],
    parententity: _InternalEntityType[Any],
    doc: Optional[str] = None,
) -> InstrumentedAttribute[_T]:
    """Create an :class:`.InstrumentedAttribute` for ``key`` and hand it
    to the class manager of ``class_`` for instrumentation.

    :param class_: the class receiving the attribute.

    :param key: string attribute name.

    :param comparator: comparator passed to the new attribute.

    :param parententity: parent entity passed to the new attribute.

    :param doc: value assigned to the attribute's ``__doc__``.

    :return: the newly created attribute.

    """
    class_manager = manager_of_class(class_)

    instrumented = InstrumentedAttribute(
        class_, key, comparator=comparator, parententity=parententity
    )
    instrumented.__doc__ = doc  # type: ignore

    class_manager.instrument_attribute(key, instrumented)
    return instrumented
2691
2692
def _unregister_attribute(class_: Type[Any], key: str) -> None:
    """Ask the class manager of ``class_`` to uninstrument the attribute
    named ``key``.

    """
    class_manager = manager_of_class(class_)
    class_manager.uninstrument_attribute(key)
2695
2696
def init_collection(obj: object, key: str) -> CollectionAdapter:
    """Initialize a collection attribute and return the collection adapter.

    Provides direct access to collection internals for a previously
    unloaded attribute, e.g.::

        collection_adapter = init_collection(someobject, "elements")
        for elem in values:
            collection_adapter.append_without_event(elem)

    An easier way to achieve the above is
    :func:`~sqlalchemy.orm.attributes.set_committed_value`.

    :param obj: a mapped object

    :param key: string attribute name where the collection is located.

    """
    obj_state = instance_state(obj)
    return init_state_collection(obj_state, obj_state.dict, key)
2718
2719
def init_state_collection(
    state: InstanceState[Any], dict_: _InstanceDict, key: str
) -> CollectionAdapter:
    """Initialize a collection attribute and return the collection adapter.

    Any collection already stored under ``key`` is removed from ``dict_``
    and disposed of before the new default collection is created.

    :param state: state of the object that owns the collection.

    :param dict_: the object's instance dictionary.

    :param key: string attribute name where the collection is located.

    :return: the adapter of the new collection, after its
     ``_reset_empty()`` has been called.

    """
    impl = state.manager[key].impl

    if TYPE_CHECKING:
        assert isinstance(impl, _HasCollectionAdapter)

    # take the existing collection, if any, out of the instance dict
    previous = dict_.pop(key, None)
    if previous is not None:
        previous_adapter = previous._sa_adapter
        impl._dispose_previous_collection(
            state, previous, previous_adapter, False
        )

    fresh_collection = impl._default_value(state, dict_)
    adapter: CollectionAdapter = impl.get_collection(
        state,
        dict_,
        fresh_collection,
        passive=PassiveFlag.PASSIVE_NO_FETCH,
    )
    adapter._reset_empty()

    return adapter
2745
2746
def set_committed_value(instance: object, key: str, value: Any) -> None:
    """Set the value of an attribute with no history events.

    Any previous history present is cancelled.  Pass a scalar value for
    a scalar-holding attribute, or an iterable for a collection-holding
    attribute.

    This is the same underlying method used when a lazy loader
    fires off and loads additional data from the database.
    Application code which has loaded additional attributes or
    collections through separate queries can therefore use it to attach
    them to an instance as though they were part of its original loaded
    state.

    :param instance: the object that will be modified

    :param key: string name of the attribute

    :param value: value to establish as the committed value

    """
    state = instance_state(instance)
    dict_ = instance_dict(instance)
    impl = state.manager[key].impl
    impl.set_committed_value(state, dict_, value)
2764
2765
def set_attribute(
    instance: object,
    key: str,
    value: Any,
    initiator: Optional[AttributeEventToken] = None,
) -> None:
    """Set the value of an attribute, firing history events.

    No descriptors are required on the class: this function works
    regardless of the instrumentation applied directly to it.  Custom
    attribute management schemes will need to make usage of this method
    to establish attribute state as understood by SQLAlchemy.

    :param instance: the object that will be modified

    :param key: string name of the attribute

    :param value: value to assign

    :param initiator: an :class:`.AttributeEventToken` that would have
     been propagated from a previous event listener.  Use it when
     :func:`.set_attribute` is called from within an existing event
     listening function that was itself handed such a token; the token
     may be used to track the origin of the chain of events.

    """
    state = instance_state(instance)
    dict_ = instance_dict(instance)
    impl = state.manager[key].impl
    impl.set(state, dict_, value, initiator)
2796
2797
def get_attribute(instance: object, key: str) -> Any:
    """Get the value of an attribute, firing any callables required.

    No descriptors are required on the class: this function works
    regardless of the instrumentation applied directly to it.  Custom
    attribute management schemes will need to make usage of this method
    to make usage of attribute state as understood by SQLAlchemy.

    :param instance: the object to read from

    :param key: string name of the attribute

    :return: whatever the attribute implementation's ``get()`` returns.

    """
    state = instance_state(instance)
    dict_ = instance_dict(instance)
    impl = state.manager[key].impl
    return impl.get(state, dict_)
2810
2811
def del_attribute(instance: object, key: str) -> None:
    """Delete the value of an attribute, firing history events.

    No descriptors are required on the class: this function works
    regardless of the instrumentation applied directly to it.  Custom
    attribute management schemes will need to make usage of this method
    to establish attribute state as understood by SQLAlchemy.

    :param instance: the object that will be modified

    :param key: string name of the attribute

    """
    state = instance_state(instance)
    dict_ = instance_dict(instance)
    impl = state.manager[key].impl
    impl.delete(state, dict_)
2824
2825
def flag_modified(instance: object, key: str) -> None:
    """Mark an attribute on an instance as 'modified'.

    The 'modified' flag is set on the instance and an unconditional
    change event is established for the given attribute.
    The attribute must have a value present, else an
    :class:`.InvalidRequestError` is raised.

    To mark an object "dirty" without referring to any specific attribute
    so that it is considered within a flush, use the
    :func:`.attributes.flag_dirty` call.

    :param instance: the object to mark

    :param key: string name of the attribute

    .. seealso::

        :func:`.attributes.flag_dirty`

    """
    state = instance_state(instance)
    dict_ = instance_dict(instance)
    impl = state.manager[key].impl

    # notify "modified" listeners first, then record the change on the
    # state
    impl.dispatch.modified(state, impl._modified_token)
    state._modified_event(dict_, impl, NO_VALUE, is_userland=True)
2847
2848
def flag_dirty(instance: object) -> None:
    """Mark an instance as 'dirty' without any specific attribute mentioned.

    This special operation lets the object travel through the flush
    process so that events such as :meth:`.SessionEvents.before_flush`
    can intercept it.  No SQL is emitted in the flush process for an
    object that has no changes, even if it was marked dirty via this
    method.  A :meth:`.SessionEvents.before_flush` handler will however
    see the object in the :attr:`.Session.dirty` collection and may
    establish changes on it, which will then be included in the SQL
    emitted.

    :param instance: the object to mark

    .. seealso::

        :func:`.attributes.flag_modified`

    """

    state = instance_state(instance)
    dict_ = instance_dict(instance)

    # no attribute implementation is named: the change event is recorded
    # for the instance as a whole
    state._modified_event(dict_, None, NO_VALUE, is_userland=True)