Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/coercions.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

576 statements  

1# sql/coercions.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9from __future__ import annotations 

10 

11import collections.abc as collections_abc 

12import numbers 

13import re 

14import typing 

15from typing import Any 

16from typing import Callable 

17from typing import cast 

18from typing import Dict 

19from typing import Iterable 

20from typing import Iterator 

21from typing import List 

22from typing import NoReturn 

23from typing import Optional 

24from typing import overload 

25from typing import Sequence 

26from typing import Tuple 

27from typing import Type 

28from typing import TYPE_CHECKING 

29from typing import TypeVar 

30from typing import Union 

31 

32from . import operators 

33from . import roles 

34from . import visitors 

35from ._typing import is_from_clause 

36from .base import ExecutableOption 

37from .base import Options 

38from .cache_key import HasCacheKey 

39from .visitors import Visitable 

40from .. import exc 

41from .. import inspection 

42from .. import util 

43from ..util.typing import Literal 

44 

45if typing.TYPE_CHECKING: 

46 # elements lambdas schema selectable are set by __init__ 

47 from . import elements 

48 from . import lambdas 

49 from . import schema 

50 from . import selectable 

51 from ._typing import _ColumnExpressionArgument 

52 from ._typing import _ColumnsClauseArgument 

53 from ._typing import _DDLColumnArgument 

54 from ._typing import _DMLTableArgument 

55 from ._typing import _FromClauseArgument 

56 from .dml import _DMLTableElement 

57 from .elements import BindParameter 

58 from .elements import ClauseElement 

59 from .elements import ColumnClause 

60 from .elements import ColumnElement 

61 from .elements import DQLDMLClauseElement 

62 from .elements import NamedColumn 

63 from .elements import SQLCoreOperations 

64 from .schema import Column 

65 from .selectable import _ColumnsClauseElement 

66 from .selectable import _JoinTargetProtocol 

67 from .selectable import FromClause 

68 from .selectable import HasCTE 

69 from .selectable import SelectBase 

70 from .selectable import Subquery 

71 from .visitors import _TraverseCallableType 

72 

# Generic type variables used by the annotations in this module.

# unconstrained value type passed through the coercion helpers
_T = TypeVar("_T", bound=Any)

# callable type preserved by decorator factories such as
# _document_text_coercion()
_F = TypeVar("_F", bound=Callable[..., Any])

# any SQL role class; this is the type of the ``role`` argument of expect()
_SR = TypeVar("_SR", bound=roles.SQLRole)

# role classes deriving from roles.StringRole
_StringOnlyR = TypeVar("_StringOnlyR", bound=roles.StringRole)

77 

78 

def _is_literal(element):
    """Report whether ``element`` counts as a plain "literal" value.

    An object is *not* a literal when it is a :class:`.Visitable`, a
    ``schema.SchemaEventTarget``, or exposes a ``__clause_element__``
    attribute; anything else is considered a literal.

    """

    if isinstance(element, (Visitable, schema.SchemaEventTarget)):
        return False

    return not hasattr(element, "__clause_element__")

89 

90 

91def _deep_is_literal(element): 

92 """Return whether or not the element is a "literal" in the context 

93 of a SQL expression construct. 

94 

95 does a deeper more esoteric check than _is_literal. is used 

96 for lambda elements that have to distinguish values that would 

97 be bound vs. not without any context. 

98 

99 """ 

100 

101 if isinstance(element, collections_abc.Sequence) and not isinstance( 

102 element, str 

103 ): 

104 for elem in element: 

105 if not _deep_is_literal(elem): 

106 return False 

107 else: 

108 return True 

109 

110 return ( 

111 not isinstance( 

112 element, 

113 ( 

114 Visitable, 

115 schema.SchemaEventTarget, 

116 HasCacheKey, 

117 Options, 

118 util.langhelpers.symbol, 

119 ), 

120 ) 

121 and not hasattr(element, "__clause_element__") 

122 and ( 

123 not isinstance(element, type) 

124 or not issubclass(element, HasCacheKey) 

125 ) 

126 ) 

127 

128 

def _document_text_coercion(
    paramname: str, meth_rst: str, param_rst: str
) -> Callable[[_F], _F]:
    """Build the "trusted SQL text" warning for a parameter.

    The warning text is formatted with ``param_rst`` and ``meth_rst`` and
    handed, together with ``paramname``, to ``util.add_parameter_text()``,
    whose return value is returned unchanged.

    """
    warning_template = (
        ".. warning:: "
        "The %s argument to %s can be passed as a Python string argument, "
        "which will be treated "
        "as **trusted SQL text** and rendered as given. **DO NOT PASS "
        "UNTRUSTED INPUT TO THIS PARAMETER**."
    )
    warning_text = warning_template % (param_rst, meth_rst)
    return util.add_parameter_text(paramname, warning_text)

143 

144 

145def _expression_collection_was_a_list( 

146 attrname: str, 

147 fnname: str, 

148 args: Union[Sequence[_T], Sequence[Sequence[_T]]], 

149) -> Sequence[_T]: 

150 if args and isinstance(args[0], (list, set, dict)) and len(args) == 1: 

151 if isinstance(args[0], list): 

152 raise exc.ArgumentError( 

153 f'The "{attrname}" argument to {fnname}(), when ' 

154 "referring to a sequence " 

155 "of items, is now passed as a series of positional " 

156 "elements, rather than as a list. " 

157 ) 

158 return cast("Sequence[_T]", args[0]) 

159 

160 return cast("Sequence[_T]", args) 

161 

162 

# Typing-only overloads for expect(): each one maps a role class to the
# type that coercion against that role returns.


@overload
def expect(
    role: Type[roles.TruncatedLabelRole],
    element: Any,
    **kw: Any,
) -> str: ...


@overload
def expect(
    role: Type[roles.DMLColumnRole],
    element: Any,
    *,
    as_key: Literal[True] = ...,
    **kw: Any,
) -> str: ...


