Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/coercions.py: 55%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

575 statements  

1# sql/coercions.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9from __future__ import annotations 

10 

11import collections.abc as collections_abc 

12import numbers 

13import re 

14import typing 

15from typing import Any 

16from typing import Callable 

17from typing import cast 

18from typing import Dict 

19from typing import Iterable 

20from typing import Iterator 

21from typing import List 

22from typing import NoReturn 

23from typing import Optional 

24from typing import overload 

25from typing import Sequence 

26from typing import Tuple 

27from typing import Type 

28from typing import TYPE_CHECKING 

29from typing import TypeVar 

30from typing import Union 

31 

32from . import roles 

33from . import visitors 

34from ._typing import is_from_clause 

35from .base import ExecutableOption 

36from .base import Options 

37from .cache_key import HasCacheKey 

38from .visitors import Visitable 

39from .. import exc 

40from .. import inspection 

41from .. import util 

42from ..util.typing import Literal 

43 

44if typing.TYPE_CHECKING: 

45 # elements lambdas schema selectable are set by __init__ 

46 from . import elements 

47 from . import lambdas 

48 from . import schema 

49 from . import selectable 

50 from ._typing import _ColumnExpressionArgument 

51 from ._typing import _ColumnsClauseArgument 

52 from ._typing import _DDLColumnArgument 

53 from ._typing import _DMLTableArgument 

54 from ._typing import _FromClauseArgument 

55 from .dml import _DMLTableElement 

56 from .elements import BindParameter 

57 from .elements import ClauseElement 

58 from .elements import ColumnClause 

59 from .elements import ColumnElement 

60 from .elements import NamedColumn 

61 from .elements import SQLCoreOperations 

62 from .elements import TextClause 

63 from .schema import Column 

64 from .selectable import _ColumnsClauseElement 

65 from .selectable import _JoinTargetProtocol 

66 from .selectable import FromClause 

67 from .selectable import HasCTE 

68 from .selectable import SelectBase 

69 from .selectable import Subquery 

70 from .visitors import _TraverseCallableType 

71 

72_SR = TypeVar("_SR", bound=roles.SQLRole) 

73_F = TypeVar("_F", bound=Callable[..., Any]) 

74_StringOnlyR = TypeVar("_StringOnlyR", bound=roles.StringRole) 

75_T = TypeVar("_T", bound=Any) 

76 

77 

def _is_literal(element: Any) -> bool:
    """Tell whether ``element`` should be handled as a plain literal value.

    An object is considered a literal when it is neither a ``Visitable``
    nor a ``schema.SchemaEventTarget`` instance and it does not expose a
    ``__clause_element__`` attribute.

    """

    if isinstance(element, (Visitable, schema.SchemaEventTarget)):
        return False
    return not hasattr(element, "__clause_element__")

88 

89 

90def _deep_is_literal(element): 

91 """Return whether or not the element is a "literal" in the context 

92 of a SQL expression construct. 

93 

94 does a deeper more esoteric check than _is_literal. is used 

95 for lambda elements that have to distinguish values that would 

96 be bound vs. not without any context. 

97 

98 """ 

99 

100 if isinstance(element, collections_abc.Sequence) and not isinstance( 

101 element, str 

102 ): 

103 for elem in element: 

104 if not _deep_is_literal(elem): 

105 return False 

106 else: 

107 return True 

108 

109 return ( 

110 not isinstance( 

111 element, 

112 ( 

113 Visitable, 

114 schema.SchemaEventTarget, 

115 HasCacheKey, 

116 Options, 

117 util.langhelpers.symbol, 

118 ), 

119 ) 

120 and not hasattr(element, "__clause_element__") 

121 and ( 

122 not isinstance(element, type) 

123 or not issubclass(element, HasCacheKey) 

124 ) 

125 ) 

126 

127 

def _document_text_coercion(
    paramname: str, meth_rst: str, param_rst: str
) -> Callable[[_F], _F]:
    """Produce the "trusted SQL text" warning for a documented parameter.

    :param paramname: parameter name handed to ``util.add_parameter_text``.
    :param meth_rst: text naming the method, interpolated into the warning.
    :param param_rst: text naming the parameter, interpolated into the
     warning.
    :return: the result of ``util.add_parameter_text``.

    """
    warning_template = (
        ".. warning:: "
        "The %s argument to %s can be passed as a Python string argument, "
        "which will be treated "
        "as **trusted SQL text** and rendered as given. **DO NOT PASS "
        "UNTRUSTED INPUT TO THIS PARAMETER**."
    )
    warning_text = warning_template % (param_rst, meth_rst)
    return util.add_parameter_text(paramname, warning_text)

142 

143 

144def _expression_collection_was_a_list( 

145 attrname: str, 

146 fnname: str, 

147 args: Union[Sequence[_T], Sequence[Sequence[_T]]], 

148) -> Sequence[_T]: 

149 if args and isinstance(args[0], (list, set, dict)) and len(args) == 1: 

150 if isinstance(args[0], list): 

151 raise exc.ArgumentError( 

152 f'The "{attrname}" argument to {fnname}(), when ' 

153 "referring to a sequence " 

154 "of items, is now passed as a series of positional " 

155 "elements, rather than as a list. " 

156 ) 

157 return cast("Sequence[_T]", args[0]) 

158 

159 return cast("Sequence[_T]", args) 

160 

161 

# Typing-only overloads for expect(): each stub maps a role class (and for
# some, a keyword-only flag) to the static return type reported to type
# checkers.  The runtime behavior lives in the final, un-decorated
# definition below.
@overload
def expect(
    role: Type[roles.TruncatedLabelRole],
    element: Any,
    **kw: Any,
) -> str: ...


@overload
def expect(
    role: Type[roles.DMLColumnRole],
    element: Any,
    *,
    as_key: Literal[True] = ...,
    **kw: Any,
) -> str: ...


@overload
def expect(
    role: Type[roles.LiteralValueRole],
    element: Any,
    **kw: Any,
) -> BindParameter[Any]: ...


@overload
def expect(
    role: Type[roles.DDLReferredColumnRole],
    element: Any,
    **kw: Any,
) -> Union[Column[Any], str]: ...


@overload
def expect(
    role: Type[roles.DDLConstraintColumnRole],
    element: Any,
    **kw: Any,
) -> Union[Column[Any], str]: ...


