Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/coercions.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

567 statements  

1# sql/coercions.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9from __future__ import annotations 

10 

11import collections.abc as collections_abc 

12import numbers 

13import re 

14import typing 

15from typing import Any 

16from typing import Callable 

17from typing import cast 

18from typing import Dict 

19from typing import Iterable 

20from typing import Iterator 

21from typing import List 

22from typing import NoReturn 

23from typing import Optional 

24from typing import overload 

25from typing import Sequence 

26from typing import Tuple 

27from typing import Type 

28from typing import TYPE_CHECKING 

29from typing import TypeVar 

30from typing import Union 

31 

32from . import roles 

33from . import visitors 

34from ._typing import is_from_clause 

35from .base import ExecutableOption 

36from .base import Options 

37from .cache_key import HasCacheKey 

38from .visitors import Visitable 

39from .. import exc 

40from .. import inspection 

41from .. import util 

42from ..util.typing import Literal 

43 

44if typing.TYPE_CHECKING: 

45 # elements lambdas schema selectable are set by __init__ 

46 from . import elements 

47 from . import lambdas 

48 from . import schema 

49 from . import selectable 

50 from ._typing import _ColumnExpressionArgument 

51 from ._typing import _ColumnsClauseArgument 

52 from ._typing import _DDLColumnArgument 

53 from ._typing import _DMLTableArgument 

54 from ._typing import _FromClauseArgument 

55 from .base import SyntaxExtension 

56 from .dml import _DMLTableElement 

57 from .elements import BindParameter 

58 from .elements import ClauseElement 

59 from .elements import ColumnClause 

60 from .elements import ColumnElement 

61 from .elements import NamedColumn 

62 from .elements import SQLCoreOperations 

63 from .elements import TextClause 

64 from .schema import Column 

65 from .selectable import _ColumnsClauseElement 

66 from .selectable import _JoinTargetProtocol 

67 from .selectable import FromClause 

68 from .selectable import HasCTE 

69 from .selectable import SelectBase 

70 from .selectable import Subquery 

71 from .visitors import _TraverseCallableType 

72 

73_SR = TypeVar("_SR", bound=roles.SQLRole) 

74_F = TypeVar("_F", bound=Callable[..., Any]) 

75_StringOnlyR = TypeVar("_StringOnlyR", bound=roles.StringRole) 

76_T = TypeVar("_T", bound=Any) 

77 

78 

def _is_literal(element: Any) -> bool:
    """Report whether ``element`` is a plain "literal" value for SQL
    expression purposes.

    An element is a literal when it is neither a ``Visitable`` nor a
    ``schema.SchemaEventTarget`` and it exposes no ``__clause_element__``
    attribute.

    """
    if isinstance(element, (Visitable, schema.SchemaEventTarget)):
        return False
    return not hasattr(element, "__clause_element__")

89 

90 

def _deep_is_literal(element):
    """Report whether ``element`` is a "literal", looking inside sequences.

    This is a deeper, more esoteric check than ``_is_literal``; it is used
    for lambda elements that have to distinguish values that would be
    bound vs. not without any context.

    A non-string sequence is a literal only when every member is one
    (checked recursively).  Any other value is a literal unless it is an
    instance of one of the SQL / cache-key / option / symbol types listed
    below, has a ``__clause_element__`` attribute, or is itself a class
    deriving from ``HasCacheKey``.

    """
    is_sequence = isinstance(element, collections_abc.Sequence)
    if is_sequence and not isinstance(element, str):
        # stops at the first non-literal member; an empty sequence is True
        return all(_deep_is_literal(member) for member in element)

    non_literal_types = (
        Visitable,
        schema.SchemaEventTarget,
        HasCacheKey,
        Options,
        util.langhelpers.symbol,
    )
    if isinstance(element, non_literal_types):
        return False
    if hasattr(element, "__clause_element__"):
        return False

    # a class object (rather than an instance) that derives from
    # HasCacheKey is not a literal either
    return not (isinstance(element, type) and issubclass(element, HasCacheKey))

127 

128 

def _document_text_coercion(
    paramname: str, meth_rst: str, param_rst: str
) -> Callable[[_F], _F]:
    """Return the result of ``util.add_parameter_text()`` for a standard
    "trusted SQL text" warning.

    :param paramname: passed through as the first argument of
     ``util.add_parameter_text()``.
    :param meth_rst: text naming the method, interpolated into the warning.
    :param param_rst: text naming the parameter, interpolated into the
     warning.

    """
    warning_template = (
        ".. warning:: "
        "The %s argument to %s can be passed as a Python string argument, "
        "which will be treated "
        "as **trusted SQL text** and rendered as given. **DO NOT PASS "
        "UNTRUSTED INPUT TO THIS PARAMETER**."
    )
    warning_text = warning_template % (param_rst, meth_rst)
    return util.add_parameter_text(paramname, warning_text)

143 

144 

145def _expression_collection_was_a_list( 

146 attrname: str, 

147 fnname: str, 

148 args: Union[Sequence[_T], Sequence[Sequence[_T]]], 

149) -> Sequence[_T]: 

150 if args and isinstance(args[0], (list, set, dict)) and len(args) == 1: 

151 if isinstance(args[0], list): 

152 raise exc.ArgumentError( 

153 f'The "{attrname}" argument to {fnname}(), when ' 

154 "referring to a sequence " 

155 "of items, is now passed as a series of positional " 

156 "elements, rather than as a list. " 

157 ) 

158 return cast("Sequence[_T]", args[0]) 

159 

160 return cast("Sequence[_T]", args) 

161 

162 

# Typing-only overloads: each maps a role class (and in some cases an
# element type or keyword flag) to the return type of expect().
@overload
def expect(
    role: Type[roles.TruncatedLabelRole],
    element: Any,
    **kw: Any,
) -> str: ...


