Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/coercions.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

576 statements  

1# sql/coercions.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9from __future__ import annotations 

10 

11import collections.abc as collections_abc 

12import numbers 

13import re 

14import typing 

15from typing import Any 

16from typing import Callable 

17from typing import cast 

18from typing import Dict 

19from typing import Iterable 

20from typing import Iterator 

21from typing import List 

22from typing import NoReturn 

23from typing import Optional 

24from typing import overload 

25from typing import Sequence 

26from typing import Tuple 

27from typing import Type 

28from typing import TYPE_CHECKING 

29from typing import TypeVar 

30from typing import Union 

31 

32from . import operators 

33from . import roles 

34from . import visitors 

35from ._typing import is_from_clause 

36from .base import ExecutableOption 

37from .base import Options 

38from .cache_key import HasCacheKey 

39from .visitors import Visitable 

40from .. import exc 

41from .. import inspection 

42from .. import util 

43from ..util.typing import Literal 

44 

45if typing.TYPE_CHECKING: 

46 # elements lambdas schema selectable are set by __init__ 

47 from . import elements 

48 from . import lambdas 

49 from . import schema 

50 from . import selectable 

51 from ._typing import _ColumnExpressionArgument 

52 from ._typing import _ColumnsClauseArgument 

53 from ._typing import _DDLColumnArgument 

54 from ._typing import _DMLTableArgument 

55 from ._typing import _FromClauseArgument 

56 from .dml import _DMLTableElement 

57 from .elements import BindParameter 

58 from .elements import ClauseElement 

59 from .elements import ColumnClause 

60 from .elements import ColumnElement 

61 from .elements import DQLDMLClauseElement 

62 from .elements import NamedColumn 

63 from .elements import SQLCoreOperations 

64 from .schema import Column 

65 from .selectable import _ColumnsClauseElement 

66 from .selectable import _JoinTargetProtocol 

67 from .selectable import FromClause 

68 from .selectable import HasCTE 

69 from .selectable import SelectBase 

70 from .selectable import Subquery 

71 from .visitors import _TraverseCallableType 

72 

# Type variable bound to roles.SQLRole; used to type the ``role`` argument
# of expect().
_SR = TypeVar("_SR", bound=roles.SQLRole)
# Type variable for an arbitrary callable; used for decorator return types
# such as that of _document_text_coercion().
_F = TypeVar("_F", bound=Callable[..., Any])
# Type variable bound to roles.StringRole.
_StringOnlyR = TypeVar("_StringOnlyR", bound=roles.StringRole)
# General-purpose type variable for expression / element types.
_T = TypeVar("_T", bound=Any)

77 

78 

def _is_literal(element):
    """Tell whether ``element`` is a plain "literal" value for SQL purposes.

    A value is considered a literal when it is neither a ``Visitable`` nor
    a ``schema.SchemaEventTarget`` and does not expose a
    ``__clause_element__`` attribute.

    """

    if isinstance(element, (Visitable, schema.SchemaEventTarget)):
        return False
    return not hasattr(element, "__clause_element__")

89 

90 

91def _deep_is_literal(element): 

92 """Return whether or not the element is a "literal" in the context 

93 of a SQL expression construct. 

94 

95 does a deeper more esoteric check than _is_literal. is used 

96 for lambda elements that have to distinguish values that would 

97 be bound vs. not without any context. 

98 

99 """ 

100 

101 if isinstance(element, collections_abc.Sequence) and not isinstance( 

102 element, str 

103 ): 

104 for elem in element: 

105 if not _deep_is_literal(elem): 

106 return False 

107 else: 

108 return True 

109 

110 return ( 

111 not isinstance( 

112 element, 

113 ( 

114 Visitable, 

115 schema.SchemaEventTarget, 

116 HasCacheKey, 

117 Options, 

118 util.langhelpers.symbol, 

119 ), 

120 ) 

121 and not hasattr(element, "__clause_element__") 

122 and ( 

123 not isinstance(element, type) 

124 or not issubclass(element, HasCacheKey) 

125 ) 

126 ) 

127 

128 

def _document_text_coercion(
    paramname: str, meth_rst: str, param_rst: str
) -> Callable[[_F], _F]:
    """Return the result of ``util.add_parameter_text`` for ``paramname``
    with a "trusted SQL text" warning.

    ``param_rst`` and ``meth_rst`` are interpolated, in that order, into
    the warning text.

    """
    warning_template = (
        ".. warning:: "
        "The %s argument to %s can be passed as a Python string argument, "
        "which will be treated "
        "as **trusted SQL text** and rendered as given. **DO NOT PASS "
        "UNTRUSTED INPUT TO THIS PARAMETER**."
    )
    warning_text = warning_template % (param_rst, meth_rst)
    return util.add_parameter_text(paramname, warning_text)

143 

144 

145def _expression_collection_was_a_list( 

146 attrname: str, 

147 fnname: str, 

148 args: Union[Sequence[_T], Sequence[Sequence[_T]]], 

149) -> Sequence[_T]: 

150 if args and isinstance(args[0], (list, set, dict)) and len(args) == 1: 

151 if isinstance(args[0], list): 

152 raise exc.ArgumentError( 

153 f'The "{attrname}" argument to {fnname}(), when ' 

154 "referring to a sequence " 

155 "of items, is now passed as a series of positional " 

156 "elements, rather than as a list. " 

157 ) 

158 return cast("Sequence[_T]", args[0]) 

159 

160 return cast("Sequence[_T]", args) 

161 

162 

# Typing-only overloads for expect().  Each stub maps a role class (and,
# for some, the static type of ``element`` or a keyword flag) to the
# return type a type checker should infer; the ``...`` bodies carry no
# runtime logic.


# TruncatedLabelRole -> str
@overload
def expect(
    role: Type[roles.TruncatedLabelRole],
    element: Any,
    **kw: Any,
) -> str: ...


# DMLColumnRole with the ``as_key`` keyword -> str
@overload
def expect(
    role: Type[roles.DMLColumnRole],
    element: Any,
    *,
    as_key: Literal[True] = ...,
    **kw: Any,
) -> str: ...


