Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/event/legacy.py: 66%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

74 statements  

1# event/legacy.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Routines to handle adaption of legacy call signatures, 

9generation of deprecation notes and docstrings. 

10 

11""" 

12from __future__ import annotations 

13 

14import typing 

15from typing import Any 

16from typing import Callable 

17from typing import List 

18from typing import Optional 

19from typing import Tuple 

20from typing import Type 

21 

22from .registry import _ET 

23from .registry import _ListenerFnType 

24from .. import util 

25from ..util.compat import FullArgSpec 

26 

27if typing.TYPE_CHECKING: 

28 from .attr import _ClsLevelDispatch 

29 from .base import _HasEventsDispatch 

30 

31 

32_LegacySignatureType = Tuple[str, List[str], Optional[Callable[..., Any]]] 

33 

34 

35def _legacy_signature( 

36 since: str, 

37 argnames: List[str], 

38 converter: Optional[Callable[..., Any]] = None, 

39) -> Callable[[Callable[..., Any]], Callable[..., Any]]: 

40 """legacy sig decorator 

41 

42 

43 :param since: string version for deprecation warning 

44 :param argnames: list of strings, which is *all* arguments that the legacy 

45 version accepted, including arguments that are still there 

46 :param converter: lambda that will accept tuple of this full arg signature 

47 and return tuple of new arg signature. 

48 