@overload
def expect(
    role: Type[roles.DMLColumnRole],
    element: Any,
    *,
    as_key: Literal[True] = ...,
    **kw: Any,
) -> str: ...


@overload
def expect(
    role: Type[roles.LiteralValueRole],
    element: Any,
    **kw: Any,
) -> BindParameter[Any]: ...


@overload
def expect(
    role: Type[roles.DDLReferredColumnRole],
    element: Any,
    **kw: Any,
) -> Union[Column[Any], str]: ...


@overload
def expect(
    role: Type[roles.DDLConstraintColumnRole],
    element: Any,
    **kw: Any,
) -> Union[Column[Any], str]: ...


@overload
def expect(
    role: Type[roles.StatementOptionRole],
    element: Any,
    **kw: Any,
) -> Union[ColumnElement[Any], TextClause]: ...


@overload
def expect(
    role: Type[roles.SyntaxExtensionRole],
    element: Any,
    **kw: Any,
) -> SyntaxExtension: ...


@overload
def expect(
    role: Type[roles.LabeledColumnExprRole[Any]],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> NamedColumn[_T]: ...


@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
    ],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> ColumnElement[_T]: ...


@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
        Type[roles.OnClauseRole],
        Type[roles.ColumnArgumentRole],
    ],
    element: Any,
    **kw: Any,
) -> ColumnElement[Any]: ...


@overload
def expect(
    role: Type[roles.DMLTableRole],
    element: _DMLTableArgument,
    **kw: Any,
) -> _DMLTableElement: ...


@overload
def expect(
    role: Type[roles.HasCTERole],
    element: HasCTE,
    **kw: Any,
) -> HasCTE: ...


@overload
def expect(
    role: Type[roles.SelectStatementRole],
    element: SelectBase,
    **kw: Any,
) -> SelectBase: ...


@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: _FromClauseArgument,
    **kw: Any,
) -> FromClause: ...


@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: SelectBase,
    *,
    explicit_subquery: Literal[True] = ...,
    **kw: Any,
) -> Subquery: ...


@overload
def expect(
    role: Type[roles.ColumnsClauseRole],
    element: _ColumnsClauseArgument[Any],
    **kw: Any,
) -> _ColumnsClauseElement: ...


@overload
def expect(
    role: Type[roles.JoinTargetRole],
    element: _JoinTargetProtocol,
    **kw: Any,
) -> _JoinTargetProtocol: ...


# catchall for not-yet-implemented overloads
@overload
def expect(
    role: Type[_SR],
    element: Any,
    **kw: Any,
) -> Any: ...


def expect(
    role: Type[_SR],
    element: Any,
    *,
    apply_propagate_attrs: Optional[ClauseElement] = None,
    argname: Optional[str] = None,
    post_inspect: bool = False,
    disable_inspection: bool = False,
    **kw: Any,
) -> Any:
    """Coerce ``element`` into an object acceptable for ``role``.

    The steps, in order:

    1. If the role allows lambdas and ``element`` is a callable with a
       ``__code__`` attribute, wrap it in ``lambdas.LambdaElement`` and
       return.
    2. Look up the implementation object for ``role`` in ``_impl_lookup``.
    3. Resolve ``element``: unwrap ``__clause_element__()``, optionally
       run ``inspection.inspect()``, or fall back to the implementation's
       ``_literal_coercion()``.  Objects that already are compiler
       elements, schema items, fetched values or ``PyWrapper`` objects
       skip this resolution.
    4. Optionally copy ``_propagate_attrs`` from the resolved object onto
       ``apply_propagate_attrs``.
    5. If the implementation's role class is in the MRO of the resolved
       object's class, apply ``_post_coercion`` (when set) and return;
       otherwise return the result of ``_implicit_coercions()``.

    :param role: role class; used as the key into ``_impl_lookup``.
    :param element: the object to coerce.
    :param apply_propagate_attrs: optional object that receives the
     resolved object's ``_propagate_attrs`` when its own are falsy.
    :param argname: argument name forwarded to literal coercion and to
     error reporting.
    :param post_inspect: when True, the ``_post_inspect`` attribute of an
     inspection result is accessed before the result is used.
    :param disable_inspection: when True, ``inspection.inspect()`` is not
     attempted.
    :param kw: forwarded to ``LambdaOptions`` and to the implementation
     hooks.

    """
    if (
        role.allows_lambda
        # note callable() will not invoke a __getattr__() method, whereas
        # hasattr(obj, "__call__") will. by keeping the callable() check here
        # we prevent most needless calls to hasattr() and therefore
        # __getattr__(), which is present on ColumnElement.
        and callable(element)
        and hasattr(element, "__code__")
    ):
        return lambdas.LambdaElement(
            element,
            role,
            lambdas.LambdaOptions(**kw),
            apply_propagate_attrs=apply_propagate_attrs,
        )

    # major case is that we are given a ClauseElement already, skip more
    # elaborate logic up front if possible
    impl = _impl_lookup[role]

    # kept separately because ``element`` may be rebound while unwrapping
    # __clause_element__() below; errors and hooks report the original
    original_element = element

    if not isinstance(
        element,
        (
            elements.CompilerElement,
            schema.SchemaItem,
            schema.FetchedValue,
            lambdas.PyWrapper,
        ),
    ):
        resolved = None

        if impl._resolve_literal_only:
            resolved = impl._literal_coercion(element, **kw)
        else:
            is_clause_element = False

            # this is a special performance optimization for ORM
            # joins used by JoinTargetImpl that we don't go through the
            # work of creating __clause_element__() when we only need the
            # original QueryableAttribute, as the former will do clause
            # adaption and all that which is just thrown away here.
            if (
                impl._skip_clauseelement_for_target_match
                and isinstance(element, role)
                and hasattr(element, "__clause_element__")
            ):
                is_clause_element = True
            else:
                # unwrap nested __clause_element__() providers until an
                # object flagged is_clause_element is reached
                while hasattr(element, "__clause_element__"):
                    is_clause_element = True

                    if not getattr(element, "is_clause_element", False):
                        element = element.__clause_element__()
                    else:
                        break

            if not is_clause_element:
                if impl._use_inspection and not disable_inspection:
                    insp = inspection.inspect(element, raiseerr=False)
                    if insp is not None:
                        if post_inspect:
                            # attribute access only; the value is unused
                            insp._post_inspect
                        try:
                            resolved = insp.__clause_element__()
                        except AttributeError:
                            impl._raise_for_expected(original_element, argname)

                if resolved is None:
                    resolved = impl._literal_coercion(
                        element, argname=argname, **kw
                    )
            else:
                resolved = element
    elif isinstance(element, lambdas.PyWrapper):
        resolved = element._sa__py_wrapper_literal(**kw)
    else:
        resolved = element

    if apply_propagate_attrs is not None:
        if typing.TYPE_CHECKING:
            assert isinstance(resolved, (SQLCoreOperations, ClauseElement))

        if not apply_propagate_attrs._propagate_attrs and getattr(
            resolved, "_propagate_attrs", None
        ):
            apply_propagate_attrs._propagate_attrs = resolved._propagate_attrs

    if impl._role_class in resolved.__class__.__mro__:
        if impl._post_coercion:
            resolved = impl._post_coercion(
                resolved,
                argname=argname,
                original_element=original_element,
                **kw,
            )
        return resolved
    else:
        return impl._implicit_coercions(
            original_element, resolved, argname=argname, **kw
        )

