1# sql/default_comparator.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Default implementation of SQL comparison operations."""
9
10from __future__ import annotations
11
12import typing
13from typing import Any
14from typing import Callable
15from typing import Dict
16from typing import NoReturn
17from typing import Optional
18from typing import Tuple
19from typing import Type
20from typing import Union
21
22from . import coercions
23from . import operators
24from . import roles
25from . import type_api
26from .elements import and_
27from .elements import BinaryExpression
28from .elements import ClauseElement
29from .elements import CollationClause
30from .elements import CollectionAggregate
31from .elements import ExpressionClauseList
32from .elements import False_
33from .elements import Null
34from .elements import OperatorExpression
35from .elements import or_
36from .elements import True_
37from .elements import UnaryExpression
38from .operators import OperatorType
39from .. import exc
40from .. import util
41
42_T = typing.TypeVar("_T", bound=Any)
43
44if typing.TYPE_CHECKING:
45 from .elements import ColumnElement
46 from .operators import custom_op
47 from .type_api import TypeEngine
48
49
50def _boolean_compare(
51 expr: ColumnElement[Any],
52 op: OperatorType,
53 obj: Any,
54 *,
55 negate_op: Optional[OperatorType] = None,
56 reverse: bool = False,
57 _python_is_types: Tuple[Type[Any], ...] = (type(None), bool),
58 result_type: Optional[TypeEngine[bool]] = None,
59 **kwargs: Any,
60) -> OperatorExpression[bool]:
61 if result_type is None:
62 result_type = type_api.BOOLEANTYPE
63
64 if isinstance(obj, _python_is_types + (Null, True_, False_)):
65 # allow x ==/!= True/False to be treated as a literal.
66 # this comes out to "== / != true/false" or "1/0" if those
67 # constants aren't supported and works on all platforms
68 if op in (operators.eq, operators.ne) and isinstance(
69 obj, (bool, True_, False_)
70 ):
71 return OperatorExpression._construct_for_op(
72 expr,
73 coercions.expect(roles.ConstExprRole, obj),
74 op,
75 type_=result_type,
76 negate=negate_op,
77 modifiers=kwargs,
78 )
79 elif op in (
80 operators.is_distinct_from,
81 operators.is_not_distinct_from,
82 ):
83 return OperatorExpression._construct_for_op(
84 expr,
85 coercions.expect(roles.ConstExprRole, obj),
86 op,
87 type_=result_type,
88 negate=negate_op,
89 modifiers=kwargs,
90 )
91 elif expr._is_collection_aggregate:
92 obj = coercions.expect(
93 roles.ConstExprRole, element=obj, operator=op, expr=expr
94 )
95 else:
96 # all other None uses IS, IS NOT
97 if op in (operators.eq, operators.is_):
98 return OperatorExpression._construct_for_op(
99 expr,
100 coercions.expect(roles.ConstExprRole, obj),
101 operators.is_,
102 negate=operators.is_not,
103 type_=result_type,
104 )
105 elif op in (operators.ne, operators.is_not):
106 return OperatorExpression._construct_for_op(
107 expr,
108 coercions.expect(roles.ConstExprRole, obj),
109 operators.is_not,
110 negate=operators.is_,
111 type_=result_type,
112 )
113 else:
114 raise exc.ArgumentError(
115 "Only '=', '!=', 'is_()', 'is_not()', "
116 "'is_distinct_from()', 'is_not_distinct_from()' "
117 "operators can be used with None/True/False"
118 )
119 else:
120 obj = coercions.expect(
121 roles.BinaryElementRole, element=obj, operator=op, expr=expr
122 )
123
124 if reverse:
125 return OperatorExpression._construct_for_op(
126 obj,
127 expr,
128 op,
129 type_=result_type,
130 negate=negate_op,
131 modifiers=kwargs,
132 )
133 else:
134 return OperatorExpression._construct_for_op(
135 expr,
136 obj,
137 op,
138 type_=result_type,
139 negate=negate_op,
140 modifiers=kwargs,
141 )
142
143
144def _custom_op_operate(
145 expr: ColumnElement[Any],
146 op: custom_op[Any],
147 obj: Any,
148 reverse: bool = False,
149 result_type: Optional[TypeEngine[Any]] = None,
150 **kw: Any,
151) -> ColumnElement[Any]:
152 if result_type is None:
153 if op.return_type:
154 result_type = op.return_type
155 elif op.is_comparison:
156 result_type = type_api.BOOLEANTYPE
157
158 return _binary_operate(
159 expr, op, obj, reverse=reverse, result_type=result_type, **kw
160 )
161
162
163def _binary_operate(
164 expr: ColumnElement[Any],
165 op: OperatorType,
166 obj: roles.BinaryElementRole[Any],
167 *,
168 reverse: bool = False,
169 result_type: Optional[TypeEngine[_T]] = None,
170 **kw: Any,
171) -> OperatorExpression[_T]:
172 coerced_obj = coercions.expect(
173 roles.BinaryElementRole, obj, expr=expr, operator=op
174 )
175
176 if reverse:
177 left, right = coerced_obj, expr
178 else:
179 left, right = expr, coerced_obj
180
181 if result_type is None:
182 op, result_type = left.comparator._adapt_expression(
183 op, right.comparator
184 )
185
186 return OperatorExpression._construct_for_op(
187 left, right, op, type_=result_type, modifiers=kw
188 )
189
190
191def _conjunction_operate(
192 expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any
193) -> ColumnElement[Any]:
194 if op is operators.and_:
195 return and_(expr, other)
196 elif op is operators.or_:
197 return or_(expr, other)
198 else:
199 raise NotImplementedError()
200
201
202def _scalar(
203 expr: ColumnElement[Any],
204 op: OperatorType,
205 fn: Callable[[ColumnElement[Any]], ColumnElement[Any]],
206 **kw: Any,
207) -> ColumnElement[Any]:
208 return fn(expr)
209
210
211def _in_impl(
212 expr: ColumnElement[Any],
213 op: OperatorType,
214 seq_or_selectable: ClauseElement,
215 negate_op: OperatorType,
216 **kw: Any,
217) -> ColumnElement[Any]:
218 seq_or_selectable = coercions.expect(
219 roles.InElementRole, seq_or_selectable, expr=expr, operator=op
220 )
221 if "in_ops" in seq_or_selectable._annotations:
222 op, negate_op = seq_or_selectable._annotations["in_ops"]
223
224 return _boolean_compare(
225 expr, op, seq_or_selectable, negate_op=negate_op, **kw
226 )
227
228
229def _getitem_impl(
230 expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any
231) -> ColumnElement[Any]:
232 if (
233 isinstance(expr.type, type_api.INDEXABLE)
234 or isinstance(expr.type, type_api.TypeDecorator)
235 and isinstance(expr.type.impl_instance, type_api.INDEXABLE)
236 ):
237 other = coercions.expect(
238 roles.BinaryElementRole, other, expr=expr, operator=op
239 )
240 return _binary_operate(expr, op, other, **kw)
241 else:
242 _unsupported_impl(expr, op, other, **kw)
243
244
245def _unsupported_impl(
246 expr: ColumnElement[Any], op: OperatorType, *arg: Any, **kw: Any
247) -> NoReturn:
248 raise NotImplementedError(
249 "Operator '%s' is not supported on this expression" % op.__name__
250 )
251
252
253def _inv_impl(
254 expr: ColumnElement[Any], op: OperatorType, **kw: Any
255) -> ColumnElement[Any]:
256 """See :meth:`.ColumnOperators.__inv__`."""
257
258 # undocumented element currently used by the ORM for
259 # relationship.contains()
260 if hasattr(expr, "negation_clause"):
261 return expr.negation_clause
262 else:
263 return expr._negate()
264
265
266def _neg_impl(
267 expr: ColumnElement[Any], op: OperatorType, **kw: Any
268) -> ColumnElement[Any]:
269 """See :meth:`.ColumnOperators.__neg__`."""
270 return UnaryExpression(expr, operator=operators.neg, type_=expr.type)
271
272
273def _bitwise_not_impl(
274 expr: ColumnElement[Any], op: OperatorType, **kw: Any
275) -> ColumnElement[Any]:
276 """See :meth:`.ColumnOperators.bitwise_not`."""
277
278 return UnaryExpression(
279 expr, operator=operators.bitwise_not_op, type_=expr.type
280 )
281
282
283def _match_impl(
284 expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any
285) -> ColumnElement[Any]:
286 """See :meth:`.ColumnOperators.match`."""
287
288 return _boolean_compare(
289 expr,
290 operators.match_op,
291 coercions.expect(
292 roles.BinaryElementRole,
293 other,
294 expr=expr,
295 operator=operators.match_op,
296 ),
297 result_type=type_api.MATCHTYPE,
298 negate_op=(
299 operators.not_match_op
300 if op is operators.match_op
301 else operators.match_op
302 ),
303 **kw,
304 )
305
306
307def _distinct_impl(
308 expr: ColumnElement[Any], op: OperatorType, **kw: Any
309) -> ColumnElement[Any]:
310 """See :meth:`.ColumnOperators.distinct`."""
311 return UnaryExpression(
312 expr, operator=operators.distinct_op, type_=expr.type
313 )
314
315
316def _between_impl(
317 expr: ColumnElement[Any],
318 op: OperatorType,
319 cleft: Any,
320 cright: Any,
321 **kw: Any,
322) -> ColumnElement[Any]:
323 """See :meth:`.ColumnOperators.between`."""
324 return BinaryExpression(
325 expr,
326 ExpressionClauseList._construct_for_list(
327 operators.and_,
328 type_api.NULLTYPE,
329 coercions.expect(
330 roles.BinaryElementRole,
331 cleft,
332 expr=expr,
333 operator=operators.and_,
334 ),
335 coercions.expect(
336 roles.BinaryElementRole,
337 cright,
338 expr=expr,
339 operator=operators.and_,
340 ),
341 group=False,
342 ),
343 op,
344 negate=(
345 operators.not_between_op
346 if op is operators.between_op
347 else operators.between_op
348 ),
349 modifiers=kw,
350 )
351
352
353def _collate_impl(
354 expr: ColumnElement[str], op: OperatorType, collation: str, **kw: Any
355) -> ColumnElement[str]:
356 return CollationClause._create_collation_expression(expr, collation)
357
358
359def _regexp_match_impl(
360 expr: ColumnElement[str],
361 op: OperatorType,
362 pattern: Any,
363 flags: Optional[str],
364 **kw: Any,
365) -> ColumnElement[Any]:
366 return BinaryExpression(
367 expr,
368 coercions.expect(
369 roles.BinaryElementRole,
370 pattern,
371 expr=expr,
372 operator=operators.comma_op,
373 ),
374 op,
375 negate=operators.not_regexp_match_op,
376 modifiers={"flags": flags},
377 )
378
379
380def _regexp_replace_impl(
381 expr: ColumnElement[Any],
382 op: OperatorType,
383 pattern: Any,
384 replacement: Any,
385 flags: Optional[str],
386 **kw: Any,
387) -> ColumnElement[Any]:
388 return BinaryExpression(
389 expr,
390 ExpressionClauseList._construct_for_list(
391 operators.comma_op,
392 type_api.NULLTYPE,
393 coercions.expect(
394 roles.BinaryElementRole,
395 pattern,
396 expr=expr,
397 operator=operators.comma_op,
398 ),
399 coercions.expect(
400 roles.BinaryElementRole,
401 replacement,
402 expr=expr,
403 operator=operators.comma_op,
404 ),
405 group=False,
406 ),
407 op,
408 modifiers={"flags": flags},
409 )
410
411
412# a mapping of operators with the method they use, along with
413# additional keyword arguments to be passed
414operator_lookup: Dict[
415 str,
416 Tuple[
417 Callable[..., ColumnElement[Any]],
418 util.immutabledict[
419 str, Union[OperatorType, Callable[..., ColumnElement[Any]]]
420 ],
421 ],
422] = {
423 "and_": (_conjunction_operate, util.EMPTY_DICT),
424 "or_": (_conjunction_operate, util.EMPTY_DICT),
425 "inv": (_inv_impl, util.EMPTY_DICT),
426 "add": (_binary_operate, util.EMPTY_DICT),
427 "mul": (_binary_operate, util.EMPTY_DICT),
428 "sub": (_binary_operate, util.EMPTY_DICT),
429 "div": (_binary_operate, util.EMPTY_DICT),
430 "mod": (_binary_operate, util.EMPTY_DICT),
431 "bitwise_xor_op": (_binary_operate, util.EMPTY_DICT),
432 "bitwise_or_op": (_binary_operate, util.EMPTY_DICT),
433 "bitwise_and_op": (_binary_operate, util.EMPTY_DICT),
434 "bitwise_not_op": (_bitwise_not_impl, util.EMPTY_DICT),
435 "bitwise_lshift_op": (_binary_operate, util.EMPTY_DICT),
436 "bitwise_rshift_op": (_binary_operate, util.EMPTY_DICT),
437 "truediv": (_binary_operate, util.EMPTY_DICT),
438 "floordiv": (_binary_operate, util.EMPTY_DICT),
439 "custom_op": (_custom_op_operate, util.EMPTY_DICT),
440 "json_path_getitem_op": (_binary_operate, util.EMPTY_DICT),
441 "json_getitem_op": (_binary_operate, util.EMPTY_DICT),
442 "concat_op": (_binary_operate, util.EMPTY_DICT),
443 "any_op": (
444 _scalar,
445 util.immutabledict({"fn": CollectionAggregate._create_any}),
446 ),
447 "all_op": (
448 _scalar,
449 util.immutabledict({"fn": CollectionAggregate._create_all}),
450 ),
451 "lt": (_boolean_compare, util.immutabledict({"negate_op": operators.ge})),
452 "le": (_boolean_compare, util.immutabledict({"negate_op": operators.gt})),
453 "ne": (_boolean_compare, util.immutabledict({"negate_op": operators.eq})),
454 "gt": (_boolean_compare, util.immutabledict({"negate_op": operators.le})),
455 "ge": (_boolean_compare, util.immutabledict({"negate_op": operators.lt})),
456 "eq": (_boolean_compare, util.immutabledict({"negate_op": operators.ne})),
457 "is_distinct_from": (
458 _boolean_compare,
459 util.immutabledict({"negate_op": operators.is_not_distinct_from}),
460 ),
461 "is_not_distinct_from": (
462 _boolean_compare,
463 util.immutabledict({"negate_op": operators.is_distinct_from}),
464 ),
465 "like_op": (
466 _boolean_compare,
467 util.immutabledict({"negate_op": operators.not_like_op}),
468 ),
469 "ilike_op": (
470 _boolean_compare,
471 util.immutabledict({"negate_op": operators.not_ilike_op}),
472 ),
473 "not_like_op": (
474 _boolean_compare,
475 util.immutabledict({"negate_op": operators.like_op}),
476 ),
477 "not_ilike_op": (
478 _boolean_compare,
479 util.immutabledict({"negate_op": operators.ilike_op}),
480 ),
481 "contains_op": (
482 _boolean_compare,
483 util.immutabledict({"negate_op": operators.not_contains_op}),
484 ),
485 "icontains_op": (
486 _boolean_compare,
487 util.immutabledict({"negate_op": operators.not_icontains_op}),
488 ),
489 "startswith_op": (
490 _boolean_compare,
491 util.immutabledict({"negate_op": operators.not_startswith_op}),
492 ),
493 "istartswith_op": (
494 _boolean_compare,
495 util.immutabledict({"negate_op": operators.not_istartswith_op}),
496 ),
497 "endswith_op": (
498 _boolean_compare,
499 util.immutabledict({"negate_op": operators.not_endswith_op}),
500 ),
501 "iendswith_op": (
502 _boolean_compare,
503 util.immutabledict({"negate_op": operators.not_iendswith_op}),
504 ),
505 "desc_op": (
506 _scalar,
507 util.immutabledict({"fn": UnaryExpression._create_desc}),
508 ),
509 "asc_op": (
510 _scalar,
511 util.immutabledict({"fn": UnaryExpression._create_asc}),
512 ),
513 "nulls_first_op": (
514 _scalar,
515 util.immutabledict({"fn": UnaryExpression._create_nulls_first}),
516 ),
517 "nulls_last_op": (
518 _scalar,
519 util.immutabledict({"fn": UnaryExpression._create_nulls_last}),
520 ),
521 "in_op": (
522 _in_impl,
523 util.immutabledict({"negate_op": operators.not_in_op}),
524 ),
525 "not_in_op": (
526 _in_impl,
527 util.immutabledict({"negate_op": operators.in_op}),
528 ),
529 "is_": (
530 _boolean_compare,
531 util.immutabledict({"negate_op": operators.is_}),
532 ),
533 "is_not": (
534 _boolean_compare,
535 util.immutabledict({"negate_op": operators.is_not}),
536 ),
537 "collate": (_collate_impl, util.EMPTY_DICT),
538 "match_op": (_match_impl, util.EMPTY_DICT),
539 "not_match_op": (_match_impl, util.EMPTY_DICT),
540 "distinct_op": (_distinct_impl, util.EMPTY_DICT),
541 "between_op": (_between_impl, util.EMPTY_DICT),
542 "not_between_op": (_between_impl, util.EMPTY_DICT),
543 "neg": (_neg_impl, util.EMPTY_DICT),
544 "getitem": (_getitem_impl, util.EMPTY_DICT),
545 "lshift": (_unsupported_impl, util.EMPTY_DICT),
546 "rshift": (_unsupported_impl, util.EMPTY_DICT),
547 "contains": (_unsupported_impl, util.EMPTY_DICT),
548 "regexp_match_op": (_regexp_match_impl, util.EMPTY_DICT),
549 "not_regexp_match_op": (_regexp_match_impl, util.EMPTY_DICT),
550 "regexp_replace_op": (_regexp_replace_impl, util.EMPTY_DICT),
551}