1# event/legacy.py 
    2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 
    3# <see AUTHORS file> 
    4# 
    5# This module is part of SQLAlchemy and is released under 
    6# the MIT License: https://www.opensource.org/licenses/mit-license.php 
    7 
    8"""Routines to handle adaption of legacy call signatures, 
    9generation of deprecation notes and docstrings. 
    10 
    11""" 
    12from __future__ import annotations 
    13 
    14import typing 
    15from typing import Any 
    16from typing import Callable 
    17from typing import List 
    18from typing import Optional 
    19from typing import Tuple 
    20from typing import Type 
    21from typing import TypeVar 
    22 
    23from .registry import _ET 
    24from .registry import _ListenerFnType 
    25from .. import util 
    26from ..util.compat import FullArgSpec 
    27 
    28if typing.TYPE_CHECKING: 
    29    from .attr import _ClsLevelDispatch 
    30    from .base import _HasEventsDispatch 
    31 
    32 
    33_F = TypeVar("_F", bound=Callable[..., Any]) 
    34 
    35_LegacySignatureType = Tuple[str, List[str], Callable[..., Any]] 
    36 
    37 
    38def _legacy_signature( 
    39    since: str, 
    40    argnames: List[str], 
    41    converter: Optional[Callable[..., Any]] = None, 
    42) -> Callable[[_F], _F]: 
    43    """legacy sig decorator 
    44 
    45 
    46    :param since: string version for deprecation warning 
    47    :param argnames: list of strings, which is *all* arguments that the legacy 
    48     version accepted, including arguments that are still there 
    49    :param converter: lambda that will accept tuple of this full arg signature 
    50     and return tuple of new arg signature. 
    51 
    52    """ 
    53 
    54    def leg(fn: _F) -> _F: 
    55        if not hasattr(fn, "_legacy_signatures"): 
    56            fn._legacy_signatures = []  # type: ignore[attr-defined] 
    57        fn._legacy_signatures.append((since, argnames, converter))  # type: ignore[attr-defined] # noqa: E501 
    58        return fn 
    59 
    60    return leg 
    61 
    62 
    63def _omit_standard_example(fn: _F) -> _F: 
    64    fn._omit_standard_example = True  # type: ignore[attr-defined] 
    65    return fn 
    66 
    67 
    68def _wrap_fn_for_legacy( 
    69    dispatch_collection: _ClsLevelDispatch[_ET], 
    70    fn: _ListenerFnType, 
    71    argspec: FullArgSpec, 
    72) -> _ListenerFnType: 
    73    for since, argnames, conv in dispatch_collection.legacy_signatures: 
    74        if argnames[-1] == "**kw": 
    75            has_kw = True 
    76            argnames = argnames[0:-1] 
    77        else: 
    78            has_kw = False 
    79 
    80        if len(argnames) == len(argspec.args) and has_kw is bool( 
    81            argspec.varkw 
    82        ): 
    83            formatted_def = "def %s(%s%s)" % ( 
    84                dispatch_collection.name, 
    85                ", ".join(dispatch_collection.arg_names), 
    86                ", **kw" if has_kw else "", 
    87            ) 
    88            warning_txt = ( 
    89                'The argument signature for the "%s.%s" event listener ' 
    90                "has changed as of version %s, and conversion for " 
    91                "the old argument signature will be removed in a " 
    92                'future release.  The new signature is "%s"' 
    93                % ( 
    94                    dispatch_collection.clsname, 
    95                    dispatch_collection.name, 
    96                    since, 
    97                    formatted_def, 
    98                ) 
    99            ) 
    100 
    101            if conv is not None: 
    102                assert not has_kw 
    103 
    104                def wrap_leg(*args: Any, **kw: Any) -> Any: 
    105                    util.warn_deprecated(warning_txt, version=since) 
    106                    assert conv is not None 
    107                    return fn(*conv(*args)) 
    108 
    109            else: 
    110 
    111                def wrap_leg(*args: Any, **kw: Any) -> Any: 
    112                    util.warn_deprecated(warning_txt, version=since) 
    113                    argdict = dict(zip(dispatch_collection.arg_names, args)) 
    114                    args_from_dict = [argdict[name] for name in argnames] 
    115                    if has_kw: 
    116                        return fn(*args_from_dict, **kw) 
    117                    else: 
    118                        return fn(*args_from_dict) 
    119 
    120            return wrap_leg 
    121    else: 
    122        return fn 
    123 
    124 
    125def _indent(text: str, indent: str) -> str: 
    126    return "\n".join(indent + line for line in text.split("\n")) 
    127 
    128 
    129def _standard_listen_example( 
    130    dispatch_collection: _ClsLevelDispatch[_ET], 
    131    sample_target: Any, 
    132    fn: _ListenerFnType, 
    133) -> str: 
    134    example_kw_arg = _indent( 
    135        "\n".join( 
    136            "%(arg)s = kw['%(arg)s']" % {"arg": arg} 
    137            for arg in dispatch_collection.arg_names[0:2] 
    138        ), 
    139        "    ", 
    140    ) 
    141    if dispatch_collection.legacy_signatures: 
    142        current_since = max( 
    143            since 
    144            for since, args, conv in dispatch_collection.legacy_signatures 
    145        ) 
    146    else: 
    147        current_since = None 
    148    text = ( 
    149        "from sqlalchemy import event\n\n\n" 
    150        "@event.listens_for(%(sample_target)s, '%(event_name)s')\n" 
    151        "def receive_%(event_name)s(" 
    152        "%(named_event_arguments)s%(has_kw_arguments)s):\n" 
    153        "    \"listen for the '%(event_name)s' event\"\n" 
    154        "\n    # ... (event handling logic) ...\n" 
    155    ) 
    156 
    157    text %= { 
    158        "current_since": ( 
    159            " (arguments as of %s)" % current_since if current_since else "" 
    160        ), 
    161        "event_name": fn.__name__, 
    162        "has_kw_arguments": ", **kw" if dispatch_collection.has_kw else "", 
    163        "named_event_arguments": ", ".join(dispatch_collection.arg_names), 
    164        "example_kw_arg": example_kw_arg, 
    165        "sample_target": sample_target, 
    166    } 
    167    return text 
    168 
    169 
    170def _legacy_listen_examples( 
    171    dispatch_collection: _ClsLevelDispatch[_ET], 
    172    sample_target: str, 
    173    fn: _ListenerFnType, 
    174) -> str: 
    175    text = "" 
    176    for since, args, conv in dispatch_collection.legacy_signatures: 
    177        text += ( 
    178            "\n# DEPRECATED calling style (pre-%(since)s, " 
    179            "will be removed in a future release)\n" 
    180            "@event.listens_for(%(sample_target)s, '%(event_name)s')\n" 
    181            "def receive_%(event_name)s(" 
    182            "%(named_event_arguments)s%(has_kw_arguments)s):\n" 
    183            "    \"listen for the '%(event_name)s' event\"\n" 
    184            "\n    # ... (event handling logic) ...\n" 
    185            % { 
    186                "since": since, 
    187                "event_name": fn.__name__, 
    188                "has_kw_arguments": ( 
    189                    " **kw" if dispatch_collection.has_kw else "" 
    190                ), 
    191                "named_event_arguments": ", ".join(args), 
    192                "sample_target": sample_target, 
    193            } 
    194        ) 
    195    return text 
    196 
    197 
    198def _version_signature_changes( 
    199    parent_dispatch_cls: Type[_HasEventsDispatch[_ET]], 
    200    dispatch_collection: _ClsLevelDispatch[_ET], 
    201) -> str: 
    202    since, args, conv = dispatch_collection.legacy_signatures[0] 
    203    return ( 
    204        "\n.. versionchanged:: %(since)s\n" 
    205        "    The :meth:`.%(clsname)s.%(event_name)s` event now accepts the \n" 
    206        "    arguments %(named_event_arguments)s%(has_kw_arguments)s.\n" 
    207        "    Support for listener functions which accept the previous \n" 
    208        '    argument signature(s) listed above as "deprecated" will be \n' 
    209        "    removed in a future release." 
    210        % { 
    211            "since": since, 
    212            "clsname": parent_dispatch_cls.__name__, 
    213            "event_name": dispatch_collection.name, 
    214            "named_event_arguments": ", ".join( 
    215                ":paramref:`.%(clsname)s.%(event_name)s.%(param_name)s`" 
    216                % { 
    217                    "clsname": parent_dispatch_cls.__name__, 
    218                    "event_name": dispatch_collection.name, 
    219                    "param_name": param_name, 
    220                } 
    221                for param_name in dispatch_collection.arg_names 
    222            ), 
    223            "has_kw_arguments": ", **kw" if dispatch_collection.has_kw else "", 
    224        } 
    225    ) 
    226 
    227 
    228def _augment_fn_docs( 
    229    dispatch_collection: _ClsLevelDispatch[_ET], 
    230    parent_dispatch_cls: Type[_HasEventsDispatch[_ET]], 
    231    fn: _ListenerFnType, 
    232) -> str: 
    233    if getattr(fn, "_omit_standard_example", False): 
    234        assert fn.__doc__ 
    235        return fn.__doc__ 
    236 
    237    header = ( 
    238        ".. container:: event_signatures\n\n" 
    239        "     Example argument forms::\n" 
    240        "\n" 
    241    ) 
    242 
    243    sample_target = getattr(parent_dispatch_cls, "_target_class_doc", "obj") 
    244    text = header + _indent( 
    245        _standard_listen_example(dispatch_collection, sample_target, fn), 
    246        " " * 8, 
    247    ) 
    248    if dispatch_collection.legacy_signatures: 
    249        text += _indent( 
    250            _legacy_listen_examples(dispatch_collection, sample_target, fn), 
    251            " " * 8, 
    252        ) 
    253 
    254        text += _version_signature_changes( 
    255            parent_dispatch_cls, dispatch_collection 
    256        ) 
    257 
    258    return util.inject_docstring_text(fn.__doc__, text, 1)