436 

437 

def expect_as_key(
    role: Type[roles.DMLColumnRole], element: Any, **kw: Any
) -> str:
    """Call ``expect()`` with ``as_key=True``.

    Any ``as_key`` value supplied by the caller in ``kw`` is discarded;
    all other keyword arguments are forwarded unchanged.

    """
    forwarded = {key: value for key, value in kw.items() if key != "as_key"}
    return expect(role, element, as_key=True, **forwarded)

443 

444 

def expect_col_expression_collection(
    role: Type[roles.DDLConstraintColumnRole],
    expressions: Iterable[_DDLColumnArgument],
) -> Iterator[
    Tuple[
        Union[str, Column[Any]],
        Optional[ColumnClause[Any]],
        Optional[str],
        Optional[Union[Column[Any], str]],
    ]
]:
    """Coerce each expression with ``expect()`` and yield a 4-tuple for it.

    Each yielded tuple is ``(resolved, column, strname, add_element)``:

    * when ``expect()`` returns a string, ``resolved`` and ``strname`` are
      the original expression, ``column`` is ``None`` and ``add_element``
      is the string;
    * otherwise ``column`` is the first object collected by a ``"column"``
      visitor traversal of ``resolved`` (``None`` if nothing was
      collected), ``strname`` is ``None`` and ``add_element`` equals
      ``column``.

    """
    for expr in expressions:
        column = None
        strname = None

        resolved: Union[Column[Any], str] = expect(role, expr)
        if not isinstance(resolved, str):
            found: List[Column[Any]] = []
            collect: _TraverseCallableType[Column[Any]] = found.append
            visitors.traverse(resolved, {}, {"column": collect})
            if found:
                column = found[0]
        else:
            assert isinstance(expr, str)
            resolved = expr
            strname = expr

        add_element = strname if column is None else column

        yield resolved, column, strname, add_element

473 

474 

class RoleImpl:
    """Base class for per-role coercion implementations.

    An instance is bound to one role class and provides the hooks that
    ``expect()`` calls: ``_literal_coercion``, ``_implicit_coercions``,
    the optional ``_post_coercion`` and the error helper
    ``_raise_for_expected``.

    """

    __slots__ = ("_role_class", "name", "_use_inspection")

    # optional hook; None means no post-coercion step is applied
    _post_coercion: Any = None
    # when True, expect() goes straight to _literal_coercion()
    _resolve_literal_only = False
    # when True, expect() may skip calling __clause_element__() for
    # elements that already are instances of the role
    _skip_clauseelement_for_target_match = False

    def __init__(self, role_class):
        """Bind this implementation to ``role_class``."""
        self._role_class = role_class
        self.name = role_class._role_name
        self._use_inspection = issubclass(role_class, roles.UsesInspection)

    def _literal_coercion(self, element, **kw):
        """Coerce a literal value; subclasses must override."""
        raise NotImplementedError()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Handle a resolved object that does not match the role.

        The base implementation always raises via
        ``_raise_for_expected()``.

        """
        self._raise_for_expected(element, argname, resolved)

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        *,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise ``ArgumentError`` describing what was expected vs. given.

        :param element: the object originally passed in.
        :param argname: if truthy, named in the message.
        :param resolved: the object ``element`` resolved to; mentioned in
         the message only when it is a different object than ``element``.
        :param advice: optional text appended to the message.
        :param code: passed to ``ArgumentError`` as ``code``.
        :param err: exception to chain from.
        :raises ArgumentError: always.

        """
        if resolved is None or resolved is element:
            got = repr(element)
        else:
            got = "%r object resolved from %r object" % (resolved, element)

        if not argname:
            msg = "%s expected, got %s." % (self.name, got)
        else:
            msg = "%s expected for argument %r; got %s." % (
                self.name,
                argname,
                got,
            )

        if advice:
            msg += " " + advice

        raise exc.ArgumentError(msg, code=code) from err

528 

529 

530class _Deannotate: 

531 __slots__ = () 

532 

533 def _post_coercion(self, resolved, **kw): 

534 from .util import _deep_deannotate 

535 

536 return _deep_deannotate(resolved) 

537 

538 

539class _StringOnly: 

540 __slots__ = () 

541 

542 _resolve_literal_only = True 

543 

544 

