1# event/registry.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Provides managed registration services on behalf of :func:`.listen`
9arguments.
10
11By "managed registration", we mean that event listening functions and
12other objects can be added to various collections in such a way that their
13membership in all those collections can be revoked at once, based on
14an equivalent :class:`._EventKey`.
15
16"""
17from __future__ import annotations
18
19import collections
20import types
21import typing
22from typing import Any
23from typing import Callable
24from typing import cast
25from typing import Deque
26from typing import Dict
27from typing import Generic
28from typing import Iterable
29from typing import Optional
30from typing import Tuple
31from typing import TypeVar
32from typing import Union
33import weakref
34
35from .. import exc
36from .. import util
37
38if typing.TYPE_CHECKING:
39 from .attr import RefCollection
40 from .base import dispatcher
41
# Any callable can act as an event listener.
_ListenerFnType = Callable[..., Any]
# Identity of a listener function as computed in ``_EventKey.__init__``:
# ``id(fn)`` for ordinary callables, or
# ``(id(fn.__func__), id(fn.__self__))`` for bound methods.
_ListenerFnKeyType = Union[int, Tuple[int, int]]
# Registry key produced by ``_EventKey._key``:
# ``(id(target), identifier, fn_key)``.
_EventKeyTupleType = Tuple[int, str, _ListenerFnKeyType]


# Type variable constrained to EventTarget and its subclasses.
_ET = TypeVar("_ET", bound="EventTarget")
48
49
class EventTarget:
    """Represents an event target: an object that listeners can be
    attached to, either on its class or on one of its instances.

    Typical targets are Connection, Mapper, Table, Session,
    InstrumentedAttribute, Engine, Pool and Dialect.

    """

    __slots__ = ()

    # the dispatcher through which this target's listener collections
    # are looked up (``target.dispatch``)
    dispatch: dispatcher[Any]
62
63
# Value type of ``_key_to_collection``: for a single event key, maps a weak
# reference to each listener collection to a weak reference to the listener
# function stored in that collection.
_RefCollectionToListenerType = Dict[
    "weakref.ref[RefCollection[Any]]",
    "weakref.ref[_ListenerFnType]",
]

# This is a defaultdict: indexing it with an unknown key inserts an empty
# dict.  Code that must not create entries tests membership with ``in``
# before indexing.
_key_to_collection: Dict[_EventKeyTupleType, _RefCollectionToListenerType] = (
    collections.defaultdict(dict)
)
72"""
73Given an original listen() argument, can locate all
74listener collections and the listener fn contained
75
76(target, identifier, fn) -> {
77 ref(listenercollection) -> ref(listener_fn)
78 ref(listenercollection) -> ref(listener_fn)
79 ref(listenercollection) -> ref(listener_fn)
80 }
81"""
82
# Value type of ``_collection_to_key``: for a single listener collection,
# maps a weak reference to each stored listener function to the event key
# it was registered under.
_ListenerToEventKeyType = Dict[
    "weakref.ref[_ListenerFnType]",
    _EventKeyTupleType,
]
# Reverse index of ``_key_to_collection``.  Also a defaultdict, so indexing
# with an unknown collection reference inserts an empty dict.
_collection_to_key: Dict[
    weakref.ref[RefCollection[Any]],
    _ListenerToEventKeyType,
] = collections.defaultdict(dict)
91"""
92Given a _ListenerCollection or _ClsLevelListener, can locate
93all the original listen() arguments and the listener fn contained
94
95ref(listenercollection) -> {
96 ref(listener_fn) -> (target, identifier, fn),
97 ref(listener_fn) -> (target, identifier, fn),
98 ref(listener_fn) -> (target, identifier, fn),
99 }
100"""
101
102
def _collection_gced(ref: weakref.ref[Any]) -> None:
    """Drop every registry entry that refers to the collection ``ref``.

    Removes ``ref`` from ``_collection_to_key`` and from the per-key
    mapping of each event key recorded for it; an event key whose mapping
    becomes empty is removed from ``_key_to_collection`` as well.

    :param ref: weak reference identifying a listener collection.
     NOTE(review): presumably invoked as the weakref callback once the
     collection has been garbage collected -- confirm at the call site.

    """
    # Nothing to do when the registry is falsy (e.g. empty) or never saw
    # this reference.  Testing membership, rather than indexing, avoids
    # inserting an empty entry into the defaultdict.
    if not _collection_to_key or ref not in _collection_to_key:
        return

    ref = cast("weakref.ref[RefCollection[EventTarget]]", ref)

    listener_to_key = _collection_to_key.pop(ref)
    for key in listener_to_key.values():
        # membership test first, for the same defaultdict reason as above
        if key in _key_to_collection:
            dispatch_reg = _key_to_collection[key]
            # _clear() removes the owner from this mapping but leaves the
            # listener -> key entry in _collection_to_key behind, so ``ref``
            # may already be absent here.  A pop() without a default would
            # raise KeyError and abandon the cleanup of the remaining keys.
            dispatch_reg.pop(ref, None)
            if not dispatch_reg:
                _key_to_collection.pop(key)
118
119
def _stored_in_collection(
    event_key: _EventKey[_ET], owner: RefCollection[_ET]
) -> bool:
    """Record that ``owner`` now holds the listener of ``event_key``.

    Both registries are updated: the key gains an entry for the owning
    collection, and the collection gains an entry for the listener.

    :param event_key: key describing the listen() arguments.
    :param owner: collection the listener is being added to.
    :return: ``True`` if a new registration was made, ``False`` if this
     key was already registered for ``owner`` (nothing is changed then).

    """
    registry_key = event_key._key
    # indexing the defaultdict creates the per-key mapping on first use
    collections_for_key = _key_to_collection[registry_key]

    collection_ref = owner.ref
    fn_ref = weakref.ref(event_key._listen_fn)

    newly_stored = collection_ref not in collections_for_key
    if newly_stored:
        collections_for_key[collection_ref] = fn_ref
        _collection_to_key[collection_ref][fn_ref] = registry_key

    return newly_stored
139
140
def _removed_from_collection(
    event_key: _EventKey[_ET], owner: RefCollection[_ET]
) -> None:
    """Record that ``owner`` no longer holds the listener of ``event_key``.

    The owner is dropped from the key's mapping, and the key itself is
    dropped once no collection remains.  If ``owner`` is known to the
    reverse registry, the listener entry is removed from it as well; a
    ``KeyError`` is raised if that listener is not recorded there.

    :param event_key: key describing the listen() arguments.
    :param owner: collection the listener is being removed from.

    """
    registry_key = event_key._key
    # indexing may create an empty mapping; it is deleted again just below
    collections_for_key = _key_to_collection[registry_key]
    fn_ref = weakref.ref(event_key._listen_fn)
    collection_ref = owner.ref

    collections_for_key.pop(collection_ref, None)
    if not collections_for_key:
        del _key_to_collection[registry_key]

    # membership test so that no empty entry is created for ``owner``
    if collection_ref in _collection_to_key:
        del _collection_to_key[collection_ref][fn_ref]
158
159
def _stored_in_collection_multi(
    newowner: RefCollection[_ET],
    oldowner: RefCollection[_ET],
    elements: Iterable[_ListenerFnType],
) -> None:
    """Register listeners already known under ``oldowner`` for ``newowner``.

    For each listener in ``elements``, the event key recorded for it under
    ``oldowner`` is looked up and the same registration is made for
    ``newowner`` in both registries.  Listeners with no key recorded under
    ``oldowner`` are skipped.

    :param newowner: collection receiving the listeners.
    :param oldowner: collection the listeners were originally stored in.
    :param elements: the listener functions being carried over; nothing
     happens if this is falsy.

    """
    if not elements:
        return

    source_ref = oldowner.ref
    dest_ref = newowner.ref

    source_keys = _collection_to_key[source_ref]
    dest_keys = _collection_to_key[dest_ref]

    for fn in elements:
        fn_ref = weakref.ref(fn)

        try:
            registry_key = source_keys[fn_ref]
        except KeyError:
            # can occur during interpreter shutdown.
            # see #6740
            continue

        try:
            collections_for_key = _key_to_collection[registry_key]
        except KeyError:
            continue

        # stored values are weak references, never None
        registered = collections_for_key.get(dest_ref)
        if registered is None:
            collections_for_key[dest_ref] = fn_ref
        else:
            assert registered == fn_ref

        dest_keys[fn_ref] = registry_key
194
195
def _clear(
    owner: RefCollection[_ET],
    elements: Iterable[_ListenerFnType],
) -> None:
    """Unregister ``owner`` from the event key of every given listener.

    Each listener's key is looked up in the owner's reverse mapping (a
    ``KeyError`` propagates if it is not recorded there) and the owner is
    removed from that key's mapping; keys left without any collection are
    deleted.  The owner's reverse mapping itself is not modified.

    :param owner: collection being cleared.
    :param elements: listener functions held by ``owner``; nothing happens
     if this is falsy.

    """
    if not elements:
        return

    collection_ref = owner.ref
    keys_by_listener = _collection_to_key[collection_ref]

    for fn in elements:
        registry_key = keys_by_listener[weakref.ref(fn)]
        collections_for_key = _key_to_collection[registry_key]
        collections_for_key.pop(collection_ref, None)

        if collections_for_key:
            continue
        del _key_to_collection[registry_key]
213
214
class _EventKey(Generic[_ET]):
    """Represent :func:`.listen` arguments.

    An event key bundles the target, the event identifier and the listener
    function of one ``listen()`` call.  Its ``_key`` tuple is what the
    module-level registries are indexed by, so that a listener can later
    be located in, and removed from, the collections it was added to.

    """

    __slots__ = (
        "target",
        "identifier",
        "fn",
        "fn_key",
        "fn_wrap",
        "dispatch_target",
    )

    target: _ET
    identifier: str
    fn: _ListenerFnType
    fn_key: _ListenerFnKeyType
    dispatch_target: Any
    # annotated under the slot's actual name; only the constructor
    # *argument* is spelled ``_fn_wrap``
    fn_wrap: Optional[_ListenerFnType]

    def __init__(
        self,
        target: _ET,
        identifier: str,
        fn: _ListenerFnType,
        dispatch_target: Any,
        _fn_wrap: Optional[_ListenerFnType] = None,
    ):
        """Construct a new event key.

        :param target: object the listener is associated with; its
         ``id()`` forms the first element of ``_key``.
        :param identifier: name of the event.
        :param fn: the listener function as originally given.
        :param dispatch_target: object whose ``dispatch`` attribute is used
         by :meth:`listen` and :meth:`base_listen`.
        :param _fn_wrap: optional callable used in place of ``fn`` as the
         function actually stored in collections (see ``_listen_fn``).

        """
        self.target = target
        self.identifier = identifier
        self.fn = fn
        if isinstance(fn, types.MethodType):
            # bound methods are keyed on the underlying function and the
            # instance rather than on the method object itself
            self.fn_key = id(fn.__func__), id(fn.__self__)
        else:
            self.fn_key = id(fn)
        self.fn_wrap = _fn_wrap
        self.dispatch_target = dispatch_target

    @property
    def _key(self) -> _EventKeyTupleType:
        """Registry key: ``(id(target), identifier, fn_key)``."""
        return (id(self.target), self.identifier, self.fn_key)

    def with_wrapper(self, fn_wrap: _ListenerFnType) -> _EventKey[_ET]:
        """Return an event key whose stored function is ``fn_wrap``.

        ``self`` is returned when ``fn_wrap`` already is the function this
        key stores; otherwise a new key with the same target, identifier,
        original function and dispatch target is created.

        """
        if fn_wrap is self._listen_fn:
            return self
        else:
            return _EventKey(
                self.target,
                self.identifier,
                self.fn,
                self.dispatch_target,
                _fn_wrap=fn_wrap,
            )

    def with_dispatch_target(self, dispatch_target: Any) -> _EventKey[_ET]:
        """Return an event key that uses ``dispatch_target``.

        ``self`` is returned when the dispatch target is unchanged;
        otherwise a new key carrying the same target, identifier, original
        function and wrapper is created.

        """
        if dispatch_target is self.dispatch_target:
            return self
        else:
            return _EventKey(
                self.target,
                self.identifier,
                self.fn,
                dispatch_target,
                _fn_wrap=self.fn_wrap,
            )

    def listen(self, *args: Any, **kw: Any) -> None:
        """Establish this listener on the dispatch target.

        The keywords ``once``, ``_once_unless_exception`` and ``named``
        are consumed here; all remaining arguments are passed on to the
        dispatch target's ``dispatch._listen()``.

        """
        once = kw.pop("once", False)
        once_unless_exception = kw.pop("_once_unless_exception", False)
        named = kw.pop("named", False)

        dispatch_target = self.dispatch_target
        identifier = self.identifier
        fn = self._listen_fn

        dispatch_collection = getattr(dispatch_target.dispatch, identifier)

        adjusted_fn = dispatch_collection._adjust_fn_spec(fn, named)

        # work with a key that stores the adjusted function; this is
        # ``self`` when the function was returned unchanged
        event_key = self.with_wrapper(adjusted_fn)

        stub_function = getattr(dispatch_target.dispatch._events, identifier)
        if hasattr(stub_function, "_sa_warn"):
            stub_function._sa_warn()

        if once or once_unless_exception:
            # wrap with only_once() and run listen() again; the "once"
            # keywords were popped above, so the second pass takes the
            # ``else`` branch
            event_key.with_wrapper(
                util.only_once(
                    event_key._listen_fn,
                    retry_on_exception=once_unless_exception,
                )
            ).listen(*args, **kw)
        else:
            dispatch_target.dispatch._listen(event_key, *args, **kw)

    def remove(self) -> None:
        """Remove this listener from every collection it is registered in.

        :raises sqlalchemy.exc.InvalidRequestError: if this key is not
         present in the registry.

        """
        key = self._key

        if key not in _key_to_collection:
            raise exc.InvalidRequestError(
                "No listeners found for event %s / %r / %s "
                % (self.target, self.identifier, self.fn)
            )

        dispatch_reg = _key_to_collection.pop(key)

        for collection_ref, listener_ref in dispatch_reg.items():
            collection = collection_ref()
            listener_fn = listener_ref()
            # skip entries whose collection or listener is no longer alive
            if collection is not None and listener_fn is not None:
                collection.remove(self.with_wrapper(listener_fn))

    def contains(self) -> bool:
        """Return True if this event key is registered to listen."""
        return self._key in _key_to_collection

    def base_listen(
        self,
        propagate: bool = False,
        insert: bool = False,
        named: bool = False,
        retval: Optional[bool] = None,
        asyncio: bool = False,
    ) -> None:
        """Add this key to the dispatch target's listener collection.

        :param propagate: passed through to the collection's ``insert()``
         or ``append()``.
        :param insert: if true the key is inserted, otherwise appended.
        :param named: accepted but not used by this method.
        :param retval: accepted but not used by this method.
        :param asyncio: if true, ``_set_asyncio()`` is called on the
         collection before the key is added.

        """
        target, identifier = self.dispatch_target, self.identifier

        dispatch_collection = getattr(target.dispatch, identifier)

        for_modify = dispatch_collection.for_modify(target.dispatch)
        if asyncio:
            for_modify._set_asyncio()

        if insert:
            for_modify.insert(self, propagate)
        else:
            for_modify.append(self, propagate)

    @property
    def _listen_fn(self) -> _ListenerFnType:
        """The function actually stored: the wrapper if set, else ``fn``."""
        return self.fn_wrap or self.fn

    def append_to_list(
        self,
        owner: RefCollection[_ET],
        list_: Deque[_ListenerFnType],
    ) -> bool:
        """Register with ``owner`` and append the listener to ``list_``.

        :return: ``True`` if the listener was added, ``False`` if this key
         was already registered for ``owner`` (``list_`` is left as is).

        """
        if _stored_in_collection(self, owner):
            list_.append(self._listen_fn)
            return True
        else:
            return False

    def remove_from_list(
        self,
        owner: RefCollection[_ET],
        list_: Deque[_ListenerFnType],
    ) -> None:
        """Unregister from ``owner`` and remove the listener from ``list_``.

        The registry is updated first; ``list_.remove()`` then raises
        ``ValueError`` if the listener is not in ``list_``.

        """
        _removed_from_collection(self, owner)
        list_.remove(self._listen_fn)

    def prepend_to_list(
        self,
        owner: RefCollection[_ET],
        list_: Deque[_ListenerFnType],
    ) -> bool:
        """Register with ``owner`` and put the listener first in ``list_``.

        :return: ``True`` if the listener was added, ``False`` if this key
         was already registered for ``owner`` (``list_`` is left as is).

        """
        if _stored_in_collection(self, owner):
            list_.appendleft(self._listen_fn)
            return True
        else:
            return False