@overload
def expect(
    role: Type[roles.LiteralValueRole],
    element: Any,
    **kw: Any,
) -> BindParameter[Any]: ...


@overload
def expect(
    role: Type[roles.DDLReferredColumnRole],
    element: Any,
    **kw: Any,
) -> Column[Any]: ...


@overload
def expect(
    role: Type[roles.DDLConstraintColumnRole],
    element: Any,
    **kw: Any,
) -> Union[Column[Any], str]: ...


@overload
def expect(
    role: Type[roles.StatementOptionRole],
    element: Any,
    **kw: Any,
) -> DQLDMLClauseElement: ...


@overload
def expect(
    role: Type[roles.LabeledColumnExprRole[Any]],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> NamedColumn[_T]: ...


@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
    ],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> ColumnElement[_T]: ...


@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
        Type[roles.OnClauseRole],
        Type[roles.ColumnArgumentRole],
    ],
    element: Any,
    **kw: Any,
) -> ColumnElement[Any]: ...


@overload
def expect(
    role: Type[roles.DMLTableRole],
    element: _DMLTableArgument,
    **kw: Any,
) -> _DMLTableElement: ...


@overload
def expect(
    role: Type[roles.HasCTERole],
    element: HasCTE,
    **kw: Any,
) -> HasCTE: ...


@overload
def expect(
    role: Type[roles.SelectStatementRole],
    element: SelectBase,
    **kw: Any,
) -> SelectBase: ...


@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: _FromClauseArgument,
    **kw: Any,
) -> FromClause: ...


@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: SelectBase,
    *,
    explicit_subquery: Literal[True] = ...,
    **kw: Any,
) -> Subquery: ...


@overload
def expect(
    role: Type[roles.ColumnsClauseRole],
    element: _ColumnsClauseArgument[Any],
    **kw: Any,
) -> _ColumnsClauseElement: ...


@overload
def expect(
    role: Type[roles.JoinTargetRole],
    element: _JoinTargetProtocol,
    **kw: Any,
) -> _JoinTargetProtocol: ...


# catchall for not-yet-implemented overloads
@overload
def expect(
    role: Type[_SR],
    element: Any,
    **kw: Any,
) -> Any: ...


def expect(
    role: Type[_SR],
    element: Any,
    *,
    apply_propagate_attrs: Optional[ClauseElement] = None,
    argname: Optional[str] = None,
    post_inspect: bool = False,
    disable_inspection: bool = False,
    **kw: Any,
) -> Any:
    """Coerce ``element`` into an object acceptable for ``role``.

    :param role: role class; selects the :class:`.RoleImpl` from
     ``_impl_lookup``.
    :param element: the object to coerce.
    :param apply_propagate_attrs: optional construct that receives the
     resolved element's ``_propagate_attrs`` when its own are empty.
    :param argname: argument name passed along for error messages.
    :param post_inspect: when True, touch ``_post_inspect`` on an
     inspected object before asking it for its clause element.
    :param disable_inspection: when True, skip the inspection step even
     if the role implementation uses inspection.
    :param \**kw: forwarded to the role implementation's coercion hooks.
    :return: the resolved object, after ``_post_coercion`` when the
     implementation's role class is in the resolved object's MRO;
     otherwise whatever ``_implicit_coercions`` returns.

    """
    if (
        role.allows_lambda
        # callable() does not trigger __getattr__(), whereas
        # hasattr(obj, "__call__") does.  testing callable() first avoids
        # most needless hasattr() calls, and with them __getattr__(),
        # which is present on ColumnElement.
        and callable(element)
        and hasattr(element, "__code__")
    ):
        return lambdas.LambdaElement(
            element,
            role,
            lambdas.LambdaOptions(**kw),
            apply_propagate_attrs=apply_propagate_attrs,
        )

    impl = _impl_lookup[role]

    # kept for error messages and for the post / implicit coercion hooks,
    # since ``element`` may be rebound while resolving below
    original_element = element

    # the major case is that a ClauseElement is given already; those (and
    # the other recognized construct types) bypass the elaborate logic
    if isinstance(element, lambdas.PyWrapper):
        resolved = element._sa__py_wrapper_literal(**kw)
    elif isinstance(
        element,
        (
            elements.CompilerElement,
            schema.SchemaItem,
            schema.FetchedValue,
        ),
    ):
        resolved = element
    elif impl._resolve_literal_only:
        resolved = impl._literal_coercion(element, **kw)
    else:
        is_clause_element = False

        # special performance optimization for ORM joins used by
        # JoinTargetImpl: don't go through the work of invoking
        # __clause_element__() when only the original QueryableAttribute
        # is needed, as the clause adaption it performs would just be
        # thrown away here.
        if (
            impl._skip_clauseelement_for_target_match
            and isinstance(element, role)
            and hasattr(element, "__clause_element__")
        ):
            is_clause_element = True
        else:
            # unwrap until reaching an object that is itself a clause
            # element, or one with no __clause_element__ at all
            while hasattr(element, "__clause_element__"):
                is_clause_element = True

                if getattr(element, "is_clause_element", False):
                    break
                element = element.__clause_element__()

        if is_clause_element:
            resolved = element
        else:
            resolved = None

            if impl._use_inspection and not disable_inspection:
                insp = inspection.inspect(element, raiseerr=False)
                if insp is not None:
                    if post_inspect:
                        # attribute access only; evaluated for its effect
                        insp._post_inspect
                    try:
                        resolved = insp.__clause_element__()
                    except AttributeError:
                        impl._raise_for_expected(original_element, argname)

            if resolved is None:
                resolved = impl._literal_coercion(
                    element, argname=argname, **kw
                )

    if apply_propagate_attrs is not None:
        if typing.TYPE_CHECKING:
            assert isinstance(resolved, (SQLCoreOperations, ClauseElement))

        if not apply_propagate_attrs._propagate_attrs and getattr(
            resolved, "_propagate_attrs", None
        ):
            apply_propagate_attrs._propagate_attrs = resolved._propagate_attrs

    if impl._role_class not in resolved.__class__.__mro__:
        return impl._implicit_coercions(
            original_element, resolved, argname=argname, **kw
        )

    if impl._post_coercion:
        resolved = impl._post_coercion(
            resolved,
            argname=argname,
            original_element=original_element,
            **kw,
        )
    return resolved

