Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/coercions.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

570 statements  

1# sql/coercions.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9from __future__ import annotations 

10 

11import collections.abc as collections_abc 

12import numbers 

13import re 

14import typing 

15from typing import Any 

16from typing import Callable 

17from typing import cast 

18from typing import Dict 

19from typing import Iterable 

20from typing import Iterator 

21from typing import List 

22from typing import Literal 

23from typing import NoReturn 

24from typing import Optional 

25from typing import overload 

26from typing import Sequence 

27from typing import Tuple 

28from typing import Type 

29from typing import TYPE_CHECKING 

30from typing import TypeVar 

31from typing import Union 

32 

33from . import roles 

34from . import visitors 

35from ._typing import is_from_clause 

36from .base import ExecutableOption 

37from .base import Options 

38from .cache_key import HasCacheKey 

39from .visitors import Visitable 

40from .. import exc 

41from .. import inspection 

42from .. import util 

43 

44if typing.TYPE_CHECKING: 

45 # elements lambdas schema selectable are set by __init__ 

46 from . import elements 

47 from . import lambdas 

48 from . import schema 

49 from . import selectable 

50 from ._typing import _ColumnExpressionArgument 

51 from ._typing import _ColumnsClauseArgument 

52 from ._typing import _DDLColumnArgument 

53 from ._typing import _DMLTableArgument 

54 from ._typing import _FromClauseArgument 

55 from ._typing import _OnlyColumnArgument 

56 from .base import SyntaxExtension 

57 from .dml import _DMLTableElement 

58 from .elements import BindParameter 

59 from .elements import ClauseElement 

60 from .elements import ColumnClause 

61 from .elements import ColumnElement 

62 from .elements import NamedColumn 

63 from .elements import SQLCoreOperations 

64 from .elements import TextClause 

65 from .schema import Column 

66 from .selectable import _ColumnsClauseElement 

67 from .selectable import _JoinTargetProtocol 

68 from .selectable import FromClause 

69 from .selectable import HasCTE 

70 from .selectable import SelectBase 

71 from .selectable import Subquery 

72 from .visitors import _TraverseCallableType 

73 

74_SR = TypeVar("_SR", bound=roles.SQLRole) 

75_F = TypeVar("_F", bound=Callable[..., Any]) 

76_StringOnlyR = TypeVar("_StringOnlyR", bound=roles.StringRole) 

77_T = TypeVar("_T", bound=Any) 

78 

79 

def _is_literal(element: Any) -> bool:
    """Report whether ``element`` is treated as a plain literal value.

    An object is considered a literal when it is neither a ``Visitable``
    nor a ``schema.SchemaEventTarget`` and it does not expose a
    ``__clause_element__`` attribute.

    """
    if isinstance(element, (Visitable, schema.SchemaEventTarget)):
        return False
    return not hasattr(element, "__clause_element__")

90 

91 

92def _deep_is_literal(element): 

93 """Return whether or not the element is a "literal" in the context 

94 of a SQL expression construct. 

95 

96 does a deeper more esoteric check than _is_literal. is used 

97 for lambda elements that have to distinguish values that would 

98 be bound vs. not without any context. 

99 

100 """ 

101 

102 if isinstance(element, collections_abc.Sequence) and not isinstance( 

103 element, str 

104 ): 

105 for elem in element: 

106 if not _deep_is_literal(elem): 

107 return False 

108 else: 

109 return True 

110 

111 return ( 

112 not isinstance( 

113 element, 

114 ( 

115 Visitable, 

116 schema.SchemaEventTarget, 

117 HasCacheKey, 

118 Options, 

119 util.langhelpers.symbol, 

120 ), 

121 ) 

122 and not hasattr(element, "__clause_element__") 

123 and ( 

124 not isinstance(element, type) 

125 or not issubclass(element, HasCacheKey) 

126 ) 

127 ) 

128 

129 

def _document_text_coercion(
    paramname: str, meth_rst: str, param_rst: str
) -> Callable[[_F], _F]:
    """Build a decorator that documents a "trusted SQL text" parameter.

    The returned value is whatever ``util.add_parameter_text`` produces for
    ``paramname`` and a ``.. warning::`` paragraph naming the parameter
    (``param_rst``) and the method (``meth_rst``).

    """
    warning_template = (
        ".. warning:: "
        "The %s argument to %s can be passed as a Python string argument, "
        "which will be treated "
        "as **trusted SQL text** and rendered as given.  **DO NOT PASS "
        "UNTRUSTED INPUT TO THIS PARAMETER**."
    )
    warning_text = warning_template % (param_rst, meth_rst)
    return util.add_parameter_text(paramname, warning_text)

144 

145 

146def _expression_collection_was_a_list( 

147 attrname: str, 

148 fnname: str, 

149 args: Union[Sequence[_T], Sequence[Sequence[_T]]], 

150) -> Sequence[_T]: 

151 if args and isinstance(args[0], (list, set, dict)) and len(args) == 1: 

152 if isinstance(args[0], list): 

153 raise exc.ArgumentError( 

154 f'The "{attrname}" argument to {fnname}(), when ' 

155 "referring to a sequence " 

156 "of items, is now passed as a series of positional " 

157 "elements, rather than as a list. " 

158 ) 

159 return cast("Sequence[_T]", args[0]) 

160 

161 return cast("Sequence[_T]", args) 

162 

163 

# Typing-only overloads for expect().  Each stub maps a role class (plus,
# for some roles, a literal keyword flag) to the return type a type checker
# should assume; the bodies are ``...`` and the runtime implementation
# follows the final catch-all overload.
@overload
def expect(
    role: Type[roles.TruncatedLabelRole],
    element: Any,
    **kw: Any,
) -> str: ...


# DMLColumnRole combined with as_key=True is typed as returning str
@overload
def expect(
    role: Type[roles.DMLColumnRole],
    element: Any,
    *,
    as_key: Literal[True] = ...,
    **kw: Any,
) -> str: ...


@overload
def expect(
    role: Type[roles.LiteralValueRole],
    element: Any,
    **kw: Any,
) -> BindParameter[Any]: ...


@overload
def expect(
    role: Type[roles.DDLReferredColumnRole],
    element: Any,
    **kw: Any,
) -> Union[Column[Any], str]: ...


