Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/event/attr.py: 66%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# event/attr.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
"""Attribute implementation for _Dispatch classes.

The various listener targets for a particular event class are represented
as attributes, which refer to collections of listeners to be fired off.
These collections can exist at the class level as well as at the instance
level.  An event is fired off using code like this::

    some_object.dispatch.first_connect(arg1, arg2)

Above, ``some_object.dispatch`` would be an instance of ``_Dispatch`` and
``first_connect`` is typically an instance of ``_ListenerCollection``
if event listeners are present, or ``_EmptyListener`` if none are present.

The attribute mechanics here spend effort trying to ensure listener functions
are available with a minimum of function call overhead, that unnecessary
objects aren't created (i.e. many empty per-instance listener collections),
as well as that everything is garbage collectable when owning references are
lost.  Other features such as "propagation" of listener functions across
many ``_Dispatch`` instances, "joining" of multiple ``_Dispatch`` instances,
as well as support for subclass propagation (e.g. events assigned to
``Pool`` vs. ``QueuePool``) are all implemented here.

"""
31from __future__ import annotations
33import collections
34from itertools import chain
35import threading
36from types import TracebackType
37import typing
38from typing import Any
39from typing import cast
40from typing import Collection
41from typing import Deque
42from typing import FrozenSet
43from typing import Generic
44from typing import Iterator
45from typing import MutableMapping
46from typing import MutableSequence
47from typing import NoReturn
48from typing import Optional
49from typing import Protocol
50from typing import Sequence
51from typing import Set
52from typing import Tuple
53from typing import Type
54from typing import TypeVar
55from typing import Union
56import weakref
58from . import legacy
59from . import registry
60from .registry import _ET
61from .registry import _EventKey
62from .registry import _ListenerFnType
63from .. import exc
64from .. import util
65from ..util.concurrency import AsyncAdaptedLock
67_T = TypeVar("_T", bound=Any)
69if typing.TYPE_CHECKING:
70 from .base import _Dispatch
71 from .base import _DispatchCommon
72 from .base import _HasEventsDispatch
class RefCollection(util.MemoizedSlots, Generic[_ET]):
    """Base for listener collections that expose a weakref to themselves.

    The weak reference is available as ``ref`` and is produced by
    :meth:`_memoized_attr_ref`.
    """

    __slots__ = ("ref",)

    ref: weakref.ref[RefCollection[_ET]]

    def _memoized_attr_ref(self) -> weakref.ref[RefCollection[_ET]]:
        """Create the weak reference to this collection.

        ``registry._collection_gced`` is installed as the weakref callback.
        """
        # NOTE(review): presumably invoked once on first access of ``ref``
        # by util.MemoizedSlots -- confirm against that helper.
        on_collected = registry._collection_gced
        return weakref.ref(self, on_collected)
