Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/coercions.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

576 statements  

1# sql/coercions.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9from __future__ import annotations 

10 

11import collections.abc as collections_abc 

12import numbers 

13import re 

14import typing 

15from typing import Any 

16from typing import Callable 

17from typing import cast 

18from typing import Dict 

19from typing import Iterable 

20from typing import Iterator 

21from typing import List 

22from typing import NoReturn 

23from typing import Optional 

24from typing import overload 

25from typing import Sequence 

26from typing import Tuple 

27from typing import Type 

28from typing import TYPE_CHECKING 

29from typing import TypeVar 

30from typing import Union 

31 

32from . import roles 

33from . import visitors 

34from ._typing import is_from_clause 

35from .base import ExecutableOption 

36from .base import Options 

37from .cache_key import HasCacheKey 

38from .visitors import Visitable 

39from .. import exc 

40from .. import inspection 

41from .. import util 

42from ..util.typing import Literal 

43 

44if typing.TYPE_CHECKING: 

45 # elements lambdas schema selectable are set by __init__ 

46 from . import elements 

47 from . import lambdas 

48 from . import schema 

49 from . import selectable 

50 from ._typing import _ColumnExpressionArgument 

51 from ._typing import _ColumnsClauseArgument 

52 from ._typing import _DDLColumnArgument 

53 from ._typing import _DMLTableArgument 

54 from ._typing import _FromClauseArgument 

55 from ._typing import _OnlyColumnArgument 

56 from .dml import _DMLTableElement 

57 from .elements import BindParameter 

58 from .elements import ClauseElement 

59 from .elements import ColumnClause 

60 from .elements import ColumnElement 

61 from .elements import NamedColumn 

62 from .elements import SQLCoreOperations 

63 from .elements import TextClause 

64 from .schema import Column 

65 from .selectable import _ColumnsClauseElement 

66 from .selectable import _JoinTargetProtocol 

67 from .selectable import FromClause 

68 from .selectable import HasCTE 

69 from .selectable import SelectBase 

70 from .selectable import Subquery 

71 from .visitors import _TraverseCallableType 

72 

73_SR = TypeVar("_SR", bound=roles.SQLRole) 

74_F = TypeVar("_F", bound=Callable[..., Any]) 

75_StringOnlyR = TypeVar("_StringOnlyR", bound=roles.StringRole) 

76_T = TypeVar("_T", bound=Any) 

77 

78 

79def _is_literal(element: Any) -> bool: 

80 """Return whether or not the element is a "literal" in the context 

81 of a SQL expression construct. 

82 

83 """ 

84 

85 return not isinstance( 

86 element, 

87 (Visitable, schema.SchemaEventTarget), 

88 ) and not hasattr(element, "__clause_element__") 

89 

90 

91def _deep_is_literal(element): 

92 """Return whether or not the element is a "literal" in the context 

93 of a SQL expression construct. 

94 

95 does a deeper more esoteric check than _is_literal. is used 

96 for lambda elements that have to distinguish values that would 

97 be bound vs. not without any context. 

98 

99 """ 

100 

101 if isinstance(element, collections_abc.Sequence) and not isinstance( 

102 element, str 

103 ): 

104 for elem in element: 

105 if not _deep_is_literal(elem): 

106 return False 

107 else: 

108 return True 

109 

110 return ( 

111 not isinstance( 

112 element, 

113 ( 

114 Visitable, 

115 schema.SchemaEventTarget, 

116 HasCacheKey, 

117 Options, 

118 util.langhelpers.symbol, 

119 ), 

120 ) 

121 and not hasattr(element, "__clause_element__") 

122 and ( 

123 not isinstance(element, type) 

124 or not issubclass(element, HasCacheKey) 

125 ) 

126 ) 

127 

128 

129def _document_text_coercion( 

130 paramname: str, meth_rst: str, param_rst: str 

131) -> Callable[[_F], _F]: 

132 return util.add_parameter_text( 

133 paramname, 

134 ( 

135 ".. warning:: " 

136 "The %s argument to %s can be passed as a Python string argument, " 

137 "which will be treated " 

138 "as **trusted SQL text** and rendered as given. **DO NOT PASS " 

139 "UNTRUSTED INPUT TO THIS PARAMETER**." 

140 ) 

141 % (param_rst, meth_rst), 

142 ) 

143 

144 

145def _expression_collection_was_a_list( 

146 attrname: str, 

147 fnname: str, 

148 args: Union[Sequence[_T], Sequence[Sequence[_T]]], 

149) -> Sequence[_T]: 

150 if args and isinstance(args[0], (list, set, dict)) and len(args) == 1: 

151 if isinstance(args[0], list): 

152 raise exc.ArgumentError( 

153 f'The "{attrname}" argument to {fnname}(), when ' 

154 "referring to a sequence " 

155 "of items, is now passed as a series of positional " 

156 "elements, rather than as a list. " 

157 ) 

158 return cast("Sequence[_T]", args[0]) 

159 

160 return cast("Sequence[_T]", args) 

161 

162 

163@overload 

164def expect( 

165 role: Type[roles.TruncatedLabelRole], 

166 element: Any, 

167 **kw: Any, 

168) -> str: ... 

169 

170 

171@overload 

172def expect( 

173 role: Type[roles.DMLColumnRole], 

174 element: Any, 

175 *, 

176 as_key: Literal[True] = ..., 

177 **kw: Any, 

178) -> str: ... 

179 

180 

181@overload 

182def expect( 

183 role: Type[roles.LiteralValueRole], 

184 element: Any, 

185 **kw: Any, 

186) -> BindParameter[Any]: ... 