@overload
def expect(
    role: Type[roles.StatementOptionRole],
    element: Any,
    **kw: Any,
) -> Union[ColumnElement[Any], TextClause]: ...


@overload
def expect(
    role: Type[roles.LabeledColumnExprRole[Any]],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> NamedColumn[_T]: ...


@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
    ],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> ColumnElement[_T]: ...


@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
        Type[roles.OnClauseRole],
        Type[roles.ColumnArgumentRole],
    ],
    element: Any,
    **kw: Any,
) -> ColumnElement[Any]: ...


@overload
def expect(
    role: Type[roles.DMLTableRole],
    element: _DMLTableArgument,
    **kw: Any,
) -> _DMLTableElement: ...


@overload
def expect(
    role: Type[roles.HasCTERole],
    element: HasCTE,
    **kw: Any,
) -> HasCTE: ...


@overload
def expect(
    role: Type[roles.SelectStatementRole],
    element: SelectBase,
    **kw: Any,
) -> SelectBase: ...


@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: _FromClauseArgument,
    **kw: Any,
) -> FromClause: ...


@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: SelectBase,
    *,
    explicit_subquery: Literal[True] = ...,
    **kw: Any,
) -> Subquery: ...


@overload
def expect(
    role: Type[roles.ColumnsClauseRole],
    element: _ColumnsClauseArgument[Any],
    **kw: Any,
) -> _ColumnsClauseElement: ...


@overload
def expect(
    role: Type[roles.JoinTargetRole],
    element: _JoinTargetProtocol,
    **kw: Any,
) -> _JoinTargetProtocol: ...


# catchall for not-yet-implemented overloads
@overload
def expect(
    role: Type[_SR],
    element: Any,
    **kw: Any,
) -> Any: ...


def expect(
    role: Type[_SR],
    element: Any,
    *,
    apply_propagate_attrs: Optional[ClauseElement] = None,
    argname: Optional[str] = None,
    post_inspect: bool = False,
    disable_inspection: bool = False,
    **kw: Any,
) -> Any:
    """Coerce ``element`` into an object acceptable for ``role``.

    :param role: role class; used as the key into ``_impl_lookup`` to find
     the implementation object that performs the coercion.
    :param element: the object to coerce.
    :param apply_propagate_attrs: optional object whose ``_propagate_attrs``
     is set from the resolved element when it is currently falsy and the
     resolved element has a truthy ``_propagate_attrs``.
    :param argname: optional argument name forwarded to the implementation,
     where it is used in error messages.
    :param post_inspect: when True, ``_post_inspect`` is accessed on the
     inspection result before ``__clause_element__()`` is called on it.
    :param disable_inspection: when True, the ``inspection.inspect()`` step
     is skipped even if the implementation uses inspection.
    :param kw: extra keyword arguments forwarded to the implementation
     hooks (or to ``lambdas.LambdaOptions`` on the lambda path).
    :return: a ``lambdas.LambdaElement`` for lambda input; otherwise the
     resolved element (after the implementation's ``_post_coercion`` hook,
     if any) when the implementation's role class is in the resolved
     object's MRO, else the result of the implementation's
     ``_implicit_coercions()``.

    """
    if (
        role.allows_lambda
        # note callable() will not invoke a __getattr__() method, whereas
        # hasattr(obj, "__call__") will. by keeping the callable() check here
        # we prevent most needless calls to hasattr() and therefore
        # __getattr__(), which is present on ColumnElement.
        and callable(element)
        and hasattr(element, "__code__")
    ):
        return lambdas.LambdaElement(
            element,
            role,
            lambdas.LambdaOptions(**kw),
            apply_propagate_attrs=apply_propagate_attrs,
        )

    # major case is that we are given a ClauseElement already, skip more
    # elaborate logic up front if possible
    impl = _impl_lookup[role]

    original_element = element

    if not isinstance(
        element,
        (
            elements.CompilerElement,
            schema.SchemaItem,
            schema.FetchedValue,
            lambdas.PyWrapper,
        ),
    ):
        resolved = None

        if impl._resolve_literal_only:
            # implementation handles raw values only; no
            # __clause_element__() / inspection resolution is attempted
            resolved = impl._literal_coercion(element, **kw)
        else:
            original_element = element

            is_clause_element = False

            # this is a special performance optimization for ORM
            # joins used by JoinTargetImpl that we don't go through the
            # work of creating __clause_element__() when we only need the
            # original QueryableAttribute, as the former will do clause
            # adaption and all that which is just thrown away here.
            if (
                impl._skip_clauseelement_for_target_match
                and isinstance(element, role)
                and hasattr(element, "__clause_element__")
            ):
                is_clause_element = True
            else:
                # unwrap through __clause_element__() until an object that
                # reports is_clause_element (or lacks the hook) is reached
                while hasattr(element, "__clause_element__"):
                    is_clause_element = True

                    if not getattr(element, "is_clause_element", False):
                        element = element.__clause_element__()
                    else:
                        break

            if not is_clause_element:
                if impl._use_inspection and not disable_inspection:
                    insp = inspection.inspect(element, raiseerr=False)
                    if insp is not None:
                        if post_inspect:
                            # bare attribute access; presumably evaluated
                            # for its side effect -- confirm against the
                            # inspected object's implementation
                            insp._post_inspect
                        try:
                            resolved = insp.__clause_element__()
                        except AttributeError:
                            impl._raise_for_expected(original_element, argname)

                if resolved is None:
                    # nothing resolved via inspection; fall back to
                    # treating the value as a literal
                    resolved = impl._literal_coercion(
                        element, argname=argname, **kw
                    )
            else:
                resolved = element
    elif isinstance(element, lambdas.PyWrapper):
        resolved = element._sa__py_wrapper_literal(**kw)
    else:
        resolved = element

    if apply_propagate_attrs is not None:
        if typing.TYPE_CHECKING:
            assert isinstance(resolved, (SQLCoreOperations, ClauseElement))

        if not apply_propagate_attrs._propagate_attrs and getattr(
            resolved, "_propagate_attrs", None
        ):
            apply_propagate_attrs._propagate_attrs = resolved._propagate_attrs

    if impl._role_class in resolved.__class__.__mro__:
        # resolved object already satisfies the role; apply the optional
        # post-coercion hook only
        if impl._post_coercion:
            resolved = impl._post_coercion(
                resolved,
                argname=argname,
                original_element=original_element,
                **kw,
            )
        return resolved
    else:
        return impl._implicit_coercions(
            original_element, resolved, argname=argname, **kw
        )