@overload
def expect(
    role: Type[roles.DDLConstraintColumnRole],
    element: Any,
    **kw: Any,
) -> Union[Column[Any], str]: ...


@overload
def expect(
    role: Type[roles.StatementOptionRole],
    element: Any,
    **kw: Any,
) -> Union[ColumnElement[Any], TextClause]: ...


@overload
def expect(
    role: Type[roles.SyntaxExtensionRole],
    element: Any,
    **kw: Any,
) -> SyntaxExtension: ...


@overload
def expect(
    role: Type[roles.LabeledColumnExprRole[Any]],
    element: _ColumnExpressionArgument[_T] | _OnlyColumnArgument[_T],
    **kw: Any,
) -> NamedColumn[_T]: ...


# typed element: the element's _T is carried through to ColumnElement[_T]
@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
    ],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> ColumnElement[_T]: ...


# untyped element: falls back to ColumnElement[Any]
@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
        Type[roles.OnClauseRole],
        Type[roles.ColumnArgumentRole],
    ],
    element: Any,
    **kw: Any,
) -> ColumnElement[Any]: ...


@overload
def expect(
    role: Type[roles.DMLTableRole],
    element: _DMLTableArgument,
    **kw: Any,
) -> _DMLTableElement: ...


@overload
def expect(
    role: Type[roles.HasCTERole],
    element: HasCTE,
    **kw: Any,
) -> HasCTE: ...


@overload
def expect(
    role: Type[roles.SelectStatementRole],
    element: SelectBase,
    **kw: Any,
) -> SelectBase: ...


@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: _FromClauseArgument,
    **kw: Any,
) -> FromClause: ...


# FromClauseRole combined with explicit_subquery=True is typed as Subquery
@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: SelectBase,
    *,
    explicit_subquery: Literal[True] = ...,
    **kw: Any,
) -> Subquery: ...


@overload
def expect(
    role: Type[roles.ColumnsClauseRole],
    element: _ColumnsClauseArgument[Any],
    **kw: Any,
) -> _ColumnsClauseElement: ...


@overload
def expect(
    role: Type[roles.JoinTargetRole],
    element: _JoinTargetProtocol,
    **kw: Any,
) -> _JoinTargetProtocol: ...


# catchall for not-yet-implemented overloads
@overload
def expect(
    role: Type[_SR],
    element: Any,
    **kw: Any,
) -> Any: ...

321 

322 

def expect(
    role: Type[_SR],
    element: Any,
    *,
    apply_propagate_attrs: Optional[ClauseElement] = None,
    argname: Optional[str] = None,
    post_inspect: bool = False,
    disable_inspection: bool = False,
    **kw: Any,
) -> Any:
    """Coerce ``element`` into an object suitable for ``role``.

    The steps, in order:

    1. If the role allows lambdas and ``element`` is callable with a
       ``__code__`` attribute, wrap it in a ``lambdas.LambdaElement``.
    2. Look up the ``RoleImpl`` for ``role`` in ``_impl_lookup``.
    3. Resolve ``element``: objects that are already compiler elements,
       schema items, fetched values or ``PyWrapper`` objects are taken
       (nearly) as is; anything else goes through ``__clause_element__()``
       unwrapping, optional inspection, and finally the impl's
       ``_literal_coercion``.
    4. Optionally copy ``_propagate_attrs`` from the resolved object onto
       ``apply_propagate_attrs``.
    5. If the resolved object's class has the impl's role class in its MRO,
       run the impl's ``_post_coercion`` (when set) and return; otherwise
       defer to the impl's ``_implicit_coercions``.

    :param role: the role class that selects the coercion implementation.
    :param element: the object to coerce.
    :param apply_propagate_attrs: when given and its ``_propagate_attrs``
     is empty, receives the resolved object's ``_propagate_attrs``.
    :param argname: argument name forwarded to the impl for error messages.
    :param post_inspect: when true, ``_post_inspect`` is accessed on the
     inspection result before it is used.
    :param disable_inspection: when true, the inspection step is skipped.
    :param kw: forwarded to the impl's coercion hooks (and to
     ``lambdas.LambdaOptions`` on the lambda path).

    """
    if (
        role.allows_lambda
        # note callable() will not invoke a __getattr__() method, whereas
        # hasattr(obj, "__call__") will. by keeping the callable() check here
        # we prevent most needless calls to hasattr() and therefore
        # __getattr__(), which is present on ColumnElement.
        and callable(element)
        and hasattr(element, "__code__")
    ):
        return lambdas.LambdaElement(
            element,
            role,
            lambdas.LambdaOptions(**kw),
            apply_propagate_attrs=apply_propagate_attrs,
        )

    # major case is that we are given a ClauseElement already, skip more
    # elaborate logic up front if possible
    impl = _impl_lookup[role]

    # kept so that error messages and post-coercion hooks can refer to
    # what the caller actually passed, even after unwrapping below
    original_element = element

    if not isinstance(
        element,
        (
            elements.CompilerElement,
            schema.SchemaItem,
            schema.FetchedValue,
            lambdas.PyWrapper,
        ),
    ):
        resolved = None

        if impl._resolve_literal_only:
            # impls flagged this way never look at __clause_element__()
            # or inspection; note that argname is not forwarded here
            resolved = impl._literal_coercion(element, **kw)
        else:
            # NOTE(review): repeats the assignment made above; element has
            # not been rebound in between
            original_element = element

            is_clause_element = False

            # this is a special performance optimization for ORM
            # joins used by JoinTargetImpl that we don't go through the
            # work of creating __clause_element__() when we only need the
            # original QueryableAttribute, as the former will do clause
            # adaption and all that which is just thrown away here.
            if (
                impl._skip_clauseelement_for_target_match
                and isinstance(element, role)
                and hasattr(element, "__clause_element__")
            ):
                is_clause_element = True
            else:
                # unwrap repeatedly until reaching an object that reports
                # is_clause_element or no longer has __clause_element__
                while hasattr(element, "__clause_element__"):
                    is_clause_element = True

                    if not getattr(element, "is_clause_element", False):
                        element = element.__clause_element__()
                    else:
                        break

            if not is_clause_element:
                if impl._use_inspection and not disable_inspection:
                    insp = inspection.inspect(element, raiseerr=False)
                    if insp is not None:
                        if post_inspect:
                            # bare attribute access, evaluated only for
                            # whatever effect the attribute lookup has
                            insp._post_inspect
                        try:
                            resolved = insp.__clause_element__()
                        except AttributeError:
                            impl._raise_for_expected(original_element, argname)

                if resolved is None:
                    resolved = impl._literal_coercion(
                        element, argname=argname, **kw
                    )
            else:
                resolved = element
    elif isinstance(element, lambdas.PyWrapper):
        resolved = element._sa__py_wrapper_literal(**kw)
    else:
        resolved = element

    if apply_propagate_attrs is not None:
        if typing.TYPE_CHECKING:
            assert isinstance(resolved, (SQLCoreOperations, ClauseElement))

        # only fill in propagate attrs when the target has none yet and
        # the resolved object has some to offer
        if not apply_propagate_attrs._propagate_attrs and getattr(
            resolved, "_propagate_attrs", None
        ):
            apply_propagate_attrs._propagate_attrs = resolved._propagate_attrs

    if impl._role_class in resolved.__class__.__mro__:
        if impl._post_coercion:
            resolved = impl._post_coercion(
                resolved,
                argname=argname,
                original_element=original_element,
                **kw,
            )
        return resolved
    else:
        return impl._implicit_coercions(
            original_element, resolved, argname=argname, **kw
        )

