Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/event/legacy.py: 62%

58 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# event/legacy.py 

2# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Routines to handle adaption of legacy call signatures, 

9generation of deprecation notes and docstrings. 

10 

11""" 

12 

13from .. import util 

14 

15 

def _legacy_signature(since, argnames, converter=None):
    """Build a decorator that records a legacy call signature on a function.

    Applying the returned decorator appends a
    ``(since, argnames, converter)`` tuple to the decorated function's
    ``_legacy_signatures`` list, creating that list on first use.  The
    function itself is handed back unwrapped.

    :param since: version marker stored as the first tuple element.
    :param argnames: argument names making up the legacy signature.
    :param converter: optional object stored as the third tuple element.
    :return: a decorator returning the very function it receives.
    """
    record = (since, argnames, converter)

    def leg(fn):
        try:
            known = fn._legacy_signatures
        except AttributeError:
            # first legacy signature registered for this function
            known = fn._legacy_signatures = []
        known.append(record)
        return fn

    return leg

24 

25 

def _wrap_fn_for_legacy(dispatch_collection, fn, argspec):
    """Wrap ``fn`` when its signature matches a registered legacy signature.

    Each ``(since, argnames, conv)`` entry of
    ``dispatch_collection.legacy_signatures`` is compared against
    ``argspec``: the number of names (not counting a trailing ``"**kw"``)
    must equal ``len(argspec.args)``, and the presence of that trailing
    ``"**kw"`` must agree with the truthiness of ``argspec.varkw``.  The
    first matching entry is used.

    The wrapper calls ``util.warn_deprecated`` on every invocation and then
    calls ``fn`` with adapted arguments: the result of ``conv(*args)`` when
    a converter was registered, otherwise the incoming positional arguments
    are paired with ``dispatch_collection.arg_names`` and the ones named by
    the legacy signature are passed on in legacy order.

    :param dispatch_collection: object providing ``legacy_signatures``,
     ``name``, ``clsname`` and ``arg_names``.
    :param fn: the listener function to adapt.
    :param argspec: object exposing ``args`` and ``varkw`` describing ``fn``.
    :return: the adapting wrapper, or ``fn`` itself when nothing matches.
    """
    for since, argnames, conv in dispatch_collection.legacy_signatures:
        # Guard the index so that an empty legacy signature counts as
        # "no **kw" rather than raising IndexError.
        has_kw = bool(argnames) and argnames[-1] == "**kw"
        if has_kw:
            argnames = argnames[0:-1]

        if len(argnames) != len(argspec.args) or has_kw != bool(
            argspec.varkw
        ):
            # not the signature this listener was written against
            continue

        formatted_def = "def %s(%s%s)" % (
            dispatch_collection.name,
            ", ".join(dispatch_collection.arg_names),
            ", **kw" if has_kw else "",
        )
        warning_txt = (
            'The argument signature for the "%s.%s" event listener '
            "has changed as of version %s, and conversion for "
            "the old argument signature will be removed in a "
            'future release. The new signature is "%s"'
            % (
                dispatch_collection.clsname,
                dispatch_collection.name,
                since,
                formatted_def,
            )
        )

        if conv:
            # converters only receive positional arguments
            assert not has_kw

            def wrap_leg(*args):
                util.warn_deprecated(warning_txt, version=since)
                return fn(*conv(*args))

        else:

            def wrap_leg(*args, **kw):
                util.warn_deprecated(warning_txt, version=since)
                # map current-signature positions to names, then re-order
                # into what the legacy listener expects
                argdict = dict(zip(dispatch_collection.arg_names, args))
                args = [argdict[name] for name in argnames]
                if has_kw:
                    return fn(*args, **kw)
                else:
                    return fn(*args)

        # returning inside the loop keeps the closure variables fixed
        return wrap_leg

    return fn

77 

78 

def _indent(text, indent):
    """Return ``text`` with ``indent`` prepended to each of its lines.

    Lines are separated on ``"\\n"`` only, and empty lines (including the
    one after a trailing newline) receive the prefix as well.
    """
    return indent + text.replace("\n", "\n" + indent)

81 

82 

def _standard_listen_example(dispatch_collection, sample_target, fn):
    """Render example listener code using the current event signature.

    The example decorates a ``receive_<event>`` function with
    ``@event.listens_for(<sample_target>, '<event>')`` where ``<event>`` is
    ``fn.__name__``.  Its parameter list is
    ``dispatch_collection.arg_names`` followed by ``**kw`` when
    ``dispatch_collection.has_kw`` is true.

    :param dispatch_collection: object providing ``arg_names`` and
     ``has_kw``.
    :param sample_target: text placed as the first ``listens_for`` argument.
    :param fn: function whose ``__name__`` is used as the event name.
    :return: the example source code as a string.
    """
    # Join the complete parameter list in one go so that an event without
    # positional arguments renders "(**kw)" rather than "(, **kw)".
    params = list(dispatch_collection.arg_names)
    if dispatch_collection.has_kw:
        params.append("**kw")

    # NOTE(review): the body lines of the example are indented by a single
    # space in the source under review; confirm that wider indentation was
    # not lost before this text reached review.
    template = (
        "from sqlalchemy import event\n\n\n"
        "@event.listens_for(%(sample_target)s, '%(event_name)s')\n"
        "def receive_%(event_name)s(%(signature)s):\n"
        " \"listen for the '%(event_name)s' event\"\n"
        "\n # ... (event handling logic) ...\n"
    )
    return template % {
        "event_name": fn.__name__,
        "sample_target": sample_target,
        "signature": ", ".join(params),
    }

118 

119 

def _legacy_listen_examples(dispatch_collection, sample_target, fn):
    """Render example listener code for every deprecated signature.

    One example is produced per ``(since, args, conv)`` entry of
    ``dispatch_collection.legacy_signatures``, each introduced by a
    ``# DEPRECATED calling style`` comment naming ``since``.  The examples
    are concatenated in registration order; an empty string is returned
    when there are no legacy signatures.

    :param dispatch_collection: object providing ``legacy_signatures`` and
     ``has_kw``.
    :param sample_target: text placed as the first ``listens_for`` argument.
    :param fn: function whose ``__name__`` is used as the event name.
    :return: the concatenated example source code.
    """
    # NOTE(review): the body lines of the example are indented by a single
    # space in the source under review; confirm that wider indentation was
    # not lost before this text reached review.
    template = (
        "\n# DEPRECATED calling style (pre-%(since)s, "
        "will be removed in a future release)\n"
        "@event.listens_for(%(sample_target)s, '%(event_name)s')\n"
        "def receive_%(event_name)s(%(signature)s):\n"
        " \"listen for the '%(event_name)s' event\"\n"
        "\n # ... (event handling logic) ...\n"
    )

    examples = []
    for since, args, _conv in dispatch_collection.legacy_signatures:
        params = list(args)
        # "**kw" must be comma-separated from the preceding names to give
        # valid Python, and a legacy signature which already ends in
        # "**kw" must not receive a second one.
        if dispatch_collection.has_kw and "**kw" not in params:
            params.append("**kw")
        examples.append(
            template
            % {
                "since": since,
                "event_name": fn.__name__,
                "sample_target": sample_target,
                "signature": ", ".join(params),
            }
        )
    return "".join(examples)

142 

143 

def _version_signature_changes(parent_dispatch_cls, dispatch_collection):
    """Render the ``versionchanged`` note describing the current signature.

    The version quoted is the ``since`` value of the first entry in
    ``dispatch_collection.legacy_signatures``.  Every name in
    ``dispatch_collection.arg_names`` is rendered as a ``:paramref:`` role
    qualified by the dispatch class name and the event name, followed by
    ``, **kw`` when ``dispatch_collection.has_kw`` is true.

    :param parent_dispatch_cls: class whose ``__name__`` qualifies the
     generated references.
    :param dispatch_collection: object providing ``legacy_signatures``,
     ``name``, ``arg_names`` and ``has_kw``.
    :return: the note as a string, starting with a newline.
    """
    since, _args, _conv = dispatch_collection.legacy_signatures[0]
    clsname = parent_dispatch_cls.__name__
    event_name = dispatch_collection.name

    param_refs = [
        ":paramref:`.%(clsname)s.%(event_name)s.%(param_name)s`"
        % {
            "clsname": clsname,
            "event_name": event_name,
            "param_name": param_name,
        }
        for param_name in dispatch_collection.arg_names
    ]

    if dispatch_collection.has_kw:
        kw_suffix = ", **kw"
    else:
        kw_suffix = ""

    # NOTE(review): continuation lines of the directive are indented by a
    # single space in the source under review; confirm that wider
    # indentation was not lost before this text reached review.
    template = (
        "\n.. versionchanged:: %(since)s\n"
        " The :meth:`.%(clsname)s.%(event_name)s` event now accepts the \n"
        " arguments %(named_event_arguments)s%(has_kw_arguments)s.\n"
        " Support for listener functions which accept the previous \n"
        ' argument signature(s) listed above as "deprecated" will be \n'
        " removed in a future release."
    )
    values = {
        "since": since,
        "clsname": clsname,
        "event_name": event_name,
        "named_event_arguments": ", ".join(param_refs),
        "has_kw_arguments": kw_suffix,
    }
    return template % values

169 

170 

171def _augment_fn_docs(dispatch_collection, parent_dispatch_cls, fn): 

172 header = ( 

173 ".. container:: event_signatures\n\n" 

174 " Example argument forms::\n" 

175 "\n" 

176 ) 

177 

178 sample_target = getattr(parent_dispatch_cls, "_target_class_doc", "obj") 

179 text = header + _indent( 

180 _standard_listen_example(dispatch_collection, sample_target, fn), 

181 " " * 8, 

182 ) 

183 if dispatch_collection.legacy_signatures: 

184 text += _indent( 

185 _legacy_listen_examples(dispatch_collection, sample_target, fn), 

186 " " * 8, 

187 ) 

188 

189 text += _version_signature_changes( 

190 parent_dispatch_cls, dispatch_collection 

191 ) 

192 

193 return util.inject_docstring_text(fn.__doc__, text, 1)