class _ReturnsStringKey(RoleImpl):
    """Role implementation that passes plain strings through unchanged."""

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        """Return ``element`` as given."""
        return element

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        """Return ``element`` if it is a string, otherwise raise."""
        if isinstance(element, str):
            return element
        self._raise_for_expected(element, argname, resolved)

556 

557 

class _ColumnCoercions(RoleImpl):
    """Implicit coercions shared by column-expression oriented roles."""

    __slots__ = ()

    def _warn_for_scalar_subquery_coercion(self):
        """Warn that a SELECT is being implicitly turned into a scalar
        subquery."""
        util.warn(
            "implicitly coercing SELECT object to scalar subquery; "
            "please use the .scalar_subquery() method to produce a scalar "
            "subquery.",
        )

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        """Coerce selects / subqueries to scalar subqueries, pass lambda
        elements through when the role allows them, and raise for
        anything else."""
        if getattr(resolved, "is_clause_element", False):
            if resolved._is_select_base:
                self._warn_for_scalar_subquery_coercion()
                return resolved.scalar_subquery()

            if resolved._is_from_clause and isinstance(
                resolved, selectable.Subquery
            ):
                # unwrap the subquery and use the SELECT it contains
                self._warn_for_scalar_subquery_coercion()
                return resolved.element.scalar_subquery()

            if self._role_class.allows_lambda and resolved._is_lambda_element:
                return resolved

        self._raise_for_expected(element, argname, resolved)

584 

585 

def _no_text_coercion(
    element: Any,
    argname: Optional[str] = None,
    exc_cls: Type[exc.SQLAlchemyError] = exc.ArgumentError,
    extra: Optional[str] = None,
    err: Optional[Exception] = None,
) -> NoReturn:
    """Raise an error stating that a plain string must be declared as
    ``text()``.

    :param element: the offending string; shortened with
     ``util.ellipses_string()`` for the message.
    :param argname: optional argument name mentioned in the message.
    :param exc_cls: exception class to raise.
    :param extra: optional text prepended to the message.
    :param err: exception to chain from.
    :raises exc_cls: always.

    """
    raise exc_cls(
        "%(extra)sTextual SQL expression %(expr)r %(argname)sshould be "
        "explicitly declared as text(%(expr)r)"
        % {
            "expr": util.ellipses_string(element),
            # trailing space keeps the argument name from running into
            # the following "should be"
            "argname": "for argument %s " % (argname,) if argname else "",
            "extra": "%s " % extra if extra else "",
        }
    ) from err

602 

603 

class _NoTextCoercion(RoleImpl):
    """Role implementation that rejects every literal value."""

    __slots__ = ()

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Always raise.

        A string gets the "declare as text()" error when ``TextClause`` is
        a subclass of this implementation's role class; every other value
        gets the generic "expected" error.

        """
        text_hint_applies = isinstance(element, str) and issubclass(
            elements.TextClause, self._role_class
        )
        if not text_hint_applies:
            self._raise_for_expected(element, argname)
        else:
            _no_text_coercion(element, argname)

614 

615 

class _CoerceLiterals(RoleImpl):
    """Role implementation that coerces selected Python literals.

    Subclasses opt in to individual coercions through the
    ``_coerce_consts``, ``_coerce_star`` and ``_coerce_numerics`` flags
    and decide how strings are handled by overriding
    ``_text_coercion()``.

    """

    __slots__ = ()
    _coerce_consts = False
    _coerce_star = False
    _coerce_numerics = False

    def _text_coercion(self, element, argname=None):
        """Handle a plain string; the default rejects it."""
        return _no_text_coercion(element, argname)

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Coerce ``element`` per the enabled flags, or raise."""
        if isinstance(element, str):
            if self._coerce_star and element == "*":
                return elements.ColumnClause("*", is_literal=True)
            return self._text_coercion(element, argname, **kw)

        if self._coerce_consts:
            # identity checks on purpose: 0 and 1 must not match here
            if element is None:
                return elements.Null()
            if element is False:
                return elements.False_()
            if element is True:
                return elements.True_()

        if self._coerce_numerics and isinstance(element, numbers.Number):
            return elements.ColumnClause(str(element), is_literal=True)

        self._raise_for_expected(element, argname)

644 

645 

class LiteralValueImpl(RoleImpl):
    """Role implementation that wraps literal values in a
    ``BindParameter``."""

    _resolve_literal_only = True

    def _literal_coercion(self, element, **kw):
        """Return ``element`` as given."""
        return element

    def _implicit_coercions(
        self,
        element,
        resolved,
        argname=None,
        *,
        type_=None,
        literal_execute=False,
        **kw,
    ):
        """Return an anonymous, unique ``BindParameter`` holding
        ``element``; raise if ``resolved`` is not a literal."""
        if not _is_literal(resolved):
            self._raise_for_expected(
                element, resolved=resolved, argname=argname, **kw
            )

        bind_options = {
            "type_": type_,
            "unique": True,
            "literal_execute": literal_execute,
        }
        return elements.BindParameter(None, element, **bind_options)

674 

675 

class _SelectIsNotFrom(RoleImpl):
    """Role implementation that adds ".subquery()" advice when a SELECT
    statement is passed where it is not accepted."""

    __slots__ = ()

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        *,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise via the parent implementation, adding subquery advice.

        When the caller supplied no ``advice`` and either ``element`` or
        ``resolved`` is a ``roles.SelectStatementRole``, the advice
        recommends ``.subquery()`` and the error code is set to
        ``"89ve"``.  Otherwise the caller's ``advice`` and ``code`` are
        passed through unchanged.

        :raises ArgumentError: always (from the parent implementation).

        """
        # explicit parentheses: caller-provided advice is never replaced
        if not advice and (
            isinstance(element, roles.SelectStatementRole)
            or isinstance(resolved, roles.SelectStatementRole)
        ):
            advice = (
                "To create a "
                "FROM clause from a %s object, use the .subquery() method."
                % (resolved.__class__ if resolved is not None else element,)
            )
            code = "89ve"

        super()._raise_for_expected(
            element,
            argname=argname,
            resolved=resolved,
            advice=advice,
            code=code,
            err=err,
            **kw,
        )
        # never reached
        assert False

715 

716 

class HasCacheKeyImpl(RoleImpl):
    """Role implementation that accepts ``HasCacheKey`` objects as-is."""

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        """Return ``element`` as given."""
        return element

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``element`` if it is a ``HasCacheKey``, else raise."""
        if isinstance(element, HasCacheKey):
            return element
        self._raise_for_expected(element, argname, resolved)