437 

438 

def expect_as_key(
    role: Type[roles.DMLColumnRole], element: Any, **kw: Any
) -> str:
    """Call ``expect()`` with ``as_key=True`` forced.

    Any ``as_key`` value supplied by the caller is discarded before the
    remaining keyword arguments are forwarded.

    """
    forwarded = {
        option: value for option, value in kw.items() if option != "as_key"
    }
    return expect(role, element, as_key=True, **forwarded)

444 

445 

def expect_col_expression_collection(
    role: Type[roles.DDLConstraintColumnRole],
    expressions: Iterable[_DDLColumnArgument],
) -> Iterator[
    Tuple[
        Union[str, Column[Any]],
        Optional[ColumnClause[Any]],
        Optional[str],
        Optional[Union[Column[Any], str]],
    ]
]:
    """Coerce each expression for ``role`` and yield a 4-tuple per item.

    Each yielded tuple is ``(resolved, column, strname, add_element)``:

    * ``resolved`` - the result of ``expect(role, expr)``; when that result
      is a string, the original string ``expr`` is used instead.
    * ``column`` - the first object collected by traversing ``resolved``
      with a ``"column"`` visitor, or ``None`` (always ``None`` for
      strings).
    * ``strname`` - the original string for string expressions, else
      ``None``.
    * ``add_element`` - ``column`` when it is not ``None``, otherwise
      ``strname``.

    """
    for item in expressions:
        name_only = None
        first_column = None

        coerced: Union[Column[Any], str] = expect(role, item)
        if not isinstance(coerced, str):
            found: List[Column[Any]] = []
            collect: _TraverseCallableType[Column[Any]] = found.append
            visitors.traverse(coerced, {}, {"column": collect})
            if found:
                first_column = found[0]
        else:
            assert isinstance(item, str)
            coerced = item
            name_only = item

        if first_column is None:
            target = name_only
        else:
            target = first_column

        yield coerced, first_column, name_only, target

474 

475 

class RoleImpl:
    """Base class for per-role coercion implementations.

    An instance stores the role class it serves, that role's
    ``_role_name`` (as ``name``), and whether the role class derives from
    ``roles.UsesInspection``.  Subclasses override the coercion hooks; the
    defaults defined here either raise or do nothing.

    """

    __slots__ = ("_role_class", "name", "_use_inspection")

    # optional hook run on an already-acceptable object; None means "no hook"
    _post_coercion: Any = None
    # when True, expect() goes straight to _literal_coercion()
    _resolve_literal_only = False
    # when True, expect() may skip calling __clause_element__() for
    # elements that are already instances of the role
    _skip_clauseelement_for_target_match = False

    def __init__(self, role_class):
        self._role_class = role_class
        self.name = role_class._role_name
        self._use_inspection = issubclass(role_class, roles.UsesInspection)

    def _literal_coercion(self, element, **kw):
        """Coerce a plain Python value; must be provided by subclasses."""
        raise NotImplementedError()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Handle a resolved object of the wrong role; default is to raise."""
        self._raise_for_expected(element, argname, resolved)

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        *,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise ``ArgumentError`` describing what was expected and given.

        :param element: the object originally passed.
        :param argname: name of the argument, included when truthy.
        :param resolved: the object ``element`` resolved to; mentioned in
         the message only when it is a different object from ``element``.
        :param advice: extra sentence appended to the message.
        :param code: error code passed to the exception.
        :param err: exception to chain from.

        """
        if resolved is None or resolved is element:
            got = repr(element)
        else:
            got = "%r object resolved from %r object" % (resolved, element)

        if not argname:
            msg = "%s expected, got %s." % (self.name, got)
        else:
            msg = "%s expected for argument %r; got %s." % (
                self.name,
                argname,
                got,
            )

        if advice:
            msg = msg + " " + advice

        raise exc.ArgumentError(msg, code=code) from err

529 

530 

class _Deannotate:
    """Mixin whose post-coercion step passes the result through
    ``_deep_deannotate``.

    """

    __slots__ = ()

    def _post_coercion(self, resolved, **kw):
        # imported at call time rather than at module import time
        from .util import _deep_deannotate

        return _deep_deannotate(resolved)

538 

539 

class _StringOnly:
    """Mixin that switches ``_resolve_literal_only`` on."""

    __slots__ = ()

    # read by expect(): when true, only _literal_coercion() is used to
    # resolve elements that are not already SQL constructs
    _resolve_literal_only = True

544 

545 

class _ReturnsStringKey(RoleImpl):
    """Role implementation that accepts plain strings unchanged."""

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        """Return ``element`` untouched."""
        return element

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        """Return the original ``element`` when it is a string, else raise."""
        if not isinstance(element, str):
            self._raise_for_expected(element, argname, resolved)
        else:
            return element

557 

558 