84class _empty_collection(Collection[_T]):
85 def append(self, element: _T) -> None:
86 pass
88 def appendleft(self, element: _T) -> None:
89 pass
91 def extend(self, other: Sequence[_T]) -> None:
92 pass
94 def remove(self, element: _T) -> None:
95 pass
97 def __contains__(self, element: Any) -> bool:
98 return False
100 def __iter__(self) -> Iterator[_T]:
101 return iter([])
103 def clear(self) -> None:
104 pass
106 def __len__(self) -> int:
107 return 0
# Type of a per-class listener container: either a real ``deque`` of
# listeners or the inert ``_empty_collection`` stand-in.  Used for the values
# of ``_ClsLevelDispatch._clslevel`` and for ``_EmptyListener.parent_listeners``.
_ListenerFnSequenceType = Union[Deque[_T], _empty_collection[_T]]
class _ClsLevelDispatch(RefCollection[_ET]):
    """Class-level events on :class:`._Dispatch` classes.

    Listeners are kept per target class in ``_clslevel``, a
    ``WeakKeyDictionary`` mapping each class to its listener container.
    """

    __slots__ = (
        "clsname",
        "name",
        "arg_names",
        "has_kw",
        "legacy_signatures",
        "_clslevel",
        "__weakref__",
    )

    clsname: str
    name: str
    arg_names: Sequence[str]
    has_kw: bool
    legacy_signatures: MutableSequence[legacy._LegacySignatureType]
    _clslevel: MutableMapping[
        Type[_ET], _ListenerFnSequenceType[_ListenerFnType]
    ]

    def __init__(
        self,
        parent_dispatch_cls: Type[_HasEventsDispatch[_ET]],
        fn: _ListenerFnType,
    ):
        """Describe the event declared by ``fn`` on ``parent_dispatch_cls``.

        Records the event name, the owning class name, the argument names
        of ``fn`` (minus the first one) and whether it accepts ``**kw``;
        also rewrites ``fn.__doc__`` via ``legacy._augment_fn_docs``.
        """
        self.name = fn.__name__
        self.clsname = parent_dispatch_cls.__name__

        spec = util.inspect_getfullargspec(fn)
        self.arg_names = spec.args[1:]
        self.has_kw = bool(spec.varkw)

        # Sort ascending on the first element of each signature, then flip
        # the list in place.  This is deliberately not ``reverse=True``,
        # which would order entries with equal keys differently.
        signatures = sorted(
            getattr(fn, "_legacy_signatures", []), key=lambda s: s[0]
        )
        signatures.reverse()
        self.legacy_signatures = signatures

        fn.__doc__ = legacy._augment_fn_docs(self, parent_dispatch_cls, fn)

        self._clslevel = weakref.WeakKeyDictionary()

    def _adjust_fn_spec(
        self, fn: _ListenerFnType, named: bool
    ) -> _ListenerFnType:
        """Return ``fn`` wrapped as needed for this event's calling style.

        When ``named`` is true the function is wrapped to receive keyword
        arguments.  When legacy signatures exist and the function's
        argspec can be obtained, it is additionally passed through
        ``legacy._wrap_fn_for_legacy``.
        """
        adjusted = self._wrap_fn_for_kw(fn) if named else fn
        if not self.legacy_signatures:
            return adjusted

        try:
            spec = util.get_callable_argspec(adjusted, no_self=True)
        except TypeError:
            # no argspec available; leave the function as it is
            return adjusted
        return legacy._wrap_fn_for_legacy(self, adjusted, spec)

    def _wrap_fn_for_kw(self, fn: _ListenerFnType) -> _ListenerFnType:
        """Return a wrapper that calls ``fn`` with keyword arguments only.

        Positional arguments are paired with ``self.arg_names``; explicit
        keyword arguments take precedence on a name clash.
        """

        def wrap_kw(*args: Any, **kw: Any) -> Any:
            return fn(**{**dict(zip(self.arg_names, args)), **kw})

        return wrap_kw

    def _do_insert_or_append(
        self, event_key: _EventKey[_ET], is_append: bool
    ) -> None:
        """Add the listener of ``event_key`` to the target class hierarchy.

        The listener is appended (``is_append`` true) or prepended to the
        container of the target and of every subclass already tracked.
        Subclasses not tracked yet are only initialised through
        :meth:`update_subclass`.

        :raises InvalidRequestError: if the target class sets
         ``_sa_propagate_class_events`` to a false value.
        """
        target = event_key.dispatch_target
        assert isinstance(
            target, type
        ), "Class-level Event targets must be classes."
        if not getattr(target, "_sa_propagate_class_events", True):
            raise exc.InvalidRequestError(
                f"Can't assign an event directly to the {target} class"
            )

        cls: Type[_ET]

        for cls in util.walk_subclasses(target):
            if cls not in self._clslevel:
                self.update_subclass(cls)
                if cls is not target:
                    # a freshly initialised subclass gets no direct add
                    continue

            bucket = self._clslevel[cls]
            if is_append:
                bucket.append(event_key._listen_fn)
            else:
                bucket.appendleft(event_key._listen_fn)

        registry._stored_in_collection(event_key, self)

    def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Prepend the listener; ``propagate`` is not used here."""
        self._do_insert_or_append(event_key, is_append=False)

    def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Append the listener; ``propagate`` is not used here."""
        self._do_insert_or_append(event_key, is_append=True)

    def update_subclass(self, target: Type[_ET]) -> None:
        """Ensure ``target`` has a container and pull in base-class listeners.

        A new container is a ``deque``, or an ``_empty_collection`` when
        the class sets ``_sa_propagate_class_events`` to a false value.
        Listeners of each tracked class further along ``target.__mro__``
        are then added unless already present.
        """
        by_class = self._clslevel

        if target not in by_class:
            propagates = getattr(target, "_sa_propagate_class_events", True)
            by_class[target] = (
                collections.deque() if propagates else _empty_collection()
            )

        own = by_class[target]
        cls: Type[_ET]
        for cls in target.__mro__[1:]:
            if cls not in by_class:
                continue
            missing = [fn for fn in by_class[cls] if fn not in own]
            own.extend(missing)

    def remove(self, event_key: _EventKey[_ET]) -> None:
        """Remove the listener of ``event_key`` from every tracked class
        in the target's hierarchy and notify the registry."""
        cls: Type[_ET]
        for cls in util.walk_subclasses(event_key.dispatch_target):
            if cls not in self._clslevel:
                continue
            self._clslevel[cls].remove(event_key._listen_fn)
        registry._removed_from_collection(event_key, self)

    def clear(self) -> None:
        """Clear all class level listeners"""

        removed: Set[_ListenerFnType] = set()
        for bucket in self._clslevel.values():
            removed |= set(bucket)
            bucket.clear()
        registry._clear(self, removed)

    def for_modify(self, obj: _Dispatch[_ET]) -> _ClsLevelDispatch[_ET]:
        """Return an event collection which can be modified.

        For _ClsLevelDispatch at the class level of
        a dispatcher, this returns self.

        """
        return self
