Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/evaluator.py: 22%

138 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# orm/evaluator.py 

2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8import operator 

9 

10from .. import inspect 

11from .. import util 

12from ..sql import and_ 

13from ..sql import operators 

14from ..sql.sqltypes import Integer 

15from ..sql.sqltypes import Numeric 

16 

17 

18class UnevaluatableError(Exception): 

19 pass 

20 

21 

22class _NoObject(operators.ColumnOperators): 

23 def operate(self, *arg, **kw): 

24 return None 

25 

26 def reverse_operate(self, *arg, **kw): 

27 return None 

28 

29 

30_NO_OBJECT = _NoObject() 

31 

32_straight_ops = set( 

33 getattr(operators, op) 

34 for op in ( 

35 "lt", 

36 "le", 

37 "ne", 

38 "gt", 

39 "ge", 

40 "eq", 

41 ) 

42) 

43 

44_math_only_straight_ops = set( 

45 getattr(operators, op) 

46 for op in ( 

47 "add", 

48 "mul", 

49 "sub", 

50 "div", 

51 "mod", 

52 "truediv", 

53 ) 

54) 

55 

56_extended_ops = { 

57 operators.in_op: (lambda a, b: a in b if a is not _NO_OBJECT else None), 

58 operators.not_in_op: ( 

59 lambda a, b: a not in b if a is not _NO_OBJECT else None 

60 ), 

61} 

62 

63_notimplemented_ops = set( 

64 getattr(operators, op) 

65 for op in ( 

66 "like_op", 

67 "not_like_op", 

68 "ilike_op", 

69 "not_ilike_op", 

70 "startswith_op", 

71 "between_op", 

72 "endswith_op", 

73 ) 

74) 

75 

76 

77class EvaluatorCompiler(object): 

78 def __init__(self, target_cls=None): 

79 self.target_cls = target_cls 

80 

81 def process(self, *clauses): 

82 if len(clauses) > 1: 

83 clause = and_(*clauses) 

84 elif clauses: 

85 clause = clauses[0] 

86 

87 meth = getattr(self, "visit_%s" % clause.__visit_name__, None) 

88 if not meth: 

89 raise UnevaluatableError( 

90 "Cannot evaluate %s" % type(clause).__name__ 

91 ) 

92 return meth(clause) 

93 

94 def visit_grouping(self, clause): 

95 return self.process(clause.element) 

96 

97 def visit_null(self, clause): 

98 return lambda obj: None 

99 

100 def visit_false(self, clause): 

101 return lambda obj: False 

102 

103 def visit_true(self, clause): 

104 return lambda obj: True 

105 

106 def visit_column(self, clause): 

107 if "parentmapper" in clause._annotations: 

108 parentmapper = clause._annotations["parentmapper"] 

109 if self.target_cls and not issubclass( 

110 self.target_cls, parentmapper.class_ 

111 ): 

112 raise UnevaluatableError( 

113 "Can't evaluate criteria against alternate class %s" 

114 % parentmapper.class_ 

115 ) 

116 key = parentmapper._columntoproperty[clause].key 

117 else: 

118 key = clause.key 

119 if ( 

120 self.target_cls 

121 and key in inspect(self.target_cls).column_attrs 

122 ): 

123 util.warn( 

124 "Evaluating non-mapped column expression '%s' onto " 

125 "ORM instances; this is a deprecated use case. Please " 

126 "make use of the actual mapped columns in ORM-evaluated " 

127 "UPDATE / DELETE expressions." % clause 

128 ) 

129 else: 

130 raise UnevaluatableError("Cannot evaluate column: %s" % clause) 

131 

132 get_corresponding_attr = operator.attrgetter(key) 

133 return ( 

134 lambda obj: get_corresponding_attr(obj) 

135 if obj is not None 

136 else _NO_OBJECT 

137 ) 

138 

139 def visit_tuple(self, clause): 

140 return self.visit_clauselist(clause) 

141 

142 def visit_clauselist(self, clause): 