class _ColumnCoercions(RoleImpl):
    """Implicit coercions toward column expressions.

    SELECT constructs and ``selectable.Subquery`` objects are converted to
    scalar subqueries with a warning; lambda elements are passed through
    when the role allows lambdas; everything else is rejected.

    """

    __slots__ = ()

    def _warn_for_scalar_subquery_coercion(self):
        """Warn that a SELECT is being implicitly made a scalar subquery."""
        util.warn(
            "implicitly coercing SELECT object to scalar subquery; "
            "please use the .scalar_subquery() method to produce a scalar "
            "subquery.",
        )

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        if getattr(resolved, "is_clause_element", False):
            if resolved._is_select_base:
                self._warn_for_scalar_subquery_coercion()
                return resolved.scalar_subquery()

            if resolved._is_from_clause and isinstance(
                resolved, selectable.Subquery
            ):
                # unwrap the subquery to reach the SELECT it was built from
                self._warn_for_scalar_subquery_coercion()
                return resolved.element.scalar_subquery()

            if self._role_class.allows_lambda and resolved._is_lambda_element:
                return resolved

        # not a clause element, or no coercion above applied
        self._raise_for_expected(element, argname, resolved)

585 

586 

def _no_text_coercion(
    element: Any,
    argname: Optional[str] = None,
    exc_cls: Type[exc.SQLAlchemyError] = exc.ArgumentError,
    extra: Optional[str] = None,
    err: Optional[Exception] = None,
) -> NoReturn:
    """Raise an error stating that a string must be wrapped in ``text()``.

    :param element: the offending string; shortened with
     ``util.ellipses_string`` for display.
    :param argname: optional argument name to mention in the message.
    :param exc_cls: exception class to raise.
    :param extra: optional text placed in front of the message.
    :param err: exception to chain from.

    """
    # Both optional fragments carry their own trailing space so the
    # sentence stays well-formed whether or not they are present.  The
    # argname fragment previously lacked it, which rendered as
    # "for argument <name>should be ...".
    argname_text = "for argument %s " % (argname,) if argname else ""
    extra_text = "%s " % extra if extra else ""

    raise exc_cls(
        "%(extra)sTextual SQL expression %(expr)r %(argname)sshould be "
        "explicitly declared as text(%(expr)r)"
        % {
            "expr": util.ellipses_string(element),
            "argname": argname_text,
            "extra": extra_text,
        }
    ) from err

603 

604 

class _NoTextCoercion(RoleImpl):
    """Role implementation that refuses every literal value.

    Strings get the "declare as text()" error when ``TextClause`` is a
    subclass of the role class; anything else gets the generic
    "expected ..." error.

    """

    __slots__ = ()

    def _literal_coercion(self, element, *, argname=None, **kw):
        text_would_fit = isinstance(element, str) and issubclass(
            elements.TextClause, self._role_class
        )
        if not text_would_fit:
            self._raise_for_expected(element, argname)
        else:
            _no_text_coercion(element, argname)

615 

616 

class _CoerceLiterals(RoleImpl):
    """Role implementation that coerces selected Python literals.

    Three class-level switches, all off by default, control what is
    accepted besides strings:

    * ``_coerce_consts`` - ``None`` / ``False`` / ``True`` become
      ``Null`` / ``False_`` / ``True_``.
    * ``_coerce_star`` - the string ``"*"`` becomes a literal column.
    * ``_coerce_numerics`` - numbers become literal columns of their
      ``str()`` form.

    All other strings are handed to ``_text_coercion``, which raises here
    and is overridden by subclasses.

    """

    __slots__ = ()
    _coerce_consts = False
    _coerce_star = False
    _coerce_numerics = False

    def _text_coercion(self, element, argname=None):
        """Handle a plain string; the default refuses it."""
        return _no_text_coercion(element, argname)

    def _literal_coercion(self, element, *, argname=None, **kw):
        if isinstance(element, str):
            if not (self._coerce_star and element == "*"):
                return self._text_coercion(element, argname, **kw)
            return elements.ColumnClause("*", is_literal=True)

        if self._coerce_consts:
            # identity checks, so 0 / 1 are not mistaken for False / True
            if element is None:
                return elements.Null()
            if element is False:
                return elements.False_()
            if element is True:
                return elements.True_()

        if self._coerce_numerics and isinstance(element, numbers.Number):
            return elements.ColumnClause(str(element), is_literal=True)

        self._raise_for_expected(element, argname)

645 

646 

class LiteralValueImpl(RoleImpl):
    """Role implementation that turns a literal value into a bound parameter.

    ``_resolve_literal_only`` is set, so ``expect()`` resolves elements
    through ``_literal_coercion`` only; that method returns the value
    unchanged and ``_implicit_coercions`` then wraps it.

    """

    # declared empty like every other RoleImpl subclass in this module, so
    # that instances do not acquire a per-instance __dict__
    __slots__ = ()

    _resolve_literal_only = True

    def _implicit_coercions(
        self,
        element,
        resolved,
        argname=None,
        *,
        type_=None,
        literal_execute=False,
        **kw,
    ):
        """Wrap ``element`` in an anonymous, unique ``BindParameter``.

        :param type_: type passed to the bound parameter.
        :param literal_execute: passed to the bound parameter.
        :raises ArgumentError: when ``resolved`` is not a literal according
         to ``_is_literal``.

        """
        if not _is_literal(resolved):
            self._raise_for_expected(
                element, resolved=resolved, argname=argname, **kw
            )

        return elements.BindParameter(
            None,
            element,
            type_=type_,
            unique=True,
            literal_execute=literal_execute,
        )

    def _literal_coercion(self, element, **kw):
        """Return ``element`` unchanged."""
        return element

675 

676 

class _SelectIsNotFrom(RoleImpl):
    """Adds ".subquery()" advice to errors that involve a SELECT statement."""

    __slots__ = ()

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        *,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise the "expected" error, suggesting ``.subquery()`` for SELECTs.

        When the caller supplied no ``advice`` and either ``element`` or
        ``resolved`` is a ``roles.SelectStatementRole``, the advice text
        and the error code ``"89ve"`` are filled in here.  Otherwise the
        caller's ``advice`` and ``code`` are forwarded unchanged.

        """
        # The two isinstance() tests are grouped explicitly: "and" binds
        # tighter than "or", so without the parentheses a SELECT in
        # ``resolved`` would overwrite advice supplied by the caller.
        if not advice and (
            isinstance(element, roles.SelectStatementRole)
            or isinstance(resolved, roles.SelectStatementRole)
        ):
            advice = (
                "To create a "
                "FROM clause from a %s object, use the .subquery() method."
                % (resolved.__class__ if resolved is not None else element,)
            )
            code = "89ve"
        # no else branch: a ``code`` passed by the caller is kept rather
        # than being reset to None

        super()._raise_for_expected(
            element,
            argname=argname,
            resolved=resolved,
            advice=advice,
            code=code,
            err=err,
            **kw,
        )
        # never reached
        assert False

716 

717 

class HasCacheKeyImpl(RoleImpl):
    """Role implementation that accepts ``HasCacheKey`` objects as given."""

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        """Return ``element`` unchanged."""
        return element

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return the original ``element`` if it is a ``HasCacheKey``."""
        if not isinstance(element, HasCacheKey):
            self._raise_for_expected(element, argname, resolved)
        else:
            return element

