1# event/registry.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Provides managed registration services on behalf of :func:`.listen`
9arguments.
10
11By "managed registration", we mean that event listening functions and
12other objects can be added to various collections in such a way that their
13membership in all those collections can be revoked at once, based on
14an equivalent :class:`._EventKey`.
15
16"""
17
18from __future__ import annotations
19
20import collections
21import types
22import typing
23from typing import Any
24from typing import Callable
25from typing import cast
26from typing import Deque
27from typing import Dict
28from typing import Generic
29from typing import Iterable
30from typing import Optional
31from typing import Tuple
32from typing import TypeVar
33from typing import Union
34import weakref
35
36from .. import exc
37from .. import util
38
39if typing.TYPE_CHECKING:
40 from .attr import RefCollection
41 from .base import dispatcher
42
43_ListenerFnType = Callable[..., Any]
44_ListenerFnKeyType = Union[int, Tuple[int, int]]
45_EventKeyTupleType = Tuple[int, str, _ListenerFnKeyType]
46
47
48_ET = TypeVar("_ET", bound="EventTarget")
49
50
51class EventTarget:
52 """represents an event target, that is, something we can listen on
53 either with that target as a class or as an instance.
54
55 Examples include: Connection, Mapper, Table, Session,
56 InstrumentedAttribute, Engine, Pool, Dialect.
57
58 """
59
60 __slots__ = ()
61
62 dispatch: dispatcher[Any]
63
64
65_RefCollectionToListenerType = Dict[
66 "weakref.ref[RefCollection[Any]]",
67 "weakref.ref[_ListenerFnType]",
68]
69
70_key_to_collection: Dict[_EventKeyTupleType, _RefCollectionToListenerType] = (
71 collections.defaultdict(dict)
72)
73"""
74Given an original listen() argument, can locate all
75listener collections and the listener fn contained
76
77(target, identifier, fn) -> {
78 ref(listenercollection) -> ref(listener_fn)
79 ref(listenercollection) -> ref(listener_fn)
80 ref(listenercollection) -> ref(listener_fn)
81 }
82"""
83
84_ListenerToEventKeyType = Dict[
85 "weakref.ref[_ListenerFnType]",
86 _EventKeyTupleType,
87]
88_collection_to_key: Dict[
89 weakref.ref[RefCollection[Any]],
90 _ListenerToEventKeyType,
91] = collections.defaultdict(dict)
92"""
93Given a _ListenerCollection or _ClsLevelListener, can locate
94all the original listen() arguments and the listener fn contained
95
96ref(listenercollection) -> {
97 ref(listener_fn) -> (target, identifier, fn),
98 ref(listener_fn) -> (target, identifier, fn),
99 ref(listener_fn) -> (target, identifier, fn),
100 }
101"""
102
103
104def _collection_gced(ref: weakref.ref[Any]) -> None:
105 # defaultdict, so can't get a KeyError
106 if not _collection_to_key or ref not in _collection_to_key:
107 return
108
109 ref = cast("weakref.ref[RefCollection[EventTarget]]", ref)
110
111 listener_to_key = _collection_to_key.pop(ref)
112 for key in listener_to_key.values():
113 if key in _key_to_collection:
114 # defaultdict, so can't get a KeyError
115 dispatch_reg = _key_to_collection[key]
116 dispatch_reg.pop(ref)
117 if not dispatch_reg:
118 _key_to_collection.pop(key)
119
120
121def _stored_in_collection(
122 event_key: _EventKey[_ET], owner: RefCollection[_ET]
123) -> bool:
124 key = event_key._key
125
126 dispatch_reg = _key_to_collection[key]
127
128 owner_ref = owner.ref
129 listen_ref = weakref.ref(event_key._listen_fn)
130
131 if owner_ref in dispatch_reg:
132 return False
133
134 dispatch_reg[owner_ref] = listen_ref
135
136 listener_to_key = _collection_to_key[owner_ref]
137 listener_to_key[listen_ref] = key
138
139 return True
140
141
142def _removed_from_collection(
143 event_key: _EventKey[_ET], owner: RefCollection[_ET]
144) -> None:
145 key = event_key._key
146
147 dispatch_reg = _key_to_collection[key]
148
149 listen_ref = weakref.ref(event_key._listen_fn)
150
151 owner_ref = owner.ref
152 dispatch_reg.pop(owner_ref, None)
153 if not dispatch_reg:
154 del _key_to_collection[key]
155
156 if owner_ref in _collection_to_key:
157 listener_to_key = _collection_to_key[owner_ref]
158 # see #12216 - this guards against a removal that already occurred
159 # here. however, I cannot come up with a test that shows any negative
160 # side effects occurring from this removal happening, even though an
161 # event key may still be referenced from a clsleveldispatch here
162 listener_to_key.pop(listen_ref, None)
163
164
165def _stored_in_collection_multi(
166 newowner: RefCollection[_ET],
167 oldowner: RefCollection[_ET],
168 elements: Iterable[_ListenerFnType],
169) -> None:
170 if not elements:
171 return
172
173 oldowner_ref = oldowner.ref
174 newowner_ref = newowner.ref
175
176 old_listener_to_key = _collection_to_key[oldowner_ref]
177 new_listener_to_key = _collection_to_key[newowner_ref]
178
179 for listen_fn in elements:
180 listen_ref = weakref.ref(listen_fn)
181 try:
182 key = old_listener_to_key[listen_ref]
183 except KeyError:
184 # can occur during interpreter shutdown.
185 # see #6740
186 continue
187
188 try:
189 dispatch_reg = _key_to_collection[key]
190 except KeyError:
191 continue
192
193 if newowner_ref in dispatch_reg:
194 assert dispatch_reg[newowner_ref] == listen_ref
195 else:
196 dispatch_reg[newowner_ref] = listen_ref
197
198 new_listener_to_key[listen_ref] = key
199
200
201def _clear(
202 owner: RefCollection[_ET],
203 elements: Iterable[_ListenerFnType],
204) -> None:
205 if not elements:
206 return
207
208 owner_ref = owner.ref
209 listener_to_key = _collection_to_key[owner_ref]
210 for listen_fn in elements:
211 listen_ref = weakref.ref(listen_fn)
212 key = listener_to_key[listen_ref]
213 dispatch_reg = _key_to_collection[key]
214 dispatch_reg.pop(owner_ref, None)
215
216 if not dispatch_reg:
217 del _key_to_collection[key]
218
219
220class _EventKey(Generic[_ET]):
221 """Represent :func:`.listen` arguments."""
222
223 __slots__ = (
224 "target",
225 "identifier",
226 "fn",
227 "fn_key",
228 "fn_wrap",
229 "dispatch_target",
230 )
231
232 target: _ET
233 identifier: str
234 fn: _ListenerFnType
235 fn_key: _ListenerFnKeyType
236 dispatch_target: Any
237 _fn_wrap: Optional[_ListenerFnType]
238
239 def __init__(
240 self,
241 target: _ET,
242 identifier: str,
243 fn: _ListenerFnType,
244 dispatch_target: Any,
245 _fn_wrap: Optional[_ListenerFnType] = None,
246 ):
247 self.target = target
248 self.identifier = identifier
249 self.fn = fn
250 if isinstance(fn, types.MethodType):
251 self.fn_key = id(fn.__func__), id(fn.__self__)
252 else:
253 self.fn_key = id(fn)
254 self.fn_wrap = _fn_wrap
255 self.dispatch_target = dispatch_target
256
257 @property
258 def _key(self) -> _EventKeyTupleType:
259 return (id(self.target), self.identifier, self.fn_key)
260
261 def with_wrapper(self, fn_wrap: _ListenerFnType) -> _EventKey[_ET]:
262 if fn_wrap is self._listen_fn:
263 return self
264 else:
265 return _EventKey(
266 self.target,
267 self.identifier,
268 self.fn,
269 self.dispatch_target,
270 _fn_wrap=fn_wrap,
271 )
272
273 def with_dispatch_target(self, dispatch_target: Any) -> _EventKey[_ET]:
274 if dispatch_target is self.dispatch_target:
275 return self
276 else:
277 return _EventKey(
278 self.target,
279 self.identifier,
280 self.fn,
281 dispatch_target,
282 _fn_wrap=self.fn_wrap,
283 )
284
285 def listen(self, *args: Any, **kw: Any) -> None:
286 once = kw.pop("once", False)
287 once_unless_exception = kw.pop("_once_unless_exception", False)
288 named = kw.pop("named", False)
289
290 target, identifier, fn = (
291 self.dispatch_target,
292 self.identifier,
293 self._listen_fn,
294 )
295
296 dispatch_collection = getattr(target.dispatch, identifier)
297
298 adjusted_fn = dispatch_collection._adjust_fn_spec(fn, named)
299
300 self = self.with_wrapper(adjusted_fn)
301
302 stub_function = getattr(
303 self.dispatch_target.dispatch._events, self.identifier
304 )
305 if hasattr(stub_function, "_sa_warn"):
306 stub_function._sa_warn()
307
308 if once or once_unless_exception:
309 self.with_wrapper(
310 util.only_once(
311 self._listen_fn, retry_on_exception=once_unless_exception
312 )
313 ).listen(*args, **kw)
314 else:
315 self.dispatch_target.dispatch._listen(self, *args, **kw)
316
317 def remove(self) -> None:
318 key = self._key
319
320 if key not in _key_to_collection:
321 raise exc.InvalidRequestError(
322 "No listeners found for event %s / %r / %s "
323 % (self.target, self.identifier, self.fn)
324 )
325
326 dispatch_reg = _key_to_collection.pop(key)
327
328 for collection_ref, listener_ref in dispatch_reg.items():
329 collection = collection_ref()
330 listener_fn = listener_ref()
331 if collection is not None and listener_fn is not None:
332 collection.remove(self.with_wrapper(listener_fn))
333
334 def contains(self) -> bool:
335 """Return True if this event key is registered to listen."""
336 return self._key in _key_to_collection
337
338 def base_listen(
339 self,
340 propagate: bool = False,
341 insert: bool = False,
342 named: bool = False,
343 retval: Optional[bool] = None,
344 asyncio: bool = False,
345 ) -> None:
346 target, identifier = self.dispatch_target, self.identifier
347
348 dispatch_collection = getattr(target.dispatch, identifier)
349
350 for_modify = dispatch_collection.for_modify(target.dispatch)
351 if asyncio:
352 for_modify._set_asyncio()
353
354 if insert:
355 for_modify.insert(self, propagate)
356 else:
357 for_modify.append(self, propagate)
358
359 @property
360 def _listen_fn(self) -> _ListenerFnType:
361 return self.fn_wrap or self.fn
362
363 def append_to_list(
364 self,
365 owner: RefCollection[_ET],
366 list_: Deque[_ListenerFnType],
367 ) -> bool:
368 if _stored_in_collection(self, owner):
369 list_.append(self._listen_fn)
370 return True
371 else:
372 return False
373
374 def remove_from_list(
375 self,
376 owner: RefCollection[_ET],
377 list_: Deque[_ListenerFnType],
378 ) -> None:
379 _removed_from_collection(self, owner)
380 list_.remove(self._listen_fn)
381
382 def prepend_to_list(
383 self,
384 owner: RefCollection[_ET],
385 list_: Deque[_ListenerFnType],
386 ) -> bool:
387 if _stored_in_collection(self, owner):
388 list_.appendleft(self._listen_fn)
389 return True
390 else:
391 return False