# LiteralValueRole -> BindParameter
@overload
def expect(
    role: Type[roles.LiteralValueRole],
    element: Any,
    **kw: Any,
) -> BindParameter[Any]: ...


# DDLReferredColumnRole -> Column
@overload
def expect(
    role: Type[roles.DDLReferredColumnRole],
    element: Any,
    **kw: Any,
) -> Column[Any]: ...


# DDLConstraintColumnRole -> Column or str
@overload
def expect(
    role: Type[roles.DDLConstraintColumnRole],
    element: Any,
    **kw: Any,
) -> Union[Column[Any], str]: ...


# StatementOptionRole -> DQLDMLClauseElement
@overload
def expect(
    role: Type[roles.StatementOptionRole],
    element: Any,
    **kw: Any,
) -> DQLDMLClauseElement: ...


# LabeledColumnExprRole with a typed column argument -> NamedColumn[_T]
@overload
def expect(
    role: Type[roles.LabeledColumnExprRole[Any]],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> NamedColumn[_T]: ...


# column-expression roles with a typed column argument -> ColumnElement[_T]
@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
    ],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> ColumnElement[_T]: ...


# column-expression roles with an untyped argument -> ColumnElement[Any]
@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
        Type[roles.OnClauseRole],
        Type[roles.ColumnArgumentRole],
    ],
    element: Any,
    **kw: Any,
) -> ColumnElement[Any]: ...


# DMLTableRole -> _DMLTableElement
@overload
def expect(
    role: Type[roles.DMLTableRole],
    element: _DMLTableArgument,
    **kw: Any,
) -> _DMLTableElement: ...


# HasCTERole -> HasCTE
@overload
def expect(
    role: Type[roles.HasCTERole],
    element: HasCTE,
    **kw: Any,
) -> HasCTE: ...


# SelectStatementRole -> SelectBase
@overload
def expect(
    role: Type[roles.SelectStatementRole],
    element: SelectBase,
    **kw: Any,
) -> SelectBase: ...


# FromClauseRole -> FromClause
@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: _FromClauseArgument,
    **kw: Any,
) -> FromClause: ...


# FromClauseRole given a SelectBase and ``explicit_subquery`` -> Subquery
@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: SelectBase,
    *,
    explicit_subquery: Literal[True] = ...,
    **kw: Any,
) -> Subquery: ...


# ColumnsClauseRole -> _ColumnsClauseElement
@overload
def expect(
    role: Type[roles.ColumnsClauseRole],
    element: _ColumnsClauseArgument[Any],
    **kw: Any,
) -> _ColumnsClauseElement: ...


# JoinTargetRole -> _JoinTargetProtocol
@overload
def expect(
    role: Type[roles.JoinTargetRole],
    element: _JoinTargetProtocol,
    **kw: Any,
) -> _JoinTargetProtocol: ...


# catchall for not-yet-implemented overloads
@overload
def expect(
    role: Type[_SR],
    element: Any,
    **kw: Any,
) -> Any: ...

312 

313 

def expect(
    role: Type[_SR],
    element: Any,
    *,
    apply_propagate_attrs: Optional[ClauseElement] = None,
    argname: Optional[str] = None,
    post_inspect: bool = False,
    disable_inspection: bool = False,
    **kw: Any,
) -> Any:
    """Coerce ``element`` into an object suitable for ``role``.

    :param role: role class; used as the key into ``_impl_lookup`` to find
     the implementation object that performs the coercion.
    :param element: the object to coerce.
    :param apply_propagate_attrs: optional object which receives the
     resolved element's ``_propagate_attrs`` if its own are empty.
    :param argname: optional argument name passed to the implementation
     for use in error messages.
    :param post_inspect: when True, ``_post_inspect`` is accessed on an
     inspected object before ``__clause_element__()`` is called on it.
    :param disable_inspection: when True, ``inspection.inspect()`` is not
     attempted.
    :param kw: additional options forwarded to the implementation's
     coercion hooks (or to ``lambdas.LambdaOptions`` for lambdas).
    :return: the resolved object; a ``lambdas.LambdaElement`` when the
     role allows lambdas and ``element`` is a callable with ``__code__``.

    """
    if (
        role.allows_lambda
        # note callable() will not invoke a __getattr__() method, whereas
        # hasattr(obj, "__call__") will. by keeping the callable() check here
        # we prevent most needless calls to hasattr() and therefore
        # __getattr__(), which is present on ColumnElement.
        and callable(element)
        and hasattr(element, "__code__")
    ):
        return lambdas.LambdaElement(
            element,
            role,
            lambdas.LambdaOptions(**kw),
            apply_propagate_attrs=apply_propagate_attrs,
        )

    # major case is that we are given a ClauseElement already, skip more
    # elaborate logic up front if possible
    impl = _impl_lookup[role]

    # kept so that error messages and post-coercion hooks can refer to
    # what the caller actually passed
    original_element = element

    if not isinstance(
        element,
        (
            elements.CompilerElement,
            schema.SchemaItem,
            schema.FetchedValue,
            lambdas.PyWrapper,
        ),
    ):
        resolved = None

        if impl._resolve_literal_only:
            # the implementation only deals in literals; skip all
            # __clause_element__() / inspection resolution
            resolved = impl._literal_coercion(element, **kw)
        else:
            original_element = element

            is_clause_element = False

            # this is a special performance optimization for ORM
            # joins used by JoinTargetImpl that we don't go through the
            # work of creating __clause_element__() when we only need the
            # original QueryableAttribute, as the former will do clause
            # adaption and all that which is just thrown away here.
            if (
                impl._skip_clauseelement_for_target_match
                and isinstance(element, role)
                and hasattr(element, "__clause_element__")
            ):
                is_clause_element = True
            else:
                # unwrap __clause_element__() repeatedly until an object
                # flagged is_clause_element is reached or the attribute
                # is no longer present
                while hasattr(element, "__clause_element__"):
                    is_clause_element = True

                    if not getattr(element, "is_clause_element", False):
                        element = element.__clause_element__()
                    else:
                        break

            if not is_clause_element:
                if impl._use_inspection and not disable_inspection:
                    insp = inspection.inspect(element, raiseerr=False)
                    if insp is not None:
                        if post_inspect:
                            # bare attribute access, evaluated only for
                            # whatever side effect the attribute has
                            # NOTE(review): presumably triggers deferred
                            # setup on the inspected object -- confirm
                            insp._post_inspect
                        try:
                            resolved = insp.__clause_element__()
                        except AttributeError:
                            impl._raise_for_expected(original_element, argname)

                if resolved is None:
                    # nothing resolved so far; fall back to treating the
                    # value as a literal
                    resolved = impl._literal_coercion(
                        element, argname=argname, **kw
                    )
            else:
                resolved = element
    elif isinstance(element, lambdas.PyWrapper):
        resolved = element._sa__py_wrapper_literal(**kw)
    else:
        resolved = element

    if apply_propagate_attrs is not None:
        if typing.TYPE_CHECKING:
            assert isinstance(resolved, (SQLCoreOperations, ClauseElement))

        # copy propagate attrs only when the target has none of its own
        # and the resolved element has some
        if not apply_propagate_attrs._propagate_attrs and getattr(
            resolved, "_propagate_attrs", None
        ):
            apply_propagate_attrs._propagate_attrs = resolved._propagate_attrs

    if impl._role_class in resolved.__class__.__mro__:
        # resolved object already belongs to the role; apply the
        # optional post-coercion hook only
        if impl._post_coercion:
            resolved = impl._post_coercion(
                resolved,
                argname=argname,
                original_element=original_element,
                **kw,
            )
        return resolved
    else:
        # wrong role: let the implementation coerce implicitly or raise
        return impl._implicit_coercions(
            original_element, resolved, argname=argname, **kw
        )

