1# event/base.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Base implementation classes.
9
10The public-facing ``Events`` serves as the base class for an event interface;
11its public attributes represent different kinds of events. These attributes
12are mirrored onto a ``_Dispatch`` class, which serves as a container for
13collections of listener functions. These collections are represented both
14at the class level of a particular ``_Dispatch`` class as well as within
15instances of ``_Dispatch``.
16
17"""
18from __future__ import annotations
19
20import typing
21from typing import Any
22from typing import cast
23from typing import Dict
24from typing import Generic
25from typing import Iterator
26from typing import List
27from typing import Mapping
28from typing import MutableMapping
29from typing import Optional
30from typing import overload
31from typing import Tuple
32from typing import Type
33from typing import Union
34import weakref
35
36from .attr import _ClsLevelDispatch
37from .attr import _EmptyListener
38from .attr import _InstanceLevelDispatch
39from .attr import _JoinedListener
40from .registry import _ET
41from .registry import _EventKey
42from .. import util
43from ..util.typing import Literal
44
45_registrars: MutableMapping[str, List[Type[_HasEventsDispatch[Any]]]] = (
46 util.defaultdict(list)
47)
48
49
50def _is_event_name(name: str) -> bool:
51 # _sa_event prefix is special to support internal-only event names.
52 # most event names are just plain method names that aren't
53 # underscored.
54
55 return (
56 not name.startswith("_") and name != "dispatch"
57 ) or name.startswith("_sa_event")
58
59
60class _UnpickleDispatch:
61 """Serializable callable that re-generates an instance of
62 :class:`_Dispatch` given a particular :class:`.Events` subclass.
63
64 """
65
66 def __call__(self, _instance_cls: Type[_ET]) -> _Dispatch[_ET]:
67 for cls in _instance_cls.__mro__:
68 if "dispatch" in cls.__dict__:
69 return cast(
70 "_Dispatch[_ET]", cls.__dict__["dispatch"].dispatch
71 )._for_class(_instance_cls)
72 else:
73 raise AttributeError("No class with a 'dispatch' member present.")
74
75
76class _DispatchCommon(Generic[_ET]):
77 __slots__ = ()
78
79 _instance_cls: Optional[Type[_ET]]
80
81 def _join(self, other: _DispatchCommon[_ET]) -> _JoinedDispatcher[_ET]:
82 raise NotImplementedError()
83
84 def __getattr__(self, name: str) -> _InstanceLevelDispatch[_ET]:
85 raise NotImplementedError()
86
87 @property
88 def _events(self) -> Type[_HasEventsDispatch[_ET]]:
89 raise NotImplementedError()
90
91
92class _Dispatch(_DispatchCommon[_ET]):
93 """Mirror the event listening definitions of an Events class with
94 listener collections.
95
96 Classes which define a "dispatch" member will return a
97 non-instantiated :class:`._Dispatch` subclass when the member
98 is accessed at the class level. When the "dispatch" member is
99 accessed at the instance level of its owner, an instance
100 of the :class:`._Dispatch` class is returned.
101
102 A :class:`._Dispatch` class is generated for each :class:`.Events`
103 class defined, by the :meth:`._HasEventsDispatch._create_dispatcher_class`
104 method. The original :class:`.Events` classes remain untouched.
105 This decouples the construction of :class:`.Events` subclasses from
106 the implementation used by the event internals, and allows
107 inspecting tools like Sphinx to work in an unsurprising
108 way against the public API.
109
110 """
111
112 # "active_history" is an ORM case we add here. ideally a better
113 # system would be in place for ad-hoc attributes.
114 __slots__ = "_parent", "_instance_cls", "__dict__", "_empty_listeners"
115
116 _active_history: bool
117
118 _empty_listener_reg: MutableMapping[
119 Type[_ET], Dict[str, _EmptyListener[_ET]]
120 ] = weakref.WeakKeyDictionary()
121
122 _empty_listeners: Dict[str, _EmptyListener[_ET]]
123
124 _event_names: List[str]
125
126 _instance_cls: Optional[Type[_ET]]
127
128 _joined_dispatch_cls: Type[_JoinedDispatcher[_ET]]
129
130 _events: Type[_HasEventsDispatch[_ET]]
131 """reference back to the Events class.
132
133 Bidirectional against _HasEventsDispatch.dispatch
134
135 """
136
137 def __init__(
138 self,
139 parent: Optional[_Dispatch[_ET]],
140 instance_cls: Optional[Type[_ET]] = None,
141 ):
142 self._parent = parent
143 self._instance_cls = instance_cls
144
145 if instance_cls:
146 assert parent is not None
147 try:
148 self._empty_listeners = self._empty_listener_reg[instance_cls]
149 except KeyError:
150 self._empty_listeners = self._empty_listener_reg[
151 instance_cls
152 ] = {
153 ls.name: _EmptyListener(ls, instance_cls)
154 for ls in parent._event_descriptors
155 }
156 else:
157 self._empty_listeners = {}
158
159 def __getattr__(self, name: str) -> _InstanceLevelDispatch[_ET]:
160 # Assign EmptyListeners as attributes on demand
161 # to reduce startup time for new dispatch objects.
162 try:
163 ls = self._empty_listeners[name]
164 except KeyError:
165 raise AttributeError(name)
166 else:
167 setattr(self, ls.name, ls)
168 return ls
169
170 @property
171 def _event_descriptors(self) -> Iterator[_ClsLevelDispatch[_ET]]:
172 for k in self._event_names:
173 # Yield _ClsLevelDispatch related
174 # to relevant event name.
175 yield getattr(self, k)
176
177 def _listen(self, event_key: _EventKey[_ET], **kw: Any) -> None:
178 return self._events._listen(event_key, **kw)
179
180 def _for_class(self, instance_cls: Type[_ET]) -> _Dispatch[_ET]:
181 return self.__class__(self, instance_cls)
182
183 def _for_instance(self, instance: _ET) -> _Dispatch[_ET]:
184 instance_cls = instance.__class__
185 return self._for_class(instance_cls)
186
187 def _join(self, other: _DispatchCommon[_ET]) -> _JoinedDispatcher[_ET]:
188 """Create a 'join' of this :class:`._Dispatch` and another.
189
190 This new dispatcher will dispatch events to both
191 :class:`._Dispatch` objects.
192
193 """
194 assert "_joined_dispatch_cls" in self.__class__.__dict__
195
196 return self._joined_dispatch_cls(self, other)
197
198 def __reduce__(self) -> Union[str, Tuple[Any, ...]]:
199 return _UnpickleDispatch(), (self._instance_cls,)
200
201 def _update(
202 self, other: _Dispatch[_ET], only_propagate: bool = True
203 ) -> None:
204 """Populate from the listeners in another :class:`_Dispatch`
205 object."""
206 for ls in other._event_descriptors:
207 if isinstance(ls, _EmptyListener):
208 continue
209 getattr(self, ls.name).for_modify(self)._update(
210 ls, only_propagate=only_propagate
211 )
212
213 def _clear(self) -> None:
214 for ls in self._event_descriptors:
215 ls.for_modify(self).clear()
216
217
218def _remove_dispatcher(cls: Type[_HasEventsDispatch[_ET]]) -> None:
219 for k in cls.dispatch._event_names:
220 _registrars[k].remove(cls)
221 if not _registrars[k]:
222 del _registrars[k]
223
224
225class _HasEventsDispatch(Generic[_ET]):
226 _dispatch_target: Optional[Type[_ET]]
227 """class which will receive the .dispatch collection"""
228
229 dispatch: _Dispatch[_ET]
230 """reference back to the _Dispatch class.
231
232 Bidirectional against _Dispatch._events
233
234 """
235
236 if typing.TYPE_CHECKING:
237
238 def __getattr__(self, name: str) -> _InstanceLevelDispatch[_ET]: ...
239
240 def __init_subclass__(cls) -> None:
241 """Intercept new Event subclasses and create associated _Dispatch
242 classes."""
243
244 cls._create_dispatcher_class(cls.__name__, cls.__bases__, cls.__dict__)
245
246 @classmethod
247 def _accept_with(
248 cls, target: Union[_ET, Type[_ET]], identifier: str
249 ) -> Optional[Union[_ET, Type[_ET]]]:
250 raise NotImplementedError()
251
252 @classmethod
253 def _listen(
254 cls,
255 event_key: _EventKey[_ET],
256 *,
257 propagate: bool = False,
258 insert: bool = False,
259 named: bool = False,
260 asyncio: bool = False,
261 ) -> None:
262 raise NotImplementedError()
263
264 @staticmethod
265 def _set_dispatch(
266 klass: Type[_HasEventsDispatch[_ET]],
267 dispatch_cls: Type[_Dispatch[_ET]],
268 ) -> _Dispatch[_ET]:
269 # This allows an Events subclass to define additional utility
270 # methods made available to the target via
271 # "self.dispatch._events.<utilitymethod>"
272 # @staticmethod to allow easy "super" calls while in a metaclass
273 # constructor.
274 klass.dispatch = dispatch_cls(None)
275 dispatch_cls._events = klass
276 return klass.dispatch
277
278 @classmethod
279 def _create_dispatcher_class(
280 cls, classname: str, bases: Tuple[type, ...], dict_: Mapping[str, Any]
281 ) -> None:
282 """Create a :class:`._Dispatch` class corresponding to an
283 :class:`.Events` class."""
284
285 # there's all kinds of ways to do this,
286 # i.e. make a Dispatch class that shares the '_listen' method
287 # of the Event class, this is the straight monkeypatch.
288 if hasattr(cls, "dispatch"):
289 dispatch_base = cls.dispatch.__class__
290 else:
291 dispatch_base = _Dispatch
292
293 event_names = [k for k in dict_ if _is_event_name(k)]
294 dispatch_cls = cast(
295 "Type[_Dispatch[_ET]]",
296 type(
297 "%sDispatch" % classname,
298 (dispatch_base,),
299 {"__slots__": event_names},
300 ),
301 )
302
303 dispatch_cls._event_names = event_names
304 dispatch_inst = cls._set_dispatch(cls, dispatch_cls)
305 for k in dispatch_cls._event_names:
306 setattr(dispatch_inst, k, _ClsLevelDispatch(cls, dict_[k]))
307 _registrars[k].append(cls)
308
309 for super_ in dispatch_cls.__bases__:
310 if issubclass(super_, _Dispatch) and super_ is not _Dispatch:
311 for ls in super_._events.dispatch._event_descriptors:
312 setattr(dispatch_inst, ls.name, ls)
313 dispatch_cls._event_names.append(ls.name)
314
315 if getattr(cls, "_dispatch_target", None):
316 dispatch_target_cls = cls._dispatch_target
317 assert dispatch_target_cls is not None
318 if (
319 hasattr(dispatch_target_cls, "__slots__")
320 and "_slots_dispatch" in dispatch_target_cls.__slots__
321 ):
322 dispatch_target_cls.dispatch = slots_dispatcher(cls)
323 else:
324 dispatch_target_cls.dispatch = dispatcher(cls)
325
326 klass = type(
327 "Joined%s" % dispatch_cls.__name__,
328 (_JoinedDispatcher,),
329 {"__slots__": event_names},
330 )
331 dispatch_cls._joined_dispatch_cls = klass
332
333 # establish pickle capability by adding it to this module
334 globals()[klass.__name__] = klass
335
336
337class _JoinedDispatcher(_DispatchCommon[_ET]):
338 """Represent a connection between two _Dispatch objects."""
339
340 __slots__ = "local", "parent", "_instance_cls"
341
342 local: _DispatchCommon[_ET]
343 parent: _DispatchCommon[_ET]
344 _instance_cls: Optional[Type[_ET]]
345
346 def __init__(
347 self, local: _DispatchCommon[_ET], parent: _DispatchCommon[_ET]
348 ):
349 self.local = local
350 self.parent = parent
351 self._instance_cls = self.local._instance_cls
352
353 def __reduce__(self) -> Any:
354 return (self.__class__, (self.local, self.parent))
355
356 def __getattr__(self, name: str) -> _JoinedListener[_ET]:
357 # Assign _JoinedListeners as attributes on demand
358 # to reduce startup time for new dispatch objects.
359 ls = getattr(self.local, name)
360 jl = _JoinedListener(self.parent, ls.name, ls)
361 setattr(self, ls.name, jl)
362 return jl
363
364 def _listen(self, event_key: _EventKey[_ET], **kw: Any) -> None:
365 return self.parent._listen(event_key, **kw)
366
367 @property
368 def _events(self) -> Type[_HasEventsDispatch[_ET]]:
369 return self.parent._events
370
371
372class Events(_HasEventsDispatch[_ET]):
373 """Define event listening functions for a particular target type."""
374
375 @classmethod
376 def _accept_with(
377 cls, target: Union[_ET, Type[_ET]], identifier: str
378 ) -> Optional[Union[_ET, Type[_ET]]]:
379 def dispatch_is(*types: Type[Any]) -> bool:
380 return all(isinstance(target.dispatch, t) for t in types)
381
382 def dispatch_parent_is(t: Type[Any]) -> bool:
383 return isinstance(
384 cast("_JoinedDispatcher[_ET]", target.dispatch).parent, t
385 )
386
387 # Mapper, ClassManager, Session override this to
388 # also accept classes, scoped_sessions, sessionmakers, etc.
389 if hasattr(target, "dispatch"):
390 if (
391 dispatch_is(cls.dispatch.__class__)
392 or dispatch_is(type, cls.dispatch.__class__)
393 or (
394 dispatch_is(_JoinedDispatcher)
395 and dispatch_parent_is(cls.dispatch.__class__)
396 )
397 ):
398 return target
399
400 return None
401
402 @classmethod
403 def _listen(
404 cls,
405 event_key: _EventKey[_ET],
406 *,
407 propagate: bool = False,
408 insert: bool = False,
409 named: bool = False,
410 asyncio: bool = False,
411 ) -> None:
412 event_key.base_listen(
413 propagate=propagate, insert=insert, named=named, asyncio=asyncio
414 )
415
416 @classmethod
417 def _remove(cls, event_key: _EventKey[_ET]) -> None:
418 event_key.remove()
419
420 @classmethod
421 def _clear(cls) -> None:
422 cls.dispatch._clear()
423
424
425class dispatcher(Generic[_ET]):
426 """Descriptor used by target classes to
427 deliver the _Dispatch class at the class level
428 and produce new _Dispatch instances for target
429 instances.
430
431 """
432
433 def __init__(self, events: Type[_HasEventsDispatch[_ET]]):
434 self.dispatch = events.dispatch
435 self.events = events
436
437 @overload
438 def __get__(
439 self, obj: Literal[None], cls: Type[Any]
440 ) -> Type[_Dispatch[_ET]]: ...
441
442 @overload
443 def __get__(self, obj: Any, cls: Type[Any]) -> _DispatchCommon[_ET]: ...
444
445 def __get__(self, obj: Any, cls: Type[Any]) -> Any:
446 if obj is None:
447 return self.dispatch
448
449 disp = self.dispatch._for_instance(obj)
450 try:
451 obj.__dict__["dispatch"] = disp
452 except AttributeError as ae:
453 raise TypeError(
454 "target %r doesn't have __dict__, should it be "
455 "defining _slots_dispatch?" % (obj,)
456 ) from ae
457 return disp
458
459
460class slots_dispatcher(dispatcher[_ET]):
461 def __get__(self, obj: Any, cls: Type[Any]) -> Any:
462 if obj is None:
463 return self.dispatch
464
465 if hasattr(obj, "_slots_dispatch"):
466 return obj._slots_dispatch
467
468 disp = self.dispatch._for_instance(obj)
469 obj._slots_dispatch = disp
470 return disp