Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/default_comparator.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

115 statements  

1# sql/default_comparator.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Default implementation of SQL comparison operations.""" 

9 

10from __future__ import annotations 

11 

12import typing 

13from typing import Any 

14from typing import Callable 

15from typing import NoReturn 

16from typing import Optional 

17from typing import Tuple 

18from typing import Type 

19from typing import Union 

20 

21from . import coercions 

22from . import functions 

23from . import operators 

24from . import roles 

25from . import type_api 

26from .elements import and_ 

27from .elements import BinaryExpression 

28from .elements import ClauseElement 

29from .elements import CollationClause 

30from .elements import CollectionAggregate 

31from .elements import ExpressionClauseList 

32from .elements import False_ 

33from .elements import Null 

34from .elements import OperatorExpression 

35from .elements import or_ 

36from .elements import True_ 

37from .elements import UnaryExpression 

38from .operators import OperatorType 

39from .. import exc 

40from .. import util 

41 

42_T = typing.TypeVar("_T", bound=Any) 

43 

44if typing.TYPE_CHECKING: 

45 from .elements import ColumnElement 

46 from .operators import custom_op 

47 from .type_api import TypeEngine 

48 

49 

def _boolean_compare(
    expr: ColumnElement[Any],
    op: OperatorType,
    obj: Any,
    *,
    negate_op: Optional[OperatorType] = None,
    reverse: bool = False,
    _python_is_types: Tuple[Type[Any], ...] = (type(None), bool),
    result_type: Optional[TypeEngine[bool]] = None,
    **kwargs: Any,
) -> OperatorExpression[bool]:
    """Construct a comparison expression between ``expr`` and ``obj``.

    ``obj`` is treated as a constant when it is an instance of one of
    ``_python_is_types`` (``None`` / ``bool`` by default) or of
    ``Null`` / ``True_`` / ``False_``:

    * ``eq`` / ``ne`` against a boolean constant, and
      ``is_distinct_from`` / ``is_not_distinct_from`` against any
      constant, keep the operator that was passed in.
    * otherwise, if ``expr._is_collection_aggregate`` is true, the
      constant is coerced with ``roles.ConstExprRole`` and the generic
      construction at the end of the function is used.
    * otherwise ``eq`` / ``is_`` become ``IS`` and ``ne`` / ``is_not``
      become ``IS NOT``; these two forms are built without
      ``negate_op`` or ``kwargs``.

    Any other ``obj`` is coerced with ``roles.BinaryElementRole``.

    :param expr: the expression the operator was invoked on.
    :param op: the comparison operator.
    :param obj: the other operand.
    :param negate_op: operator passed as ``negate`` to the constructed
     expression.
    :param reverse: when true, ``obj`` is placed on the left and
     ``expr`` on the right.
    :param result_type: type of the constructed expression; defaults to
     ``type_api.BOOLEANTYPE``.
    :param kwargs: passed as ``modifiers`` to the constructed
     expression.
    :raises ArgumentError: if a constant is compared using an operator
     other than the ones listed above and ``expr`` is not a collection
     aggregate.
    """
    if result_type is None:
        result_type = type_api.BOOLEANTYPE

    constant_types = _python_is_types + (Null, True_, False_)

    if not isinstance(obj, constant_types):
        obj = coercions.expect(
            roles.BinaryElementRole, element=obj, operator=op, expr=expr
        )
    elif (
        # allow x ==/!= True/False to be treated as a literal.
        # this comes out to "== / != true/false" or "1/0" if those
        # constants aren't supported and works on all platforms
        op in (operators.eq, operators.ne)
        and isinstance(obj, (bool, True_, False_))
    ) or op in (
        operators.is_distinct_from,
        operators.is_not_distinct_from,
    ):
        return OperatorExpression._construct_for_op(
            expr,
            coercions.expect(roles.ConstExprRole, obj),
            op,
            type_=result_type,
            negate=negate_op,
            modifiers=kwargs,
        )
    elif expr._is_collection_aggregate:
        obj = coercions.expect(
            roles.ConstExprRole, element=obj, operator=op, expr=expr
        )
    else:
        # all other None uses IS, IS NOT
        if op in (operators.eq, operators.is_):
            const_op, const_negate = operators.is_, operators.is_not
        elif op in (operators.ne, operators.is_not):
            const_op, const_negate = operators.is_not, operators.is_
        else:
            raise exc.ArgumentError(
                "Only '=', '!=', 'is_()', 'is_not()', "
                "'is_distinct_from()', 'is_not_distinct_from()' "
                "operators can be used with None/True/False"
            )
        return OperatorExpression._construct_for_op(
            expr,
            coercions.expect(roles.ConstExprRole, obj),
            const_op,
            negate=const_negate,
            type_=result_type,
        )

    # generic path: operand order depends on ``reverse``
    left, right = (obj, expr) if reverse else (expr, obj)
    return OperatorExpression._construct_for_op(
        left,
        right,
        op,
        type_=result_type,
        negate=negate_op,
        modifiers=kwargs,
    )