187 

188 

189@overload 

190def expect( 

191 role: Type[roles.DDLReferredColumnRole], 

192 element: Any, 

193 **kw: Any, 

194) -> Union[Column[Any], str]: ... 

195 

196 

197@overload 

198def expect( 

199 role: Type[roles.DDLConstraintColumnRole], 

200 element: Any, 

201 **kw: Any, 

202) -> Union[Column[Any], str]: ... 

203 

204 

205@overload 

206def expect( 

207 role: Type[roles.StatementOptionRole], 

208 element: Any, 

209 **kw: Any, 

210) -> Union[ColumnElement[Any], TextClause]: ... 

211 

212 

213@overload 

214def expect( 

215 role: Type[roles.LabeledColumnExprRole[Any]], 

216 element: Union[_ColumnExpressionArgument[_T], _OnlyColumnArgument[_T]], 

217 **kw: Any, 

218) -> NamedColumn[_T]: ... 

219 

220 

221@overload 

222def expect( 

223 role: Union[ 

224 Type[roles.ExpressionElementRole[Any]], 

225 Type[roles.LimitOffsetRole], 

226 Type[roles.WhereHavingRole], 

227 ], 

228 element: _ColumnExpressionArgument[_T], 

229 **kw: Any, 

230) -> ColumnElement[_T]: ... 

231 

232 

233@overload 

234def expect( 

235 role: Union[ 

236 Type[roles.ExpressionElementRole[Any]], 

237 Type[roles.LimitOffsetRole], 

238 Type[roles.WhereHavingRole], 

239 Type[roles.OnClauseRole], 

240 Type[roles.ColumnArgumentRole], 

241 ], 

242 element: Any, 

243 **kw: Any, 

244) -> ColumnElement[Any]: ... 

245 

246 

247@overload 

248def expect( 

249 role: Type[roles.DMLTableRole], 

250 element: _DMLTableArgument, 

251 **kw: Any, 

252) -> _DMLTableElement: ... 

253 

254 

255@overload 

256def expect( 

257 role: Type[roles.HasCTERole], 

258 element: HasCTE, 

259 **kw: Any, 

260) -> HasCTE: ... 

261 

262 

263@overload 

264def expect( 

265 role: Type[roles.SelectStatementRole], 

266 element: SelectBase, 

267 **kw: Any, 

268) -> SelectBase: ... 

269 

270 

271@overload 

272def expect( 

273 role: Type[roles.FromClauseRole], 

274 element: _FromClauseArgument, 

275 **kw: Any, 

276) -> FromClause: ... 

277 

278 

279@overload 

280def expect( 

281 role: Type[roles.FromClauseRole], 

282 element: SelectBase, 

283 *, 

284 explicit_subquery: Literal[True] = ..., 

285 **kw: Any, 

286) -> Subquery: ... 

287 

288 

289@overload 

290def expect( 

291 role: Type[roles.ColumnsClauseRole], 

292 element: _ColumnsClauseArgument[Any], 

293 **kw: Any, 

294) -> _ColumnsClauseElement: ... 

295 

296 

297@overload 

298def expect( 

299 role: Type[roles.JoinTargetRole], 

300 element: _JoinTargetProtocol, 

301 **kw: Any, 

302) -> _JoinTargetProtocol: ... 

303 

304 

305# catchall for not-yet-implemented overloads 

306@overload 

307def expect( 

308 role: Type[_SR], 

309 element: Any, 

310 **kw: Any, 

311) -> Any: ... 

312 

313 

314def expect( 

315 role: Type[_SR], 

316 element: Any, 

317 *, 

318 apply_propagate_attrs: Optional[ClauseElement] = None, 

319 argname: Optional[str] = None, 

320 post_inspect: bool = False, 

321 disable_inspection: bool = False, 

322 **kw: Any, 

323) -> Any: 

324 if ( 

325 role.allows_lambda 

326 # note callable() will not invoke a __getattr__() method, whereas 

327 # hasattr(obj, "__call__") will. by keeping the callable() check here 

328 # we prevent most needless calls to hasattr() and therefore 

329 # __getattr__(), which is present on ColumnElement. 

330 and callable(element) 

331 and hasattr(element, "__code__") 

332 ): 

333 return lambdas.LambdaElement( 

334 element, 

335 role, 

336 lambdas.LambdaOptions(**kw), 

337 apply_propagate_attrs=apply_propagate_attrs, 

338 ) 

339 

340 # major case is that we are given a ClauseElement already, skip more 

341 # elaborate logic up front if possible 

342 impl = _impl_lookup[role] 

343 

344 original_element = element 

345 

346 if not isinstance( 

347 element, 

348 ( 

349 elements.CompilerElement, 

350 schema.SchemaItem, 

351 schema.FetchedValue, 

352 lambdas.PyWrapper, 

353 ), 

354 ): 

355 resolved = None 

356 

357 if impl._resolve_literal_only: 

358 resolved = impl._literal_coercion(element, **kw) 

359 else: 

360 original_element = element 

361 

362 is_clause_element = False 

363 

364 # this is a special performance optimization for ORM 

365 # joins used by JoinTargetImpl that we don't go through the 

366 # work of creating __clause_element__() when we only need the 

367 # original QueryableAttribute, as the former will do clause 

368 # adaption and all that which is just thrown away here. 

369 if ( 

370 impl._skip_clauseelement_for_target_match 

371 and isinstance(element, role) 

372 and hasattr(element, "__clause_element__") 

373 ): 

374 is_clause_element = True 

375 else: 

376 while hasattr(element, "__clause_element__"): 

377 is_clause_element = True 