class _InstanceLevelDispatch(RefCollection[_ET], Collection[_ListenerFnType]):
    """Interface of the instance-level listener collections.

    Apart from :meth:`_adjust_fn_spec` and :meth:`for_modify`, every
    operation raises ``NotImplementedError`` in this class.
    """

    __slots__ = ()

    parent: _ClsLevelDispatch[_ET]

    def _adjust_fn_spec(
        self, fn: _ListenerFnType, named: bool
    ) -> _ListenerFnType:
        """Delegate to ``self.parent._adjust_fn_spec``."""
        owner = self.parent
        return owner._adjust_fn_spec(fn, named)

    # -- collection protocol ------------------------------------------

    def __contains__(self, item: Any) -> bool:
        """Not implemented in this class."""
        raise NotImplementedError()

    def __len__(self) -> int:
        """Not implemented in this class."""
        raise NotImplementedError()

    def __iter__(self) -> Iterator[_ListenerFnType]:
        """Not implemented in this class."""
        raise NotImplementedError()

    def __bool__(self) -> bool:
        """Not implemented in this class."""
        raise NotImplementedError()

    # -- invocation ---------------------------------------------------

    def exec_once(self, *args: Any, **kw: Any) -> None:
        """Not implemented in this class."""
        raise NotImplementedError()

    def exec_once_unless_exception(self, *args: Any, **kw: Any) -> None:
        """Not implemented in this class."""
        raise NotImplementedError()

    def _exec_w_sync_on_first_run(self, *args: Any, **kw: Any) -> None:
        """Not implemented in this class."""
        raise NotImplementedError()

    def __call__(self, *args: Any, **kw: Any) -> None:
        """Not implemented in this class."""
        raise NotImplementedError()

    # -- modification -------------------------------------------------

    def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Not implemented in this class."""
        raise NotImplementedError()

    def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Not implemented in this class."""
        raise NotImplementedError()

    def remove(self, event_key: _EventKey[_ET]) -> None:
        """Not implemented in this class."""
        raise NotImplementedError()

    def for_modify(
        self, obj: _DispatchCommon[_ET]
    ) -> _InstanceLevelDispatch[_ET]:
        """Return an event collection which can be modified.

        This base implementation returns self.

        """
        return self