427 

428 

def expect_as_key(
    role: Type[roles.DMLColumnRole], element: Any, **kw: Any
) -> str:
    """Call :func:`expect` with ``as_key=True`` always in effect.

    Any ``as_key`` value supplied by the caller is discarded before the
    remaining keyword arguments are forwarded.

    """
    forwarded = {key: value for key, value in kw.items() if key != "as_key"}
    return expect(role, element, as_key=True, **forwarded)

434 

435 

def expect_col_expression_collection(
    role: Type[roles.DDLConstraintColumnRole],
    expressions: Iterable[_DDLColumnArgument],
) -> Iterator[
    Tuple[
        Union[str, Column[Any]],
        Optional[ColumnClause[Any]],
        Optional[str],
        Optional[Union[Column[Any], str]],
    ]
]:
    """Coerce each expression for ``role`` and yield a descriptive 4-tuple.

    Each yielded tuple is ``(resolved, column, strname, add_element)``:

    * when :func:`expect` returns a string, the original string expression
      is yielded as ``resolved``, ``strname`` and ``add_element``, and
      ``column`` is ``None``;
    * otherwise the resolved expression is traversed for ``"column"``
      nodes; the first one found (or ``None``) is yielded as both
      ``column`` and ``add_element``, and ``strname`` is ``None``.

    """
    for expr in expressions:
        resolved: Union[Column[Any], str] = expect(role, expr)

        if isinstance(resolved, str):
            assert isinstance(expr, str)
            yield expr, None, expr, expr
            continue

        found: List[Column[Any]] = []
        collect: _TraverseCallableType[Column[Any]] = found.append
        visitors.traverse(resolved, {}, {"column": collect})
        first_column = found[0] if found else None

        yield resolved, first_column, None, first_column

464 

465 

class RoleImpl:
    """Base class for the per-role coercion implementations.

    An instance records the role class it serves, that role's
    ``_role_name`` and whether the role derives from
    ``roles.UsesInspection``.  Subclasses override the hooks below; the
    defaults reject every input.

    """

    __slots__ = ("_role_class", "name", "_use_inspection")

    def _literal_coercion(self, element, **kw):
        """Coerce a literal value; not implemented in the base class."""
        raise NotImplementedError()

    # optional hook; when truthy it is applied to an already-acceptable
    # resolved element
    _post_coercion: Any = None
    # when True, only _literal_coercion() is used to resolve raw values
    _resolve_literal_only = False
    _skip_clauseelement_for_target_match = False

    def __init__(self, role_class):
        self._role_class = role_class
        self.name = role_class._role_name
        self._use_inspection = issubclass(role_class, roles.UsesInspection)

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Default implicit coercion: none is available, so raise."""
        self._raise_for_expected(element, argname, resolved)

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        *,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise :class:`.ArgumentError` describing the rejected input.

        :param element: the object originally passed in.
        :param argname: optional argument name included in the message.
        :param resolved: optional object that ``element`` resolved to; it
         is mentioned only when it is a different object than ``element``.
        :param advice: optional text appended to the message.
        :param code: optional error code passed to the exception.
        :param err: optional exception used as the ``__cause__``.

        """
        if resolved is None or resolved is element:
            got = repr(element)
        else:
            got = f"{resolved!r} object resolved from {element!r} object"

        if argname:
            msg = f"{self.name} expected for argument {argname!r}; got {got}."
        else:
            msg = f"{self.name} expected, got {got}."

        if advice:
            msg = f"{msg} {advice}"

        raise exc.ArgumentError(msg, code=code) from err

519 

520 

521class _Deannotate: 

522 __slots__ = () 

523 

524 def _post_coercion(self, resolved, **kw): 

525 from .util import _deep_deannotate 

526 

527 return _deep_deannotate(resolved) 

528 

529 

class _StringOnly:
    """Mixin that switches a role implementation to literal-only resolution."""

    __slots__ = ()

    # expect() checks this flag and, when set, resolves raw values through
    # _literal_coercion() only
    _resolve_literal_only = True

534 

535 

class _ReturnsStringKey(RoleImpl):
    """Role implementation that lets plain string keys pass through."""

    __slots__ = ()

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        """Return ``element`` when it is a string; otherwise raise."""
        if isinstance(element, str):
            return element
        self._raise_for_expected(element, argname, resolved)

    def _literal_coercion(self, element, **kw):
        """Literal values are returned unchanged."""
        return element

547 

548 

class _ColumnCoercions(RoleImpl):
    """Implicit coercions of SELECT-like objects into column expressions."""

    __slots__ = ()

    def _warn_for_scalar_subquery_coercion(self):
        """Warn that a SELECT is being coerced to a scalar subquery."""
        message = (
            "implicitly coercing SELECT object to scalar subquery; "
            "please use the .scalar_subquery() method to produce a scalar "
            "subquery."
        )
        util.warn(message)

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        """Coerce ``resolved`` into a column expression where possible.

        * a select base becomes ``resolved.scalar_subquery()`` (with a
          warning);
        * a ``selectable.Subquery`` becomes
          ``resolved.element.scalar_subquery()`` (with a warning);
        * a lambda element is returned as is when the role allows lambdas;
        * anything else is rejected via ``_raise_for_expected()``.

        """
        if getattr(resolved, "is_clause_element", False):
            if resolved._is_select_base:
                self._warn_for_scalar_subquery_coercion()
                return resolved.scalar_subquery()

            if resolved._is_from_clause and isinstance(
                resolved, selectable.Subquery
            ):
                self._warn_for_scalar_subquery_coercion()
                return resolved.element.scalar_subquery()

            if self._role_class.allows_lambda and resolved._is_lambda_element:
                return resolved

        self._raise_for_expected(element, argname, resolved)

575 

576 

