Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/event/legacy.py: 69%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

83 statements  

1# event/legacy.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Routines to handle adaption of legacy call signatures, 

9generation of deprecation notes and docstrings. 

10 

11""" 

12from __future__ import annotations 

13 

14import typing 

15from typing import Any 

16from typing import Callable 

17from typing import List 

18from typing import Optional 

19from typing import Tuple 

20from typing import Type 

21from typing import TypeVar 

22 

23from .registry import _ET 

24from .registry import _ListenerFnType 

25from .. import util 

26from ..util.compat import FullArgSpec 

27 

28if typing.TYPE_CHECKING: 

29 from .attr import _ClsLevelDispatch 

30 from .base import _HasEventsDispatch 

31 

32 

# Type variable for decorators that hand back the callable they were given.
_F = TypeVar("_F", bound=Callable[..., Any])

# One recorded legacy signature: (version string, full legacy argument name
# list, converter).  The converter slot is Optional because
# ``_legacy_signature`` stores ``None`` when no converter is supplied and
# ``_wrap_fn_for_legacy`` explicitly checks ``conv is not None``.
_LegacySignatureType = Tuple[str, List[str], Optional[Callable[..., Any]]]

36 

37 

def _legacy_signature(
    since: str,
    argnames: List[str],
    converter: Optional[Callable[..., Any]] = None,
) -> Callable[[_F], _F]:
    """Build a decorator that records a legacy call signature.

    Every application appends a ``(since, argnames, converter)`` tuple to
    the decorated function's ``_legacy_signatures`` list, creating that list
    the first time.  The decorated function is returned as-is.

    :param since: string version used for the deprecation warning
    :param argnames: list of strings naming *all* arguments the legacy
     version accepted, including arguments that are still there
    :param converter: optional callable stored alongside the signature.
     ``_wrap_fn_for_legacy`` calls it with the positional arguments the
     wrapper receives and unpacks its result into the wrapped listener.
    :return: the decorator

    """

    def leg(fn: _F) -> _F:
        try:
            recorded = fn._legacy_signatures  # type: ignore[attr-defined]
        except AttributeError:
            # first legacy signature registered on this function
            recorded = fn._legacy_signatures = []  # type: ignore[attr-defined] # noqa: E501
        recorded.append((since, argnames, converter))
        return fn

    return leg

61 

62 

def _omit_standard_example(fn: _F) -> _F:
    """Mark *fn* with ``_omit_standard_example = True`` and return it.

    ``_augment_fn_docs`` checks this attribute and, when it is set, returns
    the function's own docstring without adding generated examples.
    """
    setattr(fn, "_omit_standard_example", True)
    return fn

66 

67 

def _wrap_fn_for_legacy(
    dispatch_collection: _ClsLevelDispatch[_ET],
    fn: _ListenerFnType,
    argspec: FullArgSpec,
) -> _ListenerFnType:
    """Return *fn*, wrapped if its argspec matches a legacy signature.

    The legacy signatures on ``dispatch_collection`` are tried in order.  A
    signature matches when its argument count (not counting a trailing
    ``"**kw"`` entry) equals ``len(argspec.args)`` and the presence of that
    ``"**kw"`` entry agrees with whether ``argspec.varkw`` is set.  For the
    first match, a wrapper is returned that emits a deprecation warning on
    every call and then invokes *fn* with legacy-style arguments.  If no
    signature matches, *fn* itself is returned.

    :param dispatch_collection: object supplying ``legacy_signatures``,
     ``name``, ``clsname`` and ``arg_names``
    :param fn: the listener function being registered
    :param argspec: argument spec of *fn*; only ``args`` and ``varkw``
     are read
    :return: *fn* or a wrapper around it

    """
    for since, argnames, conv in dispatch_collection.legacy_signatures:
        # Check for emptiness first: a legacy signature with no arguments
        # must simply fail to match rather than raise IndexError.
        has_kw = bool(argnames) and argnames[-1] == "**kw"
        if has_kw:
            argnames = argnames[:-1]

        if len(argnames) != len(argspec.args) or has_kw is not bool(
            argspec.varkw
        ):
            continue

        # The warning advertises the *current* signature.
        formatted_def = "def %s(%s%s)" % (
            dispatch_collection.name,
            ", ".join(dispatch_collection.arg_names),
            ", **kw" if has_kw else "",
        )
        warning_txt = (
            'The argument signature for the "%s.%s" event listener '
            "has changed as of version %s, and conversion for "
            "the old argument signature will be removed in a "
            'future release. The new signature is "%s"'
            % (
                dispatch_collection.clsname,
                dispatch_collection.name,
                since,
                formatted_def,
            )
        )

        if conv is not None:
            # converter-based signatures do not carry **kw
            assert not has_kw

            def wrap_leg(*args: Any, **kw: Any) -> Any:
                util.warn_deprecated(warning_txt, version=since)
                assert conv is not None
                # the converter produces the positional args for fn
                return fn(*conv(*args))

        else:

            def wrap_leg(*args: Any, **kw: Any) -> Any:
                util.warn_deprecated(warning_txt, version=since)
                # Map incoming positional values to the current argument
                # names, then pick out the ones the legacy signature names.
                argdict = dict(zip(dispatch_collection.arg_names, args))
                args_from_dict = [argdict[name] for name in argnames]
                if has_kw:
                    return fn(*args_from_dict, **kw)
                else:
                    return fn(*args_from_dict)

        # The wrapper is returned immediately, so closing over the loop
        # variables is safe here.
        return wrap_leg

    return fn

123 

124 

def _indent(text: str, indent: str) -> str:
    """Return *text* with *indent* prepended to every line.

    Lines are delimited by ``"\\n"`` only; empty lines (including the empty
    string after a trailing newline) also receive the prefix.
    """
    return indent + text.replace("\n", "\n" + indent)

127 

128 

def _standard_listen_example(
    dispatch_collection: _ClsLevelDispatch[_ET],
    sample_target: Any,
    fn: _ListenerFnType,
) -> str:
    """Render an ``event.listens_for`` example using the current signature.

    The example function is named ``receive_<fn.__name__>`` and takes the
    names in ``dispatch_collection.arg_names``, followed by ``**kw`` when
    ``dispatch_collection.has_kw`` is true.

    Earlier code also computed a "current since" version and a keyword
    argument snippet, but the template never referenced them; that dead
    work has been dropped.

    :param dispatch_collection: object supplying ``arg_names`` and ``has_kw``
    :param sample_target: value interpolated as the ``listens_for`` target
    :param fn: event function; only ``__name__`` is read
    :return: the example source text

    """
    # NOTE(review): the single-space indents inside these literals are
    # reproduced as found; confirm they were not collapsed from a wider
    # indent when this file was rendered.
    text = (
        "from sqlalchemy import event\n\n\n"
        "@event.listens_for(%(sample_target)s, '%(event_name)s')\n"
        "def receive_%(event_name)s("
        "%(named_event_arguments)s%(has_kw_arguments)s):\n"
        " \"listen for the '%(event_name)s' event\"\n"
        "\n # ... (event handling logic) ...\n"
    )

    return text % {
        "event_name": fn.__name__,
        "has_kw_arguments": ", **kw" if dispatch_collection.has_kw else "",
        "named_event_arguments": ", ".join(dispatch_collection.arg_names),
        "sample_target": sample_target,
    }

168 

169 

def _legacy_listen_examples(
    dispatch_collection: _ClsLevelDispatch[_ET],
    sample_target: str,
    fn: _ListenerFnType,
) -> str:
    """Render one deprecated-style ``listens_for`` example per legacy
    signature.

    Each example lists the legacy argument names, followed by ``**kw`` when
    ``dispatch_collection.has_kw`` is true.  All parameters are joined with
    ``", "`` so the rendered ``def`` line is valid Python; previously the
    ``**kw`` suffix was emitted without a separating comma.  ``**kw`` is
    not repeated when the legacy argument list already ends with it.

    :param dispatch_collection: object supplying ``legacy_signatures`` and
     ``has_kw``
    :param sample_target: text interpolated as the ``listens_for`` target
    :param fn: event function; only ``__name__`` is read
    :return: concatenated example text, ``""`` when there are no legacy
     signatures

    """
    # NOTE(review): the single-space indents inside these literals are
    # reproduced as found; confirm they were not collapsed from a wider
    # indent when this file was rendered.
    template = (
        "\n# DEPRECATED calling style (pre-%(since)s, "
        "will be removed in a future release)\n"
        "@event.listens_for(%(sample_target)s, '%(event_name)s')\n"
        "def receive_%(event_name)s("
        "%(signature)s):\n"
        " \"listen for the '%(event_name)s' event\"\n"
        "\n # ... (event handling logic) ...\n"
    )

    examples = []
    for since, args, conv in dispatch_collection.legacy_signatures:
        params = list(args)
        if dispatch_collection.has_kw and params[-1:] != ["**kw"]:
            params.append("**kw")
        examples.append(
            template
            % {
                "since": since,
                "event_name": fn.__name__,
                "signature": ", ".join(params),
                "sample_target": sample_target,
            }
        )
    return "".join(examples)

196 

197 

def _version_signature_changes(
    parent_dispatch_cls: Type[_HasEventsDispatch[_ET]],
    dispatch_collection: _ClsLevelDispatch[_ET],
) -> str:
    """Render the ``versionchanged`` note for an event with legacy
    signatures.

    The version comes from the *first* entry of
    ``dispatch_collection.legacy_signatures``.  The note lists every current
    argument as a ``:paramref:`` reference, followed by ``, **kw`` when
    ``dispatch_collection.has_kw`` is true.

    :param parent_dispatch_cls: class whose ``__name__`` qualifies the
     event in the generated references
    :param dispatch_collection: object supplying ``legacy_signatures``,
     ``name``, ``arg_names`` and ``has_kw``
    :return: the reStructuredText note

    """
    since, _legacy_args, _converter = dispatch_collection.legacy_signatures[0]
    cls_name = parent_dispatch_cls.__name__
    event_name = dispatch_collection.name

    param_refs = ", ".join(
        ":paramref:`.%(clsname)s.%(event_name)s.%(param_name)s`"
        % {
            "clsname": cls_name,
            "event_name": event_name,
            "param_name": param_name,
        }
        for param_name in dispatch_collection.arg_names
    )
    kw_suffix = ", **kw" if dispatch_collection.has_kw else ""

    # NOTE(review): leading/trailing spaces inside these literals are
    # reproduced as found; confirm the rendering did not collapse them.
    template = (
        "\n.. versionchanged:: %(since)s\n"
        " The :meth:`.%(clsname)s.%(event_name)s` event now accepts the \n"
        " arguments %(named_event_arguments)s%(has_kw_arguments)s.\n"
        " Support for listener functions which accept the previous \n"
        ' argument signature(s) listed above as "deprecated" will be \n'
        " removed in a future release."
    )
    return template % {
        "since": since,
        "clsname": cls_name,
        "event_name": event_name,
        "named_event_arguments": param_refs,
        "has_kw_arguments": kw_suffix,
    }

226 

227 

def _augment_fn_docs(
    dispatch_collection: _ClsLevelDispatch[_ET],
    parent_dispatch_cls: Type[_HasEventsDispatch[_ET]],
    fn: _ListenerFnType,
) -> str:
    """Return the docstring for event *fn* with generated examples added.

    If *fn* carries a truthy ``_omit_standard_example`` attribute, its own
    docstring is returned untouched.  Otherwise an ``event_signatures``
    container is built holding the standard listen example and, when legacy
    signatures exist, the deprecated examples plus a ``versionchanged``
    note.  The generated text is then handed to
    ``util.inject_docstring_text`` together with ``fn.__doc__`` and the
    position argument ``1``.

    :param dispatch_collection: dispatch collection describing the event
    :param parent_dispatch_cls: events class; its optional
     ``_target_class_doc`` attribute names the example target
     (default ``"obj"``)
    :param fn: the event function being documented
    :return: the resulting docstring

    """
    if getattr(fn, "_omit_standard_example", False):
        assert fn.__doc__
        return fn.__doc__

    # NOTE(review): the single-space indent inside the header literal is
    # reproduced as found; confirm the rendering did not collapse it.
    sections = [
        ".. container:: event_signatures\n\n"
        " Example argument forms::\n"
        "\n"
    ]
    example_indent = " " * 8
    sample_target = getattr(parent_dispatch_cls, "_target_class_doc", "obj")

    sections.append(
        _indent(
            _standard_listen_example(dispatch_collection, sample_target, fn),
            example_indent,
        )
    )

    if dispatch_collection.legacy_signatures:
        sections.append(
            _indent(
                _legacy_listen_examples(
                    dispatch_collection, sample_target, fn
                ),
                example_indent,
            )
        )
        # The version note reads legacy_signatures[0], so it is only
        # generated when at least one legacy signature exists.
        sections.append(
            _version_signature_changes(
                parent_dispatch_cls, dispatch_collection
            )
        )

    return util.inject_docstring_text(fn.__doc__, "".join(sections), 1)