class _EmptyListener(_InstanceLevelDispatch[_ET]):
    """Serves as a proxy interface to the events
    served by a _ClsLevelDispatch, when there are no
    instance-level events present.

    Is replaced by _ListenerCollection when instance-level
    events are added.

    """

    __slots__ = ("parent", "parent_listeners", "name")

    propagate: FrozenSet[_ListenerFnType] = frozenset()
    listeners: Tuple[()] = ()
    parent: _ClsLevelDispatch[_ET]
    parent_listeners: _ListenerFnSequenceType[_ListenerFnType]
    name: str

    def __init__(self, parent: _ClsLevelDispatch[_ET], target_cls: Type[_ET]):
        """Bind to the class-level listeners of ``target_cls``.

        ``target_cls`` is first registered with ``parent`` if it is not
        tracked there yet.
        """
        if target_cls not in parent._clslevel:
            parent.update_subclass(target_cls)
        self.name = parent.name
        self.parent = parent
        self.parent_listeners = parent._clslevel[target_cls]

    def for_modify(
        self, obj: _DispatchCommon[_ET]
    ) -> _ListenerCollection[_ET]:
        """Return an event collection which can be modified.

        For _EmptyListener at the instance level of
        a dispatcher, this generates a new
        _ListenerCollection, applies it to the instance,
        and returns it.

        """
        dispatch = cast("_Dispatch[_ET]", obj)

        instance_cls = dispatch._instance_cls
        assert instance_cls is not None
        replacement = _ListenerCollection(self.parent, instance_cls)

        current = getattr(dispatch, self.name)
        if current is self:
            setattr(dispatch, self.name, replacement)
        else:
            # the attribute was not this object, so it has to be a join
            assert isinstance(current, _JoinedListener)
        return replacement

    def _needs_modify(self, *args: Any, **kw: Any) -> NoReturn:
        """Always raise; mutating operations require ``for_modify()``."""
        raise NotImplementedError("need to call for_modify()")

    def exec_once(self, *args: Any, **kw: Any) -> NoReturn:
        """Unavailable on an empty listener; always raises."""
        self._needs_modify(*args, **kw)

    def exec_once_unless_exception(self, *args: Any, **kw: Any) -> NoReturn:
        """Unavailable on an empty listener; always raises."""
        self._needs_modify(*args, **kw)

    def insert(self, *args: Any, **kw: Any) -> NoReturn:
        """Unavailable on an empty listener; always raises."""
        self._needs_modify(*args, **kw)

    def append(self, *args: Any, **kw: Any) -> NoReturn:
        """Unavailable on an empty listener; always raises."""
        self._needs_modify(*args, **kw)

    def remove(self, *args: Any, **kw: Any) -> NoReturn:
        """Unavailable on an empty listener; always raises."""
        self._needs_modify(*args, **kw)

    def clear(self, *args: Any, **kw: Any) -> NoReturn:
        """Unavailable on an empty listener; always raises."""
        self._needs_modify(*args, **kw)

    def __call__(self, *args: Any, **kw: Any) -> None:
        """Execute this event."""

        for listener in self.parent_listeners:
            listener(*args, **kw)

    def __contains__(self, item: Any) -> bool:
        class_level = self.parent_listeners
        return item in class_level

    def __len__(self) -> int:
        class_level = self.parent_listeners
        return len(class_level)

    def __iter__(self) -> Iterator[_ListenerFnType]:
        class_level = self.parent_listeners
        return iter(class_level)

    def __bool__(self) -> bool:
        class_level = self.parent_listeners
        return bool(class_level)