def _no_text_coercion(
    element: Any,
    argname: Optional[str] = None,
    exc_cls: Type[exc.SQLAlchemyError] = exc.ArgumentError,
    extra: Optional[str] = None,
    err: Optional[Exception] = None,
) -> NoReturn:
    """Raise an error stating that textual SQL must be declared via text().

    :param element: the rejected textual expression; it is shortened with
     ``util.ellipses_string()`` for display.
    :param argname: optional name of the argument that received the text.
    :param exc_cls: exception class to raise; ``exc.ArgumentError`` by
     default.
    :param extra: optional text placed at the start of the message.
    :param err: optional exception used as the ``__cause__``.
    :raises exc_cls: always.

    """
    raise exc_cls(
        "%(extra)sTextual SQL expression %(expr)r %(argname)sshould be "
        "explicitly declared as text(%(expr)r)"
        % {
            "expr": util.ellipses_string(element),
            # the trailing space keeps the argument name from running
            # straight into the following "should be"
            "argname": "for argument %s " % (argname,) if argname else "",
            "extra": "%s " % extra if extra else "",
        }
    ) from err

593 

594 

class _NoTextCoercion(RoleImpl):
    """Role implementation that refuses every literal value."""

    __slots__ = ()

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Reject ``element``, using the text() hint where it applies.

        Strings get the "declare as text()" error when ``TextClause`` is a
        subclass of this implementation's role class; all other input gets
        the generic "expected" error.

        """
        text_hint_applies = isinstance(element, str) and issubclass(
            elements.TextClause, self._role_class
        )
        if text_hint_applies:
            _no_text_coercion(element, argname)
        else:
            self._raise_for_expected(element, argname)

605 

606 

class _CoerceLiterals(RoleImpl):
    """Role implementation with configurable literal coercions.

    Subclasses enable individual coercions through the three class-level
    flags and may override ``_text_coercion()`` to accept strings.

    """

    __slots__ = ()
    _coerce_consts = False
    _coerce_star = False
    _coerce_numerics = False

    def _text_coercion(self, element, argname=None):
        """Default string handling: raise the "declare as text()" error."""
        return _no_text_coercion(element, argname)

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Coerce a literal according to the enabled class-level flags.

        * strings: ``"*"`` becomes a literal ``ColumnClause`` when
          ``_coerce_star`` is set, otherwise ``_text_coercion()`` decides;
        * ``None`` / ``False`` / ``True`` become ``Null`` / ``False_`` /
          ``True_`` when ``_coerce_consts`` is set;
        * numbers become a literal ``ColumnClause`` of their string form
          when ``_coerce_numerics`` is set;
        * anything else is rejected.

        """
        if isinstance(element, str):
            if self._coerce_star and element == "*":
                return elements.ColumnClause("*", is_literal=True)
            return self._text_coercion(element, argname, **kw)

        if self._coerce_consts:
            # identity comparison on purpose: 0 and 1 must not be treated
            # as False / True
            if element is None:
                return elements.Null()
            if element is False:
                return elements.False_()
            if element is True:
                return elements.True_()

        if self._coerce_numerics and isinstance(element, numbers.Number):
            return elements.ColumnClause(str(element), is_literal=True)

        self._raise_for_expected(element, argname)

635 

636 

class LiteralValueImpl(RoleImpl):
    """Role implementation that wraps a literal value in a bind parameter."""

    _resolve_literal_only = True

    def _implicit_coercions(
        self,
        element,
        resolved,
        argname=None,
        *,
        type_=None,
        literal_execute=False,
        **kw,
    ):
        """Return a unique, anonymous ``BindParameter`` holding ``element``.

        Raises through ``_raise_for_expected()`` when ``resolved`` is not a
        literal according to ``_is_literal()``.

        """
        resolved_is_literal = _is_literal(resolved)
        if not resolved_is_literal:
            self._raise_for_expected(
                element, argname=argname, resolved=resolved, **kw
            )

        bind_options = dict(
            type_=type_, unique=True, literal_execute=literal_execute
        )
        return elements.BindParameter(None, element, **bind_options)

    def _literal_coercion(self, element, **kw):
        """Literal values are returned unchanged."""
        return element

665 

666 

class _SelectIsNotFrom(RoleImpl):
    """Adds ".subquery()" guidance when a SELECT is used as a FROM clause."""

    __slots__ = ()

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        *,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise the "expected" error, suggesting ``.subquery()`` if apt.

        When no ``advice`` was supplied and either ``element`` or
        ``resolved`` is a ``roles.SelectStatementRole``, the advice is set
        to the ``.subquery()`` hint and the error code to ``"89ve"``.
        Otherwise the caller's ``advice`` and ``code`` are passed through
        unchanged.

        """
        # The parentheses matter: without them the test parses as
        # ``(not advice and A) or B`` and caller-supplied advice would be
        # overwritten whenever ``resolved`` is a SELECT statement.
        if not advice and (
            isinstance(element, roles.SelectStatementRole)
            or isinstance(resolved, roles.SelectStatementRole)
        ):
            advice = (
                "To create a "
                "FROM clause from a %s object, use the .subquery() method."
                % (resolved.__class__ if resolved is not None else element,)
            )
            code = "89ve"

        super()._raise_for_expected(
            element,
            argname=argname,
            resolved=resolved,
            advice=advice,
            code=code,
            err=err,
            **kw,
        )
        # never reached
        assert False

706 

707 

class HasCacheKeyImpl(RoleImpl):
    """Role implementation that accepts ``HasCacheKey`` objects as given."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``element`` if it is a ``HasCacheKey``; otherwise raise."""
        if isinstance(element, HasCacheKey):
            return element
        self._raise_for_expected(element, argname, resolved)

    def _literal_coercion(self, element, **kw):
        """Literal values are returned unchanged."""
        return element

725 

726 

class ExecutableOptionImpl(RoleImpl):
    """Role implementation that accepts ``ExecutableOption`` objects."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``element`` if it is an ``ExecutableOption``; else raise."""
        if isinstance(element, ExecutableOption):
            return element
        self._raise_for_expected(element, argname, resolved)

    def _literal_coercion(self, element, **kw):
        """Literal values are returned unchanged."""
        return element

744 

745 

