1# event/legacy.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Routines to handle adaption of legacy call signatures,
9generation of deprecation notes and docstrings.
10
11"""
12from __future__ import annotations
13
14import typing
15from typing import Any
16from typing import Callable
17from typing import List
18from typing import Optional
19from typing import Tuple
20from typing import Type
21from typing import TypeVar
22
23from .registry import _ET
24from .registry import _ListenerFnType
25from .. import util
26from ..util.compat import FullArgSpec
27
28if typing.TYPE_CHECKING:
29 from .attr import _ClsLevelDispatch
30 from .base import _HasEventsDispatch
31
32
# Type variable used by the decorators in this module, which return the
# same callable they were given.
_F = TypeVar("_F", bound=Callable[..., Any])

# One recorded legacy signature: ``(since, argnames, converter)``.  The
# converter slot is optional because ``_legacy_signature()`` stores ``None``
# there when no converter is supplied.
_LegacySignatureType = Tuple[str, List[str], Optional[Callable[..., Any]]]
36
37
def _legacy_signature(
    since: str,
    argnames: List[str],
    converter: Optional[Callable[..., Any]] = None,
) -> Callable[[_F], _F]:
    """Decorator factory that records a legacy signature on a function.

    Every application appends a ``(since, argnames, converter)`` tuple to
    the decorated function's ``_legacy_signatures`` list, creating that
    list the first time, and hands the function back unchanged.

    :param since: string version used in the deprecation warning
    :param argnames: list of strings naming *all* arguments the legacy
     version accepted, including arguments that are still present
    :param converter: optional callable; when given, the legacy wrapper
     calls it with the arguments it receives and passes the values it
     returns on to the legacy-style listener.

    """

    def leg(fn: _F) -> _F:
        try:
            recorded = fn._legacy_signatures  # type: ignore[attr-defined]
        except AttributeError:
            # first legacy signature for this function
            recorded = []
            fn._legacy_signatures = recorded  # type: ignore[attr-defined]
        recorded.append((since, argnames, converter))
        return fn

    return leg
61
62
def _omit_standard_example(fn: _F) -> _F:
    """Mark ``fn`` with ``_omit_standard_example = True`` and return it.

    ``_augment_fn_docs()`` checks this flag and, when it is set, returns
    the function's docstring as-is instead of adding generated examples.
    """
    setattr(fn, "_omit_standard_example", True)
    return fn
66
67
def _wrap_fn_for_legacy(
    dispatch_collection: _ClsLevelDispatch[_ET],
    fn: _ListenerFnType,
    argspec: FullArgSpec,
) -> _ListenerFnType:
    """Wrap ``fn`` when its argument spec matches a legacy signature.

    Each entry in ``dispatch_collection.legacy_signatures`` is compared to
    ``argspec``.  A listener matches when it takes the same number of
    positional arguments as the legacy form and agrees with it on whether
    ``**kw`` is accepted.  For the first match a wrapper is returned which
    emits a deprecation warning on every call and then invokes ``fn`` with
    the arguments rearranged into the legacy form.

    :param dispatch_collection: provides ``legacy_signatures``, ``name``,
     ``clsname`` and ``arg_names`` for the event.
    :param fn: the listener function to adapt.
    :param argspec: argument spec of ``fn``; only ``args`` and ``varkw``
     are consulted.
    :return: a wrapping function if a legacy signature matched, otherwise
     ``fn`` itself.
    """
    for since, argnames, conv in dispatch_collection.legacy_signatures:
        # A trailing "**kw" entry flags keyword-argument support and is not
        # a positional name.  Checking for emptiness first keeps a legacy
        # signature with no arguments from raising IndexError.
        has_kw = bool(argnames) and argnames[-1] == "**kw"
        if has_kw:
            argnames = argnames[0:-1]

        if len(argnames) != len(argspec.args) or has_kw is not bool(
            argspec.varkw
        ):
            continue

        formatted_def = "def %s(%s%s)" % (
            dispatch_collection.name,
            ", ".join(dispatch_collection.arg_names),
            ", **kw" if has_kw else "",
        )
        warning_txt = (
            'The argument signature for the "%s.%s" event listener '
            "has changed as of version %s, and conversion for "
            "the old argument signature will be removed in a "
            'future release. The new signature is "%s"'
            % (
                dispatch_collection.clsname,
                dispatch_collection.name,
                since,
                formatted_def,
            )
        )

        if conv is not None:
            # converters only deal in positional arguments
            assert not has_kw

            def wrap_leg(*args: Any, **kw: Any) -> Any:
                util.warn_deprecated(warning_txt, version=since)
                assert conv is not None
                return fn(*conv(*args))

        else:

            def wrap_leg(*args: Any, **kw: Any) -> Any:
                util.warn_deprecated(warning_txt, version=since)
                # pair incoming values with the current argument names,
                # then pick out the ones the legacy listener expects
                argdict = dict(zip(dispatch_collection.arg_names, args))
                args_from_dict = [argdict[name] for name in argnames]
                if has_kw:
                    return fn(*args_from_dict, **kw)
                else:
                    return fn(*args_from_dict)

        return wrap_leg

    # no legacy signature matched; use the listener as given
    return fn
123
124
def _indent(text: str, indent: str) -> str:
    """Return ``text`` with ``indent`` prepended to every line.

    Lines are delimited by ``"\\n"`` only; an empty ``text`` counts as a
    single empty line and therefore yields ``indent`` by itself.
    """
    # Prefix the first line, then re-insert the prefix after each newline.
    return indent + text.replace("\n", "\n" + indent)
127
128
def _standard_listen_example(
    dispatch_collection: _ClsLevelDispatch[_ET],
    sample_target: Any,
    fn: _ListenerFnType,
) -> str:
    """Return example source for a listener using the current signature.

    The example imports ``event``, decorates a ``receive_<event name>``
    function with ``@event.listens_for(<sample_target>, '<event name>')``
    and lists the event's current argument names, followed by ``**kw``
    when the event accepts keyword arguments.

    :param dispatch_collection: provides ``arg_names`` and ``has_kw``.
    :param sample_target: value rendered as the first argument of
     ``listens_for``.
    :param fn: the event function; its ``__name__`` is the event name.
    :return: the example source text.
    """
    # Only the keys the template references are supplied; the values that
    # were previously computed here but never rendered have been dropped.
    template = (
        "from sqlalchemy import event\n\n\n"
        "@event.listens_for(%(sample_target)s, '%(event_name)s')\n"
        "def receive_%(event_name)s("
        "%(named_event_arguments)s%(has_kw_arguments)s):\n"
        " \"listen for the '%(event_name)s' event\"\n"
        "\n # ... (event handling logic) ...\n"
    )
    return template % {
        "event_name": fn.__name__,
        "has_kw_arguments": ", **kw" if dispatch_collection.has_kw else "",
        "named_event_arguments": ", ".join(dispatch_collection.arg_names),
        "sample_target": sample_target,
    }
168
169
def _legacy_listen_examples(
    dispatch_collection: _ClsLevelDispatch[_ET],
    sample_target: str,
    fn: _ListenerFnType,
) -> str:
    """Return example source for each deprecated listener signature.

    One example is rendered per entry of
    ``dispatch_collection.legacy_signatures``, in order, each introduced by
    a ``# DEPRECATED calling style`` comment naming the ``since`` version.

    The legacy argument list is rendered exactly as recorded.  A legacy
    signature that accepted keyword arguments already ends with a
    ``"**kw"`` entry (the convention ``_wrap_fn_for_legacy`` matches on),
    so no extra ``**kw`` is appended from the current signature; doing so
    produced output such as ``x, y, **kw **kw`` or ``x **kw``.

    :param dispatch_collection: provides ``legacy_signatures``.
    :param sample_target: text rendered as the first argument of
     ``listens_for``.
    :param fn: the event function; its ``__name__`` is the event name.
    :return: the concatenated examples, or ``""`` when there are no legacy
     signatures.
    """
    template = (
        "\n# DEPRECATED calling style (pre-%(since)s, "
        "will be removed in a future release)\n"
        "@event.listens_for(%(sample_target)s, '%(event_name)s')\n"
        "def receive_%(event_name)s(%(named_event_arguments)s):\n"
        " \"listen for the '%(event_name)s' event\"\n"
        "\n # ... (event handling logic) ...\n"
    )
    return "".join(
        template
        % {
            "since": since,
            "event_name": fn.__name__,
            "named_event_arguments": ", ".join(args),
            "sample_target": sample_target,
        }
        for since, args, _conv in dispatch_collection.legacy_signatures
    )
196
197
def _version_signature_changes(
    parent_dispatch_cls: Type[_HasEventsDispatch[_ET]],
    dispatch_collection: _ClsLevelDispatch[_ET],
) -> str:
    """Return a ``.. versionchanged::`` note for an event's new signature.

    The version comes from the first entry of
    ``dispatch_collection.legacy_signatures``, which must not be empty.
    Each current argument name is rendered as a ``:paramref:`` link,
    followed by ``**kw`` when the event accepts keyword arguments.

    :param parent_dispatch_cls: class whose ``__name__`` qualifies the
     event in the generated references.
    :param dispatch_collection: provides ``legacy_signatures``, ``name``,
     ``arg_names`` and ``has_kw``.
    :return: the reStructuredText note.
    """
    since, _legacy_args, _conv = dispatch_collection.legacy_signatures[0]
    cls_name = parent_dispatch_cls.__name__
    event_name = dispatch_collection.name

    param_links = [
        ":paramref:`.%(clsname)s.%(event_name)s.%(param_name)s`"
        % {
            "clsname": cls_name,
            "event_name": event_name,
            "param_name": param_name,
        }
        for param_name in dispatch_collection.arg_names
    ]
    if dispatch_collection.has_kw:
        kw_suffix = ", **kw"
    else:
        kw_suffix = ""

    note = (
        "\n.. versionchanged:: %(since)s\n"
        " The :meth:`.%(clsname)s.%(event_name)s` event now accepts the \n"
        " arguments %(named_event_arguments)s%(has_kw_arguments)s.\n"
        " Support for listener functions which accept the previous \n"
        ' argument signature(s) listed above as "deprecated" will be \n'
        " removed in a future release."
    )
    return note % {
        "since": since,
        "clsname": cls_name,
        "event_name": event_name,
        "named_event_arguments": ", ".join(param_links),
        "has_kw_arguments": kw_suffix,
    }
226
227
def _augment_fn_docs(
    dispatch_collection: _ClsLevelDispatch[_ET],
    parent_dispatch_cls: Type[_HasEventsDispatch[_ET]],
    fn: _ListenerFnType,
) -> str:
    """Return the docstring of ``fn`` with generated listener examples.

    Functions flagged with ``_omit_standard_example`` get their docstring
    back untouched.  Otherwise an ``event_signatures`` container holding
    the standard example is built; when legacy signatures exist, the
    deprecated examples and a ``versionchanged`` note are added as well.
    The result is merged into ``fn.__doc__`` via
    ``util.inject_docstring_text``.

    :param dispatch_collection: per-event information used to render the
     examples.
    :param parent_dispatch_cls: events class; its optional
     ``_target_class_doc`` attribute names the sample target, defaulting
     to ``"obj"``.
    :param fn: the event function whose docstring is augmented.
    :return: the resulting docstring text.
    """
    if getattr(fn, "_omit_standard_example", False):
        assert fn.__doc__
        return fn.__doc__

    sample_target = getattr(parent_dispatch_cls, "_target_class_doc", "obj")
    code_indent = " " * 8

    pieces = [
        ".. container:: event_signatures\n\n",
        " Example argument forms::\n",
        "\n",
        _indent(
            _standard_listen_example(dispatch_collection, sample_target, fn),
            code_indent,
        ),
    ]

    if dispatch_collection.legacy_signatures:
        pieces.append(
            _indent(
                _legacy_listen_examples(
                    dispatch_collection, sample_target, fn
                ),
                code_indent,
            )
        )
        # the note reads legacy_signatures[0], so it is only produced
        # when at least one legacy signature exists
        pieces.append(
            _version_signature_changes(
                parent_dispatch_cls, dispatch_collection
            )
        )

    return util.inject_docstring_text(fn.__doc__, "".join(pieces), 1)