378 

379 if not getattr(element, "is_clause_element", False): 

380 element = element.__clause_element__() 

381 else: 

382 break 

383 

384 if not is_clause_element: 

385 if impl._use_inspection and not disable_inspection: 

386 insp = inspection.inspect(element, raiseerr=False) 

387 if insp is not None: 

388 if post_inspect: 

389 insp._post_inspect 

390 try: 

391 resolved = insp.__clause_element__() 

392 except AttributeError: 

393 impl._raise_for_expected(original_element, argname) 

394 

395 if resolved is None: 

396 resolved = impl._literal_coercion( 

397 element, argname=argname, **kw 

398 ) 

399 else: 

400 resolved = element 

401 elif isinstance(element, lambdas.PyWrapper): 

402 resolved = element._sa__py_wrapper_literal(**kw) 

403 else: 

404 resolved = element 

405 

406 if apply_propagate_attrs is not None: 

407 if typing.TYPE_CHECKING: 

408 assert isinstance(resolved, (SQLCoreOperations, ClauseElement)) 

409 

410 if not apply_propagate_attrs._propagate_attrs and getattr( 

411 resolved, "_propagate_attrs", None 

412 ): 

413 apply_propagate_attrs._propagate_attrs = resolved._propagate_attrs 

414 

415 if impl._role_class in resolved.__class__.__mro__: 

416 if impl._post_coercion: 

417 resolved = impl._post_coercion( 

418 resolved, 

419 argname=argname, 

420 original_element=original_element, 

421 **kw, 

422 ) 

423 return resolved 

424 else: 

425 return impl._implicit_coercions( 

426 original_element, resolved, argname=argname, **kw 

427 ) 

428 

429 

430def expect_as_key( 

431 role: Type[roles.DMLColumnRole], element: Any, **kw: Any 

432) -> str: 

433 kw.pop("as_key", None) 

434 return expect(role, element, as_key=True, **kw) 

435 

436 

437def expect_col_expression_collection( 

438 role: Type[roles.DDLConstraintColumnRole], 

439 expressions: Iterable[_DDLColumnArgument], 

440) -> Iterator[ 

441 Tuple[ 

442 Union[str, Column[Any]], 

443 Optional[ColumnClause[Any]], 

444 Optional[str], 

445 Optional[Union[Column[Any], str]], 

446 ] 

447]: 

448 for expr in expressions: 

449 strname = None 

450 column = None 

451 

452 resolved: Union[Column[Any], str] = expect(role, expr) 

453 if isinstance(resolved, str): 

454 assert isinstance(expr, str) 

455 strname = resolved = expr 

456 else: 

457 cols: List[Column[Any]] = [] 

458 col_append: _TraverseCallableType[Column[Any]] = cols.append 

459 visitors.traverse(resolved, {}, {"column": col_append}) 

460 if cols: 

461 column = cols[0] 

462 add_element = column if column is not None else strname 

463 

464 yield resolved, column, strname, add_element 

465 

466 

467class RoleImpl: 

468 __slots__ = ("_role_class", "name", "_use_inspection") 

469 

470 def _literal_coercion(self, element, **kw): 

471 raise NotImplementedError() 

472 

473 _post_coercion: Any = None 

474 _resolve_literal_only = False 

475 _skip_clauseelement_for_target_match = False 

476 

477 def __init__(self, role_class): 

478 self._role_class = role_class 

479 self.name = role_class._role_name 

480 self._use_inspection = issubclass(role_class, roles.UsesInspection) 

481 

482 def _implicit_coercions( 

483 self, 

484 element: Any, 

485 resolved: Any, 

486 argname: Optional[str] = None, 

487 **kw: Any, 

488 ) -> Any: 

489 self._raise_for_expected(element, argname, resolved) 

490 

491 def _raise_for_expected( 

492 self, 

493 element: Any, 

494 argname: Optional[str] = None, 

495 resolved: Optional[Any] = None, 

496 *, 

497 advice: Optional[str] = None, 

498 code: Optional[str] = None, 

499 err: Optional[Exception] = None, 

500 **kw: Any, 

501 ) -> NoReturn: 

502 if resolved is not None and resolved is not element: 

503 got = "%r object resolved from %r object" % (resolved, element) 

504 else: 

505 got = repr(element) 

506 

507 if argname: 

508 msg = "%s expected for argument %r; got %s." % ( 

509 self.name, 

510 argname, 

511 got, 

512 ) 

513 else: 

514 msg = "%s expected, got %s." % (self.name, got) 

515 

516 if advice: 

517 msg += " " + advice 

518 

519 raise exc.ArgumentError(msg, code=code) from err 

520 

521 

522class _Deannotate: 

523 __slots__ = () 

524 

525 def _post_coercion(self, resolved, **kw): 

526 from .util import _deep_deannotate 

527 

528 return _deep_deannotate(resolved) 

529 

530 

531class _StringOnly: 

532 __slots__ = () 

533 

534 _resolve_literal_only = True 

535 

536 

537class _ReturnsStringKey(RoleImpl): 

538 __slots__ = () 

539 

540 def _implicit_coercions(self, element, resolved, argname=None, **kw): 

541 if isinstance(element, str): 

542 return element 

543 else: 

544 self._raise_for_expected(element, argname, resolved) 

545 

546 def _literal_coercion(self, element, **kw): 

547 return element 

548 

549 

550class _ColumnCoercions(RoleImpl): 

551 __slots__ = () 

552 

553 def _warn_for_scalar_subquery_coercion(self): 

