1# orm/instrumentation.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Defines SQLAlchemy's system of class instrumentation.
10
11This module is usually not directly visible to user applications, but
12defines a large part of the ORM's interactivity.
13
14instrumentation.py deals with registration of end-user classes
15for state tracking. It interacts closely with state.py
16and attributes.py which establish per-instance and per-class-attribute
17instrumentation, respectively.
18
19The class instrumentation system can be customized on a per-class
20or global basis using the :mod:`sqlalchemy.ext.instrumentation`
21module, which provides the means to build and specify
22alternate instrumentation forms.
23
24.. versionchanged: 0.8
25 The instrumentation extension system was moved out of the
26 ORM and into the external :mod:`sqlalchemy.ext.instrumentation`
27 package. When that package is imported, it installs
28 itself within sqlalchemy.orm so that its more comprehensive
29 resolution mechanics take effect.
30
31"""
32
33from __future__ import annotations
34
35from typing import Any
36from typing import Callable
37from typing import cast
38from typing import Collection
39from typing import Dict
40from typing import Generic
41from typing import Iterable
42from typing import List
43from typing import Optional
44from typing import Set
45from typing import Tuple
46from typing import Type
47from typing import TYPE_CHECKING
48from typing import TypeVar
49from typing import Union
50import weakref
51
52from . import base
53from . import collections
54from . import exc
55from . import interfaces
56from . import state
57from ._typing import _O
58from .attributes import _is_collection_attribute_impl
59from .. import util
60from ..event import EventTarget
61from ..util import HasMemoized
62from ..util.typing import Literal
63from ..util.typing import Protocol
64
65if TYPE_CHECKING:
66 from ._typing import _RegistryType
67 from .attributes import AttributeImpl
68 from .attributes import QueryableAttribute
69 from .collections import _AdaptedCollectionProtocol
70 from .collections import _CollectionFactoryType
71 from .decl_base import _MapperConfig
72 from .events import InstanceEvents
73 from .mapper import Mapper
74 from .state import InstanceState
75 from ..event import dispatcher
76
77_T = TypeVar("_T", bound=Any)
78DEL_ATTR = util.symbol("DEL_ATTR")
79
80
81class _ExpiredAttributeLoaderProto(Protocol):
82 def __call__(
83 self,
84 state: state.InstanceState[Any],
85 toload: Set[str],
86 passive: base.PassiveFlag,
87 ) -> None: ...
88
89
90class _ManagerFactory(Protocol):
91 def __call__(self, class_: Type[_O]) -> ClassManager[_O]: ...
92
93
94class ClassManager(
95 HasMemoized,
96 Dict[str, "QueryableAttribute[Any]"],
97 Generic[_O],
98 EventTarget,
99):
100 """Tracks state information at the class level."""
101
102 dispatch: dispatcher[ClassManager[_O]]
103
104 MANAGER_ATTR = base.DEFAULT_MANAGER_ATTR
105 STATE_ATTR = base.DEFAULT_STATE_ATTR
106
107 _state_setter = staticmethod(util.attrsetter(STATE_ATTR))
108
109 expired_attribute_loader: _ExpiredAttributeLoaderProto
110 "previously known as deferred_scalar_loader"
111
112 init_method: Optional[Callable[..., None]]
113 original_init: Optional[Callable[..., None]] = None
114
115 factory: Optional[_ManagerFactory]
116
117 declarative_scan: Optional[weakref.ref[_MapperConfig]] = None
118
119 registry: _RegistryType
120
121 if not TYPE_CHECKING:
122 # starts as None during setup
123 registry = None
124
125 class_: Type[_O]
126
127 _bases: List[ClassManager[Any]]
128
129 @property
130 @util.deprecated(
131 "1.4",
132 message="The ClassManager.deferred_scalar_loader attribute is now "
133 "named expired_attribute_loader",
134 )
135 def deferred_scalar_loader(self):
136 return self.expired_attribute_loader
137
138 @deferred_scalar_loader.setter
139 @util.deprecated(
140 "1.4",
141 message="The ClassManager.deferred_scalar_loader attribute is now "
142 "named expired_attribute_loader",
143 )
144 def deferred_scalar_loader(self, obj):
145 self.expired_attribute_loader = obj
146
147 def __init__(self, class_):
148 self.class_ = class_
149 self.info = {}
150 self.new_init = None
151 self.local_attrs = {}
152 self.originals = {}
153 self._finalized = False
154 self.factory = None
155 self.init_method = None
156
157 self._bases = [
158 mgr
159 for mgr in cast(
160 "List[Optional[ClassManager[Any]]]",
161 [
162 opt_manager_of_class(base)
163 for base in self.class_.__bases__
164 if isinstance(base, type)
165 ],
166 )
167 if mgr is not None
168 ]
169
170 for base_ in self._bases:
171 self.update(base_)
172
173 cast(
174 "InstanceEvents", self.dispatch._events
175 )._new_classmanager_instance(class_, self)
176
177 for basecls in class_.__mro__:
178 mgr = opt_manager_of_class(basecls)
179 if mgr is not None:
180 self.dispatch._update(mgr.dispatch)
181
182 self.manage()
183
184 if "__del__" in class_.__dict__:
185 util.warn(
186 "__del__() method on class %s will "
187 "cause unreachable cycles and memory leaks, "
188 "as SQLAlchemy instrumentation often creates "
189 "reference cycles. Please remove this method." % class_
190 )
191
192 def _update_state(
193 self,
194 finalize: bool = False,
195 mapper: Optional[Mapper[_O]] = None,
196 registry: Optional[_RegistryType] = None,
197 declarative_scan: Optional[_MapperConfig] = None,
198 expired_attribute_loader: Optional[
199 _ExpiredAttributeLoaderProto
200 ] = None,
201 init_method: Optional[Callable[..., None]] = None,
202 ) -> None:
203 if mapper:
204 self.mapper = mapper #
205 if registry:
206 registry._add_manager(self)
207 if declarative_scan:
208 self.declarative_scan = weakref.ref(declarative_scan)
209 if expired_attribute_loader:
210 self.expired_attribute_loader = expired_attribute_loader
211
212 if init_method:
213 assert not self._finalized, (
214 "class is already instrumented, "
215 "init_method %s can't be applied" % init_method
216 )
217 self.init_method = init_method
218
219 if not self._finalized:
220 self.original_init = (
221 self.init_method
222 if self.init_method is not None
223 and self.class_.__init__ is object.__init__
224 else self.class_.__init__
225 )
226
227 if finalize and not self._finalized:
228 self._finalize()
229
230 def _finalize(self) -> None:
231 if self._finalized:
232 return
233 self._finalized = True
234
235 self._instrument_init()
236
237 _instrumentation_factory.dispatch.class_instrument(self.class_)
238
239 def __hash__(self) -> int: # type: ignore[override]
240 return id(self)
241
242 def __eq__(self, other: Any) -> bool:
243 return other is self
244
245 @property
246 def is_mapped(self) -> bool:
247 return "mapper" in self.__dict__
248
249 @HasMemoized.memoized_attribute
250 def _all_key_set(self):
251 return frozenset(self)
252
253 @HasMemoized.memoized_attribute
254 def _collection_impl_keys(self):
255 return frozenset(
256 [attr.key for attr in self.values() if attr.impl.collection]
257 )
258
259 @HasMemoized.memoized_attribute
260 def _scalar_loader_impls(self):
261 return frozenset(
262 [
263 attr.impl
264 for attr in self.values()
265 if attr.impl.accepts_scalar_loader
266 ]
267 )
268
269 @HasMemoized.memoized_attribute
270 def _loader_impls(self):
271 return frozenset([attr.impl for attr in self.values()])
272
273 @util.memoized_property
274 def mapper(self) -> Mapper[_O]:
275 # raises unless self.mapper has been assigned
276 raise exc.UnmappedClassError(self.class_)
277
278 def _all_sqla_attributes(self, exclude=None):
279 """return an iterator of all classbound attributes that are
280 implement :class:`.InspectionAttr`.
281
282 This includes :class:`.QueryableAttribute` as well as extension
283 types such as :class:`.hybrid_property` and
284 :class:`.AssociationProxy`.
285
286 """
287
288 found: Dict[str, Any] = {}
289
290 # constraints:
291 # 1. yield keys in cls.__dict__ order
292 # 2. if a subclass has the same key as a superclass, include that
293 # key as part of the ordering of the superclass, because an
294 # overridden key is usually installed by the mapper which is going
295 # on a different ordering
296 # 3. don't use getattr() as this fires off descriptors
297
298 for supercls in self.class_.__mro__[0:-1]:
299 inherits = supercls.__mro__[1]
300 for key in supercls.__dict__:
301 found.setdefault(key, supercls)
302 if key in inherits.__dict__:
303 continue
304 val = found[key].__dict__[key]
305 if (
306 isinstance(val, interfaces.InspectionAttr)
307 and val.is_attribute
308 ):
309 yield key, val
310
311 def _get_class_attr_mro(self, key, default=None):
312 """return an attribute on the class without tripping it."""
313
314 for supercls in self.class_.__mro__:
315 if key in supercls.__dict__:
316 return supercls.__dict__[key]
317 else:
318 return default
319
320 def _attr_has_impl(self, key: str) -> bool:
321 """Return True if the given attribute is fully initialized.
322
323 i.e. has an impl.
324 """
325
326 return key in self and self[key].impl is not None
327
328 def _subclass_manager(self, cls: Type[_T]) -> ClassManager[_T]:
329 """Create a new ClassManager for a subclass of this ClassManager's
330 class.
331
332 This is called automatically when attributes are instrumented so that
333 the attributes can be propagated to subclasses against their own
334 class-local manager, without the need for mappers etc. to have already
335 pre-configured managers for the full class hierarchy. Mappers
336 can post-configure the auto-generated ClassManager when needed.
337
338 """
339 return register_class(cls, finalize=False)
340
341 def _instrument_init(self):
342 self.new_init = _generate_init(self.class_, self, self.original_init)
343 self.install_member("__init__", self.new_init)
344
345 @util.memoized_property
346 def _state_constructor(self) -> Type[state.InstanceState[_O]]:
347 self.dispatch.first_init(self, self.class_)
348 return state.InstanceState
349
350 def manage(self):
351 """Mark this instance as the manager for its class."""
352
353 setattr(self.class_, self.MANAGER_ATTR, self)
354
355 @util.hybridmethod
356 def manager_getter(self):
357 return _default_manager_getter
358
359 @util.hybridmethod
360 def state_getter(self):
361 """Return a (instance) -> InstanceState callable.
362
363 "state getter" callables should raise either KeyError or
364 AttributeError if no InstanceState could be found for the
365 instance.
366 """
367
368 return _default_state_getter
369
370 @util.hybridmethod
371 def dict_getter(self):
372 return _default_dict_getter
373
374 def instrument_attribute(
375 self,
376 key: str,
377 inst: QueryableAttribute[Any],
378 propagated: bool = False,
379 ) -> None:
380 if propagated:
381 if key in self.local_attrs:
382 return # don't override local attr with inherited attr
383 else:
384 self.local_attrs[key] = inst
385 self.install_descriptor(key, inst)
386 self._reset_memoizations()
387 self[key] = inst
388
389 for cls in self.class_.__subclasses__():
390 manager = self._subclass_manager(cls)
391 manager.instrument_attribute(key, inst, True)
392
393 def subclass_managers(self, recursive):
394 for cls in self.class_.__subclasses__():
395 mgr = opt_manager_of_class(cls)
396 if mgr is not None and mgr is not self:
397 yield mgr
398 if recursive:
399 yield from mgr.subclass_managers(True)
400
401 def post_configure_attribute(self, key):
402 _instrumentation_factory.dispatch.attribute_instrument(
403 self.class_, key, self[key]
404 )
405
406 def uninstrument_attribute(self, key, propagated=False):
407 if key not in self:
408 return
409 if propagated:
410 if key in self.local_attrs:
411 return # don't get rid of local attr
412 else:
413 del self.local_attrs[key]
414 self.uninstall_descriptor(key)
415 self._reset_memoizations()
416 del self[key]
417 for cls in self.class_.__subclasses__():
418 manager = opt_manager_of_class(cls)
419 if manager:
420 manager.uninstrument_attribute(key, True)
421
422 def unregister(self) -> None:
423 """remove all instrumentation established by this ClassManager."""
424
425 for key in list(self.originals):
426 self.uninstall_member(key)
427
428 self.mapper = None
429 self.dispatch = None # type: ignore
430 self.new_init = None
431 self.info.clear()
432
433 for key in list(self):
434 if key in self.local_attrs:
435 self.uninstrument_attribute(key)
436
437 if self.MANAGER_ATTR in self.class_.__dict__:
438 delattr(self.class_, self.MANAGER_ATTR)
439
440 def install_descriptor(
441 self, key: str, inst: QueryableAttribute[Any]
442 ) -> None:
443 if key in (self.STATE_ATTR, self.MANAGER_ATTR):
444 raise KeyError(
445 "%r: requested attribute name conflicts with "
446 "instrumentation attribute of the same name." % key
447 )
448 setattr(self.class_, key, inst)
449
450 def uninstall_descriptor(self, key: str) -> None:
451 delattr(self.class_, key)
452
453 def install_member(self, key: str, implementation: Any) -> None:
454 if key in (self.STATE_ATTR, self.MANAGER_ATTR):
455 raise KeyError(
456 "%r: requested attribute name conflicts with "
457 "instrumentation attribute of the same name." % key
458 )
459 self.originals.setdefault(key, self.class_.__dict__.get(key, DEL_ATTR))
460 setattr(self.class_, key, implementation)
461
462 def uninstall_member(self, key: str) -> None:
463 original = self.originals.pop(key, None)
464 if original is not DEL_ATTR:
465 setattr(self.class_, key, original)
466 else:
467 delattr(self.class_, key)
468
469 def instrument_collection_class(
470 self, key: str, collection_class: Type[Collection[Any]]
471 ) -> _CollectionFactoryType:
472 return collections.prepare_instrumentation(collection_class)
473
474 def initialize_collection(
475 self,
476 key: str,
477 state: InstanceState[_O],
478 factory: _CollectionFactoryType,
479 ) -> Tuple[collections.CollectionAdapter, _AdaptedCollectionProtocol]:
480 user_data = factory()
481 impl = self.get_impl(key)
482 assert _is_collection_attribute_impl(impl)
483 adapter = collections.CollectionAdapter(impl, state, user_data)
484 return adapter, user_data
485
486 def is_instrumented(self, key: str, search: bool = False) -> bool:
487 if search:
488 return key in self
489 else:
490 return key in self.local_attrs
491
492 def get_impl(self, key: str) -> AttributeImpl:
493 return self[key].impl
494
495 @property
496 def attributes(self) -> Iterable[Any]:
497 return iter(self.values())
498
499 # InstanceState management
500
501 def new_instance(self, state: Optional[InstanceState[_O]] = None) -> _O:
502 # here, we would prefer _O to be bound to "object"
503 # so that mypy sees that __new__ is present. currently
504 # it's bound to Any as there were other problems not having
505 # it that way but these can be revisited
506 instance = self.class_.__new__(self.class_)
507 if state is None:
508 state = self._state_constructor(instance, self)
509 self._state_setter(instance, state)
510 return instance
511
512 def setup_instance(
513 self, instance: _O, state: Optional[InstanceState[_O]] = None
514 ) -> None:
515 if state is None:
516 state = self._state_constructor(instance, self)
517 self._state_setter(instance, state)
518
519 def teardown_instance(self, instance: _O) -> None:
520 delattr(instance, self.STATE_ATTR)
521
522 def _serialize(
523 self, state: InstanceState[_O], state_dict: Dict[str, Any]
524 ) -> _SerializeManager:
525 return _SerializeManager(state, state_dict)
526
527 def _new_state_if_none(
528 self, instance: _O
529 ) -> Union[Literal[False], InstanceState[_O]]:
530 """Install a default InstanceState if none is present.
531
532 A private convenience method used by the __init__ decorator.
533
534 """
535 if hasattr(instance, self.STATE_ATTR):
536 return False
537 elif self.class_ is not instance.__class__ and self.is_mapped:
538 # this will create a new ClassManager for the
539 # subclass, without a mapper. This is likely a
540 # user error situation but allow the object
541 # to be constructed, so that it is usable
542 # in a non-ORM context at least.
543 return self._subclass_manager(
544 instance.__class__
545 )._new_state_if_none(instance)
546 else:
547 state = self._state_constructor(instance, self)
548 self._state_setter(instance, state)
549 return state
550
551 def has_state(self, instance: _O) -> bool:
552 return hasattr(instance, self.STATE_ATTR)
553
554 def has_parent(
555 self, state: InstanceState[_O], key: str, optimistic: bool = False
556 ) -> bool:
557 """TODO"""
558 return self.get_impl(key).hasparent(state, optimistic=optimistic)
559
560 def __bool__(self) -> bool:
561 """All ClassManagers are non-zero regardless of attribute state."""
562 return True
563
564 def __repr__(self) -> str:
565 return "<%s of %r at %x>" % (
566 self.__class__.__name__,
567 self.class_,
568 id(self),
569 )
570
571
572class _SerializeManager:
573 """Provide serialization of a :class:`.ClassManager`.
574
575 The :class:`.InstanceState` uses ``__init__()`` on serialize
576 and ``__call__()`` on deserialize.
577
578 """
579
580 def __init__(self, state: state.InstanceState[Any], d: Dict[str, Any]):
581 self.class_ = state.class_
582 manager = state.manager
583 manager.dispatch.pickle(state, d)
584
585 def __call__(self, state, inst, state_dict):
586 state.manager = manager = opt_manager_of_class(self.class_)
587 if manager is None:
588 raise exc.UnmappedInstanceError(
589 inst,
590 "Cannot deserialize object of type %r - "
591 "no mapper() has "
592 "been configured for this class within the current "
593 "Python process!" % self.class_,
594 )
595 elif manager.is_mapped and not manager.mapper.configured:
596 manager.mapper._check_configure()
597
598 # setup _sa_instance_state ahead of time so that
599 # unpickle events can access the object normally.
600 # see [ticket:2362]
601 if inst is not None:
602 manager.setup_instance(inst, state)
603 manager.dispatch.unpickle(state, state_dict)
604
605
606class InstrumentationFactory(EventTarget):
607 """Factory for new ClassManager instances."""
608
609 dispatch: dispatcher[InstrumentationFactory]
610
611 def create_manager_for_cls(self, class_: Type[_O]) -> ClassManager[_O]:
612 assert class_ is not None
613 assert opt_manager_of_class(class_) is None
614
615 # give a more complicated subclass
616 # a chance to do what it wants here
617 manager, factory = self._locate_extended_factory(class_)
618
619 if factory is None:
620 factory = ClassManager
621 manager = ClassManager(class_)
622 else:
623 assert manager is not None
624
625 self._check_conflicts(class_, factory)
626
627 manager.factory = factory
628
629 return manager
630
631 def _locate_extended_factory(
632 self, class_: Type[_O]
633 ) -> Tuple[Optional[ClassManager[_O]], Optional[_ManagerFactory]]:
634 """Overridden by a subclass to do an extended lookup."""
635 return None, None
636
637 def _check_conflicts(
638 self, class_: Type[_O], factory: Callable[[Type[_O]], ClassManager[_O]]
639 ) -> None:
640 """Overridden by a subclass to test for conflicting factories."""
641
642 def unregister(self, class_: Type[_O]) -> None:
643 manager = manager_of_class(class_)
644 manager.unregister()
645 self.dispatch.class_uninstrument(class_)
646
647
648# this attribute is replaced by sqlalchemy.ext.instrumentation
649# when imported.
650_instrumentation_factory = InstrumentationFactory()
651
652# these attributes are replaced by sqlalchemy.ext.instrumentation
653# when a non-standard InstrumentationManager class is first
654# used to instrument a class.
655instance_state = _default_state_getter = base.instance_state
656
657instance_dict = _default_dict_getter = base.instance_dict
658
659manager_of_class = _default_manager_getter = base.manager_of_class
660opt_manager_of_class = _default_opt_manager_getter = base.opt_manager_of_class
661
662
663def register_class(
664 class_: Type[_O],
665 finalize: bool = True,
666 mapper: Optional[Mapper[_O]] = None,
667 registry: Optional[_RegistryType] = None,
668 declarative_scan: Optional[_MapperConfig] = None,
669 expired_attribute_loader: Optional[_ExpiredAttributeLoaderProto] = None,
670 init_method: Optional[Callable[..., None]] = None,
671) -> ClassManager[_O]:
672 """Register class instrumentation.
673
674 Returns the existing or newly created class manager.
675
676 """
677
678 manager = opt_manager_of_class(class_)
679 if manager is None:
680 manager = _instrumentation_factory.create_manager_for_cls(class_)
681 manager._update_state(
682 mapper=mapper,
683 registry=registry,
684 declarative_scan=declarative_scan,
685 expired_attribute_loader=expired_attribute_loader,
686 init_method=init_method,
687 finalize=finalize,
688 )
689
690 return manager
691
692
693def unregister_class(class_):
694 """Unregister class instrumentation."""
695
696 _instrumentation_factory.unregister(class_)
697
698
699def is_instrumented(instance, key):
700 """Return True if the given attribute on the given instance is
701 instrumented by the attributes package.
702
703 This function may be used regardless of instrumentation
704 applied directly to the class, i.e. no descriptors are required.
705
706 """
707 return manager_of_class(instance.__class__).is_instrumented(
708 key, search=True
709 )
710
711
712def _generate_init(class_, class_manager, original_init):
713 """Build an __init__ decorator that triggers ClassManager events."""
714
715 # TODO: we should use the ClassManager's notion of the
716 # original '__init__' method, once ClassManager is fixed
717 # to always reference that.
718
719 if original_init is None:
720 original_init = class_.__init__
721
722 # Go through some effort here and don't change the user's __init__
723 # calling signature, including the unlikely case that it has
724 # a return value.
725 # FIXME: need to juggle local names to avoid constructor argument
726 # clashes.
727 func_body = """\
728def __init__(%(apply_pos)s):
729 new_state = class_manager._new_state_if_none(%(self_arg)s)
730 if new_state:
731 return new_state._initialize_instance(%(apply_kw)s)
732 else:
733 return original_init(%(apply_kw)s)
734"""
735 func_vars = util.format_argspec_init(original_init, grouped=False)
736 func_text = func_body % func_vars
737
738 func_defaults = getattr(original_init, "__defaults__", None)
739 func_kw_defaults = getattr(original_init, "__kwdefaults__", None)
740
741 env = locals().copy()
742 env["__name__"] = __name__
743 exec(func_text, env)
744 __init__ = env["__init__"]
745 __init__.__doc__ = original_init.__doc__
746 __init__._sa_original_init = original_init
747
748 if func_defaults:
749 __init__.__defaults__ = func_defaults
750 if func_kw_defaults:
751 __init__.__kwdefaults__ = func_kw_defaults
752
753 return __init__