Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/evaluator.py: 22%
138 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# orm/evaluator.py
2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
8import operator
10from .. import inspect
11from .. import util
12from ..sql import and_
13from ..sql import operators
14from ..sql.sqltypes import Integer
15from ..sql.sqltypes import Numeric
18class UnevaluatableError(Exception):
19 pass
22class _NoObject(operators.ColumnOperators):
23 def operate(self, *arg, **kw):
24 return None
26 def reverse_operate(self, *arg, **kw):
27 return None
30_NO_OBJECT = _NoObject()
32_straight_ops = set(
33 getattr(operators, op)
34 for op in (
35 "lt",
36 "le",
37 "ne",
38 "gt",
39 "ge",
40 "eq",
41 )
42)
44_math_only_straight_ops = set(
45 getattr(operators, op)
46 for op in (
47 "add",
48 "mul",
49 "sub",
50 "div",
51 "mod",
52 "truediv",
53 )
54)
56_extended_ops = {
57 operators.in_op: (lambda a, b: a in b if a is not _NO_OBJECT else None),
58 operators.not_in_op: (
59 lambda a, b: a not in b if a is not _NO_OBJECT else None
60 ),
61}
63_notimplemented_ops = set(
64 getattr(operators, op)
65 for op in (
66 "like_op",
67 "not_like_op",
68 "ilike_op",
69 "not_ilike_op",
70 "startswith_op",
71 "between_op",
72 "endswith_op",
73 )
74)
77class EvaluatorCompiler(object):
78 def __init__(self, target_cls=None):
79 self.target_cls = target_cls
81 def process(self, *clauses):
82 if len(clauses) > 1:
83 clause = and_(*clauses)
84 elif clauses:
85 clause = clauses[0]
87 meth = getattr(self, "visit_%s" % clause.__visit_name__, None)
88 if not meth:
89 raise UnevaluatableError(
90 "Cannot evaluate %s" % type(clause).__name__
91 )
92 return meth(clause)
94 def visit_grouping(self, clause):
95 return self.process(clause.element)
97 def visit_null(self, clause):
98 return lambda obj: None
100 def visit_false(self, clause):
101 return lambda obj: False
103 def visit_true(self, clause):
104 return lambda obj: True
106 def visit_column(self, clause):
107 if "parentmapper" in clause._annotations:
108 parentmapper = clause._annotations["parentmapper"]
109 if self.target_cls and not issubclass(
110 self.target_cls, parentmapper.class_
111 ):
112 raise UnevaluatableError(
113 "Can't evaluate criteria against alternate class %s"
114 % parentmapper.class_
115 )
116 key = parentmapper._columntoproperty[clause].key
117 else:
118 key = clause.key
119 if (
120 self.target_cls
121 and key in inspect(self.target_cls).column_attrs
122 ):
123 util.warn(
124 "Evaluating non-mapped column expression '%s' onto "
125 "ORM instances; this is a deprecated use case. Please "
126 "make use of the actual mapped columns in ORM-evaluated "
127 "UPDATE / DELETE expressions." % clause
128 )
129 else:
130 raise UnevaluatableError("Cannot evaluate column: %s" % clause)
132 get_corresponding_attr = operator.attrgetter(key)
133 return (
134 lambda obj: get_corresponding_attr(obj)
135 if obj is not None
136 else _NO_OBJECT
137 )
139 def visit_tuple(self, clause):
140 return self.visit_clauselist(clause)
142 def visit_clauselist(self, clause):
143 evaluators = list(map(self.process, clause.clauses))
144 if clause.operator is operators.or_:
146 def evaluate(obj):
147 has_null = False
148 for sub_evaluate in evaluators:
149 value = sub_evaluate(obj)
150 if value:
151 return True
152 has_null = has_null or value is None
153 if has_null:
154 return None
155 return False
157 elif clause.operator is operators.and_:
159 def evaluate(obj):
160 for sub_evaluate in evaluators:
161 value = sub_evaluate(obj)
162 if not value:
163 if value is None or value is _NO_OBJECT:
164 return None
165 return False
166 return True
168 elif clause.operator is operators.comma_op:
170 def evaluate(obj):
171 values = []
172 for sub_evaluate in evaluators:
173 value = sub_evaluate(obj)
174 if value is None or value is _NO_OBJECT:
175 return None
176 values.append(value)
177 return tuple(values)
179 else:
180 raise UnevaluatableError(
181 "Cannot evaluate clauselist with operator %s" % clause.operator
182 )
184 return evaluate
186 def visit_binary(self, clause):
187 eval_left, eval_right = list(
188 map(self.process, [clause.left, clause.right])
189 )
190 operator = clause.operator
191 if operator is operators.is_:
193 def evaluate(obj):
194 return eval_left(obj) == eval_right(obj)
196 elif operator is operators.is_not:
198 def evaluate(obj):
199 return eval_left(obj) != eval_right(obj)
201 elif operator is operators.concat_op:
203 def evaluate(obj):
204 return eval_left(obj) + eval_right(obj)
206 elif operator in _extended_ops:
208 def evaluate(obj):
209 left_val = eval_left(obj)
210 right_val = eval_right(obj)
211 if left_val is None or right_val is None:
212 return None
214 return _extended_ops[operator](left_val, right_val)
216 elif operator in _math_only_straight_ops:
217 if (
218 clause.left.type._type_affinity
219 not in (
220 Numeric,
221 Integer,
222 )
223 or clause.right.type._type_affinity not in (Numeric, Integer)
224 ):
225 raise UnevaluatableError(
226 'Cannot evaluate math operator "%s" for '
227 "datatypes %s, %s"
228 % (operator.__name__, clause.left.type, clause.right.type)
229 )
231 def evaluate(obj):
232 left_val = eval_left(obj)
233 right_val = eval_right(obj)
234 if left_val is None or right_val is None:
235 return None
236 return operator(eval_left(obj), eval_right(obj))
238 elif operator in _straight_ops:
240 def evaluate(obj):
241 left_val = eval_left(obj)
242 right_val = eval_right(obj)
243 if left_val is None or right_val is None:
244 return None
245 return operator(eval_left(obj), eval_right(obj))
247 else:
248 raise UnevaluatableError(
249 "Cannot evaluate %s with operator %s"
250 % (type(clause).__name__, clause.operator)
251 )
252 return evaluate
254 def visit_unary(self, clause):
255 eval_inner = self.process(clause.element)
256 if clause.operator is operators.inv:
258 def evaluate(obj):
259 value = eval_inner(obj)
260 if value is None:
261 return None
262 return not value
264 return evaluate
265 raise UnevaluatableError(
266 "Cannot evaluate %s with operator %s"
267 % (type(clause).__name__, clause.operator)
268 )
270 def visit_bindparam(self, clause):
271 if clause.callable:
272 val = clause.callable()
273 else:
274 val = clause.value
275 return lambda obj: val