# event/base.py
# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php

"""Base implementation classes.

The public-facing ``Events`` serves as the base class for an event interface;
its public attributes represent different kinds of events.   These attributes
are mirrored onto a ``_Dispatch`` class, which serves as a container for
collections of listener functions.   These collections are represented both
at the class level of a particular ``_Dispatch`` class as well as within
instances of ``_Dispatch``.

"""
18
19from __future__ import annotations
20
21import typing
22from typing import Any
23from typing import cast
24from typing import Dict
25from typing import Generic
26from typing import Iterator
27from typing import List
28from typing import Literal
29from typing import Mapping
30from typing import MutableMapping
31from typing import Optional
32from typing import overload
33from typing import Tuple
34from typing import Type
35from typing import Union
36import weakref
37
38from .attr import _ClsLevelDispatch
39from .attr import _EmptyListener
40from .attr import _InstanceLevelDispatch
41from .attr import _JoinedListener
42from .registry import _ET
43from .registry import _EventKey
44from .. import util
45
# Maps each event name to the list of Events classes whose dispatcher
# declares an event of that name.  Entries are appended as dispatcher
# classes are created and removed again by _remove_dispatcher().
_registrars: MutableMapping[
    str, List[Type[_HasEventsDispatch[Any]]]
] = util.defaultdict(list)
49
50
51def _is_event_name(name: str) -> bool:
52 # _sa_event prefix is special to support internal-only event names.
53 # most event names are just plain method names that aren't
54 # underscored.
55
56 return (
57 not name.startswith("_") and name != "dispatch"
58 ) or name.startswith("_sa_event")
59
60
61class _UnpickleDispatch:
62 """Serializable callable that re-generates an instance of
63 :class:`_Dispatch` given a particular :class:`.Events` subclass.
64
65 """
66
67 def __call__(self, _instance_cls: Type[_ET]) -> _Dispatch[_ET]:
68 for cls in _instance_cls.__mro__:
69 if "dispatch" in cls.__dict__:
70 return cast(
71 "_Dispatch[_ET]", cls.__dict__["dispatch"].dispatch
72 )._for_class(_instance_cls)
73 else:
74 raise AttributeError("No class with a 'dispatch' member present.")
75
76
class _DispatchCommon(Generic[_ET]):
    """Interface shared by :class:`._Dispatch` and
    :class:`._JoinedDispatcher`.

    Every member here is a placeholder that raises
    ``NotImplementedError``; the subclasses supply the behavior.

    """

    __slots__ = ()

    # class that the dispatch object was created for, if any
    _instance_cls: Optional[Type[_ET]]

    @property
    def _events(self) -> Type[_HasEventsDispatch[_ET]]:
        """The Events class associated with this dispatch object."""
        raise NotImplementedError()

    def __getattr__(self, name: str) -> _InstanceLevelDispatch[_ET]:
        """Return the listener collection for the event called ``name``."""
        raise NotImplementedError()

    def _join(self, other: _DispatchCommon[_ET]) -> _JoinedDispatcher[_ET]:
        """Return a dispatcher joining this object with ``other``."""
        raise NotImplementedError()