554 util.warn( 

555 "implicitly coercing SELECT object to scalar subquery; " 

556 "please use the .scalar_subquery() method to produce a scalar " 

557 "subquery.", 

558 ) 

559 

560 def _implicit_coercions(self, element, resolved, argname=None, **kw): 

561 original_element = element 

562 if not getattr(resolved, "is_clause_element", False): 

563 self._raise_for_expected(original_element, argname, resolved) 

564 elif resolved._is_select_base: 

565 self._warn_for_scalar_subquery_coercion() 

566 return resolved.scalar_subquery() 

567 elif resolved._is_from_clause and isinstance( 

568 resolved, selectable.Subquery 

569 ): 

570 self._warn_for_scalar_subquery_coercion() 

571 return resolved.element.scalar_subquery() 

572 elif self._role_class.allows_lambda and resolved._is_lambda_element: 

573 return resolved 

574 else: 

575 self._raise_for_expected(original_element, argname, resolved) 

576 

577 

578def _no_text_coercion( 

579 element: Any, 

580 argname: Optional[str] = None, 

581 exc_cls: Type[exc.SQLAlchemyError] = exc.ArgumentError, 

582 extra: Optional[str] = None, 

583 err: Optional[Exception] = None, 

584) -> NoReturn: 

585 raise exc_cls( 

586 "%(extra)sTextual SQL expression %(expr)r %(argname)sshould be " 

587 "explicitly declared as text(%(expr)r)" 

588 % { 

589 "expr": util.ellipses_string(element), 

590 "argname": "for argument %s" % (argname,) if argname else "", 

591 "extra": "%s " % extra if extra else "", 

592 } 

593 ) from err 

594 

595 

596class _NoTextCoercion(RoleImpl): 

597 __slots__ = () 

598 

599 def _literal_coercion(self, element, *, argname=None, **kw): 

600 if isinstance(element, str) and issubclass( 

601 elements.TextClause, self._role_class 

602 ): 

603 _no_text_coercion(element, argname) 

604 else: 

605 self._raise_for_expected(element, argname) 

606 

607 

608class _CoerceLiterals(RoleImpl): 

609 __slots__ = () 

610 _coerce_consts = False 

611 _coerce_star = False 

612 _coerce_numerics = False 

613 

614 def _text_coercion(self, element, argname=None): 

615 return _no_text_coercion(element, argname) 

616 

617 def _literal_coercion(self, element, *, argname=None, **kw): 

618 if isinstance(element, str): 

619 if self._coerce_star and element == "*": 

620 return elements.ColumnClause("*", is_literal=True) 

621 else: 

622 return self._text_coercion(element, argname, **kw) 

623 

624 if self._coerce_consts: 

625 if element is None: 

626 return elements.Null() 

627 elif element is False: 

628 return elements.False_() 

629 elif element is True: 

630 return elements.True_() 

631 

632 if self._coerce_numerics and isinstance(element, (numbers.Number)): 

633 return elements.ColumnClause(str(element), is_literal=True) 

634 

635 self._raise_for_expected(element, argname) 

636 

637 

638class LiteralValueImpl(RoleImpl): 

639 _resolve_literal_only = True 

640 

641 def _implicit_coercions( 

642 self, 

643 element, 

644 resolved, 

645 argname=None, 

646 *, 

647 type_=None, 

648 literal_execute=False, 

649 **kw, 

650 ): 

651 if not _is_literal(resolved): 

652 self._raise_for_expected( 

653 element, resolved=resolved, argname=argname, **kw 

654 ) 

655 

656 return elements.BindParameter( 

657 None, 

658 element, 

659 type_=type_, 

660 unique=True, 

661 literal_execute=literal_execute, 

662 ) 

663 

664 def _literal_coercion(self, element, **kw): 

665 return element 

666 

667 

668class _SelectIsNotFrom(RoleImpl): 

669 __slots__ = () 

670 

671 def _raise_for_expected( 

672 self, 

673 element: Any, 

674 argname: Optional[str] = None, 

675 resolved: Optional[Any] = None, 

676 *, 

677 advice: Optional[str] = None, 

678 code: Optional[str] = None, 

679 err: Optional[Exception] = None, 

680 **kw: Any, 

681 ) -> NoReturn: 

682 if ( 

683 not advice 

684 and isinstance(element, roles.SelectStatementRole) 

685 or isinstance(resolved, roles.SelectStatementRole) 

686 ): 

687 advice = ( 

688 "To create a " 

689 "FROM clause from a %s object, use the .subquery() method." 

690 % (resolved.__class__ if resolved is not None else element,) 

691 ) 

692 code = "89ve" 

693 else: 

694 code = None 

695 

696 super()._raise_for_expected( 

697 element, 

698 argname=argname, 

699 resolved=resolved, 

700 advice=advice, 

701 code=code, 

702 err=err, 

703 **kw, 

704 ) 

705 # never reached 

706 assert False 

707 

708 

709class HasCacheKeyImpl(RoleImpl): 

710 __slots__ = () 

711 

712 def _implicit_coercions( 

713 self, 

714 element: Any, 

715 resolved: Any, 

716 argname: Optional[str] = None, 

717 **kw: Any, 

718 ) -> Any: 

719 if isinstance(element, HasCacheKey): 

720 return element 

721 else: 

722 self._raise_for_expected(element, argname, resolved) 

723 

724 def _literal_coercion(self, element, **kw): 

725 return element 

726 

727 

728class ExecutableOptionImpl(RoleImpl): 

729 __slots__ = () 

730 

731 def _implicit_coercions( 

732 self, 

733 element: Any, 

734 resolved: Any, 

735 argname: Optional[str] = None, 

736 **kw: Any, 

737 ) -> Any: 