142 

143 

def _custom_op_operate(
    expr: ColumnElement[Any],
    op: custom_op[Any],
    obj: Any,
    reverse: bool = False,
    result_type: Optional[TypeEngine[Any]] = None,
    **kw: Any,
) -> ColumnElement[Any]:
    """Apply a ``custom_op`` by delegating to :func:`_binary_operate`.

    When ``result_type`` is not given it is taken from
    ``op.return_type`` if that is truthy, else set to
    ``type_api.BOOLEANTYPE`` if ``op.is_comparison`` is truthy;
    otherwise it stays ``None`` and is resolved by
    :func:`_binary_operate`.
    """
    if result_type is None:
        declared_type = op.return_type
        if declared_type:
            result_type = declared_type
        elif op.is_comparison:
            result_type = type_api.BOOLEANTYPE

    return _binary_operate(
        expr, op, obj, reverse=reverse, result_type=result_type, **kw
    )

161 

162 

def _binary_operate(
    expr: ColumnElement[Any],
    op: OperatorType,
    obj: roles.BinaryElementRole[Any],
    *,
    reverse: bool = False,
    result_type: Optional[TypeEngine[_T]] = None,
    **kw: Any,
) -> OperatorExpression[_T]:
    """Construct a binary operator expression from ``expr`` and ``obj``.

    ``obj`` is coerced with ``roles.BinaryElementRole``.  With
    ``reverse`` set, the coerced object becomes the left operand and
    ``expr`` the right one.  When ``result_type`` is ``None``, both the
    operator and the result type are obtained from the left operand's
    ``comparator._adapt_expression()``.  ``kw`` is passed along as
    ``modifiers``.
    """
    other = coercions.expect(
        roles.BinaryElementRole, obj, expr=expr, operator=op
    )

    lhs, rhs = (other, expr) if reverse else (expr, other)

    if result_type is None:
        op, result_type = lhs.comparator._adapt_expression(
            op, rhs.comparator
        )

    return OperatorExpression._construct_for_op(
        lhs, rhs, op, type_=result_type, modifiers=kw
    )

189 

190 

def _conjunction_operate(
    expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any
) -> ColumnElement[Any]:
    """Combine ``expr`` and ``other`` with ``and_()`` or ``or_()``.

    :raises NotImplementedError: if ``op`` is neither
     ``operators.and_`` nor ``operators.or_``.
    """
    if op is operators.and_:
        return and_(expr, other)
    if op is operators.or_:
        return or_(expr, other)
    raise NotImplementedError()

200 

201 

def _scalar(
    expr: ColumnElement[Any],
    op: OperatorType,
    fn: Callable[[ColumnElement[Any]], ColumnElement[Any]],
    **kw: Any,
) -> ColumnElement[Any]:
    """Return the result of calling ``fn`` with ``expr``.

    ``op`` and ``kw`` are accepted but not used.
    """
    result = fn(expr)
    return result

