1# sql/default_comparator.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Default implementation of SQL comparison operations."""
9
10from __future__ import annotations
11
12import typing
13from typing import Any
14from typing import Callable
15from typing import NoReturn
16from typing import Optional
17from typing import Tuple
18from typing import Type
19from typing import Union
20
21from . import coercions
22from . import functions
23from . import operators
24from . import roles
25from . import type_api
26from .elements import and_
27from .elements import BinaryExpression
28from .elements import ClauseElement
29from .elements import CollationClause
30from .elements import CollectionAggregate
31from .elements import ExpressionClauseList
32from .elements import False_
33from .elements import Null
34from .elements import OperatorExpression
35from .elements import or_
36from .elements import True_
37from .elements import UnaryExpression
38from .operators import OperatorType
39from .. import exc
40from .. import util
41
# Generic parameter tying the ``TypeEngine[_T]`` accepted as ``result_type``
# by ``_binary_operate`` to the ``OperatorExpression[_T]`` it returns.
_T = typing.TypeVar("_T", bound=Any)
43
44if typing.TYPE_CHECKING:
45 from .elements import ColumnElement
46 from .operators import custom_op
47 from .type_api import TypeEngine
48
49
def _boolean_compare(
    expr: ColumnElement[Any],
    op: OperatorType,
    obj: Any,
    *,
    negate_op: Optional[OperatorType] = None,
    reverse: bool = False,
    _python_is_types: Tuple[Type[Any], ...] = (type(None), bool),
    result_type: Optional[TypeEngine[bool]] = None,
    **kwargs: Any,
) -> OperatorExpression[bool]:
    """Build a comparison expression between ``expr`` and ``obj``.

    :param expr: the expression the operator was invoked on.
    :param op: the comparison operator being applied.
    :param obj: the other operand; either a "constant" (an instance of
     ``_python_is_types``, ``Null``, ``True_`` or ``False_``) or anything
     that can be coerced to ``roles.BinaryElementRole``.
    :param negate_op: operator stored as the negation of the result.
    :param reverse: when true, ``obj`` becomes the left operand.
    :param _python_is_types: Python types that are treated as constants.
    :param result_type: type of the result; ``type_api.BOOLEANTYPE`` when
     not given.
    :param kwargs: forwarded as the ``modifiers`` of the expression.
    :raises ArgumentError: if a constant is combined with an operator
     other than ``=``, ``!=``, ``is_()``, ``is_not()``,
     ``is_distinct_from()`` or ``is_not_distinct_from()`` and ``expr`` is
     not a collection aggregate.
    """
    if result_type is None:
        result_type = type_api.BOOLEANTYPE

    constant_types = _python_is_types + (Null, True_, False_)

    if isinstance(obj, constant_types):
        # x ==/!= True/False is kept as a literal comparison; this comes
        # out to "== / != true/false" or "1/0" if those constants aren't
        # supported, and works on all platforms.
        compares_bool_literal = op in (
            operators.eq,
            operators.ne,
        ) and isinstance(obj, (bool, True_, False_))

        if compares_bool_literal or op in (
            operators.is_distinct_from,
            operators.is_not_distinct_from,
        ):
            return OperatorExpression._construct_for_op(
                expr,
                coercions.expect(roles.ConstExprRole, obj),
                op,
                type_=result_type,
                negate=negate_op,
                modifiers=kwargs,
            )

        if expr._is_collection_aggregate:
            # falls through to the generic construction at the bottom
            obj = coercions.expect(
                roles.ConstExprRole, element=obj, operator=op, expr=expr
            )
        else:
            # every remaining use of a constant is rendered as IS / IS NOT
            if op in (operators.eq, operators.is_):
                is_operator = operators.is_
                is_negation = operators.is_not
            elif op in (operators.ne, operators.is_not):
                is_operator = operators.is_not
                is_negation = operators.is_
            else:
                raise exc.ArgumentError(
                    "Only '=', '!=', 'is_()', 'is_not()', "
                    "'is_distinct_from()', 'is_not_distinct_from()' "
                    "operators can be used with None/True/False"
                )
            return OperatorExpression._construct_for_op(
                expr,
                coercions.expect(roles.ConstExprRole, obj),
                is_operator,
                negate=is_negation,
                type_=result_type,
            )
    else:
        obj = coercions.expect(
            roles.BinaryElementRole, element=obj, operator=op, expr=expr
        )

    # operand order is the only difference between the two directions
    left, right = (obj, expr) if reverse else (expr, obj)
    return OperatorExpression._construct_for_op(
        left,
        right,
        op,
        type_=result_type,
        negate=negate_op,
        modifiers=kwargs,
    )
