1# event/legacy.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Routines to handle adaption of legacy call signatures,
9generation of deprecation notes and docstrings.
10
11"""
12
13from __future__ import annotations
14
15import typing
16from typing import Any
17from typing import Callable
18from typing import List
19from typing import Optional
20from typing import Tuple
21from typing import Type
22from typing import TypeVar
23
24from .registry import _ET
25from .registry import _ListenerFnType
26from .. import util
27from ..util.compat import FullArgSpec
28
29if typing.TYPE_CHECKING:
30 from .attr import _ClsLevelDispatch
31 from .base import _HasEventsDispatch
32
33
34_F = TypeVar("_F", bound=Callable[..., Any])
35
36_LegacySignatureType = Tuple[str, List[str], Callable[..., Any]]
37
38
39def _legacy_signature(
40 since: str,
41 argnames: List[str],
42 converter: Optional[Callable[..., Any]] = None,
43) -> Callable[[_F], _F]:
44 """legacy sig decorator
45
46
47 :param since: string version for deprecation warning
48 :param argnames: list of strings, which is *all* arguments that the legacy
49 version accepted, including arguments that are still there
50 :param converter: lambda that will accept tuple of this full arg signature
51 and return tuple of new arg signature.
52
53 """
54
55 def leg(fn: _F) -> _F:
56 if not hasattr(fn, "_legacy_signatures"):
57 fn._legacy_signatures = [] # type: ignore[attr-defined]
58 fn._legacy_signatures.append((since, argnames, converter)) # type: ignore[attr-defined] # noqa: E501
59 return fn
60
61 return leg
62
63
64def _omit_standard_example(fn: _F) -> _F:
65 fn._omit_standard_example = True # type: ignore[attr-defined]
66 return fn
67
68
69def _wrap_fn_for_legacy(
70 dispatch_collection: _ClsLevelDispatch[_ET],
71 fn: _ListenerFnType,
72 argspec: FullArgSpec,
73) -> _ListenerFnType:
74 for since, argnames, conv in dispatch_collection.legacy_signatures:
75 if argnames[-1] == "**kw":
76 has_kw = True
77 argnames = argnames[0:-1]
78 else:
79 has_kw = False
80
81 if len(argnames) == len(argspec.args) and has_kw is bool(
82 argspec.varkw
83 ):
84 formatted_def = "def %s(%s%s)" % (
85 dispatch_collection.name,
86 ", ".join(dispatch_collection.arg_names),
87 ", **kw" if has_kw else "",
88 )
89 warning_txt = (
90 'The argument signature for the "%s.%s" event listener '
91 "has changed as of version %s, and conversion for "
92 "the old argument signature will be removed in a "
93 'future release. The new signature is "%s"'
94 % (
95 dispatch_collection.clsname,
96 dispatch_collection.name,
97 since,
98 formatted_def,
99 )
100 )
101
102 if conv is not None:
103 assert not has_kw
104
105 def wrap_leg(*args: Any, **kw: Any) -> Any:
106 util.warn_deprecated(warning_txt, version=since)
107 assert conv is not None
108 return fn(*conv(*args))
109
110 else:
111
112 def wrap_leg(*args: Any, **kw: Any) -> Any:
113 util.warn_deprecated(warning_txt, version=since)
114 argdict = dict(zip(dispatch_collection.arg_names, args))
115 args_from_dict = [argdict[name] for name in argnames]
116 if has_kw:
117 return fn(*args_from_dict, **kw)
118 else:
119 return fn(*args_from_dict)
120
121 return wrap_leg
122 else:
123 return fn
124
125
126def _indent(text: str, indent: str) -> str:
127 return "\n".join(indent + line for line in text.split("\n"))
128
129
130def _standard_listen_example(
131 dispatch_collection: _ClsLevelDispatch[_ET],
132 sample_target: Any,
133 fn: _ListenerFnType,
134) -> str:
135 example_kw_arg = _indent(
136 "\n".join(
137 "%(arg)s = kw['%(arg)s']" % {"arg": arg}
138 for arg in dispatch_collection.arg_names[0:2]
139 ),
140 " ",
141 )
142 if dispatch_collection.legacy_signatures:
143 current_since = max(
144 since
145 for since, args, conv in dispatch_collection.legacy_signatures
146 )
147 else:
148 current_since = None
149 text = (
150 "from sqlalchemy import event\n\n\n"
151 "@event.listens_for(%(sample_target)s, '%(event_name)s')\n"
152 "def receive_%(event_name)s("
153 "%(named_event_arguments)s%(has_kw_arguments)s):\n"
154 " \"listen for the '%(event_name)s' event\"\n"
155 "\n # ... (event handling logic) ...\n"
156 )
157
158 text %= {
159 "current_since": (
160 " (arguments as of %s)" % current_since if current_since else ""
161 ),
162 "event_name": fn.__name__,
163 "has_kw_arguments": ", **kw" if dispatch_collection.has_kw else "",
164 "named_event_arguments": ", ".join(dispatch_collection.arg_names),
165 "example_kw_arg": example_kw_arg,
166 "sample_target": sample_target,
167 }
168 return text
169
170
171def _legacy_listen_examples(
172 dispatch_collection: _ClsLevelDispatch[_ET],
173 sample_target: str,
174 fn: _ListenerFnType,
175) -> str:
176 text = ""
177 for since, args, conv in dispatch_collection.legacy_signatures:
178 text += (
179 "\n# DEPRECATED calling style (pre-%(since)s, "
180 "will be removed in a future release)\n"
181 "@event.listens_for(%(sample_target)s, '%(event_name)s')\n"
182 "def receive_%(event_name)s("
183 "%(named_event_arguments)s%(has_kw_arguments)s):\n"
184 " \"listen for the '%(event_name)s' event\"\n"
185 "\n # ... (event handling logic) ...\n"
186 % {
187 "since": since,
188 "event_name": fn.__name__,
189 "has_kw_arguments": (
190 " **kw" if dispatch_collection.has_kw else ""
191 ),
192 "named_event_arguments": ", ".join(args),
193 "sample_target": sample_target,
194 }
195 )
196 return text
197
198
199def _version_signature_changes(
200 parent_dispatch_cls: Type[_HasEventsDispatch[_ET]],
201 dispatch_collection: _ClsLevelDispatch[_ET],
202) -> str:
203 since, args, conv = dispatch_collection.legacy_signatures[0]
204 return (
205 "\n.. versionchanged:: %(since)s\n"
206 " The :meth:`.%(clsname)s.%(event_name)s` event now accepts the \n"
207 " arguments %(named_event_arguments)s%(has_kw_arguments)s.\n"
208 " Support for listener functions which accept the previous \n"
209 ' argument signature(s) listed above as "deprecated" will be \n'
210 " removed in a future release."
211 % {
212 "since": since,
213 "clsname": parent_dispatch_cls.__name__,
214 "event_name": dispatch_collection.name,
215 "named_event_arguments": ", ".join(
216 ":paramref:`.%(clsname)s.%(event_name)s.%(param_name)s`"
217 % {
218 "clsname": parent_dispatch_cls.__name__,
219 "event_name": dispatch_collection.name,
220 "param_name": param_name,
221 }
222 for param_name in dispatch_collection.arg_names
223 ),
224 "has_kw_arguments": ", **kw" if dispatch_collection.has_kw else "",
225 }
226 )
227
228
229def _augment_fn_docs(
230 dispatch_collection: _ClsLevelDispatch[_ET],
231 parent_dispatch_cls: Type[_HasEventsDispatch[_ET]],
232 fn: _ListenerFnType,
233) -> str:
234 if getattr(fn, "_omit_standard_example", False):
235 assert fn.__doc__
236 return fn.__doc__
237
238 header = (
239 ".. container:: event_signatures\n\n"
240 " Example argument forms::\n"
241 "\n"
242 )
243
244 sample_target = getattr(parent_dispatch_cls, "_target_class_doc", "obj")
245 text = header + _indent(
246 _standard_listen_example(dispatch_collection, sample_target, fn),
247 " " * 8,
248 )
249 if dispatch_collection.legacy_signatures:
250 text += _indent(
251 _legacy_listen_examples(dispatch_collection, sample_target, fn),
252 " " * 8,
253 )
254
255 text += _version_signature_changes(
256 parent_dispatch_cls, dispatch_collection
257 )
258
259 return util.inject_docstring_text(fn.__doc__, text, 1)