Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/default_comparator.py: 50%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

111 statements  

1# sql/default_comparator.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Default implementation of SQL comparison operations.""" 

9 

10from __future__ import annotations 

11 

12import typing 

13from typing import Any 

14from typing import Callable 

15from typing import Dict 

16from typing import NoReturn 

17from typing import Optional 

18from typing import Tuple 

19from typing import Type 

20from typing import Union 

21 

22from . import coercions 

23from . import operators 

24from . import roles 

25from . import type_api 

26from .elements import and_ 

27from .elements import BinaryExpression 

28from .elements import ClauseElement 

29from .elements import CollationClause 

30from .elements import CollectionAggregate 

31from .elements import ExpressionClauseList 

32from .elements import False_ 

33from .elements import Null 

34from .elements import OperatorExpression 

35from .elements import or_ 

36from .elements import True_ 

37from .elements import UnaryExpression 

38from .operators import OperatorType 

39from .. import exc 

40from .. import util 

41 

42_T = typing.TypeVar("_T", bound=Any) 

43 

44if typing.TYPE_CHECKING: 

45 from .elements import ColumnElement 

46 from .operators import custom_op 

47 from .type_api import TypeEngine 

48 

49 

def _boolean_compare(
    expr: ColumnElement[Any],
    op: OperatorType,
    obj: Any,
    *,
    negate_op: Optional[OperatorType] = None,
    reverse: bool = False,
    _python_is_types: Tuple[Type[Any], ...] = (type(None), bool),
    result_type: Optional[TypeEngine[bool]] = None,
    **kwargs: Any,
) -> OperatorExpression[bool]:
    """Construct the operator expression for a comparison of ``expr``
    against ``obj``.

    :param expr: the expression the operator was invoked on.
    :param op: the comparison operator.
    :param obj: the other operand; either a constant-like value (an
     instance of ``_python_is_types``, ``Null``, ``True_`` or ``False_``)
     or anything coercible to ``roles.BinaryElementRole``.
    :param negate_op: operator stored as the negation of the result.
    :param reverse: when true, ``obj`` becomes the left operand.
    :param result_type: type of the result; ``type_api.BOOLEANTYPE`` is
     used when omitted.
    :param kwargs: forwarded as the ``modifiers`` of the expression.
    :raises sqlalchemy.exc.ArgumentError: if a constant-like ``obj`` is
     combined with an operator other than the ones named in the message.
    """
    if result_type is None:
        result_type = type_api.BOOLEANTYPE

    if not isinstance(obj, _python_is_types + (Null, True_, False_)):
        # ordinary operand: coerce it as the other side of a binary
        # expression
        operand = coercions.expect(
            roles.BinaryElementRole, element=obj, operator=op, expr=expr
        )
    else:
        # "x == True" / "x != False" are rendered with the constant as a
        # literal ("= true", or "= 1" where the backend lacks boolean
        # constants), which works on all platforms.
        literal_bool_compare = op in (
            operators.eq,
            operators.ne,
        ) and isinstance(obj, (bool, True_, False_))

        if literal_bool_compare or op in (
            operators.is_distinct_from,
            operators.is_not_distinct_from,
        ):
            return OperatorExpression._construct_for_op(
                expr,
                coercions.expect(roles.ConstExprRole, obj),
                op,
                type_=result_type,
                negate=negate_op,
                modifiers=kwargs,
            )

        if expr._is_collection_aggregate:
            # falls through to the generic construction at the bottom
            operand = coercions.expect(
                roles.ConstExprRole, element=obj, operator=op, expr=expr
            )
        elif op in (operators.eq, operators.is_):
            # every remaining constant comparison becomes IS / IS NOT
            return OperatorExpression._construct_for_op(
                expr,
                coercions.expect(roles.ConstExprRole, obj),
                operators.is_,
                negate=operators.is_not,
                type_=result_type,
            )
        elif op in (operators.ne, operators.is_not):
            return OperatorExpression._construct_for_op(
                expr,
                coercions.expect(roles.ConstExprRole, obj),
                operators.is_not,
                negate=operators.is_,
                type_=result_type,
            )
        else:
            raise exc.ArgumentError(
                "Only '=', '!=', 'is_()', 'is_not()', "
                "'is_distinct_from()', 'is_not_distinct_from()' "
                "operators can be used with None/True/False"
            )

    left, right = (operand, expr) if reverse else (expr, operand)
    return OperatorExpression._construct_for_op(
        left,
        right,
        op,
        type_=result_type,
        negate=negate_op,
        modifiers=kwargs,
    )