393class _MutexProtocol(Protocol):
394 def __enter__(self) -> bool: ...
396 def __exit__(
397 self,
398 exc_type: Optional[Type[BaseException]],
399 exc_val: Optional[BaseException],
400 exc_tb: Optional[TracebackType],
401 ) -> Optional[bool]: ...
class _CompoundListener(_InstanceLevelDispatch[_ET]):
    """Listener collection made of ``parent_listeners`` plus ``listeners``.

    Calling the collection runs ``parent_listeners`` first and then
    ``listeners``.  It also provides the "run once" helpers, which are
    guarded by ``_exec_once_mutex``.
    """

    __slots__ = (
        "_exec_once_mutex",
        "_exec_once",
        "_exec_w_sync_once",
        "_is_asyncio",
    )

    _exec_once_mutex: _MutexProtocol
    parent_listeners: Collection[_ListenerFnType]
    listeners: Collection[_ListenerFnType]
    _exec_once: bool
    _exec_w_sync_once: bool

    def __init__(self, *arg: Any, **kw: Any):
        super().__init__(*arg, **kw)
        self._is_asyncio = False

    def _set_asyncio(self) -> None:
        """Flag this collection so that its mutex is an asyncio-adapted
        lock (see :meth:`_memoized_attr__exec_once_mutex`)."""
        self._is_asyncio = True

    def _memoized_attr__exec_once_mutex(self) -> _MutexProtocol:
        """Create the mutex: ``AsyncAdaptedLock`` when flagged as asyncio,
        otherwise a ``threading.Lock``."""
        return AsyncAdaptedLock() if self._is_asyncio else threading.Lock()

    def _exec_once_impl(
        self, retry_on_exception: bool, *args: Any, **kw: Any
    ) -> None:
        """Run the event under the mutex unless ``_exec_once`` is set.

        ``_exec_once`` is set afterwards when the call succeeded, or when
        it raised and ``retry_on_exception`` is false.  Any exception from
        the listeners propagates to the caller.
        """
        with self._exec_once_mutex:
            if self._exec_once:
                # already marked as executed by the time the mutex was held
                return

            failed = True
            try:
                self(*args, **kw)
                failed = False
            finally:
                if not failed or not retry_on_exception:
                    self._exec_once = True

    def exec_once(self, *args: Any, **kw: Any) -> None:
        """Execute this event, but only if it has not been
        executed already for this collection."""

        if self._exec_once:
            return
        self._exec_once_impl(False, *args, **kw)

    def exec_once_unless_exception(self, *args: Any, **kw: Any) -> None:
        """Execute this event, but only if it has not been
        executed already for this collection, or was called
        by a previous exec_once_unless_exception call and
        raised an exception.

        If exec_once was already called, then this method will never run
        the callable regardless of whether it raised or not.

        .. versionadded:: 1.3.8

        """
        if self._exec_once:
            return
        self._exec_once_impl(True, *args, **kw)

    def _exec_w_sync_on_first_run(self, *args: Any, **kw: Any) -> None:
        """Execute this event, and use a mutex if it has not been
        executed already for this collection, or was called
        by a previous _exec_w_sync_on_first_run call and
        raised an exception.

        If _exec_w_sync_on_first_run was already called and didn't raise an
        exception, then a mutex is not used.

        .. versionadded:: 1.4.11

        """
        if self._exec_w_sync_once:
            self(*args, **kw)
            return

        with self._exec_once_mutex:
            self(*args, **kw)
            # only reached when the call above did not raise
            self._exec_w_sync_once = True

    def __call__(self, *args: Any, **kw: Any) -> None:
        """Execute this event."""

        for listener in self.parent_listeners:
            listener(*args, **kw)
        for listener in self.listeners:
            listener(*args, **kw)

    def __contains__(self, item: Any) -> bool:
        if item in self.parent_listeners:
            return True
        return item in self.listeners

    def __len__(self) -> int:
        parent_count = len(self.parent_listeners)
        return parent_count + len(self.listeners)

    def __iter__(self) -> Iterator[_ListenerFnType]:
        return chain(self.parent_listeners, self.listeners)

    def __bool__(self) -> bool:
        return bool(self.listeners) or bool(self.parent_listeners)
class _ListenerCollection(_CompoundListener[_ET]):
    """Instance-level attributes on instances of :class:`._Dispatch`.

    Represents a collection of listeners.

    As of 0.7.9, _ListenerCollection is only first
    created via the _EmptyListener.for_modify() method.

    """

    __slots__ = (
        "parent_listeners",
        "parent",
        "name",
        "listeners",
        "propagate",
        "__weakref__",
    )

    parent_listeners: Collection[_ListenerFnType]
    parent: _ClsLevelDispatch[_ET]
    name: str
    listeners: Deque[_ListenerFnType]
    propagate: Set[_ListenerFnType]

    def __init__(self, parent: _ClsLevelDispatch[_ET], target_cls: Type[_ET]):
        """Create an empty instance-level collection for ``target_cls``.

        ``target_cls`` is registered with ``parent`` first if it is not
        tracked there yet; its class-level container becomes
        ``parent_listeners``.
        """
        super().__init__()
        if target_cls not in parent._clslevel:
            parent.update_subclass(target_cls)

        self._exec_once = False
        self._exec_w_sync_once = False

        self.parent_listeners = parent._clslevel[target_cls]
        self.parent = parent
        self.name = parent.name

        self.listeners = collections.deque()
        self.propagate = set()

    def for_modify(
        self, obj: _DispatchCommon[_ET]
    ) -> _ListenerCollection[_ET]:
        """Return an event collection which can be modified.

        For _ListenerCollection at the instance level of
        a dispatcher, this returns self.

        """
        return self

    def _update(
        self, other: _ListenerCollection[_ET], only_propagate: bool = True
    ) -> None:
        """Populate from the listeners in another :class:`_Dispatch`
        object."""
        local = self.listeners
        already_present = set(local)
        self.propagate.update(other.propagate)

        # NOTE(review): the test below is
        # ``(not present and not only_propagate) or in propagate``,
        # so a propagating listener is taken even when already present.
        # That is the precedence the original expression has; confirm it
        # is the intended grouping before changing it.
        incoming = []
        for candidate in other.listeners:
            is_new = (
                candidate not in already_present and not only_propagate
            )
            if is_new or candidate in self.propagate:
                incoming.append(candidate)

        local.extend(incoming)

        if other._is_asyncio:
            self._set_asyncio()

        to_associate = other.propagate.union(incoming)
        registry._stored_in_collection_multi(self, other, to_associate)

    def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Prepend the listener; remember it in ``propagate`` when asked
        to and when ``prepend_to_list`` reports a true result."""
        if event_key.prepend_to_list(self, self.listeners) and propagate:
            self.propagate.add(event_key._listen_fn)

    def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Append the listener; remember it in ``propagate`` when asked
        to and when ``append_to_list`` reports a true result."""
        if event_key.append_to_list(self, self.listeners) and propagate:
            self.propagate.add(event_key._listen_fn)

    def remove(self, event_key: _EventKey[_ET]) -> None:
        """Remove the listener from ``listeners`` and ``propagate`` and
        notify the registry."""
        listen_fn = event_key._listen_fn
        self.listeners.remove(listen_fn)
        self.propagate.discard(event_key._listen_fn)
        registry._removed_from_collection(event_key, self)

    def clear(self) -> None:
        """Notify the registry, then empty ``propagate`` and
        ``listeners``."""
        registry._clear(self, self.listeners)
        self.propagate.clear()
        self.listeners.clear()
