Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/event/legacy.py: 69%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

83 statements  

1# event/legacy.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Routines to handle adaption of legacy call signatures, 

9generation of deprecation notes and docstrings. 

10 

11""" 

12 

13from __future__ import annotations 

14 

15import typing 

16from typing import Any 

17from typing import Callable 

18from typing import List 

19from typing import Optional 

20from typing import Tuple 

21from typing import Type 

22from typing import TypeVar 

23 

24from .registry import _ET 

25from .registry import _ListenerFnType 

26from .. import util 

27from ..util.compat import FullArgSpec 

28 

29if typing.TYPE_CHECKING: 

30 from .attr import _ClsLevelDispatch 

31 from .base import _HasEventsDispatch 

32 

33 

34_F = TypeVar("_F", bound=Callable[..., Any]) 

35 

36_LegacySignatureType = Tuple[str, List[str], Callable[..., Any]] 

37 

38 

39def _legacy_signature( 

40 since: str, 

41 argnames: List[str], 

42 converter: Optional[Callable[..., Any]] = None, 

43) -> Callable[[_F], _F]: 

44 """legacy sig decorator 

45 

46 

47 :param since: string version for deprecation warning 

48 :param argnames: list of strings, which is *all* arguments that the legacy 

49 version accepted, including arguments that are still there 

50 :param converter: lambda that will accept tuple of this full arg signature 

51 and return tuple of new arg signature. 

52 