428 

429 

def expect_as_key(
    role: Type[roles.DMLColumnRole], element: Any, **kw: Any
) -> str:
    """Call :func:`expect` with ``as_key=True``.

    Any ``as_key`` value supplied by the caller is discarded first, so the
    flag is always True.

    """
    forwarded: Dict[str, Any] = {"as_key": True}
    forwarded.update(
        (key, value) for key, value in kw.items() if key != "as_key"
    )
    return expect(role, element, **forwarded)

435 

436 

def expect_col_expression_collection(
    role: Type[roles.DDLConstraintColumnRole],
    expressions: Iterable[_DDLColumnArgument],
) -> Iterator[
    Tuple[
        Union[str, Column[Any]],
        Optional[ColumnClause[Any]],
        Optional[str],
        Optional[Union[Column[Any], str]],
    ]
]:
    """Coerce each of ``expressions`` and yield a 4-tuple for it.

    The tuple is ``(resolved, column, strname, add_element)``:

    * for an expression that coerces to a string, ``resolved``,
      ``strname`` and ``add_element`` are all the original string and
      ``column`` is None;
    * otherwise ``column`` is the first object delivered to the
      ``"column"`` visitor while traversing the resolved expression
      (None if there is none), ``strname`` is None and ``add_element``
      equals ``column``.

    """
    for expr in expressions:
        resolved: Union[Column[Any], str] = expect(role, expr)

        if isinstance(resolved, str):
            assert isinstance(expr, str)
            yield expr, None, expr, expr
            continue

        found: List[Column[Any]] = []
        collect: _TraverseCallableType[Column[Any]] = found.append
        visitors.traverse(resolved, {}, {"column": collect})
        first_column = found[0] if found else None

        yield resolved, first_column, None, first_column

465 

466 

class RoleImpl:
    """Base coercion implementation bound to a single role class.

    Subclasses override ``_literal_coercion``, ``_implicit_coercions``
    and optionally ``_post_coercion`` to control how ``expect()``
    resolves an incoming element.

    """

    __slots__ = ("_role_class", "name", "_use_inspection")

    # optional hook; expect() only calls it when it is truthy
    _post_coercion: Any = None
    # when True, expect() sends non-SQL elements straight to
    # _literal_coercion()
    _resolve_literal_only = False
    # when True, expect() may skip calling __clause_element__() on an
    # element that is already an instance of the role
    _skip_clauseelement_for_target_match = False

    def __init__(self, role_class):
        self._role_class = role_class
        self.name = role_class._role_name
        self._use_inspection = issubclass(role_class, roles.UsesInspection)

    def _literal_coercion(self, element, **kw):
        """Coerce a literal value; not implemented on the base class."""
        raise NotImplementedError()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Handle a resolved object of the wrong role; the base class
        always raises."""
        self._raise_for_expected(element, argname, resolved)

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        *,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise ``exc.ArgumentError`` describing what was expected.

        The message names this role, the argument (when ``argname`` is
        given), the offending element and, when it differs, the object
        it resolved to; ``advice`` is appended when present.  The error
        is chained from ``err``.

        """
        if resolved is None or resolved is element:
            got = repr(element)
        else:
            got = "%r object resolved from %r object" % (resolved, element)

        if not argname:
            msg = "%s expected, got %s." % (self.name, got)
        else:
            msg = "%s expected for argument %r; got %s." % (
                self.name,
                argname,
                got,
            )

        pieces = [msg]
        if advice:
            pieces.append(advice)

        raise exc.ArgumentError(" ".join(pieces), code=code) from err

520 

521 

522class _Deannotate: 

523 __slots__ = () 

524 

525 def _post_coercion(self, resolved, **kw): 

526 from .util import _deep_deannotate 

527 

528 return _deep_deannotate(resolved) 

529 

530 

class _StringOnly:
    """Mixin that switches on ``_resolve_literal_only``."""

    __slots__ = ()

    # overrides the RoleImpl default of False for classes that mix this in
    _resolve_literal_only = True

535 

536 

class _ReturnsStringKey(RoleImpl):
    """Role implementation that accepts plain strings as-is."""

    __slots__ = ()

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        """Return ``element`` if it is a string, otherwise raise."""
        if not isinstance(element, str):
            self._raise_for_expected(element, argname, resolved)
        return element

    def _literal_coercion(self, element, **kw):
        """Literals pass through unchanged."""
        return element

548 