734 

735 

class ExecutableOptionImpl(RoleImpl):
    """Role implementation that accepts ``ExecutableOption`` objects
    as-is."""

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        """Return ``element`` as given."""
        return element

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``element`` if it is an ``ExecutableOption``, else
        raise."""
        if isinstance(element, ExecutableOption):
            return element
        self._raise_for_expected(element, argname, resolved)

753 

754 

class ExpressionElementImpl(_ColumnCoercions, RoleImpl):
    """Role implementation that turns literal values into ``Null`` or
    ``BindParameter`` objects."""

    __slots__ = ()

    def _literal_coercion(
        self, element, *, name=None, type_=None, is_crud=False, **kw
    ):
        """Return ``Null()`` for a non-CRUD ``None`` (unless the type asks
        for ``None`` to be evaluated), else a unique ``BindParameter``.

        An ``ArgumentError`` from ``BindParameter`` is re-raised through
        ``_raise_for_expected()``.

        """
        renders_as_null = (
            element is None
            and not is_crud
            and (type_ is None or not type_.should_evaluate_none)
        )
        if renders_as_null:
            # TODO: there's no test coverage now for the
            # "should_evaluate_none" part of this, as outside of "crud" this
            # codepath is not normally used except in some special cases
            return elements.Null()

        try:
            return elements.BindParameter(
                name, element, type_, unique=True, _is_crud=is_crud
            )
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Raise via the parent implementation, with advice for VALUES
        and anonymized FROM clause objects."""
        # select uses implicit coercion with warning instead of raising
        advice = None
        if isinstance(element, selectable.Values):
            advice = (
                "To create a column expression from a VALUES clause, "
                "use the .scalar_values() method."
            )
        elif isinstance(element, roles.AnonymizedFromClauseRole):
            advice = (
                "To create a column expression from a FROM clause row "
                "as a whole, use the .table_valued() method."
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

796 

797 

class BinaryElementImpl(ExpressionElementImpl, RoleImpl):
    """Role implementation for the operand of a binary expression; the
    other side (``expr``) creates the bound parameter."""

    __slots__ = ()

    def _literal_coercion(  # type: ignore[override]
        self,
        element,
        *,
        expr,
        operator,
        bindparam_type=None,
        argname=None,
        **kw,
    ):
        """Return ``expr._bind_param(operator, element, ...)``, converting
        an ``ArgumentError`` into the standard "expected" error."""
        try:
            bound = expr._bind_param(operator, element, type_=bindparam_type)
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)
        else:
            return bound

    def _post_coercion(self, resolved, *, expr, bindparam_type=None, **kw):
        """Give a null-typed ``resolved`` a type taken from
        ``bindparam_type`` or, failing that, from ``expr``."""
        if not resolved.type._isnull or expr.type._isnull:
            return resolved

        if bindparam_type is not None:
            new_type = bindparam_type
        else:
            new_type = expr.type
        return resolved._with_binary_element_type(new_type)

822 

823 

class InElementImpl(RoleImpl):
    """Role implementation for the right-hand side of an IN
    expression."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Coerce a FROM clause into a SELECT, with a warning; raise for
        anything else.

        An ``Alias`` wrapping a select contributes that select; any other
        FROM clause contributes ``resolved.select()``.

        """
        if resolved._is_from_clause:
            wraps_select = (
                isinstance(resolved, selectable.Alias)
                and resolved.element._is_select_base
            )
            self._warn_for_implicit_coercion(resolved)
            target = resolved.element if wraps_select else resolved.select()
            return self._post_coercion(target, **kw)

        self._raise_for_expected(element, argname, resolved)

    def _warn_for_implicit_coercion(self, elem):
        """Warn that ``elem`` is being coerced into a select()."""
        util.warn(
            "Coercing %s object into a select() for use in IN(); "
            "please pass a select() construct explicitly"
            % (elem.__class__.__name__)
        )

    @util.preload_module("sqlalchemy.sql.elements")
    def _literal_coercion(self, element, *, expr, operator, **kw):  # type: ignore[override]  # noqa: E501
        """Coerce a non-string iterable of values.

        If every member is a literal, a single expanding bound parameter
        is returned.  If some members are SQL expressions, a
        ``ClauseList`` is returned in which the literal members are bound
        individually.  Non-literal members that are not column
        expressions, and non-iterable input, raise.

        """
        if util.is_non_string_iterable(element):
            non_literal_expressions: Dict[
                Optional[_ColumnExpressionArgument[Any]],
                _ColumnExpressionArgument[Any],
            ] = {}
            # materialize once; the values are iterated twice below
            values = list(element)
            column_element_cls = util.preloaded.sql_elements.ColumnElement
            for item in values:
                if _is_literal(item):
                    continue
                if isinstance(item, column_element_cls) or hasattr(
                    item, "__clause_element__"
                ):
                    non_literal_expressions[item] = item
                else:
                    self._raise_for_expected(values, **kw)

            if not non_literal_expressions:
                return expr._bind_param(operator, values, expanding=True)

            return elements.ClauseList(
                *[
                    (
                        non_literal_expressions[item]
                        if item in non_literal_expressions
                        else expr._bind_param(operator, item)
                    )
                    for item in values
                ]
            )

        self._raise_for_expected(element, **kw)

    def _post_coercion(self, element, *, expr, operator, **kw):
        """Adapt the resolved element for use on the right side of IN."""
        if element._is_select_base:
            # for IN, we are doing scalar_subquery() coercion without
            # a warning
            return element.scalar_subquery()

        if isinstance(element, elements.ClauseList):
            assert len(element.clauses) != 0
            return element.self_group(against=operator)

        if isinstance(element, elements.BindParameter):
            # work on a copy so the caller's parameter is left unmodified
            expanding_param = element._clone(maintain_key=True)
            expanding_param.expanding = True
            expanding_param.expand_op = operator
            return expanding_param

        if isinstance(element, selectable.Values):
            return element.scalar_values()

        return element

