1# event/legacy.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Routines to handle adaption of legacy call signatures,
9generation of deprecation notes and docstrings.
10
11"""
12from __future__ import annotations
13
14import typing
15from typing import Any
16from typing import Callable
17from typing import List
18from typing import Optional
19from typing import Tuple
20from typing import Type
21
22from .registry import _ET
23from .registry import _ListenerFnType
24from .. import util
25from ..util.compat import FullArgSpec
26
27if typing.TYPE_CHECKING:
28 from .attr import _ClsLevelDispatch
29 from .base import _HasEventsDispatch
30
31
# Shape of one legacy-signature record, matching the tuples appended by
# ``_legacy_signature``: (version string, list of legacy argument names,
# optional converter callable).
_LegacySignatureType = Tuple[str, List[str], Optional[Callable[..., Any]]]
33
34
35def _legacy_signature(
36 since: str,
37 argnames: List[str],
38 converter: Optional[Callable[..., Any]] = None,
39) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
40 """legacy sig decorator
41
42
43 :param since: string version for deprecation warning
44 :param argnames: list of strings, which is *all* arguments that the legacy
45 version accepted, including arguments that are still there
46 :param converter: lambda that will accept tuple of this full arg signature
47 and return tuple of new arg signature.
48
49 """
50
51 def leg(fn: Callable[..., Any]) -> Callable[..., Any]:
52 if not hasattr(fn, "_legacy_signatures"):
53 fn._legacy_signatures = [] # type: ignore[attr-defined]
54 fn._legacy_signatures.append((since, argnames, converter)) # type: ignore[attr-defined] # noqa: E501
55 return fn
56
57 return leg
58
59
60def _wrap_fn_for_legacy(
61 dispatch_collection: _ClsLevelDispatch[_ET],
62 fn: _ListenerFnType,
63 argspec: FullArgSpec,
64) -> _ListenerFnType:
65 for since, argnames, conv in dispatch_collection.legacy_signatures:
66 if argnames[-1] == "**kw":
67 has_kw = True
68 argnames = argnames[0:-1]
69 else:
70 has_kw = False
71
72 if len(argnames) == len(argspec.args) and has_kw is bool(
73 argspec.varkw
74 ):
75 formatted_def = "def %s(%s%s)" % (
76 dispatch_collection.name,
77 ", ".join(dispatch_collection.arg_names),
78 ", **kw" if has_kw else "",
79 )
80 warning_txt = (
81 'The argument signature for the "%s.%s" event listener '
82 "has changed as of version %s, and conversion for "
83 "the old argument signature will be removed in a "
84 'future release. The new signature is "%s"'
85 % (
86 dispatch_collection.clsname,
87 dispatch_collection.name,
88 since,
89 formatted_def,
90 )
91 )
92
93 if conv is not None:
94 assert not has_kw
95
96 def wrap_leg(*args: Any, **kw: Any) -> Any:
97 util.warn_deprecated(warning_txt, version=since)
98 assert conv is not None
99 return fn(*conv(*args))
100
101 else:
102
103 def wrap_leg(*args: Any, **kw: Any) -> Any:
104 util.warn_deprecated(warning_txt, version=since)
105 argdict = dict(zip(dispatch_collection.arg_names, args))
106 args_from_dict = [argdict[name] for name in argnames]
107 if has_kw:
108 return fn(*args_from_dict, **kw)
109 else:
110 return fn(*args_from_dict)
111
112 return wrap_leg
113 else:
114 return fn
115
116
117def _indent(text: str, indent: str) -> str:
118 return "\n".join(indent + line for line in text.split("\n"))
119
120
121def _standard_listen_example(
122 dispatch_collection: _ClsLevelDispatch[_ET],
123 sample_target: Any,
124 fn: _ListenerFnType,
125) -> str:
126 example_kw_arg = _indent(
127 "\n".join(
128 "%(arg)s = kw['%(arg)s']" % {"arg": arg}
129 for arg in dispatch_collection.arg_names[0:2]
130 ),
131 " ",
132 )
133 if dispatch_collection.legacy_signatures:
134 current_since = max(
135 since
136 for since, args, conv in dispatch_collection.legacy_signatures
137 )
138 else:
139 current_since = None
140 text = (
141 "from sqlalchemy import event\n\n\n"
142 "@event.listens_for(%(sample_target)s, '%(event_name)s')\n"
143 "def receive_%(event_name)s("
144 "%(named_event_arguments)s%(has_kw_arguments)s):\n"
145 " \"listen for the '%(event_name)s' event\"\n"
146 "\n # ... (event handling logic) ...\n"
147 )
148
149 text %= {
150 "current_since": (
151 " (arguments as of %s)" % current_since if current_since else ""
152 ),
153 "event_name": fn.__name__,
154 "has_kw_arguments": ", **kw" if dispatch_collection.has_kw else "",
155 "named_event_arguments": ", ".join(dispatch_collection.arg_names),
156 "example_kw_arg": example_kw_arg,
157 "sample_target": sample_target,
158 }
159 return text
160
161
162def _legacy_listen_examples(
163 dispatch_collection: _ClsLevelDispatch[_ET],
164 sample_target: str,
165 fn: _ListenerFnType,
166) -> str:
167 text = ""
168 for since, args, conv in dispatch_collection.legacy_signatures:
169 text += (
170 "\n# DEPRECATED calling style (pre-%(since)s, "
171 "will be removed in a future release)\n"
172 "@event.listens_for(%(sample_target)s, '%(event_name)s')\n"
173 "def receive_%(event_name)s("
174 "%(named_event_arguments)s%(has_kw_arguments)s):\n"
175 " \"listen for the '%(event_name)s' event\"\n"
176 "\n # ... (event handling logic) ...\n"
177 % {
178 "since": since,
179 "event_name": fn.__name__,
180 "has_kw_arguments": (
181 " **kw" if dispatch_collection.has_kw else ""
182 ),
183 "named_event_arguments": ", ".join(args),
184 "sample_target": sample_target,
185 }
186 )
187 return text
188
189
190def _version_signature_changes(
191 parent_dispatch_cls: Type[_HasEventsDispatch[_ET]],
192 dispatch_collection: _ClsLevelDispatch[_ET],
193) -> str:
194 since, args, conv = dispatch_collection.legacy_signatures[0]
195 return (
196 "\n.. versionchanged:: %(since)s\n"
197 " The :meth:`.%(clsname)s.%(event_name)s` event now accepts the \n"
198 " arguments %(named_event_arguments)s%(has_kw_arguments)s.\n"
199 " Support for listener functions which accept the previous \n"
200 ' argument signature(s) listed above as "deprecated" will be \n'
201 " removed in a future release."
202 % {
203 "since": since,
204 "clsname": parent_dispatch_cls.__name__,
205 "event_name": dispatch_collection.name,
206 "named_event_arguments": ", ".join(
207 ":paramref:`.%(clsname)s.%(event_name)s.%(param_name)s`"
208 % {
209 "clsname": parent_dispatch_cls.__name__,
210 "event_name": dispatch_collection.name,
211 "param_name": param_name,
212 }
213 for param_name in dispatch_collection.arg_names
214 ),
215 "has_kw_arguments": ", **kw" if dispatch_collection.has_kw else "",
216 }
217 )
218
219
def _augment_fn_docs(
    dispatch_collection: _ClsLevelDispatch[_ET],
    parent_dispatch_cls: Type[_HasEventsDispatch[_ET]],
    fn: _ListenerFnType,
) -> str:
    """Return the docstring of ``fn`` with listener examples injected.

    The injected text is an ``event_signatures`` container holding the
    current-signature example.  When the collection has legacy signatures,
    the deprecated-style examples and a ``versionchanged`` note are added
    after it.

    :param dispatch_collection: the event's dispatch collection.
    :param parent_dispatch_cls: events class; its ``_target_class_doc``
     attribute, when present, is used as the sample listen target
     (``"obj"`` otherwise).
    :param fn: the event function whose ``__doc__`` is augmented.
    :return: the result of ``util.inject_docstring_text``.

    """
    sample_target = getattr(parent_dispatch_cls, "_target_class_doc", "obj")
    code_indent = " " * 8

    pieces = [
        ".. container:: event_signatures\n\n"
        " Example argument forms::\n"
        "\n",
        _indent(
            _standard_listen_example(dispatch_collection, sample_target, fn),
            code_indent,
        ),
    ]

    if dispatch_collection.legacy_signatures:
        pieces.append(
            _indent(
                _legacy_listen_examples(
                    dispatch_collection, sample_target, fn
                ),
                code_indent,
            )
        )
        # the version note reads legacy_signatures[0], so it is only
        # generated when at least one legacy signature exists
        pieces.append(
            _version_signature_changes(
                parent_dispatch_cls, dispatch_collection
            )
        )

    return util.inject_docstring_text(fn.__doc__, "".join(pieces), 1)