91
92
class _Dispatch(_DispatchCommon[_ET]):
    """Mirror the event definitions of an :class:`.Events` class as
    collections of listeners.

    One :class:`._Dispatch` subclass is generated per :class:`.Events`
    class by :meth:`._HasEventsDispatch._create_dispatcher_class`; the
    :class:`.Events` class itself is left untouched.  Keeping the two
    apart decouples how :class:`.Events` subclasses are written from the
    implementation used by the event internals, and lets inspecting
    tools such as Sphinx work against the public API in an unsurprising
    way.

    A dispatch object constructed without an ``instance_cls`` starts with
    no placeholder listeners.  One constructed for a class, via
    :meth:`._for_class` or :meth:`._for_instance`, refers back to the
    dispatch it was made from as its parent and hands out
    :class:`._EmptyListener` placeholders, which are assigned as
    attributes lazily on first access.

    """

    # "active_history" is an ORM case we add here.   ideally a better
    # system would be in place for ad-hoc attributes.
    __slots__ = ("_parent", "_instance_cls", "__dict__", "_empty_listeners")

    _active_history: bool

    # per-target-class cache of _EmptyListener collections, shared by all
    # dispatch objects; keys are held weakly.
    _empty_listener_reg: MutableMapping[
        Type[_ET], Dict[str, _EmptyListener[_ET]]
    ] = weakref.WeakKeyDictionary()

    _empty_listeners: Dict[str, _EmptyListener[_ET]]

    _event_names: List[str]

    _instance_cls: Optional[Type[_ET]]

    _joined_dispatch_cls: Type[_JoinedDispatcher[_ET]]

    _events: Type[_HasEventsDispatch[_ET]]
    """reference back to the Events class.

    Bidirectional against _HasEventsDispatch.dispatch

    """

    def __init__(
        self,
        parent: Optional[_Dispatch[_ET]],
        instance_cls: Optional[Type[_ET]] = None,
    ):
        """Construct a dispatch object.

        :param parent: the dispatch object this one derives from; must be
         given whenever ``instance_cls`` is given.
        :param instance_cls: the class this dispatch object serves, if
         any.  When present, the empty listeners for that class are
         looked up in, or added to, the shared registry.

        """
        self._parent = parent
        self._instance_cls = instance_cls

        if not instance_cls:
            self._empty_listeners = {}
            return

        assert parent is not None
        registry = self._empty_listener_reg
        try:
            empties = registry[instance_cls]
        except KeyError:
            # first dispatch made for this class; build one
            # _EmptyListener per event descriptor of the parent and
            # cache the collection for later dispatch objects.
            empties = registry[instance_cls] = {
                descriptor.name: _EmptyListener(descriptor, instance_cls)
                for descriptor in parent._event_descriptors
            }
        self._empty_listeners = empties

    def __getattr__(self, name: str) -> _InstanceLevelDispatch[_ET]:
        """Return the empty listener for ``name``, storing it on self.

        :raises AttributeError: if ``name`` is not a known event.

        """
        # Assign EmptyListeners as attributes on demand
        # to reduce startup time for new dispatch objects.
        try:
            listener = self._empty_listeners[name]
        except KeyError:
            raise AttributeError(name)

        setattr(self, listener.name, listener)
        return listener

    @property
    def _event_descriptors(self) -> Iterator[_ClsLevelDispatch[_ET]]:
        """Iterate the attribute stored under each name in
        ``_event_names``."""
        for event_name in self._event_names:
            # Yield _ClsLevelDispatch related
            # to relevant event name.
            yield getattr(self, event_name)

    def _listen(self, event_key: _EventKey[_ET], **kw: Any) -> None:
        """Delegate to ``_listen()`` of the associated Events class."""
        return self._events._listen(event_key, **kw)

    def _for_class(self, instance_cls: Type[_ET]) -> _Dispatch[_ET]:
        """Return a new dispatch of the same type for ``instance_cls``,
        with this object as its parent."""
        dispatch_cls = self.__class__
        return dispatch_cls(self, instance_cls)

    def _for_instance(self, instance: _ET) -> _Dispatch[_ET]:
        """Return a new dispatch for the class of ``instance``."""
        return self._for_class(instance.__class__)

    def _join(self, other: _DispatchCommon[_ET]) -> _JoinedDispatcher[_ET]:
        """Create a 'join' of this :class:`._Dispatch` and another.

        This new dispatcher will dispatch events to both
        :class:`._Dispatch` objects.

        """
        # the joined class must be set on this exact class, not inherited
        assert "_joined_dispatch_cls" in self.__class__.__dict__

        joined_cls = self._joined_dispatch_cls
        return joined_cls(self, other)

    def __reduce__(self) -> Union[str, Tuple[Any, ...]]:
        """Pickle as a call to :class:`._UnpickleDispatch` with the
        target class."""
        return (_UnpickleDispatch(), (self._instance_cls,))

    def _update(
        self, other: _Dispatch[_ET], only_propagate: bool = True
    ) -> None:
        """Populate from the listeners in another :class:`_Dispatch`
        object."""
        for source in other._event_descriptors:
            # empty listeners are skipped
            if not isinstance(source, _EmptyListener):
                target = getattr(self, source.name).for_modify(self)
                target._update(source, only_propagate=only_propagate)

    def _clear(self) -> None:
        """Clear the listener collection of every event descriptor."""
        for descriptor in self._event_descriptors:
            descriptor.for_modify(self).clear()