908 

909 

class OnClauseImpl(_ColumnCoercions, RoleImpl):
    """Role implementation that accepts no literal values and hands back
    the original object when it is a join target."""

    __slots__ = ()

    _coerce_consts = True

    def _literal_coercion(self, element, **kw):
        """Literals are never accepted; always raise."""
        self._raise_for_expected(element)

    def _post_coercion(self, resolved, *, original_element=None, **kw):
        """Return ``original_element`` when it is a
        ``roles.JoinTargetRole``, else ``resolved``."""
        # this is a hack right now as we want to use coercion on an
        # ORM InstrumentedAttribute, but we want to return the object
        # itself if it is one, not its clause element.
        # ORM context _join and _legacy_join() would need to be improved
        # to look for annotations in a clause element form.
        keep_original = isinstance(original_element, roles.JoinTargetRole)
        return original_element if keep_original else resolved

927 

928 

class WhereHavingImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl):
    """Role implementation that coerces ``None`` / ``True`` / ``False``
    constants and rejects plain strings."""

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        """Reject a plain string with the "declare as text()" error."""
        return _no_text_coercion(element, argname)

936 

937 

class SyntaxExtensionImpl(RoleImpl):
    """Role implementation that adds nothing to ``RoleImpl``."""

    __slots__ = ()

940 

941 

class StatementOptionImpl(_CoerceLiterals, RoleImpl):
    """Role implementation that coerces constants and turns plain strings
    into ``TextClause`` objects."""

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        """Return ``elements.TextClause(element)``; ``argname`` is
        unused."""
        return elements.TextClause(element)

949 

950 

class ColumnArgumentImpl(_NoTextCoercion, RoleImpl):
    """Role implementation whose behavior comes entirely from
    ``_NoTextCoercion``."""

    __slots__ = ()

953 

954 

class ColumnArgumentOrKeyImpl(_ReturnsStringKey, RoleImpl):
    """Role implementation whose behavior comes entirely from
    ``_ReturnsStringKey``."""

    __slots__ = ()

957 

958 

class StrAsPlainColumnImpl(_CoerceLiterals, RoleImpl):
    """Role implementation that turns plain strings into ``ColumnClause``
    objects."""

    __slots__ = ()

    def _text_coercion(self, element, argname=None):
        """Return ``elements.ColumnClause(element)``; ``argname`` is
        unused."""
        return elements.ColumnClause(element)

964 

965 

class ByOfImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl, roles.ByOfRole):
    """Role implementation that coerces constants and turns plain strings
    into textual label references."""

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        """Return ``elements._textual_label_reference(element)``;
        ``argname`` is unused."""
        return elements._textual_label_reference(element)

973 

974 

class OrderByImpl(ByOfImpl, RoleImpl):
    """``ByOfImpl`` variant that wraps label-bearing elements in a label
    reference."""

    __slots__ = ()

    def _post_coercion(self, resolved, **kw):
        """Return ``elements._label_reference(resolved)`` when ``resolved``
        is an instance of the role class with a non-None
        ``_order_by_label_element``; otherwise return it unchanged."""
        has_label = (
            isinstance(resolved, self._role_class)
            and resolved._order_by_label_element is not None
        )
        return elements._label_reference(resolved) if has_label else resolved

986 

987 

class GroupByImpl(ByOfImpl, RoleImpl):
    """``ByOfImpl`` variant that expands a FROM clause into its
    columns."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return a ``ClauseList`` of ``resolved.c`` for a FROM clause;
        any other object is returned unchanged."""
        if not is_from_clause(resolved):
            return resolved
        return elements.ClauseList(*resolved.c)

1002 

1003 

class DMLColumnImpl(_ReturnsStringKey, RoleImpl):
    """``_ReturnsStringKey`` variant that can return an element's
    ``.key``."""

    __slots__ = ()

    def _post_coercion(self, element, *, as_key=False, **kw):
        """Return ``element.key`` when ``as_key`` is truthy, else
        ``element``."""
        return element.key if as_key else element

1012 

1013 