53 """ 

54 

55 def leg(fn: _F) -> _F: 

56 if not hasattr(fn, "_legacy_signatures"): 

57 fn._legacy_signatures = [] # type: ignore[attr-defined] 

58 fn._legacy_signatures.append((since, argnames, converter)) # type: ignore[attr-defined] # noqa: E501 

59 return fn 

60 

61 return leg 

62 

63 

64def _omit_standard_example(fn: _F) -> _F: 

65 fn._omit_standard_example = True # type: ignore[attr-defined] 

66 return fn 

67 

68 

69def _wrap_fn_for_legacy( 

70 dispatch_collection: _ClsLevelDispatch[_ET], 

71 fn: _ListenerFnType, 

72 argspec: FullArgSpec, 

73) -> _ListenerFnType: 

74 for since, argnames, conv in dispatch_collection.legacy_signatures: 

75 if argnames[-1] == "**kw": 

76 has_kw = True 

77 argnames = argnames[0:-1] 

78 else: 

79 has_kw = False 

80 

81 if len(argnames) == len(argspec.args) and has_kw is bool( 

82 argspec.varkw 

83 ): 

84 formatted_def = "def %s(%s%s)" % ( 

85 dispatch_collection.name, 

86 ", ".join(dispatch_collection.arg_names), 

87 ", **kw" if has_kw else "", 

88 ) 

89 warning_txt = ( 

90 'The argument signature for the "%s.%s" event listener ' 

91 "has changed as of version %s, and conversion for " 

92 "the old argument signature will be removed in a " 

93 'future release. The new signature is "%s"' 

94 % ( 

95 dispatch_collection.clsname, 

96 dispatch_collection.name, 

97 since, 

98 formatted_def, 

99 ) 

100 ) 

101 

102 if conv is not None: 

103 assert not has_kw 

104 

105 def wrap_leg(*args: Any, **kw: Any) -> Any: 

106 util.warn_deprecated(warning_txt, version=since) 

107 assert conv is not None 

108 return fn(*conv(*args)) 

109 

110 else: 

111 

112 def wrap_leg(*args: Any, **kw: Any) -> Any: 

113 util.warn_deprecated(warning_txt, version=since) 

114 argdict = dict(zip(dispatch_collection.arg_names, args)) 

115 args_from_dict = [argdict[name] for name in argnames] 

116 if has_kw: 

117 return fn(*args_from_dict, **kw) 

118 else: 

119 return fn(*args_from_dict) 

120 

121 return wrap_leg 

122 else: 

123 return fn 

124 

125 

126def _indent(text: str, indent: str) -> str: 

127 return "\n".join(indent + line for line in text.split("\n")) 

128 

129 

130def _standard_listen_example( 

131 dispatch_collection: _ClsLevelDispatch[_ET], 

132 sample_target: Any, 

133 fn: _ListenerFnType, 

134) -> str: 

135 example_kw_arg = _indent( 

136 "\n".join( 

137 "%(arg)s = kw['%(arg)s']" % {"arg": arg} 

138 for arg in dispatch_collection.arg_names[0:2] 

139 ), 

140 " ", 

141 ) 

142 if dispatch_collection.legacy_signatures: 

143 current_since = max( 

144 since 

145 for since, args, conv in dispatch_collection.legacy_signatures 

146 ) 

147 else: 

148 current_since = None 

149 text = ( 

150 "from sqlalchemy import event\n\n\n" 

151 "@event.listens_for(%(sample_target)s, '%(event_name)s')\n" 

152 "def receive_%(event_name)s(" 

153 "%(named_event_arguments)s%(has_kw_arguments)s):\n" 

154 " \"listen for the '%(event_name)s' event\"\n" 

155 "\n # ... (event handling logic) ...\n" 

156 ) 

157 

158 text %= { 

159 "current_since": ( 

160 " (arguments as of %s)" % current_since if current_since else "" 

161 ), 

162 "event_name": fn.__name__, 

163 "has_kw_arguments": ", **kw" if dispatch_collection.has_kw else "", 

164 "named_event_arguments": ", ".join(dispatch_collection.arg_names), 

165 "example_kw_arg": example_kw_arg, 

166 "sample_target": sample_target, 

167 } 

168 return text 

169 

170 

171def _legacy_listen_examples( 

172 dispatch_collection: _ClsLevelDispatch[_ET], 

173 sample_target: str, 

174 fn: _ListenerFnType, 

175) -> str: 

176 text = "" 

177 for since, args, conv in dispatch_collection.legacy_signatures: 

178 text += ( 

179 "\n# DEPRECATED calling style (pre-%(since)s, " 

180 "will be removed in a future release)\n" 

181 "@event.listens_for(%(sample_target)s, '%(event_name)s')\n" 

182 "def receive_%(event_name)s(" 

183 "%(named_event_arguments)s%(has_kw_arguments)s):\n" 

184 " \"listen for the '%(event_name)s' event\"\n" 

185 "\n # ... (event handling logic) ...\n" 

186 % { 

187 "since": since, 

188 "event_name": fn.__name__, 

189 "has_kw_arguments": ( 

190 " **kw" if dispatch_collection.has_kw else "" 

191 ), 

192 "named_event_arguments": ", ".join(args), 

193 "sample_target": sample_target, 

194 } 

195 ) 

196 return text 

197 

198 

199def _version_signature_changes( 

200 parent_dispatch_cls: Type[_HasEventsDispatch[_ET]], 

201 dispatch_collection: _ClsLevelDispatch[_ET], 

202) -> str: 

203 since, args, conv = dispatch_collection.legacy_signatures[0] 

204 return ( 

205 "\n.. versionchanged:: %(since)s\n" 

206 " The :meth:`.%(clsname)s.%(event_name)s` event now accepts the \n" 

207 " arguments %(named_event_arguments)s%(has_kw_arguments)s.\n" 

208 " Support for listener functions which accept the previous \n" 

209 ' argument signature(s) listed above as "deprecated" will be \n' 

210 " removed in a future release." 

211 % { 

212 "since": since, 

213 "clsname": parent_dispatch_cls.__name__, 

214 "event_name": dispatch_collection.name, 

215 "named_event_arguments": ", ".join( 

216 ":paramref:`.%(clsname)s.%(event_name)s.%(param_name)s`" 

217 % { 

218 "clsname": parent_dispatch_cls.__name__, 

219 "event_name": dispatch_collection.name, 

220 "param_name": param_name, 

221 } 

222 for param_name in dispatch_collection.arg_names 

223 ), 

224 "has_kw_arguments": ", **kw" if dispatch_collection.has_kw else "", 

225 } 

226 ) 

227 

228 

229def _augment_fn_docs( 

230 dispatch_collection: _ClsLevelDispatch[_ET], 

231 parent_dispatch_cls: Type[_HasEventsDispatch[_ET]], 

232 fn: _ListenerFnType, 

233) -> str: 

234 if getattr(fn, "_omit_standard_example", False): 

235 assert fn.__doc__ 

236 return fn.__doc__ 

237 

238 header = ( 

239 ".. container:: event_signatures\n\n" 

240 " Example argument forms::\n" 

241 "\n" 

242 ) 

243 

244 sample_target = getattr(parent_dispatch_cls, "_target_class_doc", "obj") 

245 text = header + _indent( 

246 _standard_listen_example(dispatch_collection, sample_target, fn), 

247 " " * 8, 

248 ) 

249 if dispatch_collection.legacy_signatures: 

250 text += _indent( 

251 _legacy_listen_examples(dispatch_collection, sample_target, fn), 

252 " " * 8, 

253 ) 

254 

255 text += _version_signature_changes( 

256 parent_dispatch_cls, dispatch_collection 

257 ) 

258 

259 return util.inject_docstring_text(fn.__doc__, text, 1)