143 evaluators = list(map(self.process, clause.clauses)) 

144 if clause.operator is operators.or_: 

145 

146 def evaluate(obj): 

147 has_null = False 

148 for sub_evaluate in evaluators: 

149 value = sub_evaluate(obj) 

150 if value: 

151 return True 

152 has_null = has_null or value is None 

153 if has_null: 

154 return None 

155 return False 

156 

157 elif clause.operator is operators.and_: 

158 

159 def evaluate(obj): 

160 for sub_evaluate in evaluators: 

161 value = sub_evaluate(obj) 

162 if not value: 

163 if value is None or value is _NO_OBJECT: 

164 return None 

165 return False 

166 return True 

167 

168 elif clause.operator is operators.comma_op: 

169 

170 def evaluate(obj): 

171 values = [] 

172 for sub_evaluate in evaluators: 

173 value = sub_evaluate(obj) 

174 if value is None or value is _NO_OBJECT: 

175 return None 

176 values.append(value) 

177 return tuple(values) 

178 

179 else: 

180 raise UnevaluatableError( 

181 "Cannot evaluate clauselist with operator %s" % clause.operator 

182 ) 

183 

184 return evaluate 

185 

186 def visit_binary(self, clause): 

187 eval_left, eval_right = list( 

188 map(self.process, [clause.left, clause.right]) 

189 ) 

190 operator = clause.operator 

191 if operator is operators.is_: 

192 

193 def evaluate(obj): 

194 return eval_left(obj) == eval_right(obj) 

195 

196 elif operator is operators.is_not: 

197 

198 def evaluate(obj): 

199 return eval_left(obj) != eval_right(obj) 

200 

201 elif operator is operators.concat_op: 

202 

203 def evaluate(obj): 

204 return eval_left(obj) + eval_right(obj) 

205 

206 elif operator in _extended_ops: 

207 

208 def evaluate(obj): 

209 left_val = eval_left(obj) 

210 right_val = eval_right(obj) 

211 if left_val is None or right_val is None: 

212 return None 

213 

214 return _extended_ops[operator](left_val, right_val) 

215 

216 elif operator in _math_only_straight_ops: 

217 if ( 

218 clause.left.type._type_affinity 

219 not in ( 

220 Numeric, 

221 Integer, 

222 ) 

223 or clause.right.type._type_affinity not in (Numeric, Integer) 

224 ): 

225 raise UnevaluatableError( 

226 'Cannot evaluate math operator "%s" for ' 

227 "datatypes %s, %s" 

228 % (operator.__name__, clause.left.type, clause.right.type) 

229 ) 

230 

231 def evaluate(obj): 

232 left_val = eval_left(obj) 

233 right_val = eval_right(obj) 

234 if left_val is None or right_val is None: 

235 return None 

236 return operator(eval_left(obj), eval_right(obj)) 

237 

238 elif operator in _straight_ops: 

239 

240 def evaluate(obj): 

241 left_val = eval_left(obj) 

242 right_val = eval_right(obj) 

243 if left_val is None or right_val is None: 

244 return None 

245 return operator(eval_left(obj), eval_right(obj)) 

246 

247 else: 

248 raise UnevaluatableError( 

249 "Cannot evaluate %s with operator %s" 

250 % (type(clause).__name__, clause.operator) 

251 ) 

252 return evaluate 

253 

254 def visit_unary(self, clause): 

255 eval_inner = self.process(clause.element) 

256 if clause.operator is operators.inv: 

257 

258 def evaluate(obj): 

259 value = eval_inner(obj) 

260 if value is None: 

261 return None 

262 return not value 

263 

264 return evaluate 

265 raise UnevaluatableError( 

266 "Cannot evaluate %s with operator %s" 

267 % (type(clause).__name__, clause.operator) 

268 ) 

269 

270 def visit_bindparam(self, clause): 

271 if clause.callable: 

272 val = clause.callable() 

273 else: 

274 val = clause.value 

275 return lambda obj: val