428 

429 

def expect_as_key(
    role: Type[roles.DMLColumnRole], element: Any, **kw: Any
) -> str:
    """Call :func:`.expect` with ``as_key=True``.

    Any ``as_key`` entry supplied by the caller is discarded so that it
    cannot clash with the forced value.

    """
    forwarded = {key: value for key, value in kw.items() if key != "as_key"}
    return expect(role, element, as_key=True, **forwarded)

435 

436 

def expect_col_expression_collection(
    role: Type[roles.DDLConstraintColumnRole],
    expressions: Iterable[_DDLColumnArgument],
) -> Iterator[
    Tuple[
        Union[str, Column[Any]],
        Optional[ColumnClause[Any]],
        Optional[str],
        Optional[Union[Column[Any], str]],
    ]
]:
    """Coerce each expression against ``role`` and describe the result.

    Yields one 4-tuple ``(resolved, column, strname, add_element)`` per
    input expression:

    * when coercion produces a string, the original string is yielded as
      ``resolved``, ``strname`` and ``add_element``, and ``column`` is
      ``None``;
    * otherwise ``column`` is the first object visited under the
      ``"column"`` key while traversing ``resolved`` (``None`` if there is
      none), ``strname`` is ``None`` and ``add_element`` equals ``column``.

    """
    for expr in expressions:
        resolved: Union[Column[Any], str] = expect(role, expr)

        if isinstance(resolved, str):
            assert isinstance(expr, str)
            yield expr, None, expr, expr
            continue

        found_columns: List[Column[Any]] = []
        visitors.traverse(resolved, {}, {"column": found_columns.append})
        first_column = found_columns[0] if found_columns else None

        yield resolved, first_column, None, first_column

465 

466 

class RoleImpl:
    """Base class for per-role coercion implementations used by expect().

    An instance stores the role class it serves, that role's
    ``_role_name`` and whether the role derives from
    ``roles.UsesInspection``.

    """

    __slots__ = ("_role_class", "name", "_use_inspection")

    # optional hook; expect() calls it on a resolved element only when it
    # is truthy
    _post_coercion: Any = None

    # when True, expect() goes straight to _literal_coercion()
    _resolve_literal_only = False

    # when True, expect() may skip calling __clause_element__() on an
    # element that is already an instance of the role
    _skip_clauseelement_for_target_match = False

    def __init__(self, role_class):
        self._role_class = role_class
        self.name = role_class._role_name
        self._use_inspection = issubclass(role_class, roles.UsesInspection)

    def _literal_coercion(self, element, **kw):
        """Coerce a literal value; not implemented in the base class."""
        raise NotImplementedError()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Reject the element; the base class coerces nothing implicitly."""
        self._raise_for_expected(element, argname, resolved)

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise :class:`.ArgumentError` describing the rejected element.

        :param element: the object originally passed by the caller.
        :param argname: argument name to mention in the message, if any.
        :param resolved: the object ``element`` resolved to; mentioned in
         the message only when it is a different object.
        :param advice: optional sentence appended to the message.
        :param code: error code passed on to the exception.
        :param err: exception to chain from (``raise ... from err``).

        """
        if resolved is None or resolved is element:
            got = repr(element)
        else:
            got = "%r object resolved from %r object" % (resolved, element)

        msg = (
            "%s expected for argument %r; got %s."
            % (
                self.name,
                argname,
                got,
            )
            if argname
            else "%s expected, got %s." % (self.name, got)
        )

        if advice:
            msg += " " + advice

        raise exc.ArgumentError(msg, code=code) from err

519 

520 

521class _Deannotate: 

522 __slots__ = () 

523 

524 def _post_coercion(self, resolved, **kw): 

525 from .util import _deep_deannotate 

526 

527 return _deep_deannotate(resolved) 

528 

529 

530class _StringOnly: 

531 __slots__ = () 

532 

533 _resolve_literal_only = True 

534 

535 

class _ReturnsStringKey(RoleImpl):
    """Role impl that passes literals through and accepts plain strings."""

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        """Return the literal unchanged."""
        return element

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        """Return ``element`` when it is a ``str``; otherwise raise."""
        if not isinstance(element, str):
            self._raise_for_expected(element, argname, resolved)
        else:
            return element

547 

548 

class _ColumnCoercions(RoleImpl):
    """Role impl that implicitly turns SELECTs into scalar subqueries."""

    __slots__ = ()

    def _warn_for_scalar_subquery_coercion(self):
        """Emit the warning about implicit scalar subquery coercion."""
        message = (
            "implicitly coercing SELECT object to scalar subquery; "
            "please use the .scalar_subquery() method to produce a scalar "
            "subquery."
        )
        util.warn(message)

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        """Coerce select-like objects to scalar subqueries, with a warning.

        A select base is converted via ``scalar_subquery()``; a
        ``Subquery`` FROM clause via its ``element.scalar_subquery()``.  A
        lambda element is returned as-is when the role allows lambdas.
        Anything else, including a non clause element, is rejected.

        """
        is_clause = getattr(resolved, "is_clause_element", False)

        if is_clause and resolved._is_select_base:
            self._warn_for_scalar_subquery_coercion()
            return resolved.scalar_subquery()
        elif (
            is_clause
            and resolved._is_from_clause
            and isinstance(resolved, selectable.Subquery)
        ):
            self._warn_for_scalar_subquery_coercion()
            return resolved.element.scalar_subquery()
        elif (
            is_clause
            and self._role_class.allows_lambda
            and resolved._is_lambda_element
        ):
            return resolved
        else:
            self._raise_for_expected(element, argname, resolved)

575 

576 

