1# event/legacy.py 
    2# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors 
    3# <see AUTHORS file> 
    4# 
    5# This module is part of SQLAlchemy and is released under 
    6# the MIT License: http://www.opensource.org/licenses/mit-license.php 
    7 
    8"""Routines to handle adaption of legacy call signatures, 
    9generation of deprecation notes and docstrings. 
    10 
    11""" 
    12 
    13from .. import util 
    14 
    15 
    16def _legacy_signature(since, argnames, converter=None): 
    17    def leg(fn): 
    18        if not hasattr(fn, "_legacy_signatures"): 
    19            fn._legacy_signatures = [] 
    20        fn._legacy_signatures.append((since, argnames, converter)) 
    21        return fn 
    22 
    23    return leg 
    24 
    25 
    26def _wrap_fn_for_legacy(dispatch_collection, fn, argspec): 
    27    for since, argnames, conv in dispatch_collection.legacy_signatures: 
    28        if argnames[-1] == "**kw": 
    29            has_kw = True 
    30            argnames = argnames[0:-1] 
    31        else: 
    32            has_kw = False 
    33 
    34        if len(argnames) == len(argspec.args) and has_kw is bool( 
    35            argspec.varkw 
    36        ): 
    37 
    38            if conv: 
    39                assert not has_kw 
    40 
    41                def wrap_leg(*args): 
    42                    return fn(*conv(*args)) 
    43 
    44            else: 
    45 
    46                def wrap_leg(*args, **kw): 
    47                    argdict = dict(zip(dispatch_collection.arg_names, args)) 
    48                    args = [argdict[name] for name in argnames] 
    49                    if has_kw: 
    50                        return fn(*args, **kw) 
    51                    else: 
    52                        return fn(*args) 
    53 
    54            return wrap_leg 
    55    else: 
    56        return fn 
    57 
    58 
    59def _indent(text, indent): 
    60    return "\n".join(indent + line for line in text.split("\n")) 
    61 
    62 
    63def _standard_listen_example(dispatch_collection, sample_target, fn): 
    64    example_kw_arg = _indent( 
    65        "\n".join( 
    66            "%(arg)s = kw['%(arg)s']" % {"arg": arg} 
    67            for arg in dispatch_collection.arg_names[0:2] 
    68        ), 
    69        "    ", 
    70    ) 
    71    if dispatch_collection.legacy_signatures: 
    72        current_since = max( 
    73            since 
    74            for since, args, conv in dispatch_collection.legacy_signatures 
    75        ) 
    76    else: 
    77        current_since = None 
    78    text = ( 
    79        "from sqlalchemy import event\n\n" 
    80        "# standard decorator style%(current_since)s\n" 
    81        "@event.listens_for(%(sample_target)s, '%(event_name)s')\n" 
    82        "def receive_%(event_name)s(" 
    83        "%(named_event_arguments)s%(has_kw_arguments)s):\n" 
    84        "    \"listen for the '%(event_name)s' event\"\n" 
    85        "\n    # ... (event handling logic) ...\n" 
    86    ) 
    87 
    88    if len(dispatch_collection.arg_names) > 3: 
    89        text += ( 
    90            "\n# named argument style (new in 0.9)\n" 
    91            "@event.listens_for(" 
    92            "%(sample_target)s, '%(event_name)s', named=True)\n" 
    93            "def receive_%(event_name)s(**kw):\n" 
    94            "    \"listen for the '%(event_name)s' event\"\n" 
    95            "%(example_kw_arg)s\n" 
    96            "\n    # ... (event handling logic) ...\n" 
    97        ) 
    98 
    99    text %= { 
    100        "current_since": " (arguments as of %s)" % current_since 
    101        if current_since 
    102        else "", 
    103        "event_name": fn.__name__, 
    104        "has_kw_arguments": ", **kw" if dispatch_collection.has_kw else "", 
    105        "named_event_arguments": ", ".join(dispatch_collection.arg_names), 
    106        "example_kw_arg": example_kw_arg, 
    107        "sample_target": sample_target, 
    108    } 
    109    return text 
    110 
    111 
    112def _legacy_listen_examples(dispatch_collection, sample_target, fn): 
    113    text = "" 
    114    for since, args, conv in dispatch_collection.legacy_signatures: 
    115        text += ( 
    116            "\n# DEPRECATED calling style (pre-%(since)s, " 
    117            "will be removed in a future release)\n" 
    118            "@event.listens_for(%(sample_target)s, '%(event_name)s')\n" 
    119            "def receive_%(event_name)s(" 
    120            "%(named_event_arguments)s%(has_kw_arguments)s):\n" 
    121            "    \"listen for the '%(event_name)s' event\"\n" 
    122            "\n    # ... (event handling logic) ...\n" 
    123            % { 
    124                "since": since, 
    125                "event_name": fn.__name__, 
    126                "has_kw_arguments": " **kw" 
    127                if dispatch_collection.has_kw 
    128                else "", 
    129                "named_event_arguments": ", ".join(args), 
    130                "sample_target": sample_target, 
    131            } 
    132        ) 
    133    return text 
    134 
    135 
    136def _version_signature_changes(parent_dispatch_cls, dispatch_collection): 
    137    since, args, conv = dispatch_collection.legacy_signatures[0] 
    138    return ( 
    139        "\n.. deprecated:: %(since)s\n" 
    140        "    The :class:`.%(clsname)s.%(event_name)s` event now accepts the \n" 
    141        "    arguments ``%(named_event_arguments)s%(has_kw_arguments)s``.\n" 
    142        "    Support for listener functions which accept the previous \n" 
    143        '    argument signature(s) listed above as "deprecated" will be \n' 
    144        "    removed in a future release." 
    145        % { 
    146            "since": since, 
    147            "clsname": parent_dispatch_cls.__name__, 
    148            "event_name": dispatch_collection.name, 
    149            "named_event_arguments": ", ".join(dispatch_collection.arg_names), 
    150            "has_kw_arguments": ", **kw" if dispatch_collection.has_kw else "", 
    151        } 
    152    ) 
    153 
    154 
    155def _augment_fn_docs(dispatch_collection, parent_dispatch_cls, fn): 
    156    header = ( 
    157        ".. container:: event_signatures\n\n" 
    158        "     Example argument forms::\n" 
    159        "\n" 
    160    ) 
    161 
    162    sample_target = getattr(parent_dispatch_cls, "_target_class_doc", "obj") 
    163    text = header + _indent( 
    164        _standard_listen_example(dispatch_collection, sample_target, fn), 
    165        " " * 8, 
    166    ) 
    167    if dispatch_collection.legacy_signatures: 
    168        text += _indent( 
    169            _legacy_listen_examples(dispatch_collection, sample_target, fn), 
    170            " " * 8, 
    171        ) 
    172 
    173        text += _version_signature_changes( 
    174            parent_dispatch_cls, dispatch_collection 
    175        ) 
    176 
    177    return util.inject_docstring_text(fn.__doc__, text, 1)