1# sql/default_comparator.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Default implementation of SQL comparison operations."""
9
10from __future__ import annotations
11
12import typing
13from typing import Any
14from typing import Callable
15from typing import Dict
16from typing import NoReturn
17from typing import Optional
18from typing import Tuple
19from typing import Type
20from typing import Union
21
22from . import coercions
23from . import functions
24from . import operators
25from . import roles
26from . import type_api
27from .elements import and_
28from .elements import BinaryExpression
29from .elements import ClauseElement
30from .elements import CollationClause
31from .elements import CollectionAggregate
32from .elements import ExpressionClauseList
33from .elements import False_
34from .elements import Null
35from .elements import OperatorExpression
36from .elements import or_
37from .elements import True_
38from .elements import UnaryExpression
39from .operators import OperatorType
40from .. import exc
41from .. import util
42
43_T = typing.TypeVar("_T", bound=Any)
44
45if typing.TYPE_CHECKING:
46 from .elements import ColumnElement
47 from .operators import custom_op
48 from .type_api import TypeEngine
49
50
51def _boolean_compare(
52 expr: ColumnElement[Any],
53 op: OperatorType,
54 obj: Any,
55 *,
56 negate_op: Optional[OperatorType] = None,
57 reverse: bool = False,
58 _python_is_types: Tuple[Type[Any], ...] = (type(None), bool),
59 result_type: Optional[TypeEngine[bool]] = None,
60 **kwargs: Any,
61) -> OperatorExpression[bool]:
62 if result_type is None:
63 result_type = type_api.BOOLEANTYPE
64
65 if isinstance(obj, _python_is_types + (Null, True_, False_)):
66 # allow x ==/!= True/False to be treated as a literal.
67 # this comes out to "== / != true/false" or "1/0" if those
68 # constants aren't supported and works on all platforms
69 if op in (operators.eq, operators.ne) and isinstance(
70 obj, (bool, True_, False_)
71 ):
72 return OperatorExpression._construct_for_op(
73 expr,
74 coercions.expect(roles.ConstExprRole, obj),
75 op,
76 type_=result_type,
77 negate=negate_op,
78 modifiers=kwargs,
79 )
80 elif op in (
81 operators.is_distinct_from,
82 operators.is_not_distinct_from,
83 ):
84 return OperatorExpression._construct_for_op(
85 expr,
86 coercions.expect(roles.ConstExprRole, obj),
87 op,
88 type_=result_type,
89 negate=negate_op,
90 modifiers=kwargs,
91 )
92 elif expr._is_collection_aggregate:
93 obj = coercions.expect(
94 roles.ConstExprRole, element=obj, operator=op, expr=expr
95 )
96 else:
97 # all other None uses IS, IS NOT
98 if op in (operators.eq, operators.is_):
99 return OperatorExpression._construct_for_op(
100 expr,
101 coercions.expect(roles.ConstExprRole, obj),
102 operators.is_,
103 negate=operators.is_not,
104 type_=result_type,
105 )
106 elif op in (operators.ne, operators.is_not):
107 return OperatorExpression._construct_for_op(
108 expr,
109 coercions.expect(roles.ConstExprRole, obj),
110 operators.is_not,
111 negate=operators.is_,
112 type_=result_type,
113 )
114 else:
115 raise exc.ArgumentError(
116 "Only '=', '!=', 'is_()', 'is_not()', "
117 "'is_distinct_from()', 'is_not_distinct_from()' "
118 "operators can be used with None/True/False"
119 )
120 else:
121 obj = coercions.expect(
122 roles.BinaryElementRole, element=obj, operator=op, expr=expr
123 )
124
125 if reverse:
126 return OperatorExpression._construct_for_op(
127 obj,
128 expr,
129 op,
130 type_=result_type,
131 negate=negate_op,
132 modifiers=kwargs,
133 )
134 else:
135 return OperatorExpression._construct_for_op(
136 expr,
137 obj,
138 op,
139 type_=result_type,
140 negate=negate_op,
141 modifiers=kwargs,
142 )
143
144
145def _custom_op_operate(
146 expr: ColumnElement[Any],
147 op: custom_op[Any],
148 obj: Any,
149 reverse: bool = False,
150 result_type: Optional[TypeEngine[Any]] = None,
151 **kw: Any,
152) -> ColumnElement[Any]:
153 if result_type is None:
154 if op.return_type:
155 result_type = op.return_type
156 elif op.is_comparison:
157 result_type = type_api.BOOLEANTYPE
158
159 return _binary_operate(
160 expr, op, obj, reverse=reverse, result_type=result_type, **kw
161 )
162
163
164def _binary_operate(
165 expr: ColumnElement[Any],
166 op: OperatorType,
167 obj: roles.BinaryElementRole[Any],
168 *,
169 reverse: bool = False,
170 result_type: Optional[TypeEngine[_T]] = None,
171 **kw: Any,
172) -> OperatorExpression[_T]:
173 coerced_obj = coercions.expect(
174 roles.BinaryElementRole, obj, expr=expr, operator=op
175 )
176
177 if reverse:
178 left, right = coerced_obj, expr
179 else:
180 left, right = expr, coerced_obj
181
182 if result_type is None:
183 op, result_type = left.comparator._adapt_expression(
184 op, right.comparator
185 )
186
187 return OperatorExpression._construct_for_op(
188 left, right, op, type_=result_type, modifiers=kw
189 )
190
191
192def _conjunction_operate(
193 expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any
194) -> ColumnElement[Any]:
195 if op is operators.and_:
196 return and_(expr, other)
197 elif op is operators.or_:
198 return or_(expr, other)
199 else:
200 raise NotImplementedError()
201
202
203def _scalar(
204 expr: ColumnElement[Any],
205 op: OperatorType,
206 fn: Callable[[ColumnElement[Any]], ColumnElement[Any]],
207 **kw: Any,
208) -> ColumnElement[Any]:
209 return fn(expr)
210
211
212def _in_impl(
213 expr: ColumnElement[Any],
214 op: OperatorType,
215 seq_or_selectable: ClauseElement,
216 negate_op: OperatorType,
217 **kw: Any,
218) -> ColumnElement[Any]:
219 seq_or_selectable = coercions.expect(
220 roles.InElementRole, seq_or_selectable, expr=expr, operator=op
221 )
222 if "in_ops" in seq_or_selectable._annotations:
223 op, negate_op = seq_or_selectable._annotations["in_ops"]
224
225 return _boolean_compare(
226 expr, op, seq_or_selectable, negate_op=negate_op, **kw
227 )
228
229
230def _getitem_impl(
231 expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any
232) -> ColumnElement[Any]:
233 if (
234 isinstance(expr.type, type_api.INDEXABLE)
235 or isinstance(expr.type, type_api.TypeDecorator)
236 and isinstance(expr.type.impl_instance, type_api.INDEXABLE)
237 ):
238 other = coercions.expect(
239 roles.BinaryElementRole, other, expr=expr, operator=op
240 )
241 return _binary_operate(expr, op, other, **kw)
242 else:
243 _unsupported_impl(expr, op, other, **kw)
244
245
246def _unsupported_impl(
247 expr: ColumnElement[Any], op: OperatorType, *arg: Any, **kw: Any
248) -> NoReturn:
249 raise NotImplementedError(
250 "Operator '%s' is not supported on this expression" % op.__name__
251 )
252
253
254def _inv_impl(
255 expr: ColumnElement[Any], op: OperatorType, **kw: Any
256) -> ColumnElement[Any]:
257 """See :meth:`.ColumnOperators.__inv__`."""
258
259 # undocumented element currently used by the ORM for
260 # relationship.contains()
261 if hasattr(expr, "negation_clause"):
262 return expr.negation_clause
263 else:
264 return expr._negate()
265
266
267def _neg_impl(
268 expr: ColumnElement[Any], op: OperatorType, **kw: Any
269) -> ColumnElement[Any]:
270 """See :meth:`.ColumnOperators.__neg__`."""
271 return UnaryExpression(expr, operator=operators.neg, type_=expr.type)
272
273
274def _bitwise_not_impl(
275 expr: ColumnElement[Any], op: OperatorType, **kw: Any
276) -> ColumnElement[Any]:
277 """See :meth:`.ColumnOperators.bitwise_not`."""
278
279 return UnaryExpression(
280 expr, operator=operators.bitwise_not_op, type_=expr.type
281 )
282
283
284def _match_impl(
285 expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any
286) -> ColumnElement[Any]:
287 """See :meth:`.ColumnOperators.match`."""
288
289 return _boolean_compare(
290 expr,
291 operators.match_op,
292 coercions.expect(
293 roles.BinaryElementRole,
294 other,
295 expr=expr,
296 operator=operators.match_op,
297 ),
298 result_type=type_api.MATCHTYPE,
299 negate_op=(
300 operators.not_match_op
301 if op is operators.match_op
302 else operators.match_op
303 ),
304 **kw,
305 )
306
307
308def _distinct_impl(
309 expr: ColumnElement[Any], op: OperatorType, **kw: Any
310) -> ColumnElement[Any]:
311 """See :meth:`.ColumnOperators.distinct`."""
312 return UnaryExpression(
313 expr, operator=operators.distinct_op, type_=expr.type
314 )
315
316
317def _between_impl(
318 expr: ColumnElement[Any],
319 op: OperatorType,
320 cleft: Any,
321 cright: Any,
322 **kw: Any,
323) -> ColumnElement[Any]:
324 """See :meth:`.ColumnOperators.between`."""
325 return BinaryExpression(
326 expr,
327 ExpressionClauseList._construct_for_list(
328 operators.and_,
329 type_api.NULLTYPE,
330 coercions.expect(
331 roles.BinaryElementRole,
332 cleft,
333 expr=expr,
334 operator=operators.and_,
335 ),
336 coercions.expect(
337 roles.BinaryElementRole,
338 cright,
339 expr=expr,
340 operator=operators.and_,
341 ),
342 group=False,
343 ),
344 op,
345 negate=(
346 operators.not_between_op
347 if op is operators.between_op
348 else operators.between_op
349 ),
350 modifiers=kw,
351 )
352
353
354def _pow_impl(
355 expr: ColumnElement[Any],
356 op: OperatorType,
357 other: Any,
358 reverse: bool = False,
359 **kw: Any,
360) -> ColumnElement[Any]:
361 if reverse:
362 return functions.pow(other, expr)
363 else:
364 return functions.pow(expr, other)
365
366
367def _collate_impl(
368 expr: ColumnElement[str], op: OperatorType, collation: str, **kw: Any
369) -> ColumnElement[str]:
370 return CollationClause._create_collation_expression(expr, collation)
371
372
373def _regexp_match_impl(
374 expr: ColumnElement[str],
375 op: OperatorType,
376 pattern: Any,
377 flags: Optional[str],
378 **kw: Any,
379) -> ColumnElement[Any]:
380 return BinaryExpression(
381 expr,
382 coercions.expect(
383 roles.BinaryElementRole,
384 pattern,
385 expr=expr,
386 operator=operators.comma_op,
387 ),
388 op,
389 negate=operators.not_regexp_match_op,
390 modifiers={"flags": flags},
391 )
392
393
394def _regexp_replace_impl(
395 expr: ColumnElement[Any],
396 op: OperatorType,
397 pattern: Any,
398 replacement: Any,
399 flags: Optional[str],
400 **kw: Any,
401) -> ColumnElement[Any]:
402 return BinaryExpression(
403 expr,
404 ExpressionClauseList._construct_for_list(
405 operators.comma_op,
406 type_api.NULLTYPE,
407 coercions.expect(
408 roles.BinaryElementRole,
409 pattern,
410 expr=expr,
411 operator=operators.comma_op,
412 ),
413 coercions.expect(
414 roles.BinaryElementRole,
415 replacement,
416 expr=expr,
417 operator=operators.comma_op,
418 ),
419 group=False,
420 ),
421 op,
422 modifiers={"flags": flags},
423 )
424
425
426# a mapping of operators with the method they use, along with
427# additional keyword arguments to be passed
428operator_lookup: Dict[
429 str,
430 Tuple[
431 Callable[..., ColumnElement[Any]],
432 util.immutabledict[
433 str, Union[OperatorType, Callable[..., ColumnElement[Any]]]
434 ],
435 ],
436] = {
437 "and_": (_conjunction_operate, util.EMPTY_DICT),
438 "or_": (_conjunction_operate, util.EMPTY_DICT),
439 "inv": (_inv_impl, util.EMPTY_DICT),
440 "add": (_binary_operate, util.EMPTY_DICT),
441 "mul": (_binary_operate, util.EMPTY_DICT),
442 "sub": (_binary_operate, util.EMPTY_DICT),
443 "div": (_binary_operate, util.EMPTY_DICT),
444 "mod": (_binary_operate, util.EMPTY_DICT),
445 "bitwise_xor_op": (_binary_operate, util.EMPTY_DICT),
446 "bitwise_or_op": (_binary_operate, util.EMPTY_DICT),
447 "bitwise_and_op": (_binary_operate, util.EMPTY_DICT),
448 "bitwise_not_op": (_bitwise_not_impl, util.EMPTY_DICT),
449 "bitwise_lshift_op": (_binary_operate, util.EMPTY_DICT),
450 "bitwise_rshift_op": (_binary_operate, util.EMPTY_DICT),
451 "truediv": (_binary_operate, util.EMPTY_DICT),
452 "floordiv": (_binary_operate, util.EMPTY_DICT),
453 "custom_op": (_custom_op_operate, util.EMPTY_DICT),
454 "json_path_getitem_op": (_binary_operate, util.EMPTY_DICT),
455 "json_getitem_op": (_binary_operate, util.EMPTY_DICT),
456 "concat_op": (_binary_operate, util.EMPTY_DICT),
457 "any_op": (
458 _scalar,
459 util.immutabledict({"fn": CollectionAggregate._create_any}),
460 ),
461 "all_op": (
462 _scalar,
463 util.immutabledict({"fn": CollectionAggregate._create_all}),
464 ),
465 "lt": (_boolean_compare, util.immutabledict({"negate_op": operators.ge})),
466 "le": (_boolean_compare, util.immutabledict({"negate_op": operators.gt})),
467 "ne": (_boolean_compare, util.immutabledict({"negate_op": operators.eq})),
468 "gt": (_boolean_compare, util.immutabledict({"negate_op": operators.le})),
469 "ge": (_boolean_compare, util.immutabledict({"negate_op": operators.lt})),
470 "eq": (_boolean_compare, util.immutabledict({"negate_op": operators.ne})),
471 "is_distinct_from": (
472 _boolean_compare,
473 util.immutabledict({"negate_op": operators.is_not_distinct_from}),
474 ),
475 "is_not_distinct_from": (
476 _boolean_compare,
477 util.immutabledict({"negate_op": operators.is_distinct_from}),
478 ),
479 "like_op": (
480 _boolean_compare,
481 util.immutabledict({"negate_op": operators.not_like_op}),
482 ),
483 "ilike_op": (
484 _boolean_compare,
485 util.immutabledict({"negate_op": operators.not_ilike_op}),
486 ),
487 "not_like_op": (
488 _boolean_compare,
489 util.immutabledict({"negate_op": operators.like_op}),
490 ),
491 "not_ilike_op": (
492 _boolean_compare,
493 util.immutabledict({"negate_op": operators.ilike_op}),
494 ),
495 "contains_op": (
496 _boolean_compare,
497 util.immutabledict({"negate_op": operators.not_contains_op}),
498 ),
499 "icontains_op": (
500 _boolean_compare,
501 util.immutabledict({"negate_op": operators.not_icontains_op}),
502 ),
503 "startswith_op": (
504 _boolean_compare,
505 util.immutabledict({"negate_op": operators.not_startswith_op}),
506 ),
507 "istartswith_op": (
508 _boolean_compare,
509 util.immutabledict({"negate_op": operators.not_istartswith_op}),
510 ),
511 "endswith_op": (
512 _boolean_compare,
513 util.immutabledict({"negate_op": operators.not_endswith_op}),
514 ),
515 "iendswith_op": (
516 _boolean_compare,
517 util.immutabledict({"negate_op": operators.not_iendswith_op}),
518 ),
519 "desc_op": (
520 _scalar,
521 util.immutabledict({"fn": UnaryExpression._create_desc}),
522 ),
523 "asc_op": (
524 _scalar,
525 util.immutabledict({"fn": UnaryExpression._create_asc}),
526 ),
527 "nulls_first_op": (
528 _scalar,
529 util.immutabledict({"fn": UnaryExpression._create_nulls_first}),
530 ),
531 "nulls_last_op": (
532 _scalar,
533 util.immutabledict({"fn": UnaryExpression._create_nulls_last}),
534 ),
535 "in_op": (
536 _in_impl,
537 util.immutabledict({"negate_op": operators.not_in_op}),
538 ),
539 "not_in_op": (
540 _in_impl,
541 util.immutabledict({"negate_op": operators.in_op}),
542 ),
543 "is_": (
544 _boolean_compare,
545 util.immutabledict({"negate_op": operators.is_}),
546 ),
547 "is_not": (
548 _boolean_compare,
549 util.immutabledict({"negate_op": operators.is_not}),
550 ),
551 "collate": (_collate_impl, util.EMPTY_DICT),
552 "match_op": (_match_impl, util.EMPTY_DICT),
553 "not_match_op": (_match_impl, util.EMPTY_DICT),
554 "distinct_op": (_distinct_impl, util.EMPTY_DICT),
555 "between_op": (_between_impl, util.EMPTY_DICT),
556 "not_between_op": (_between_impl, util.EMPTY_DICT),
557 "neg": (_neg_impl, util.EMPTY_DICT),
558 "getitem": (_getitem_impl, util.EMPTY_DICT),
559 "lshift": (_unsupported_impl, util.EMPTY_DICT),
560 "rshift": (_unsupported_impl, util.EMPTY_DICT),
561 "matmul": (_unsupported_impl, util.EMPTY_DICT),
562 "contains": (_unsupported_impl, util.EMPTY_DICT),
563 "regexp_match_op": (_regexp_match_impl, util.EMPTY_DICT),
564 "not_regexp_match_op": (_regexp_match_impl, util.EMPTY_DICT),
565 "regexp_replace_op": (_regexp_replace_impl, util.EMPTY_DICT),
566 "pow": (_pow_impl, util.EMPTY_DICT),
567}