class ConstExprImpl(RoleImpl):
    """Role implementation that accepts only ``None``, ``False`` and
    ``True`` as literals."""

    __slots__ = ()

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Map ``None`` / ``False`` / ``True`` to ``Null`` / ``False_`` /
        ``True_``; raise for any other value."""
        # identity checks on purpose: 0 and 1 must not match here
        if element is None:
            return elements.Null()
        if element is False:
            return elements.False_()
        if element is True:
            return elements.True_()
        self._raise_for_expected(element, argname)

1026 

1027 

class TruncatedLabelImpl(_StringOnly, RoleImpl):
    """String-only role implementation producing ``_truncated_label``
    objects."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``resolved`` when the original ``element`` is a string,
        else raise."""
        if isinstance(element, str):
            return resolved
        self._raise_for_expected(element, argname, resolved)

    def _literal_coercion(self, element, **kw):
        """coerce the given value to :class:`._truncated_label`.

        Existing :class:`._truncated_label` and
        :class:`._anonymous_label` objects are passed
        unchanged.
        """
        already_truncated = isinstance(element, elements._truncated_label)
        if already_truncated:
            return element
        return elements._truncated_label(element)

1055 

1056 

class DDLExpressionImpl(_Deannotate, _CoerceLiterals, RoleImpl):
    """Role implementation that coerces constants, accepts plain strings
    as ``TextClause`` and deannotates the result."""

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        """Return ``elements.TextClause(element)``; ``argname`` is
        unused."""
        # see #5754 for why we can't easily deprecate this coercion.
        # essentially expressions like postgresql_where would have to be
        # text() as they come back from reflection and we don't want to
        # have text() elements wired into the inspection dictionaries.
        return elements.TextClause(element)

1068 

1069 

class DDLConstraintColumnImpl(_Deannotate, _ReturnsStringKey, RoleImpl):
    """Role implementation combining ``_Deannotate`` and
    ``_ReturnsStringKey`` with no additions of its own."""

    __slots__ = ()

1072 

1073 

class DDLReferredColumnImpl(DDLConstraintColumnImpl):
    """Role implementation identical in behavior to
    ``DDLConstraintColumnImpl``."""

    __slots__ = ()

1076 

1077 

class LimitOffsetImpl(RoleImpl):
    """Role implementation that accepts ``None`` or an integer-like
    value."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``None`` when ``resolved`` is ``None``, else raise."""
        if resolved is None:
            return None
        self._raise_for_expected(element, argname, resolved)

    def _literal_coercion(  # type: ignore[override]
        self, element, *, name, type_, **kw
    ):
        """Return ``None`` for ``None``; otherwise pass the value through
        ``util.asint()`` and wrap it in a unique
        ``selectable._OffsetLimitParam``."""
        if element is None:
            return None

        int_value = util.asint(element)
        return selectable._OffsetLimitParam(
            name, int_value, type_=type_, unique=True
        )

1103 

1104 

class LabeledColumnExprImpl(ExpressionElementImpl):
    """``ExpressionElementImpl`` variant that returns expressions with an
    anonymous label applied."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``.label(None)`` of the expression, or raise.

        A ``resolved`` that is an ``ExpressionElementRole`` is labeled
        directly; otherwise the parent's coercion is tried first and its
        result is labeled if it is an ``ExpressionElementRole``.

        """
        expression_role = roles.ExpressionElementRole
        if isinstance(resolved, expression_role):
            return resolved.label(None)

        coerced = super()._implicit_coercions(
            element, resolved, argname=argname, **kw
        )
        if isinstance(coerced, expression_role):
            return coerced.label(None)

        self._raise_for_expected(element, argname, resolved)

1125 

1126 

class ColumnsClauseImpl(_SelectIsNotFrom, _CoerceLiterals, RoleImpl):
    """Role implementation that coerces constants, numbers and ``"*"``,
    and rejects any other plain string with guidance."""

    __slots__ = ()

    _coerce_consts = True
    _coerce_numerics = True
    _coerce_star = True

    # a string matching this is suggested as column(); anything else is
    # suggested as literal_column()
    _guess_straight_column = re.compile(r"^\w\S*$", re.I)

    def _raise_for_expected(
        self, element, argname=None, resolved=None, *, advice=None, **kw
    ):
        """Raise via the parent implementation; when ``element`` is a list
        and no advice was given, suggest passing its members
        positionally."""
        if not advice and isinstance(element, list):
            advice = (
                f"Did you mean to say select("
                f"{', '.join(repr(e) for e in element)})?"
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

    def _text_coercion(self, element, argname=None):
        """Reject a plain string, suggesting ``text()`` or
        ``column()`` / ``literal_column()``.

        :raises ArgumentError: always.

        """
        element = str(element)

        guess_is_literal = not self._guess_straight_column.match(element)
        raise exc.ArgumentError(
            "Textual column expression %(column)r %(argname)sshould be "
            "explicitly declared with text(%(column)r), "
            "or use %(literal_column)s(%(column)r) "
            "for more specificity"
            % {
                "column": util.ellipses_string(element),
                # trailing space keeps the argument name from running
                # into the following "should be"
                "argname": "for argument %s " % (argname,) if argname else "",
                "literal_column": (
                    "literal_column" if guess_is_literal else "column"
                ),
            }
        )

1166 

1167 

class ReturnsRowsImpl(RoleImpl):
    # Plain ``RoleImpl`` subclass with no overrides; all coercion behavior
    # comes from the base class.

    # no per-instance attributes are added by this subclass
    __slots__ = ()

1170 

1171 

class StatementImpl(_CoerceLiterals, RoleImpl):
    """Coercion implementation for statement arguments."""

    __slots__ = ()

    def _post_coercion(
        self, resolved, *, original_element, argname=None, **kw
    ):
        """Verify that a coerced, non-string original object is executable.

        :raises exc.ObjectNotExecutableError: when coercion produced a
         different object and the original (non-``str``) object has no
         ``_execute_on_connection`` attribute.
        :return: ``resolved`` unchanged.
        """
        if resolved is original_element or isinstance(original_element, str):
            return resolved

        # use same method as Connection uses
        try:
            original_element._execute_on_connection
        except AttributeError as err:
            raise exc.ObjectNotExecutableError(original_element) from err

        return resolved

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Pass lambda elements through as-is; everything else is handled by
        the parent implementation."""
        if resolved._is_lambda_element:
            return resolved

        return super()._implicit_coercions(
            element, resolved, argname=argname, **kw
        )

1202 

1203 

class SelectStatementImpl(_NoTextCoercion, RoleImpl):
    """Coercion implementation for SELECT-statement arguments."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Convert a text clause through its ``.columns()`` method; any other
        resolved object is reported through ``_raise_for_expected()``."""
        if not resolved._is_text_clause:
            self._raise_for_expected(element, argname, resolved)
            return None

        return resolved.columns()

1218 

1219 

class HasCTEImpl(ReturnsRowsImpl):
    # Inherits everything from ``ReturnsRowsImpl``; the subclass adds no
    # overrides and only provides a distinct implementation class name.

    # no per-instance attributes are added by this subclass
    __slots__ = ()

1222 

1223 

class IsCTEImpl(RoleImpl):
    # Plain ``RoleImpl`` subclass with no overrides; all coercion behavior
    # comes from the base class.

    # no per-instance attributes are added by this subclass
    __slots__ = ()

1226 

1227 

class JoinTargetImpl(RoleImpl):
    """Coercion implementation for join-target arguments."""

    __slots__ = ()

    _skip_clauseelement_for_target_match = True

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Literals are never valid here; report them through
        ``_raise_for_expected()``."""
        self._raise_for_expected(element, argname)

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        legacy: bool = False,
        **kw: Any,
    ) -> Any:
        """Resolve a join target.

        :param legacy: when true, a SELECT base is still accepted, with a
         deprecation warning, and returned as-is.
        :return: ``element`` if it is already a ``roles.JoinTargetRole``,
         or ``resolved`` in the deprecated legacy SELECT case; anything else
         is reported through ``_raise_for_expected()``.
        """
        if isinstance(element, roles.JoinTargetRole):
            # note that this codepath no longer occurs as of
            # #6550, unless JoinTargetImpl._skip_clauseelement_for_target_match
            # were set to False.
            return element

        if legacy and resolved._is_select_base:
            util.warn_deprecated(
                "Implicit coercion of SELECT and textual SELECT "
                "constructs into FROM clauses is deprecated; please call "
                ".subquery() on any Core select or ORM Query object in "
                "order to produce a subquery object.",
                version="1.4",
            )
            # TODO: doing _implicit_subquery here causes tests to fail,
            # how was this working before?  probably that ORM
            # join logic treated it as a select and subquery would happen
            # in _ORMJoin->Join
            return resolved

        self._raise_for_expected(element, argname, resolved)