217
218
219def _remove_dispatcher(cls: Type[_HasEventsDispatch[_ET]]) -> None:
220 for k in cls.dispatch._event_names:
221 _registrars[k].remove(cls)
222 if not _registrars[k]:
223 del _registrars[k]
224
225
class _HasEventsDispatch(Generic[_ET]):
    """Base for classes that declare events and are paired with a
    generated :class:`._Dispatch` class.

    Each new subclass is intercepted by ``__init_subclass__``, which
    builds the corresponding :class:`._Dispatch` class through
    :meth:`._create_dispatcher_class`.

    """

    _dispatch_target: Optional[Type[_ET]]
    """class which will receive the .dispatch collection"""

    dispatch: _Dispatch[_ET]
    """reference back to the _Dispatch class.

    Bidirectional against _Dispatch._events

    """

    if typing.TYPE_CHECKING:

        def __getattr__(self, name: str) -> _InstanceLevelDispatch[_ET]: ...

    def __init_subclass__(cls) -> None:
        """Intercept new Event subclasses and create associated _Dispatch
        classes."""

        cls._create_dispatcher_class(cls.__name__, cls.__bases__, cls.__dict__)

    @classmethod
    def _accept_with(
        cls, target: Union[_ET, Type[_ET]], identifier: str
    ) -> Optional[Union[_ET, Type[_ET]]]:
        """Placeholder; raises ``NotImplementedError`` in this base."""
        raise NotImplementedError()

    @classmethod
    def _listen(
        cls,
        event_key: _EventKey[_ET],
        *,
        propagate: bool = False,
        insert: bool = False,
        named: bool = False,
        asyncio: bool = False,
    ) -> None:
        """Placeholder; raises ``NotImplementedError`` in this base."""
        raise NotImplementedError()

    @staticmethod
    def _set_dispatch(
        klass: Type[_HasEventsDispatch[_ET]],
        dispatch_cls: Type[_Dispatch[_ET]],
    ) -> _Dispatch[_ET]:
        """Instantiate ``dispatch_cls`` and link it with ``klass`` in
        both directions.

        :return: the new parent-less dispatch instance, also stored as
         ``klass.dispatch``.

        """
        # This allows an Events subclass to define additional utility
        # methods made available to the target via
        # "self.dispatch._events.<utilitymethod>"
        # @staticmethod to allow easy "super" calls while in a metaclass
        # constructor.
        dispatch_inst = dispatch_cls(None)
        klass.dispatch = dispatch_inst
        dispatch_cls._events = klass
        return dispatch_inst

    @classmethod
    def _create_dispatcher_class(
        cls, classname: str, bases: Tuple[type, ...], dict_: Mapping[str, Any]
    ) -> None:
        """Create a :class:`._Dispatch` class corresponding to an
        :class:`.Events` class."""

        # there's all kinds of ways to do this,
        # i.e. make a Dispatch class that shares the '_listen' method
        # of the Event class, this is the straight monkeypatch.
        dispatch_base = (
            cls.dispatch.__class__ if hasattr(cls, "dispatch") else _Dispatch
        )

        # NOTE: this one list object is used as __slots__ for the new
        # dispatch class, stored as its _event_names, extended below with
        # inherited names, and then used as __slots__ for the joined
        # class.
        event_names = [key for key in dict_ if _is_event_name(key)]
        dispatch_cls = cast(
            "Type[_Dispatch[_ET]]",
            type(
                "%sDispatch" % classname,
                (dispatch_base,),
                {"__slots__": event_names},
            ),
        )

        dispatch_cls._event_names = event_names
        dispatch_inst = cls._set_dispatch(cls, dispatch_cls)

        # events declared directly on this class
        for event_name in dispatch_cls._event_names:
            setattr(
                dispatch_inst,
                event_name,
                _ClsLevelDispatch(cls, dict_[event_name]),
            )
            _registrars[event_name].append(cls)

        # events inherited from the dispatch classes of superclasses
        for base in dispatch_cls.__bases__:
            if not issubclass(base, _Dispatch) or base is _Dispatch:
                continue
            for inherited in base._events.dispatch._event_descriptors:
                setattr(dispatch_inst, inherited.name, inherited)
                dispatch_cls._event_names.append(inherited.name)

        # install the "dispatch" descriptor on the target class, if any
        target_cls = getattr(cls, "_dispatch_target", None)
        if target_cls:
            uses_slots = (
                hasattr(target_cls, "__slots__")
                and "_slots_dispatch" in target_cls.__slots__
            )
            descriptor_cls = slots_dispatcher if uses_slots else dispatcher
            target_cls.dispatch = descriptor_cls(cls)

        joined_cls = type(
            "Joined%s" % dispatch_cls.__name__,
            (_JoinedDispatcher,),
            {"__slots__": event_names},
        )
        dispatch_cls._joined_dispatch_cls = joined_cls

        # establish pickle capability by adding it to this module
        globals()[joined_cls.__name__] = joined_cls