def _no_text_coercion(
    element: Any,
    argname: Optional[str] = None,
    exc_cls: Type[exc.SQLAlchemyError] = exc.ArgumentError,
    extra: Optional[str] = None,
    err: Optional[Exception] = None,
) -> NoReturn:
    """Raise an error stating that a plain string must be wrapped in text().

    :param element: the offending string; shortened with
     ``util.ellipses_string()`` for the message.
    :param argname: optional argument name to mention in the message.
    :param exc_cls: exception class to raise.
    :param extra: optional text placed at the start of the message.
    :param err: exception to chain from (``raise ... from err``).

    """
    # both optional fragments carry their own trailing space, so the
    # message reads "... 'x' for argument foo should be ..." and not
    # "... for argument fooshould be ..."
    argname_text = "for argument %s " % (argname,) if argname else ""
    extra_text = "%s " % extra if extra else ""

    raise exc_cls(
        "%(extra)sTextual SQL expression %(expr)r %(argname)sshould be "
        "explicitly declared as text(%(expr)r)"
        % {
            "expr": util.ellipses_string(element),
            "argname": argname_text,
            "extra": extra_text,
        }
    ) from err

593 

594 

class _NoTextCoercion(RoleImpl):
    """Role impl that refuses to coerce any literal value."""

    __slots__ = ()

    def _literal_coercion(self, element, argname=None, **kw):
        """Always raise for a literal.

        A string given to a role that ``TextClause`` is a subclass of gets
        the "should be explicitly declared as text()" error; everything
        else gets the generic "expected" error.

        """
        wants_text_hint = isinstance(element, str) and issubclass(
            elements.TextClause, self._role_class
        )
        if not wants_text_hint:
            self._raise_for_expected(element, argname)
        else:
            _no_text_coercion(element, argname)

605 

606 

class _CoerceLiterals(RoleImpl):
    """Role impl that coerces selected Python literals to SQL constructs.

    Subclasses opt in to individual coercions through the ``_coerce_*``
    flags and may override ``_text_coercion()`` to accept strings.

    """

    __slots__ = ()

    # None / False / True -> Null() / False_() / True_()
    _coerce_consts = False
    # the string "*" -> literal column "*"
    _coerce_star = False
    # numbers -> literal column holding str(number)
    _coerce_numerics = False

    def _text_coercion(self, element, argname=None):
        """Handle a string literal; by default strings are refused."""
        return _no_text_coercion(element, argname)

    def _literal_coercion(self, element, argname=None, **kw):
        """Coerce ``element`` according to the enabled flags, or raise."""
        if isinstance(element, str):
            if self._coerce_star and element == "*":
                return elements.ColumnClause("*", is_literal=True)
            return self._text_coercion(element, argname, **kw)

        if self._coerce_consts:
            # identity comparison, so 0 / 1 are not mistaken for
            # False / True
            for constant, construct in (
                (None, elements.Null),
                (False, elements.False_),
                (True, elements.True_),
            ):
                if element is constant:
                    return construct()

        if self._coerce_numerics and isinstance(element, (numbers.Number)):
            return elements.ColumnClause(str(element), is_literal=True)

        self._raise_for_expected(element, argname)

635 

636 

class LiteralValueImpl(RoleImpl):
    """Role impl that wraps a literal value in a unique bound parameter."""

    # expect() hands the raw element to _literal_coercion() directly
    _resolve_literal_only = True

    def _literal_coercion(self, element, argname=None, type_=None, **kw):
        """Return the literal unchanged."""
        return element

    def _implicit_coercions(
        self,
        element,
        resolved,
        argname,
        type_=None,
        literal_execute=False,
        **kw,
    ):
        """Return a ``BindParameter`` holding ``element``.

        Objects that ``_is_literal()`` does not consider a literal are
        rejected first.

        """
        if not _is_literal(resolved):
            self._raise_for_expected(
                element, resolved=resolved, argname=argname, **kw
            )

        bound = elements.BindParameter(
            None,
            element,
            type_=type_,
            unique=True,
            literal_execute=literal_execute,
        )
        return bound

664 

665 

class _SelectIsNotFrom(RoleImpl):
    """Role impl mixin adding ".subquery()" advice to SELECT rejections."""

    __slots__ = ()

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise for a rejected element, hinting at ``.subquery()``.

        When no ``advice`` was supplied and either ``element`` or
        ``resolved`` is a ``roles.SelectStatementRole``, advice about
        using ``.subquery()`` is added and the error code is set to
        ``"89ve"``.  Advice and code supplied by the caller are otherwise
        passed through untouched.

        """
        # the parentheses matter: "not advice" must guard both isinstance
        # checks, so that advice provided by the caller is never replaced
        if not advice and (
            isinstance(element, roles.SelectStatementRole)
            or isinstance(resolved, roles.SelectStatementRole)
        ):
            advice = (
                "To create a "
                "FROM clause from a %s object, use the .subquery() method."
                % (resolved.__class__ if resolved is not None else element,)
            )
            code = "89ve"

        super()._raise_for_expected(
            element,
            argname=argname,
            resolved=resolved,
            advice=advice,
            code=code,
            err=err,
            **kw,
        )
        # never reached
        assert False

704 

705 

class HasCacheKeyImpl(RoleImpl):
    """Role impl that accepts only :class:`.HasCacheKey` objects."""

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        """Return the literal unchanged."""
        return element

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``element`` if it is a ``HasCacheKey``; otherwise raise."""
        if not isinstance(element, HasCacheKey):
            self._raise_for_expected(element, argname, resolved)
        else:
            return element

723 

724 