549 

class _ColumnCoercions(RoleImpl):
    """Role implementation that coerces SELECT-like objects into scalar
    subqueries, emitting a warning."""

    __slots__ = ()

    def _warn_for_scalar_subquery_coercion(self):
        """Warn that a SELECT is being implicitly turned into a scalar
        subquery."""
        util.warn(
            "implicitly coercing SELECT object to scalar subquery; "
            "please use the .scalar_subquery() method to produce a scalar "
            "subquery.",
        )

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        """Coerce ``resolved`` to a column expression or raise.

        * a select base becomes ``resolved.scalar_subquery()`` (warning);
        * a ``selectable.Subquery`` becomes
          ``resolved.element.scalar_subquery()`` (warning);
        * a lambda element is returned as-is when the role allows lambdas;
        * anything else raises via ``_raise_for_expected``.

        """
        if not getattr(resolved, "is_clause_element", False):
            self._raise_for_expected(element, argname, resolved)

        if resolved._is_select_base:
            self._warn_for_scalar_subquery_coercion()
            return resolved.scalar_subquery()

        if resolved._is_from_clause and isinstance(
            resolved, selectable.Subquery
        ):
            self._warn_for_scalar_subquery_coercion()
            return resolved.element.scalar_subquery()

        if self._role_class.allows_lambda and resolved._is_lambda_element:
            return resolved

        self._raise_for_expected(element, argname, resolved)

576 

577 

def _no_text_coercion(
    element: Any,
    argname: Optional[str] = None,
    exc_cls: Type[exc.SQLAlchemyError] = exc.ArgumentError,
    extra: Optional[str] = None,
    err: Optional[Exception] = None,
) -> NoReturn:
    """Raise an error stating that a plain string must be wrapped in
    ``text()``.

    :param element: the offending textual expression; shortened with
     ``util.ellipses_string`` for the message.
    :param argname: optional name of the argument that received the
     string; rendered as ``for argument <argname>``.
    :param exc_cls: exception class to raise.
    :param extra: optional text placed at the start of the message.
    :param err: optional exception used as the ``raise ... from`` cause.

    """
    raise exc_cls(
        "%(extra)sTextual SQL expression %(expr)r %(argname)sshould be "
        "explicitly declared as text(%(expr)r)"
        % {
            "expr": util.ellipses_string(element),
            # the trailing space matters: the template places this
            # fragment directly before "should be"
            "argname": "for argument %s " % (argname,) if argname else "",
            "extra": "%s " % extra if extra else "",
        }
    ) from err

594 

595 

class _NoTextCoercion(RoleImpl):
    """Role implementation that refuses to coerce literals."""

    __slots__ = ()

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Always raise.

        A string passed for a role that ``elements.TextClause`` is a
        subclass of gets the "should be explicitly declared as text()"
        error; everything else gets the generic "expected" error.

        """
        text_would_fit = isinstance(element, str) and issubclass(
            elements.TextClause, self._role_class
        )
        if text_would_fit:
            _no_text_coercion(element, argname)
        self._raise_for_expected(element, argname)

606 

607 

class _CoerceLiterals(RoleImpl):
    """Role implementation that coerces selected Python literals.

    Class-level flags choose which literals are accepted.

    """

    __slots__ = ()
    # coerce None / False / True to Null() / False_() / True_()
    _coerce_consts = False
    # coerce the string "*" to a literal column
    _coerce_star = False
    # coerce numbers to a literal column holding str(number)
    _coerce_numerics = False

    def _text_coercion(self, element, argname=None):
        """Handle a plain string; by default it is rejected."""
        return _no_text_coercion(element, argname)

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Coerce ``element`` according to the class flags or raise."""
        if isinstance(element, str):
            star = self._coerce_star and element == "*"
            if not star:
                return self._text_coercion(element, argname, **kw)
            return elements.ColumnClause("*", is_literal=True)

        if self._coerce_consts:
            if element is None:
                return elements.Null()
            if element is False:
                return elements.False_()
            if element is True:
                return elements.True_()

        if self._coerce_numerics and isinstance(element, numbers.Number):
            return elements.ColumnClause(str(element), is_literal=True)

        self._raise_for_expected(element, argname)

636 

637 

class LiteralValueImpl(RoleImpl):
    """Role implementation that wraps a literal in a ``BindParameter``."""

    _resolve_literal_only = True

    def _implicit_coercions(
        self,
        element,
        resolved,
        argname=None,
        *,
        type_=None,
        literal_execute=False,
        **kw,
    ):
        """Return a unique, anonymous ``BindParameter`` for ``element``.

        Raises via ``_raise_for_expected`` if ``resolved`` is not a
        literal according to ``_is_literal``.

        """
        if not _is_literal(resolved):
            self._raise_for_expected(
                element, resolved=resolved, argname=argname, **kw
            )

        bind = elements.BindParameter(
            None,
            element,
            type_=type_,
            unique=True,
            literal_execute=literal_execute,
        )
        return bind

    def _literal_coercion(self, element, **kw):
        """Literals pass through unchanged."""
        return element

666 

667 

class _SelectIsNotFrom(RoleImpl):
    """Role implementation that adds ".subquery()" advice when a SELECT
    statement is given where it is not accepted."""

    __slots__ = ()

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        *,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise the "expected" error, suggesting ``.subquery()`` for
        SELECT statements.

        When the caller supplied no ``advice`` and either ``element`` or
        ``resolved`` is a ``roles.SelectStatementRole``, the advice text
        and error code ``"89ve"`` are filled in.  Advice and code given
        by the caller are otherwise passed through untouched.

        """
        # parenthesized so that caller-provided advice is never
        # overwritten, regardless of which of the two objects is the
        # SELECT
        involves_select = isinstance(
            element, roles.SelectStatementRole
        ) or isinstance(resolved, roles.SelectStatementRole)

        if not advice and involves_select:
            advice = (
                "To create a "
                "FROM clause from a %s object, use the .subquery() method."
                % (resolved.__class__ if resolved is not None else element,)
            )
            code = "89ve"

        super()._raise_for_expected(
            element,
            argname=argname,
            resolved=resolved,
            advice=advice,
            code=code,
            err=err,
            **kw,
        )
        # never reached
        assert False

707 

708 

class HasCacheKeyImpl(RoleImpl):
    """Role implementation that accepts ``HasCacheKey`` objects only."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``element`` if it is a ``HasCacheKey``, else raise."""
        if not isinstance(element, HasCacheKey):
            self._raise_for_expected(element, argname, resolved)
        return element

    def _literal_coercion(self, element, **kw):
        """Literals pass through unchanged."""
        return element