336
337
class _JoinedDispatcher(_DispatchCommon[_ET]):
    """Represent a connection between two _Dispatch objects."""

    __slots__ = ("local", "parent", "_instance_cls")

    local: _DispatchCommon[_ET]
    parent: _DispatchCommon[_ET]
    _instance_cls: Optional[Type[_ET]]

    def __init__(
        self, local: _DispatchCommon[_ET], parent: _DispatchCommon[_ET]
    ):
        """Join ``local`` with ``parent``.

        The ``_instance_cls`` of the join is taken from ``local``.

        """
        self.local = local
        self.parent = parent
        self._instance_cls = local._instance_cls

    def __reduce__(self) -> Any:
        """Pickle as a re-construction from ``local`` and ``parent``."""
        constructor_args = (self.local, self.parent)
        return self.__class__, constructor_args

    def __getattr__(self, name: str) -> _JoinedListener[_ET]:
        """Return a :class:`._JoinedListener` for the event ``name``,
        storing it on self."""
        # Assign _JoinedListeners as attributes on demand
        # to reduce startup time for new dispatch objects.
        local_listener = getattr(self.local, name)
        joined = _JoinedListener(
            self.parent, local_listener.name, local_listener
        )
        setattr(self, local_listener.name, joined)
        return joined

    def _listen(self, event_key: _EventKey[_ET], **kw: Any) -> None:
        """Delegate to ``_listen()`` of the parent dispatch."""
        return self.parent._listen(event_key, **kw)

    @property
    def _events(self) -> Type[_HasEventsDispatch[_ET]]:
        """The Events class of the parent dispatch."""
        return self.parent._events