class ExpressionElementImpl(_ColumnCoercions, RoleImpl):
    """Role implementation for general column expressions."""

    __slots__ = ()

    def _literal_coercion(
        self, element, *, name=None, type_=None, is_crud=False, **kw
    ):
        """Turn a literal into ``Null()`` or a unique ``BindParameter``.

        ``None`` becomes ``Null()`` unless ``is_crud`` is set or ``type_``
        reports ``should_evaluate_none``.  An ``ArgumentError`` raised
        while building the bind parameter is re-raised as the "expected"
        error with the original exception as its cause.

        """
        render_as_null = (
            element is None
            and not is_crud
            and (type_ is None or not type_.should_evaluate_none)
        )
        if render_as_null:
            # TODO: there's no test coverage now for the
            # "should_evaluate_none" part of this, as outside of "crud" this
            # codepath is not normally used except in some special cases
            return elements.Null()

        try:
            return elements.BindParameter(
                name, element, type_, unique=True, _is_crud=is_crud
            )
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Raise the "expected" error with advice for VALUES / FROM input."""
        # select uses implicit coercion with warning instead of raising
        advice = None
        if isinstance(element, selectable.Values):
            advice = (
                "To create a column expression from a VALUES clause, "
                "use the .scalar_values() method."
            )
        elif isinstance(element, roles.AnonymizedFromClauseRole):
            advice = (
                "To create a column expression from a FROM clause row "
                "as a whole, use the .table_valued() method."
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

787 

788 

class BinaryElementImpl(ExpressionElementImpl, RoleImpl):
    """Role implementation for the other operand of a binary expression."""

    __slots__ = ()

    def _literal_coercion(  # type: ignore[override]
        self,
        element,
        *,
        expr,
        operator,
        bindparam_type=None,
        argname=None,
        **kw,
    ):
        """Bind the literal via ``expr._bind_param()``.

        An ``ArgumentError`` from that call is re-raised as the "expected"
        error with the original exception as its cause.

        """
        try:
            bound = expr._bind_param(operator, element, type_=bindparam_type)
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)
        else:
            return bound

    def _post_coercion(self, resolved, *, expr, bindparam_type=None, **kw):
        """Give a null-typed ``resolved`` the type of the other operand.

        Applied only when ``resolved.type`` is null-typed and ``expr.type``
        is not; ``bindparam_type`` takes precedence over ``expr.type``.

        """
        if not resolved.type._isnull or expr.type._isnull:
            return resolved

        target_type = expr.type if bindparam_type is None else bindparam_type
        return resolved._with_binary_element_type(target_type)

813 

814 

class InElementImpl(RoleImpl):
    """Role implementation for the right-hand side of an IN expression."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Coerce a FROM clause into a SELECT usable with IN, with a warning.

        An ``Alias`` wrapping a select base contributes its inner element;
        any other FROM clause contributes ``resolved.select()``.  The
        chosen object is then passed through ``_post_coercion()``.
        Non-FROM input is rejected.

        """
        if resolved._is_from_clause:
            wraps_select = (
                isinstance(resolved, selectable.Alias)
                and resolved.element._is_select_base
            )
            self._warn_for_implicit_coercion(resolved)
            target = resolved.element if wraps_select else resolved.select()
            return self._post_coercion(target, **kw)

        self._raise_for_expected(element, argname, resolved)

    def _warn_for_implicit_coercion(self, elem):
        """Warn that ``elem`` is being coerced into a select() for IN."""
        util.warn(
            "Coercing %s object into a select() for use in IN(); "
            "please pass a select() construct explicitly"
            % (elem.__class__.__name__)
        )

    @util.preload_module("sqlalchemy.sql.elements")
    def _literal_coercion(self, element, *, expr, operator, **kw):
        """Coerce a non-string iterable of values for use with IN.

        When every member is a literal, a single expanding bind parameter
        holding the whole list is returned.  When some members are SQL
        expressions, a ``ClauseList`` is returned in which those members
        appear as is and the literals are bound individually.  Non-iterable
        input, or a non-literal member that is neither a ``ColumnElement``
        nor provides ``__clause_element__``, is rejected.

        """
        if not util.is_non_string_iterable(element):
            self._raise_for_expected(element, **kw)
            return None

        column_element_cls = util.preloaded.sql_elements.ColumnElement
        non_literal_expressions: Dict[
            Optional[_ColumnExpressionArgument[Any]],
            _ColumnExpressionArgument[Any],
        ] = {}
        element = list(element)
        for member in element:
            if _is_literal(member):
                continue
            if not isinstance(member, column_element_cls) and not hasattr(
                member, "__clause_element__"
            ):
                self._raise_for_expected(element, **kw)
            else:
                non_literal_expressions[member] = member

        if not non_literal_expressions:
            return expr._bind_param(operator, element, expanding=True)

        clauses = []
        for member in element:
            if member in non_literal_expressions:
                clauses.append(non_literal_expressions[member])
            else:
                clauses.append(expr._bind_param(operator, member))
        return elements.ClauseList(*clauses)

    def _post_coercion(self, element, *, expr, operator, **kw):
        """Finalize an element accepted for IN.

        * select base -> ``scalar_subquery()``;
        * ``ClauseList`` (asserted non-empty) -> grouped against the
          operator;
        * ``BindParameter`` -> a clone marked as expanding for the operator;
        * ``Values`` -> ``scalar_values()``;
        * anything else is returned unchanged.

        """
        if element._is_select_base:
            # for IN, we are doing scalar_subquery() coercion without
            # a warning
            return element.scalar_subquery()

        if isinstance(element, elements.ClauseList):
            assert len(element.clauses) != 0
            return element.self_group(against=operator)

        if isinstance(element, elements.BindParameter):
            expanding_param = element._clone(maintain_key=True)
            expanding_param.expanding = True
            expanding_param.expand_op = operator
            return expanding_param

        if isinstance(element, selectable.Values):
            return element.scalar_values()

        return element

899 

900 

class OnClauseImpl(_ColumnCoercions, RoleImpl):
    """Role implementation for ON clause expressions."""

    __slots__ = ()

    _coerce_consts = True

    def _literal_coercion(self, element, **kw):
        """Literal values are never accepted; always raises."""
        self._raise_for_expected(element)

    def _post_coercion(self, resolved, *, original_element=None, **kw):
        """Return the original object when it is itself a join target.

        Otherwise the resolved clause element is returned.

        """
        # this is a hack right now as we want to use coercion on an
        # ORM InstrumentedAttribute, but we want to return the object
        # itself if it is one, not its clause element.
        # ORM context _join and _legacy_join() would need to be improved
        # to look for annotations in a clause element form.
        was_join_target = isinstance(original_element, roles.JoinTargetRole)
        return original_element if was_join_target else resolved

918 

919 

class WhereHavingImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl):
    """Role implementation for WHERE / HAVING criteria.

    Enables coercion of ``None`` / ``True`` / ``False`` constants and
    rejects plain strings.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        """Reject a plain string with the "declare as text()" error."""
        return _no_text_coercion(element, argname)

