1# event/attr.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Attribute implementation for _Dispatch classes.
9
10The various listener targets for a particular event class are represented
11as attributes, which refer to collections of listeners to be fired off.
12These collections can exist at the class level as well as at the instance
13level. An event is fired off using code like this::
14
15 some_object.dispatch.first_connect(arg1, arg2)
16
17Above, ``some_object.dispatch`` would be an instance of ``_Dispatch`` and
18``first_connect`` is typically an instance of ``_ListenerCollection``
19if event listeners are present, or ``_EmptyListener`` if none are present.
20
21The attribute mechanics here spend effort trying to ensure listener functions
22are available with a minimum of function call overhead, that unnecessary
23objects aren't created (i.e. many empty per-instance listener collections),
24as well as that everything is garbage collectable when owning references are
25lost. Other features such as "propagation" of listener functions across
26many ``_Dispatch`` instances, "joining" of multiple ``_Dispatch`` instances,
27as well as support for subclass propagation (e.g. events assigned to
28``Pool`` vs. ``QueuePool``) are all implemented here.
29
30"""
31
32from __future__ import annotations
33
34import collections
35from itertools import chain
36import threading
37from types import TracebackType
38import typing
39from typing import Any
40from typing import cast
41from typing import Collection
42from typing import Deque
43from typing import FrozenSet
44from typing import Generic
45from typing import Iterator
46from typing import MutableMapping
47from typing import MutableSequence
48from typing import NoReturn
49from typing import Optional
50from typing import Protocol
51from typing import Sequence
52from typing import Set
53from typing import Tuple
54from typing import Type
55from typing import TypeVar
56from typing import Union
57import weakref
58
59from . import legacy
60from . import registry
61from .registry import _ET
62from .registry import _EventKey
63from .registry import _ListenerFnType
64from .. import exc
65from .. import util
66from ..util.concurrency import AsyncAdaptedLock
67
# Generic element type used by the listener-sequence helpers in this module
# (``_empty_collection`` and ``_ListenerFnSequenceType``).
_T = TypeVar("_T", bound=Any)
69
70if typing.TYPE_CHECKING:
71 from .base import _Dispatch
72 from .base import _DispatchCommon
73 from .base import _HasEventsDispatch
74
75
class RefCollection(util.MemoizedSlots, Generic[_ET]):
    """Base for listener collections that expose a weak reference to
    themselves via the ``ref`` attribute.

    """

    __slots__ = ("ref",)

    ref: weakref.ref[RefCollection[_ET]]

    def _memoized_attr_ref(self) -> weakref.ref[RefCollection[_ET]]:
        """Build the weak reference stored as ``ref``.

        ``registry._collection_gced`` is installed as the weakref
        callback, so it is invoked once this collection has been
        garbage collected.

        """
        # NOTE(review): the ``_memoized_attr_`` prefix is presumably what
        # ``util.MemoizedSlots`` uses to populate ``ref`` lazily -- confirm
        # against that class.
        self_ref = weakref.ref(self, registry._collection_gced)
        return self_ref
83
84
class _empty_collection(Collection[_T]):
    """A collection that is always empty.

    Mutating methods accept their arguments and discard them; the
    read-only methods report that nothing is present.

    """

    # -- read-only protocol: always reports "nothing here" -------------

    def __len__(self) -> int:
        return 0

    def __iter__(self) -> Iterator[_T]:
        return iter(())

    def __contains__(self, element: Any) -> bool:
        return False

    # -- mutators: silently ignored ------------------------------------

    def append(self, element: _T) -> None:
        """Ignore ``element``; nothing is stored."""

    def appendleft(self, element: _T) -> None:
        """Ignore ``element``; nothing is stored."""

    def extend(self, other: Sequence[_T]) -> None:
        """Ignore ``other``; nothing is stored."""

    def remove(self, element: _T) -> None:
        """Do nothing; no error is raised for a missing element."""

    def clear(self) -> None:
        """Do nothing; the collection is already empty."""
109
110
# A listener sequence is either a real ``deque`` of elements or the
# always-empty ``_empty_collection`` stand-in.
_ListenerFnSequenceType = Union[Deque[_T], _empty_collection[_T]]
112
113
class _ClsLevelDispatch(RefCollection[_ET]):
    """Class-level events on :class:`._Dispatch` classes.

    Listener functions are kept per target class in ``_clslevel``, a
    ``WeakKeyDictionary`` keyed on the class.

    """

    __slots__ = (
        "clsname",
        "name",
        "arg_names",
        "has_kw",
        "legacy_signatures",
        "_clslevel",
        "__weakref__",
    )

    clsname: str
    name: str
    arg_names: Sequence[str]
    has_kw: bool
    legacy_signatures: MutableSequence[legacy._LegacySignatureType]
    _clslevel: MutableMapping[
        Type[_ET], _ListenerFnSequenceType[_ListenerFnType]
    ]

    def __init__(
        self,
        parent_dispatch_cls: Type[_HasEventsDispatch[_ET]],
        fn: _ListenerFnType,
    ):
        """Describe the event defined by ``fn`` on ``parent_dispatch_cls``.

        The event name, argument names (minus the first argument) and
        the presence of ``**kw`` are taken from ``fn`` itself; the
        docstring of ``fn`` is replaced with the result of
        ``legacy._augment_fn_docs()``.

        """
        self.name = fn.__name__
        self.clsname = parent_dispatch_cls.__name__

        spec = util.inspect_getfullargspec(fn)
        self.arg_names = spec.args[1:]
        self.has_kw = bool(spec.varkw)

        # sort ascending on the first element of each signature, then
        # flip the result in place so the list is in reverse order
        signatures = sorted(
            getattr(fn, "_legacy_signatures", []), key=lambda s: s[0]
        )
        signatures.reverse()
        self.legacy_signatures = signatures

        fn.__doc__ = legacy._augment_fn_docs(self, parent_dispatch_cls, fn)

        self._clslevel = weakref.WeakKeyDictionary()

    def _adjust_fn_spec(
        self, fn: _ListenerFnType, named: bool
    ) -> _ListenerFnType:
        """Return ``fn``, wrapped as needed for this event.

        When ``named`` is true the function is wrapped to receive all
        arguments as keywords.  When legacy signatures exist and the
        argspec of the function can be determined, it is additionally
        passed through ``legacy._wrap_fn_for_legacy()``.

        """
        if named:
            fn = self._wrap_fn_for_kw(fn)

        if not self.legacy_signatures:
            return fn

        try:
            argspec = util.get_callable_argspec(fn, no_self=True)
        except TypeError:
            # argspec not available; hand back the function as it is
            return fn

        return legacy._wrap_fn_for_legacy(self, fn, argspec)

    def _wrap_fn_for_kw(self, fn: _ListenerFnType) -> _ListenerFnType:
        """Return a wrapper that calls ``fn`` with keyword arguments only.

        Positional arguments are paired with ``self.arg_names``; explicit
        keyword arguments take precedence over a positional argument of
        the same name.

        """

        def wrap_kw(*args: Any, **kw: Any) -> Any:
            merged = {**dict(zip(self.arg_names, args)), **kw}
            return fn(**merged)

        return wrap_kw

    def _do_insert_or_append(
        self, event_key: _EventKey[_ET], is_append: bool
    ) -> None:
        """Add the listener of ``event_key`` for its target class and
        every subclass, at the end (``is_append``) or at the front.

        Raises ``InvalidRequestError`` when the target class sets
        ``_sa_propagate_class_events`` to a false value.

        """
        target = event_key.dispatch_target
        assert isinstance(
            target, type
        ), "Class-level Event targets must be classes."
        if not getattr(target, "_sa_propagate_class_events", True):
            raise exc.InvalidRequestError(
                f"Can't assign an event directly to the {target} class"
            )

        cls: Type[_ET]

        for cls in util.walk_subclasses(target):
            if cls not in self._clslevel:
                self.update_subclass(cls)
                if cls is not target:
                    # a subclass seen for the first time only goes
                    # through update_subclass(); no explicit add
                    continue

            bucket = self._clslevel[cls]
            add = bucket.append if is_append else bucket.appendleft
            add(event_key._listen_fn)

        registry._stored_in_collection(event_key, self)

    def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Prepend the listener; ``propagate`` is not consulted here."""
        self._do_insert_or_append(event_key, is_append=False)

    def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Append the listener; ``propagate`` is not consulted here."""
        self._do_insert_or_append(event_key, is_append=True)

    def update_subclass(self, target: Type[_ET]) -> None:
        """Ensure ``target`` has a listener collection and copy into it
        any listeners of its MRO ancestors that it does not yet contain.

        """
        if target not in self._clslevel:
            if getattr(target, "_sa_propagate_class_events", True):
                self._clslevel[target] = collections.deque()
            else:
                self._clslevel[target] = _empty_collection()

        clslevel = self._clslevel[target]
        cls: Type[_ET]
        for cls in target.__mro__[1:]:
            if cls not in self._clslevel:
                continue
            inherited = [
                fn for fn in self._clslevel[cls] if fn not in clslevel
            ]
            clslevel.extend(inherited)

    def remove(self, event_key: _EventKey[_ET]) -> None:
        """Remove the listener of ``event_key`` from the target class and
        from each subclass that has a listener collection."""
        listen_target = event_key.dispatch_target
        cls: Type[_ET]
        for cls in util.walk_subclasses(listen_target):
            if cls not in self._clslevel:
                continue
            self._clslevel[cls].remove(event_key._listen_fn)
        registry._removed_from_collection(event_key, self)

    def clear(self) -> None:
        """Clear all class level listeners"""

        cleared: Set[_ListenerFnType] = set()
        for listeners in self._clslevel.values():
            # remember what was present before emptying the collection
            cleared.update(listeners)
            listeners.clear()
        registry._clear(self, cleared)

    def for_modify(self, obj: _Dispatch[_ET]) -> _ClsLevelDispatch[_ET]:
        """Return an event collection which can be modified.

        For _ClsLevelDispatch at the class level of
        a dispatcher, this returns self.

        """
        return self
251
252
class _InstanceLevelDispatch(RefCollection[_ET], Collection[_ListenerFnType]):
    """Common interface of the instance-level listener collections.

    Apart from :meth:`_adjust_fn_spec` and :meth:`for_modify`, every
    method here raises ``NotImplementedError``.

    """

    __slots__ = ()

    parent: _ClsLevelDispatch[_ET]

    # -- implemented here ----------------------------------------------

    def _adjust_fn_spec(
        self, fn: _ListenerFnType, named: bool
    ) -> _ListenerFnType:
        """Delegate to ``_adjust_fn_spec()`` of the class-level parent."""
        return self.parent._adjust_fn_spec(fn, named)

    def for_modify(
        self, obj: _DispatchCommon[_ET]
    ) -> _InstanceLevelDispatch[_ET]:
        """Return an event collection which can be modified.

        This base implementation returns self.

        """
        return self

    # -- collection protocol -------------------------------------------

    def __contains__(self, item: Any) -> bool:
        raise NotImplementedError()

    def __len__(self) -> int:
        raise NotImplementedError()

    def __iter__(self) -> Iterator[_ListenerFnType]:
        raise NotImplementedError()

    def __bool__(self) -> bool:
        raise NotImplementedError()

    # -- execution -----------------------------------------------------

    def __call__(self, *args: Any, **kw: Any) -> None:
        raise NotImplementedError()

    def exec_once(self, *args: Any, **kw: Any) -> None:
        raise NotImplementedError()

    def exec_once_unless_exception(self, *args: Any, **kw: Any) -> None:
        raise NotImplementedError()

    def _exec_w_sync_on_first_run(self, *args: Any, **kw: Any) -> None:
        raise NotImplementedError()

    # -- modification --------------------------------------------------

    def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        raise NotImplementedError()

    def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        raise NotImplementedError()

    def remove(self, event_key: _EventKey[_ET]) -> None:
        raise NotImplementedError()
306
307
class _EmptyListener(_InstanceLevelDispatch[_ET]):
    """Serves as a proxy interface to the events
    served by a _ClsLevelDispatch, when there are no
    instance-level events present.

    Is replaced by _ListenerCollection when instance-level
    events are added.

    """

    __slots__ = "parent", "parent_listeners", "name"

    # class-level placeholders: this object never holds listeners of its own
    propagate: FrozenSet[_ListenerFnType] = frozenset()
    listeners: Tuple[()] = ()
    parent: _ClsLevelDispatch[_ET]
    parent_listeners: _ListenerFnSequenceType[_ListenerFnType]
    name: str

    def __init__(self, parent: _ClsLevelDispatch[_ET], target_cls: Type[_ET]):
        """Proxy the listeners that ``parent`` holds for ``target_cls``,
        creating that class-level collection first if it is missing."""
        if target_cls not in parent._clslevel:
            parent.update_subclass(target_cls)
        self.parent = parent
        self.name = parent.name
        self.parent_listeners = parent._clslevel[target_cls]

    def for_modify(
        self, obj: _DispatchCommon[_ET]
    ) -> _ListenerCollection[_ET]:
        """Return an event collection which can be modified.

        For _EmptyListener at the instance level of
        a dispatcher, this generates a new
        _ListenerCollection, applies it to the instance,
        and returns it.

        """
        dispatch = cast("_Dispatch[_ET]", obj)

        assert dispatch._instance_cls is not None
        existing = getattr(dispatch, self.name)

        with util.mini_gil:
            if existing is not self and not isinstance(
                existing, _JoinedListener
            ):
                # this codepath is an extremely rare race condition
                # that has been observed in test_pool.py->test_timeout_race
                # with freethreaded.
                assert isinstance(existing, _ListenerCollection)
                return existing

            result = _ListenerCollection(self.parent, dispatch._instance_cls)

            # only install the new collection when this object is what
            # the dispatcher currently holds under this name
            if existing is self:
                setattr(dispatch, self.name, result)
        return result

    def _needs_modify(self, *args: Any, **kw: Any) -> NoReturn:
        """Reject an operation that requires a modifiable collection."""
        raise NotImplementedError("need to call for_modify()")

    # The operations below all need a modifiable collection and are
    # therefore routed to _needs_modify(), which raises.

    def exec_once(self, *args: Any, **kw: Any) -> NoReturn:
        self._needs_modify(*args, **kw)

    def exec_once_unless_exception(self, *args: Any, **kw: Any) -> NoReturn:
        self._needs_modify(*args, **kw)

    def insert(self, *args: Any, **kw: Any) -> NoReturn:
        self._needs_modify(*args, **kw)

    def append(self, *args: Any, **kw: Any) -> NoReturn:
        self._needs_modify(*args, **kw)

    def remove(self, *args: Any, **kw: Any) -> NoReturn:
        self._needs_modify(*args, **kw)

    def clear(self, *args: Any, **kw: Any) -> NoReturn:
        self._needs_modify(*args, **kw)

    def __call__(self, *args: Any, **kw: Any) -> None:
        """Execute this event."""

        for listener in self.parent_listeners:
            listener(*args, **kw)

    # The collection protocol is answered from the class-level listeners.

    def __contains__(self, item: Any) -> bool:
        return item in self.parent_listeners

    def __len__(self) -> int:
        return len(self.parent_listeners)

    def __iter__(self) -> Iterator[_ListenerFnType]:
        return iter(self.parent_listeners)

    def __bool__(self) -> bool:
        return bool(self.parent_listeners)
401
402
403class _MutexProtocol(Protocol):
404 def __enter__(self) -> bool: ...
405
406 def __exit__(
407 self,
408 exc_type: Optional[Type[BaseException]],
409 exc_val: Optional[BaseException],
410 exc_tb: Optional[TracebackType],
411 ) -> Optional[bool]: ...
412
413
class _CompoundListener(_InstanceLevelDispatch[_ET]):
    """Listener collection combining ``parent_listeners`` with
    ``listeners``, including "run only once" execution guarded by a
    lazily created mutex.

    """

    __slots__ = (
        "_exec_once_mutex",
        "_exec_once",
        "_exec_w_sync_once",
        "_is_asyncio",
    )

    _exec_once_mutex: Optional[_MutexProtocol]
    parent_listeners: Collection[_ListenerFnType]
    listeners: Collection[_ListenerFnType]
    _exec_once: bool
    _exec_w_sync_once: bool

    def __init__(self, *arg: Any, **kw: Any):
        super().__init__(*arg, **kw)
        self._is_asyncio = False

    def _set_asyncio(self) -> None:
        """Flag this collection so that a mutex created afterwards is an
        ``AsyncAdaptedLock``."""
        self._is_asyncio = True

    def _get_exec_once_mutex(self) -> _MutexProtocol:
        """Return the mutex of this collection, creating it on first use.

        Lookup and creation both happen while holding ``util.mini_gil``.

        """
        with util.mini_gil:
            current = self._exec_once_mutex
            if current is not None:
                return current

            if self._is_asyncio:
                mutex = AsyncAdaptedLock()
            else:
                mutex = threading.Lock()  # type: ignore[assignment]
            self._exec_once_mutex = mutex

            return mutex

    def _exec_once_impl(
        self, retry_on_exception: bool, *args: Any, **kw: Any
    ) -> None:
        """Run the event under the mutex unless it is already marked as
        executed.

        The "executed" flag is set when the run succeeds; it is also set
        after a failed run unless ``retry_on_exception`` is true.  An
        exception raised by a listener propagates to the caller.

        """
        with self._get_exec_once_mutex():
            if self._exec_once:
                return

            succeeded = False
            try:
                self(*args, **kw)
                succeeded = True
            finally:
                if succeeded or not retry_on_exception:
                    self._exec_once = True

    def exec_once(self, *args: Any, **kw: Any) -> None:
        """Execute this event, but only if it has not been
        executed already for this collection."""

        if self._exec_once:
            return
        self._exec_once_impl(False, *args, **kw)

    def exec_once_unless_exception(self, *args: Any, **kw: Any) -> None:
        """Execute this event, but only if it has not been
        executed already for this collection, or was called
        by a previous exec_once_unless_exception call and
        raised an exception.

        If exec_once was already called, then this method will never run
        the callable regardless of whether it raised or not.

        """
        if self._exec_once:
            return
        self._exec_once_impl(True, *args, **kw)

    def _exec_w_sync_on_first_run(self, *args: Any, **kw: Any) -> None:
        """Execute this event, and use a mutex if it has not been
        executed already for this collection, or was called
        by a previous _exec_w_sync_on_first_run call and
        raised an exception.

        If _exec_w_sync_on_first_run was already called and didn't raise an
        exception, then a mutex is not used. It's not guaranteed
        the mutex won't be used more than once in the case of very rare
        race conditions.

        .. versionadded:: 1.4.11

        """
        if self._exec_w_sync_once:
            self(*args, **kw)
            return

        with self._get_exec_once_mutex():
            self(*args, **kw)
            # reached only when the call above did not raise
            self._exec_w_sync_once = True

    def __call__(self, *args: Any, **kw: Any) -> None:
        """Execute this event."""

        # parent listeners are invoked first, then our own
        for listener in self.parent_listeners:
            listener(*args, **kw)
        for listener in self.listeners:
            listener(*args, **kw)

    def __contains__(self, item: Any) -> bool:
        if item in self.parent_listeners:
            return True
        return item in self.listeners

    def __len__(self) -> int:
        return len(self.parent_listeners) + len(self.listeners)

    def __iter__(self) -> Iterator[_ListenerFnType]:
        return chain(self.parent_listeners, self.listeners)

    def __bool__(self) -> bool:
        return bool(self.listeners) or bool(self.parent_listeners)
527
528
class _ListenerCollection(_CompoundListener[_ET]):
    """Instance-level attributes on instances of :class:`._Dispatch`.

    Represents a collection of listeners.

    As of 0.7.9, _ListenerCollection is only first
    created via the _EmptyListener.for_modify() method.

    """

    __slots__ = (
        "parent_listeners",
        "parent",
        "name",
        "listeners",
        "propagate",
        "__weakref__",
    )

    parent_listeners: Collection[_ListenerFnType]
    parent: _ClsLevelDispatch[_ET]
    name: str
    listeners: Deque[_ListenerFnType]
    propagate: Set[_ListenerFnType]

    def __init__(self, parent: _ClsLevelDispatch[_ET], target_cls: Type[_ET]):
        """Create an empty instance-level collection on top of the
        listeners that ``parent`` holds for ``target_cls``, creating that
        class-level collection first if it is missing."""
        super().__init__()
        if target_cls not in parent._clslevel:
            parent.update_subclass(target_cls)
        self._exec_once = False
        self._exec_w_sync_once = False
        self._exec_once_mutex = None
        self.parent_listeners = parent._clslevel[target_cls]
        self.parent = parent
        self.name = parent.name
        self.listeners = collections.deque()
        self.propagate = set()

    def for_modify(
        self, obj: _DispatchCommon[_ET]
    ) -> _ListenerCollection[_ET]:
        """Return an event collection which can be modified.

        For _ListenerCollection at the instance level of
        a dispatcher, this returns self.

        """
        return self

    def _update(
        self, other: _ListenerCollection[_ET], only_propagate: bool = True
    ) -> None:
        """Populate from the listeners in another :class:`_Dispatch`
        object.

        A listener of ``other`` is copied only if it is not already
        present in this collection.  When ``only_propagate`` is true,
        the listener must additionally be part of the (merged)
        ``propagate`` set.

        """
        existing_listeners = self.listeners
        existing_listener_set = set(existing_listeners)
        self.propagate.update(other.propagate)

        # The grouping is explicit on purpose: without the parentheses
        # ``and`` binds tighter than ``or``, so any propagated listener
        # would be appended even when it is already present.
        other_listeners = [
            fn
            for fn in other.listeners
            if fn not in existing_listener_set
            and (not only_propagate or fn in self.propagate)
        ]

        existing_listeners.extend(other_listeners)

        if other._is_asyncio:
            self._set_asyncio()

        to_associate = other.propagate.union(other_listeners)
        registry._stored_in_collection_multi(self, other, to_associate)

    def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Prepend the listener of ``event_key``; when it was added and
        ``propagate`` is true, also record it in the propagate set."""
        if event_key.prepend_to_list(self, self.listeners):
            if propagate:
                self.propagate.add(event_key._listen_fn)

    def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Append the listener of ``event_key``; when it was added and
        ``propagate`` is true, also record it in the propagate set."""
        if event_key.append_to_list(self, self.listeners):
            if propagate:
                self.propagate.add(event_key._listen_fn)

    def remove(self, event_key: _EventKey[_ET]) -> None:
        """Remove the listener of ``event_key`` from this collection and
        from the propagate set, then notify the registry."""
        self.listeners.remove(event_key._listen_fn)
        self.propagate.discard(event_key._listen_fn)
        registry._removed_from_collection(event_key, self)

    def clear(self) -> None:
        """Remove all instance-level listeners."""
        # the registry is told about the listeners before they are dropped
        registry._clear(self, self.listeners)
        self.propagate.clear()
        self.listeners.clear()
621
622
class _JoinedListener(_CompoundListener[_ET]):
    """Compound listener whose ``listeners`` are looked up by name on
    another dispatcher (``parent_dispatch``) each time they are accessed,
    while ``parent_listeners`` is the local collection.

    """

    __slots__ = "parent_dispatch", "name", "local", "parent_listeners"

    parent_dispatch: _DispatchCommon[_ET]
    name: str
    local: _InstanceLevelDispatch[_ET]
    parent_listeners: Collection[_ListenerFnType]

    def __init__(
        self,
        parent_dispatch: _DispatchCommon[_ET],
        name: str,
        local: _EmptyListener[_ET],
    ):
        """Join the ``name`` event of ``parent_dispatch`` with ``local``."""
        # The base initializer assigns ``_is_asyncio``; without it,
        # _get_exec_once_mutex() reads an unset slot and the exec_once
        # family of methods fails with AttributeError.
        super().__init__()
        self._exec_once = False
        self._exec_w_sync_once = False
        self._exec_once_mutex = None
        self.parent_dispatch = parent_dispatch
        self.name = name
        self.local = local
        self.parent_listeners = self.local

    if not typing.TYPE_CHECKING:
        # Hidden from type checkers for two reasons:
        # 1. they report 'Signature of "listeners" incompatible with
        #    supertype "_CompoundListener" [override]' even though the
        #    name / return type are exactly the same;
        # 2. getattr() isn't typed, and a cast() here adds too much
        #    method overhead.
        @property
        def listeners(self) -> Collection[_ListenerFnType]:
            return getattr(self.parent_dispatch, self.name)

    def _adjust_fn_spec(
        self, fn: _ListenerFnType, named: bool
    ) -> _ListenerFnType:
        """Delegate to ``_adjust_fn_spec()`` of the local collection."""
        return self.local._adjust_fn_spec(fn, named)

    def for_modify(self, obj: _DispatchCommon[_ET]) -> _JoinedListener[_ET]:
        """Swap the local collection for its modifiable form and return
        self."""
        self.local = self.parent_listeners = self.local.for_modify(obj)
        return self

    def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Forward to the local collection."""
        self.local.insert(event_key, propagate)

    def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Forward to the local collection."""
        self.local.append(event_key, propagate)

    def remove(self, event_key: _EventKey[_ET]) -> None:
        """Forward to the local collection."""
        self.local.remove(event_key)

    def clear(self) -> None:
        """Not supported on a joined listener."""
        raise NotImplementedError()