1265 

1266 

class FromClauseImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Coercion implementation for FROM-clause arguments."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        explicit_subquery: bool = False,
        **kw: Any,
    ) -> Any:
        """Wrap a SELECT base in ``.subquery()``, but only when the caller
        opted in with ``explicit_subquery``; every other case is reported
        through ``_raise_for_expected()``."""
        wants_subquery = resolved._is_select_base and explicit_subquery
        if not wants_subquery:
            self._raise_for_expected(element, argname, resolved)
            return None

        return resolved.subquery()

    def _post_coercion(self, element, *, deannotate=False, **kw):
        """Return ``element``, run through ``_deannotate()`` first when
        ``deannotate`` is true."""
        return element._deannotate() if deannotate else element

1289 

1290 

class AnonymizedFromClauseImpl(FromClauseImpl):
    """FROM-clause coercion whose result is always passed through
    ``_anonymous_fromclause()``."""

    __slots__ = ()

    def _post_coercion(self, element, *, flat=False, name=None, **kw):
        """Return the anonymous form of ``element``.

        :param flat: forwarded to ``element._anonymous_fromclause()``.
        :param name: must be ``None``; enforced with an assertion.
        """
        assert name is None

        anonymized = element._anonymous_fromclause(flat=flat)
        return anonymized

1298 

1299 

class DMLTableImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Coercion implementation for the target table of a DML statement."""

    __slots__ = ()

    def _post_coercion(self, element, **kw):
        """Swap ``element`` for the object stored under its ``"dml_table"``
        annotation when one is present; otherwise return it unchanged."""
        return (
            element._annotations["dml_table"]
            if "dml_table" in element._annotations
            else element
        )

1308 

1309 

class DMLSelectImpl(_NoTextCoercion, RoleImpl):
    """Coercion implementation for SELECT arguments of DML statements."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Produce a selectable from a FROM clause.

        An ``Alias`` that wraps a SELECT base is unwrapped to that SELECT;
        any other FROM clause is converted with ``.select()``.  Objects that
        are not FROM clauses are reported through ``_raise_for_expected()``.
        """
        if not resolved._is_from_clause:
            self._raise_for_expected(element, argname, resolved)
            return None

        wraps_select = (
            isinstance(resolved, selectable.Alias)
            and resolved.element._is_select_base
        )
        return resolved.element if wraps_select else resolved.select()

1330 

1331 

class CompoundElementImpl(_NoTextCoercion, RoleImpl):
    """Coercion implementation for members of compound constructs."""

    __slots__ = ()

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Delegate to the parent error reporting, attaching advice when a
        FROM clause was passed by mistake.

        A subquery gets a hint to use the plain ``select()``; any other FROM
        clause gets a hint to call ``.select()``; non-FROM objects get no
        advice.
        """
        advice = None
        if isinstance(element, roles.FromClauseRole):
            advice = (
                "Use the plain select() object without "
                "calling .subquery() or .alias()."
                if element._is_subquery
                else "To SELECT from any FROM clause, use the .select() method."
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

1351 

1352 

# Registry mapping each role class from ``roles`` to a single instance of its
# coercion implementation class defined in this module.
_impl_lookup = {}


# Populate the registry by naming convention: for every attribute of the
# ``roles`` module whose name ends in "Role", look for a module-level name in
# this file with "Role" replaced by "Impl".  When one exists, instantiate it
# with the role class and register it under that role class.  Roles without
# a matching ``*Impl`` here are skipped.
for name in dir(roles):
    cls = getattr(roles, name)
    if name.endswith("Role"):
        # note: str.replace() substitutes every occurrence of "Role",
        # not only the suffix
        name = name.replace("Role", "Impl")
        if name in globals():
            impl = globals()[name](cls)
            _impl_lookup[cls] = impl

# Runtime only (skipped under static type checking): register the
# subscripted forms ``ExpressionElementRole[int]`` etc. as additional keys
# that share the plain ExpressionElementRole implementation instance.
if not TYPE_CHECKING:
    ee_impl = _impl_lookup[roles.ExpressionElementRole]

    for py_type in (int, bool, str, float):
        _impl_lookup[roles.ExpressionElementRole[py_type]] = ee_impl