738 if isinstance(element, ExecutableOption): 

739 return element 

740 else: 

741 self._raise_for_expected(element, argname, resolved) 

742 

743 def _literal_coercion(self, element, **kw): 

744 return element 

745 

746 

747class ExpressionElementImpl(_ColumnCoercions, RoleImpl): 

748 __slots__ = () 

749 

750 def _literal_coercion( 

751 self, element, *, name=None, type_=None, is_crud=False, **kw 

752 ): 

753 if ( 

754 element is None 

755 and not is_crud 

756 and (type_ is None or not type_.should_evaluate_none) 

757 ): 

758 # TODO: there's no test coverage now for the 

759 # "should_evaluate_none" part of this, as outside of "crud" this 

760 # codepath is not normally used except in some special cases 

761 return elements.Null() 

762 else: 

763 try: 

764 return elements.BindParameter( 

765 name, element, type_, unique=True, _is_crud=is_crud 

766 ) 

767 except exc.ArgumentError as err: 

768 self._raise_for_expected(element, err=err) 

769 

770 def _raise_for_expected(self, element, argname=None, resolved=None, **kw): 

771 # select uses implicit coercion with warning instead of raising 

772 if isinstance(element, selectable.Values): 

773 advice = ( 

774 "To create a column expression from a VALUES clause, " 

775 "use the .scalar_values() method." 

776 ) 

777 elif isinstance(element, roles.AnonymizedFromClauseRole): 

778 advice = ( 

779 "To create a column expression from a FROM clause row " 

780 "as a whole, use the .table_valued() method." 

781 ) 

782 else: 

783 advice = None 

784 

785 return super()._raise_for_expected( 

786 element, argname=argname, resolved=resolved, advice=advice, **kw 

787 ) 

788 

789 

790class BinaryElementImpl(ExpressionElementImpl, RoleImpl): 

791 __slots__ = () 

792 

793 def _literal_coercion( # type: ignore[override] 

794 self, 

795 element, 

796 *, 

797 expr, 

798 operator, 

799 bindparam_type=None, 

800 argname=None, 

801 **kw, 

802 ): 

803 try: 

804 return expr._bind_param(operator, element, type_=bindparam_type) 

805 except exc.ArgumentError as err: 

806 self._raise_for_expected(element, err=err) 

807 

808 def _post_coercion(self, resolved, *, expr, bindparam_type=None, **kw): 

809 if resolved.type._isnull and not expr.type._isnull: 

810 resolved = resolved._with_binary_element_type( 

811 bindparam_type if bindparam_type is not None else expr.type 

812 ) 

813 return resolved 

814 

815 

816class InElementImpl(RoleImpl): 

817 __slots__ = () 

818 

819 def _implicit_coercions( 

820 self, 

821 element: Any, 

822 resolved: Any, 

823 argname: Optional[str] = None, 

824 **kw: Any, 

825 ) -> Any: 

826 if resolved._is_from_clause: 

827 if ( 

828 isinstance(resolved, selectable.Alias) 

829 and resolved.element._is_select_base 

830 ): 

831 self._warn_for_implicit_coercion(resolved) 

832 return self._post_coercion(resolved.element, **kw) 

833 else: 

834 self._warn_for_implicit_coercion(resolved) 

835 return self._post_coercion(resolved.select(), **kw) 

836 else: 

837 self._raise_for_expected(element, argname, resolved) 

838 

839 def _warn_for_implicit_coercion(self, elem): 

840 util.warn( 

841 "Coercing %s object into a select() for use in IN(); " 

842 "please pass a select() construct explicitly" 

843 % (elem.__class__.__name__) 

844 ) 

845 

846 @util.preload_module("sqlalchemy.sql.elements") 

847 def _literal_coercion(self, element, *, expr, operator, **kw): # type: ignore[override] # noqa: E501 

848 if util.is_non_string_iterable(element): 

849 non_literal_expressions: Dict[ 

850 Optional[_ColumnExpressionArgument[Any]], 

851 _ColumnExpressionArgument[Any], 

852 ] = {} 

853 element = list(element) 

854 for o in element: 

855 if not _is_literal(o): 

856 if not isinstance( 

857 o, util.preloaded.sql_elements.ColumnElement 

858 ) and not hasattr(o, "__clause_element__"): 

859 self._raise_for_expected(element, **kw) 

860 

861 else: 

862 non_literal_expressions[o] = o 

863 

864 if non_literal_expressions: 

865 return elements.ClauseList( 

866 *[ 

867 ( 

868 non_literal_expressions[o] 

869 if o in non_literal_expressions 

870 else expr._bind_param(operator, o) 

871 ) 

872 for o in element 

873 ] 

874 ) 

875 else: 

876 return expr._bind_param(operator, element, expanding=True) 

877 

878 else: 

879 self._raise_for_expected(element, **kw) 

880 

881 def _post_coercion(self, element, *, expr, operator, **kw): 

882 if element._is_select_base: 

883 # for IN, we are doing scalar_subquery() coercion without 

884 # a warning 

885 return element.scalar_subquery() 

886 elif isinstance(element, elements.ClauseList): 

887 assert not len(element.clauses) == 0 

888 return element.self_group(against=operator) 

889 

890 elif isinstance(element, elements.BindParameter): 

891 element = element._clone(maintain_key=True) 

892 element.expanding = True 

893 element.expand_op = operator 

894 

895 return element 

896 elif isinstance(element, selectable.Values): 

897 return element.scalar_values() 

898 else: 

899 return element 

900 

901 

902class OnClauseImpl(_ColumnCoercions, RoleImpl): 