726 

727 

class ExecutableOptionImpl(RoleImpl):
    """Role implementation that accepts ``ExecutableOption`` objects
    only."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``element`` if it is an ``ExecutableOption``, else
        raise."""
        if not isinstance(element, ExecutableOption):
            self._raise_for_expected(element, argname, resolved)
        return element

    def _literal_coercion(self, element, **kw):
        """Literals pass through unchanged."""
        return element

745 

746 

class ExpressionElementImpl(_ColumnCoercions, RoleImpl):
    """Role implementation that turns literals into bound parameters."""

    __slots__ = ()

    def _literal_coercion(
        self, element, *, name=None, type_=None, is_crud=False, **kw
    ):
        """Return ``Null()`` or a unique ``BindParameter`` for ``element``.

        ``None`` becomes ``Null()`` unless ``is_crud`` is set or
        ``type_.should_evaluate_none`` is true.  An ``ArgumentError``
        raised while building the parameter is re-raised as the role's
        "expected" error.

        """
        if element is None and not is_crud:
            # TODO: there's no test coverage now for the
            # "should_evaluate_none" part of this, as outside of "crud" this
            # codepath is not normally used except in some special cases
            if type_ is None or not type_.should_evaluate_none:
                return elements.Null()

        try:
            return elements.BindParameter(
                name, element, type_, unique=True, _is_crud=is_crud
            )
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Raise the "expected" error with advice for VALUES and FROM
        objects."""
        # select uses implicit coercion with warning instead of raising
        advice = None
        if isinstance(element, selectable.Values):
            advice = (
                "To create a column expression from a VALUES clause, "
                "use the .scalar_values() method."
            )
        elif isinstance(element, roles.AnonymizedFromClauseRole):
            advice = (
                "To create a column expression from a FROM clause row "
                "as a whole, use the .table_valued() method."
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

788 

789 

class BinaryElementImpl(ExpressionElementImpl, RoleImpl):
    """Role implementation for the other side of a binary expression."""

    __slots__ = ()

    def _literal_coercion(  # type: ignore[override]
        self,
        element,
        *,
        expr,
        operator,
        bindparam_type=None,
        argname=None,
        **kw,
    ):
        """Ask ``expr`` to build the bound parameter for ``element``.

        An ``ArgumentError`` from ``expr._bind_param`` is re-raised as
        the role's "expected" error.

        """
        try:
            bound = expr._bind_param(operator, element, type_=bindparam_type)
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)
        else:
            return bound

    def _post_coercion(self, resolved, *, expr, bindparam_type=None, **kw):
        """Give a null-typed ``resolved`` the type of ``bindparam_type``,
        or of ``expr`` when that is None and ``expr`` is typed."""
        needs_type = resolved.type._isnull and not expr.type._isnull
        if not needs_type:
            return resolved

        new_type = bindparam_type if bindparam_type is not None else expr.type
        return resolved._with_binary_element_type(new_type)

814 

815 

class InElementImpl(RoleImpl):
    """Role implementation for the right-hand side of an IN expression."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Coerce a FROM clause into a select for IN, with a warning.

        An ``Alias`` of a select base is unwrapped to that select; any
        other FROM clause has ``.select()`` called on it.  Non-FROM
        objects raise.

        """
        if not resolved._is_from_clause:
            self._raise_for_expected(element, argname, resolved)

        unwrap_alias = (
            isinstance(resolved, selectable.Alias)
            and resolved.element._is_select_base
        )
        self._warn_for_implicit_coercion(resolved)
        target = resolved.element if unwrap_alias else resolved.select()
        return self._post_coercion(target, **kw)

    def _warn_for_implicit_coercion(self, elem):
        """Warn that ``elem`` is being coerced into a select()."""
        util.warn(
            "Coercing %s object into a select() for use in IN(); "
            "please pass a select() construct explicitly"
            % (elem.__class__.__name__)
        )

    def _literal_coercion(  # type: ignore[override]
        self, element, *, expr, operator, **kw
    ):
        """Coerce a non-string iterable of values for IN.

        If every member is a literal other than None, a single expanding
        bound parameter is produced.  Otherwise a ``ClauseList`` is
        built in which column expressions are kept, None becomes
        ``Null()`` and remaining literals become individual bound
        parameters.  A non-iterable, or a member that is neither a
        literal nor a ``ColumnOperators``, raises.

        """
        if not util.is_non_string_iterable(element):
            self._raise_for_expected(element, **kw)

        element = list(element)
        replacements: Dict[
            Optional[operators.ColumnOperators],
            operators.ColumnOperators,
        ] = {}
        for item in element:
            if _is_literal(item):
                if item is None:
                    replacements[item] = elements.Null()
            elif isinstance(item, operators.ColumnOperators):
                replacements[item] = item
            else:
                self._raise_for_expected(element, **kw)

        if not replacements:
            return expr._bind_param(operator, element, expanding=True)

        clauses = []
        for item in element:
            if item in replacements:
                clauses.append(replacements[item])
            else:
                clauses.append(expr._bind_param(operator, item))
        return elements.ClauseList(*clauses)

    def _post_coercion(self, element, *, expr, operator, **kw):
        """Finalize ``element`` for use inside IN."""
        if element._is_select_base:
            # for IN, we are doing scalar_subquery() coercion without
            # a warning
            return element.scalar_subquery()

        if isinstance(element, elements.ClauseList):
            assert len(element.clauses) != 0
            return element.self_group(against=operator)

        if isinstance(element, elements.BindParameter):
            # work on a clone so the caller's parameter is not mutated
            expanded = element._clone(maintain_key=True)
            expanded.expanding = True
            expanded.expand_op = operator
            return expanded

        if isinstance(element, selectable.Values):
            return element.scalar_values()

        return element

901 

902 

class OnClauseImpl(_ColumnCoercions, RoleImpl):
    """Role implementation for ON clauses; literals are rejected."""

    __slots__ = ()

    _coerce_consts = True

    def _literal_coercion(self, element, **kw):
        """Always raise; no literal is accepted here."""
        self._raise_for_expected(element)

    def _post_coercion(self, resolved, *, original_element=None, **kw):
        """Return the original object when it is a ``JoinTargetRole``,
        otherwise ``resolved``."""
        # this is a hack right now as we want to use coercion on an
        # ORM InstrumentedAttribute, but we want to return the object
        # itself if it is one, not its clause element.
        # ORM context _join and _legacy_join() would need to be improved
        # to look for annotations in a clause element form.
        keep_original = isinstance(original_element, roles.JoinTargetRole)
        return original_element if keep_original else resolved

920 

921 

class WhereHavingImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl):
    """Role implementation combining literal-constant coercion with
    column coercions; plain strings are rejected."""

    __slots__ = ()

    # have _CoerceLiterals turn None / False / True into SQL constants
    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # plain strings are not accepted; this raises
        return _no_text_coercion(element, argname)

929 

930 

class StatementOptionImpl(_CoerceLiterals, RoleImpl):
    """Role implementation in which plain strings become
    ``TextClause`` objects."""

    __slots__ = ()

    # have _CoerceLiterals turn None / False / True into SQL constants
    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # strings are accepted here and wrapped as text
        return elements.TextClause(element)

938 

939 

class ColumnArgumentImpl(_NoTextCoercion, RoleImpl):
    """Role implementation built solely from ``_NoTextCoercion``; it adds
    no behavior of its own."""

    __slots__ = ()

942 

943 

class ColumnArgumentOrKeyImpl(_ReturnsStringKey, RoleImpl):
    """Role implementation built solely from ``_ReturnsStringKey``; it
    adds no behavior of its own."""

    __slots__ = ()

946 

947 

class StrAsPlainColumnImpl(_CoerceLiterals, RoleImpl):
    """Role implementation in which plain strings become
    ``ColumnClause`` objects."""

    __slots__ = ()

    def _text_coercion(self, element, argname=None):
        # the string is used as the column name
        return elements.ColumnClause(element)

953 

954 

class ByOfImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl, roles.ByOfRole):
    """Role implementation in which plain strings become textual label
    references."""

    __slots__ = ()

    # have _CoerceLiterals turn None / False / True into SQL constants
    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # strings are treated as references to a label
        return elements._textual_label_reference(element)

962 

963 

class OrderByImpl(ByOfImpl, RoleImpl):
    """Role implementation for ORDER BY elements."""

    __slots__ = ()

    def _post_coercion(self, resolved, **kw):
        """Wrap ``resolved`` in a label reference when it belongs to the
        role and has an ``_order_by_label_element``."""
        refers_to_label = (
            isinstance(resolved, self._role_class)
            and resolved._order_by_label_element is not None
        )
        if not refers_to_label:
            return resolved
        return elements._label_reference(resolved)

975 

976 

class GroupByImpl(ByOfImpl, RoleImpl):
    """Role implementation for GROUP BY elements."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Expand a FROM clause into a ``ClauseList`` of its columns;
        anything else is returned unchanged."""
        if not is_from_clause(resolved):
            return resolved
        return elements.ClauseList(*resolved.c)

991 

992 

class DMLColumnImpl(_ReturnsStringKey, RoleImpl):
    """Role implementation for DML columns."""

    __slots__ = ()

    def _post_coercion(self, element, *, as_key=False, **kw):
        """Return ``element.key`` when ``as_key`` is set, else
        ``element``."""
        return element.key if as_key else element

1001 

1002 

class ConstExprImpl(RoleImpl):
    """Role implementation accepting only None, False and True."""

    __slots__ = ()

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Map None / False / True to ``Null()`` / ``False_()`` /
        ``True_()``; raise for anything else."""
        # identity checks on purpose: 0 and 1 must not match
        if element is None:
            return elements.Null()
        if element is False:
            return elements.False_()
        if element is True:
            return elements.True_()
        self._raise_for_expected(element, argname)

