1# event/attr.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Attribute implementation for _Dispatch classes.
9
10The various listener targets for a particular event class are represented
11as attributes, which refer to collections of listeners to be fired off.
12These collections can exist at the class level as well as at the instance
13level. An event is fired off using code like this::
14
15 some_object.dispatch.first_connect(arg1, arg2)
16
17Above, ``some_object.dispatch`` would be an instance of ``_Dispatch`` and
18``first_connect`` is typically an instance of ``_ListenerCollection``
19if event listeners are present, or ``_EmptyListener`` if none are present.
20
21The attribute mechanics here spend effort trying to ensure listener functions
22are available with a minimum of function call overhead, that unnecessary
23objects aren't created (i.e. many empty per-instance listener collections),
24as well as that everything is garbage collectable when owning references are
25lost. Other features such as "propagation" of listener functions across
26many ``_Dispatch`` instances, "joining" of multiple ``_Dispatch`` instances,
27as well as support for subclass propagation (e.g. events assigned to
28``Pool`` vs. ``QueuePool``) are all implemented here.
29
30"""
31
32from __future__ import absolute_import
33from __future__ import with_statement
34
35import collections
36from itertools import chain
37import weakref
38
39from . import legacy
40from . import registry
41from .. import exc
42from .. import util
43from ..util import threading
44from ..util.concurrency import AsyncAdaptedLock
45
46
47class RefCollection(util.MemoizedSlots):
48 __slots__ = ("ref",)
49
50 def _memoized_attr_ref(self):
51 return weakref.ref(self, registry._collection_gced)
52
53
54class _empty_collection(object):
55 def append(self, element):
56 pass
57
58 def extend(self, other):
59 pass
60
61 def remove(self, element):
62 pass
63
64 def __iter__(self):
65 return iter([])
66
67 def clear(self):
68 pass
69
70
71class _ClsLevelDispatch(RefCollection):
72 """Class-level events on :class:`._Dispatch` classes."""
73
74 __slots__ = (
75 "clsname",
76 "name",
77 "arg_names",
78 "has_kw",
79 "legacy_signatures",
80 "_clslevel",
81 "__weakref__",
82 )
83
84 def __init__(self, parent_dispatch_cls, fn):
85 self.name = fn.__name__
86 self.clsname = parent_dispatch_cls.__name__
87 argspec = util.inspect_getfullargspec(fn)
88 self.arg_names = argspec.args[1:]
89 self.has_kw = bool(argspec.varkw)
90 self.legacy_signatures = list(
91 reversed(
92 sorted(
93 getattr(fn, "_legacy_signatures", []), key=lambda s: s[0]
94 )
95 )
96 )
97 fn.__doc__ = legacy._augment_fn_docs(self, parent_dispatch_cls, fn)
98
99 self._clslevel = weakref.WeakKeyDictionary()
100
101 def _adjust_fn_spec(self, fn, named):
102 if named:
103 fn = self._wrap_fn_for_kw(fn)
104 if self.legacy_signatures:
105 try:
106 argspec = util.get_callable_argspec(fn, no_self=True)
107 except TypeError:
108 pass
109 else:
110 fn = legacy._wrap_fn_for_legacy(self, fn, argspec)
111 return fn
112
113 def _wrap_fn_for_kw(self, fn):
114 def wrap_kw(*args, **kw):
115 argdict = dict(zip(self.arg_names, args))
116 argdict.update(kw)
117 return fn(**argdict)
118
119 return wrap_kw
120
121 def _do_insert_or_append(self, event_key, is_append):
122 target = event_key.dispatch_target
123 assert isinstance(
124 target, type
125 ), "Class-level Event targets must be classes."
126 if not getattr(target, "_sa_propagate_class_events", True):
127 raise exc.InvalidRequestError(
128 "Can't assign an event directly to the %s class" % (target,)
129 )
130
131 for cls in util.walk_subclasses(target):
132 if cls is not target and cls not in self._clslevel:
133 self.update_subclass(cls)
134 else:
135 if cls not in self._clslevel:
136 self.update_subclass(cls)
137 if is_append:
138 self._clslevel[cls].append(event_key._listen_fn)
139 else:
140 self._clslevel[cls].appendleft(event_key._listen_fn)
141 registry._stored_in_collection(event_key, self)
142
143 def insert(self, event_key, propagate):
144 self._do_insert_or_append(event_key, is_append=False)
145
146 def append(self, event_key, propagate):
147 self._do_insert_or_append(event_key, is_append=True)
148
149 def update_subclass(self, target):
150 if target not in self._clslevel:
151 if getattr(target, "_sa_propagate_class_events", True):
152 self._clslevel[target] = collections.deque()
153 else:
154 self._clslevel[target] = _empty_collection()
155
156 clslevel = self._clslevel[target]
157
158 for cls in target.__mro__[1:]:
159 if cls in self._clslevel:
160 clslevel.extend(
161 [fn for fn in self._clslevel[cls] if fn not in clslevel]
162 )
163
164 def remove(self, event_key):
165 target = event_key.dispatch_target
166
167 for cls in util.walk_subclasses(target):
168 if cls in self._clslevel:
169 self._clslevel[cls].remove(event_key._listen_fn)
170 registry._removed_from_collection(event_key, self)
171
172 def clear(self):
173 """Clear all class level listeners"""
174
175 to_clear = set()
176 for dispatcher in self._clslevel.values():
177 to_clear.update(dispatcher)
178 dispatcher.clear()
179 registry._clear(self, to_clear)
180
181 def for_modify(self, obj):
182 """Return an event collection which can be modified.
183
184 For _ClsLevelDispatch at the class level of
185 a dispatcher, this returns self.
186
187 """
188 return self
189
190
191class _InstanceLevelDispatch(RefCollection):
192 __slots__ = ()
193
194 def _adjust_fn_spec(self, fn, named):
195 return self.parent._adjust_fn_spec(fn, named)
196
197
198class _EmptyListener(_InstanceLevelDispatch):
199 """Serves as a proxy interface to the events
200 served by a _ClsLevelDispatch, when there are no
201 instance-level events present.
202
203 Is replaced by _ListenerCollection when instance-level
204 events are added.
205
206 """
207
208 propagate = frozenset()
209 listeners = ()
210
211 __slots__ = "parent", "parent_listeners", "name"
212
213 def __init__(self, parent, target_cls):
214 if target_cls not in parent._clslevel:
215 parent.update_subclass(target_cls)
216 self.parent = parent # _ClsLevelDispatch
217 self.parent_listeners = parent._clslevel[target_cls]
218 self.name = parent.name
219
220 def for_modify(self, obj):
221 """Return an event collection which can be modified.
222
223 For _EmptyListener at the instance level of
224 a dispatcher, this generates a new
225 _ListenerCollection, applies it to the instance,
226 and returns it.
227
228 """
229 result = _ListenerCollection(self.parent, obj._instance_cls)
230 if getattr(obj, self.name) is self:
231 setattr(obj, self.name, result)
232 else:
233 assert isinstance(getattr(obj, self.name), _JoinedListener)
234 return result
235
236 def _needs_modify(self, *args, **kw):
237 raise NotImplementedError("need to call for_modify()")
238
239 exec_once = (
240 exec_once_unless_exception
241 ) = insert = append = remove = clear = _needs_modify
242
243 def __call__(self, *args, **kw):
244 """Execute this event."""
245
246 for fn in self.parent_listeners:
247 fn(*args, **kw)
248
249 def __len__(self):
250 return len(self.parent_listeners)
251
252 def __iter__(self):
253 return iter(self.parent_listeners)
254
255 def __bool__(self):
256 return bool(self.parent_listeners)
257
258 __nonzero__ = __bool__
259
260
261class _CompoundListener(_InstanceLevelDispatch):
262 __slots__ = (
263 "_exec_once_mutex",
264 "_exec_once",
265 "_exec_w_sync_once",
266 "_is_asyncio",
267 )
268
269 def __init__(self, *arg, **kw):
270 super(_CompoundListener, self).__init__(*arg, **kw)
271 self._is_asyncio = False
272
273 def _set_asyncio(self):
274 self._is_asyncio = True
275
276 def _memoized_attr__exec_once_mutex(self):
277 if self._is_asyncio:
278 return AsyncAdaptedLock()
279 else:
280 return threading.Lock()
281
282 def _exec_once_impl(self, retry_on_exception, *args, **kw):
283 with self._exec_once_mutex:
284 if not self._exec_once:
285 try:
286 self(*args, **kw)
287 exception = False
288 except:
289 exception = True
290 raise
291 finally:
292 if not exception or not retry_on_exception:
293 self._exec_once = True
294
295 def exec_once(self, *args, **kw):
296 """Execute this event, but only if it has not been
297 executed already for this collection."""
298
299 if not self._exec_once:
300 self._exec_once_impl(False, *args, **kw)
301
302 def exec_once_unless_exception(self, *args, **kw):
303 """Execute this event, but only if it has not been
304 executed already for this collection, or was called
305 by a previous exec_once_unless_exception call and
306 raised an exception.
307
308 If exec_once was already called, then this method will never run
309 the callable regardless of whether it raised or not.
310
311 .. versionadded:: 1.3.8
312
313 """
314 if not self._exec_once:
315 self._exec_once_impl(True, *args, **kw)
316
317 def _exec_w_sync_on_first_run(self, *args, **kw):
318 """Execute this event, and use a mutex if it has not been
319 executed already for this collection, or was called
320 by a previous _exec_w_sync_on_first_run call and
321 raised an exception.
322
323 If _exec_w_sync_on_first_run was already called and didn't raise an
324 exception, then a mutex is not used.
325
326 .. versionadded:: 1.4.11
327
328 """
329 if not self._exec_w_sync_once:
330 with self._exec_once_mutex:
331 try:
332 self(*args, **kw)
333 except:
334 raise
335 else:
336 self._exec_w_sync_once = True
337 else:
338 self(*args, **kw)
339
340 def __call__(self, *args, **kw):
341 """Execute this event."""
342
343 for fn in self.parent_listeners:
344 fn(*args, **kw)
345 for fn in self.listeners:
346 fn(*args, **kw)
347
348 def __len__(self):
349 return len(self.parent_listeners) + len(self.listeners)
350
351 def __iter__(self):
352 return chain(self.parent_listeners, self.listeners)
353
354 def __bool__(self):
355 return bool(self.listeners or self.parent_listeners)
356
357 __nonzero__ = __bool__
358
359
360class _ListenerCollection(_CompoundListener):
361 """Instance-level attributes on instances of :class:`._Dispatch`.
362
363 Represents a collection of listeners.
364
365 As of 0.7.9, _ListenerCollection is only first
366 created via the _EmptyListener.for_modify() method.
367
368 """
369
370 __slots__ = (
371 "parent_listeners",
372 "parent",
373 "name",
374 "listeners",
375 "propagate",
376 "__weakref__",
377 )
378
379 def __init__(self, parent, target_cls):
380 super(_ListenerCollection, self).__init__()
381 if target_cls not in parent._clslevel:
382 parent.update_subclass(target_cls)
383 self._exec_once = False
384 self._exec_w_sync_once = False
385 self.parent_listeners = parent._clslevel[target_cls]
386 self.parent = parent
387 self.name = parent.name
388 self.listeners = collections.deque()
389 self.propagate = set()
390
391 def for_modify(self, obj):
392 """Return an event collection which can be modified.
393
394 For _ListenerCollection at the instance level of
395 a dispatcher, this returns self.
396
397 """
398 return self
399
400 def _update(self, other, only_propagate=True):
401 """Populate from the listeners in another :class:`_Dispatch`
402 object."""
403
404 existing_listeners = self.listeners
405 existing_listener_set = set(existing_listeners)
406 self.propagate.update(other.propagate)
407 other_listeners = [
408 l
409 for l in other.listeners
410 if l not in existing_listener_set
411 and not only_propagate
412 or l in self.propagate
413 ]
414
415 existing_listeners.extend(other_listeners)
416
417 if other._is_asyncio:
418 self._set_asyncio()
419
420 to_associate = other.propagate.union(other_listeners)
421 registry._stored_in_collection_multi(self, other, to_associate)
422
423 def insert(self, event_key, propagate):
424 if event_key.prepend_to_list(self, self.listeners):
425 if propagate:
426 self.propagate.add(event_key._listen_fn)
427
428 def append(self, event_key, propagate):
429 if event_key.append_to_list(self, self.listeners):
430 if propagate:
431 self.propagate.add(event_key._listen_fn)
432
433 def remove(self, event_key):
434 self.listeners.remove(event_key._listen_fn)
435 self.propagate.discard(event_key._listen_fn)
436 registry._removed_from_collection(event_key, self)
437
438 def clear(self):
439 registry._clear(self, self.listeners)
440 self.propagate.clear()
441 self.listeners.clear()
442
443
444class _JoinedListener(_CompoundListener):
445 __slots__ = "parent", "name", "local", "parent_listeners"
446
447 def __init__(self, parent, name, local):
448 self._exec_once = False
449 self.parent = parent
450 self.name = name
451 self.local = local
452 self.parent_listeners = self.local
453
454 @property
455 def listeners(self):
456 return getattr(self.parent, self.name)
457
458 def _adjust_fn_spec(self, fn, named):
459 return self.local._adjust_fn_spec(fn, named)
460
461 def for_modify(self, obj):
462 self.local = self.parent_listeners = self.local.for_modify(obj)
463 return self
464
465 def insert(self, event_key, propagate):
466 self.local.insert(event_key, propagate)
467
468 def append(self, event_key, propagate):
469 self.local.append(event_key, propagate)
470
471 def remove(self, event_key):
472 self.local.remove(event_key)
473
474 def clear(self):
475 raise NotImplementedError()