903 __slots__ = () 

904 

905 _coerce_consts = True 

906 

907 def _literal_coercion(self, element, **kw): 

908 self._raise_for_expected(element) 

909 

910 def _post_coercion(self, resolved, *, original_element=None, **kw): 

911 # this is a hack right now as we want to use coercion on an 

912 # ORM InstrumentedAttribute, but we want to return the object 

913 # itself if it is one, not its clause element. 

914 # ORM context _join and _legacy_join() would need to be improved 

915 # to look for annotations in a clause element form. 

916 if isinstance(original_element, roles.JoinTargetRole): 

917 return original_element 

918 return resolved 

919 

920 

921class WhereHavingImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl): 

922 __slots__ = () 

923 

924 _coerce_consts = True 

925 

926 def _text_coercion(self, element, argname=None): 

927 return _no_text_coercion(element, argname) 

928 

929 

930class StatementOptionImpl(_CoerceLiterals, RoleImpl): 

931 __slots__ = () 

932 

933 _coerce_consts = True 

934 

935 def _text_coercion(self, element, argname=None): 

936 return elements.TextClause(element) 

937 

938 

939class ColumnArgumentImpl(_NoTextCoercion, RoleImpl): 

940 __slots__ = () 

941 

942 

943class ColumnArgumentOrKeyImpl(_ReturnsStringKey, RoleImpl): 

944 __slots__ = () 

945 

946 

947class StrAsPlainColumnImpl(_CoerceLiterals, RoleImpl): 

948 __slots__ = () 

949 

950 def _text_coercion(self, element, argname=None): 

951 return elements.ColumnClause(element) 

952 

953 

954class ByOfImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl, roles.ByOfRole): 

955 __slots__ = () 

956 

957 _coerce_consts = True 

958 

959 def _text_coercion(self, element, argname=None): 

960 return elements._textual_label_reference(element) 

961 

962 

963class OrderByImpl(ByOfImpl, RoleImpl): 

964 __slots__ = () 

965 

966 def _post_coercion(self, resolved, **kw): 

967 if ( 

968 isinstance(resolved, self._role_class) 

969 and resolved._order_by_label_element is not None 

970 ): 

971 return elements._label_reference(resolved) 

972 else: 

973 return resolved 

974 

975 

976class GroupByImpl(ByOfImpl, RoleImpl): 

977 __slots__ = () 

978 

979 def _implicit_coercions( 

980 self, 

981 element: Any, 

982 resolved: Any, 

983 argname: Optional[str] = None, 

984 **kw: Any, 

985 ) -> Any: 

986 if is_from_clause(resolved): 

987 return elements.ClauseList(*resolved.c) 

988 else: 

989 return resolved 

990 

991 

992class DMLColumnImpl(_ReturnsStringKey, RoleImpl): 

993 __slots__ = () 

994 

995 def _post_coercion(self, element, *, as_key=False, **kw): 

996 if as_key: 

997 return element.key 

998 else: 

999 return element 

1000 

1001 

1002class ConstExprImpl(RoleImpl): 

1003 __slots__ = () 

1004 

1005 def _literal_coercion(self, element, *, argname=None, **kw): 

1006 if element is None: 

1007 return elements.Null() 

1008 elif element is False: 

1009 return elements.False_() 

1010 elif element is True: 

1011 return elements.True_() 

1012 else: 

1013 self._raise_for_expected(element, argname) 

1014 

1015 

1016class TruncatedLabelImpl(_StringOnly, RoleImpl): 

1017 __slots__ = () 

1018 

1019 def _implicit_coercions( 

1020 self, 

1021 element: Any, 

1022 resolved: Any, 

1023 argname: Optional[str] = None, 

1024 **kw: Any, 

1025 ) -> Any: 

1026 if isinstance(element, str): 

1027 return resolved 

1028 else: 

1029 self._raise_for_expected(element, argname, resolved) 

1030 

1031 def _literal_coercion(self, element, **kw): 

1032 """coerce the given value to :class:`._truncated_label`. 

1033 

1034 Existing :class:`._truncated_label` and 

1035 :class:`._anonymous_label` objects are passed 

1036 unchanged. 

1037 """ 

1038 

1039 if isinstance(element, elements._truncated_label): 

1040 return element 

1041 else: 

1042 return elements._truncated_label(element) 

1043 

1044 

1045class DDLExpressionImpl(_Deannotate, _CoerceLiterals, RoleImpl): 

1046 __slots__ = () 

1047 

1048 _coerce_consts = True 

1049 

1050 def _text_coercion(self, element, argname=None): 

1051 # see #5754 for why we can't easily deprecate this coercion. 

1052 # essentially expressions like postgresql_where would have to be 

1053 # text() as they come back from reflection and we don't want to 

1054 # have text() elements wired into the inspection dictionaries. 

1055 return elements.TextClause(element) 

1056 

1057 

1058class DDLConstraintColumnImpl(_Deannotate, _ReturnsStringKey, RoleImpl): 

1059 __slots__ = () 

1060 

1061 

1062class DDLReferredColumnImpl(DDLConstraintColumnImpl): 

1063 __slots__ = () 

1064 

1065 

1066class LimitOffsetImpl(RoleImpl): 

1067 __slots__ = () 

1068 

1069 def _implicit_coercions( 

1070 self, 

1071 element: Any, 

1072 resolved: Any, 

1073 argname: Optional[str] = None, 

1074 **kw: Any, 

1075 ) -> Any: 

1076 if resolved is None: 

1077 return None 

1078 else: 

1079 self._raise_for_expected(element, argname, resolved) 