142 

143 

def _custom_op_operate(
    expr: ColumnElement[Any],
    op: custom_op[Any],
    obj: Any,
    reverse: bool = False,
    result_type: Optional[TypeEngine[Any]] = None,
    **kw: Any,
) -> ColumnElement[Any]:
    """Apply a custom operator as a binary operation.

    When no ``result_type`` is given, ``op.return_type`` is used if it is
    truthy; otherwise ``type_api.BOOLEANTYPE`` is used if
    ``op.is_comparison`` is truthy.  If neither applies the type stays
    ``None`` and is resolved by :func:`_binary_operate`.
    """
    resolved_type = result_type
    if resolved_type is None:
        if op.return_type:
            resolved_type = op.return_type
        elif op.is_comparison:
            resolved_type = type_api.BOOLEANTYPE

    return _binary_operate(
        expr, op, obj, reverse=reverse, result_type=resolved_type, **kw
    )

161 

162 

def _binary_operate(
    expr: ColumnElement[Any],
    op: OperatorType,
    obj: roles.BinaryElementRole[Any],
    *,
    reverse: bool = False,
    result_type: Optional[TypeEngine[_T]] = None,
    **kw: Any,
) -> OperatorExpression[_T]:
    """Construct a binary operator expression from ``expr`` and ``obj``.

    ``obj`` is coerced to ``roles.BinaryElementRole``.  With ``reverse``
    set, the coerced object is the left operand and ``expr`` the right.
    When ``result_type`` is not supplied, both the operator and the type
    are taken from the left operand's ``comparator._adapt_expression()``.
    Extra keyword arguments become the expression's ``modifiers``.
    """
    operand = coercions.expect(
        roles.BinaryElementRole, obj, expr=expr, operator=op
    )

    left, right = (operand, expr) if reverse else (expr, operand)

    if result_type is None:
        op, result_type = left.comparator._adapt_expression(
            op, right.comparator
        )

    return OperatorExpression._construct_for_op(
        left, right, op, type_=result_type, modifiers=kw
    )

189 

190 

def _conjunction_operate(
    expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any
) -> ColumnElement[Any]:
    """Join ``expr`` and ``other`` with ``and_()`` or ``or_()``.

    :raises NotImplementedError: if ``op`` is neither ``operators.and_``
     nor ``operators.or_``.
    """
    if op is operators.and_:
        combine = and_
    elif op is operators.or_:
        combine = or_
    else:
        raise NotImplementedError()

    return combine(expr, other)

200 

201 

202def _scalar( 

203 expr: ColumnElement[Any], 

204 op: OperatorType, 

205 fn: Callable[[ColumnElement[Any]], ColumnElement[Any]], 

206 **kw: Any, 

207) -> ColumnElement[Any]: 

208 return fn(expr) 

209 

210 

def _in_impl(
    expr: ColumnElement[Any],
    op: OperatorType,
    seq_or_selectable: ClauseElement,
    negate_op: OperatorType,
    **kw: Any,
) -> ColumnElement[Any]:
    """Build the comparison for the operators dispatched to this
    function (``in_op`` and ``not_in_op`` in ``operator_lookup``).

    The right-hand side is coerced to ``roles.InElementRole``.  If the
    coerced element carries an ``"in_ops"`` annotation, that annotation
    supplies the ``(op, negate_op)`` pair, overriding the arguments.
    """
    in_target = coercions.expect(
        roles.InElementRole, seq_or_selectable, expr=expr, operator=op
    )

    annotations = in_target._annotations
    if "in_ops" in annotations:
        op, negate_op = annotations["in_ops"]

    return _boolean_compare(expr, op, in_target, negate_op=negate_op, **kw)

227 

228 