209 

210 

def _in_impl(
    expr: ColumnElement[Any],
    op: OperatorType,
    seq_or_selectable: ClauseElement,
    negate_op: OperatorType,
    **kw: Any,
) -> ColumnElement[Any]:
    """Build an IN-style comparison via :func:`_boolean_compare`.

    The right-hand side is coerced with ``roles.InElementRole``.  If the
    coerced element carries an ``"in_ops"`` annotation, that annotation
    replaces both ``op`` and ``negate_op``.
    """
    in_element = coercions.expect(
        roles.InElementRole, seq_or_selectable, expr=expr, operator=op
    )
    annotations = in_element._annotations
    if "in_ops" in annotations:
        op, negate_op = annotations["in_ops"]

    return _boolean_compare(
        expr, op, in_element, negate_op=negate_op, **kw
    )

227 

228 

def _getitem_impl(
    expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any
) -> ColumnElement[Any]:
    """Apply an index operator to ``expr``.

    The operation is built only when ``expr.type`` is an
    ``INDEXABLE``, or is a ``TypeDecorator`` whose ``impl_instance`` is
    an ``INDEXABLE``; any other type is routed to
    :func:`_unsupported_impl`, which raises.
    """
    expr_type = expr.type
    is_indexable = isinstance(expr_type, type_api.INDEXABLE) or (
        isinstance(expr_type, type_api.TypeDecorator)
        and isinstance(expr_type.impl_instance, type_api.INDEXABLE)
    )

    if not is_indexable:
        # always raises NotImplementedError
        _unsupported_impl(expr, op, other, **kw)

    index = coercions.expect(
        roles.BinaryElementRole, other, expr=expr, operator=op
    )
    return _binary_operate(expr, op, index, **kw)

243 

244 

def _unsupported_impl(
    expr: ColumnElement[Any], op: OperatorType, *arg: Any, **kw: Any
) -> NoReturn:
    """Unconditionally reject ``op``.

    :raises NotImplementedError: always; the message includes
     ``op.__name__``.
    """
    message = (
        "Operator '%s' is not supported on this expression" % op.__name__
    )
    raise NotImplementedError(message)

251 

252 

def _inv_impl(
    expr: ColumnElement[Any], op: OperatorType, **kw: Any
) -> ColumnElement[Any]:
    """See :meth:`.ColumnOperators.__inv__`.

    Returns ``expr.negation_clause`` when that attribute exists,
    otherwise the result of ``expr._negate()``.
    """
    if not hasattr(expr, "negation_clause"):
        return expr._negate()

    # undocumented element currently used by the ORM for
    # relationship.contains()
    return expr.negation_clause

264 

265 

def _neg_impl(
    expr: ColumnElement[Any], op: OperatorType, **kw: Any
) -> ColumnElement[Any]:
    """See :meth:`.ColumnOperators.__neg__`.

    Wraps ``expr`` in a ``UnaryExpression`` using ``operators.neg``;
    the result has the same type as ``expr``.
    """
    negated = UnaryExpression(
        expr, operator=operators.neg, type_=expr.type
    )
    return negated

271 

272 

def _bitwise_not_impl(
    expr: ColumnElement[Any], op: OperatorType, **kw: Any
) -> ColumnElement[Any]:
    """See :meth:`.ColumnOperators.bitwise_not`.

    Wraps ``expr`` in a ``UnaryExpression`` using
    ``operators.bitwise_not_op``; the result has the same type as
    ``expr``.
    """
    inverted = UnaryExpression(
        expr, operator=operators.bitwise_not_op, type_=expr.type
    )
    return inverted

281 

282 

