1# event/registry.py 
    2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 
    3# <see AUTHORS file> 
    4# 
    5# This module is part of SQLAlchemy and is released under 
    6# the MIT License: https://www.opensource.org/licenses/mit-license.php 
    7 
    8"""Provides managed registration services on behalf of :func:`.listen` 
    9arguments. 
    10 
    11By "managed registration", we mean that event listening functions and 
    12other objects can be added to various collections in such a way that their 
    13membership in all those collections can be revoked at once, based on 
    14an equivalent :class:`._EventKey`. 
    15 
    16""" 
    17from __future__ import annotations 
    18 
    19import collections 
    20import types 
    21import typing 
    22from typing import Any 
    23from typing import Callable 
    24from typing import cast 
    25from typing import Deque 
    26from typing import Dict 
    27from typing import Generic 
    28from typing import Iterable 
    29from typing import Optional 
    30from typing import Tuple 
    31from typing import TypeVar 
    32from typing import Union 
    33import weakref 
    34 
    35from .. import exc 
    36from .. import util 
    37 
    38if typing.TYPE_CHECKING: 
    39    from .attr import RefCollection 
    40    from .base import dispatcher 
    41 
    42_ListenerFnType = Callable[..., Any] 
    43_ListenerFnKeyType = Union[int, Tuple[int, int]] 
    44_EventKeyTupleType = Tuple[int, str, _ListenerFnKeyType] 
    45 
    46 
    47_ET = TypeVar("_ET", bound="EventTarget") 
    48 
    49 
    50class EventTarget: 
    51    """represents an event target, that is, something we can listen on 
    52    either with that target as a class or as an instance. 
    53 
    54    Examples include:  Connection, Mapper, Table, Session, 
    55    InstrumentedAttribute, Engine, Pool, Dialect. 
    56 
    57    """ 
    58 
    59    __slots__ = () 
    60 
    61    dispatch: dispatcher[Any] 
    62 
    63 
    64_RefCollectionToListenerType = Dict[ 
    65    "weakref.ref[RefCollection[Any]]", 
    66    "weakref.ref[_ListenerFnType]", 
    67] 
    68 
    69_key_to_collection: Dict[_EventKeyTupleType, _RefCollectionToListenerType] = ( 
    70    collections.defaultdict(dict) 
    71) 
    72""" 
    73Given an original listen() argument, can locate all 
    74listener collections and the listener fn contained 
    75 
    76(target, identifier, fn) -> { 
    77                            ref(listenercollection) -> ref(listener_fn) 
    78                            ref(listenercollection) -> ref(listener_fn) 
    79                            ref(listenercollection) -> ref(listener_fn) 
    80                        } 
    81""" 
    82 
    83_ListenerToEventKeyType = Dict[ 
    84    "weakref.ref[_ListenerFnType]", 
    85    _EventKeyTupleType, 
    86] 
    87_collection_to_key: Dict[ 
    88    weakref.ref[RefCollection[Any]], 
    89    _ListenerToEventKeyType, 
    90] = collections.defaultdict(dict) 
    91""" 
    92Given a _ListenerCollection or _ClsLevelListener, can locate 
    93all the original listen() arguments and the listener fn contained 
    94 
    95ref(listenercollection) -> { 
    96                            ref(listener_fn) -> (target, identifier, fn), 
    97                            ref(listener_fn) -> (target, identifier, fn), 
    98                            ref(listener_fn) -> (target, identifier, fn), 
    99                        } 
    100""" 
    101 
    102 
    103def _collection_gced(ref: weakref.ref[Any]) -> None: 
    104    # defaultdict, so can't get a KeyError 
    105    if not _collection_to_key or ref not in _collection_to_key: 
    106        return 
    107 
    108    ref = cast("weakref.ref[RefCollection[EventTarget]]", ref) 
    109 
    110    listener_to_key = _collection_to_key.pop(ref) 
    111    for key in listener_to_key.values(): 
    112        if key in _key_to_collection: 
    113            # defaultdict, so can't get a KeyError 
    114            dispatch_reg = _key_to_collection[key] 
    115            dispatch_reg.pop(ref) 
    116            if not dispatch_reg: 
    117                _key_to_collection.pop(key) 
    118 
    119 
    120def _stored_in_collection( 
    121    event_key: _EventKey[_ET], owner: RefCollection[_ET] 
    122) -> bool: 
    123    key = event_key._key 
    124 
    125    dispatch_reg = _key_to_collection[key] 
    126 
    127    owner_ref = owner.ref 
    128    listen_ref = weakref.ref(event_key._listen_fn) 
    129 
    130    if owner_ref in dispatch_reg: 
    131        return False 
    132 
    133    dispatch_reg[owner_ref] = listen_ref 
    134 
    135    listener_to_key = _collection_to_key[owner_ref] 
    136    listener_to_key[listen_ref] = key 
    137 
    138    return True 
    139 
    140 
    141def _removed_from_collection( 
    142    event_key: _EventKey[_ET], owner: RefCollection[_ET] 
    143) -> None: 
    144    key = event_key._key 
    145 
    146    dispatch_reg = _key_to_collection[key] 
    147 
    148    listen_ref = weakref.ref(event_key._listen_fn) 
    149 
    150    owner_ref = owner.ref 
    151    dispatch_reg.pop(owner_ref, None) 
    152    if not dispatch_reg: 
    153        del _key_to_collection[key] 
    154 
    155    if owner_ref in _collection_to_key: 
    156        listener_to_key = _collection_to_key[owner_ref] 
    157        # see #12216 - this guards against a removal that already occurred 
    158        # here. however, I cannot come up with a test that shows any negative 
    159        # side effects occurring from this removal happening, even though an 
    160        # event key may still be referenced from a clsleveldispatch here 
    161        listener_to_key.pop(listen_ref, None) 
    162 
    163 
    164def _stored_in_collection_multi( 
    165    newowner: RefCollection[_ET], 
    166    oldowner: RefCollection[_ET], 
    167    elements: Iterable[_ListenerFnType], 
    168) -> None: 
    169    if not elements: 
    170        return 
    171 
    172    oldowner_ref = oldowner.ref 
    173    newowner_ref = newowner.ref 
    174 
    175    old_listener_to_key = _collection_to_key[oldowner_ref] 
    176    new_listener_to_key = _collection_to_key[newowner_ref] 
    177 
    178    for listen_fn in elements: 
    179        listen_ref = weakref.ref(listen_fn) 
    180        try: 
    181            key = old_listener_to_key[listen_ref] 
    182        except KeyError: 
    183            # can occur during interpreter shutdown. 
    184            # see #6740 
    185            continue 
    186 
    187        try: 
    188            dispatch_reg = _key_to_collection[key] 
    189        except KeyError: 
    190            continue 
    191 
    192        if newowner_ref in dispatch_reg: 
    193            assert dispatch_reg[newowner_ref] == listen_ref 
    194        else: 
    195            dispatch_reg[newowner_ref] = listen_ref 
    196 
    197        new_listener_to_key[listen_ref] = key 
    198 
    199 
    200def _clear( 
    201    owner: RefCollection[_ET], 
    202    elements: Iterable[_ListenerFnType], 
    203) -> None: 
    204    if not elements: 
    205        return 
    206 
    207    owner_ref = owner.ref 
    208    listener_to_key = _collection_to_key[owner_ref] 
    209    for listen_fn in elements: 
    210        listen_ref = weakref.ref(listen_fn) 
    211        key = listener_to_key[listen_ref] 
    212        dispatch_reg = _key_to_collection[key] 
    213        dispatch_reg.pop(owner_ref, None) 
    214 
    215        if not dispatch_reg: 
    216            del _key_to_collection[key] 
    217 
    218 
    219class _EventKey(Generic[_ET]): 
    220    """Represent :func:`.listen` arguments.""" 
    221 
    222    __slots__ = ( 
    223        "target", 
    224        "identifier", 
    225        "fn", 
    226        "fn_key", 
    227        "fn_wrap", 
    228        "dispatch_target", 
    229    ) 
    230 
    231    target: _ET 
    232    identifier: str 
    233    fn: _ListenerFnType 
    234    fn_key: _ListenerFnKeyType 
    235    dispatch_target: Any 
    236    _fn_wrap: Optional[_ListenerFnType] 
    237 
    238    def __init__( 
    239        self, 
    240        target: _ET, 
    241        identifier: str, 
    242        fn: _ListenerFnType, 
    243        dispatch_target: Any, 
    244        _fn_wrap: Optional[_ListenerFnType] = None, 
    245    ): 
    246        self.target = target 
    247        self.identifier = identifier 
    248        self.fn = fn 
    249        if isinstance(fn, types.MethodType): 
    250            self.fn_key = id(fn.__func__), id(fn.__self__) 
    251        else: 
    252            self.fn_key = id(fn) 
    253        self.fn_wrap = _fn_wrap 
    254        self.dispatch_target = dispatch_target 
    255 
    256    @property 
    257    def _key(self) -> _EventKeyTupleType: 
    258        return (id(self.target), self.identifier, self.fn_key) 
    259 
    260    def with_wrapper(self, fn_wrap: _ListenerFnType) -> _EventKey[_ET]: 
    261        if fn_wrap is self._listen_fn: 
    262            return self 
    263        else: 
    264            return _EventKey( 
    265                self.target, 
    266                self.identifier, 
    267                self.fn, 
    268                self.dispatch_target, 
    269                _fn_wrap=fn_wrap, 
    270            ) 
    271 
    272    def with_dispatch_target(self, dispatch_target: Any) -> _EventKey[_ET]: 
    273        if dispatch_target is self.dispatch_target: 
    274            return self 
    275        else: 
    276            return _EventKey( 
    277                self.target, 
    278                self.identifier, 
    279                self.fn, 
    280                dispatch_target, 
    281                _fn_wrap=self.fn_wrap, 
    282            ) 
    283 
    284    def listen(self, *args: Any, **kw: Any) -> None: 
    285        once = kw.pop("once", False) 
    286        once_unless_exception = kw.pop("_once_unless_exception", False) 
    287        named = kw.pop("named", False) 
    288 
    289        target, identifier, fn = ( 
    290            self.dispatch_target, 
    291            self.identifier, 
    292            self._listen_fn, 
    293        ) 
    294 
    295        dispatch_collection = getattr(target.dispatch, identifier) 
    296 
    297        adjusted_fn = dispatch_collection._adjust_fn_spec(fn, named) 
    298 
    299        self = self.with_wrapper(adjusted_fn) 
    300 
    301        stub_function = getattr( 
    302            self.dispatch_target.dispatch._events, self.identifier 
    303        ) 
    304        if hasattr(stub_function, "_sa_warn"): 
    305            stub_function._sa_warn() 
    306 
    307        if once or once_unless_exception: 
    308            self.with_wrapper( 
    309                util.only_once( 
    310                    self._listen_fn, retry_on_exception=once_unless_exception 
    311                ) 
    312            ).listen(*args, **kw) 
    313        else: 
    314            self.dispatch_target.dispatch._listen(self, *args, **kw) 
    315 
    316    def remove(self) -> None: 
    317        key = self._key 
    318 
    319        if key not in _key_to_collection: 
    320            raise exc.InvalidRequestError( 
    321                "No listeners found for event %s / %r / %s " 
    322                % (self.target, self.identifier, self.fn) 
    323            ) 
    324 
    325        dispatch_reg = _key_to_collection.pop(key) 
    326 
    327        for collection_ref, listener_ref in dispatch_reg.items(): 
    328            collection = collection_ref() 
    329            listener_fn = listener_ref() 
    330            if collection is not None and listener_fn is not None: 
    331                collection.remove(self.with_wrapper(listener_fn)) 
    332 
    333    def contains(self) -> bool: 
    334        """Return True if this event key is registered to listen.""" 
    335        return self._key in _key_to_collection 
    336 
    337    def base_listen( 
    338        self, 
    339        propagate: bool = False, 
    340        insert: bool = False, 
    341        named: bool = False, 
    342        retval: Optional[bool] = None, 
    343        asyncio: bool = False, 
    344    ) -> None: 
    345        target, identifier = self.dispatch_target, self.identifier 
    346 
    347        dispatch_collection = getattr(target.dispatch, identifier) 
    348 
    349        for_modify = dispatch_collection.for_modify(target.dispatch) 
    350        if asyncio: 
    351            for_modify._set_asyncio() 
    352 
    353        if insert: 
    354            for_modify.insert(self, propagate) 
    355        else: 
    356            for_modify.append(self, propagate) 
    357 
    358    @property 
    359    def _listen_fn(self) -> _ListenerFnType: 
    360        return self.fn_wrap or self.fn 
    361 
    362    def append_to_list( 
    363        self, 
    364        owner: RefCollection[_ET], 
    365        list_: Deque[_ListenerFnType], 
    366    ) -> bool: 
    367        if _stored_in_collection(self, owner): 
    368            list_.append(self._listen_fn) 
    369            return True 
    370        else: 
    371            return False 
    372 
    373    def remove_from_list( 
    374        self, 
    375        owner: RefCollection[_ET], 
    376        list_: Deque[_ListenerFnType], 
    377    ) -> None: 
    378        _removed_from_collection(self, owner) 
    379        list_.remove(self._listen_fn) 
    380 
    381    def prepend_to_list( 
    382        self, 
    383        owner: RefCollection[_ET], 
    384        list_: Deque[_ListenerFnType], 
    385    ) -> bool: 
    386        if _stored_in_collection(self, owner): 
    387            list_.appendleft(self._listen_fn) 
    388            return True 
    389        else: 
    390            return False