1080 

1081 def _literal_coercion( # type: ignore[override] 

1082 self, element, *, name, type_, **kw 

1083 ): 

1084 if element is None: 

1085 return None 

1086 else: 

1087 value = util.asint(element) 

1088 return selectable._OffsetLimitParam( 

1089 name, value, type_=type_, unique=True 

1090 ) 

1091 

1092 

1093class LabeledColumnExprImpl(ExpressionElementImpl): 

1094 __slots__ = () 

1095 

1096 def _implicit_coercions( 

1097 self, 

1098 element: Any, 

1099 resolved: Any, 

1100 argname: Optional[str] = None, 

1101 **kw: Any, 

1102 ) -> Any: 

1103 if isinstance(resolved, roles.ExpressionElementRole): 

1104 return resolved.label(None) 

1105 else: 

1106 new = super()._implicit_coercions( 

1107 element, resolved, argname=argname, **kw 

1108 ) 

1109 if isinstance(new, roles.ExpressionElementRole): 

1110 return new.label(None) 

1111 else: 

1112 self._raise_for_expected(element, argname, resolved) 

1113 

1114 

1115class ColumnsClauseImpl(_SelectIsNotFrom, _CoerceLiterals, RoleImpl): 

1116 __slots__ = () 

1117 

1118 _coerce_consts = True 

1119 _coerce_numerics = True 

1120 _coerce_star = True 

1121 

1122 _guess_straight_column = re.compile(r"^\w\S*$", re.I) 

1123 

1124 def _raise_for_expected( 

1125 self, element, argname=None, resolved=None, *, advice=None, **kw 

1126 ): 

1127 if not advice and isinstance(element, list): 

1128 advice = ( 

1129 f"Did you mean to say select(" 

1130 f"{', '.join(repr(e) for e in element)})?" 

1131 ) 

1132 

1133 return super()._raise_for_expected( 

1134 element, argname=argname, resolved=resolved, advice=advice, **kw 

1135 ) 

1136 

1137 def _text_coercion(self, element, argname=None): 

1138 element = str(element) 

1139 

1140 guess_is_literal = not self._guess_straight_column.match(element) 

1141 raise exc.ArgumentError( 

1142 "Textual column expression %(column)r %(argname)sshould be " 

1143 "explicitly declared with text(%(column)r), " 

1144 "or use %(literal_column)s(%(column)r) " 

1145 "for more specificity" 

1146 % { 

1147 "column": util.ellipses_string(element), 

1148 "argname": "for argument %s" % (argname,) if argname else "", 

1149 "literal_column": ( 

1150 "literal_column" if guess_is_literal else "column" 

1151 ), 

1152 } 

1153 ) 

1154 

1155 

1156class ReturnsRowsImpl(RoleImpl): 

1157 __slots__ = () 

1158 

1159 

1160class StatementImpl(_CoerceLiterals, RoleImpl): 

1161 __slots__ = () 

1162 

1163 def _post_coercion( 

1164 self, resolved, *, original_element, argname=None, **kw 

1165 ): 

1166 if resolved is not original_element and not isinstance( 

1167 original_element, str 

1168 ): 

1169 # use same method as Connection uses; this will later raise 

1170 # ObjectNotExecutableError 

1171 try: 

1172 original_element._execute_on_connection 

1173 except AttributeError: 

1174 util.warn_deprecated( 

1175 "Object %r should not be used directly in a SQL statement " 

1176 "context, such as passing to methods such as " 

1177 "session.execute(). This usage will be disallowed in a " 

1178 "future release. " 

1179 "Please use Core select() / update() / delete() etc. " 

1180 "with Session.execute() and other statement execution " 

1181 "methods." % original_element, 

1182 "1.4", 

1183 ) 

1184 

1185 return resolved 

1186 

1187 def _implicit_coercions( 

1188 self, 

1189 element: Any, 

1190 resolved: Any, 

1191 argname: Optional[str] = None, 

1192 **kw: Any, 

1193 ) -> Any: 

1194 if resolved._is_lambda_element: 

1195 return resolved 

1196 else: 

1197 return super()._implicit_coercions( 

1198 element, resolved, argname=argname, **kw 

1199 ) 

1200 

1201 

1202class SelectStatementImpl(_NoTextCoercion, RoleImpl): 

1203 __slots__ = () 

1204 

1205 def _implicit_coercions( 

1206 self, 

1207 element: Any, 

1208 resolved: Any, 

1209 argname: Optional[str] = None, 

1210 **kw: Any, 

1211 ) -> Any: 

1212 if resolved._is_text_clause: 

1213 return resolved.columns() 

1214 else: 

1215 self._raise_for_expected(element, argname, resolved) 

1216 

1217 

1218class HasCTEImpl(ReturnsRowsImpl): 

1219 __slots__ = () 

1220 

1221 

1222class IsCTEImpl(RoleImpl): 

1223 __slots__ = () 

1224 

1225 

1226class JoinTargetImpl(RoleImpl): 

1227 __slots__ = () 

1228 

1229 _skip_clauseelement_for_target_match = True 

1230 

1231 def _literal_coercion(self, element, *, argname=None, **kw): 

1232 self._raise_for_expected(element, argname) 

1233 

1234 def _implicit_coercions( 

1235 self, 

1236 element: Any, 

1237 resolved: Any, 

1238 argname: Optional[str] = None, 

1239 *, 

1240 legacy: bool = False, 

1241 **kw: Any, 

1242 ) -> Any: 

1243 if isinstance(element, roles.JoinTargetRole): 

1244 # note that this codepath no longer occurs as of 