1015 

1016 

class TruncatedLabelImpl(_StringOnly, RoleImpl):
    """Role implementation producing ``_truncated_label`` strings."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``resolved`` when the original element was a string,
        otherwise raise."""
        if not isinstance(element, str):
            self._raise_for_expected(element, argname, resolved)
        return resolved

    def _literal_coercion(self, element, **kw):
        """Coerce the given value to :class:`._truncated_label`.

        Objects that already are :class:`._truncated_label` instances
        are returned unchanged.
        """

        already_label = isinstance(element, elements._truncated_label)
        return element if already_label else elements._truncated_label(element)

1044 

1045 

class DDLExpressionImpl(_Deannotate, _CoerceLiterals, RoleImpl):
    """Role implementation in which plain strings become ``TextClause``
    objects; also mixes in ``_Deannotate``."""

    __slots__ = ()

    # have _CoerceLiterals turn None / False / True into SQL constants
    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # see #5754 for why we can't easily deprecate this coercion.
        # essentially expressions like postgresql_where would have to be
        # text() as they come back from reflection and we don't want to
        # have text() elements wired into the inspection dictionaries.
        return elements.TextClause(element)

1057 

1058 

class DDLConstraintColumnImpl(_Deannotate, _ReturnsStringKey, RoleImpl):
    """Role implementation combining ``_Deannotate`` with
    ``_ReturnsStringKey``; it adds no behavior of its own."""

    __slots__ = ()

1061 

1062 

class DDLReferredColumnImpl(DDLConstraintColumnImpl):
    """Subclass of ``DDLConstraintColumnImpl`` that adds no behavior of
    its own."""

    __slots__ = ()

1065 

1066 