class ExecutableOptionImpl(RoleImpl):
    """Role impl that accepts only :class:`.ExecutableOption` objects."""

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        """Return the literal unchanged."""
        return element

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``element`` if it is an ``ExecutableOption``; else raise."""
        if not isinstance(element, ExecutableOption):
            self._raise_for_expected(element, argname, resolved)
        else:
            return element

742 

743 

class ExpressionElementImpl(_ColumnCoercions, RoleImpl):
    """Role impl producing column expressions from literal values."""

    __slots__ = ()

    def _literal_coercion(
        self, element, name=None, type_=None, argname=None, is_crud=False, **kw
    ):
        """Turn a literal into ``Null()`` or a unique ``BindParameter``.

        ``None`` becomes ``Null()`` unless ``is_crud`` is set or the given
        type has ``should_evaluate_none`` set.  An
        :class:`.ArgumentError` raised while building the bound parameter
        is re-raised through ``_raise_for_expected()``.

        """
        renders_as_null = (
            element is None
            and not is_crud
            and (type_ is None or not type_.should_evaluate_none)
        )
        if renders_as_null:
            # TODO: there's no test coverage now for the
            # "should_evaluate_none" part of this, as outside of "crud"
            # this codepath is not normally used except in some special
            # cases
            return elements.Null()

        try:
            return elements.BindParameter(
                name, element, type_, unique=True, _is_crud=is_crud
            )
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Raise for a rejected element, adding advice for two cases.

        ``Values`` constructs get a pointer to ``.scalar_values()`` and
        anonymized FROM clauses one to ``.table_valued()``.

        """
        # select uses implicit coercion with warning instead of raising
        if isinstance(element, selectable.Values):
            hint = (
                "To create a column expression from a VALUES clause, "
                "use the .scalar_values() method."
            )
        elif isinstance(element, roles.AnonymizedFromClauseRole):
            hint = (
                "To create a column expression from a FROM clause row "
                "as a whole, use the .table_valued() method."
            )
        else:
            hint = None

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=hint, **kw
        )

785 

786 

class BinaryElementImpl(ExpressionElementImpl, RoleImpl):
    """Role impl for the other operand of a binary expression."""

    __slots__ = ()

    def _literal_coercion(
        self, element, expr, operator, bindparam_type=None, argname=None, **kw
    ):
        """Bind the literal through ``expr._bind_param()``.

        An :class:`.ArgumentError` raised by that call is re-raised
        through ``_raise_for_expected()``.

        """
        try:
            bound = expr._bind_param(operator, element, type_=bindparam_type)
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)
        else:
            return bound

    def _post_coercion(self, resolved, expr, bindparam_type=None, **kw):
        """Give a null-typed ``resolved`` a type taken from ``expr``.

        The adjustment applies only when ``expr`` itself is not
        null-typed; ``bindparam_type`` wins over ``expr.type`` when given.

        """
        needs_type = resolved.type._isnull and not expr.type._isnull
        if not needs_type:
            return resolved

        target_type = expr.type if bindparam_type is None else bindparam_type
        return resolved._with_binary_element_type(target_type)

804 

805 

class InElementImpl(RoleImpl):
    """Role impl for the right-hand side of an IN expression."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Coerce a FROM clause into a SELECT, emitting a warning.

        An ``Alias`` whose element is a select base contributes that
        element; any other FROM clause contributes ``resolved.select()``.
        Objects that are not FROM clauses are rejected.

        """
        if not resolved._is_from_clause:
            self._raise_for_expected(element, argname, resolved)
        else:
            wraps_select = (
                isinstance(resolved, selectable.Alias)
                and resolved.element._is_select_base
            )
            self._warn_for_implicit_coercion(resolved)
            target = resolved.element if wraps_select else resolved.select()
            return self._post_coercion(target, **kw)

    def _warn_for_implicit_coercion(self, elem):
        """Warn that ``elem`` is being coerced into a select() for IN."""
        message = (
            "Coercing %s object into a select() for use in IN(); "
            "please pass a select() construct explicitly"
            % (elem.__class__.__name__)
        )
        util.warn(message)

    def _literal_coercion(self, element, expr, operator, **kw):
        """Coerce a non-string iterable of values for use with IN.

        If every member is a non-``None`` literal, a single expanding
        bound parameter is produced.  Otherwise a ``ClauseList`` is built
        in which SQL expressions are kept, ``None`` becomes ``Null()`` and
        the remaining literals become individual bound parameters.
        Members that are neither literals nor ``ColumnOperators``, and
        non-iterable input, are rejected.

        """
        if not util.is_non_string_iterable(element):
            self._raise_for_expected(element, **kw)
        else:
            # members that must not be turned into bound parameters,
            # mapped to the expression that takes their place
            substitutions: Dict[
                Optional[operators.ColumnOperators],
                operators.ColumnOperators,
            ] = {}
            element = list(element)
            for member in element:
                if _is_literal(member):
                    if member is None:
                        substitutions[member] = elements.Null()
                elif isinstance(member, operators.ColumnOperators):
                    substitutions[member] = member
                else:
                    self._raise_for_expected(element, **kw)

            if not substitutions:
                return expr._bind_param(operator, element, expanding=True)

            clauses = [
                (
                    substitutions[member]
                    if member in substitutions
                    else expr._bind_param(operator, member)
                )
                for member in element
            ]
            return elements.ClauseList(*clauses)

    def _post_coercion(self, element, expr, operator, **kw):
        """Adapt an already-SQL ``element`` for use in IN."""
        if element._is_select_base:
            # for IN, we are doing scalar_subquery() coercion without
            # a warning
            return element.scalar_subquery()

        if isinstance(element, elements.ClauseList):
            assert len(element.clauses) != 0
            return element.self_group(against=operator)

        if isinstance(element, elements.BindParameter):
            # work on a clone; the caller's parameter is not modified
            expanding_param = element._clone(maintain_key=True)
            expanding_param.expanding = True
            expanding_param.expand_op = operator
            return expanding_param

        if isinstance(element, selectable.Values):
            return element.scalar_values()

        return element

889 

890 