def _getitem_impl(
    expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any
) -> ColumnElement[Any]:
    """Build an index access on ``expr``.

    The operation is supported when ``expr.type`` is an
    ``type_api.INDEXABLE``, or is a ``type_api.TypeDecorator`` whose
    ``impl_instance`` is one.  Any other type is handed to
    :func:`_unsupported_impl`, which raises ``NotImplementedError``.
    """
    expr_type = expr.type
    indexable = isinstance(expr_type, type_api.INDEXABLE) or (
        isinstance(expr_type, type_api.TypeDecorator)
        and isinstance(expr_type.impl_instance, type_api.INDEXABLE)
    )

    if not indexable:
        _unsupported_impl(expr, op, other, **kw)
    else:
        coerced = coercions.expect(
            roles.BinaryElementRole, other, expr=expr, operator=op
        )
        return _binary_operate(expr, op, coerced, **kw)

243 

244 

245def _unsupported_impl( 

246 expr: ColumnElement[Any], op: OperatorType, *arg: Any, **kw: Any 

247) -> NoReturn: 

248 raise NotImplementedError( 

249 "Operator '%s' is not supported on this expression" % op.__name__ 

250 ) 

251 

252 

253def _inv_impl( 

254 expr: ColumnElement[Any], op: OperatorType, **kw: Any 

255) -> ColumnElement[Any]: 

256 """See :meth:`.ColumnOperators.__inv__`.""" 

257 

258 # undocumented element currently used by the ORM for 

259 # relationship.contains() 

260 if hasattr(expr, "negation_clause"): 

261 return expr.negation_clause 

262 else: 

263 return expr._negate() 

264 

265 

def _neg_impl(
    expr: ColumnElement[Any], op: OperatorType, **kw: Any
) -> ColumnElement[Any]:
    """Wrap ``expr`` in a unary ``operators.neg`` expression that keeps
    the type of ``expr``.

    See :meth:`.ColumnOperators.__neg__`.
    """
    negated = UnaryExpression(expr, operator=operators.neg, type_=expr.type)
    return negated

271 

272 

def _bitwise_not_impl(
    expr: ColumnElement[Any], op: OperatorType, **kw: Any
) -> ColumnElement[Any]:
    """Wrap ``expr`` in a unary ``operators.bitwise_not_op`` expression
    that keeps the type of ``expr``.

    See :meth:`.ColumnOperators.bitwise_not`.
    """
    inverted = UnaryExpression(
        expr, operator=operators.bitwise_not_op, type_=expr.type
    )
    return inverted

281 

282 

def _match_impl(
    expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any
) -> ColumnElement[Any]:
    """Build a match comparison typed as ``type_api.MATCHTYPE``.

    See :meth:`.ColumnOperators.match`.

    The expression is always constructed with ``operators.match_op``;
    the incoming ``op`` only selects which operator is stored as the
    negation.
    """
    target = coercions.expect(
        roles.BinaryElementRole,
        other,
        expr=expr,
        operator=operators.match_op,
    )

    if op is operators.match_op:
        negation = operators.not_match_op
    else:
        negation = operators.match_op

    return _boolean_compare(
        expr,
        operators.match_op,
        target,
        result_type=type_api.MATCHTYPE,
        negate_op=negation,
        **kw,
    )

305 

306 

def _distinct_impl(
    expr: ColumnElement[Any], op: OperatorType, **kw: Any
) -> ColumnElement[Any]:
    """Wrap ``expr`` in a unary ``operators.distinct_op`` expression that
    keeps the type of ``expr``.

    See :meth:`.ColumnOperators.distinct`.
    """
    distinct_expr = UnaryExpression(
        expr, operator=operators.distinct_op, type_=expr.type
    )
    return distinct_expr

314 

315 

def _between_impl(
    expr: ColumnElement[Any],
    op: OperatorType,
    cleft: Any,
    cright: Any,
    **kw: Any,
) -> ColumnElement[Any]:
    """Build a BETWEEN-style binary expression.

    See :meth:`.ColumnOperators.between`.

    The right side is an ungrouped ``operators.and_`` clause list holding
    the two coerced bounds.  The stored negation is
    ``operators.not_between_op`` when ``op`` is ``operators.between_op``
    and ``operators.between_op`` otherwise.  Extra keyword arguments
    become the expression's ``modifiers``.
    """
    lower = coercions.expect(
        roles.BinaryElementRole,
        cleft,
        expr=expr,
        operator=operators.and_,
    )
    upper = coercions.expect(
        roles.BinaryElementRole,
        cright,
        expr=expr,
        operator=operators.and_,
    )
    bounds = ExpressionClauseList._construct_for_list(
        operators.and_,
        type_api.NULLTYPE,
        lower,
        upper,
        group=False,
    )

    if op is operators.between_op:
        negation = operators.not_between_op
    else:
        negation = operators.between_op

    return BinaryExpression(expr, bounds, op, negate=negation, modifiers=kw)