class LimitOffsetImpl(RoleImpl):
    """Role implementation for LIMIT / OFFSET values."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Let ``None`` through as ``None``; raise for anything else."""
        if resolved is not None:
            self._raise_for_expected(element, argname, resolved)
        return None

    def _literal_coercion(  # type: ignore[override]
        self, element, *, name, type_, **kw
    ):
        """Return ``None`` for ``None``; otherwise pass ``element``
        through ``util.asint`` and wrap the result in a unique
        ``_OffsetLimitParam``."""
        if element is None:
            return None

        as_integer = util.asint(element)
        return selectable._OffsetLimitParam(
            name, as_integer, type_=type_, unique=True
        )

1092 

1093 

class LabeledColumnExprImpl(ExpressionElementImpl):
    """Role implementation that labels a column expression
    anonymously."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``<expression>.label(None)``.

        ``resolved`` is used directly when it is an
        ``ExpressionElementRole``; otherwise the parent class coerces it
        first.  If the outcome is still not an expression, raise.

        """
        candidate = resolved
        if not isinstance(candidate, roles.ExpressionElementRole):
            candidate = super()._implicit_coercions(
                element, resolved, argname=argname, **kw
            )
            if not isinstance(candidate, roles.ExpressionElementRole):
                self._raise_for_expected(element, argname, resolved)

        return candidate.label(None)

1114 

1115 

class ColumnsClauseImpl(_SelectIsNotFrom, _CoerceLiterals, RoleImpl):
    """Role implementation for the columns clause of a SELECT.

    Constants, numbers and ``"*"`` are coerced; any other plain string
    is rejected with a message suggesting ``text()``, ``column()`` or
    ``literal_column()``.

    """

    __slots__ = ()

    _coerce_consts = True
    _coerce_numerics = True
    _coerce_star = True

    # matches strings that look like a bare column name; used only to
    # choose between suggesting column() and literal_column()
    _guess_straight_column = re.compile(r"^\w\S*$", re.I)

    def _raise_for_expected(
        self, element, argname=None, resolved=None, *, advice=None, **kw
    ):
        """Raise the "expected" error; for a list, suggest passing its
        members positionally unless the caller already gave advice."""
        if not advice and isinstance(element, list):
            advice = (
                f"Did you mean to say select("
                f"{', '.join(repr(e) for e in element)})?"
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

    def _text_coercion(self, element, argname=None):
        """Always raise ``exc.ArgumentError`` for a plain string column
        expression."""
        element = str(element)

        guess_is_literal = not self._guess_straight_column.match(element)
        raise exc.ArgumentError(
            "Textual column expression %(column)r %(argname)sshould be "
            "explicitly declared with text(%(column)r), "
            "or use %(literal_column)s(%(column)r) "
            "for more specificity"
            % {
                "column": util.ellipses_string(element),
                # the trailing space matters: the template places this
                # fragment directly before "should be"
                "argname": "for argument %s " % (argname,) if argname else "",
                "literal_column": (
                    "literal_column" if guess_is_literal else "column"
                ),
            }
        )

1155 

1156 

class ReturnsRowsImpl(RoleImpl):
    """Role implementation that adds nothing on top of ``RoleImpl``."""

    __slots__ = ()

1159 

1160 

class StatementImpl(_CoerceLiterals, RoleImpl):
    """Coercion implementation for objects used as executable statements."""

    __slots__ = ()

    def _post_coercion(
        self, resolved, *, original_element, argname=None, **kw
    ):
        """Warn when a non-executable object was coerced into a statement.

        The warning is emitted only when coercion produced a different
        object than the one passed in, the original was not a string, and
        the original has no ``_execute_on_connection`` attribute.

        :param resolved: the coerced statement; returned unchanged.
        :param original_element: the object the caller originally passed.
        :return: ``resolved``.
        """
        if resolved is not original_element and not isinstance(
            original_element, str
        ):
            # use same method as Connection uses; this will later raise
            # ObjectNotExecutableError
            try:
                original_element._execute_on_connection
            except AttributeError:
                util.warn_deprecated(
                    "Object %r should not be used directly in a SQL statement "
                    "context, such as passing to methods such as "
                    "session.execute().  This usage will be disallowed in a "
                    "future release.  "
                    "Please use Core select() / update() / delete() etc. "
                    "with Session.execute() and other statement execution "
                    # Wrapped in a 1-tuple so that a tuple-valued
                    # original_element is formatted as a single %r argument
                    # instead of being unpacked (which raises TypeError).
                    "methods." % (original_element,),
                    "1.4",
                )

        return resolved

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Accept lambda elements directly; defer everything else upward."""
        if resolved._is_lambda_element:
            return resolved
        else:
            return super()._implicit_coercions(
                element, resolved, argname=argname, **kw
            )

1201 

1202 

class SelectStatementImpl(_NoTextCoercion, RoleImpl):
    """Coercion implementation for the select-statement role."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Convert a text clause via ``.columns()``; reject anything else."""
        if resolved._is_text_clause:
            return resolved.columns()

        self._raise_for_expected(element, argname, resolved)

1217 

1218 

class HasCTEImpl(ReturnsRowsImpl):
    """Role implementation that inherits ``ReturnsRowsImpl`` unchanged."""

    __slots__ = ()

1221 

1222 

class IsCTEImpl(RoleImpl):
    """Role implementation that adds nothing on top of ``RoleImpl``."""

    __slots__ = ()

1225 

1226 

class JoinTargetImpl(RoleImpl):
    """Coercion implementation for the target of a join."""

    __slots__ = ()

    _skip_clauseelement_for_target_match = True

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Plain literals are never valid here; report them as errors."""
        self._raise_for_expected(element, argname)

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        legacy: bool = False,
        **kw: Any,
    ) -> Any:
        """Resolve a join target.

        :param legacy: when true, a SELECT-like ``resolved`` is returned
         as-is after a deprecation warning is emitted.
        :return: ``element`` if it already is a ``JoinTargetRole``,
         otherwise ``resolved`` in the legacy SELECT case.
        """
        if isinstance(element, roles.JoinTargetRole):
            # note that this codepath no longer occurs as of
            # #6550, unless JoinTargetImpl._skip_clauseelement_for_target_match
            # were set to False.
            return element

        if legacy and resolved._is_select_base:
            deprecation_text = (
                "Implicit coercion of SELECT and textual SELECT "
                "constructs into FROM clauses is deprecated; please call "
                ".subquery() on any Core select or ORM Query object in "
                "order to produce a subquery object."
            )
            util.warn_deprecated(deprecation_text, version="1.4")
            # TODO: doing _implicit_subquery here causes tests to fail,
            # how was this working before?  probably that ORM
            # join logic treated it as a select and subquery would happen
            # in _ORMJoin->Join
            return resolved

        self._raise_for_expected(element, argname, resolved)

1264 

1265 

class FromClauseImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Coercion implementation for objects used as a FROM clause."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        explicit_subquery: bool = False,
        allow_select: bool = True,
        **kw: Any,
    ) -> Any:
        """Coerce SELECT-like objects and text clauses into FROM objects.

        :param explicit_subquery: when true, a SELECT-like object is
         converted with ``.subquery()`` and no warning is emitted.
        :param allow_select: when true (and ``explicit_subquery`` is
         false), a SELECT-like object is converted through
         ``_implicit_subquery`` after a deprecation warning.
        :return: the subquery for a SELECT-like object, or ``resolved``
         itself when it is a text clause.

        Every other case is reported through ``_raise_for_expected``.
        That includes a SELECT-like object when both flags are false,
        which previously fell through and silently produced ``None``.
        """
        if resolved._is_select_base:
            if explicit_subquery:
                return resolved.subquery()
            if allow_select:
                util.warn_deprecated(
                    "Implicit coercion of SELECT and textual SELECT "
                    "constructs into FROM clauses is deprecated; please call "
                    ".subquery() on any Core select or ORM Query object in "
                    "order to produce a subquery object.",
                    version="1.4",
                )
                return resolved._implicit_subquery
        elif resolved._is_text_clause:
            return resolved

        self._raise_for_expected(element, argname, resolved)

    def _post_coercion(self, element, *, deannotate=False, **kw):
        """Return ``element``, deannotated first if ``deannotate`` is set."""
        if deannotate:
            return element._deannotate()
        else:
            return element

1301 

1302 

class StrictFromClauseImpl(FromClauseImpl):
    """FROM-clause coercion that rejects SELECT objects by default."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        allow_select: bool = False,
        **kw: Any,
    ) -> Any:
        """Coerce a SELECT-like object only when ``allow_select`` is set.

        In that case a deprecation warning is emitted and the object's
        ``_implicit_subquery`` is returned.  Every other input is reported
        through ``_raise_for_expected``.
        """
        select_permitted = resolved._is_select_base and allow_select
        if select_permitted:
            deprecation_text = (
                "Implicit coercion of SELECT and textual SELECT constructs "
                "into FROM clauses is deprecated; please call .subquery() "
                "on any Core select or ORM Query object in order to produce a "
                "subquery object."
            )
            util.warn_deprecated(deprecation_text, version="1.4")
            return resolved._implicit_subquery

        self._raise_for_expected(element, argname, resolved)