142
143
def _custom_op_operate(
    expr: ColumnElement[Any],
    op: custom_op[Any],
    obj: Any,
    reverse: bool = False,
    result_type: Optional[TypeEngine[Any]] = None,
    **kw: Any,
) -> ColumnElement[Any]:
    """Apply a ``custom_op`` by delegating to :func:`_binary_operate`.

    When no ``result_type`` is passed, the operator's own ``return_type``
    is used if it is truthy; otherwise ``type_api.BOOLEANTYPE`` is used
    if the operator is flagged ``is_comparison``.  If neither applies the
    type stays ``None`` and is resolved by :func:`_binary_operate`.
    """
    if result_type is None:
        declared_type = op.return_type
        if declared_type:
            result_type = declared_type
        elif op.is_comparison:
            result_type = type_api.BOOLEANTYPE

    return _binary_operate(
        expr,
        op,
        obj,
        reverse=reverse,
        result_type=result_type,
        **kw,
    )
161
162
def _binary_operate(
    expr: ColumnElement[Any],
    op: OperatorType,
    obj: roles.BinaryElementRole[Any],
    *,
    reverse: bool = False,
    result_type: Optional[TypeEngine[_T]] = None,
    **kw: Any,
) -> OperatorExpression[_T]:
    """Build a generic binary expression of ``expr`` and ``obj``.

    ``obj`` is coerced to ``roles.BinaryElementRole``; with ``reverse``
    the coerced operand is placed on the left.  When ``result_type`` is
    not given, both the operator and the result type are taken from the
    left operand comparator's ``_adapt_expression()``.  Remaining keyword
    arguments become the ``modifiers`` of the expression.
    """
    other_side = coercions.expect(
        roles.BinaryElementRole, obj, expr=expr, operator=op
    )

    left, right = (other_side, expr) if reverse else (expr, other_side)

    if result_type is None:
        # let the left-hand type decide the effective operator and type
        op, result_type = left.comparator._adapt_expression(
            op, right.comparator
        )

    return OperatorExpression._construct_for_op(
        left,
        right,
        op,
        type_=result_type,
        modifiers=kw,
    )
189
190
def _conjunction_operate(
    expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any
) -> ColumnElement[Any]:
    """Combine ``expr`` and ``other`` with ``and_()`` or ``or_()``.

    :raises NotImplementedError: if ``op`` is neither ``operators.and_``
     nor ``operators.or_``.
    """
    if op is operators.and_:
        return and_(expr, other)
    if op is operators.or_:
        return or_(expr, other)
    raise NotImplementedError()
200
201
202def _scalar(
203 expr: ColumnElement[Any],
204 op: OperatorType,
205 fn: Callable[[ColumnElement[Any]], ColumnElement[Any]],
206 **kw: Any,
207) -> ColumnElement[Any]:
208 return fn(expr)
209
210
def _in_impl(
    expr: ColumnElement[Any],
    op: OperatorType,
    seq_or_selectable: ClauseElement,
    negate_op: OperatorType,
    **kw: Any,
) -> ColumnElement[Any]:
    """Build an IN / NOT IN comparison via :func:`_boolean_compare`.

    The right-hand side is coerced to ``roles.InElementRole``.  If the
    coerced element carries an ``"in_ops"`` annotation, the
    ``(op, negate_op)`` pair stored there replaces the operators that
    were passed in.
    """
    in_target = coercions.expect(
        roles.InElementRole, seq_or_selectable, expr=expr, operator=op
    )

    target_annotations = in_target._annotations
    if "in_ops" in target_annotations:
        op, negate_op = target_annotations["in_ops"]

    return _boolean_compare(expr, op, in_target, negate_op=negate_op, **kw)