class OnClauseImpl(_ColumnCoercions, RoleImpl):
    """Role impl for ON clauses; literal values are never accepted."""

    __slots__ = ()

    _coerce_consts = True

    def _literal_coercion(
        self, element, name=None, type_=None, argname=None, is_crud=False, **kw
    ):
        """Reject every literal value."""
        self._raise_for_expected(element)

    def _post_coercion(self, resolved, original_element=None, **kw):
        """Return the original object when it is a join target.

        This is a hack right now: coercion is applied to an ORM
        InstrumentedAttribute, but the attribute itself - not its clause
        element - should be returned if that is what was passed.  ORM
        context _join and _legacy_join() would need to be improved to
        look for annotations in a clause element form.

        """
        is_join_target = isinstance(original_element, roles.JoinTargetRole)
        return original_element if is_join_target else resolved

910 

911 

class WhereHavingImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl):
    """Role impl for WHERE / HAVING criteria.

    ``None`` / ``True`` / ``False`` are coerced to SQL constants; plain
    strings are refused.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        """Refuse a plain string; raises via ``_no_text_coercion()``."""
        return _no_text_coercion(element, argname)

919 

920 

class StatementOptionImpl(_CoerceLiterals, RoleImpl):
    """Role impl for statement options; strings become ``TextClause``."""

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        """Wrap the string in a ``TextClause``."""
        text_clause = elements.TextClause(element)
        return text_clause

928 

929 

class ColumnArgumentImpl(_NoTextCoercion, RoleImpl):
    """Role impl for column arguments; inherits literal rejection."""

    __slots__ = ()

932 

933 

class ColumnArgumentOrKeyImpl(_ReturnsStringKey, RoleImpl):
    """Role impl for column arguments that may also be string keys."""

    __slots__ = ()

936 

937 

class StrAsPlainColumnImpl(_CoerceLiterals, RoleImpl):
    """Role impl that turns a plain string into a ``ColumnClause``."""

    __slots__ = ()

    def _text_coercion(self, element, argname=None):
        """Return a ``ColumnClause`` named by the string."""
        column = elements.ColumnClause(element)
        return column

943 

944 

class ByOfImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl, roles.ByOfRole):
    """Shared role impl for the "BY"-style roles defined below it.

    Constants are coerced and strings become textual label references.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        """Wrap the string in a ``_textual_label_reference``."""
        label_ref = elements._textual_label_reference(element)
        return label_ref

952 

953 

class OrderByImpl(ByOfImpl, RoleImpl):
    """Role impl for ORDER BY expressions."""

    __slots__ = ()

    def _post_coercion(self, resolved, **kw):
        """Wrap label-bearing elements in a ``_label_reference``.

        Applies when ``resolved`` is an instance of the role class and its
        ``_order_by_label_element`` is not ``None``; otherwise ``resolved``
        is returned untouched.

        """
        has_label = (
            isinstance(resolved, self._role_class)
            and resolved._order_by_label_element is not None
        )
        if not has_label:
            return resolved
        return elements._label_reference(resolved)

965 

966 

class GroupByImpl(ByOfImpl, RoleImpl):
    """Role impl for GROUP BY expressions."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Expand a FROM clause into a ``ClauseList`` of its columns.

        Anything that is not a FROM clause is returned as-is.

        """
        if not is_from_clause(resolved):
            return resolved
        return elements.ClauseList(*resolved.c)

981 

982 

class DMLColumnImpl(_ReturnsStringKey, RoleImpl):
    """Role impl for columns named in DML statements."""

    __slots__ = ()

    def _post_coercion(self, element, as_key=False, **kw):
        """Return ``element.key`` when ``as_key`` is set, else ``element``."""
        return element.key if as_key else element

991 

992 

class ConstExprImpl(RoleImpl):
    """Role impl accepting only ``None``, ``False`` and ``True`` literals."""

    __slots__ = ()

    def _literal_coercion(self, element, argname=None, **kw):
        """Map the three constants to ``Null`` / ``False_`` / ``True_``.

        Identity comparison is used, so values merely equal to a constant
        (such as ``0`` or ``1``) are rejected like everything else.

        """
        if element is None:
            return elements.Null()
        if element is False:
            return elements.False_()
        if element is True:
            return elements.True_()

        self._raise_for_expected(element, argname)

1005 

1006 

class TruncatedLabelImpl(_StringOnly, RoleImpl):
    """Role impl that coerces strings into truncated labels."""

    __slots__ = ()

    def _literal_coercion(self, element, argname=None, **kw):
        """Coerce the given value to :class:`._truncated_label`.

        A value that already is a :class:`._truncated_label` instance is
        handed back unchanged; anything else is wrapped in a new one.
        """

        if not isinstance(element, elements._truncated_label):
            return elements._truncated_label(element)
        return element

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``resolved`` when the original element was a string."""
        if not isinstance(element, str):
            self._raise_for_expected(element, argname, resolved)
        else:
            return resolved

1034 

1035 

class DDLExpressionImpl(_Deannotate, _CoerceLiterals, RoleImpl):
    """Role impl for SQL expressions embedded in DDL."""

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        """Accept a plain string by wrapping it in ``TextClause``."""
        # see #5754 for why this coercion can't easily be deprecated:
        # expressions such as postgresql_where would have to be text() as
        # they come back from reflection, and text() elements should not
        # be wired into the inspection dictionaries.
        text_clause = elements.TextClause(element)
        return text_clause

1047 

1048 

class DDLConstraintColumnImpl(_Deannotate, _ReturnsStringKey, RoleImpl):
    """Role impl for constraint columns: string keys plus deannotation."""

    __slots__ = ()

1051 

1052 

class DDLReferredColumnImpl(DDLConstraintColumnImpl):
    """Role impl for referred columns; same as the constraint column impl."""

    __slots__ = ()

1055 

1056 

class LimitOffsetImpl(RoleImpl):
    """Role impl for LIMIT / OFFSET values."""

    __slots__ = ()

    def _literal_coercion(self, element, name, type_, **kw):
        """Turn an integer-like literal into an ``_OffsetLimitParam``.

        ``None`` is passed through; any other value goes through
        ``util.asint()`` first.

        """
        if element is None:
            return None

        as_integer = util.asint(element)
        return selectable._OffsetLimitParam(
            name, as_integer, type_=type_, unique=True
        )

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Allow ``None`` through; reject everything else."""
        if resolved is not None:
            self._raise_for_expected(element, argname, resolved)
        else:
            return None