1326 

1327 

class AnonymizedFromClauseImpl(StrictFromClauseImpl):
    """Strict FROM-clause coercion whose result is made anonymous."""

    __slots__ = ()

    def _post_coercion(self, element, *, flat=False, name=None, **kw):
        """Return ``element._anonymous_fromclause(flat=flat)``.

        ``name`` is accepted in the signature but must be left as ``None``;
        this is enforced with an assertion.
        """
        assert name is None

        anonymized = element._anonymous_fromclause(flat=flat)
        return anonymized

1335 

1336 

class DMLTableImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Coercion implementation for the table targeted by a DML statement."""

    __slots__ = ()

    def _post_coercion(self, element, **kw):
        """Swap ``element`` for its ``"dml_table"`` annotation, if present.

        When the annotation is absent, ``element`` is returned unchanged.
        """
        if "dml_table" not in element._annotations:
            return element
        return element._annotations["dml_table"]

1345 

1346 

class DMLSelectImpl(_NoTextCoercion, RoleImpl):
    """Coercion implementation for a SELECT used inside a DML statement."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Turn a FROM clause into a SELECT.

        An ``Alias`` that wraps a SELECT-like element is unwrapped to that
        element; any other FROM clause is converted with ``.select()``.
        Objects that are not FROM clauses are reported through
        ``_raise_for_expected``.
        """
        if resolved._is_from_clause:
            wraps_select = (
                isinstance(resolved, selectable.Alias)
                and resolved.element._is_select_base
            )
            return resolved.element if wraps_select else resolved.select()

        self._raise_for_expected(element, argname, resolved)

1367 

1368 

class CompoundElementImpl(_NoTextCoercion, RoleImpl):
    """Coercion implementation for the compound-element role."""

    __slots__ = ()

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Delegate to the parent error helper with FROM-specific advice.

        A hint is added only when ``element`` is a ``FromClauseRole``; its
        wording depends on whether the element is a subquery.  For every
        other element the advice is ``None``.
        """
        advice = None
        if isinstance(element, roles.FromClauseRole):
            advice = (
                "Use the plain select() object without "
                "calling .subquery() or .alias()."
                if element._is_subquery
                else "To SELECT from any FROM clause, use the .select() method."
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

1388 

1389 

# Maps each role class to the single implementation instance built for it.
_impl_lookup = {}


# Pair every ``<Name>Role`` attribute of the roles module with a
# ``<Name>Impl`` class defined in this module, when such a class exists.
for name in dir(roles):
    cls = getattr(roles, name)
    if not name.endswith("Role"):
        continue

    name = name.replace("Role", "Impl")
    if name not in globals():
        continue

    impl = globals()[name](cls)
    _impl_lookup[cls] = impl

if not TYPE_CHECKING:
    ee_impl = _impl_lookup[roles.ExpressionElementRole]

    # Subscripted forms of ExpressionElementRole share the unsubscripted
    # role's implementation instance.
    for py_type in (int, bool, str, float):
        _impl_lookup[roles.ExpressionElementRole[py_type]] = ee_impl