227
228
def _getitem_impl(
    expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any
) -> ColumnElement[Any]:
    """Build an index (``[]``) expression for indexable types.

    The operation is allowed when ``expr.type`` is a
    ``type_api.INDEXABLE``, or a ``type_api.TypeDecorator`` whose
    ``impl_instance`` is one.  Any other type is handed to
    :func:`_unsupported_impl`, which raises ``NotImplementedError``.
    """
    expr_type = expr.type
    supports_indexing = isinstance(expr_type, type_api.INDEXABLE) or (
        isinstance(expr_type, type_api.TypeDecorator)
        and isinstance(expr_type.impl_instance, type_api.INDEXABLE)
    )

    if not supports_indexing:
        # never returns; always raises
        _unsupported_impl(expr, op, other, **kw)

    index = coercions.expect(
        roles.BinaryElementRole, other, expr=expr, operator=op
    )
    return _binary_operate(expr, op, index, **kw)
243
244
245def _unsupported_impl(
246 expr: ColumnElement[Any], op: OperatorType, *arg: Any, **kw: Any
247) -> NoReturn:
248 raise NotImplementedError(
249 "Operator '%s' is not supported on this expression" % op.__name__
250 )
251
252
253def _inv_impl(
254 expr: ColumnElement[Any], op: OperatorType, **kw: Any
255) -> ColumnElement[Any]:
256 """See :meth:`.ColumnOperators.__inv__`."""
257
258 # undocumented element currently used by the ORM for
259 # relationship.contains()
260 if hasattr(expr, "negation_clause"):
261 return expr.negation_clause
262 else:
263 return expr._negate()
264
265
def _neg_impl(
    expr: ColumnElement[Any], op: OperatorType, **kw: Any
) -> ColumnElement[Any]:
    """See :meth:`.ColumnOperators.__neg__`."""
    # the negated expression keeps the type of its operand
    negated = UnaryExpression(
        expr,
        operator=operators.neg,
        type_=expr.type,
    )
    return negated
271
272
def _bitwise_not_impl(
    expr: ColumnElement[Any], op: OperatorType, **kw: Any
) -> ColumnElement[Any]:
    """See :meth:`.ColumnOperators.bitwise_not`."""

    # the result keeps the type of its operand
    inverted = UnaryExpression(
        expr,
        operator=operators.bitwise_not_op,
        type_=expr.type,
    )
    return inverted
281
282
def _match_impl(
    expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any
) -> ColumnElement[Any]:
    """See :meth:`.ColumnOperators.match`.

    The comparison is always built with ``operators.match_op`` and typed
    as ``type_api.MATCHTYPE``; ``op`` only selects which operator is
    stored as the negation.
    """

    match_target = coercions.expect(
        roles.BinaryElementRole,
        other,
        expr=expr,
        operator=operators.match_op,
    )

    if op is operators.match_op:
        negation = operators.not_match_op
    else:
        negation = operators.match_op

    return _boolean_compare(
        expr,
        operators.match_op,
        match_target,
        result_type=type_api.MATCHTYPE,
        negate_op=negation,
        **kw,
    )
305
306
def _distinct_impl(
    expr: ColumnElement[Any], op: OperatorType, **kw: Any
) -> ColumnElement[Any]:
    """See :meth:`.ColumnOperators.distinct`."""
    # the result keeps the type of its operand
    distinct_expr = UnaryExpression(
        expr,
        operator=operators.distinct_op,
        type_=expr.type,
    )
    return distinct_expr
314
315
def _between_impl(
    expr: ColumnElement[Any],
    op: OperatorType,
    cleft: Any,
    cright: Any,
    **kw: Any,
) -> ColumnElement[Any]:
    """See :meth:`.ColumnOperators.between`.

    Both bounds are coerced to ``roles.BinaryElementRole`` and joined in
    an ungrouped ``operators.and_`` clause list, which becomes the right
    side of the resulting binary expression.
    """
    lower_bound = coercions.expect(
        roles.BinaryElementRole,
        cleft,
        expr=expr,
        operator=operators.and_,
    )
    upper_bound = coercions.expect(
        roles.BinaryElementRole,
        cright,
        expr=expr,
        operator=operators.and_,
    )
    bounds = ExpressionClauseList._construct_for_list(
        operators.and_,
        type_api.NULLTYPE,
        lower_bound,
        upper_bound,
        group=False,
    )

    # between_op and not_between_op negate to each other
    if op is operators.between_op:
        negation = operators.not_between_op
    else:
        negation = operators.between_op

    return BinaryExpression(
        expr,
        bounds,
        op,
        negate=negation,
        modifiers=kw,
    )