927 

928 

class StatementOptionImpl(_CoerceLiterals, RoleImpl):
    """Role implementation for statement options.

    Enables coercion of ``None`` / ``True`` / ``False`` constants and
    turns plain strings into ``TextClause`` objects.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        """Wrap a plain string in ``elements.TextClause``."""
        return elements.TextClause(element)

936 

937 

class ColumnArgumentImpl(_NoTextCoercion, RoleImpl):
    """Role implementation built solely from ``_NoTextCoercion``.

    Adds no behavior of its own.

    """

    __slots__ = ()

940 

941 

class ColumnArgumentOrKeyImpl(_ReturnsStringKey, RoleImpl):
    """Role implementation built solely from ``_ReturnsStringKey``.

    Adds no behavior of its own.

    """

    __slots__ = ()

944 

945 

class StrAsPlainColumnImpl(_CoerceLiterals, RoleImpl):
    """Role implementation that treats plain strings as column names."""

    __slots__ = ()

    def _text_coercion(self, element, argname=None):
        """Wrap a plain string in ``elements.ColumnClause``."""
        return elements.ColumnClause(element)

951 

952 

class ByOfImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl, roles.ByOfRole):
    """Shared base of the ORDER BY and GROUP BY role implementations.

    Enables coercion of ``None`` / ``True`` / ``False`` constants and
    turns plain strings into textual label references.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        """Wrap a plain string in ``elements._textual_label_reference``."""
        return elements._textual_label_reference(element)

960 

961 

class OrderByImpl(ByOfImpl, RoleImpl):
    """Role implementation for ORDER BY expressions."""

    __slots__ = ()

    def _post_coercion(self, resolved, **kw):
        """Wrap ``resolved`` in a label reference when it has a label.

        The wrapping applies when ``resolved`` is an instance of this
        implementation's role class and its ``_order_by_label_element`` is
        not ``None``; otherwise ``resolved`` is returned unchanged.

        """
        has_label = (
            isinstance(resolved, self._role_class)
            and resolved._order_by_label_element is not None
        )
        if has_label:
            return elements._label_reference(resolved)
        return resolved

973 

974 

class GroupByImpl(ByOfImpl, RoleImpl):
    """Role implementation for GROUP BY expressions."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Expand a FROM clause into a ``ClauseList`` of its columns.

        Any other resolved object is returned unchanged.

        """
        if not is_from_clause(resolved):
            return resolved
        return elements.ClauseList(*resolved.c)

989 

990 

class DMLColumnImpl(_ReturnsStringKey, RoleImpl):
    """Role implementation for columns named in DML statements."""

    __slots__ = ()

    def _post_coercion(self, element, *, as_key=False, **kw):
        """Return ``element.key`` when ``as_key`` is set, else ``element``."""
        return element.key if as_key else element

999 

1000 

class ConstExprImpl(RoleImpl):
    """Role implementation accepting only ``None``, ``False`` and ``True``."""

    __slots__ = ()

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Map ``None`` / ``False`` / ``True`` to their SQL constants.

        Returns ``Null()``, ``False_()`` or ``True_()`` respectively; any
        other value is rejected.

        """
        # identity comparison on purpose: 0 and 1 must not be treated as
        # False / True
        if element is None:
            return elements.Null()
        if element is False:
            return elements.False_()
        if element is True:
            return elements.True_()
        self._raise_for_expected(element, argname)

1013 

1014 

class TruncatedLabelImpl(_StringOnly, RoleImpl):
    """Role implementation producing ``_truncated_label`` strings."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``resolved`` when the original input was a string.

        Any other input is rejected.

        """
        if isinstance(element, str):
            return resolved
        self._raise_for_expected(element, argname, resolved)

    def _literal_coercion(self, element, **kw):
        """coerce the given value to :class:`._truncated_label`.

        Existing :class:`._truncated_label` and
        :class:`._anonymous_label` objects are passed
        unchanged.
        """

        if not isinstance(element, elements._truncated_label):
            element = elements._truncated_label(element)
        return element

1042 

1043 

class DDLExpressionImpl(_Deannotate, _CoerceLiterals, RoleImpl):
    """Role implementation for expressions used in DDL.

    Enables coercion of ``None`` / ``True`` / ``False`` constants, turns
    plain strings into ``TextClause`` objects and deannotates the result
    (via ``_Deannotate``).

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        """Wrap a plain string in ``elements.TextClause``."""
        # see #5754 for why we can't easily deprecate this coercion.
        # essentially expressions like postgresql_where would have to be
        # text() as they come back from reflection and we don't want to
        # have text() elements wired into the inspection dictionaries.
        return elements.TextClause(element)

1055 

1056 

class DDLConstraintColumnImpl(_Deannotate, _ReturnsStringKey, RoleImpl):
    """Role implementation combining ``_Deannotate`` and ``_ReturnsStringKey``.

    Adds no behavior of its own.

    """

    __slots__ = ()

1059 

1060 

class DDLReferredColumnImpl(DDLConstraintColumnImpl):
    """Subclass of ``DDLConstraintColumnImpl`` that adds no behavior."""

    __slots__ = ()

1063 

1064 

class LimitOffsetImpl(RoleImpl):
    """Role implementation for LIMIT / OFFSET values."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Pass ``None`` through; reject any other resolved object."""
        if resolved is None:
            return None
        self._raise_for_expected(element, argname, resolved)

    def _literal_coercion(  # type: ignore[override]
        self, element, *, name, type_, **kw
    ):
        """Turn a literal into a unique ``_OffsetLimitParam``.

        ``None`` is returned unchanged; any other value is first converted
        with ``util.asint()``.

        """
        if element is None:
            return None

        return selectable._OffsetLimitParam(
            name, util.asint(element), type_=type_, unique=True
        )

1090 

1091 

class LabeledColumnExprImpl(ExpressionElementImpl):
    """Role implementation that labels unlabeled column expressions."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return the expression wrapped via ``.label(None)``.

        If ``resolved`` is not itself an expression element, the parent
        class's implicit coercions are tried first and their result is
        labeled; a result that is still not an expression element is
        rejected.

        """
        if isinstance(resolved, roles.ExpressionElementRole):
            return resolved.label(None)

        coerced = super()._implicit_coercions(
            element, resolved, argname=argname, **kw
        )
        if isinstance(coerced, roles.ExpressionElementRole):
            return coerced.label(None)

        self._raise_for_expected(element, argname, resolved)