735 

736 

class ExecutableOptionImpl(RoleImpl):
    """Role implementation that accepts ``ExecutableOption`` objects as given."""

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        """Return ``element`` unchanged."""
        return element

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return the original ``element`` if it is an ``ExecutableOption``."""
        if not isinstance(element, ExecutableOption):
            self._raise_for_expected(element, argname, resolved)
        else:
            return element

754 

755 

class ExpressionElementImpl(_ColumnCoercions, RoleImpl):
    """Role implementation for general column expressions.

    Literal values become bound parameters (or ``Null`` for ``None`` in
    the non-CRUD case), and error messages gain targeted advice for
    ``Values`` constructs and FROM clauses.

    """

    __slots__ = ()

    def _literal_coercion(
        self, element, *, name=None, type_=None, is_crud=False, **kw
    ):
        """Coerce a Python value to ``Null`` or a unique ``BindParameter``.

        :param name: name given to the bound parameter.
        :param type_: type given to the bound parameter; when it has a
         true ``should_evaluate_none``, ``None`` is bound instead of being
         rendered as ``Null``.
        :param is_crud: passed to the bound parameter as ``_is_crud``;
         when true, ``None`` is always bound.

        """
        render_null = (
            element is None
            and not is_crud
            and (type_ is None or not type_.should_evaluate_none)
        )
        if render_null:
            # TODO: there's no test coverage now for the
            # "should_evaluate_none" part of this, as outside of "crud" this
            # codepath is not normally used except in some special cases
            return elements.Null()

        try:
            return elements.BindParameter(
                name, element, type_, unique=True, _is_crud=is_crud
            )
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Raise the "expected" error with advice for VALUES / FROM objects."""
        # select uses implicit coercion with warning instead of raising
        advice = None
        if isinstance(element, selectable.Values):
            advice = (
                "To create a column expression from a VALUES clause, "
                "use the .scalar_values() method."
            )
        elif isinstance(element, roles.AnonymizedFromClauseRole):
            advice = (
                "To create a column expression from a FROM clause row "
                "as a whole, use the .table_valued() method."
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

797 

798 

class TStringElementImpl(ExpressionElementImpl, RoleImpl):
    """Defines nothing of its own; all behaviour comes from
    ``ExpressionElementImpl``.

    """

    __slots__ = ()

801 

802 

class BinaryElementImpl(ExpressionElementImpl, RoleImpl):
    """Role implementation for the other operand of a binary expression.

    Literal values are bound through the opposite expression's
    ``_bind_param`` and, after coercion, an untyped result may take on
    the opposite expression's type.

    """

    __slots__ = ()

    def _literal_coercion(  # type: ignore[override]
        self,
        element,
        *,
        expr,
        operator,
        bindparam_type=None,
        argname=None,
        **kw,
    ):
        """Bind ``element`` as a parameter relative to ``expr``.

        :param expr: the expression on the other side of the operator; its
         ``_bind_param`` method creates the parameter.
        :param operator: the operator in use, passed to ``_bind_param``.
        :param bindparam_type: explicit type for the parameter, if any.
        :param argname: argument name included in the error message.
        :raises ArgumentError: when ``_bind_param`` rejects the value.

        """
        try:
            return expr._bind_param(operator, element, type_=bindparam_type)
        except exc.ArgumentError as err:
            # forward argname so the message can name the argument; it
            # was previously accepted by this method but then dropped
            self._raise_for_expected(element, argname, err=err)

    def _post_coercion(self, resolved, *, expr, bindparam_type=None, **kw):
        """Give a null-typed ``resolved`` the type of ``expr``.

        Applies only when ``expr`` itself has a non-null type;
        ``bindparam_type`` takes precedence over ``expr.type`` when given.

        """
        if resolved.type._isnull and not expr.type._isnull:
            resolved = resolved._with_binary_element_type(
                bindparam_type if bindparam_type is not None else expr.type
            )
        return resolved

827 

828 

class InElementImpl(RoleImpl):
    """Role implementation for the right-hand side of an IN expression."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Coerce a FROM clause into a SELECT for IN, with a warning.

        An ``Alias`` wrapping a SELECT contributes that inner SELECT; any
        other FROM clause contributes ``resolved.select()``.  Objects that
        are not FROM clauses are rejected.

        """
        if not resolved._is_from_clause:
            self._raise_for_expected(element, argname, resolved)
        else:
            wraps_select = (
                isinstance(resolved, selectable.Alias)
                and resolved.element._is_select_base
            )
            self._warn_for_implicit_coercion(resolved)
            if wraps_select:
                return self._post_coercion(resolved.element, **kw)
            return self._post_coercion(resolved.select(), **kw)

    def _warn_for_implicit_coercion(self, elem):
        """Warn that ``elem`` is being coerced into a select() for IN."""
        util.warn(
            "Coercing %s object into a select() for use in IN(); "
            "please pass a select() construct explicitly"
            % (elem.__class__.__name__)
        )

    @util.preload_module("sqlalchemy.sql.elements")
    def _literal_coercion(self, element, *, expr, operator, **kw):  # type: ignore[override] # noqa: E501
        """Coerce an iterable of values for use with IN.

        A collection made only of literals becomes one expanding bound
        parameter.  When at least one member is a SQL expression, a
        ``ClauseList`` is built in which expressions are kept and literals
        are bound one by one.  Strings and non-iterables are rejected, as
        are non-literal members that are neither a ``ColumnElement`` nor
        provide ``__clause_element__``.

        """
        if not util.is_non_string_iterable(element):
            self._raise_for_expected(element, **kw)
        else:
            sql_members: Dict[
                Optional[_ColumnExpressionArgument[Any]],
                _ColumnExpressionArgument[Any],
            ] = {}
            # materialize once; the collection is iterated twice below
            element = list(element)
            for member in element:
                if _is_literal(member):
                    continue
                if isinstance(
                    member, util.preloaded.sql_elements.ColumnElement
                ) or hasattr(member, "__clause_element__"):
                    sql_members[member] = member
                else:
                    self._raise_for_expected(element, **kw)

            if not sql_members:
                return expr._bind_param(operator, element, expanding=True)

            return elements.ClauseList(
                *[
                    (
                        sql_members[member]
                        if member in sql_members
                        else expr._bind_param(operator, member)
                    )
                    for member in element
                ]
            )

    def _post_coercion(self, element, *, expr, operator, **kw):
        """Adapt an accepted element to its final form for IN."""
        if element._is_select_base:
            # for IN, we are doing scalar_subquery() coercion without
            # a warning
            return element.scalar_subquery()

        if isinstance(element, elements.ClauseList):
            assert len(element.clauses) != 0
            return element.self_group(against=operator)

        if isinstance(element, elements.BindParameter):
            # work on a copy (key kept) so the parameter passed in is not
            # modified when marking it as expanding
            expanding_param = element._clone(maintain_key=True)
            expanding_param.expanding = True
            expanding_param.expand_op = operator
            return expanding_param

        if isinstance(element, selectable.Values):
            return element.scalar_values()

        return element

913 

914 

class OnClauseImpl(_ColumnCoercions, RoleImpl):
    """Role implementation for ON clause expressions; literals are refused."""

    __slots__ = ()

    _coerce_consts = True

    def _literal_coercion(self, element, **kw):
        """Reject every plain Python value."""
        self._raise_for_expected(element)

    def _post_coercion(self, resolved, *, original_element=None, **kw):
        """Prefer the original object when it is a ``JoinTargetRole``."""
        # this is a hack right now as we want to use coercion on an
        # ORM InstrumentedAttribute, but we want to return the object
        # itself if it is one, not its clause element.
        # ORM context _join and _legacy_join() would need to be improved
        # to look for annotations in a clause element form.
        if not isinstance(original_element, roles.JoinTargetRole):
            return resolved
        return original_element

932 

933 

class WhereHavingImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl):
    """Combines ``_CoerceLiterals`` and ``_ColumnCoercions``, with constant
    coercion switched on and plain strings refused.

    """

    __slots__ = ()

    # None / True / False are accepted and turned into SQL constants
    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # plain strings always raise via _no_text_coercion()
        return _no_text_coercion(element, argname)