49 """ 

50 

51 def leg(fn: Callable[..., Any]) -> Callable[..., Any]: 

52 if not hasattr(fn, "_legacy_signatures"): 

53 fn._legacy_signatures = [] # type: ignore[attr-defined] 

54 fn._legacy_signatures.append((since, argnames, converter)) # type: ignore[attr-defined] # noqa: E501 

55 return fn 

56 

57 return leg 

58 

59 

60def _wrap_fn_for_legacy( 

61 dispatch_collection: _ClsLevelDispatch[_ET], 

62 fn: _ListenerFnType, 

63 argspec: FullArgSpec, 

64) -> _ListenerFnType: 

65 for since, argnames, conv in dispatch_collection.legacy_signatures: 

66 if argnames[-1] == "**kw": 

67 has_kw = True 

68 argnames = argnames[0:-1] 

69 else: 

70 has_kw = False 

71 

72 if len(argnames) == len(argspec.args) and has_kw is bool( 

73 argspec.varkw 

74 ): 

75 formatted_def = "def %s(%s%s)" % ( 

76 dispatch_collection.name, 

77 ", ".join(dispatch_collection.arg_names), 

78 ", **kw" if has_kw else "", 

79 ) 

80 warning_txt = ( 

81 'The argument signature for the "%s.%s" event listener ' 

82 "has changed as of version %s, and conversion for " 

83 "the old argument signature will be removed in a " 

84 'future release. The new signature is "%s"' 

85 % ( 

86 dispatch_collection.clsname, 

87 dispatch_collection.name, 

88 since, 

89 formatted_def, 

90 ) 

91 ) 

92 

93 if conv is not None: 

94 assert not has_kw 

95 

96 def wrap_leg(*args: Any, **kw: Any) -> Any: 

97 util.warn_deprecated(warning_txt, version=since) 

98 assert conv is not None 

99 return fn(*conv(*args)) 

100 

101 else: 

102 

103 def wrap_leg(*args: Any, **kw: Any) -> Any: 

104 util.warn_deprecated(warning_txt, version=since) 

105 argdict = dict(zip(dispatch_collection.arg_names, args)) 

106 args_from_dict = [argdict[name] for name in argnames] 

107 if has_kw: 

108 return fn(*args_from_dict, **kw) 

109 else: 

110 return fn(*args_from_dict) 

111 

112 return wrap_leg 

113 else: 

114 return fn 

115 

116 

117def _indent(text: str, indent: str) -> str: 

118 return "\n".join(indent + line for line in text.split("\n")) 

119 

120 

121def _standard_listen_example( 

122 dispatch_collection: _ClsLevelDispatch[_ET], 

123 sample_target: Any, 

124 fn: _ListenerFnType, 

125) -> str: 

126 example_kw_arg = _indent( 

127 "\n".join( 

128 "%(arg)s = kw['%(arg)s']" % {"arg": arg} 

129 for arg in dispatch_collection.arg_names[0:2] 

130 ), 

131 " ", 

132 ) 

133 if dispatch_collection.legacy_signatures: 

134 current_since = max( 

135 since 

136 for since, args, conv in dispatch_collection.legacy_signatures 

137 ) 

138 else: 

139 current_since = None 

140 text = ( 

141 "from sqlalchemy import event\n\n\n" 

142 "@event.listens_for(%(sample_target)s, '%(event_name)s')\n" 

143 "def receive_%(event_name)s(" 

144 "%(named_event_arguments)s%(has_kw_arguments)s):\n" 

145 " \"listen for the '%(event_name)s' event\"\n" 

146 "\n # ... (event handling logic) ...\n" 

147 ) 

148 

149 text %= { 

150 "current_since": ( 

151 " (arguments as of %s)" % current_since if current_since else "" 

152 ), 

153 "event_name": fn.__name__, 

154 "has_kw_arguments": ", **kw" if dispatch_collection.has_kw else "", 

155 "named_event_arguments": ", ".join(dispatch_collection.arg_names), 

156 "example_kw_arg": example_kw_arg, 

157 "sample_target": sample_target, 

158 } 

159 return text 

160 

161 

162def _legacy_listen_examples( 

163 dispatch_collection: _ClsLevelDispatch[_ET], 

164 sample_target: str, 

165 fn: _ListenerFnType, 

166) -> str: 

167 text = "" 

168 for since, args, conv in dispatch_collection.legacy_signatures: 

169 text += ( 

170 "\n# DEPRECATED calling style (pre-%(since)s, " 

171 "will be removed in a future release)\n" 

172 "@event.listens_for(%(sample_target)s, '%(event_name)s')\n" 

173 "def receive_%(event_name)s(" 

174 "%(named_event_arguments)s%(has_kw_arguments)s):\n" 

175 " \"listen for the '%(event_name)s' event\"\n" 

176 "\n # ... (event handling logic) ...\n" 

177 % { 

178 "since": since, 

179 "event_name": fn.__name__, 

180 "has_kw_arguments": ( 

181 " **kw" if dispatch_collection.has_kw else "" 

182 ), 

183 "named_event_arguments": ", ".join(args), 

184 "sample_target": sample_target, 

185 } 

186 ) 

187 return text 

188 

189 

190def _version_signature_changes( 

191 parent_dispatch_cls: Type[_HasEventsDispatch[_ET]], 

192 dispatch_collection: _ClsLevelDispatch[_ET], 

193) -> str: 

194 since, args, conv = dispatch_collection.legacy_signatures[0] 

195 return ( 

196 "\n.. versionchanged:: %(since)s\n" 

197 " The :meth:`.%(clsname)s.%(event_name)s` event now accepts the \n" 

198 " arguments %(named_event_arguments)s%(has_kw_arguments)s.\n" 

199 " Support for listener functions which accept the previous \n" 

200 ' argument signature(s) listed above as "deprecated" will be \n' 

201 " removed in a future release." 

202 % { 

203 "since": since, 

204 "clsname": parent_dispatch_cls.__name__, 

205 "event_name": dispatch_collection.name, 

206 "named_event_arguments": ", ".join( 

207 ":paramref:`.%(clsname)s.%(event_name)s.%(param_name)s`" 

208 % { 

209 "clsname": parent_dispatch_cls.__name__, 

210 "event_name": dispatch_collection.name, 

211 "param_name": param_name, 

212 } 

213 for param_name in dispatch_collection.arg_names 

214 ), 

215 "has_kw_arguments": ", **kw" if dispatch_collection.has_kw else "", 

216 } 

217 ) 

218 

219 

220def _augment_fn_docs( 

221 dispatch_collection: _ClsLevelDispatch[_ET], 

222 parent_dispatch_cls: Type[_HasEventsDispatch[_ET]], 

223 fn: _ListenerFnType, 

224) -> str: 

225 header = ( 

226 ".. container:: event_signatures\n\n" 

227 " Example argument forms::\n" 

228 "\n" 

229 ) 

230 

231 sample_target = getattr(parent_dispatch_cls, "_target_class_doc", "obj") 

232 text = header + _indent( 

233 _standard_listen_example(dispatch_collection, sample_target, fn), 

234 " " * 8, 

235 ) 

236 if dispatch_collection.legacy_signatures: 

237 text += _indent( 

238 _legacy_listen_examples(dispatch_collection, sample_target, fn), 

239 " " * 8, 

240 ) 

241 

242 text += _version_signature_changes( 

243 parent_dispatch_cls, dispatch_collection 

244 ) 

245 

246 return util.inject_docstring_text(fn.__doc__, text, 1)