371
372
class Events(_HasEventsDispatch[_ET]):
    """Define event listening functions for a particular target type."""

    @classmethod
    def _accept_with(
        cls, target: Union[_ET, Type[_ET]], identifier: str
    ) -> Optional[Union[_ET, Type[_ET]]]:
        """Return ``target`` if it is a valid target for this Events
        class, else ``None``.

        A target is accepted when it has a ``dispatch`` attribute that
        is an instance of this class's dispatch class, or that is a
        :class:`._JoinedDispatcher` whose outermost non-joined parent is
        such an instance.

        """
        # Mapper, ClassManager, Session override this to
        # also accept classes, scoped_sessions, sessionmakers, etc.
        if not hasattr(target, "dispatch"):
            return None

        own_cls = cls.dispatch.__class__

        def dispatch_is(*types: Type[Any]) -> bool:
            return all(isinstance(target.dispatch, t) for t in types)

        def dispatch_parent_is(t: Type[Any]) -> bool:
            # walk up through nested joins to the first non-joined parent
            parent = cast("_JoinedDispatcher[_ET]", target.dispatch).parent
            while isinstance(parent, _JoinedDispatcher):
                parent = cast("_JoinedDispatcher[_ET]", parent).parent

            return isinstance(parent, t)

        if dispatch_is(own_cls) or dispatch_is(type, own_cls):
            return target
        if dispatch_is(_JoinedDispatcher) and dispatch_parent_is(own_cls):
            return target

        return None

    @classmethod
    def _listen(
        cls,
        event_key: _EventKey[_ET],
        *,
        propagate: bool = False,
        insert: bool = False,
        named: bool = False,
        asyncio: bool = False,
    ) -> None:
        """Pass all listen options through to
        ``event_key.base_listen()``."""
        event_key.base_listen(
            propagate=propagate,
            insert=insert,
            named=named,
            asyncio=asyncio,
        )

    @classmethod
    def _remove(cls, event_key: _EventKey[_ET]) -> None:
        """Call ``event_key.remove()``."""
        event_key.remove()

    @classmethod
    def _clear(cls) -> None:
        """Clear every listener collection on this class's dispatch."""
        cls.dispatch._clear()
426
427
class dispatcher(Generic[_ET]):
    """Descriptor placed on target classes under the name ``dispatch``.

    Accessed on the class, it returns the dispatch object of the
    associated events class; accessed on an instance, it produces a new
    dispatch for that instance and stores it in the instance
    ``__dict__``.

    """

    def __init__(self, events: Type[_HasEventsDispatch[_ET]]):
        """Remember ``events`` and its class-level dispatch object."""
        self.events = events
        self.dispatch = events.dispatch

    @overload
    def __get__(
        self, obj: Literal[None], cls: Type[Any]
    ) -> Type[_Dispatch[_ET]]: ...

    @overload
    def __get__(self, obj: Any, cls: Type[Any]) -> _DispatchCommon[_ET]: ...

    def __get__(self, obj: Any, cls: Type[Any]) -> Any:
        """Return the dispatch for ``obj``, or the class-level dispatch
        when ``obj`` is None.

        :raises TypeError: if ``obj`` has no ``__dict__`` in which to
         store the new dispatch.

        """
        if obj is None:
            return self.dispatch

        instance_dispatch = self.dispatch._for_instance(obj)
        try:
            # stored under the descriptor's own name in the instance dict
            obj.__dict__["dispatch"] = instance_dispatch
        except AttributeError as ae:
            raise TypeError(
                "target %r doesn't have __dict__, should it be "
                "defining _slots_dispatch?" % (obj,)
            ) from ae
        else:
            return instance_dispatch
461
462
class slots_dispatcher(dispatcher[_ET]):
    """Variant of :class:`.dispatcher` that stores the per-instance
    dispatch on the ``_slots_dispatch`` attribute of the target rather
    than in its ``__dict__``.

    """

    def __get__(self, obj: Any, cls: Type[Any]) -> Any:
        """Return the dispatch for ``obj``, or the class-level dispatch
        when ``obj`` is None."""
        if obj is None:
            return self.dispatch

        # reuse the dispatch stored by an earlier access, if present
        try:
            return obj._slots_dispatch
        except AttributeError:
            pass

        instance_dispatch = self.dispatch._for_instance(obj)
        obj._slots_dispatch = instance_dispatch
        return instance_dispatch