941 

942 

class SyntaxExtensionImpl(RoleImpl):
    """Defines nothing of its own; uses the ``RoleImpl`` defaults."""

    __slots__ = ()

945 

946 

class StatementOptionImpl(_CoerceLiterals, RoleImpl):
    """``_CoerceLiterals`` variant with constant coercion switched on and
    plain strings wrapped in ``TextClause``.

    """

    __slots__ = ()

    # None / True / False are accepted and turned into SQL constants
    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # strings are accepted here and wrapped as textual SQL
        return elements.TextClause(element)

954 

955 

class ColumnArgumentImpl(_NoTextCoercion, RoleImpl):
    """Defines nothing of its own; all behaviour comes from
    ``_NoTextCoercion``.

    """

    __slots__ = ()

958 

959 

class ColumnArgumentOrKeyImpl(_ReturnsStringKey, RoleImpl):
    """Defines nothing of its own; all behaviour comes from
    ``_ReturnsStringKey``.

    """

    __slots__ = ()

962 

963 

class StrAsPlainColumnImpl(_CoerceLiterals, RoleImpl):
    """``_CoerceLiterals`` variant that turns plain strings into
    ``ColumnClause`` objects.

    """

    __slots__ = ()

    def _text_coercion(self, element, argname=None):
        # the string is passed to ColumnClause as given
        return elements.ColumnClause(element)

969 

970 

class ByOfImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl, roles.ByOfRole):
    """Combines ``_CoerceLiterals`` and ``_ColumnCoercions``, with constant
    coercion switched on and plain strings turned into textual label
    references.

    """

    __slots__ = ()

    # None / True / False are accepted and turned into SQL constants
    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # strings are wrapped in _textual_label_reference rather than refused
        return elements._textual_label_reference(element)

978 

979 

class OrderByImpl(ByOfImpl, RoleImpl):
    """``ByOfImpl`` variant that wraps label-capable elements in a
    ``_label_reference``.

    """

    __slots__ = ()

    def _post_coercion(self, resolved, **kw):
        """Return a ``_label_reference`` when ``resolved`` is an instance
        of the role class with a non-``None`` ``_order_by_label_element``.

        """
        refers_to_label = (
            isinstance(resolved, self._role_class)
            and resolved._order_by_label_element is not None
        )
        if not refers_to_label:
            return resolved
        return elements._label_reference(resolved)

991 

992 

class GroupByImpl(ByOfImpl, RoleImpl):
    """``ByOfImpl`` variant that expands a FROM clause into its columns."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return a ``ClauseList`` of ``resolved.c`` for FROM clauses.

        Anything else is returned unchanged.

        """
        if not is_from_clause(resolved):
            return resolved
        return elements.ClauseList(*resolved.c)

1007 

1008 

class DMLColumnImpl(_ReturnsStringKey, RoleImpl):
    """``_ReturnsStringKey`` variant that can return an element's ``.key``."""

    __slots__ = ()

    def _post_coercion(self, element, *, as_key=False, **kw):
        """Return ``element.key`` when ``as_key`` is true, else ``element``."""
        return element.key if as_key else element

1017 

1018 