1245 # #6550, unless JoinTargetImpl._skip_clauseelement_for_target_match 

1246 # were set to False. 

1247 return element 

1248 elif legacy and resolved._is_select_base: 

1249 util.warn_deprecated( 

1250 "Implicit coercion of SELECT and textual SELECT " 

1251 "constructs into FROM clauses is deprecated; please call " 

1252 ".subquery() on any Core select or ORM Query object in " 

1253 "order to produce a subquery object.", 

1254 version="1.4", 

1255 ) 

1256 # TODO: doing _implicit_subquery here causes tests to fail, 

1257 # how was this working before? probably that ORM 

1258 # join logic treated it as a select and subquery would happen 

1259 # in _ORMJoin->Join 

1260 return resolved 

1261 else: 

1262 self._raise_for_expected(element, argname, resolved) 

1263 

1264 

1265class FromClauseImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl): 

1266 __slots__ = () 

1267 

1268 def _implicit_coercions( 

1269 self, 

1270 element: Any, 

1271 resolved: Any, 

1272 argname: Optional[str] = None, 

1273 *, 

1274 explicit_subquery: bool = False, 

1275 allow_select: bool = True, 

1276 **kw: Any, 

1277 ) -> Any: 

1278 if resolved._is_select_base: 

1279 if explicit_subquery: 

1280 return resolved.subquery() 

1281 elif allow_select: 

1282 util.warn_deprecated( 

1283 "Implicit coercion of SELECT and textual SELECT " 

1284 "constructs into FROM clauses is deprecated; please call " 

1285 ".subquery() on any Core select or ORM Query object in " 

1286 "order to produce a subquery object.", 

1287 version="1.4", 

1288 ) 

1289 return resolved._implicit_subquery 

1290 elif resolved._is_text_clause: 

1291 return resolved 

1292 else: 

1293 self._raise_for_expected(element, argname, resolved) 

1294 

1295 def _post_coercion(self, element, *, deannotate=False, **kw): 

1296 if deannotate: 

1297 return element._deannotate() 

1298 else: 

1299 return element 

1300 

1301 

1302class StrictFromClauseImpl(FromClauseImpl): 

1303 __slots__ = () 

1304 

1305 def _implicit_coercions( 

1306 self, 

1307 element: Any, 

1308 resolved: Any, 

1309 argname: Optional[str] = None, 

1310 *, 

1311 allow_select: bool = False, 

1312 **kw: Any, 

1313 ) -> Any: 

1314 if resolved._is_select_base and allow_select: 

1315 util.warn_deprecated( 

1316 "Implicit coercion of SELECT and textual SELECT constructs " 

1317 "into FROM clauses is deprecated; please call .subquery() " 

1318 "on any Core select or ORM Query object in order to produce a " 

1319 "subquery object.", 

1320 version="1.4", 

1321 ) 

1322 return resolved._implicit_subquery 

1323 else: 

1324 self._raise_for_expected(element, argname, resolved) 

1325 

1326 

1327class AnonymizedFromClauseImpl(StrictFromClauseImpl): 

1328 __slots__ = () 

1329 

1330 def _post_coercion(self, element, *, flat=False, name=None, **kw): 

1331 assert name is None 

1332 

1333 return element._anonymous_fromclause(flat=flat) 

1334 

1335 

1336class DMLTableImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl): 

1337 __slots__ = () 

1338 

1339 def _post_coercion(self, element, **kw): 

1340 if "dml_table" in element._annotations: 

1341 return element._annotations["dml_table"] 

1342 else: 

1343 return element 

1344 

1345 

1346class DMLSelectImpl(_NoTextCoercion, RoleImpl): 

1347 __slots__ = () 

1348 

1349 def _implicit_coercions( 

1350 self, 

1351 element: Any, 

1352 resolved: Any, 

1353 argname: Optional[str] = None, 

1354 **kw: Any, 

1355 ) -> Any: 

1356 if resolved._is_from_clause: 

1357 if ( 

1358 isinstance(resolved, selectable.Alias) 

1359 and resolved.element._is_select_base 

1360 ): 

1361 return resolved.element 

1362 else: 

1363 return resolved.select() 

1364 else: 

1365 self._raise_for_expected(element, argname, resolved) 

1366 

1367 

1368class CompoundElementImpl(_NoTextCoercion, RoleImpl): 

1369 __slots__ = () 

1370 

1371 def _raise_for_expected(self, element, argname=None, resolved=None, **kw): 

1372 if isinstance(element, roles.FromClauseRole): 

1373 if element._is_subquery: 

1374 advice = ( 

1375 "Use the plain select() object without " 

1376 "calling .subquery() or .alias()." 

1377 ) 

1378 else: 

1379 advice = ( 

1380 "To SELECT from any FROM clause, use the .select() method." 

1381 ) 

1382 else: 

1383 advice = None 

1384 return super()._raise_for_expected( 

1385 element, argname=argname, resolved=resolved, advice=advice, **kw 

1386 ) 

1387 

1388 

1389_impl_lookup = {} 

1390 

1391 

1392for name in dir(roles): 

1393 cls = getattr(roles, name) 

1394 if name.endswith("Role"): 

1395 name = name.replace("Role", "Impl") 

1396 if name in globals(): 

1397 impl = globals()[name](cls) 

1398 _impl_lookup[cls] = impl 

1399 

1400if not TYPE_CHECKING: 

1401 ee_impl = _impl_lookup[roles.ExpressionElementRole] 

1402 

1403 for py_type in (int, bool, str, float): 

1404 _impl_lookup[roles.ExpressionElementRole[py_type]] = ee_impl