def _match_impl(
    expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any
) -> ColumnElement[Any]:
    """See :meth:`.ColumnOperators.match`.

    The comparison is always constructed with ``operators.match_op``
    and ``type_api.MATCHTYPE``; the incoming ``op`` only selects which
    operator is supplied as the negation.
    """
    target = coercions.expect(
        roles.BinaryElementRole,
        other,
        expr=expr,
        operator=operators.match_op,
    )

    if op is operators.match_op:
        negation = operators.not_match_op
    else:
        negation = operators.match_op

    return _boolean_compare(
        expr,
        operators.match_op,
        target,
        result_type=type_api.MATCHTYPE,
        negate_op=negation,
        **kw,
    )

305 

306 

def _distinct_impl(
    expr: ColumnElement[Any], op: OperatorType, **kw: Any
) -> ColumnElement[Any]:
    """See :meth:`.ColumnOperators.distinct`.

    Wraps ``expr`` in a ``UnaryExpression`` using
    ``operators.distinct_op``; the result has the same type as ``expr``.
    """
    distinct_expr = UnaryExpression(
        expr, operator=operators.distinct_op, type_=expr.type
    )
    return distinct_expr

314 

315 

def _between_impl(
    expr: ColumnElement[Any],
    op: OperatorType,
    cleft: Any,
    cright: Any,
    **kw: Any,
) -> ColumnElement[Any]:
    """See :meth:`.ColumnOperators.between`.

    Both bounds are coerced with ``roles.BinaryElementRole`` and joined
    into an ungrouped ``ExpressionClauseList`` using ``operators.and_``,
    which becomes the right-hand side of the ``BinaryExpression``.  The
    negation operator is the opposite of ``op`` (between / not between),
    and ``kw`` is passed as ``modifiers``.
    """
    lower_bound = coercions.expect(
        roles.BinaryElementRole,
        cleft,
        expr=expr,
        operator=operators.and_,
    )
    upper_bound = coercions.expect(
        roles.BinaryElementRole,
        cright,
        expr=expr,
        operator=operators.and_,
    )
    bounds = ExpressionClauseList._construct_for_list(
        operators.and_,
        type_api.NULLTYPE,
        lower_bound,
        upper_bound,
        group=False,
    )

    if op is operators.between_op:
        negation = operators.not_between_op
    else:
        negation = operators.between_op

    return BinaryExpression(
        expr,
        bounds,
        op,
        negate=negation,
        modifiers=kw,
    )

351 

352 

def _pow_impl(
    expr: ColumnElement[Any],
    op: OperatorType,
    other: Any,
    reverse: bool = False,
    **kw: Any,
) -> ColumnElement[Any]:
    """Express exponentiation as a call to ``functions.pow``.

    ``expr`` is the first argument unless ``reverse`` is set, in which
    case ``other`` comes first.  ``op`` and ``kw`` are not used.
    """
    base, exponent = (other, expr) if reverse else (expr, other)
    return functions.pow(base, exponent)

364 

365 

def _collate_impl(
    expr: ColumnElement[str], op: OperatorType, collation: str, **kw: Any
) -> ColumnElement[str]:
    """Delegate to ``CollationClause._create_collation_expression``.

    ``op`` and ``kw`` are accepted but not used.
    """
    collated = CollationClause._create_collation_expression(
        expr, collation
    )
    return collated

370 

371 

def _regexp_match_impl(
    expr: ColumnElement[str],
    op: OperatorType,
    pattern: Any,
    flags: Optional[str],
    **kw: Any,
) -> ColumnElement[Any]:
    """Build a regular-expression match ``BinaryExpression``.

    ``pattern`` is coerced with ``roles.BinaryElementRole`` and
    ``flags`` is stored under the ``"flags"`` key of the expression
    modifiers; ``kw`` is not forwarded.

    This function is registered for both ``regexp_match_op`` and
    ``not_regexp_match_op``, so the negation operator is chosen as the
    opposite of ``op`` in the same way as for between / match.
    Previously the negation was always ``not_regexp_match_op``, which
    made an expression built with ``not_regexp_match_op`` its own
    negation.
    """
    coerced_pattern = coercions.expect(
        roles.BinaryElementRole,
        pattern,
        expr=expr,
        operator=operators.comma_op,
    )
    return BinaryExpression(
        expr,
        coerced_pattern,
        op,
        negate=(
            operators.not_regexp_match_op
            if op is operators.regexp_match_op
            else operators.regexp_match_op
        ),
        modifiers={"flags": flags},
    )