class ConstExprImpl(RoleImpl):
    """Role implementation accepting only ``None``, ``False`` and ``True``."""

    __slots__ = ()

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Map the three Python constants to their SQL constructs.

        Identity comparison is used, so values that merely compare equal
        (such as ``0`` or ``1``) are rejected.

        """
        if element is None:
            return elements.Null()
        if element is False:
            return elements.False_()
        if element is True:
            return elements.True_()
        self._raise_for_expected(element, argname)

1031 

1032 

class TruncatedLabelImpl(_StringOnly, RoleImpl):
    """Role implementation that produces ``_truncated_label`` strings."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``resolved`` when the original ``element`` is a string."""
        if not isinstance(element, str):
            self._raise_for_expected(element, argname, resolved)
        else:
            return resolved

    def _literal_coercion(self, element, **kw):
        """coerce the given value to :class:`._truncated_label`.

        Existing :class:`._truncated_label` and
        :class:`._anonymous_label` objects are passed
        unchanged.
        """

        if not isinstance(element, elements._truncated_label):
            return elements._truncated_label(element)
        return element

1060 

1061 

class DDLExpressionImpl(_Deannotate, _CoerceLiterals, RoleImpl):
    """``_CoerceLiterals`` variant with constant coercion switched on,
    plain strings wrapped in ``TextClause``, and the ``_Deannotate``
    post-coercion step.

    """

    __slots__ = ()

    # None / True / False are accepted and turned into SQL constants
    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # see #5754 for why we can't easily deprecate this coercion.
        # essentially expressions like postgresql_where would have to be
        # text() as they come back from reflection and we don't want to
        # have text() elements wired into the inspection dictionaries.
        return elements.TextClause(element)

1073 

1074 

class DDLConstraintColumnImpl(_Deannotate, _ReturnsStringKey, RoleImpl):
    """Defines nothing of its own; combines ``_Deannotate`` with
    ``_ReturnsStringKey``.

    """

    __slots__ = ()

1077 

1078 

class DDLReferredColumnImpl(DDLConstraintColumnImpl):
    """Defines nothing of its own; all behaviour comes from
    ``DDLConstraintColumnImpl``.

    """

    __slots__ = ()

1081 

1082 

class LimitOffsetImpl(RoleImpl):
    """Role implementation for LIMIT / OFFSET values."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Let ``None`` through; reject any other resolved object."""
        if resolved is not None:
            self._raise_for_expected(element, argname, resolved)
        else:
            return None

    def _literal_coercion(  # type: ignore[override]
        self, element, *, name, type_, **kw
    ):
        """Turn a value into a unique ``_OffsetLimitParam``.

        ``None`` is returned as ``None``.  Any other value is first passed
        through ``util.asint``.

        :param name: name given to the parameter.
        :param type_: type given to the parameter.

        """
        if element is not None:
            as_integer = util.asint(element)
            return selectable._OffsetLimitParam(
                name, as_integer, type_=type_, unique=True
            )
        return None

1108 

1109 

class LabeledColumnExprImpl(ExpressionElementImpl):
    """``ExpressionElementImpl`` variant whose implicit coercions return
    anonymously labeled expressions.

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``.label(None)`` of an expression element.

        ``resolved`` is used directly when it is already an
        ``ExpressionElementRole``; otherwise the parent class's implicit
        coercion is tried first.  If neither yields an expression element
        the usual "expected" error is raised.

        """
        if isinstance(resolved, roles.ExpressionElementRole):
            candidate = resolved
        else:
            candidate = super()._implicit_coercions(
                element, resolved, argname=argname, **kw
            )

        if not isinstance(candidate, roles.ExpressionElementRole):
            self._raise_for_expected(element, argname, resolved)
        else:
            return candidate.label(None)

1130 

1131 

class ColumnsClauseImpl(_SelectIsNotFrom, _CoerceLiterals, RoleImpl):
    """Role implementation for columns-clause elements.

    Constants, numbers and ``"*"`` are coerced (see ``_CoerceLiterals``);
    every other plain string is refused with a message suggesting
    ``text()`` or ``column()`` / ``literal_column()``.

    """

    __slots__ = ()

    _coerce_consts = True
    _coerce_numerics = True
    _coerce_star = True

    # a word character followed by non-whitespace only: used to guess
    # whether to suggest column() or literal_column() in the error below
    _guess_straight_column = re.compile(r"^\w\S*$", re.I)

    def _raise_for_expected(
        self, element, argname=None, resolved=None, *, advice=None, **kw
    ):
        """Raise the "expected" error, hinting at ``select(a, b)`` for lists."""
        if not advice and isinstance(element, list):
            advice = (
                f"Did you mean to say select("
                f"{', '.join(repr(e) for e in element)})?"
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

    def _text_coercion(self, element, argname=None):
        """Refuse a plain string column expression.

        :raises ArgumentError: always; the message suggests ``text()`` and
         either ``column()`` (the string matches ``_guess_straight_column``)
         or ``literal_column()``.

        """
        element = str(element)

        guess_is_literal = not self._guess_straight_column.match(element)
        raise exc.ArgumentError(
            "Textual column expression %(column)r %(argname)sshould be "
            "explicitly declared with text(%(column)r), "
            "or use %(literal_column)s(%(column)r) "
            "for more specificity"
            % {
                "column": util.ellipses_string(element),
                # the trailing space keeps "... <argname> should be" from
                # running together; it was previously missing
                "argname": "for argument %s " % (argname,) if argname else "",
                "literal_column": (
                    "literal_column" if guess_is_literal else "column"
                ),
            }
        )

1171 

1172 

class ReturnsRowsImpl(RoleImpl):
    """Plain :class:`RoleImpl` subclass; no coercion hooks are
    overridden here.
    """

    __slots__ = ()

1175 

1176 

class StatementImpl(_CoerceLiterals, RoleImpl):
    """Coercion rules for this role: lambda elements pass through, and
    an object that had to be resolved into something else must itself
    be executable.
    """

    __slots__ = ()

    def _post_coercion(
        self, resolved, *, original_element, argname=None, **kw
    ):
        """Return ``resolved`` after checking the original object.

        If coercion produced a different object and the original was not
        a string, the original must expose ``_execute_on_connection``.

        :raises ObjectNotExecutableError: when that attribute is missing.
        """
        was_resolved = resolved is not original_element
        if was_resolved and not isinstance(original_element, str):
            # use same method as Connection uses
            try:
                original_element._execute_on_connection
            except AttributeError as err:
                raise exc.ObjectNotExecutableError(original_element) from err

        return resolved

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return lambda elements unchanged; everything else goes to the
        parent implementation.
        """
        if resolved._is_lambda_element:
            return resolved

        return super()._implicit_coercions(
            element, resolved, argname=argname, **kw
        )

1207 

1208 

class SelectStatementImpl(_NoTextCoercion, RoleImpl):
    """Coercion rules for this role: the only implicit coercion
    accepted is from a text clause.
    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``resolved.columns()`` for a text clause; reject
        anything else through ``_raise_for_expected``.
        """
        if not resolved._is_text_clause:
            self._raise_for_expected(element, argname, resolved)
            return None

        return resolved.columns()

1223 

1224 

class HasCTEImpl(ReturnsRowsImpl):
    """Same behavior as :class:`ReturnsRowsImpl`; nothing is
    overridden here.
    """

    __slots__ = ()

1227 

1228 

class IsCTEImpl(RoleImpl):
    """Plain :class:`RoleImpl` subclass; no coercion hooks are
    overridden here.
    """

    __slots__ = ()

1231 

1232 

class JoinTargetImpl(RoleImpl):
    """Coercion rules for this role: literals are always rejected, and
    SELECT constructs are accepted only in legacy mode, with a
    deprecation warning.
    """

    __slots__ = ()

    _skip_clauseelement_for_target_match = True

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Reject every literal value through ``_raise_for_expected``."""
        self._raise_for_expected(element, argname)

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        legacy: bool = False,
        **kw: Any,
    ) -> Any:
        """Return ``element`` if it is already a ``JoinTargetRole``.

        With ``legacy=True`` a select base is returned as-is after a
        deprecation warning; anything else is rejected through
        ``_raise_for_expected``.
        """
        if isinstance(element, roles.JoinTargetRole):
            # note that this codepath no longer occurs as of
            # #6550, unless JoinTargetImpl._skip_clauseelement_for_target_match
            # were set to False.
            return element

        if legacy and resolved._is_select_base:
            util.warn_deprecated(
                "Implicit coercion of SELECT and textual SELECT "
                "constructs into FROM clauses is deprecated; please call "
                ".subquery() on any Core select or ORM Query object in "
                "order to produce a subquery object.",
                version="1.4",
            )
            # TODO: doing _implicit_subquery here causes tests to fail,
            # how was this working before?  probably that ORM
            # join logic treated it as a select and subquery would happen
            # in _ORMJoin->Join
            return resolved

        self._raise_for_expected(element, argname, resolved)