1080 

1081 

class LabeledColumnExprImpl(ExpressionElementImpl):
    """Role impl that returns column expressions with a label applied."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``.label(None)`` of the expression, or raise.

        ``resolved`` is used directly when it is an expression element;
        otherwise the superclass coercion is tried first and its result
        must be one.

        """
        candidate = resolved
        if not isinstance(candidate, roles.ExpressionElementRole):
            candidate = super()._implicit_coercions(
                element, resolved, argname=argname, **kw
            )

        if isinstance(candidate, roles.ExpressionElementRole):
            return candidate.label(None)
        else:
            self._raise_for_expected(element, argname, resolved)

1102 

1103 

class ColumnsClauseImpl(_SelectIsNotFrom, _CoerceLiterals, RoleImpl):
    """Role impl for entries of a columns clause.

    Constants, numbers and the string ``"*"`` are coerced; any other
    string is refused with a message suggesting ``text()``, ``column()``
    or ``literal_column()``.

    """

    __slots__ = ()

    _coerce_consts = True
    _coerce_numerics = True
    _coerce_star = True

    # strings matching this are suggested as column(); all others as
    # literal_column()
    _guess_straight_column = re.compile(r"^\w\S*$", re.I)

    def _raise_for_expected(
        self, element, argname=None, resolved=None, advice=None, **kw
    ):
        """Raise for a rejected element, with a hint when it is a list."""
        if not advice and isinstance(element, list):
            advice = (
                f"Did you mean to say select("
                f"{', '.join(repr(e) for e in element)})?"
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

    def _text_coercion(self, element, argname=None):
        """Refuse a plain string; always raises :class:`.ArgumentError`."""
        element = str(element)

        guess_is_literal = not self._guess_straight_column.match(element)

        # the fragment carries its own trailing space so the message reads
        # "... for argument foo should be ..." rather than
        # "... for argument fooshould be ..."
        argname_text = "for argument %s " % (argname,) if argname else ""

        raise exc.ArgumentError(
            "Textual column expression %(column)r %(argname)sshould be "
            "explicitly declared with text(%(column)r), "
            "or use %(literal_column)s(%(column)r) "
            "for more specificity"
            % {
                "column": util.ellipses_string(element),
                "argname": argname_text,
                "literal_column": (
                    "literal_column" if guess_is_literal else "column"
                ),
            }
        )

1143 

1144 

class ReturnsRowsImpl(RoleImpl):
    """Role implementation that adds nothing to ``RoleImpl``."""

    __slots__ = ()

1147 

1148 

class StatementImpl(_CoerceLiterals, RoleImpl):
    """Role implementation for objects used as an executable statement."""

    __slots__ = ()

    def _post_coercion(self, resolved, original_element, argname=None, **kw):
        """Warn when a non-executable object was coerced into a statement.

        The warning is emitted only if coercion produced a different
        object, the original was not a string, and the original has no
        ``_execute_on_connection`` attribute.

        :return: ``resolved``, unchanged.
        """
        if resolved is not original_element and not isinstance(
            original_element, str
        ):
            # use same method as Connection uses; this will later raise
            # ObjectNotExecutableError
            try:
                original_element._execute_on_connection
            except AttributeError:
                util.warn_deprecated(
                    "Object %r should not be used directly in a SQL statement "
                    "context, such as passing to methods such as "
                    "session.execute(). This usage will be disallowed in a "
                    "future release. "
                    "Please use Core select() / update() / delete() etc. "
                    "with Session.execute() and other statement execution "
                    # Wrap in a 1-tuple so that a tuple-valued element is
                    # formatted with %r rather than unpacked as the
                    # format arguments (which would raise TypeError).
                    "methods." % (original_element,),
                    "1.4",
                )

        return resolved

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Pass lambda elements through; defer the rest to the parent."""
        if resolved._is_lambda_element:
            return resolved

        return super()._implicit_coercions(
            element, resolved, argname=argname, **kw
        )

1187 

1188 

class SelectStatementImpl(_NoTextCoercion, RoleImpl):
    """Role implementation for SELECT statements.

    The only implicit coercion is from a text clause, which is converted
    by calling its ``.columns()`` method with no arguments.
    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``resolved.columns()`` for text clauses; reject others."""
        if resolved._is_text_clause:
            return resolved.columns()

        self._raise_for_expected(element, argname, resolved)

1203 

1204 

class HasCTEImpl(ReturnsRowsImpl):
    """Role implementation that adds nothing to ``ReturnsRowsImpl``."""

    __slots__ = ()

1207 

1208 

class IsCTEImpl(RoleImpl):
    """Role implementation that adds nothing to ``RoleImpl``."""

    __slots__ = ()

1211 

1212 

class JoinTargetImpl(RoleImpl):
    """Role implementation for the target of a join."""

    __slots__ = ()

    _skip_clauseelement_for_target_match = True

    def _literal_coercion(self, element, argname=None, **kw):
        """Literals are never valid here; always reject."""
        self._raise_for_expected(element, argname)

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        legacy: bool = False,
        **kw: Any,
    ) -> Any:
        """Coerce ``resolved`` into something usable as a join target.

        :param legacy: when true, a SELECT construct is returned as-is
         after emitting a 1.4 deprecation warning.
        :return: ``element`` if it already is a ``roles.JoinTargetRole``,
         or ``resolved`` on the legacy path; all other input is rejected
         via ``_raise_for_expected()``.
        """
        if isinstance(element, roles.JoinTargetRole):
            # note that this codepath no longer occurs as of
            # #6550, unless JoinTargetImpl._skip_clauseelement_for_target_match
            # were set to False.
            return element

        if legacy and resolved._is_select_base:
            util.warn_deprecated(
                "Implicit coercion of SELECT and textual SELECT "
                "constructs into FROM clauses is deprecated; please call "
                ".subquery() on any Core select or ORM Query object in "
                "order to produce a subquery object.",
                version="1.4",
            )
            # TODO: doing _implicit_subquery here causes tests to fail,
            # how was this working before?  probably that ORM
            # join logic treated it as a select and subquery would happen
            # in _ORMJoin->Join
            return resolved

        self._raise_for_expected(element, argname, resolved)

1249 

1250 

class FromClauseImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Role implementation for elements used as a FROM clause."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        explicit_subquery: bool = False,
        allow_select: bool = True,
        **kw: Any,
    ) -> Any:
        """Coerce a SELECT or text construct into a FROM element.

        :param explicit_subquery: if true, a SELECT construct is converted
         with ``.subquery()`` and no warning is emitted.
        :param allow_select: if true (and ``explicit_subquery`` is false),
         a SELECT construct is converted through ``._implicit_subquery``
         after a 1.4 deprecation warning.
        :return: the subquery for a SELECT construct, or ``resolved``
         itself for a text clause.
        :raises: whatever ``_raise_for_expected()`` raises, for any other
         input -- including a SELECT construct when both flags are false.
        """
        if resolved._is_select_base:
            if explicit_subquery:
                return resolved.subquery()
            if allow_select:
                util.warn_deprecated(
                    "Implicit coercion of SELECT and textual SELECT "
                    "constructs into FROM clauses is deprecated; please call "
                    ".subquery() on any Core select or ORM Query object in "
                    "order to produce a subquery object.",
                    version="1.4",
                )
                return resolved._implicit_subquery
            # A SELECT with neither flag set used to fall off the end of
            # this method and silently return None; reject it explicitly
            # instead, as StrictFromClauseImpl does.
        elif resolved._is_text_clause:
            return resolved

        self._raise_for_expected(element, argname, resolved)

    def _post_coercion(self, element, deannotate=False, **kw):
        """Return ``element``, deannotated first if ``deannotate`` is set."""
        return element._deannotate() if deannotate else element

1285 

1286 

class StrictFromClauseImpl(FromClauseImpl):
    """Variant of ``FromClauseImpl`` in which ``allow_select`` defaults to
    false, so a SELECT construct is only coerced on explicit request.
    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        explicit_subquery: bool = False,
        allow_select: bool = False,
        **kw: Any,
    ) -> Any:
        """Coerce a SELECT construct only when ``allow_select`` is true.

        ``explicit_subquery`` is accepted for signature compatibility but
        is not consulted by this implementation.
        """
        select_permitted = resolved._is_select_base and allow_select
        if select_permitted:
            util.warn_deprecated(
                "Implicit coercion of SELECT and textual SELECT constructs "
                "into FROM clauses is deprecated; please call .subquery() "
                "on any Core select or ORM Query object in order to produce a "
                "subquery object.",
                version="1.4",
            )
            return resolved._implicit_subquery

        self._raise_for_expected(element, argname, resolved)

