1# orm/instrumentation.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Defines SQLAlchemy's system of class instrumentation.
10
11This module is usually not directly visible to user applications, but
12defines a large part of the ORM's interactivity.
13
14instrumentation.py deals with registration of end-user classes
15for state tracking. It interacts closely with state.py
16and attributes.py which establish per-instance and per-class-attribute
17instrumentation, respectively.
18
19The class instrumentation system can be customized on a per-class
20or global basis using the :mod:`sqlalchemy.ext.instrumentation`
21module, which provides the means to build and specify
22alternate instrumentation forms.
23
24"""
25
26
27from __future__ import annotations
28
29from typing import Any
30from typing import Callable
31from typing import cast
32from typing import Collection
33from typing import Dict
34from typing import Generic
35from typing import Iterable
36from typing import List
37from typing import Optional
38from typing import Protocol
39from typing import Set
40from typing import Tuple
41from typing import Type
42from typing import TYPE_CHECKING
43from typing import TypeVar
44from typing import Union
45import weakref
46
47from . import base
48from . import collections
49from . import exc
50from . import interfaces
51from . import state
52from ._typing import _O
53from .attributes import _is_collection_attribute_impl
54from .. import util
55from ..event import EventTarget
56from ..util import HasMemoized
57from ..util.typing import Literal
58
59if TYPE_CHECKING:
60 from ._typing import _RegistryType
61 from .attributes import _AttributeImpl
62 from .attributes import QueryableAttribute
63 from .collections import _AdaptedCollectionProtocol
64 from .collections import _CollectionFactoryType
65 from .decl_base import _MapperConfig
66 from .events import InstanceEvents
67 from .mapper import Mapper
68 from .state import InstanceState
69 from ..event import dispatcher
70
# Marker stored by ClassManager.install_member() when the class had no
# attribute of its own under the key being replaced; on uninstall the
# replacement is then deleted rather than restored.
DEL_ATTR = util.symbol("DEL_ATTR")

# Generic "any class" type variable, used by ClassManager._subclass_manager().
_T = TypeVar("_T", bound=Any)
73
74
class _ExpiredAttributeLoaderProto(Protocol):
    """Call signature of ``ClassManager.expired_attribute_loader``.

    The callable receives an instance state, the set of attribute names
    to load and a passive flag, and returns nothing.

    """

    def __call__(
        self,
        state: state.InstanceState[Any],
        toload: Set[str],
        passive: base.PassiveFlag,
    ) -> None:
        ...
82
83
class _ManagerFactory(Protocol):
    """Call signature of a factory producing a ClassManager for a class."""

    def __call__(self, class_: Type[_O]) -> ClassManager[_O]:
        ...
86
87
class ClassManager(
    HasMemoized,
    Dict[str, "QueryableAttribute[Any]"],
    Generic[_O],
    EventTarget,
):
    """Tracks state information at the class level.

    A ``ClassManager`` is a dictionary of attribute key to
    :class:`.QueryableAttribute` for a single class.  On construction it
    installs itself on that class under :attr:`.MANAGER_ATTR`; once
    finalized it replaces the class' ``__init__`` with a generated
    wrapper, and it establishes per-instance state objects under
    :attr:`.STATE_ATTR`.

    """

    dispatch: dispatcher[ClassManager[_O]]

    MANAGER_ATTR = base.DEFAULT_MANAGER_ATTR
    STATE_ATTR = base.DEFAULT_STATE_ATTR

    _state_setter = staticmethod(util.attrsetter(STATE_ATTR))

    expired_attribute_loader: _ExpiredAttributeLoaderProto
    "previously known as deferred_scalar_loader"

    init_method: Optional[Callable[..., None]]
    original_init: Optional[Callable[..., None]] = None

    factory: Optional[_ManagerFactory]

    declarative_scan: Optional[weakref.ref[_MapperConfig]] = None

    registry: _RegistryType

    if not TYPE_CHECKING:
        # starts as None during setup
        registry = None

    class_: Type[_O]

    _bases: List[ClassManager[Any]]

    # deprecated read alias for ``expired_attribute_loader``
    @property
    @util.deprecated(
        "1.4",
        message="The ClassManager.deferred_scalar_loader attribute is now "
        "named expired_attribute_loader",
    )
    def deferred_scalar_loader(self):
        return self.expired_attribute_loader

    # deprecated write alias for ``expired_attribute_loader``
    @deferred_scalar_loader.setter
    @util.deprecated(
        "1.4",
        message="The ClassManager.deferred_scalar_loader attribute is now "
        "named expired_attribute_loader",
    )
    def deferred_scalar_loader(self, obj):
        self.expired_attribute_loader = obj

    def __init__(self, class_):
        """Construct a manager for ``class_`` and attach it to the class.

        Attributes of the managers of the direct base classes are copied
        into this manager, event listeners of every managed class in the
        ``__mro__`` are merged into this manager's dispatcher, and the
        manager is installed on the class via :meth:`.manage`.  A warning
        is emitted if the class defines its own ``__del__``.

        """
        self.class_ = class_
        self.info = {}
        self.new_init = None
        self.local_attrs = {}
        self.originals = {}
        self._finalized = False
        self.factory = None
        self.init_method = None

        # managers of the direct bases, skipping non-type bases and
        # bases that are not instrumented
        self._bases = [
            mgr
            for mgr in cast(
                "List[Optional[ClassManager[Any]]]",
                [
                    opt_manager_of_class(base)
                    for base in self.class_.__bases__
                    if isinstance(base, type)
                ],
            )
            if mgr is not None
        ]

        # inherit the attributes already instrumented on the bases
        for base_ in self._bases:
            self.update(base_)

        cast(
            "InstanceEvents", self.dispatch._events
        )._new_classmanager_instance(class_, self)

        for basecls in class_.__mro__:
            mgr = opt_manager_of_class(basecls)
            if mgr is not None:
                self.dispatch._update(mgr.dispatch)

        self.manage()

        if "__del__" in class_.__dict__:
            util.warn(
                "__del__() method on class %s will "
                "cause unreachable cycles and memory leaks, "
                "as SQLAlchemy instrumentation often creates "
                "reference cycles.  Please remove this method." % class_
            )

    def _update_state(
        self,
        finalize: bool = False,
        mapper: Optional[Mapper[_O]] = None,
        registry: Optional[_RegistryType] = None,
        declarative_scan: Optional[_MapperConfig] = None,
        expired_attribute_loader: Optional[
            _ExpiredAttributeLoaderProto
        ] = None,
        init_method: Optional[Callable[..., None]] = None,
    ) -> None:
        """Apply configuration to this manager, optionally finalizing it.

        Each argument that is falsy is ignored.

        :param finalize: if True and the manager is not yet finalized,
         call :meth:`._finalize`.
        :param mapper: stored as ``self.mapper``.
        :param registry: this manager is handed to
         ``registry._add_manager()``.
        :param declarative_scan: stored as a weak reference.
        :param expired_attribute_loader: stored on the manager.
        :param init_method: stored as ``self.init_method``; may only be
         given before the manager is finalized.

        """
        if mapper:
            self.mapper = mapper
        if registry:
            registry._add_manager(self)
        if declarative_scan:
            self.declarative_scan = weakref.ref(declarative_scan)
        if expired_attribute_loader:
            self.expired_attribute_loader = expired_attribute_loader

        if init_method:
            assert not self._finalized, (
                "class is already instrumented, "
                "init_method %s can't be applied" % init_method
            )
            self.init_method = init_method

        if not self._finalized:
            # the explicit init_method is only used when the class has
            # no __init__ of its own, i.e. still uses object.__init__
            self.original_init = (
                self.init_method
                if self.init_method is not None
                and self.class_.__init__ is object.__init__
                else self.class_.__init__
            )

        if finalize and not self._finalized:
            self._finalize()

    def _finalize(self) -> None:
        """Install the generated ``__init__`` and fire ``class_instrument``.

        Does nothing if the manager was finalized before.

        """
        if self._finalized:
            return
        self._finalized = True

        self._instrument_init()

        _instrumentation_factory.dispatch.class_instrument(self.class_)

    def __hash__(self) -> int:  # type: ignore[override]
        """Hash on identity; the dictionary contents are not considered."""
        return id(self)

    def __eq__(self, other: Any) -> bool:
        """Compare on identity; the dictionary contents are not considered."""
        return other is self

    @property
    def is_mapped(self) -> bool:
        """True if a ``mapper`` attribute has been assigned to this manager."""
        return "mapper" in self.__dict__

    @HasMemoized.memoized_attribute
    def _all_key_set(self):
        """frozenset of all attribute keys in this manager."""
        return frozenset(self)

    @HasMemoized.memoized_attribute
    def _collection_impl_keys(self):
        """frozenset of keys whose attribute impl has ``collection`` set."""
        return frozenset(
            [attr.key for attr in self.values() if attr.impl.collection]
        )

    @HasMemoized.memoized_attribute
    def _scalar_loader_impls(self):
        """frozenset of attribute impls with ``accepts_scalar_loader`` set."""
        return frozenset(
            [
                attr.impl
                for attr in self.values()
                if attr.impl.accepts_scalar_loader
            ]
        )

    @HasMemoized.memoized_attribute
    def _loader_impls(self):
        """frozenset of the impls of all attributes in this manager."""
        return frozenset([attr.impl for attr in self.values()])

    @util.memoized_property
    def mapper(self) -> Mapper[_O]:
        # raises unless self.mapper has been assigned
        raise exc.UnmappedClassError(self.class_)

    def _all_sqla_attributes(self, exclude=None):
        """return an iterator of all classbound attributes that
        implement :class:`.InspectionAttr`.

        This includes :class:`.QueryableAttribute` as well as extension
        types such as :class:`.hybrid_property` and
        :class:`.AssociationProxy`.

        Yields ``(key, attribute)`` tuples.  The ``exclude`` argument is
        accepted but not consulted.

        """

        found: Dict[str, Any] = {}

        # constraints:
        # 1. yield keys in cls.__dict__ order
        # 2. if a subclass has the same key as a superclass, include that
        #    key as part of the ordering of the superclass, because an
        #    overridden key is usually installed by the mapper which is going
        #    on a different ordering
        # 3. don't use getattr() as this fires off descriptors

        for supercls in self.class_.__mro__[0:-1]:
            inherits = supercls.__mro__[1]
            for key in supercls.__dict__:
                # the first class in the MRO to define the key supplies
                # the value that is yielded
                found.setdefault(key, supercls)
                if key in inherits.__dict__:
                    # the next class in the MRO also defines this key;
                    # defer to its position in the ordering
                    continue
                val = found[key].__dict__[key]
                if (
                    isinstance(val, interfaces.InspectionAttr)
                    and val.is_attribute
                ):
                    yield key, val

    def _get_class_attr_mro(self, key, default=None):
        """return an attribute on the class without tripping it.

        The ``__dict__`` of each class in the ``__mro__`` is searched in
        order, so that descriptors are returned as is rather than being
        invoked.  ``default`` is returned if no class defines ``key``.

        """

        for supercls in self.class_.__mro__:
            if key in supercls.__dict__:
                return supercls.__dict__[key]
        return default

    def _attr_has_impl(self, key: str) -> bool:
        """Return True if the given attribute is fully initialized.

        i.e. has an impl.
        """

        return key in self and self[key].impl is not None

    def _subclass_manager(self, cls: Type[_T]) -> ClassManager[_T]:
        """Create a new ClassManager for a subclass of this ClassManager's
        class.

        This is called automatically when attributes are instrumented so that
        the attributes can be propagated to subclasses against their own
        class-local manager, without the need for mappers etc. to have already
        pre-configured managers for the full class hierarchy.   Mappers
        can post-configure the auto-generated ClassManager when needed.

        """
        return register_class(cls, finalize=False)

    def _instrument_init(self):
        """Generate the wrapping ``__init__`` and install it on the class."""
        self.new_init = _generate_init(self.class_, self, self.original_init)
        self.install_member("__init__", self.new_init)

    @util.memoized_property
    def _state_constructor(self) -> Type[state.InstanceState[_O]]:
        """The class used to construct per-instance state objects."""
        return state.InstanceState

    def manage(self):
        """Mark this instance as the manager for its class."""

        setattr(self.class_, self.MANAGER_ATTR, self)

    @util.hybridmethod
    def manager_getter(self):
        """Return the module-level default manager getter callable."""
        return _default_manager_getter

    @util.hybridmethod
    def state_getter(self):
        """Return a (instance) -> InstanceState callable.

        "state getter" callables should raise either KeyError or
        AttributeError if no InstanceState could be found for the
        instance.
        """

        return _default_state_getter

    @util.hybridmethod
    def dict_getter(self):
        """Return the module-level default dict getter callable."""
        return _default_dict_getter

    def instrument_attribute(
        self,
        key: str,
        inst: QueryableAttribute[Any],
        propagated: bool = False,
    ) -> None:
        """Register ``inst`` under ``key`` here and on all subclasses.

        :param key: attribute name.
        :param inst: the attribute object to register.
        :param propagated: True when the call is being passed down from
         a superclass' manager.  In that case the descriptor is not
         installed on this class, and an attribute that is local to this
         class under the same key is left untouched.

        """
        if propagated:
            if key in self.local_attrs:
                return  # don't override local attr with inherited attr
        else:
            self.local_attrs[key] = inst
            self.install_descriptor(key, inst)
        self._reset_memoizations()
        self[key] = inst

        # push the attribute down to every direct subclass, creating
        # managers for them as needed; they recurse in turn
        for cls in self.class_.__subclasses__():
            manager = self._subclass_manager(cls)
            manager.instrument_attribute(key, inst, True)

    def subclass_managers(self, recursive):
        """Iterate the managers of the subclasses of this manager's class.

        Subclasses that have no manager are skipped.

        :param recursive: if True, descend into the subclasses of each
         manager yielded as well.

        """
        for cls in self.class_.__subclasses__():
            mgr = opt_manager_of_class(cls)
            if mgr is not None and mgr is not self:
                yield mgr
                if recursive:
                    yield from mgr.subclass_managers(True)

    def post_configure_attribute(self, key):
        """Fire the ``attribute_instrument`` event for the given key."""
        _instrumentation_factory.dispatch.attribute_instrument(
            self.class_, key, self[key]
        )

    def uninstrument_attribute(self, key, propagated=False):
        """Remove the attribute ``key`` from this manager and subclasses.

        Does nothing if ``key`` is not present in this manager.

        :param key: attribute name.
        :param propagated: True when the call is being passed down from
         a superclass' manager.  In that case an attribute that is local
         to this class is retained, and no descriptor is removed from
         this class.

        """
        if key not in self:
            return
        if propagated:
            if key in self.local_attrs:
                return  # don't get rid of local attr
        else:
            del self.local_attrs[key]
            self.uninstall_descriptor(key)
        self._reset_memoizations()
        del self[key]
        for cls in self.class_.__subclasses__():
            manager = opt_manager_of_class(cls)
            if manager:
                manager.uninstrument_attribute(key, True)

    def unregister(self) -> None:
        """remove all instrumentation established by this ClassManager."""

        # restore or delete every member replaced via install_member()
        for key in list(self.originals):
            self.uninstall_member(key)

        self.mapper = None
        self.dispatch = None  # type: ignore
        self.new_init = None
        self.info.clear()

        # only locally instrumented attributes carry a descriptor on
        # this class
        for key in list(self):
            if key in self.local_attrs:
                self.uninstrument_attribute(key)

        if self.MANAGER_ATTR in self.class_.__dict__:
            delattr(self.class_, self.MANAGER_ATTR)

    def install_descriptor(
        self, key: str, inst: QueryableAttribute[Any]
    ) -> None:
        """Set ``inst`` as the class attribute named ``key``.

        :raises KeyError: if ``key`` is the name of the state or manager
         attribute.

        """
        if key in (self.STATE_ATTR, self.MANAGER_ATTR):
            raise KeyError(
                "%r: requested attribute name conflicts with "
                "instrumentation attribute of the same name." % key
            )
        setattr(self.class_, key, inst)

    def uninstall_descriptor(self, key: str) -> None:
        """Delete the class attribute named ``key``."""
        delattr(self.class_, key)

    def install_member(self, key: str, implementation: Any) -> None:
        """Replace the class member ``key`` with ``implementation``.

        The value previously present in the class' own ``__dict__``, or
        ``DEL_ATTR`` if there was none, is remembered in
        ``self.originals`` the first time a key is installed so that
        :meth:`.uninstall_member` can undo the change.

        :raises KeyError: if ``key`` is the name of the state or manager
         attribute.

        """
        if key in (self.STATE_ATTR, self.MANAGER_ATTR):
            raise KeyError(
                "%r: requested attribute name conflicts with "
                "instrumentation attribute of the same name." % key
            )
        self.originals.setdefault(key, self.class_.__dict__.get(key, DEL_ATTR))
        setattr(self.class_, key, implementation)

    def uninstall_member(self, key: str) -> None:
        """Undo a previous :meth:`.install_member` for ``key``.

        The original member is put back on the class, or the attribute
        is deleted if the class had no member of its own under that
        name.  A key that was never installed is ignored.

        """
        try:
            original = self.originals.pop(key)
        except KeyError:
            # nothing was installed under this key; leave the class
            # untouched rather than overwriting the attribute with None
            return
        if original is not DEL_ATTR:
            setattr(self.class_, key, original)
        else:
            delattr(self.class_, key)

    def instrument_collection_class(
        self, key: str, collection_class: Type[Collection[Any]]
    ) -> _CollectionFactoryType:
        """Return the instrumented factory for ``collection_class``.

        ``key`` is not used by this implementation.

        """
        return collections._prepare_instrumentation(collection_class)

    def initialize_collection(
        self,
        key: str,
        state: InstanceState[_O],
        factory: _CollectionFactoryType,
    ) -> Tuple[collections.CollectionAdapter, _AdaptedCollectionProtocol]:
        """Create a new collection for the collection attribute ``key``.

        :param key: name of a collection attribute in this manager.
        :param state: state of the instance that will own the collection.
        :param factory: zero-argument callable producing the collection.
        :return: a tuple of the :class:`.CollectionAdapter` and the
         collection produced by ``factory``.

        """
        user_data = factory()
        impl = self.get_impl(key)
        assert _is_collection_attribute_impl(impl)
        adapter = collections.CollectionAdapter(impl, state, user_data)
        return adapter, user_data

    def is_instrumented(self, key: str, search: bool = False) -> bool:
        """Return True if ``key`` is an instrumented attribute.

        :param search: if True, attributes inherited from superclass
         managers count as well; otherwise only attributes local to this
         manager's class are considered.

        """
        if search:
            return key in self
        else:
            return key in self.local_attrs

    def get_impl(self, key: str) -> _AttributeImpl:
        """Return the impl of the attribute registered under ``key``."""
        return self[key].impl

    @property
    def attributes(self) -> Iterable[Any]:
        """An iterator over the attribute objects in this manager."""
        return iter(self.values())

    # InstanceState management

    def new_instance(self, state: Optional[InstanceState[_O]] = None) -> _O:
        """Create an instance without calling ``__init__`` and attach state.

        :param state: state object to attach; a new one is constructed
         if omitted.

        """
        # here, we would prefer _O to be bound to "object"
        # so that mypy sees that __new__ is present.  currently
        # it's bound to Any as there were other problems not having
        # it that way but these can be revisited
        instance = self.class_.__new__(self.class_)
        if state is None:
            state = self._state_constructor(instance, self)
        self._state_setter(instance, state)
        return instance

    def setup_instance(
        self, instance: _O, state: Optional[InstanceState[_O]] = None
    ) -> None:
        """Attach a state object to an existing instance.

        :param state: state object to attach; a new one is constructed
         if omitted.

        """
        if state is None:
            state = self._state_constructor(instance, self)
        self._state_setter(instance, state)

    def teardown_instance(self, instance: _O) -> None:
        """Remove the state attribute from the given instance."""
        delattr(instance, self.STATE_ATTR)

    def _serialize(
        self, state: InstanceState[_O], state_dict: Dict[str, Any]
    ) -> _SerializeManager:
        """Return a :class:`._SerializeManager` for the given state."""
        return _SerializeManager(state, state_dict)

    def _new_state_if_none(
        self, instance: _O
    ) -> Union[Literal[False], InstanceState[_O]]:
        """Install a default InstanceState if none is present.

        A private convenience method used by the __init__ decorator.

        :return: the newly installed state, or False if the instance
         already had one.

        """
        if hasattr(instance, self.STATE_ATTR):
            return False
        elif self.class_ is not instance.__class__ and self.is_mapped:
            # this will create a new ClassManager for the
            # subclass, without a mapper.  This is likely a
            # user error situation but allow the object
            # to be constructed, so that it is usable
            # in a non-ORM context at least.
            return self._subclass_manager(
                instance.__class__
            )._new_state_if_none(instance)
        else:
            state = self._state_constructor(instance, self)
            self._state_setter(instance, state)
            return state

    def has_state(self, instance: _O) -> bool:
        """Return True if the instance carries the state attribute."""
        return hasattr(instance, self.STATE_ATTR)

    def has_parent(
        self, state: InstanceState[_O], key: str, optimistic: bool = False
    ) -> bool:
        """Return the result of ``hasparent()`` for the attribute ``key``.

        The call is delegated to the impl of the attribute, passing
        ``state`` and ``optimistic`` along unchanged.

        """
        return self.get_impl(key).hasparent(state, optimistic=optimistic)

    def __bool__(self) -> bool:
        """All ClassManagers are non-zero regardless of attribute state."""
        return True

    def __repr__(self) -> str:
        return "<%s of %r at %x>" % (
            self.__class__.__name__,
            self.class_,
            id(self),
        )
563
564
class _SerializeManager:
    """Provide serialization of a :class:`.ClassManager`.

    The :class:`.InstanceState` uses ``__init__()`` on serialize
    and ``__call__()`` on deserialize.

    """

    def __init__(self, state: state.InstanceState[Any], d: Dict[str, Any]):
        """Remember the class of ``state`` and fire the ``pickle`` event."""
        self.class_ = state.class_
        state.manager.dispatch.pickle(state, d)

    def __call__(self, state, inst, state_dict):
        """Re-attach ``state`` to the manager of the remembered class.

        Fires the ``unpickle`` event once the manager is in place.

        :raises UnmappedInstanceError: if the class has no manager in
         this process.

        """
        found = opt_manager_of_class(self.class_)
        # the state receives the lookup result even when it is None
        state.manager = found

        if found is None:
            raise exc.UnmappedInstanceError(
                inst,
                "Cannot deserialize object of type %r - "
                "no mapper() has "
                "been configured for this class within the current "
                "Python process!" % self.class_,
            )

        if found.is_mapped and not found.mapper.configured:
            found.mapper._check_configure()

        # setup _sa_instance_state ahead of time so that
        # unpickle events can access the object normally.
        # see [ticket:2362]
        if inst is not None:
            found.setup_instance(inst, state)

        found.dispatch.unpickle(state, state_dict)
597
598
class InstrumentationFactory(EventTarget):
    """Factory for new ClassManager instances."""

    dispatch: dispatcher[InstrumentationFactory]

    def create_manager_for_cls(self, class_: Type[_O]) -> ClassManager[_O]:
        """Build the manager for a class that does not have one yet.

        The extended-factory hook is consulted first; when it supplies no
        factory, a plain :class:`.ClassManager` is constructed.  The
        factory that was used is recorded on the manager.

        """
        assert class_ is not None
        assert opt_manager_of_class(class_) is None

        # give a more complicated subclass
        # a chance to do what it wants here
        manager, factory = self._locate_extended_factory(class_)

        if factory is not None:
            assert manager is not None
        else:
            factory = ClassManager
            manager = ClassManager(class_)

        self._check_conflicts(class_, factory)
        manager.factory = factory
        return manager

    def _locate_extended_factory(
        self, class_: Type[_O]
    ) -> Tuple[Optional[ClassManager[_O]], Optional[_ManagerFactory]]:
        """Overridden by a subclass to do an extended lookup."""
        return (None, None)

    def _check_conflicts(
        self, class_: Type[_O], factory: Callable[[Type[_O]], ClassManager[_O]]
    ) -> None:
        """Overridden by a subclass to test for conflicting factories."""

    def unregister(self, class_: Type[_O]) -> None:
        """Unregister the manager of ``class_`` and fire
        ``class_uninstrument``."""
        manager_of_class(class_).unregister()
        self.dispatch.class_uninstrument(class_)
639
640
# The process-wide factory; sqlalchemy.ext.instrumentation swaps this
# attribute out when it is imported.
_instrumentation_factory = InstrumentationFactory()

# The getters below are swapped out by sqlalchemy.ext.instrumentation
# the first time a non-standard InstrumentationManager class is used
# to instrument a class.  Each public name and its ``_default_``
# counterpart start out bound to the same function.
_default_state_getter = base.instance_state
instance_state = _default_state_getter

_default_dict_getter = base.instance_dict
instance_dict = _default_dict_getter

_default_manager_getter = base.manager_of_class
manager_of_class = _default_manager_getter

_default_opt_manager_getter = base.opt_manager_of_class
opt_manager_of_class = _default_opt_manager_getter
654
655
def register_class(
    class_: Type[_O],
    finalize: bool = True,
    mapper: Optional[Mapper[_O]] = None,
    registry: Optional[_RegistryType] = None,
    declarative_scan: Optional[_MapperConfig] = None,
    expired_attribute_loader: Optional[_ExpiredAttributeLoaderProto] = None,
    init_method: Optional[Callable[..., None]] = None,
) -> ClassManager[_O]:
    """Register class instrumentation.

    The manager already present on ``class_`` is reused; otherwise one
    is created through the instrumentation factory.  All remaining
    arguments are forwarded to the manager's ``_update_state()``.

    Returns the existing or newly created class manager.

    """

    existing = opt_manager_of_class(class_)
    manager = (
        existing
        if existing is not None
        else _instrumentation_factory.create_manager_for_cls(class_)
    )

    manager._update_state(
        mapper=mapper,
        registry=registry,
        declarative_scan=declarative_scan,
        expired_attribute_loader=expired_attribute_loader,
        init_method=init_method,
        finalize=finalize,
    )
    return manager
684
685
def unregister_class(class_):
    """Unregister class instrumentation.

    Delegates to the ``unregister()`` method of the current
    instrumentation factory.

    """
    factory = _instrumentation_factory
    factory.unregister(class_)
690
691
def is_instrumented(instance, key):
    """Return True if the given attribute on the given instance is
    instrumented by the attributes package.

    This function may be used regardless of instrumentation
    applied directly to the class, i.e. no descriptors are required.

    """
    manager = manager_of_class(instance.__class__)
    # search=True also counts attributes inherited from superclass managers
    return manager.is_instrumented(key, search=True)
703
704
def _generate_init(class_, class_manager, original_init):
    """Build an __init__ decorator that triggers ClassManager events.

    A new ``__init__`` function is generated from source text so that it
    carries the same argument signature as ``original_init``.  When
    called, it asks ``class_manager`` to install an instance state if
    none is present; if one was installed, construction proceeds through
    that state's ``_initialize_instance()``, otherwise ``original_init``
    is called directly.

    :param class_: the class being instrumented; supplies ``__init__``
     when ``original_init`` is None.
    :param class_manager: object providing ``_new_state_if_none()``.
    :param original_init: the constructor being wrapped, or None.
    :return: the generated function, carrying the docstring and default
     values of ``original_init`` and referring to it as
     ``_sa_original_init``.

    """

    # TODO: we should use the ClassManager's notion of the
    # original '__init__' method, once ClassManager is fixed
    # to always reference that.

    if original_init is None:
        original_init = class_.__init__

    # Go through some effort here and don't change the user's __init__
    # calling signature, including the unlikely case that it has
    # a return value.
    #
    # The names used inside the generated function are prefixed with
    # "_sa_" so that constructor parameters named "class_manager",
    # "original_init" or "new_state" don't shadow or get overwritten by
    # the wrapper's own names.
    func_body = """\
def __init__(%(apply_pos)s):
    _sa_new_state = _sa_class_manager._new_state_if_none(%(self_arg)s)
    if _sa_new_state:
        return _sa_new_state._initialize_instance(%(apply_kw)s)
    else:
        return _sa_original_init(%(apply_kw)s)
"""
    func_vars = util.format_argspec_init(original_init, grouped=False)
    func_text = func_body % func_vars

    func_defaults = getattr(original_init, "__defaults__", None)
    func_kw_defaults = getattr(original_init, "__kwdefaults__", None)

    # global namespace of the generated function; only the names that
    # the generated text refers to are needed
    env = {
        "_sa_class_manager": class_manager,
        "_sa_original_init": original_init,
        "__name__": __name__,
    }
    exec(func_text, env)
    __init__ = env["__init__"]
    __init__.__doc__ = original_init.__doc__
    __init__._sa_original_init = original_init

    # default values are not part of the generated signature text, so
    # carry them over from the original function
    if func_defaults:
        __init__.__defaults__ = func_defaults
    if func_kw_defaults:
        __init__.__kwdefaults__ = func_kw_defaults

    return __init__