1112 

1113 

class ColumnsClauseImpl(_SelectIsNotFrom, _CoerceLiterals, RoleImpl):
    """Role implementation for entries of a columns clause.

    Enables coercion of constants, numbers and ``"*"``; any other plain
    string is rejected with a hint naming ``text()`` and either
    ``column()`` or ``literal_column()``.

    """

    __slots__ = ()

    _coerce_consts = True
    _coerce_numerics = True
    _coerce_star = True

    # a string that starts with a word character and contains no
    # whitespace is guessed to be a plain column name
    _guess_straight_column = re.compile(r"^\w\S*$", re.I)

    def _raise_for_expected(
        self, element, argname=None, resolved=None, *, advice=None, **kw
    ):
        """Raise the "expected" error, with a hint when a list was passed."""
        if not advice and isinstance(element, list):
            advice = (
                f"Did you mean to say select("
                f"{', '.join(repr(e) for e in element)})?"
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

    def _text_coercion(self, element, argname=None):
        """Reject a plain string, suggesting the explicit constructs.

        :raises ArgumentError: always; the message suggests ``column()``
         when the string matches ``_guess_straight_column`` and
         ``literal_column()`` otherwise.

        """
        element = str(element)

        guess_is_literal = not self._guess_straight_column.match(element)
        raise exc.ArgumentError(
            "Textual column expression %(column)r %(argname)sshould be "
            "explicitly declared with text(%(column)r), "
            "or use %(literal_column)s(%(column)r) "
            "for more specificity"
            % {
                "column": util.ellipses_string(element),
                # the trailing space keeps the argument name from running
                # straight into the following "should be"
                "argname": "for argument %s " % (argname,) if argname else "",
                "literal_column": (
                    "literal_column" if guess_is_literal else "column"
                ),
            }
        )

1153 

1154 

class ReturnsRowsImpl(RoleImpl):
    """Coercion impl that uses the ``RoleImpl`` defaults without changes."""

    __slots__ = ()

1157 

1158 

class StatementImpl(_CoerceLiterals, RoleImpl):
    """Coercion impl for objects used as statements.

    Lambda elements are accepted as they are; everything else goes through
    the parent class coercions.
    """

    __slots__ = ()

    def _post_coercion(
        self, resolved, *, original_element, argname=None, **kw
    ):
        """Warn when a non-executable object was coerced to a statement.

        A deprecation warning is emitted when the original object was
        coerced into a different object, is not a string, and has no
        ``_execute_on_connection`` attribute.

        :param resolved: the coerced object; always returned unchanged.
        :param original_element: the object originally passed in.
        :param argname: unused here; accepted for signature compatibility.
        :return: ``resolved``.
        """
        if resolved is not original_element and not isinstance(
            original_element, str
        ):
            # use same method as Connection uses; this will later raise
            # ObjectNotExecutableError
            try:
                original_element._execute_on_connection
            except AttributeError:
                util.warn_deprecated(
                    "Object %r should not be used directly in a SQL statement "
                    "context, such as passing to methods such as "
                    "session.execute().  This usage will be disallowed in a "
                    "future release.  "
                    "Please use Core select() / update() / delete() etc. "
                    "with Session.execute() and other statement execution "
                    "methods."
                    # wrap in a 1-tuple so that an object which is itself a
                    # tuple is formatted with %r instead of being unpacked
                    # as the format arguments
                    % (original_element,),
                    "1.4",
                )

        return resolved

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return lambda elements as is; defer to the parent otherwise.

        :param element: the object originally passed by the caller.
        :param resolved: the object ``element`` resolved to.
        :param argname: optional argument name used in the error report.
        """
        if resolved._is_lambda_element:
            return resolved
        else:
            return super()._implicit_coercions(
                element, resolved, argname=argname, **kw
            )

1199 

1200 

class SelectStatementImpl(_NoTextCoercion, RoleImpl):
    """Coercion impl that accepts text clauses via their ``.columns()``."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``resolved.columns()`` for a text clause, else reject.

        :param element: the object originally passed by the caller.
        :param resolved: the object ``element`` resolved to.
        :param argname: optional argument name used in the error report.
        """
        if not resolved._is_text_clause:
            self._raise_for_expected(element, argname, resolved)
            return None

        return resolved.columns()

1215 

1216 

class HasCTEImpl(ReturnsRowsImpl):
    """Coercion impl that reuses ``ReturnsRowsImpl`` without changes."""

    __slots__ = ()

1219 

1220 

class IsCTEImpl(RoleImpl):
    """Coercion impl that uses the ``RoleImpl`` defaults without changes."""

    __slots__ = ()

1223 

1224 

class JoinTargetImpl(RoleImpl):
    """Coercion impl for the target of a join.

    Literals are always rejected.  SELECT constructs are accepted only in
    ``legacy`` mode, with a deprecation warning.
    """

    __slots__ = ()

    _skip_clauseelement_for_target_match = True

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Reject any literal value.

        :param element: the literal that was passed.
        :param argname: optional argument name used in the error report.
        """
        self._raise_for_expected(element, argname)

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        legacy: bool = False,
        **kw: Any,
    ) -> Any:
        """Coerce ``element`` into a join target.

        :param element: the object originally passed by the caller.
        :param resolved: the object ``element`` resolved to.
        :param argname: optional argument name used in the error report.
        :param legacy: when true, a resolved SELECT is returned unchanged
         after a deprecation warning is emitted.
        """
        if isinstance(element, roles.JoinTargetRole):
            # note that this codepath no longer occurs as of
            # #6550, unless JoinTargetImpl._skip_clauseelement_for_target_match
            # were set to False.
            return element

        if legacy and resolved._is_select_base:
            util.warn_deprecated(
                "Implicit coercion of SELECT and textual SELECT "
                "constructs into FROM clauses is deprecated; please call "
                ".subquery() on any Core select or ORM Query object in "
                "order to produce a subquery object.",
                version="1.4",
            )
            # TODO: doing _implicit_subquery here causes tests to fail,
            # how was this working before?  probably that ORM
            # join logic treated it as a select and subquery would happen
            # in _ORMJoin->Join
            return resolved

        self._raise_for_expected(element, argname, resolved)