class _JoinedListener(_CompoundListener[_ET]):
    """Listener collection joining a local collection with the same-named
    collection of another dispatcher.

    ``parent_listeners`` is the local collection, while ``listeners`` is
    looked up on ``parent_dispatch`` under ``name`` on each access.
    """

    __slots__ = "parent_dispatch", "name", "local", "parent_listeners"

    parent_dispatch: _DispatchCommon[_ET]
    name: str
    local: _InstanceLevelDispatch[_ET]
    parent_listeners: Collection[_ListenerFnType]

    def __init__(
        self,
        parent_dispatch: _DispatchCommon[_ET],
        name: str,
        local: _EmptyListener[_ET],
    ):
        """Join ``local`` with the ``name`` collection of ``parent_dispatch``.

        :param parent_dispatch: dispatcher whose ``name`` attribute
         supplies ``listeners``.
        :param name: name of the event attribute.
        :param local: the local collection; also used as
         ``parent_listeners``.
        """
        # Run _CompoundListener.__init__ so that ``_is_asyncio`` is set;
        # the inherited mutex factory reads that slot, so without it
        # exec_once() / exec_once_unless_exception() would hit an unset
        # attribute.
        super().__init__()
        self._exec_once = False
        # Read by the inherited _exec_w_sync_on_first_run(); it was
        # previously left unset for joined listeners.
        self._exec_w_sync_once = False
        self.parent_dispatch = parent_dispatch
        self.name = name
        self.local = local
        self.parent_listeners = self.local

    if not typing.TYPE_CHECKING:
        # first error, I don't really understand:
        # Signature of "listeners" incompatible with
        # supertype "_CompoundListener"  [override]
        # the name / return type are exactly the same
        # second error is getattr_isn't typed, the cast() here
        # adds too much method overhead
        @property
        def listeners(self) -> Collection[_ListenerFnType]:
            return getattr(self.parent_dispatch, self.name)

    def _adjust_fn_spec(
        self, fn: _ListenerFnType, named: bool
    ) -> _ListenerFnType:
        """Delegate to the local collection."""
        return self.local._adjust_fn_spec(fn, named)

    def for_modify(self, obj: _DispatchCommon[_ET]) -> _JoinedListener[_ET]:
        """Make the local collection modifiable and return self.

        ``local`` and ``parent_listeners`` are both replaced by the
        result of ``self.local.for_modify(obj)``.
        """
        self.local = self.parent_listeners = self.local.for_modify(obj)
        return self

    def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Insert into the local collection."""
        self.local.insert(event_key, propagate)

    def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Append to the local collection."""
        self.local.append(event_key, propagate)

    def remove(self, event_key: _EventKey[_ET]) -> None:
        """Remove from the local collection."""
        self.local.remove(event_key)

    def clear(self) -> None:
        """Not supported on a joined listener; always raises."""
        raise NotImplementedError()