351 

352 

def _collate_impl(
    expr: ColumnElement[str], op: OperatorType, collation: str, **kw: Any
) -> ColumnElement[str]:
    """Delegate to ``CollationClause._create_collation_expression`` with
    ``expr`` and ``collation``; ``op`` and ``kw`` are not used.
    """
    collated = CollationClause._create_collation_expression(expr, collation)
    return collated

357 

358 

def _regexp_match_impl(
    expr: ColumnElement[str],
    op: OperatorType,
    pattern: Any,
    flags: Optional[str],
    **kw: Any,
) -> ColumnElement[Any]:
    """Build a regular-expression match binary expression.

    :param expr: the expression being matched.
    :param op: the operator to construct the expression with; this
     function is registered in ``operator_lookup`` for both
     ``regexp_match_op`` and ``not_regexp_match_op``.
    :param pattern: the pattern, coerced to ``roles.BinaryElementRole``.
    :param flags: stored under the ``"flags"`` key of the expression's
     modifiers.
    :param kw: accepted but not used.
    """
    # The stored negation must be the opposite of the operator actually
    # used.  Previously it was always not_regexp_match_op, so an
    # expression built with not_regexp_match_op negated to itself.
    if op is operators.not_regexp_match_op:
        negate_op = operators.regexp_match_op
    else:
        negate_op = operators.not_regexp_match_op

    return BinaryExpression(
        expr,
        coercions.expect(
            roles.BinaryElementRole,
            pattern,
            expr=expr,
            operator=operators.comma_op,
        ),
        op,
        negate=negate_op,
        modifiers={"flags": flags},
    )

378 

379 

def _regexp_replace_impl(
    expr: ColumnElement[Any],
    op: OperatorType,
    pattern: Any,
    replacement: Any,
    flags: Optional[str],
    **kw: Any,
) -> ColumnElement[Any]:
    """Build a regular-expression replace binary expression.

    The right side is an ungrouped ``operators.comma_op`` clause list
    holding the coerced ``pattern`` followed by the coerced
    ``replacement``.  ``flags`` is stored under the ``"flags"`` key of
    the expression's modifiers; ``kw`` is accepted but not used.
    """
    pattern_clause = coercions.expect(
        roles.BinaryElementRole,
        pattern,
        expr=expr,
        operator=operators.comma_op,
    )
    replacement_clause = coercions.expect(
        roles.BinaryElementRole,
        replacement,
        expr=expr,
        operator=operators.comma_op,
    )
    arguments = ExpressionClauseList._construct_for_list(
        operators.comma_op,
        type_api.NULLTYPE,
        pattern_clause,
        replacement_clause,
        group=False,
    )

    return BinaryExpression(expr, arguments, op, modifiers={"flags": flags})

410 

411 

