1# event/registry.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Provides managed registration services on behalf of :func:`.listen`
9arguments.
10
11By "managed registration", we mean that event listening functions and
12other objects can be added to various collections in such a way that their
13membership in all those collections can be revoked at once, based on
14an equivalent :class:`._EventKey`.
15
16"""
17from __future__ import annotations
18
19import collections
20import types
21import typing
22from typing import Any
23from typing import Callable
24from typing import cast
25from typing import Deque
26from typing import Dict
27from typing import Generic
28from typing import Iterable
29from typing import Optional
30from typing import Tuple
31from typing import TypeVar
32from typing import Union
33import weakref
34
35from .. import exc
36from .. import util
37
38if typing.TYPE_CHECKING:
39 from .attr import RefCollection
40 from .base import dispatcher
41
# Any callable may be registered as an event listener.
_ListenerFnType = Callable[..., Any]
# Identity of a listener function as computed by _EventKey.__init__:
# id(fn) for a plain callable, or (id(fn.__func__), id(fn.__self__)) for a
# bound method.
_ListenerFnKeyType = Union[int, Tuple[int, int]]
# Registry key as returned by _EventKey._key:
# (id(target), identifier, listener function key).
_EventKeyTupleType = Tuple[int, str, _ListenerFnKeyType]


# Type variable bound to EventTarget; the bound is given as a string since
# the class is defined further down in this module.
_ET = TypeVar("_ET", bound="EventTarget")
48
49
class EventTarget:
    """Base for objects that can serve as the target of an event listener.

    A target may be listened on either as a class or as an individual
    instance of that class.

    Examples include: Connection, Mapper, Table, Session,
    InstrumentedAttribute, Engine, Pool, Dialect.

    """

    # no per-instance attributes are declared at this level
    __slots__ = ()

    # annotation only; no value is assigned by this class
    dispatch: dispatcher[Any]
62
63
# Maps a weak reference to a listener collection onto a weak reference to
# the listener function stored in that collection.  The type arguments are
# given as strings; RefCollection is only imported under TYPE_CHECKING.
_RefCollectionToListenerType = Dict[
    "weakref.ref[RefCollection[Any]]",
    "weakref.ref[_ListenerFnType]",
]

# Forward registry: event key tuple -> {ref(collection): ref(listener fn)}.
# This is a defaultdict(dict), so *indexing* a missing key inserts an empty
# inner dict rather than raising KeyError.
_key_to_collection: Dict[_EventKeyTupleType, _RefCollectionToListenerType] = (
    collections.defaultdict(dict)
)
72"""
Given the key form of the original listen() arguments, i.e.
(id(target), identifier, fn_key), locates every listener collection
the listener was stored in, along with the listener fn each contains
75
76(target, identifier, fn) -> {
77 ref(listenercollection) -> ref(listener_fn)
78 ref(listenercollection) -> ref(listener_fn)
79 ref(listenercollection) -> ref(listener_fn)
80 }
81"""
82
# Maps a weak reference to a listener function onto the event key tuple it
# was registered under.
_ListenerToEventKeyType = Dict[
    "weakref.ref[_ListenerFnType]",
    _EventKeyTupleType,
]
# Reverse registry: ref(collection) -> {ref(listener fn): event key tuple}.
# This is a defaultdict(dict), so *indexing* a missing key inserts an empty
# inner dict rather than raising KeyError.
_collection_to_key: Dict[
    weakref.ref[RefCollection[Any]],
    _ListenerToEventKeyType,
] = collections.defaultdict(dict)
91"""
Given a weak reference to a _ListenerCollection or _ClsLevelListener,
locates each listener fn stored in that collection together with the
key form of the original listen() arguments it was registered under
94
95ref(listenercollection) -> {
96 ref(listener_fn) -> (target, identifier, fn),
97 ref(listener_fn) -> (target, identifier, fn),
98 ref(listener_fn) -> (target, identifier, fn),
99 }
100"""
101
102
def _collection_gced(ref: weakref.ref[Any]) -> None:
    """Purge a listener collection's weak reference from both registries.

    Removes the ``_collection_to_key`` entry for ``ref``, then removes
    ``ref`` from the ``_key_to_collection`` entry of every event key that
    was recorded for it, discarding any per-key mapping that becomes empty
    as a result.

    :param ref: weak reference to the collection.  It is used only as a
     dictionary key here and is never dereferenced.

    """
    # membership is tested explicitly because indexing the defaultdict
    # would insert a new, empty entry for an unknown ref rather than raise
    # KeyError.  the truthiness test short-circuits when the registry is
    # empty.
    if not _collection_to_key or ref not in _collection_to_key:
        return

    ref = cast("weakref.ref[RefCollection[EventTarget]]", ref)

    listener_to_key = _collection_to_key.pop(ref)
    for key in listener_to_key.values():
        # .get() rather than indexing, so that the defaultdict does not
        # create an entry for a key that is no longer registered
        dispatch_reg = _key_to_collection.get(key)
        if dispatch_reg is None:
            continue

        # the two registries are not always updated in lockstep: _clear()
        # removes a collection from _key_to_collection while leaving its
        # _collection_to_key entry in place.  tolerate a ref that is
        # already gone, rather than raising KeyError part way through and
        # leaving the remaining keys uncleaned.
        dispatch_reg.pop(ref, None)
        if not dispatch_reg:
            _key_to_collection.pop(key)
118
119
def _stored_in_collection(
    event_key: _EventKey[_ET], owner: RefCollection[_ET]
) -> bool:
    """Record that the listener for ``event_key`` is stored in ``owner``.

    Both registries are updated: the per-key mapping in
    ``_key_to_collection`` and the per-collection mapping in
    ``_collection_to_key``.

    :param event_key: key describing the ``listen()`` arguments; its
     ``_key`` and ``_listen_fn`` are what get recorded.
    :param owner: collection receiving the listener; its ``ref`` attribute
     is used as the dictionary key.
    :return: ``False`` if ``owner`` already has an entry under this key, in
     which case nothing is recorded; ``True`` once the entry is recorded.

    """
    registry_key = event_key._key

    # defaultdict: an empty mapping is created on first use of the key
    by_collection = _key_to_collection[registry_key]

    collection_ref = owner.ref
    fn_ref = weakref.ref(event_key._listen_fn)

    if collection_ref not in by_collection:
        by_collection[collection_ref] = fn_ref
        _collection_to_key[collection_ref][fn_ref] = registry_key
        return True

    return False
139
140
def _removed_from_collection(
    event_key: _EventKey[_ET], owner: RefCollection[_ET]
) -> None:
    """Drop the record of ``event_key``'s listener being stored in ``owner``.

    ``owner`` is removed from the ``_key_to_collection`` mapping for the
    key, which is itself deleted once empty; the listener is then removed
    from the ``_collection_to_key`` mapping for ``owner``, if one exists.
    Entries that are already absent are ignored.

    :param event_key: key describing the ``listen()`` arguments.
    :param owner: collection the listener is being removed from; its
     ``ref`` attribute is used as the dictionary key.

    """
    registry_key = event_key._key

    # defaultdict: indexing creates an empty mapping if the key is unknown;
    # it is deleted again just below since it will still be empty
    by_collection = _key_to_collection[registry_key]

    fn_ref = weakref.ref(event_key._listen_fn)
    collection_ref = owner.ref

    by_collection.pop(collection_ref, None)
    if not by_collection:
        del _key_to_collection[registry_key]

    # .get() does not create an entry for an unknown collection
    by_listener = _collection_to_key.get(collection_ref)
    if by_listener is not None:
        # see #12216 - this guards against a removal that already occurred
        # here. however, I cannot come up with a test that shows any negative
        # side effects occurring from this removal happening, even though an
        # event key may still be referenced from a clsleveldispatch here
        by_listener.pop(fn_ref, None)
162
163
def _stored_in_collection_multi(
    newowner: RefCollection[_ET],
    oldowner: RefCollection[_ET],
    elements: Iterable[_ListenerFnType],
) -> None:
    """Register listeners tracked for ``oldowner`` under ``newowner`` too.

    For each listener function in ``elements``, the event key recorded for
    it under ``oldowner`` is looked up and an equivalent record is made for
    ``newowner`` in both registries.  Listener functions that have no
    record under ``oldowner`` are skipped.  The records for ``oldowner``
    are left unchanged.

    :param newowner: collection that now also contains the listeners.
    :param oldowner: collection the listeners were originally recorded for.
    :param elements: the listener functions being carried over.

    """
    if not elements:
        return

    source_ref = oldowner.ref
    dest_ref = newowner.ref

    # both lookups go through the defaultdict and so create an empty
    # mapping for a collection that has none yet
    source_keys = _collection_to_key[source_ref]
    dest_keys = _collection_to_key[dest_ref]

    for fn_ref in map(weakref.ref, elements):
        try:
            registry_key = source_keys[fn_ref]
        except KeyError:
            # can occur during interpreter shutdown.
            # see #6740
            continue

        # _key_to_collection is set up as a defaultdict in this module, so
        # indexing normally creates a missing entry instead of raising;
        # the guard is retained as-is
        try:
            by_collection = _key_to_collection[registry_key]
        except KeyError:
            continue

        if dest_ref not in by_collection:
            by_collection[dest_ref] = fn_ref
        else:
            assert by_collection[dest_ref] == fn_ref

        dest_keys[fn_ref] = registry_key
198
199
def _clear(
    owner: RefCollection[_ET],
    elements: Iterable[_ListenerFnType],
) -> None:
    """Remove ``owner`` from the per-key registry for each given listener.

    For every listener function in ``elements``, the event key recorded for
    it under ``owner`` is looked up and ``owner`` is removed from that key's
    ``_key_to_collection`` mapping, which is deleted once it is empty.  The
    ``_collection_to_key`` entries for ``owner`` are not modified.

    :param owner: collection being cleared; its ``ref`` attribute is used
     as the dictionary key.
    :param elements: the listener functions held by the collection.
    :raises KeyError: if a listener function has no record under ``owner``.

    """
    if not elements:
        return

    collection_ref = owner.ref
    by_listener = _collection_to_key[collection_ref]

    for fn_ref in map(weakref.ref, elements):
        registry_key = by_listener[fn_ref]

        by_collection = _key_to_collection[registry_key]
        by_collection.pop(collection_ref, None)

        if not by_collection:
            del _key_to_collection[registry_key]
217
218
class _EventKey(Generic[_ET]):
    """Represent :func:`.listen` arguments.

    An event key bundles the target, the event identifier and the listener
    function, together with an optional wrapper function and the object
    whose ``dispatch`` attribute is used to establish the listener.

    The registry identity of a key (see :attr:`._EventKey._key`) is derived
    from the original ``fn`` and never from the wrapper, whereas the
    function placed into listener collections is the wrapper when one is
    present (see :attr:`._EventKey._listen_fn`).

    """

    __slots__ = (
        "target",
        "identifier",
        "fn",
        "fn_key",
        "fn_wrap",
        "dispatch_target",
    )

    target: _ET
    identifier: str
    fn: _ListenerFnType
    fn_key: _ListenerFnKeyType
    dispatch_target: Any
    # annotated under the actual slot name: the constructor *parameter* is
    # ``_fn_wrap``, but the attribute it populates is ``fn_wrap``
    fn_wrap: Optional[_ListenerFnType]

    def __init__(
        self,
        target: _ET,
        identifier: str,
        fn: _ListenerFnType,
        dispatch_target: Any,
        _fn_wrap: Optional[_ListenerFnType] = None,
    ) -> None:
        """Construct a new event key.

        :param target: the object being listened on.
        :param identifier: name of the event.
        :param fn: the listener function as originally given.
        :param dispatch_target: object whose ``dispatch`` attribute is
         consulted by :meth:`.listen` and :meth:`.base_listen`.
        :param _fn_wrap: optional wrapper to be stored in listener
         collections in place of ``fn``.

        """
        self.target = target
        self.identifier = identifier
        self.fn = fn
        if isinstance(fn, types.MethodType):
            # a new bound method object is produced on each attribute
            # access, so key on the underlying function and instance
            self.fn_key = id(fn.__func__), id(fn.__self__)
        else:
            self.fn_key = id(fn)
        self.fn_wrap = _fn_wrap
        self.dispatch_target = dispatch_target

    @property
    def _key(self) -> _EventKeyTupleType:
        """The tuple used as this key's identity in the registry.

        Built from ``id()`` values, so the target and listener function are
        identified by object identity rather than by equality.

        """
        return (id(self.target), self.identifier, self.fn_key)

    def with_wrapper(self, fn_wrap: _ListenerFnType) -> _EventKey[_ET]:
        """Return an event key that uses ``fn_wrap`` as its wrapper.

        ``self`` is returned when ``fn_wrap`` is already the function this
        key listens with; otherwise a new key is returned having the same
        target, identifier, ``fn`` and dispatch target.

        """
        if fn_wrap is self._listen_fn:
            return self
        else:
            return _EventKey(
                self.target,
                self.identifier,
                self.fn,
                self.dispatch_target,
                _fn_wrap=fn_wrap,
            )

    def with_dispatch_target(self, dispatch_target: Any) -> _EventKey[_ET]:
        """Return an event key that uses the given ``dispatch_target``.

        ``self`` is returned when ``dispatch_target`` is already this key's
        dispatch target; otherwise a new key is returned having the same
        target, identifier, ``fn`` and wrapper.

        """
        if dispatch_target is self.dispatch_target:
            return self
        else:
            return _EventKey(
                self.target,
                self.identifier,
                self.fn,
                dispatch_target,
                _fn_wrap=self.fn_wrap,
            )

    def listen(self, *args: Any, **kw: Any) -> None:
        """Establish this key's listener via the dispatch target.

        The keyword arguments ``once``, ``_once_unless_exception`` and
        ``named`` are consumed here; all remaining arguments are passed
        along to ``dispatch._listen()`` on the dispatch target.

        """
        once = kw.pop("once", False)
        once_unless_exception = kw.pop("_once_unless_exception", False)
        named = kw.pop("named", False)

        target, identifier, fn = (
            self.dispatch_target,
            self.identifier,
            self._listen_fn,
        )

        dispatch_collection = getattr(target.dispatch, identifier)

        # the collection may hand back a different callable to be used in
        # place of fn; with_wrapper() returns self if it did not
        adjusted_fn = dispatch_collection._adjust_fn_spec(fn, named)

        event_key = self.with_wrapper(adjusted_fn)

        # invoke the event stub's _sa_warn hook, when it has one
        stub_function = getattr(
            event_key.dispatch_target.dispatch._events, event_key.identifier
        )
        if hasattr(stub_function, "_sa_warn"):
            stub_function._sa_warn()

        if once or once_unless_exception:
            # re-enter listen() with the only_once() wrapper in place.  the
            # flags were popped from kw above, so the nested call proceeds
            # to the else branch.
            event_key.with_wrapper(
                util.only_once(
                    event_key._listen_fn,
                    retry_on_exception=once_unless_exception,
                )
            ).listen(*args, **kw)
        else:
            event_key.dispatch_target.dispatch._listen(
                event_key, *args, **kw
            )

    def remove(self) -> None:
        """Remove this key's listener from every collection that holds it.

        :raises sqlalchemy.exc.InvalidRequestError: if this key is not
         present in the registry.

        """
        key = self._key

        if key not in _key_to_collection:
            raise exc.InvalidRequestError(
                "No listeners found for event %s / %r / %s "
                % (self.target, self.identifier, self.fn)
            )

        dispatch_reg = _key_to_collection.pop(key)

        for collection_ref, listener_ref in dispatch_reg.items():
            collection = collection_ref()
            listener_fn = listener_ref()
            # either weak reference may be dead; there is then nothing
            # left to remove for that pair
            if collection is not None and listener_fn is not None:
                collection.remove(self.with_wrapper(listener_fn))

    def contains(self) -> bool:
        """Return True if this event key is registered to listen."""
        return self._key in _key_to_collection

    def base_listen(
        self,
        propagate: bool = False,
        insert: bool = False,
        named: bool = False,
        retval: Optional[bool] = None,
        asyncio: bool = False,
    ) -> None:
        """Add this key to its event's collection on the dispatch target.

        :param propagate: passed through to the collection's ``insert()``
         or ``append()``.
        :param insert: when True the key is added with ``insert()``,
         otherwise with ``append()``.
        :param named: accepted, but not used by this method.
        :param retval: accepted, but not used by this method.
        :param asyncio: when True, ``_set_asyncio()`` is called on the
         collection before the key is added.

        """
        target, identifier = self.dispatch_target, self.identifier

        dispatch_collection = getattr(target.dispatch, identifier)

        for_modify = dispatch_collection.for_modify(target.dispatch)
        if asyncio:
            for_modify._set_asyncio()

        if insert:
            for_modify.insert(self, propagate)
        else:
            for_modify.append(self, propagate)

    @property
    def _listen_fn(self) -> _ListenerFnType:
        """The function stored in collections: the wrapper if set, else
        the original ``fn``.

        """
        return self.fn_wrap or self.fn

    def append_to_list(
        self,
        owner: RefCollection[_ET],
        list_: Deque[_ListenerFnType],
    ) -> bool:
        """Register with ``owner`` and append the listener to ``list_``.

        :return: True if the listener was appended; False if ``owner``
         already had a registration for this key, in which case ``list_``
         is left untouched.

        """
        if _stored_in_collection(self, owner):
            list_.append(self._listen_fn)
            return True
        else:
            return False

    def remove_from_list(
        self,
        owner: RefCollection[_ET],
        list_: Deque[_ListenerFnType],
    ) -> None:
        """De-register from ``owner`` and remove the listener from ``list_``.

        The registry is updated first; ``list_.remove()`` is then called
        unconditionally, and so raises if the listener function is not
        present in ``list_``.

        """
        _removed_from_collection(self, owner)
        list_.remove(self._listen_fn)

    def prepend_to_list(
        self,
        owner: RefCollection[_ET],
        list_: Deque[_ListenerFnType],
    ) -> bool:
        """Register with ``owner`` and prepend the listener to ``list_``.

        :return: True if the listener was prepended; False if ``owner``
         already had a registration for this key, in which case ``list_``
         is left untouched.

        """
        if _stored_in_collection(self, owner):
            list_.appendleft(self._listen_fn)
            return True
        else:
            return False