351
352
def _pow_impl(
    expr: ColumnElement[Any],
    op: OperatorType,
    other: Any,
    reverse: bool = False,
    **kw: Any,
) -> ColumnElement[Any]:
    """Render exponentiation as a call to ``functions.pow``.

    With ``reverse`` set, ``other`` is the base and ``expr`` the
    exponent; otherwise ``expr`` is the base.
    """
    base, exponent = (other, expr) if reverse else (expr, other)
    return functions.pow(base, exponent)
364
365
def _collate_impl(
    expr: ColumnElement[str], op: OperatorType, collation: str, **kw: Any
) -> ColumnElement[str]:
    """Delegate to ``CollationClause._create_collation_expression``."""
    collated = CollationClause._create_collation_expression(expr, collation)
    return collated
370
371
def _regexp_match_impl(
    expr: ColumnElement[str],
    op: OperatorType,
    pattern: Any,
    flags: Optional[str],
    **kw: Any,
) -> ColumnElement[Any]:
    """Build a regular expression match (or non-match) expression.

    :param expr: the expression being matched.
    :param op: ``operators.regexp_match_op`` or
     ``operators.not_regexp_match_op``; both are routed here by
     ``operator_lookup``.
    :param pattern: the pattern, coerced to ``roles.BinaryElementRole``.
    :param flags: stored under the ``"flags"`` key of the expression's
     modifiers.
    :return: a ``BinaryExpression`` whose negation is the opposite
     regexp operator.
    """
    coerced_pattern = coercions.expect(
        roles.BinaryElementRole,
        pattern,
        expr=expr,
        operator=operators.comma_op,
    )

    # Pick the negation from ``op`` (as the match and between
    # implementations do) so that a ``not_regexp_match_op`` expression
    # does not end up negating to itself.
    if op is operators.regexp_match_op:
        negation = operators.not_regexp_match_op
    else:
        negation = operators.regexp_match_op

    return BinaryExpression(
        expr,
        coerced_pattern,
        op,
        negate=negation,
        modifiers={"flags": flags},
    )
391
392
def _regexp_replace_impl(
    expr: ColumnElement[Any],
    op: OperatorType,
    pattern: Any,
    replacement: Any,
    flags: Optional[str],
    **kw: Any,
) -> ColumnElement[Any]:
    """Build a regular expression replace expression.

    ``pattern`` and ``replacement`` are each coerced to
    ``roles.BinaryElementRole`` and combined in an ungrouped
    ``operators.comma_op`` clause list that forms the right side of the
    resulting binary expression.  ``flags`` is stored under the
    ``"flags"`` key of the expression's modifiers.
    """
    coerced_pattern = coercions.expect(
        roles.BinaryElementRole,
        pattern,
        expr=expr,
        operator=operators.comma_op,
    )
    coerced_replacement = coercions.expect(
        roles.BinaryElementRole,
        replacement,
        expr=expr,
        operator=operators.comma_op,
    )
    replace_args = ExpressionClauseList._construct_for_list(
        operators.comma_op,
        type_api.NULLTYPE,
        coerced_pattern,
        coerced_replacement,
        group=False,
    )

    return BinaryExpression(
        expr,
        replace_args,
        op,
        modifiers={"flags": flags},
    )