# A mapping of operator names to a two-tuple of:
#
#   * the implementation function used to build the expression, and
#   * an immutable dict of additional keyword arguments to be passed
#     to that function ("negate_op" for comparisons, "fn" for the
#     _scalar() wrappers).
#
# For every comparison entry, "negate_op" is the operator that expresses
# the logical opposite (lt <-> ge, like_op <-> not_like_op, ...).  The
# "is_" / "is_not" entries previously named themselves as their own
# negation; since _boolean_compare() passes negate_op through for
# non-constant operands, they now point at each other like every other
# pair in this table.
operator_lookup: Dict[
    str,
    Tuple[
        Callable[..., ColumnElement[Any]],
        util.immutabledict[
            str, Union[OperatorType, Callable[..., ColumnElement[Any]]]
        ],
    ],
] = {
    "and_": (_conjunction_operate, util.EMPTY_DICT),
    "or_": (_conjunction_operate, util.EMPTY_DICT),
    "inv": (_inv_impl, util.EMPTY_DICT),
    "add": (_binary_operate, util.EMPTY_DICT),
    "mul": (_binary_operate, util.EMPTY_DICT),
    "sub": (_binary_operate, util.EMPTY_DICT),
    "div": (_binary_operate, util.EMPTY_DICT),
    "mod": (_binary_operate, util.EMPTY_DICT),
    "bitwise_xor_op": (_binary_operate, util.EMPTY_DICT),
    "bitwise_or_op": (_binary_operate, util.EMPTY_DICT),
    "bitwise_and_op": (_binary_operate, util.EMPTY_DICT),
    "bitwise_not_op": (_bitwise_not_impl, util.EMPTY_DICT),
    "bitwise_lshift_op": (_binary_operate, util.EMPTY_DICT),
    "bitwise_rshift_op": (_binary_operate, util.EMPTY_DICT),
    "truediv": (_binary_operate, util.EMPTY_DICT),
    "floordiv": (_binary_operate, util.EMPTY_DICT),
    "custom_op": (_custom_op_operate, util.EMPTY_DICT),
    "json_path_getitem_op": (_binary_operate, util.EMPTY_DICT),
    "json_getitem_op": (_binary_operate, util.EMPTY_DICT),
    "concat_op": (_binary_operate, util.EMPTY_DICT),
    "any_op": (
        _scalar,
        util.immutabledict({"fn": CollectionAggregate._create_any}),
    ),
    "all_op": (
        _scalar,
        util.immutabledict({"fn": CollectionAggregate._create_all}),
    ),
    "lt": (_boolean_compare, util.immutabledict({"negate_op": operators.ge})),
    "le": (_boolean_compare, util.immutabledict({"negate_op": operators.gt})),
    "ne": (_boolean_compare, util.immutabledict({"negate_op": operators.eq})),
    "gt": (_boolean_compare, util.immutabledict({"negate_op": operators.le})),
    "ge": (_boolean_compare, util.immutabledict({"negate_op": operators.lt})),
    "eq": (_boolean_compare, util.immutabledict({"negate_op": operators.ne})),
    "is_distinct_from": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.is_not_distinct_from}),
    ),
    "is_not_distinct_from": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.is_distinct_from}),
    ),
    "like_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.not_like_op}),
    ),
    "ilike_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.not_ilike_op}),
    ),
    "not_like_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.like_op}),
    ),
    "not_ilike_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.ilike_op}),
    ),
    "contains_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.not_contains_op}),
    ),
    "icontains_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.not_icontains_op}),
    ),
    "startswith_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.not_startswith_op}),
    ),
    "istartswith_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.not_istartswith_op}),
    ),
    "endswith_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.not_endswith_op}),
    ),
    "iendswith_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.not_iendswith_op}),
    ),
    "desc_op": (
        _scalar,
        util.immutabledict({"fn": UnaryExpression._create_desc}),
    ),
    "asc_op": (
        _scalar,
        util.immutabledict({"fn": UnaryExpression._create_asc}),
    ),
    "nulls_first_op": (
        _scalar,
        util.immutabledict({"fn": UnaryExpression._create_nulls_first}),
    ),
    "nulls_last_op": (
        _scalar,
        util.immutabledict({"fn": UnaryExpression._create_nulls_last}),
    ),
    "in_op": (
        _in_impl,
        util.immutabledict({"negate_op": operators.not_in_op}),
    ),
    "not_in_op": (
        _in_impl,
        util.immutabledict({"negate_op": operators.in_op}),
    ),
    "is_": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.is_not}),
    ),
    "is_not": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.is_}),
    ),
    "collate": (_collate_impl, util.EMPTY_DICT),
    "match_op": (_match_impl, util.EMPTY_DICT),
    "not_match_op": (_match_impl, util.EMPTY_DICT),
    "distinct_op": (_distinct_impl, util.EMPTY_DICT),
    "between_op": (_between_impl, util.EMPTY_DICT),
    "not_between_op": (_between_impl, util.EMPTY_DICT),
    "neg": (_neg_impl, util.EMPTY_DICT),
    "getitem": (_getitem_impl, util.EMPTY_DICT),
    "lshift": (_unsupported_impl, util.EMPTY_DICT),
    "rshift": (_unsupported_impl, util.EMPTY_DICT),
    "contains": (_unsupported_impl, util.EMPTY_DICT),
    "regexp_match_op": (_regexp_match_impl, util.EMPTY_DICT),
    "not_regexp_match_op": (_regexp_match_impl, util.EMPTY_DICT),
    "regexp_replace_op": (_regexp_replace_impl, util.EMPTY_DICT),
}