1262 

1263 

class FromClauseImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Coercion impl for objects used as FROM clauses."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        explicit_subquery: bool = False,
        allow_select: bool = True,
        **kw: Any,
    ) -> Any:
        """Coerce SELECT constructs and text clauses into FROM objects.

        :param element: the object originally passed by the caller.
        :param resolved: the object ``element`` resolved to.
        :param argname: optional argument name used in the error report.
        :param explicit_subquery: when true, a resolved SELECT is returned
         as ``resolved.subquery()`` with no warning.
        :param allow_select: when true (and ``explicit_subquery`` is
         false), a resolved SELECT is returned as
         ``resolved._implicit_subquery`` after a deprecation warning.
        :return: the coerced object, or ``None`` when a SELECT is given
         and both ``explicit_subquery`` and ``allow_select`` are false.
        """
        if resolved._is_select_base:
            if explicit_subquery:
                return resolved.subquery()

            if not allow_select:
                # neither flag is set: no coercion is applied and nothing
                # is raised, so the result is None
                return None

            util.warn_deprecated(
                "Implicit coercion of SELECT and textual SELECT "
                "constructs into FROM clauses is deprecated; please call "
                ".subquery() on any Core select or ORM Query object in "
                "order to produce a subquery object.",
                version="1.4",
            )
            return resolved._implicit_subquery

        if resolved._is_text_clause:
            return resolved

        self._raise_for_expected(element, argname, resolved)

    def _post_coercion(self, element, *, deannotate=False, **kw):
        """Optionally strip annotations from the coerced element.

        :param element: the coerced object.
        :param deannotate: when true, return ``element._deannotate()``.
        """
        return element._deannotate() if deannotate else element

1299 

1300 

class StrictFromClauseImpl(FromClauseImpl):
    """FROM clause coercion that rejects SELECT constructs by default."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        allow_select: bool = False,
        **kw: Any,
    ) -> Any:
        """Accept a SELECT only when ``allow_select`` is true.

        :param element: the object originally passed by the caller.
        :param resolved: the object ``element`` resolved to.
        :param argname: optional argument name used in the error report.
        :param allow_select: when true, a resolved SELECT is returned as
         ``resolved._implicit_subquery`` after a deprecation warning;
         every other case is rejected.
        """
        if not (resolved._is_select_base and allow_select):
            self._raise_for_expected(element, argname, resolved)
            return None

        util.warn_deprecated(
            "Implicit coercion of SELECT and textual SELECT constructs "
            "into FROM clauses is deprecated; please call .subquery() "
            "on any Core select or ORM Query object in order to produce a "
            "subquery object.",
            version="1.4",
        )
        return resolved._implicit_subquery

1324 

1325 

class AnonymizedFromClauseImpl(StrictFromClauseImpl):
    """Strict FROM clause coercion that anonymizes its result."""

    __slots__ = ()

    def _post_coercion(self, element, *, flat=False, name=None, **kw):
        """Return ``element._anonymous_fromclause(flat=flat)``.

        :param element: the coerced FROM object.
        :param flat: forwarded to ``_anonymous_fromclause()``.
        :param name: must be ``None``; checked with an assertion.
        """
        assert name is None

        anonymized = element._anonymous_fromclause(flat=flat)
        return anonymized

1333 

1334 

class DMLTableImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Coercion impl for the table argument of DML constructs."""

    __slots__ = ()

    def _post_coercion(self, element, **kw):
        """Swap in the ``"dml_table"`` annotation when one is present.

        :param element: the coerced object.
        :return: ``element._annotations["dml_table"]`` if that key exists,
         otherwise ``element`` itself.
        """
        annotations = element._annotations
        if "dml_table" not in annotations:
            return element
        return annotations["dml_table"]

1343 

1344 

class DMLSelectImpl(_NoTextCoercion, RoleImpl):
    """Coercion impl that turns FROM objects into SELECT constructs."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Produce a SELECT from a resolved FROM clause.

        An ``Alias`` whose inner element is a SELECT is unwrapped to that
        element; any other FROM clause is converted with ``.select()``.
        Objects that are not FROM clauses are rejected.

        :param element: the object originally passed by the caller.
        :param resolved: the object ``element`` resolved to.
        :param argname: optional argument name used in the error report.
        """
        if not resolved._is_from_clause:
            self._raise_for_expected(element, argname, resolved)
            return None

        wraps_select = (
            isinstance(resolved, selectable.Alias)
            and resolved.element._is_select_base
        )
        if wraps_select:
            return resolved.element
        return resolved.select()

1365 

1366 

class CompoundElementImpl(_NoTextCoercion, RoleImpl):
    """Coercion impl whose errors carry advice for FROM clause objects."""

    __slots__ = ()

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Delegate to the parent error reporter with tailored advice.

        Advice is only generated when ``element`` is a ``FromClauseRole``;
        the wording depends on whether it is a subquery.

        :param element: the object originally passed by the caller.
        :param argname: optional argument name used in the error report.
        :param resolved: the object ``element`` resolved to, if any.
        """
        advice = None
        if isinstance(element, roles.FromClauseRole):
            advice = (
                (
                    "Use the plain select() object without "
                    "calling .subquery() or .alias()."
                )
                if element._is_subquery
                else "To SELECT from any FROM clause, use the .select() method."
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

1386 

1387 

# Maps each role class to the single impl instance that coerces for it.
_impl_lookup = {}


# For every attribute of the roles module named "<Something>Role", look for
# a class named "<Something>Impl" among this module's globals and register
# one instance of it, constructed with the role class.
for name in dir(roles):
    cls = getattr(roles, name)
    if not name.endswith("Role"):
        continue

    name = name.replace("Role", "Impl")
    if name in globals():
        impl = globals()[name](cls)
        _impl_lookup[cls] = impl

if not TYPE_CHECKING:
    # Subscripted forms such as ExpressionElementRole[int] are looked up
    # as their own keys, so point them at the impl already registered for
    # the unsubscripted role.
    ee_impl = _impl_lookup[roles.ExpressionElementRole]

    for py_type in (int, bool, str, float):
        _impl_lookup[roles.ExpressionElementRole[py_type]] = ee_impl