423
424
# Maps an operator name to a two-tuple of
# ``(implementation function, fixed keyword arguments)``.  For the
# comparison operators the fixed keyword is ``negate_op``, the operator
# that represents the logical negation of the entry.
operator_lookup: util.immutabledict[
    str,
    Tuple[
        Callable[..., "ColumnElement[Any]"],
        util.immutabledict[
            str, Union["OperatorType", Callable[..., "ColumnElement[Any]"]]
        ],
    ],
] = util.immutabledict(
    {
        "any_op": (
            _scalar,
            util.immutabledict({"fn": CollectionAggregate._create_any}),
        ),
        "all_op": (
            _scalar,
            util.immutabledict({"fn": CollectionAggregate._create_all}),
        ),
        "lt": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.ge}),
        ),
        "le": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.gt}),
        ),
        "ne": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.eq}),
        ),
        "gt": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.le}),
        ),
        "ge": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.lt}),
        ),
        "eq": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.ne}),
        ),
        "is_distinct_from": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.is_not_distinct_from}),
        ),
        "is_not_distinct_from": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.is_distinct_from}),
        ),
        "in_op": (
            _in_impl,
            util.immutabledict({"negate_op": operators.not_in_op}),
        ),
        "not_in_op": (
            _in_impl,
            util.immutabledict({"negate_op": operators.in_op}),
        ),
        # IS and IS NOT negate to each other, matching the pairing that
        # _boolean_compare uses for None/True/False operands; previously
        # each of these two entries named itself as its own negation.
        "is_": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.is_not}),
        ),
        "is_not": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.is_}),
        ),
        # _between_impl derives the negation from the operator itself
        "between_op": (_between_impl, util.EMPTY_DICT),
        "not_between_op": (_between_impl, util.EMPTY_DICT),
        "desc_op": (
            _scalar,
            util.immutabledict({"fn": UnaryExpression._create_desc}),
        ),
        "asc_op": (
            _scalar,
            util.immutabledict({"fn": UnaryExpression._create_asc}),
        ),
        "nulls_first_op": (
            _scalar,
            util.immutabledict({"fn": UnaryExpression._create_nulls_first}),
        ),
        "nulls_last_op": (
            _scalar,
            util.immutabledict({"fn": UnaryExpression._create_nulls_last}),
        ),
        "distinct_op": (_distinct_impl, util.EMPTY_DICT),
        "null_op": (_binary_operate, util.EMPTY_DICT),
        "custom_op": (_custom_op_operate, util.EMPTY_DICT),
        "and_": (_conjunction_operate, util.EMPTY_DICT),
        "or_": (_conjunction_operate, util.EMPTY_DICT),
        "inv": (_inv_impl, util.EMPTY_DICT),
        "add": (_binary_operate, util.EMPTY_DICT),
        "concat_op": (_binary_operate, util.EMPTY_DICT),
        "getitem": (_getitem_impl, util.EMPTY_DICT),
        "contains_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.not_contains_op}),
        ),
        "icontains_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.not_icontains_op}),
        ),
        "contains": (_unsupported_impl, util.EMPTY_DICT),
        "like_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.not_like_op}),
        ),
        "ilike_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.not_ilike_op}),
        ),
        "not_like_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.like_op}),
        ),
        "not_ilike_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.ilike_op}),
        ),
        "startswith_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.not_startswith_op}),
        ),
        "istartswith_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.not_istartswith_op}),
        ),
        "endswith_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.not_endswith_op}),
        ),
        "iendswith_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.not_iendswith_op}),
        ),
        "collate": (_collate_impl, util.EMPTY_DICT),
        # _match_impl derives the negation from the operator itself
        "match_op": (_match_impl, util.EMPTY_DICT),
        "not_match_op": (_match_impl, util.EMPTY_DICT),
        "regexp_match_op": (_regexp_match_impl, util.EMPTY_DICT),
        "not_regexp_match_op": (_regexp_match_impl, util.EMPTY_DICT),
        "regexp_replace_op": (_regexp_replace_impl, util.EMPTY_DICT),
        "lshift": (_unsupported_impl, util.EMPTY_DICT),
        "rshift": (_unsupported_impl, util.EMPTY_DICT),
        "bitwise_xor_op": (_binary_operate, util.EMPTY_DICT),
        "bitwise_or_op": (_binary_operate, util.EMPTY_DICT),
        "bitwise_and_op": (_binary_operate, util.EMPTY_DICT),
        "bitwise_not_op": (_bitwise_not_impl, util.EMPTY_DICT),
        "bitwise_lshift_op": (_binary_operate, util.EMPTY_DICT),
        "bitwise_rshift_op": (_binary_operate, util.EMPTY_DICT),
        "matmul": (_unsupported_impl, util.EMPTY_DICT),
        "pow": (_pow_impl, util.EMPTY_DICT),
        "neg": (_neg_impl, util.EMPTY_DICT),
        "mul": (_binary_operate, util.EMPTY_DICT),
        "sub": (_binary_operate, util.EMPTY_DICT),
        "div": (_binary_operate, util.EMPTY_DICT),
        "mod": (_binary_operate, util.EMPTY_DICT),
        "truediv": (_binary_operate, util.EMPTY_DICT),
        "floordiv": (_binary_operate, util.EMPTY_DICT),
        "json_path_getitem_op": (_binary_operate, util.EMPTY_DICT),
        "json_getitem_op": (_binary_operate, util.EMPTY_DICT),
    }
)