1270 

1271 

class FromClauseImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Coercion rules for this role: a select base becomes a subquery
    only when the caller asks for it explicitly.
    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        explicit_subquery: bool = False,
        **kw: Any,
    ) -> Any:
        """Return ``resolved.subquery()`` for a select base when
        ``explicit_subquery`` is set; otherwise reject the element
        through ``_raise_for_expected``.
        """
        wants_subquery = resolved._is_select_base and explicit_subquery
        if not wants_subquery:
            self._raise_for_expected(element, argname, resolved)
            return None

        return resolved.subquery()

    def _post_coercion(self, element, *, deannotate=False, **kw):
        """Return ``element``, deannotated first when ``deannotate`` is
        true.
        """
        return element._deannotate() if deannotate else element

1294 

1295 

class AnonymizedFromClauseImpl(FromClauseImpl):
    """:class:`FromClauseImpl` variant whose post-coercion step returns
    an anonymous FROM clause built from the element.
    """

    __slots__ = ()

    def _post_coercion(self, element, *, flat=False, name=None, **kw):
        """Return ``element._anonymous_fromclause(flat=flat)``.

        ``name`` must be left as ``None``; this is asserted.
        """
        assert name is None

        anonymous = element._anonymous_fromclause(flat=flat)
        return anonymous

1303 

1304 

class DMLTableImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Coercion rules for this role: an element annotated with
    ``"dml_table"`` is replaced by that annotation's value.
    """

    __slots__ = ()

    def _post_coercion(self, element, **kw):
        """Return the ``"dml_table"`` annotation of ``element`` when it
        is present, otherwise ``element`` itself.
        """
        if "dml_table" not in element._annotations:
            return element

        return element._annotations["dml_table"]

1313 

1314 

class DMLSelectImpl(_NoTextCoercion, RoleImpl):
    """Coercion rules for this role: a FROM clause is converted back
    into a selectable statement.
    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Convert a FROM clause into a select.

        An ``Alias`` wrapping a select base is unwrapped to that inner
        element; any other FROM clause yields ``resolved.select()``.
        Anything that is not a FROM clause is rejected through
        ``_raise_for_expected``.
        """
        if not resolved._is_from_clause:
            self._raise_for_expected(element, argname, resolved)
            return None

        wraps_select = (
            isinstance(resolved, selectable.Alias)
            and resolved.element._is_select_base
        )
        if wraps_select:
            return resolved.element

        return resolved.select()

1335 

1336 

class CompoundElementImpl(_NoTextCoercion, RoleImpl):
    """Coercion rules for this role: the rejection error carries extra
    advice when a FROM clause is passed.
    """

    __slots__ = ()

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Delegate to the parent ``_raise_for_expected`` with advice
        chosen from the kind of ``element``.

        Subqueries get a hint to use the plain ``select()``; other FROM
        clauses get a hint to call ``.select()``; everything else gets
        no advice.
        """
        advice = None

        if isinstance(element, roles.FromClauseRole):
            if element._is_subquery:
                advice = (
                    "Use the plain select() object without "
                    "calling .subquery() or .alias()."
                )
            else:
                advice = (
                    "To SELECT from any FROM clause, use the .select() method."
                )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

1356 

1357 

# Maps each role class from ``roles`` to one instance of the matching
# ``*Impl`` class defined in this module.
_impl_lookup = {}


# Pair every ``roles.XyzRole`` with a module-level ``XyzImpl`` when one
# exists; roles without a matching impl are skipped.
for name in dir(roles):
    cls = getattr(roles, name)
    if not name.endswith("Role"):
        continue

    name = name.replace("Role", "Impl")
    if name not in globals():
        continue

    impl = globals()[name](cls)
    _impl_lookup[cls] = impl

if not TYPE_CHECKING:
    ee_impl = _impl_lookup[roles.ExpressionElementRole]

    # the subscripted forms of ExpressionElementRole share the impl of
    # the unsubscripted role
    for py_type in (int, bool, str, float):
        _impl_lookup[roles.ExpressionElementRole[py_type]] = ee_impl