1# sql/default_comparator.py 
    2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 
    3# <see AUTHORS file> 
    4# 
    5# This module is part of SQLAlchemy and is released under 
    6# the MIT License: https://www.opensource.org/licenses/mit-license.php 
    7 
    8"""Default implementation of SQL comparison operations.""" 
    9 
    10from __future__ import annotations 
    11 
    12import typing 
    13from typing import Any 
    14from typing import Callable 
    15from typing import NoReturn 
    16from typing import Optional 
    17from typing import Tuple 
    18from typing import Type 
    19from typing import Union 
    20 
    21from . import coercions 
    22from . import functions 
    23from . import operators 
    24from . import roles 
    25from . import type_api 
    26from .elements import and_ 
    27from .elements import BinaryExpression 
    28from .elements import ClauseElement 
    29from .elements import CollationClause 
    30from .elements import CollectionAggregate 
    31from .elements import ExpressionClauseList 
    32from .elements import False_ 
    33from .elements import Null 
    34from .elements import OperatorExpression 
    35from .elements import or_ 
    36from .elements import True_ 
    37from .elements import UnaryExpression 
    38from .operators import OperatorType 
    39from .. import exc 
    40from .. import util 
    41 
    42_T = typing.TypeVar("_T", bound=Any) 
    43 
    44if typing.TYPE_CHECKING: 
    45    from .elements import ColumnElement 
    46    from .operators import custom_op 
    47    from .type_api import TypeEngine 
    48 
    49 
    50def _boolean_compare( 
    51    expr: ColumnElement[Any], 
    52    op: OperatorType, 
    53    obj: Any, 
    54    *, 
    55    negate_op: Optional[OperatorType] = None, 
    56    reverse: bool = False, 
    57    _python_is_types: Tuple[Type[Any], ...] = (type(None), bool), 
    58    result_type: Optional[TypeEngine[bool]] = None, 
    59    **kwargs: Any, 
    60) -> OperatorExpression[bool]: 
    61    if result_type is None: 
    62        result_type = type_api.BOOLEANTYPE 
    63 
    64    if isinstance(obj, _python_is_types + (Null, True_, False_)): 
    65        # allow x ==/!= True/False to be treated as a literal. 
    66        # this comes out to "== / != true/false" or "1/0" if those 
    67        # constants aren't supported and works on all platforms 
    68        if op in (operators.eq, operators.ne) and isinstance( 
    69            obj, (bool, True_, False_) 
    70        ): 
    71            return OperatorExpression._construct_for_op( 
    72                expr, 
    73                coercions.expect(roles.ConstExprRole, obj), 
    74                op, 
    75                type_=result_type, 
    76                negate=negate_op, 
    77                modifiers=kwargs, 
    78            ) 
    79        elif op in ( 
    80            operators.is_distinct_from, 
    81            operators.is_not_distinct_from, 
    82        ): 
    83            return OperatorExpression._construct_for_op( 
    84                expr, 
    85                coercions.expect(roles.ConstExprRole, obj), 
    86                op, 
    87                type_=result_type, 
    88                negate=negate_op, 
    89                modifiers=kwargs, 
    90            ) 
    91        elif expr._is_collection_aggregate: 
    92            obj = coercions.expect( 
    93                roles.ConstExprRole, element=obj, operator=op, expr=expr 
    94            ) 
    95        else: 
    96            # all other None uses IS, IS NOT 
    97            if op in (operators.eq, operators.is_): 
    98                return OperatorExpression._construct_for_op( 
    99                    expr, 
    100                    coercions.expect(roles.ConstExprRole, obj), 
    101                    operators.is_, 
    102                    negate=operators.is_not, 
    103                    type_=result_type, 
    104                ) 
    105            elif op in (operators.ne, operators.is_not): 
    106                return OperatorExpression._construct_for_op( 
    107                    expr, 
    108                    coercions.expect(roles.ConstExprRole, obj), 
    109                    operators.is_not, 
    110                    negate=operators.is_, 
    111                    type_=result_type, 
    112                ) 
    113            else: 
    114                raise exc.ArgumentError( 
    115                    "Only '=', '!=', 'is_()', 'is_not()', " 
    116                    "'is_distinct_from()', 'is_not_distinct_from()' " 
    117                    "operators can be used with None/True/False" 
    118                ) 
    119    else: 
    120        obj = coercions.expect( 
    121            roles.BinaryElementRole, element=obj, operator=op, expr=expr 
    122        ) 
    123 
    124    if reverse: 
    125        return OperatorExpression._construct_for_op( 
    126            obj, 
    127            expr, 
    128            op, 
    129            type_=result_type, 
    130            negate=negate_op, 
    131            modifiers=kwargs, 
    132        ) 
    133    else: 
    134        return OperatorExpression._construct_for_op( 
    135            expr, 
    136            obj, 
    137            op, 
    138            type_=result_type, 
    139            negate=negate_op, 
    140            modifiers=kwargs, 
    141        ) 
    142 
    143 
    144def _custom_op_operate( 
    145    expr: ColumnElement[Any], 
    146    op: custom_op[Any], 
    147    obj: Any, 
    148    reverse: bool = False, 
    149    result_type: Optional[TypeEngine[Any]] = None, 
    150    **kw: Any, 
    151) -> ColumnElement[Any]: 
    152    if result_type is None: 
    153        if op.return_type: 
    154            result_type = op.return_type 
    155        elif op.is_comparison: 
    156            result_type = type_api.BOOLEANTYPE 
    157 
    158    return _binary_operate( 
    159        expr, op, obj, reverse=reverse, result_type=result_type, **kw 
    160    ) 
    161 
    162 
    163def _binary_operate( 
    164    expr: ColumnElement[Any], 
    165    op: OperatorType, 
    166    obj: roles.BinaryElementRole[Any], 
    167    *, 
    168    reverse: bool = False, 
    169    result_type: Optional[TypeEngine[_T]] = None, 
    170    **kw: Any, 
    171) -> OperatorExpression[_T]: 
    172    coerced_obj = coercions.expect( 
    173        roles.BinaryElementRole, obj, expr=expr, operator=op 
    174    ) 
    175 
    176    if reverse: 
    177        left, right = coerced_obj, expr 
    178    else: 
    179        left, right = expr, coerced_obj 
    180 
    181    if result_type is None: 
    182        op, result_type = left.comparator._adapt_expression( 
    183            op, right.comparator 
    184        ) 
    185 
    186    return OperatorExpression._construct_for_op( 
    187        left, right, op, type_=result_type, modifiers=kw 
    188    ) 
    189 
    190 
    191def _conjunction_operate( 
    192    expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any 
    193) -> ColumnElement[Any]: 
    194    if op is operators.and_: 
    195        return and_(expr, other) 
    196    elif op is operators.or_: 
    197        return or_(expr, other) 
    198    else: 
    199        raise NotImplementedError() 
    200 
    201 
    202def _scalar( 
    203    expr: ColumnElement[Any], 
    204    op: OperatorType, 
    205    fn: Callable[[ColumnElement[Any]], ColumnElement[Any]], 
    206    **kw: Any, 
    207) -> ColumnElement[Any]: 
    208    return fn(expr) 
    209 
    210 
    211def _in_impl( 
    212    expr: ColumnElement[Any], 
    213    op: OperatorType, 
    214    seq_or_selectable: ClauseElement, 
    215    negate_op: OperatorType, 
    216    **kw: Any, 
    217) -> ColumnElement[Any]: 
    218    seq_or_selectable = coercions.expect( 
    219        roles.InElementRole, seq_or_selectable, expr=expr, operator=op 
    220    ) 
    221    if "in_ops" in seq_or_selectable._annotations: 
    222        op, negate_op = seq_or_selectable._annotations["in_ops"] 
    223 
    224    return _boolean_compare( 
    225        expr, op, seq_or_selectable, negate_op=negate_op, **kw 
    226    ) 
    227 
    228 
    229def _getitem_impl( 
    230    expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any 
    231) -> ColumnElement[Any]: 
    232    if ( 
    233        isinstance(expr.type, type_api.INDEXABLE) 
    234        or isinstance(expr.type, type_api.TypeDecorator) 
    235        and isinstance(expr.type.impl_instance, type_api.INDEXABLE) 
    236    ): 
    237        other = coercions.expect( 
    238            roles.BinaryElementRole, other, expr=expr, operator=op 
    239        ) 
    240        return _binary_operate(expr, op, other, **kw) 
    241    else: 
    242        _unsupported_impl(expr, op, other, **kw) 
    243 
    244 
    245def _unsupported_impl( 
    246    expr: ColumnElement[Any], op: OperatorType, *arg: Any, **kw: Any 
    247) -> NoReturn: 
    248    raise NotImplementedError( 
    249        "Operator '%s' is not supported on this expression" % op.__name__ 
    250    ) 
    251 
    252 
    253def _inv_impl( 
    254    expr: ColumnElement[Any], op: OperatorType, **kw: Any 
    255) -> ColumnElement[Any]: 
    256    """See :meth:`.ColumnOperators.__inv__`.""" 
    257 
    258    # undocumented element currently used by the ORM for 
    259    # relationship.contains() 
    260    if hasattr(expr, "negation_clause"): 
    261        return expr.negation_clause 
    262    else: 
    263        return expr._negate() 
    264 
    265 
    266def _neg_impl( 
    267    expr: ColumnElement[Any], op: OperatorType, **kw: Any 
    268) -> ColumnElement[Any]: 
    269    """See :meth:`.ColumnOperators.__neg__`.""" 
    270    return UnaryExpression(expr, operator=operators.neg, type_=expr.type) 
    271 
    272 
    273def _bitwise_not_impl( 
    274    expr: ColumnElement[Any], op: OperatorType, **kw: Any 
    275) -> ColumnElement[Any]: 
    276    """See :meth:`.ColumnOperators.bitwise_not`.""" 
    277 
    278    return UnaryExpression( 
    279        expr, operator=operators.bitwise_not_op, type_=expr.type 
    280    ) 
    281 
    282 
    283def _match_impl( 
    284    expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any 
    285) -> ColumnElement[Any]: 
    286    """See :meth:`.ColumnOperators.match`.""" 
    287 
    288    return _boolean_compare( 
    289        expr, 
    290        operators.match_op, 
    291        coercions.expect( 
    292            roles.BinaryElementRole, 
    293            other, 
    294            expr=expr, 
    295            operator=operators.match_op, 
    296        ), 
    297        result_type=type_api.MATCHTYPE, 
    298        negate_op=( 
    299            operators.not_match_op 
    300            if op is operators.match_op 
    301            else operators.match_op 
    302        ), 
    303        **kw, 
    304    ) 
    305 
    306 
    307def _distinct_impl( 
    308    expr: ColumnElement[Any], op: OperatorType, **kw: Any 
    309) -> ColumnElement[Any]: 
    310    """See :meth:`.ColumnOperators.distinct`.""" 
    311    return UnaryExpression( 
    312        expr, operator=operators.distinct_op, type_=expr.type 
    313    ) 
    314 
    315 
    316def _between_impl( 
    317    expr: ColumnElement[Any], 
    318    op: OperatorType, 
    319    cleft: Any, 
    320    cright: Any, 
    321    **kw: Any, 
    322) -> ColumnElement[Any]: 
    323    """See :meth:`.ColumnOperators.between`.""" 
    324    return BinaryExpression( 
    325        expr, 
    326        ExpressionClauseList._construct_for_list( 
    327            operators.and_, 
    328            type_api.NULLTYPE, 
    329            coercions.expect( 
    330                roles.BinaryElementRole, 
    331                cleft, 
    332                expr=expr, 
    333                operator=operators.and_, 
    334            ), 
    335            coercions.expect( 
    336                roles.BinaryElementRole, 
    337                cright, 
    338                expr=expr, 
    339                operator=operators.and_, 
    340            ), 
    341            group=False, 
    342        ), 
    343        op, 
    344        negate=( 
    345            operators.not_between_op 
    346            if op is operators.between_op 
    347            else operators.between_op 
    348        ), 
    349        modifiers=kw, 
    350    ) 
    351 
    352 
    353def _pow_impl( 
    354    expr: ColumnElement[Any], 
    355    op: OperatorType, 
    356    other: Any, 
    357    reverse: bool = False, 
    358    **kw: Any, 
    359) -> ColumnElement[Any]: 
    360    if reverse: 
    361        return functions.pow(other, expr) 
    362    else: 
    363        return functions.pow(expr, other) 
    364 
    365 
    366def _collate_impl( 
    367    expr: ColumnElement[str], op: OperatorType, collation: str, **kw: Any 
    368) -> ColumnElement[str]: 
    369    return CollationClause._create_collation_expression(expr, collation) 
    370 
    371 
    372def _regexp_match_impl( 
    373    expr: ColumnElement[str], 
    374    op: OperatorType, 
    375    pattern: Any, 
    376    flags: Optional[str], 
    377    **kw: Any, 
    378) -> ColumnElement[Any]: 
    379    return BinaryExpression( 
    380        expr, 
    381        coercions.expect( 
    382            roles.BinaryElementRole, 
    383            pattern, 
    384            expr=expr, 
    385            operator=operators.comma_op, 
    386        ), 
    387        op, 
    388        negate=operators.not_regexp_match_op, 
    389        modifiers={"flags": flags}, 
    390    ) 
    391 
    392 
    393def _regexp_replace_impl( 
    394    expr: ColumnElement[Any], 
    395    op: OperatorType, 
    396    pattern: Any, 
    397    replacement: Any, 
    398    flags: Optional[str], 
    399    **kw: Any, 
    400) -> ColumnElement[Any]: 
    401    return BinaryExpression( 
    402        expr, 
    403        ExpressionClauseList._construct_for_list( 
    404            operators.comma_op, 
    405            type_api.NULLTYPE, 
    406            coercions.expect( 
    407                roles.BinaryElementRole, 
    408                pattern, 
    409                expr=expr, 
    410                operator=operators.comma_op, 
    411            ), 
    412            coercions.expect( 
    413                roles.BinaryElementRole, 
    414                replacement, 
    415                expr=expr, 
    416                operator=operators.comma_op, 
    417            ), 
    418            group=False, 
    419        ), 
    420        op, 
    421        modifiers={"flags": flags}, 
    422    ) 
    423 
    424 
    425operator_lookup: util.immutabledict[ 
    426    str, 
    427    Tuple[ 
    428        Callable[..., "ColumnElement[Any]"], 
    429        util.immutabledict[ 
    430            str, Union["OperatorType", Callable[..., "ColumnElement[Any]"]] 
    431        ], 
    432    ], 
    433] = util.immutabledict( 
    434    { 
    435        "any_op": ( 
    436            _scalar, 
    437            util.immutabledict({"fn": CollectionAggregate._create_any}), 
    438        ), 
    439        "all_op": ( 
    440            _scalar, 
    441            util.immutabledict({"fn": CollectionAggregate._create_all}), 
    442        ), 
    443        "lt": ( 
    444            _boolean_compare, 
    445            util.immutabledict({"negate_op": operators.ge}), 
    446        ), 
    447        "le": ( 
    448            _boolean_compare, 
    449            util.immutabledict({"negate_op": operators.gt}), 
    450        ), 
    451        "ne": ( 
    452            _boolean_compare, 
    453            util.immutabledict({"negate_op": operators.eq}), 
    454        ), 
    455        "gt": ( 
    456            _boolean_compare, 
    457            util.immutabledict({"negate_op": operators.le}), 
    458        ), 
    459        "ge": ( 
    460            _boolean_compare, 
    461            util.immutabledict({"negate_op": operators.lt}), 
    462        ), 
    463        "eq": ( 
    464            _boolean_compare, 
    465            util.immutabledict({"negate_op": operators.ne}), 
    466        ), 
    467        "is_distinct_from": ( 
    468            _boolean_compare, 
    469            util.immutabledict({"negate_op": operators.is_not_distinct_from}), 
    470        ), 
    471        "is_not_distinct_from": ( 
    472            _boolean_compare, 
    473            util.immutabledict({"negate_op": operators.is_distinct_from}), 
    474        ), 
    475        "in_op": ( 
    476            _in_impl, 
    477            util.immutabledict({"negate_op": operators.not_in_op}), 
    478        ), 
    479        "not_in_op": ( 
    480            _in_impl, 
    481            util.immutabledict({"negate_op": operators.in_op}), 
    482        ), 
    483        "is_": ( 
    484            _boolean_compare, 
    485            util.immutabledict({"negate_op": operators.is_}), 
    486        ), 
    487        "is_not": ( 
    488            _boolean_compare, 
    489            util.immutabledict({"negate_op": operators.is_not}), 
    490        ), 
    491        "between_op": ( 
    492            _between_impl, 
    493            util.EMPTY_DICT, 
    494        ), 
    495        "not_between_op": ( 
    496            _between_impl, 
    497            util.EMPTY_DICT, 
    498        ), 
    499        "desc_op": ( 
    500            _scalar, 
    501            util.immutabledict({"fn": UnaryExpression._create_desc}), 
    502        ), 
    503        "asc_op": ( 
    504            _scalar, 
    505            util.immutabledict({"fn": UnaryExpression._create_asc}), 
    506        ), 
    507        "nulls_first_op": ( 
    508            _scalar, 
    509            util.immutabledict({"fn": UnaryExpression._create_nulls_first}), 
    510        ), 
    511        "nulls_last_op": ( 
    512            _scalar, 
    513            util.immutabledict({"fn": UnaryExpression._create_nulls_last}), 
    514        ), 
    515        "distinct_op": ( 
    516            _distinct_impl, 
    517            util.EMPTY_DICT, 
    518        ), 
    519        "null_op": (_binary_operate, util.EMPTY_DICT), 
    520        "custom_op": (_custom_op_operate, util.EMPTY_DICT), 
    521        "and_": ( 
    522            _conjunction_operate, 
    523            util.EMPTY_DICT, 
    524        ), 
    525        "or_": ( 
    526            _conjunction_operate, 
    527            util.EMPTY_DICT, 
    528        ), 
    529        "inv": ( 
    530            _inv_impl, 
    531            util.EMPTY_DICT, 
    532        ), 
    533        "add": ( 
    534            _binary_operate, 
    535            util.EMPTY_DICT, 
    536        ), 
    537        "concat_op": ( 
    538            _binary_operate, 
    539            util.EMPTY_DICT, 
    540        ), 
    541        "getitem": (_getitem_impl, util.EMPTY_DICT), 
    542        "contains_op": ( 
    543            _boolean_compare, 
    544            util.immutabledict({"negate_op": operators.not_contains_op}), 
    545        ), 
    546        "icontains_op": ( 
    547            _boolean_compare, 
    548            util.immutabledict({"negate_op": operators.not_icontains_op}), 
    549        ), 
    550        "contains": ( 
    551            _unsupported_impl, 
    552            util.EMPTY_DICT, 
    553        ), 
    554        "like_op": ( 
    555            _boolean_compare, 
    556            util.immutabledict({"negate_op": operators.not_like_op}), 
    557        ), 
    558        "ilike_op": ( 
    559            _boolean_compare, 
    560            util.immutabledict({"negate_op": operators.not_ilike_op}), 
    561        ), 
    562        "not_like_op": ( 
    563            _boolean_compare, 
    564            util.immutabledict({"negate_op": operators.like_op}), 
    565        ), 
    566        "not_ilike_op": ( 
    567            _boolean_compare, 
    568            util.immutabledict({"negate_op": operators.ilike_op}), 
    569        ), 
    570        "startswith_op": ( 
    571            _boolean_compare, 
    572            util.immutabledict({"negate_op": operators.not_startswith_op}), 
    573        ), 
    574        "istartswith_op": ( 
    575            _boolean_compare, 
    576            util.immutabledict({"negate_op": operators.not_istartswith_op}), 
    577        ), 
    578        "endswith_op": ( 
    579            _boolean_compare, 
    580            util.immutabledict({"negate_op": operators.not_endswith_op}), 
    581        ), 
    582        "iendswith_op": ( 
    583            _boolean_compare, 
    584            util.immutabledict({"negate_op": operators.not_iendswith_op}), 
    585        ), 
    586        "collate": ( 
    587            _collate_impl, 
    588            util.EMPTY_DICT, 
    589        ), 
    590        "match_op": (_match_impl, util.EMPTY_DICT), 
    591        "not_match_op": ( 
    592            _match_impl, 
    593            util.EMPTY_DICT, 
    594        ), 
    595        "regexp_match_op": ( 
    596            _regexp_match_impl, 
    597            util.EMPTY_DICT, 
    598        ), 
    599        "not_regexp_match_op": ( 
    600            _regexp_match_impl, 
    601            util.EMPTY_DICT, 
    602        ), 
    603        "regexp_replace_op": ( 
    604            _regexp_replace_impl, 
    605            util.EMPTY_DICT, 
    606        ), 
    607        "lshift": (_unsupported_impl, util.EMPTY_DICT), 
    608        "rshift": (_unsupported_impl, util.EMPTY_DICT), 
    609        "bitwise_xor_op": ( 
    610            _binary_operate, 
    611            util.EMPTY_DICT, 
    612        ), 
    613        "bitwise_or_op": ( 
    614            _binary_operate, 
    615            util.EMPTY_DICT, 
    616        ), 
    617        "bitwise_and_op": ( 
    618            _binary_operate, 
    619            util.EMPTY_DICT, 
    620        ), 
    621        "bitwise_not_op": ( 
    622            _bitwise_not_impl, 
    623            util.EMPTY_DICT, 
    624        ), 
    625        "bitwise_lshift_op": ( 
    626            _binary_operate, 
    627            util.EMPTY_DICT, 
    628        ), 
    629        "bitwise_rshift_op": ( 
    630            _binary_operate, 
    631            util.EMPTY_DICT, 
    632        ), 
    633        "matmul": (_unsupported_impl, util.EMPTY_DICT), 
    634        "pow": (_pow_impl, util.EMPTY_DICT), 
    635        "neg": (_neg_impl, util.EMPTY_DICT), 
    636        "mul": (_binary_operate, util.EMPTY_DICT), 
    637        "sub": ( 
    638            _binary_operate, 
    639            util.EMPTY_DICT, 
    640        ), 
    641        "div": (_binary_operate, util.EMPTY_DICT), 
    642        "mod": (_binary_operate, util.EMPTY_DICT), 
    643        "truediv": (_binary_operate, util.EMPTY_DICT), 
    644        "floordiv": (_binary_operate, util.EMPTY_DICT), 
    645        "json_path_getitem_op": ( 
    646            _binary_operate, 
    647            util.EMPTY_DICT, 
    648        ), 
    649        "json_getitem_op": ( 
    650            _binary_operate, 
    651            util.EMPTY_DICT, 
    652        ), 
    653    } 
    654)