1310 

1311 

class AnonymizedFromClauseImpl(StrictFromClauseImpl):
    """Strict FROM coercion whose result is converted with
    ``_anonymous_fromclause()``.
    """

    __slots__ = ()

    def _post_coercion(self, element, flat=False, name=None, **kw):
        """Return ``element._anonymous_fromclause(flat=flat)``.

        ``name`` exists in the signature but must be left as ``None``;
        this is checked with an assertion.
        """
        assert name is None

        anonymized = element._anonymous_fromclause(flat=flat)
        return anonymized

1319 

1320 

class DMLTableImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Role implementation for the table argument of a DML statement."""

    __slots__ = ()

    def _post_coercion(self, element, **kw):
        """Swap ``element`` for its ``"dml_table"`` annotation, if present.

        :return: ``element._annotations["dml_table"]`` when that key
         exists, otherwise ``element`` unchanged.
        """
        if "dml_table" not in element._annotations:
            return element

        return element._annotations["dml_table"]

1329 

1330 

class DMLSelectImpl(_NoTextCoercion, RoleImpl):
    """Role implementation for a SELECT used inside a DML statement."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Turn a FROM clause back into a SELECT.

        An ``Alias`` whose inner element is a SELECT construct is
        unwrapped to that inner element; any other FROM clause is
        converted with ``.select()``.  Anything that is not a FROM clause
        is rejected via ``_raise_for_expected()``.
        """
        if resolved._is_from_clause:
            wraps_select = (
                isinstance(resolved, selectable.Alias)
                and resolved.element._is_select_base
            )
            return resolved.element if wraps_select else resolved.select()

        self._raise_for_expected(element, argname, resolved)

1351 

1352 

class CompoundElementImpl(_NoTextCoercion, RoleImpl):
    """Role implementation for members of a compound construct."""

    __slots__ = ()

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Raise via the parent, with advice when a FROM clause was given.

        A subquery gets advice to use the plain ``select()`` instead; any
        other ``roles.FromClauseRole`` gets advice to call ``.select()``.
        Elements of any other kind get no advice.
        """
        advice = None
        if isinstance(element, roles.FromClauseRole):
            advice = (
                "Use the plain select() object without "
                "calling .subquery() or .alias()."
                if element._is_subquery
                else "To SELECT from any FROM clause, use the .select() method."
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

1372 

1373 

# Registry mapping each role class to a single instance of its
# implementation class.
_impl_lookup = {}


# For every "<Something>Role" attribute of the ``roles`` module, look for a
# module-level "<Something>Impl" name here and, if found, instantiate it
# with the role class and register it.
for name in dir(roles):
    cls = getattr(roles, name)
    if not name.endswith("Role"):
        continue

    name = name.replace("Role", "Impl")
    if name not in globals():
        continue

    impl = globals()[name](cls)
    _impl_lookup[cls] = impl

if not TYPE_CHECKING:
    # At runtime, register the subscripted forms of ExpressionElementRole
    # for the basic Python types against the same implementation object.
    ee_impl = _impl_lookup[roles.ExpressionElementRole]

    for py_type in (int, bool, str, float):
        _impl_lookup[roles.ExpressionElementRole[py_type]] = ee_impl