Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/SQLAlchemy-1.3.25.dev0-py3.11-linux-x86_64.egg/sqlalchemy/event/legacy.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

57 statements  

1# event/legacy.py 

2# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: http://www.opensource.org/licenses/mit-license.php 

7 

8"""Routines to handle adaption of legacy call signatures, 

9generation of deprecation notes and docstrings. 

10 

11""" 

12 

13from .. import util 

14 

15 

16def _legacy_signature(since, argnames, converter=None): 

17 def leg(fn): 

18 if not hasattr(fn, "_legacy_signatures"): 

19 fn._legacy_signatures = [] 

20 fn._legacy_signatures.append((since, argnames, converter)) 

21 return fn 

22 

23 return leg 

24 

25 

26def _wrap_fn_for_legacy(dispatch_collection, fn, argspec): 

27 for since, argnames, conv in dispatch_collection.legacy_signatures: 

28 if argnames[-1] == "**kw": 

29 has_kw = True 

30 argnames = argnames[0:-1] 

31 else: 

32 has_kw = False 

33 

34 if len(argnames) == len(argspec.args) and has_kw is bool( 

35 argspec.varkw 

36 ): 

37 

38 if conv: 

39 assert not has_kw 

40 

41 def wrap_leg(*args): 

42 return fn(*conv(*args)) 

43 

44 else: 

45 

46 def wrap_leg(*args, **kw): 

47 argdict = dict(zip(dispatch_collection.arg_names, args)) 

48 args = [argdict[name] for name in argnames] 

49 if has_kw: 

50 return fn(*args, **kw) 

51 else: 

52 return fn(*args) 

53 

54 return wrap_leg 

55 else: 

56 return fn 

57 

58 

59def _indent(text, indent): 

60 return "\n".join(indent + line for line in text.split("\n")) 

61 

62 

def _standard_listen_example(dispatch_collection, sample_target, fn):
    """Render example listener code for the event's current signature.

    The result always contains a decorator-style example.  When the event
    has more than three arguments, a second example using ``named=True``
    and a ``**kw`` signature is appended; its body shows how the first two
    arguments are read out of ``kw``.

    :param dispatch_collection: object providing ``arg_names``,
     ``legacy_signatures`` and ``has_kw``.
    :param sample_target: text placed as the first argument of
     ``event.listens_for`` in the example.
    :param fn: the event function; its ``__name__`` is used as the event
     name.
    :return: the example source text.
    """
    arg_names = dispatch_collection.arg_names

    # body lines for the ``named=True`` example (first two arguments only)
    kw_lines = [
        "%(arg)s = kw['%(arg)s']" % {"arg": arg} for arg in arg_names[0:2]
    ]
    example_kw_arg = _indent("\n".join(kw_lines), " ")

    legacy = dispatch_collection.legacy_signatures
    if legacy:
        # largest "since" marker among the legacy signatures
        current_since = max(since for since, _args, _conv in legacy)
    else:
        current_since = None

    if current_since:
        since_note = " (arguments as of %s)" % current_since
    else:
        since_note = ""

    pieces = [
        "from sqlalchemy import event\n\n",
        "# standard decorator style%(current_since)s\n",
        "@event.listens_for(%(sample_target)s, '%(event_name)s')\n",
        "def receive_%(event_name)s(",
        "%(named_event_arguments)s%(has_kw_arguments)s):\n",
        " \"listen for the '%(event_name)s' event\"\n",
        "\n # ... (event handling logic) ...\n",
    ]

    if len(arg_names) > 3:
        pieces.extend(
            [
                "\n# named argument style (new in 0.9)\n",
                "@event.listens_for(",
                "%(sample_target)s, '%(event_name)s', named=True)\n",
                "def receive_%(event_name)s(**kw):\n",
                " \"listen for the '%(event_name)s' event\"\n",
                "%(example_kw_arg)s\n",
                "\n # ... (event handling logic) ...\n",
            ]
        )

    values = {
        "current_since": since_note,
        "event_name": fn.__name__,
        "has_kw_arguments": ", **kw" if dispatch_collection.has_kw else "",
        "named_event_arguments": ", ".join(arg_names),
        "example_kw_arg": example_kw_arg,
        "sample_target": sample_target,
    }
    return "".join(pieces) % values

110 

111 

112def _legacy_listen_examples(dispatch_collection, sample_target, fn): 

113 text = "" 

114 for since, args, conv in dispatch_collection.legacy_signatures: 

115 text += ( 

116 "\n# DEPRECATED calling style (pre-%(since)s, " 

117 "will be removed in a future release)\n" 

118 "@event.listens_for(%(sample_target)s, '%(event_name)s')\n" 

119 "def receive_%(event_name)s(" 

120 "%(named_event_arguments)s%(has_kw_arguments)s):\n" 

121 " \"listen for the '%(event_name)s' event\"\n" 

122 "\n # ... (event handling logic) ...\n" 

123 % { 

124 "since": since, 

125 "event_name": fn.__name__, 

126 "has_kw_arguments": " **kw" 

127 if dispatch_collection.has_kw 

128 else "", 

129 "named_event_arguments": ", ".join(args), 

130 "sample_target": sample_target, 

131 } 

132 ) 

133 return text 

134 

135 

136def _version_signature_changes(parent_dispatch_cls, dispatch_collection): 

137 since, args, conv = dispatch_collection.legacy_signatures[0] 

138 return ( 

139 "\n.. deprecated:: %(since)s\n" 

140 " The :class:`.%(clsname)s.%(event_name)s` event now accepts the \n" 

141 " arguments ``%(named_event_arguments)s%(has_kw_arguments)s``.\n" 

142 " Support for listener functions which accept the previous \n" 

143 ' argument signature(s) listed above as "deprecated" will be \n' 

144 " removed in a future release." 

145 % { 

146 "since": since, 

147 "clsname": parent_dispatch_cls.__name__, 

148 "event_name": dispatch_collection.name, 

149 "named_event_arguments": ", ".join(dispatch_collection.arg_names), 

150 "has_kw_arguments": ", **kw" if dispatch_collection.has_kw else "", 

151 } 

152 ) 

153 

154 

def _augment_fn_docs(dispatch_collection, parent_dispatch_cls, fn):
    """Build the signature-example text for an event and add it to its docs.

    The generated text consists of a header, the indented example for the
    current signature and, when legacy signatures exist, the indented
    deprecated examples followed by the deprecation note.  The text is
    handed to ``util.inject_docstring_text`` together with ``fn.__doc__``
    and the position ``1``; that call's result is returned.

    :param dispatch_collection: object describing the event; passed to
     the example helpers, and its ``legacy_signatures`` is checked here.
    :param parent_dispatch_cls: events class; its optional
     ``_target_class_doc`` attribute supplies the sample target shown in
     the examples (``"obj"`` when absent).
    :param fn: the event function whose docstring is augmented.
    """
    code_indent = " " * 8
    sample_target = getattr(parent_dispatch_cls, "_target_class_doc", "obj")

    sections = [
        ".. container:: event_signatures\n\n"
        " Example argument forms::\n"
        "\n",
        _indent(
            _standard_listen_example(dispatch_collection, sample_target, fn),
            code_indent,
        ),
    ]

    if dispatch_collection.legacy_signatures:
        sections.append(
            _indent(
                _legacy_listen_examples(
                    dispatch_collection, sample_target, fn
                ),
                code_indent,
            )
        )
        # the deprecation note is only meaningful with legacy signatures
        sections.append(
            _version_signature_changes(parent_dispatch_cls, dispatch_collection)
        )

    return util.inject_docstring_text(fn.__doc__, "".join(sections), 1)