1# event/attr.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Attribute implementation for _Dispatch classes.
9
10The various listener targets for a particular event class are represented
11as attributes, which refer to collections of listeners to be fired off.
12These collections can exist at the class level as well as at the instance
13level. An event is fired off using code like this::
14
15 some_object.dispatch.first_connect(arg1, arg2)
16
17Above, ``some_object.dispatch`` would be an instance of ``_Dispatch`` and
18``first_connect`` is typically an instance of ``_ListenerCollection``
19if event listeners are present, or ``_EmptyListener`` if none are present.
20
21The attribute mechanics here spend effort trying to ensure listener functions
22are available with a minimum of function call overhead, that unnecessary
23objects aren't created (i.e. many empty per-instance listener collections),
24as well as that everything is garbage collectable when owning references are
25lost. Other features such as "propagation" of listener functions across
26many ``_Dispatch`` instances, "joining" of multiple ``_Dispatch`` instances,
27as well as support for subclass propagation (e.g. events assigned to
28``Pool`` vs. ``QueuePool``) are all implemented here.
29
30"""
31
32from __future__ import annotations
33
34import collections
35from itertools import chain
36import threading
37from types import TracebackType
38import typing
39from typing import Any
40from typing import cast
41from typing import Collection
42from typing import Deque
43from typing import FrozenSet
44from typing import Generic
45from typing import Iterator
46from typing import MutableMapping
47from typing import MutableSequence
48from typing import NoReturn
49from typing import Optional
50from typing import Sequence
51from typing import Set
52from typing import Tuple
53from typing import Type
54from typing import TypeVar
55from typing import Union
56import weakref
57
58from . import legacy
59from . import registry
60from .registry import _ET
61from .registry import _EventKey
62from .registry import _ListenerFnType
63from .. import exc
64from .. import util
65from ..util.concurrency import AsyncAdaptedLock
66from ..util.typing import Protocol
67
68_T = TypeVar("_T", bound=Any)
69
70if typing.TYPE_CHECKING:
71 from .base import _Dispatch
72 from .base import _DispatchCommon
73 from .base import _HasEventsDispatch
74
75
76class RefCollection(util.MemoizedSlots, Generic[_ET]):
77 __slots__ = ("ref",)
78
79 ref: weakref.ref[RefCollection[_ET]]
80
81 def _memoized_attr_ref(self) -> weakref.ref[RefCollection[_ET]]:
82 return weakref.ref(self, registry._collection_gced)
83
84
85class _empty_collection(Collection[_T]):
86 def append(self, element: _T) -> None:
87 pass
88
89 def appendleft(self, element: _T) -> None:
90 pass
91
92 def extend(self, other: Sequence[_T]) -> None:
93 pass
94
95 def remove(self, element: _T) -> None:
96 pass
97
98 def __contains__(self, element: Any) -> bool:
99 return False
100
101 def __iter__(self) -> Iterator[_T]:
102 return iter([])
103
104 def clear(self) -> None:
105 pass
106
107 def __len__(self) -> int:
108 return 0
109
110
111_ListenerFnSequenceType = Union[Deque[_T], _empty_collection[_T]]
112
113
114class _ClsLevelDispatch(RefCollection[_ET]):
115 """Class-level events on :class:`._Dispatch` classes."""
116
117 __slots__ = (
118 "clsname",
119 "name",
120 "arg_names",
121 "has_kw",
122 "legacy_signatures",
123 "_clslevel",
124 "__weakref__",
125 )
126
127 clsname: str
128 name: str
129 arg_names: Sequence[str]
130 has_kw: bool
131 legacy_signatures: MutableSequence[legacy._LegacySignatureType]
132 _clslevel: MutableMapping[
133 Type[_ET], _ListenerFnSequenceType[_ListenerFnType]
134 ]
135
136 def __init__(
137 self,
138 parent_dispatch_cls: Type[_HasEventsDispatch[_ET]],
139 fn: _ListenerFnType,
140 ):
141 self.name = fn.__name__
142 self.clsname = parent_dispatch_cls.__name__
143 argspec = util.inspect_getfullargspec(fn)
144 self.arg_names = argspec.args[1:]
145 self.has_kw = bool(argspec.varkw)
146 self.legacy_signatures = list(
147 reversed(
148 sorted(
149 getattr(fn, "_legacy_signatures", []), key=lambda s: s[0]
150 )
151 )
152 )
153 fn.__doc__ = legacy._augment_fn_docs(self, parent_dispatch_cls, fn)
154
155 self._clslevel = weakref.WeakKeyDictionary()
156
157 def _adjust_fn_spec(
158 self, fn: _ListenerFnType, named: bool
159 ) -> _ListenerFnType:
160 if named:
161 fn = self._wrap_fn_for_kw(fn)
162 if self.legacy_signatures:
163 try:
164 argspec = util.get_callable_argspec(fn, no_self=True)
165 except TypeError:
166 pass
167 else:
168 fn = legacy._wrap_fn_for_legacy(self, fn, argspec)
169 return fn
170
171 def _wrap_fn_for_kw(self, fn: _ListenerFnType) -> _ListenerFnType:
172 def wrap_kw(*args: Any, **kw: Any) -> Any:
173 argdict = dict(zip(self.arg_names, args))
174 argdict.update(kw)
175 return fn(**argdict)
176
177 return wrap_kw
178
179 def _do_insert_or_append(
180 self, event_key: _EventKey[_ET], is_append: bool
181 ) -> None:
182 target = event_key.dispatch_target
183 assert isinstance(
184 target, type
185 ), "Class-level Event targets must be classes."
186 if not getattr(target, "_sa_propagate_class_events", True):
187 raise exc.InvalidRequestError(
188 f"Can't assign an event directly to the {target} class"
189 )
190
191 cls: Type[_ET]
192
193 for cls in util.walk_subclasses(target):
194 if cls is not target and cls not in self._clslevel:
195 self.update_subclass(cls)
196 else:
197 if cls not in self._clslevel:
198 self.update_subclass(cls)
199 if is_append:
200 self._clslevel[cls].append(event_key._listen_fn)
201 else:
202 self._clslevel[cls].appendleft(event_key._listen_fn)
203 registry._stored_in_collection(event_key, self)
204
205 def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
206 self._do_insert_or_append(event_key, is_append=False)
207
208 def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
209 self._do_insert_or_append(event_key, is_append=True)
210
211 def update_subclass(self, target: Type[_ET]) -> None:
212 if target not in self._clslevel:
213 if getattr(target, "_sa_propagate_class_events", True):
214 self._clslevel[target] = collections.deque()
215 else:
216 self._clslevel[target] = _empty_collection()
217
218 clslevel = self._clslevel[target]
219 cls: Type[_ET]
220 for cls in target.__mro__[1:]:
221 if cls in self._clslevel:
222 clslevel.extend(
223 [fn for fn in self._clslevel[cls] if fn not in clslevel]
224 )
225
226 def remove(self, event_key: _EventKey[_ET]) -> None:
227 target = event_key.dispatch_target
228 cls: Type[_ET]
229 for cls in util.walk_subclasses(target):
230 if cls in self._clslevel:
231 self._clslevel[cls].remove(event_key._listen_fn)
232 registry._removed_from_collection(event_key, self)
233
234 def clear(self) -> None:
235 """Clear all class level listeners"""
236
237 to_clear: Set[_ListenerFnType] = set()
238 for dispatcher in self._clslevel.values():
239 to_clear.update(dispatcher)
240 dispatcher.clear()
241 registry._clear(self, to_clear)
242
243 def for_modify(self, obj: _Dispatch[_ET]) -> _ClsLevelDispatch[_ET]:
244 """Return an event collection which can be modified.
245
246 For _ClsLevelDispatch at the class level of
247 a dispatcher, this returns self.
248
249 """
250 return self
251
252
253class _InstanceLevelDispatch(RefCollection[_ET], Collection[_ListenerFnType]):
254 __slots__ = ()
255
256 parent: _ClsLevelDispatch[_ET]
257
258 def _adjust_fn_spec(
259 self, fn: _ListenerFnType, named: bool
260 ) -> _ListenerFnType:
261 return self.parent._adjust_fn_spec(fn, named)
262
263 def __contains__(self, item: Any) -> bool:
264 raise NotImplementedError()
265
266 def __len__(self) -> int:
267 raise NotImplementedError()
268
269 def __iter__(self) -> Iterator[_ListenerFnType]:
270 raise NotImplementedError()
271
272 def __bool__(self) -> bool:
273 raise NotImplementedError()
274
275 def exec_once(self, *args: Any, **kw: Any) -> None:
276 raise NotImplementedError()
277
278 def exec_once_unless_exception(self, *args: Any, **kw: Any) -> None:
279 raise NotImplementedError()
280
281 def _exec_w_sync_on_first_run(self, *args: Any, **kw: Any) -> None:
282 raise NotImplementedError()
283
284 def __call__(self, *args: Any, **kw: Any) -> None:
285 raise NotImplementedError()
286
287 def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
288 raise NotImplementedError()
289
290 def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
291 raise NotImplementedError()
292
293 def remove(self, event_key: _EventKey[_ET]) -> None:
294 raise NotImplementedError()
295
296 def for_modify(
297 self, obj: _DispatchCommon[_ET]
298 ) -> _InstanceLevelDispatch[_ET]:
299 """Return an event collection which can be modified.
300
301 For _ClsLevelDispatch at the class level of
302 a dispatcher, this returns self.
303
304 """
305 return self
306
307
308class _EmptyListener(_InstanceLevelDispatch[_ET]):
309 """Serves as a proxy interface to the events
310 served by a _ClsLevelDispatch, when there are no
311 instance-level events present.
312
313 Is replaced by _ListenerCollection when instance-level
314 events are added.
315
316 """
317
318 __slots__ = "parent", "parent_listeners", "name"
319
320 propagate: FrozenSet[_ListenerFnType] = frozenset()
321 listeners: Tuple[()] = ()
322 parent: _ClsLevelDispatch[_ET]
323 parent_listeners: _ListenerFnSequenceType[_ListenerFnType]
324 name: str
325
326 def __init__(self, parent: _ClsLevelDispatch[_ET], target_cls: Type[_ET]):
327 if target_cls not in parent._clslevel:
328 parent.update_subclass(target_cls)
329 self.parent = parent
330 self.parent_listeners = parent._clslevel[target_cls]
331 self.name = parent.name
332
333 def for_modify(
334 self, obj: _DispatchCommon[_ET]
335 ) -> _ListenerCollection[_ET]:
336 """Return an event collection which can be modified.
337
338 For _EmptyListener at the instance level of
339 a dispatcher, this generates a new
340 _ListenerCollection, applies it to the instance,
341 and returns it.
342
343 """
344 obj = cast("_Dispatch[_ET]", obj)
345
346 assert obj._instance_cls is not None
347 existing = getattr(obj, self.name)
348
349 with util.mini_gil:
350 if existing is self or isinstance(existing, _JoinedListener):
351 result = _ListenerCollection(self.parent, obj._instance_cls)
352 else:
353 # this codepath is an extremely rare race condition
354 # that has been observed in test_pool.py->test_timeout_race
355 # with freethreaded.
356 assert isinstance(existing, _ListenerCollection)
357 return existing
358
359 if existing is self:
360 setattr(obj, self.name, result)
361 return result
362
363 def _needs_modify(self, *args: Any, **kw: Any) -> NoReturn:
364 raise NotImplementedError("need to call for_modify()")
365
366 def exec_once(self, *args: Any, **kw: Any) -> NoReturn:
367 self._needs_modify(*args, **kw)
368
369 def exec_once_unless_exception(self, *args: Any, **kw: Any) -> NoReturn:
370 self._needs_modify(*args, **kw)
371
372 def insert(self, *args: Any, **kw: Any) -> NoReturn:
373 self._needs_modify(*args, **kw)
374
375 def append(self, *args: Any, **kw: Any) -> NoReturn:
376 self._needs_modify(*args, **kw)
377
378 def remove(self, *args: Any, **kw: Any) -> NoReturn:
379 self._needs_modify(*args, **kw)
380
381 def clear(self, *args: Any, **kw: Any) -> NoReturn:
382 self._needs_modify(*args, **kw)
383
384 def __call__(self, *args: Any, **kw: Any) -> None:
385 """Execute this event."""
386
387 for fn in self.parent_listeners:
388 fn(*args, **kw)
389
390 def __contains__(self, item: Any) -> bool:
391 return item in self.parent_listeners
392
393 def __len__(self) -> int:
394 return len(self.parent_listeners)
395
396 def __iter__(self) -> Iterator[_ListenerFnType]:
397 return iter(self.parent_listeners)
398
399 def __bool__(self) -> bool:
400 return bool(self.parent_listeners)
401
402
403class _MutexProtocol(Protocol):
404 def __enter__(self) -> bool: ...
405
406 def __exit__(
407 self,
408 exc_type: Optional[Type[BaseException]],
409 exc_val: Optional[BaseException],
410 exc_tb: Optional[TracebackType],
411 ) -> Optional[bool]: ...
412
413
414class _CompoundListener(_InstanceLevelDispatch[_ET]):
415 __slots__ = (
416 "_exec_once_mutex",
417 "_exec_once",
418 "_exec_w_sync_once",
419 "_is_asyncio",
420 )
421
422 _exec_once_mutex: Optional[_MutexProtocol]
423 parent_listeners: Collection[_ListenerFnType]
424 listeners: Collection[_ListenerFnType]
425 _exec_once: bool
426 _exec_w_sync_once: bool
427
428 def __init__(self, *arg: Any, **kw: Any):
429 super().__init__(*arg, **kw)
430 self._is_asyncio = False
431
432 def _set_asyncio(self) -> None:
433 self._is_asyncio = True
434
435 def _get_exec_once_mutex(self) -> _MutexProtocol:
436 with util.mini_gil:
437 if self._exec_once_mutex is not None:
438 return self._exec_once_mutex
439
440 if self._is_asyncio:
441 mutex = AsyncAdaptedLock()
442 else:
443 mutex = threading.Lock() # type: ignore[assignment]
444 self._exec_once_mutex = mutex
445
446 return mutex
447
448 def _exec_once_impl(
449 self, retry_on_exception: bool, *args: Any, **kw: Any
450 ) -> None:
451 with self._get_exec_once_mutex():
452 if not self._exec_once:
453 try:
454 self(*args, **kw)
455 exception = False
456 except:
457 exception = True
458 raise
459 finally:
460 if not exception or not retry_on_exception:
461 self._exec_once = True
462
463 def exec_once(self, *args: Any, **kw: Any) -> None:
464 """Execute this event, but only if it has not been
465 executed already for this collection."""
466
467 if not self._exec_once:
468 self._exec_once_impl(False, *args, **kw)
469
470 def exec_once_unless_exception(self, *args: Any, **kw: Any) -> None:
471 """Execute this event, but only if it has not been
472 executed already for this collection, or was called
473 by a previous exec_once_unless_exception call and
474 raised an exception.
475
476 If exec_once was already called, then this method will never run
477 the callable regardless of whether it raised or not.
478
479 .. versionadded:: 1.3.8
480
481 """
482 if not self._exec_once:
483 self._exec_once_impl(True, *args, **kw)
484
485 def _exec_w_sync_on_first_run(self, *args: Any, **kw: Any) -> None:
486 """Execute this event, and use a mutex if it has not been
487 executed already for this collection, or was called
488 by a previous _exec_w_sync_on_first_run call and
489 raised an exception.
490
491 If _exec_w_sync_on_first_run was already called and didn't raise an
492 exception, then a mutex is not used. It's not guaranteed
493 the mutex won't be used more than once in the case of very rare
494 race conditions.
495
496 .. versionadded:: 1.4.11
497
498 """
499 if not self._exec_w_sync_once:
500 with self._get_exec_once_mutex():
501 try:
502 self(*args, **kw)
503 except:
504 raise
505 else:
506 self._exec_w_sync_once = True
507 else:
508 self(*args, **kw)
509
510 def __call__(self, *args: Any, **kw: Any) -> None:
511 """Execute this event."""
512
513 for fn in self.parent_listeners:
514 fn(*args, **kw)
515 for fn in self.listeners:
516 fn(*args, **kw)
517
518 def __contains__(self, item: Any) -> bool:
519 return item in self.parent_listeners or item in self.listeners
520
521 def __len__(self) -> int:
522 return len(self.parent_listeners) + len(self.listeners)
523
524 def __iter__(self) -> Iterator[_ListenerFnType]:
525 return chain(self.parent_listeners, self.listeners)
526
527 def __bool__(self) -> bool:
528 return bool(self.listeners or self.parent_listeners)
529
530
531class _ListenerCollection(_CompoundListener[_ET]):
532 """Instance-level attributes on instances of :class:`._Dispatch`.
533
534 Represents a collection of listeners.
535
536 As of 0.7.9, _ListenerCollection is only first
537 created via the _EmptyListener.for_modify() method.
538
539 """
540
541 __slots__ = (
542 "parent_listeners",
543 "parent",
544 "name",
545 "listeners",
546 "propagate",
547 "__weakref__",
548 )
549
550 parent_listeners: Collection[_ListenerFnType]
551 parent: _ClsLevelDispatch[_ET]
552 name: str
553 listeners: Deque[_ListenerFnType]
554 propagate: Set[_ListenerFnType]
555
556 def __init__(self, parent: _ClsLevelDispatch[_ET], target_cls: Type[_ET]):
557 super().__init__()
558 if target_cls not in parent._clslevel:
559 parent.update_subclass(target_cls)
560 self._exec_once = False
561 self._exec_w_sync_once = False
562 self._exec_once_mutex = None
563 self.parent_listeners = parent._clslevel[target_cls]
564 self.parent = parent
565 self.name = parent.name
566 self.listeners = collections.deque()
567 self.propagate = set()
568
569 def for_modify(
570 self, obj: _DispatchCommon[_ET]
571 ) -> _ListenerCollection[_ET]:
572 """Return an event collection which can be modified.
573
574 For _ListenerCollection at the instance level of
575 a dispatcher, this returns self.
576
577 """
578 return self
579
580 def _update(
581 self, other: _ListenerCollection[_ET], only_propagate: bool = True
582 ) -> None:
583 """Populate from the listeners in another :class:`_Dispatch`
584 object."""
585 existing_listeners = self.listeners
586 existing_listener_set = set(existing_listeners)
587 self.propagate.update(other.propagate)
588 other_listeners = [
589 l
590 for l in other.listeners
591 if l not in existing_listener_set
592 and not only_propagate
593 or l in self.propagate
594 ]
595
596 existing_listeners.extend(other_listeners)
597
598 if other._is_asyncio:
599 self._set_asyncio()
600
601 to_associate = other.propagate.union(other_listeners)
602 registry._stored_in_collection_multi(self, other, to_associate)
603
604 def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
605 if event_key.prepend_to_list(self, self.listeners):
606 if propagate:
607 self.propagate.add(event_key._listen_fn)
608
609 def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
610 if event_key.append_to_list(self, self.listeners):
611 if propagate:
612 self.propagate.add(event_key._listen_fn)
613
614 def remove(self, event_key: _EventKey[_ET]) -> None:
615 self.listeners.remove(event_key._listen_fn)
616 self.propagate.discard(event_key._listen_fn)
617 registry._removed_from_collection(event_key, self)
618
619 def clear(self) -> None:
620 registry._clear(self, self.listeners)
621 self.propagate.clear()
622 self.listeners.clear()
623
624
625class _JoinedListener(_CompoundListener[_ET]):
626 __slots__ = "parent_dispatch", "name", "local", "parent_listeners"
627
628 parent_dispatch: _DispatchCommon[_ET]
629 name: str
630 local: _InstanceLevelDispatch[_ET]
631 parent_listeners: Collection[_ListenerFnType]
632
633 def __init__(
634 self,
635 parent_dispatch: _DispatchCommon[_ET],
636 name: str,
637 local: _EmptyListener[_ET],
638 ):
639 self._exec_once = False
640 self._exec_w_sync_once = False
641 self._exec_once_mutex = None
642 self.parent_dispatch = parent_dispatch
643 self.name = name
644 self.local = local
645 self.parent_listeners = self.local
646
647 if not typing.TYPE_CHECKING:
648 # first error, I don't really understand:
649 # Signature of "listeners" incompatible with
650 # supertype "_CompoundListener" [override]
651 # the name / return type are exactly the same
652 # second error is getattr_isn't typed, the cast() here
653 # adds too much method overhead
654 @property
655 def listeners(self) -> Collection[_ListenerFnType]:
656 return getattr(self.parent_dispatch, self.name)
657
658 def _adjust_fn_spec(
659 self, fn: _ListenerFnType, named: bool
660 ) -> _ListenerFnType:
661 return self.local._adjust_fn_spec(fn, named)
662
663 def for_modify(self, obj: _DispatchCommon[_ET]) -> _JoinedListener[_ET]:
664 self.local = self.parent_listeners = self.local.for_modify(obj)
665 return self
666
667 def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
668 self.local.insert(event_key, propagate)
669
670 def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
671 self.local.append(event_key, propagate)
672
673 def remove(self, event_key: _EventKey[_ET]) -> None:
674 self.local.remove(event_key)
675
676 def clear(self) -> None:
677 raise NotImplementedError()