1# event/base.py 
    2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 
    3# <see AUTHORS file> 
    4# 
    5# This module is part of SQLAlchemy and is released under 
    6# the MIT License: https://www.opensource.org/licenses/mit-license.php 
    7 
    8"""Base implementation classes. 
    9 
    10The public-facing ``Events`` serves as the base class for an event interface; 
    11its public attributes represent different kinds of events.   These attributes 
    12are mirrored onto a ``_Dispatch`` class, which serves as a container for 
    13collections of listener functions.   These collections are represented both 
    14at the class level of a particular ``_Dispatch`` class as well as within 
    15instances of ``_Dispatch``. 
    16 
    17""" 
    18from __future__ import annotations 
    19 
    20import typing 
    21from typing import Any 
    22from typing import cast 
    23from typing import Dict 
    24from typing import Generic 
    25from typing import Iterator 
    26from typing import List 
    27from typing import Literal 
    28from typing import Mapping 
    29from typing import MutableMapping 
    30from typing import Optional 
    31from typing import overload 
    32from typing import Tuple 
    33from typing import Type 
    34from typing import Union 
    35import weakref 
    36 
    37from .attr import _ClsLevelDispatch 
    38from .attr import _EmptyListener 
    39from .attr import _InstanceLevelDispatch 
    40from .attr import _JoinedListener 
    41from .registry import _ET 
    42from .registry import _EventKey 
    43from .. import util 
    44 
    45_registrars: MutableMapping[str, List[Type[_HasEventsDispatch[Any]]]] = ( 
    46    util.defaultdict(list) 
    47) 
    48 
    49 
    50def _is_event_name(name: str) -> bool: 
    51    # _sa_event prefix is special to support internal-only event names. 
    52    # most event names are just plain method names that aren't 
    53    # underscored. 
    54 
    55    return ( 
    56        not name.startswith("_") and name != "dispatch" 
    57    ) or name.startswith("_sa_event") 
    58 
    59 
    60class _UnpickleDispatch: 
    61    """Serializable callable that re-generates an instance of 
    62    :class:`_Dispatch` given a particular :class:`.Events` subclass. 
    63 
    64    """ 
    65 
    66    def __call__(self, _instance_cls: Type[_ET]) -> _Dispatch[_ET]: 
    67        for cls in _instance_cls.__mro__: 
    68            if "dispatch" in cls.__dict__: 
    69                return cast( 
    70                    "_Dispatch[_ET]", cls.__dict__["dispatch"].dispatch 
    71                )._for_class(_instance_cls) 
    72        else: 
    73            raise AttributeError("No class with a 'dispatch' member present.") 
    74 
    75 
    76class _DispatchCommon(Generic[_ET]): 
    77    __slots__ = () 
    78 
    79    _instance_cls: Optional[Type[_ET]] 
    80 
    81    def _join(self, other: _DispatchCommon[_ET]) -> _JoinedDispatcher[_ET]: 
    82        raise NotImplementedError() 
    83 
    84    def __getattr__(self, name: str) -> _InstanceLevelDispatch[_ET]: 
    85        raise NotImplementedError() 
    86 
    87    @property 
    88    def _events(self) -> Type[_HasEventsDispatch[_ET]]: 
    89        raise NotImplementedError() 
    90 
    91 
    92class _Dispatch(_DispatchCommon[_ET]): 
    93    """Mirror the event listening definitions of an Events class with 
    94    listener collections. 
    95 
    96    Classes which define a "dispatch" member will return a 
    97    non-instantiated :class:`._Dispatch` subclass when the member 
    98    is accessed at the class level.  When the "dispatch" member is 
    99    accessed at the instance level of its owner, an instance 
    100    of the :class:`._Dispatch` class is returned. 
    101 
    102    A :class:`._Dispatch` class is generated for each :class:`.Events` 
    103    class defined, by the :meth:`._HasEventsDispatch._create_dispatcher_class` 
    104    method.  The original :class:`.Events` classes remain untouched. 
    105    This decouples the construction of :class:`.Events` subclasses from 
    106    the implementation used by the event internals, and allows 
    107    inspecting tools like Sphinx to work in an unsurprising 
    108    way against the public API. 
    109 
    110    """ 
    111 
    112    # "active_history" is an ORM case we add here.   ideally a better 
    113    # system would be in place for ad-hoc attributes. 
    114    __slots__ = "_parent", "_instance_cls", "__dict__", "_empty_listeners" 
    115 
    116    _active_history: bool 
    117 
    118    _empty_listener_reg: MutableMapping[ 
    119        Type[_ET], Dict[str, _EmptyListener[_ET]] 
    120    ] = weakref.WeakKeyDictionary() 
    121 
    122    _empty_listeners: Dict[str, _EmptyListener[_ET]] 
    123 
    124    _event_names: List[str] 
    125 
    126    _instance_cls: Optional[Type[_ET]] 
    127 
    128    _joined_dispatch_cls: Type[_JoinedDispatcher[_ET]] 
    129 
    130    _events: Type[_HasEventsDispatch[_ET]] 
    131    """reference back to the Events class. 
    132 
    133    Bidirectional against _HasEventsDispatch.dispatch 
    134 
    135    """ 
    136 
    137    def __init__( 
    138        self, 
    139        parent: Optional[_Dispatch[_ET]], 
    140        instance_cls: Optional[Type[_ET]] = None, 
    141    ): 
    142        self._parent = parent 
    143        self._instance_cls = instance_cls 
    144 
    145        if instance_cls: 
    146            assert parent is not None 
    147            try: 
    148                self._empty_listeners = self._empty_listener_reg[instance_cls] 
    149            except KeyError: 
    150                self._empty_listeners = self._empty_listener_reg[ 
    151                    instance_cls 
    152                ] = { 
    153                    ls.name: _EmptyListener(ls, instance_cls) 
    154                    for ls in parent._event_descriptors 
    155                } 
    156        else: 
    157            self._empty_listeners = {} 
    158 
    159    def __getattr__(self, name: str) -> _InstanceLevelDispatch[_ET]: 
    160        # Assign EmptyListeners as attributes on demand 
    161        # to reduce startup time for new dispatch objects. 
    162        try: 
    163            ls = self._empty_listeners[name] 
    164        except KeyError: 
    165            raise AttributeError(name) 
    166        else: 
    167            setattr(self, ls.name, ls) 
    168            return ls 
    169 
    170    @property 
    171    def _event_descriptors(self) -> Iterator[_ClsLevelDispatch[_ET]]: 
    172        for k in self._event_names: 
    173            # Yield _ClsLevelDispatch related 
    174            # to relevant event name. 
    175            yield getattr(self, k) 
    176 
    177    def _listen(self, event_key: _EventKey[_ET], **kw: Any) -> None: 
    178        return self._events._listen(event_key, **kw) 
    179 
    180    def _for_class(self, instance_cls: Type[_ET]) -> _Dispatch[_ET]: 
    181        return self.__class__(self, instance_cls) 
    182 
    183    def _for_instance(self, instance: _ET) -> _Dispatch[_ET]: 
    184        instance_cls = instance.__class__ 
    185        return self._for_class(instance_cls) 
    186 
    187    def _join(self, other: _DispatchCommon[_ET]) -> _JoinedDispatcher[_ET]: 
    188        """Create a 'join' of this :class:`._Dispatch` and another. 
    189 
    190        This new dispatcher will dispatch events to both 
    191        :class:`._Dispatch` objects. 
    192 
    193        """ 
    194        assert "_joined_dispatch_cls" in self.__class__.__dict__ 
    195 
    196        return self._joined_dispatch_cls(self, other) 
    197 
    198    def __reduce__(self) -> Union[str, Tuple[Any, ...]]: 
    199        return _UnpickleDispatch(), (self._instance_cls,) 
    200 
    201    def _update( 
    202        self, other: _Dispatch[_ET], only_propagate: bool = True 
    203    ) -> None: 
    204        """Populate from the listeners in another :class:`_Dispatch` 
    205        object.""" 
    206        for ls in other._event_descriptors: 
    207            if isinstance(ls, _EmptyListener): 
    208                continue 
    209            getattr(self, ls.name).for_modify(self)._update( 
    210                ls, only_propagate=only_propagate 
    211            ) 
    212 
    213    def _clear(self) -> None: 
    214        for ls in self._event_descriptors: 
    215            ls.for_modify(self).clear() 
    216 
    217 
    218def _remove_dispatcher(cls: Type[_HasEventsDispatch[_ET]]) -> None: 
    219    for k in cls.dispatch._event_names: 
    220        _registrars[k].remove(cls) 
    221        if not _registrars[k]: 
    222            del _registrars[k] 
    223 
    224 
    225class _HasEventsDispatch(Generic[_ET]): 
    226    _dispatch_target: Optional[Type[_ET]] 
    227    """class which will receive the .dispatch collection""" 
    228 
    229    dispatch: _Dispatch[_ET] 
    230    """reference back to the _Dispatch class. 
    231 
    232    Bidirectional against _Dispatch._events 
    233 
    234    """ 
    235 
    236    if typing.TYPE_CHECKING: 
    237 
    238        def __getattr__(self, name: str) -> _InstanceLevelDispatch[_ET]: ... 
    239 
    240    def __init_subclass__(cls) -> None: 
    241        """Intercept new Event subclasses and create associated _Dispatch 
    242        classes.""" 
    243 
    244        cls._create_dispatcher_class(cls.__name__, cls.__bases__, cls.__dict__) 
    245 
    246    @classmethod 
    247    def _accept_with( 
    248        cls, target: Union[_ET, Type[_ET]], identifier: str 
    249    ) -> Optional[Union[_ET, Type[_ET]]]: 
    250        raise NotImplementedError() 
    251 
    252    @classmethod 
    253    def _listen( 
    254        cls, 
    255        event_key: _EventKey[_ET], 
    256        *, 
    257        propagate: bool = False, 
    258        insert: bool = False, 
    259        named: bool = False, 
    260        asyncio: bool = False, 
    261    ) -> None: 
    262        raise NotImplementedError() 
    263 
    264    @staticmethod 
    265    def _set_dispatch( 
    266        klass: Type[_HasEventsDispatch[_ET]], 
    267        dispatch_cls: Type[_Dispatch[_ET]], 
    268    ) -> _Dispatch[_ET]: 
    269        # This allows an Events subclass to define additional utility 
    270        # methods made available to the target via 
    271        # "self.dispatch._events.<utilitymethod>" 
    272        # @staticmethod to allow easy "super" calls while in a metaclass 
    273        # constructor. 
    274        klass.dispatch = dispatch_cls(None) 
    275        dispatch_cls._events = klass 
    276        return klass.dispatch 
    277 
    278    @classmethod 
    279    def _create_dispatcher_class( 
    280        cls, classname: str, bases: Tuple[type, ...], dict_: Mapping[str, Any] 
    281    ) -> None: 
    282        """Create a :class:`._Dispatch` class corresponding to an 
    283        :class:`.Events` class.""" 
    284 
    285        # there's all kinds of ways to do this, 
    286        # i.e. make a Dispatch class that shares the '_listen' method 
    287        # of the Event class, this is the straight monkeypatch. 
    288        if hasattr(cls, "dispatch"): 
    289            dispatch_base = cls.dispatch.__class__ 
    290        else: 
    291            dispatch_base = _Dispatch 
    292 
    293        event_names = [k for k in dict_ if _is_event_name(k)] 
    294        dispatch_cls = cast( 
    295            "Type[_Dispatch[_ET]]", 
    296            type( 
    297                "%sDispatch" % classname, 
    298                (dispatch_base,), 
    299                {"__slots__": event_names}, 
    300            ), 
    301        ) 
    302 
    303        dispatch_cls._event_names = event_names 
    304        dispatch_inst = cls._set_dispatch(cls, dispatch_cls) 
    305        for k in dispatch_cls._event_names: 
    306            setattr(dispatch_inst, k, _ClsLevelDispatch(cls, dict_[k])) 
    307            _registrars[k].append(cls) 
    308 
    309        for super_ in dispatch_cls.__bases__: 
    310            if issubclass(super_, _Dispatch) and super_ is not _Dispatch: 
    311                for ls in super_._events.dispatch._event_descriptors: 
    312                    setattr(dispatch_inst, ls.name, ls) 
    313                    dispatch_cls._event_names.append(ls.name) 
    314 
    315        if getattr(cls, "_dispatch_target", None): 
    316            dispatch_target_cls = cls._dispatch_target 
    317            assert dispatch_target_cls is not None 
    318            if ( 
    319                hasattr(dispatch_target_cls, "__slots__") 
    320                and "_slots_dispatch" in dispatch_target_cls.__slots__ 
    321            ): 
    322                dispatch_target_cls.dispatch = slots_dispatcher(cls) 
    323            else: 
    324                dispatch_target_cls.dispatch = dispatcher(cls) 
    325 
    326        klass = type( 
    327            "Joined%s" % dispatch_cls.__name__, 
    328            (_JoinedDispatcher,), 
    329            {"__slots__": event_names}, 
    330        ) 
    331        dispatch_cls._joined_dispatch_cls = klass 
    332 
    333        # establish pickle capability by adding it to this module 
    334        globals()[klass.__name__] = klass 
    335 
    336 
    337class _JoinedDispatcher(_DispatchCommon[_ET]): 
    338    """Represent a connection between two _Dispatch objects.""" 
    339 
    340    __slots__ = "local", "parent", "_instance_cls" 
    341 
    342    local: _DispatchCommon[_ET] 
    343    parent: _DispatchCommon[_ET] 
    344    _instance_cls: Optional[Type[_ET]] 
    345 
    346    def __init__( 
    347        self, local: _DispatchCommon[_ET], parent: _DispatchCommon[_ET] 
    348    ): 
    349        self.local = local 
    350        self.parent = parent 
    351        self._instance_cls = self.local._instance_cls 
    352 
    353    def __reduce__(self) -> Any: 
    354        return (self.__class__, (self.local, self.parent)) 
    355 
    356    def __getattr__(self, name: str) -> _JoinedListener[_ET]: 
    357        # Assign _JoinedListeners as attributes on demand 
    358        # to reduce startup time for new dispatch objects. 
    359        ls = getattr(self.local, name) 
    360        jl = _JoinedListener(self.parent, ls.name, ls) 
    361        setattr(self, ls.name, jl) 
    362        return jl 
    363 
    364    def _listen(self, event_key: _EventKey[_ET], **kw: Any) -> None: 
    365        return self.parent._listen(event_key, **kw) 
    366 
    367    @property 
    368    def _events(self) -> Type[_HasEventsDispatch[_ET]]: 
    369        return self.parent._events 
    370 
    371 
    372class Events(_HasEventsDispatch[_ET]): 
    373    """Define event listening functions for a particular target type.""" 
    374 
    375    @classmethod 
    376    def _accept_with( 
    377        cls, target: Union[_ET, Type[_ET]], identifier: str 
    378    ) -> Optional[Union[_ET, Type[_ET]]]: 
    379        def dispatch_is(*types: Type[Any]) -> bool: 
    380            return all(isinstance(target.dispatch, t) for t in types) 
    381 
    382        def dispatch_parent_is(t: Type[Any]) -> bool: 
    383            parent = cast("_JoinedDispatcher[_ET]", target.dispatch).parent 
    384            while isinstance(parent, _JoinedDispatcher): 
    385                parent = cast("_JoinedDispatcher[_ET]", parent).parent 
    386 
    387            return isinstance(parent, t) 
    388 
    389        # Mapper, ClassManager, Session override this to 
    390        # also accept classes, scoped_sessions, sessionmakers, etc. 
    391        if hasattr(target, "dispatch"): 
    392            if ( 
    393                dispatch_is(cls.dispatch.__class__) 
    394                or dispatch_is(type, cls.dispatch.__class__) 
    395                or ( 
    396                    dispatch_is(_JoinedDispatcher) 
    397                    and dispatch_parent_is(cls.dispatch.__class__) 
    398                ) 
    399            ): 
    400                return target 
    401 
    402        return None 
    403 
    404    @classmethod 
    405    def _listen( 
    406        cls, 
    407        event_key: _EventKey[_ET], 
    408        *, 
    409        propagate: bool = False, 
    410        insert: bool = False, 
    411        named: bool = False, 
    412        asyncio: bool = False, 
    413    ) -> None: 
    414        event_key.base_listen( 
    415            propagate=propagate, insert=insert, named=named, asyncio=asyncio 
    416        ) 
    417 
    418    @classmethod 
    419    def _remove(cls, event_key: _EventKey[_ET]) -> None: 
    420        event_key.remove() 
    421 
    422    @classmethod 
    423    def _clear(cls) -> None: 
    424        cls.dispatch._clear() 
    425 
    426 
    427class dispatcher(Generic[_ET]): 
    428    """Descriptor used by target classes to 
    429    deliver the _Dispatch class at the class level 
    430    and produce new _Dispatch instances for target 
    431    instances. 
    432 
    433    """ 
    434 
    435    def __init__(self, events: Type[_HasEventsDispatch[_ET]]): 
    436        self.dispatch = events.dispatch 
    437        self.events = events 
    438 
    439    @overload 
    440    def __get__( 
    441        self, obj: Literal[None], cls: Type[Any] 
    442    ) -> Type[_Dispatch[_ET]]: ... 
    443 
    444    @overload 
    445    def __get__(self, obj: Any, cls: Type[Any]) -> _DispatchCommon[_ET]: ... 
    446 
    447    def __get__(self, obj: Any, cls: Type[Any]) -> Any: 
    448        if obj is None: 
    449            return self.dispatch 
    450 
    451        disp = self.dispatch._for_instance(obj) 
    452        try: 
    453            obj.__dict__["dispatch"] = disp 
    454        except AttributeError as ae: 
    455            raise TypeError( 
    456                "target %r doesn't have __dict__, should it be " 
    457                "defining _slots_dispatch?" % (obj,) 
    458            ) from ae 
    459        return disp 
    460 
    461 
    462class slots_dispatcher(dispatcher[_ET]): 
    463    def __get__(self, obj: Any, cls: Type[Any]) -> Any: 
    464        if obj is None: 
    465            return self.dispatch 
    466 
    467        if hasattr(obj, "_slots_dispatch"): 
    468            return obj._slots_dispatch 
    469 
    470        disp = self.dispatch._for_instance(obj) 
    471        obj._slots_dispatch = disp 
    472        return disp