391 

392 

def _regexp_replace_impl(
    expr: ColumnElement[Any],
    op: OperatorType,
    pattern: Any,
    replacement: Any,
    flags: Optional[str],
    **kw: Any,
) -> ColumnElement[Any]:
    """Build a regular-expression replace ``BinaryExpression``.

    ``pattern`` and ``replacement`` are each coerced with
    ``roles.BinaryElementRole`` and joined into an ungrouped
    ``ExpressionClauseList`` using ``operators.comma_op``, which becomes
    the right-hand side.  ``flags`` is stored under the ``"flags"`` key
    of the expression modifiers; ``kw`` is not forwarded.
    """
    coerced_pattern = coercions.expect(
        roles.BinaryElementRole,
        pattern,
        expr=expr,
        operator=operators.comma_op,
    )
    coerced_replacement = coercions.expect(
        roles.BinaryElementRole,
        replacement,
        expr=expr,
        operator=operators.comma_op,
    )
    arguments = ExpressionClauseList._construct_for_list(
        operators.comma_op,
        type_api.NULLTYPE,
        coerced_pattern,
        coerced_replacement,
        group=False,
    )
    return BinaryExpression(
        expr,
        arguments,
        op,
        modifiers={"flags": flags},
    )

423 

424 

# Maps an operator name to a two-tuple of
# ``(implementation function, extra keyword arguments)``.  The extra
# keyword arguments are either empty, ``{"negate_op": <operator>}`` for
# comparison-style implementations, or ``{"fn": <callable>}`` for
# ``_scalar``.
operator_lookup: util.immutabledict[
    str,
    Tuple[
        Callable[..., "ColumnElement[Any]"],
        util.immutabledict[
            str, Union["OperatorType", Callable[..., "ColumnElement[Any]"]]
        ],
    ],
] = util.immutabledict(
    {
        "any_op": (
            _scalar,
            util.immutabledict({"fn": CollectionAggregate._create_any}),
        ),
        "all_op": (
            _scalar,
            util.immutabledict({"fn": CollectionAggregate._create_all}),
        ),
        "lt": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.ge}),
        ),
        "le": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.gt}),
        ),
        "ne": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.eq}),
        ),
        "gt": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.le}),
        ),
        "ge": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.lt}),
        ),
        "eq": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.ne}),
        ),
        "is_distinct_from": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.is_not_distinct_from}),
        ),
        "is_not_distinct_from": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.is_distinct_from}),
        ),
        "in_op": (
            _in_impl,
            util.immutabledict({"negate_op": operators.not_in_op}),
        ),
        "not_in_op": (
            _in_impl,
            util.immutabledict({"negate_op": operators.in_op}),
        ),
        # the negation of IS is IS NOT and vice versa, like every other
        # comparison pair in this table.  These entries previously named
        # the same operator as its own negation, which only went
        # unnoticed because _boolean_compare hard-codes the negation for
        # None/True/False operands.
        "is_": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.is_not}),
        ),
        "is_not": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.is_}),
        ),
        "between_op": (
            _between_impl,
            util.EMPTY_DICT,
        ),
        "not_between_op": (
            _between_impl,
            util.EMPTY_DICT,
        ),
        "desc_op": (
            _scalar,
            util.immutabledict({"fn": UnaryExpression._create_desc}),
        ),
        "asc_op": (
            _scalar,
            util.immutabledict({"fn": UnaryExpression._create_asc}),
        ),
        "nulls_first_op": (
            _scalar,
            util.immutabledict({"fn": UnaryExpression._create_nulls_first}),
        ),
        "nulls_last_op": (
            _scalar,
            util.immutabledict({"fn": UnaryExpression._create_nulls_last}),
        ),
        "distinct_op": (
            _distinct_impl,
            util.EMPTY_DICT,
        ),
        "null_op": (_binary_operate, util.EMPTY_DICT),
        "custom_op": (_custom_op_operate, util.EMPTY_DICT),
        "and_": (
            _conjunction_operate,
            util.EMPTY_DICT,
        ),
        "or_": (
            _conjunction_operate,
            util.EMPTY_DICT,
        ),
        "inv": (
            _inv_impl,
            util.EMPTY_DICT,
        ),
        "add": (
            _binary_operate,
            util.EMPTY_DICT,
        ),
        "concat_op": (
            _binary_operate,
            util.EMPTY_DICT,
        ),
        "getitem": (_getitem_impl, util.EMPTY_DICT),
        "contains_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.not_contains_op}),
        ),
        "icontains_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.not_icontains_op}),
        ),
        "contains": (
            _unsupported_impl,
            util.EMPTY_DICT,
        ),
        "like_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.not_like_op}),
        ),
        "ilike_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.not_ilike_op}),
        ),
        "not_like_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.like_op}),
        ),
        "not_ilike_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.ilike_op}),
        ),
        "startswith_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.not_startswith_op}),
        ),
        "istartswith_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.not_istartswith_op}),
        ),
        "endswith_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.not_endswith_op}),
        ),
        "iendswith_op": (
            _boolean_compare,
            util.immutabledict({"negate_op": operators.not_iendswith_op}),
        ),
        "collate": (
            _collate_impl,
            util.EMPTY_DICT,
        ),
        "match_op": (_match_impl, util.EMPTY_DICT),
        "not_match_op": (
            _match_impl,
            util.EMPTY_DICT,
        ),
        "regexp_match_op": (
            _regexp_match_impl,
            util.EMPTY_DICT,
        ),
        "not_regexp_match_op": (
            _regexp_match_impl,
            util.EMPTY_DICT,
        ),
        "regexp_replace_op": (
            _regexp_replace_impl,
            util.EMPTY_DICT,
        ),
        "lshift": (_unsupported_impl, util.EMPTY_DICT),
        "rshift": (_unsupported_impl, util.EMPTY_DICT),
        "bitwise_xor_op": (
            _binary_operate,
            util.EMPTY_DICT,
        ),
        "bitwise_or_op": (
            _binary_operate,
            util.EMPTY_DICT,
        ),
        "bitwise_and_op": (
            _binary_operate,
            util.EMPTY_DICT,
        ),
        "bitwise_not_op": (
            _bitwise_not_impl,
            util.EMPTY_DICT,
        ),
        "bitwise_lshift_op": (
            _binary_operate,
            util.EMPTY_DICT,
        ),
        "bitwise_rshift_op": (
            _binary_operate,
            util.EMPTY_DICT,
        ),
        "matmul": (_unsupported_impl, util.EMPTY_DICT),
        "pow": (_pow_impl, util.EMPTY_DICT),
        "neg": (_neg_impl, util.EMPTY_DICT),
        "mul": (_binary_operate, util.EMPTY_DICT),
        "sub": (
            _binary_operate,
            util.EMPTY_DICT,
        ),
        "div": (_binary_operate, util.EMPTY_DICT),
        "mod": (_binary_operate, util.EMPTY_DICT),
        "truediv": (_binary_operate, util.EMPTY_DICT),
        "floordiv": (_binary_operate, util.EMPTY_DICT),
        "json_path_getitem_op": (
            _binary_operate,
            util.EMPTY_DICT,
        ),
        "json_getitem_op": (
            _binary_operate,
            util.EMPTY_DICT,
        ),
    }
)