1# sql/coercions.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9from __future__ import annotations
10
11import collections.abc as collections_abc
12import numbers
13import re
14import typing
15from typing import Any
16from typing import Callable
17from typing import cast
18from typing import Dict
19from typing import Iterable
20from typing import Iterator
21from typing import List
22from typing import Literal
23from typing import NoReturn
24from typing import Optional
25from typing import overload
26from typing import Sequence
27from typing import Tuple
28from typing import Type
29from typing import TYPE_CHECKING
30from typing import TypeVar
31from typing import Union
32
33from . import roles
34from . import visitors
35from ._typing import is_from_clause
36from .base import ExecutableOption
37from .base import Options
38from .cache_key import HasCacheKey
39from .visitors import Visitable
40from .. import exc
41from .. import inspection
42from .. import util
43
44if typing.TYPE_CHECKING:
45 # elements lambdas schema selectable are set by __init__
46 from . import elements
47 from . import lambdas
48 from . import schema
49 from . import selectable
50 from ._typing import _ColumnExpressionArgument
51 from ._typing import _ColumnsClauseArgument
52 from ._typing import _DDLColumnArgument
53 from ._typing import _DMLTableArgument
54 from ._typing import _FromClauseArgument
55 from ._typing import _OnlyColumnArgument
56 from .base import SyntaxExtension
57 from .dml import _DMLTableElement
58 from .elements import BindParameter
59 from .elements import ClauseElement
60 from .elements import ColumnClause
61 from .elements import ColumnElement
62 from .elements import NamedColumn
63 from .elements import SQLCoreOperations
64 from .elements import TextClause
65 from .schema import Column
66 from .selectable import _ColumnsClauseElement
67 from .selectable import _JoinTargetProtocol
68 from .selectable import FromClause
69 from .selectable import HasCTE
70 from .selectable import SelectBase
71 from .selectable import Subquery
72 from .visitors import _TraverseCallableType
73
# Type variables used by the annotations in this module.

# a role class derived from roles.SQLRole
_SR = TypeVar("_SR", bound=roles.SQLRole)
# any callable; lets a decorator factory return the decorated callable's type
_F = TypeVar("_F", bound=Callable[..., Any])
# a role class derived from roles.StringRole
_StringOnlyR = TypeVar("_StringOnlyR", bound=roles.StringRole)
# unconstrained value type
_T = TypeVar("_T", bound=Any)
78
79
def _is_literal(element: Any) -> bool:
    """Tell whether ``element`` is a plain "literal" value from the point
    of view of a SQL expression construct.

    An object is a literal when it is neither a ``Visitable`` nor a
    ``SchemaEventTarget`` and does not expose a ``__clause_element__``
    attribute.

    """

    # the isinstance() test runs first so that hasattr() is only invoked
    # on objects that are not already SQL constructs
    if isinstance(element, (Visitable, schema.SchemaEventTarget)):
        return False
    return not hasattr(element, "__clause_element__")
90
91
92def _deep_is_literal(element):
93 """Return whether or not the element is a "literal" in the context
94 of a SQL expression construct.
95
96 does a deeper more esoteric check than _is_literal. is used
97 for lambda elements that have to distinguish values that would
98 be bound vs. not without any context.
99
100 """
101
102 if isinstance(element, collections_abc.Sequence) and not isinstance(
103 element, str
104 ):
105 for elem in element:
106 if not _deep_is_literal(elem):
107 return False
108 else:
109 return True
110
111 return (
112 not isinstance(
113 element,
114 (
115 Visitable,
116 schema.SchemaEventTarget,
117 HasCacheKey,
118 Options,
119 util.langhelpers.symbol,
120 ),
121 )
122 and not hasattr(element, "__clause_element__")
123 and (
124 not isinstance(element, type)
125 or not issubclass(element, HasCacheKey)
126 )
127 )
128
129
def _document_text_coercion(
    paramname: str, meth_rst: str, param_rst: str
) -> Callable[[_F], _F]:
    """Return the result of ``util.add_parameter_text`` for a "trusted SQL
    text" warning about one parameter.

    :param paramname: name of the parameter the warning is attached to.
    :param meth_rst: text naming the method, interpolated into the warning.
    :param param_rst: text naming the parameter, interpolated into the
     warning.

    """
    warning_template = (
        ".. warning:: "
        "The %s argument to %s can be passed as a Python string argument, "
        "which will be treated "
        "as **trusted SQL text** and rendered as given. **DO NOT PASS "
        "UNTRUSTED INPUT TO THIS PARAMETER**."
    )
    warning_text = warning_template % (param_rst, meth_rst)
    return util.add_parameter_text(paramname, warning_text)
144
145
146def _expression_collection_was_a_list(
147 attrname: str,
148 fnname: str,
149 args: Union[Sequence[_T], Sequence[Sequence[_T]]],
150) -> Sequence[_T]:
151 if args and isinstance(args[0], (list, set, dict)) and len(args) == 1:
152 if isinstance(args[0], list):
153 raise exc.ArgumentError(
154 f'The "{attrname}" argument to {fnname}(), when '
155 "referring to a sequence "
156 "of items, is now passed as a series of positional "
157 "elements, rather than as a list. "
158 )
159 return cast("Sequence[_T]", args[0])
160
161 return cast("Sequence[_T]", args)
162
163
# Typing-only overloads for expect().  Each stub pairs a role class (and,
# for some, a required keyword flag) with the return type a type checker
# should infer; none of them contains runtime logic.


# TruncatedLabelRole -> str
@overload
def expect(
    role: Type[roles.TruncatedLabelRole],
    element: Any,
    **kw: Any,
) -> str: ...


# DMLColumnRole with as_key=True -> str
@overload
def expect(
    role: Type[roles.DMLColumnRole],
    element: Any,
    *,
    as_key: Literal[True] = ...,
    **kw: Any,
) -> str: ...


# LiteralValueRole -> BindParameter
@overload
def expect(
    role: Type[roles.LiteralValueRole],
    element: Any,
    **kw: Any,
) -> BindParameter[Any]: ...


# DDLReferredColumnRole -> Column or str
@overload
def expect(
    role: Type[roles.DDLReferredColumnRole],
    element: Any,
    **kw: Any,
) -> Union[Column[Any], str]: ...


# DDLConstraintColumnRole -> Column or str
@overload
def expect(
    role: Type[roles.DDLConstraintColumnRole],
    element: Any,
    **kw: Any,
) -> Union[Column[Any], str]: ...


# StatementOptionRole -> ColumnElement or TextClause
@overload
def expect(
    role: Type[roles.StatementOptionRole],
    element: Any,
    **kw: Any,
) -> Union[ColumnElement[Any], TextClause]: ...


# SyntaxExtensionRole -> SyntaxExtension
@overload
def expect(
    role: Type[roles.SyntaxExtensionRole],
    element: Any,
    **kw: Any,
) -> SyntaxExtension: ...


# LabeledColumnExprRole with a typed column argument -> NamedColumn[_T]
@overload
def expect(
    role: Type[roles.LabeledColumnExprRole[Any]],
    element: _ColumnExpressionArgument[_T] | _OnlyColumnArgument[_T],
    **kw: Any,
) -> NamedColumn[_T]: ...


# expression-like roles with a typed column argument -> ColumnElement[_T]
@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
    ],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> ColumnElement[_T]: ...


# expression-like roles with an untyped argument -> ColumnElement[Any]
@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
        Type[roles.OnClauseRole],
        Type[roles.ColumnArgumentRole],
    ],
    element: Any,
    **kw: Any,
) -> ColumnElement[Any]: ...


# DMLTableRole -> _DMLTableElement
@overload
def expect(
    role: Type[roles.DMLTableRole],
    element: _DMLTableArgument,
    **kw: Any,
) -> _DMLTableElement: ...


# HasCTERole -> HasCTE
@overload
def expect(
    role: Type[roles.HasCTERole],
    element: HasCTE,
    **kw: Any,
) -> HasCTE: ...


# SelectStatementRole -> SelectBase
@overload
def expect(
    role: Type[roles.SelectStatementRole],
    element: SelectBase,
    **kw: Any,
) -> SelectBase: ...


# FromClauseRole -> FromClause
@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: _FromClauseArgument,
    **kw: Any,
) -> FromClause: ...


# FromClauseRole given a SelectBase and explicit_subquery=True -> Subquery
@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: SelectBase,
    *,
    explicit_subquery: Literal[True] = ...,
    **kw: Any,
) -> Subquery: ...


# ColumnsClauseRole -> _ColumnsClauseElement
@overload
def expect(
    role: Type[roles.ColumnsClauseRole],
    element: _ColumnsClauseArgument[Any],
    **kw: Any,
) -> _ColumnsClauseElement: ...


# JoinTargetRole -> _JoinTargetProtocol
@overload
def expect(
    role: Type[roles.JoinTargetRole],
    element: _JoinTargetProtocol,
    **kw: Any,
) -> _JoinTargetProtocol: ...


# catchall for not-yet-implemented overloads
@overload
def expect(
    role: Type[_SR],
    element: Any,
    **kw: Any,
) -> Any: ...
321
322
def expect(
    role: Type[_SR],
    element: Any,
    *,
    apply_propagate_attrs: Optional[ClauseElement] = None,
    argname: Optional[str] = None,
    post_inspect: bool = False,
    disable_inspection: bool = False,
    **kw: Any,
) -> Any:
    """Coerce ``element`` into an object suitable for ``role``.

    The work is delegated to the ``RoleImpl`` registered for ``role`` in
    ``_impl_lookup``.

    :param role: the role class to coerce for; used as the lookup key.
    :param element: the object to coerce.
    :param apply_propagate_attrs: optional construct that receives the
     resolved object's ``_propagate_attrs`` when its own are empty and
     the resolved object's are not.
    :param argname: argument name passed along for use in error messages.
    :param post_inspect: when true, the ``_post_inspect`` attribute of an
     inspected object is accessed before it is resolved.
    :param disable_inspection: when true, the inspection step is skipped
     even if the implementation would otherwise use it.
    :param kw: additional keyword arguments forwarded to the
     implementation's coercion hooks (or to ``LambdaOptions`` when a
     lambda is given).
    :return: the coerced object; its type depends on the role.

    """
    if (
        role.allows_lambda
        # note callable() will not invoke a __getattr__() method, whereas
        # hasattr(obj, "__call__") will. by keeping the callable() check here
        # we prevent most needless calls to hasattr() and therefore
        # __getattr__(), which is present on ColumnElement.
        and callable(element)
        and hasattr(element, "__code__")
    ):
        # a Python function / lambda: wrap it rather than coercing it now
        return lambdas.LambdaElement(
            element,
            role,
            lambdas.LambdaOptions(**kw),
            apply_propagate_attrs=apply_propagate_attrs,
        )

    # major case is that we are given a ClauseElement already, skip more
    # elaborate logic up front if possible
    impl = _impl_lookup[role]

    # kept separately because ``element`` may be rebound below while
    # unwrapping __clause_element__(); error messages and the coercion
    # hooks receive the object the caller actually passed
    original_element = element

    if not isinstance(
        element,
        (
            elements.CompilerElement,
            schema.SchemaItem,
            schema.FetchedValue,
            lambdas.PyWrapper,
        ),
    ):
        resolved = None

        if impl._resolve_literal_only:
            # note that argname is not forwarded on this path
            resolved = impl._literal_coercion(element, **kw)
        else:
            # no-op: original_element was bound to the same object above
            # and element has not been rebound since
            original_element = element

            is_clause_element = False

            # this is a special performance optimization for ORM
            # joins used by JoinTargetImpl that we don't go through the
            # work of creating __clause_element__() when we only need the
            # original QueryableAttribute, as the former will do clause
            # adaption and all that which is just thrown away here.
            if (
                impl._skip_clauseelement_for_target_match
                and isinstance(element, role)
                and hasattr(element, "__clause_element__")
            ):
                is_clause_element = True
            else:
                # unwrap repeatedly until reaching an object that reports
                # is_clause_element, or one without __clause_element__
                while hasattr(element, "__clause_element__"):
                    is_clause_element = True

                    if not getattr(element, "is_clause_element", False):
                        element = element.__clause_element__()
                    else:
                        break

            if not is_clause_element:
                if impl._use_inspection and not disable_inspection:
                    insp = inspection.inspect(element, raiseerr=False)
                    if insp is not None:
                        if post_inspect:
                            # bare attribute access, evaluated only for
                            # whatever effect reading the attribute has
                            # NOTE(review): presumably a memoized hook;
                            # confirm against the inspected classes
                            insp._post_inspect
                        try:
                            resolved = insp.__clause_element__()
                        except AttributeError:
                            impl._raise_for_expected(original_element, argname)

                if resolved is None:
                    # nothing resolved by inspection: treat as a literal
                    resolved = impl._literal_coercion(
                        element, argname=argname, **kw
                    )
            else:
                resolved = element
    elif isinstance(element, lambdas.PyWrapper):
        resolved = element._sa__py_wrapper_literal(**kw)
    else:
        resolved = element

    if apply_propagate_attrs is not None:
        if typing.TYPE_CHECKING:
            assert isinstance(resolved, (SQLCoreOperations, ClauseElement))

        # only fill in propagate attrs when the target has none yet
        if not apply_propagate_attrs._propagate_attrs and getattr(
            resolved, "_propagate_attrs", None
        ):
            apply_propagate_attrs._propagate_attrs = resolved._propagate_attrs

    if impl._role_class in resolved.__class__.__mro__:
        # resolved already fulfils the role; apply the optional post hook
        if impl._post_coercion:
            resolved = impl._post_coercion(
                resolved,
                argname=argname,
                original_element=original_element,
                **kw,
            )
        return resolved
    else:
        # wrong kind of object: let the implementation convert it or raise
        return impl._implicit_coercions(
            original_element, resolved, argname=argname, **kw
        )
437
438
def expect_as_key(
    role: Type[roles.DMLColumnRole], element: Any, **kw: Any
) -> str:
    """Call :func:`expect` with ``as_key=True``.

    Any ``as_key`` value supplied by the caller in ``kw`` is replaced.

    """
    # kw is a fresh dict for this call, so overwriting the entry is
    # equivalent to discarding it and passing as_key explicitly
    kw["as_key"] = True
    return expect(role, element, **kw)
444
445
def expect_col_expression_collection(
    role: Type[roles.DDLConstraintColumnRole],
    expressions: Iterable[_DDLColumnArgument],
) -> Iterator[
    Tuple[
        Union[str, Column[Any]],
        Optional[ColumnClause[Any]],
        Optional[str],
        Optional[Union[Column[Any], str]],
    ]
]:
    """Coerce each member of ``expressions`` for ``role`` and yield a
    4-tuple ``(resolved, column, strname, add_element)`` per member.

    * ``resolved`` - the original string when coercion produced a string,
      otherwise the coerced object.
    * ``column`` - the first object collected by traversing ``resolved``
      with a ``"column"`` visitor, or ``None`` (always ``None`` for
      strings).
    * ``strname`` - the original string, or ``None`` for non-strings.
    * ``add_element`` - ``column`` when it is not ``None``, else
      ``strname``.

    """
    for expr in expressions:
        resolved: Union[Column[Any], str] = expect(role, expr)

        strname = None
        column = None

        if not isinstance(resolved, str):
            found: List[Column[Any]] = []
            collect: _TraverseCallableType[Column[Any]] = found.append
            visitors.traverse(resolved, {}, {"column": collect})
            if found:
                column = found[0]
        else:
            assert isinstance(expr, str)
            strname = resolved = expr

        add_element = strname if column is None else column

        yield resolved, column, strname, add_element
474
475
class RoleImpl:
    """Base class for the per-role coercion implementations used by
    :func:`expect`.

    One instance is created per role class and stores that class, its
    ``_role_name`` and whether the role derives from
    ``roles.UsesInspection``.

    """

    __slots__ = ("_role_class", "name", "_use_inspection")

    # optional hook; expect() invokes it only when it is truthy
    _post_coercion: Any = None
    # when true, expect() goes straight to _literal_coercion()
    _resolve_literal_only = False
    # when true, expect() does not call __clause_element__() on an element
    # that is already an instance of the role
    _skip_clauseelement_for_target_match = False

    def __init__(self, role_class):
        self._role_class = role_class
        self.name = role_class._role_name
        self._use_inspection = issubclass(role_class, roles.UsesInspection)

    def _literal_coercion(self, element, **kw):
        """Coerce a plain Python value; not supported by the base class."""
        raise NotImplementedError()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Convert ``resolved`` into something acceptable for the role.

        The base implementation performs no conversion and always raises.

        """
        self._raise_for_expected(element, argname, resolved)

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        *,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise :class:`.ArgumentError` describing the rejected object.

        :param element: the object originally passed.
        :param argname: argument name to mention in the message, if any.
        :param resolved: object ``element`` was resolved to; mentioned
         only when it is a different object from ``element``.
        :param advice: extra sentence appended to the message.
        :param code: error code passed to the exception.
        :param err: exception to chain from.

        """
        if resolved is None or resolved is element:
            got = repr(element)
        else:
            got = "%r object resolved from %r object" % (resolved, element)

        if not argname:
            msg = "%s expected, got %s." % (self.name, got)
        else:
            msg = "%s expected for argument %r; got %s." % (
                self.name,
                argname,
                got,
            )

        if advice:
            msg = msg + " " + advice

        raise exc.ArgumentError(msg, code=code) from err
529
530
531class _Deannotate:
532 __slots__ = ()
533
534 def _post_coercion(self, resolved, **kw):
535 from .util import _deep_deannotate
536
537 return _deep_deannotate(resolved)
538
539
class _StringOnly:
    """Mixin that switches on ``_resolve_literal_only``.

    With this flag set, ``expect()`` sends any object that is not already
    a SQL construct directly to ``_literal_coercion()`` and skips the
    ``__clause_element__()`` / inspection steps.

    """

    __slots__ = ()

    _resolve_literal_only = True
544
545
class _ReturnsStringKey(RoleImpl):
    """Base for roles that let a plain string through unchanged."""

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        # no conversion; the string check happens in _implicit_coercions()
        return element

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        """Return ``element`` if it is a string, otherwise raise."""
        if isinstance(element, str):
            return element
        self._raise_for_expected(element, argname, resolved)
557
558
class _ColumnCoercions(RoleImpl):
    """Implicit coercions shared by column-expression roles.

    SELECT constructs and subqueries are turned into scalar subqueries
    with a warning; lambda elements pass through when the role allows
    lambdas; anything else is rejected.

    """

    __slots__ = ()

    def _warn_for_scalar_subquery_coercion(self):
        util.warn(
            "implicitly coercing SELECT object to scalar subquery; "
            "please use the .scalar_subquery() method to produce a scalar "
            "subquery.",
        )

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        if getattr(resolved, "is_clause_element", False):
            if resolved._is_select_base:
                self._warn_for_scalar_subquery_coercion()
                return resolved.scalar_subquery()

            if resolved._is_from_clause and isinstance(
                resolved, selectable.Subquery
            ):
                # use the SELECT wrapped by the subquery
                self._warn_for_scalar_subquery_coercion()
                return resolved.element.scalar_subquery()

            if (
                self._role_class.allows_lambda
                and resolved._is_lambda_element
            ):
                return resolved

        # not a clause element, or none of the coercions applied
        self._raise_for_expected(element, argname, resolved)
585
586
def _no_text_coercion(
    element: Any,
    argname: Optional[str] = None,
    exc_cls: Type[exc.SQLAlchemyError] = exc.ArgumentError,
    extra: Optional[str] = None,
    err: Optional[Exception] = None,
) -> NoReturn:
    """Raise an error stating that a plain string must be wrapped in
    ``text()`` explicitly.

    :param element: the string that was passed; shortened with
     ``util.ellipses_string`` for the message.
    :param argname: argument name to mention in the message, if any.
    :param exc_cls: exception class to raise.
    :param extra: text placed at the start of the message, if any.
    :param err: exception to chain from.

    """
    # both optional fragments carry their own trailing space so that the
    # words around them never run together; previously the argname
    # fragment had none, producing "... for argument fooshould be ..."
    argname_text = "for argument %s " % (argname,) if argname else ""
    extra_text = "%s " % extra if extra else ""

    raise exc_cls(
        "%(extra)sTextual SQL expression %(expr)r %(argname)sshould be "
        "explicitly declared as text(%(expr)r)"
        % {
            "expr": util.ellipses_string(element),
            "argname": argname_text,
            "extra": extra_text,
        }
    ) from err
603
604
class _NoTextCoercion(RoleImpl):
    """Base for roles that never accept a literal value.

    A string passed for a role that ``TextClause`` itself fulfils gets
    the "declare as text()" error; every other literal gets the generic
    "expected" error.

    """

    __slots__ = ()

    def _literal_coercion(self, element, *, argname=None, **kw):
        string_for_text_role = isinstance(element, str) and issubclass(
            elements.TextClause, self._role_class
        )
        if string_for_text_role:
            _no_text_coercion(element, argname)
        else:
            self._raise_for_expected(element, argname)
615
616
class _CoerceLiterals(RoleImpl):
    """Base for roles that convert selected Python literals into SQL
    constructs.

    Subclasses opt in through the ``_coerce_*`` flags and decide how
    strings are handled by overriding ``_text_coercion()``.

    """

    __slots__ = ()

    # None / False / True become Null() / False_() / True_()
    _coerce_consts = False
    # the string "*" becomes a literal column
    _coerce_star = False
    # numbers.Number instances become literal columns
    _coerce_numerics = False

    def _text_coercion(self, element, argname=None):
        # default: plain strings are rejected
        return _no_text_coercion(element, argname)

    def _literal_coercion(self, element, *, argname=None, **kw):
        if isinstance(element, str):
            if self._coerce_star and element == "*":
                return elements.ColumnClause("*", is_literal=True)
            return self._text_coercion(element, argname, **kw)

        if self._coerce_consts:
            # identity tests on purpose: 0 and 1 must not match here
            if element is None:
                return elements.Null()
            if element is False:
                return elements.False_()
            if element is True:
                return elements.True_()

        if self._coerce_numerics and isinstance(element, numbers.Number):
            return elements.ColumnClause(str(element), is_literal=True)

        self._raise_for_expected(element, argname)
645
646
class LiteralValueImpl(RoleImpl):
    """Coercion for a role that accepts plain literal values only and
    wraps them in a bound parameter.

    """

    # RoleImpl declares __slots__; without an empty declaration here,
    # instances of this subclass would carry a per-instance ``__dict__``,
    # unlike the other implementation classes in this module.
    __slots__ = ()

    _resolve_literal_only = True

    def _implicit_coercions(
        self,
        element,
        resolved,
        argname=None,
        *,
        type_=None,
        literal_execute=False,
        **kw,
    ):
        """Return a unique, anonymous ``BindParameter`` holding
        ``element``.

        :param type_: type passed to the bound parameter.
        :param literal_execute: passed to the bound parameter.
        :raises ArgumentError: if ``resolved`` is not a literal according
         to ``_is_literal()``.

        """
        if not _is_literal(resolved):
            self._raise_for_expected(
                element, resolved=resolved, argname=argname, **kw
            )

        return elements.BindParameter(
            None,
            element,
            type_=type_,
            unique=True,
            literal_execute=literal_execute,
        )

    def _literal_coercion(self, element, **kw):
        # no conversion here; the value is wrapped in _implicit_coercions()
        return element
675
676
class _SelectIsNotFrom(RoleImpl):
    """Mixin that adds a ".subquery()" hint to the error raised when a
    SELECT statement is passed where a FROM clause is required.

    """

    __slots__ = ()

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        *,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise :class:`.ArgumentError`, adding the subquery hint and the
        ``"89ve"`` error code when the rejected object is a SELECT.

        The hint is only supplied when the caller gave no ``advice`` of
        its own; in every other case the caller's ``advice`` and ``code``
        are passed through unchanged.

        """
        # the two isinstance() tests are grouped explicitly: written as
        # ``not advice and A or B``, the second test would bypass the
        # ``not advice`` guard and overwrite caller-supplied advice
        if not advice and (
            isinstance(element, roles.SelectStatementRole)
            or isinstance(resolved, roles.SelectStatementRole)
        ):
            # always name the class, whichever object is being reported
            described = resolved if resolved is not None else element
            advice = (
                "To create a "
                "FROM clause from a %s object, use the .subquery() method."
                % (described.__class__,)
            )
            code = "89ve"

        super()._raise_for_expected(
            element,
            argname=argname,
            resolved=resolved,
            advice=advice,
            code=code,
            err=err,
            **kw,
        )
        # never reached
        assert False
716
717
class HasCacheKeyImpl(RoleImpl):
    """Coercion that accepts objects which are ``HasCacheKey`` instances
    and returns them unchanged.

    """

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        # no conversion; the type check happens in _implicit_coercions()
        return element

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        if isinstance(element, HasCacheKey):
            return element
        self._raise_for_expected(element, argname, resolved)
735
736
class ExecutableOptionImpl(RoleImpl):
    """Coercion that accepts objects which are ``ExecutableOption``
    instances and returns them unchanged.

    """

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        # no conversion; the type check happens in _implicit_coercions()
        return element

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        if isinstance(element, ExecutableOption):
            return element
        self._raise_for_expected(element, argname, resolved)
754
755
class ExpressionElementImpl(_ColumnCoercions, RoleImpl):
    """Coercion for general column expressions: Python values become
    bound parameters, or ``Null()`` for ``None``.

    """

    __slots__ = ()

    def _literal_coercion(
        self, element, *, name=None, type_=None, is_crud=False, **kw
    ):
        """Return ``Null()`` or a unique ``BindParameter`` for ``element``.

        ``None`` becomes ``Null()`` unless ``is_crud`` is set or ``type_``
        reports ``should_evaluate_none``.

        """
        # TODO: there's no test coverage now for the
        # "should_evaluate_none" part of this, as outside of "crud" this
        # codepath is not normally used except in some special cases
        renders_as_null = (
            element is None
            and not is_crud
            and (type_ is None or not type_.should_evaluate_none)
        )
        if renders_as_null:
            return elements.Null()

        try:
            return elements.BindParameter(
                name, element, type_, unique=True, _is_crud=is_crud
            )
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Raise the base error, adding a hint for VALUES constructs and
        FROM clauses.

        """
        # select uses implicit coercion with warning instead of raising
        advice = None
        if isinstance(element, selectable.Values):
            advice = (
                "To create a column expression from a VALUES clause, "
                "use the .scalar_values() method."
            )
        elif isinstance(element, roles.AnonymizedFromClauseRole):
            advice = (
                "To create a column expression from a FROM clause row "
                "as a whole, use the .table_valued() method."
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )
797
798
class TStringElementImpl(ExpressionElementImpl, RoleImpl):
    """Coercion implementation that reuses :class:`ExpressionElementImpl`
    unchanged.

    It defines no behavior of its own; the class exists under this name so
    that the name-based registration loop at the bottom of this module can
    pair it with a role named ``TStringElementRole``, if ``roles`` defines
    one.

    """

    __slots__ = ()
801
802
class BinaryElementImpl(ExpressionElementImpl, RoleImpl):
    """Coercion for the other operand of a binary expression; literals
    are bound through the expression they are compared against.

    """

    __slots__ = ()

    def _literal_coercion(  # type: ignore[override]
        self,
        element,
        *,
        expr,
        operator,
        bindparam_type=None,
        argname=None,
        **kw,
    ):
        """Return ``expr._bind_param(operator, element, ...)``.

        An ``ArgumentError`` from that call is re-raised as this role's
        "expected" error, chained to the original.

        """
        try:
            bound = expr._bind_param(operator, element, type_=bindparam_type)
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)
        else:
            return bound

    def _post_coercion(self, resolved, *, expr, bindparam_type=None, **kw):
        """Give a null-typed ``resolved`` the type of ``bindparam_type``,
        or of ``expr`` when ``bindparam_type`` is None.

        Nothing changes when ``resolved`` already has a non-null type or
        when ``expr`` is null-typed as well.

        """
        if not resolved.type._isnull or expr.type._isnull:
            return resolved

        target_type = expr.type if bindparam_type is None else bindparam_type
        return resolved._with_binary_element_type(target_type)
827
828
class InElementImpl(RoleImpl):
    """Coercion for the right-hand side of an IN expression."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Turn a FROM clause into a SELECT, with a warning, and hand it
        to ``_post_coercion()``; reject everything else.

        """
        if not resolved._is_from_clause:
            self._raise_for_expected(element, argname, resolved)
        else:
            wraps_select = (
                isinstance(resolved, selectable.Alias)
                and resolved.element._is_select_base
            )
            self._warn_for_implicit_coercion(resolved)
            # an alias of a SELECT is unwrapped; any other FROM clause is
            # selected from
            if wraps_select:
                selected = resolved.element
            else:
                selected = resolved.select()
            return self._post_coercion(selected, **kw)

    def _warn_for_implicit_coercion(self, elem):
        util.warn(
            "Coercing %s object into a select() for use in IN(); "
            "please pass a select() construct explicitly"
            % (elem.__class__.__name__)
        )

    @util.preload_module("sqlalchemy.sql.elements")
    def _literal_coercion(self, element, *, expr, operator, **kw):  # type: ignore[override]  # noqa: E501
        """Coerce a non-string iterable of values for IN.

        When every member is a literal, a single expanding bound
        parameter holding the whole list is returned.  When at least one
        member is a SQL expression, a ``ClauseList`` is returned in which
        those members appear as-is and each literal gets its own bound
        parameter.

        """
        if not util.is_non_string_iterable(element):
            self._raise_for_expected(element, **kw)
        else:
            members = list(element)

            # SQL-expression members, keyed by themselves for the
            # membership test below
            sql_members: Dict[
                Optional[_ColumnExpressionArgument[Any]],
                _ColumnExpressionArgument[Any],
            ] = {}
            for member in members:
                if _is_literal(member):
                    continue
                if isinstance(
                    member, util.preloaded.sql_elements.ColumnElement
                ) or hasattr(member, "__clause_element__"):
                    sql_members[member] = member
                else:
                    self._raise_for_expected(members, **kw)

            if not sql_members:
                return expr._bind_param(operator, members, expanding=True)

            clauses = [
                (
                    sql_members[member]
                    if member in sql_members
                    else expr._bind_param(operator, member)
                )
                for member in members
            ]
            return elements.ClauseList(*clauses)

    def _post_coercion(self, element, *, expr, operator, **kw):
        """Adapt an already-resolved construct for use inside IN."""
        if element._is_select_base:
            # for IN, we are doing scalar_subquery() coercion without
            # a warning
            return element.scalar_subquery()

        if isinstance(element, elements.ClauseList):
            assert len(element.clauses) != 0
            return element.self_group(against=operator)

        if isinstance(element, elements.BindParameter):
            # work on a copy so the caller's parameter is not modified
            expanding_param = element._clone(maintain_key=True)
            expanding_param.expanding = True
            expanding_param.expand_op = operator
            return expanding_param

        if isinstance(element, selectable.Values):
            return element.scalar_values()

        return element
913
914
class OnClauseImpl(_ColumnCoercions, RoleImpl):
    """Coercion for the ON clause of a join; literal values are rejected."""

    __slots__ = ()

    # NOTE(review): nothing in this class or its visible bases reads this
    # flag (it is consumed by _CoerceLiterals, which is not a base here)
    _coerce_consts = True

    def _literal_coercion(self, element, **kw):
        self._raise_for_expected(element)

    def _post_coercion(self, resolved, *, original_element=None, **kw):
        # this is a hack right now as we want to use coercion on an
        # ORM InstrumentedAttribute, but we want to return the object
        # itself if it is one, not its clause element.
        # ORM context _join and _legacy_join() would need to be improved
        # to look for annotations in a clause element form.
        keep_original = isinstance(original_element, roles.JoinTargetRole)
        return original_element if keep_original else resolved
932
933
class WhereHavingImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl):
    """Coercion for WHERE / HAVING criteria.

    ``None``, ``True`` and ``False`` are converted to SQL constants;
    plain strings are rejected with the "declare as text()" error.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        return _no_text_coercion(element, argname=argname)
941
942
class SyntaxExtensionImpl(RoleImpl):
    """Coercion implementation that adds nothing to :class:`RoleImpl`.

    The inherited ``_literal_coercion()`` raises ``NotImplementedError``
    and the inherited ``_implicit_coercions()`` always raises
    ``ArgumentError``.

    """

    __slots__ = ()
945
946
class StatementOptionImpl(_CoerceLiterals, RoleImpl):
    """Coercion for statement options.

    ``None``, ``True`` and ``False`` are converted to SQL constants and
    plain strings are wrapped in ``TextClause``.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        text_clause = elements.TextClause(element)
        return text_clause
954
955
class ColumnArgumentImpl(_NoTextCoercion, RoleImpl):
    """Coercion implementation that takes all of its behavior from
    :class:`_NoTextCoercion`: literal values are rejected.

    """

    __slots__ = ()
958
959
class ColumnArgumentOrKeyImpl(_ReturnsStringKey, RoleImpl):
    """Coercion implementation that takes all of its behavior from
    :class:`_ReturnsStringKey`: a plain string is returned unchanged.

    """

    __slots__ = ()
962
963
class StrAsPlainColumnImpl(_CoerceLiterals, RoleImpl):
    """Coercion in which a plain string becomes a ``ColumnClause`` of
    that name.

    """

    __slots__ = ()

    def _text_coercion(self, element, argname=None):
        column = elements.ColumnClause(element)
        return column
969
970
class ByOfImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl, roles.ByOfRole):
    """Shared base of :class:`OrderByImpl` and :class:`GroupByImpl`.

    ``None``, ``True`` and ``False`` are converted to SQL constants and
    plain strings become textual label references.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        label_ref = elements._textual_label_reference(element)
        return label_ref
978
979
class OrderByImpl(ByOfImpl, RoleImpl):
    """ORDER BY coercion: expressions that carry an order-by label
    element are wrapped in a ``_label_reference``.

    """

    __slots__ = ()

    def _post_coercion(self, resolved, **kw):
        has_label_element = (
            isinstance(resolved, self._role_class)
            and resolved._order_by_label_element is not None
        )
        if not has_label_element:
            return resolved
        return elements._label_reference(resolved)
991
992
class GroupByImpl(ByOfImpl, RoleImpl):
    """GROUP BY coercion: a FROM clause expands to the list of its
    columns.

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        # anything that is not a FROM clause is returned as given; this
        # implementation never raises
        if not is_from_clause(resolved):
            return resolved
        return elements.ClauseList(*resolved.c)
1007
1008
class DMLColumnImpl(_ReturnsStringKey, RoleImpl):
    """Coercion for DML column targets; can reduce a resolved column to
    its ``.key``.

    """

    __slots__ = ()

    def _post_coercion(self, element, *, as_key=False, **kw):
        return element.key if as_key else element
1017
1018
class ConstExprImpl(RoleImpl):
    """Coercion that accepts only ``None``, ``False`` and ``True`` as
    literals and maps them to ``Null()``, ``False_()`` and ``True_()``.

    """

    __slots__ = ()

    def _literal_coercion(self, element, *, argname=None, **kw):
        # identity tests on purpose: 0 and 1 must not match here
        if element is None:
            return elements.Null()
        if element is False:
            return elements.False_()
        if element is True:
            return elements.True_()
        self._raise_for_expected(element, argname)
1031
1032
class TruncatedLabelImpl(_StringOnly, RoleImpl):
    """Coercion of strings into ``_truncated_label`` objects."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``resolved`` when the original ``element`` is a string;
        raise otherwise.

        """
        if not isinstance(element, str):
            self._raise_for_expected(element, argname, resolved)
        else:
            return resolved

    def _literal_coercion(self, element, **kw):
        """coerce the given value to :class:`._truncated_label`.

        Values that already are :class:`._truncated_label` instances are
        passed unchanged.
        """

        label_cls = elements._truncated_label
        return element if isinstance(element, label_cls) else label_cls(element)
1060
1061
class DDLExpressionImpl(_Deannotate, _CoerceLiterals, RoleImpl):
    """Coercion for expressions used in DDL.

    Constants are converted, plain strings are accepted as
    ``TextClause`` and the post-coercion hook of ``_Deannotate`` applies.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # see #5754 for why we can't easily deprecate this coercion.
        # essentially expressions like postgresql_where would have to be
        # text() as they come back from reflection and we don't want to
        # have text() elements wired into the inspection dictionaries.
        text_clause = elements.TextClause(element)
        return text_clause
1073
1074
class DDLConstraintColumnImpl(_Deannotate, _ReturnsStringKey, RoleImpl):
    """Coercion implementation composed entirely from its bases: plain
    strings are returned unchanged (:class:`_ReturnsStringKey`) and the
    post-coercion hook comes from :class:`_Deannotate`.

    """

    __slots__ = ()
1077
1078
class DDLReferredColumnImpl(DDLConstraintColumnImpl):
    """Coercion implementation identical to
    :class:`DDLConstraintColumnImpl`; the separate class gives the
    name-based registration loop at the bottom of this module a distinct
    name to match.

    """

    __slots__ = ()
1081
1082
class LimitOffsetImpl(RoleImpl):
    """Coercion for LIMIT / OFFSET values."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Let ``None`` through as ``None``; reject anything else."""
        if resolved is not None:
            self._raise_for_expected(element, argname, resolved)
        else:
            return None

    def _literal_coercion(  # type: ignore[override]
        self, element, *, name, type_, **kw
    ):
        """Return ``None`` for ``None``; otherwise pass the value through
        ``util.asint`` and wrap it in a unique ``_OffsetLimitParam``.

        """
        if element is None:
            return None

        return selectable._OffsetLimitParam(
            name, util.asint(element), type_=type_, unique=True
        )
1108
1109
class LabeledColumnExprImpl(ExpressionElementImpl):
    """Coercion that returns column expressions with an anonymous label
    applied (``.label(None)``).

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        expr_role = roles.ExpressionElementRole

        if isinstance(resolved, expr_role):
            labelable = resolved
        else:
            # fall back to the coercions of the parent class, then check
            # that the result can be labeled
            labelable = super()._implicit_coercions(
                element, resolved, argname=argname, **kw
            )
            if not isinstance(labelable, expr_role):
                self._raise_for_expected(element, argname, resolved)

        return labelable.label(None)
1130
1131
class ColumnsClauseImpl(_SelectIsNotFrom, _CoerceLiterals, RoleImpl):
    """Coercion for members of a columns clause.

    Constants, numbers and the string ``"*"`` are converted to SQL
    constructs; every other plain string is rejected with a message that
    suggests ``text()`` together with ``column()`` or
    ``literal_column()``.

    """

    __slots__ = ()

    _coerce_consts = True
    _coerce_numerics = True
    _coerce_star = True

    # a string that starts with a word character and contains no
    # whitespace is guessed to be a plain column name
    _guess_straight_column = re.compile(r"^\w\S*$", re.I)

    def _raise_for_expected(
        self, element, argname=None, resolved=None, *, advice=None, **kw
    ):
        """Raise the "expected" error; when a list was passed and no
        other advice is given, suggest spreading it into positional
        arguments.

        """
        if not advice and isinstance(element, list):
            advice = (
                f"Did you mean to say select("
                f"{', '.join(repr(e) for e in element)})?"
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

    def _text_coercion(self, element, argname=None):
        """Always raise :class:`.ArgumentError` for a plain string."""
        element = str(element)

        guess_is_literal = not self._guess_straight_column.match(element)

        # the fragment carries its own trailing space so that it does not
        # run into the following word; previously it had none, producing
        # "... for argument fooshould be ..."
        argname_text = "for argument %s " % (argname,) if argname else ""

        raise exc.ArgumentError(
            "Textual column expression %(column)r %(argname)sshould be "
            "explicitly declared with text(%(column)r), "
            "or use %(literal_column)s(%(column)r) "
            "for more specificity"
            % {
                "column": util.ellipses_string(element),
                "argname": argname_text,
                "literal_column": (
                    "literal_column" if guess_is_literal else "column"
                ),
            }
        )
1171
1172
class ReturnsRowsImpl(RoleImpl):
    """Coercion implementation that adds nothing to :class:`RoleImpl`.

    The inherited ``_literal_coercion()`` raises ``NotImplementedError``
    and the inherited ``_implicit_coercions()`` always raises
    ``ArgumentError``.

    """

    __slots__ = ()
1175
1176
class StatementImpl(_CoerceLiterals, RoleImpl):
    """Coercion for executable statements."""

    __slots__ = ()

    def _post_coercion(
        self, resolved, *, original_element, argname=None, **kw
    ):
        """Return ``resolved`` after checking that a converted original
        object is executable.

        The check applies only when ``resolved`` is a different object
        from ``original_element`` and the latter is not a string.

        :raises ObjectNotExecutableError: if ``original_element`` has no
         ``_execute_on_connection`` attribute.

        """
        if resolved is original_element or isinstance(original_element, str):
            return resolved

        # use same method as Connection uses
        try:
            original_element._execute_on_connection
        except AttributeError as err:
            raise exc.ObjectNotExecutableError(original_element) from err

        return resolved

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Let lambda elements through; defer to the base class
        otherwise.

        """
        if not resolved._is_lambda_element:
            return super()._implicit_coercions(
                element, resolved, argname=argname, **kw
            )
        return resolved
1207
1208
class SelectStatementImpl(_NoTextCoercion, RoleImpl):
    """Coercion for SELECT statements; a text clause is converted by
    calling its ``.columns()`` method.

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        if not resolved._is_text_clause:
            self._raise_for_expected(element, argname, resolved)
        else:
            return resolved.columns()
1223
1224
class HasCTEImpl(ReturnsRowsImpl):
    """Coercion implementation identical to :class:`ReturnsRowsImpl`; the
    separate class gives the name-based registration loop at the bottom
    of this module a distinct name to match.

    """

    __slots__ = ()
1227
1228
class IsCTEImpl(RoleImpl):
    """Coercion implementation that adds nothing to :class:`RoleImpl`.

    The inherited ``_literal_coercion()`` raises ``NotImplementedError``
    and the inherited ``_implicit_coercions()`` always raises
    ``ArgumentError``.

    """

    __slots__ = ()
1231
1232
class JoinTargetImpl(RoleImpl):
    """Coercion for the target of a join."""

    __slots__ = ()

    # lets expect() skip __clause_element__() for objects that already
    # are instances of the role
    _skip_clauseelement_for_target_match = True

    def _literal_coercion(self, element, *, argname=None, **kw):
        self._raise_for_expected(element, argname)

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        legacy: bool = False,
        **kw: Any,
    ) -> Any:
        """Accept an existing join target, or, in ``legacy`` mode, a
        SELECT construct with a deprecation warning.

        """
        if isinstance(element, roles.JoinTargetRole):
            # note that this codepath no longer occurs as of
            # #6550, unless JoinTargetImpl._skip_clauseelement_for_target_match
            # were set to False.
            return element

        if legacy and resolved._is_select_base:
            util.warn_deprecated(
                "Implicit coercion of SELECT and textual SELECT "
                "constructs into FROM clauses is deprecated; please call "
                ".subquery() on any Core select or ORM Query object in "
                "order to produce a subquery object.",
                version="1.4",
            )
            # TODO: doing _implicit_subquery here causes tests to fail,
            # how was this working before? probably that ORM
            # join logic treated it as a select and subquery would happen
            # in _ORMJoin->Join
            return resolved

        self._raise_for_expected(element, argname, resolved)
1270
1271
class FromClauseImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Coercion for FROM clauses."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        explicit_subquery: bool = False,
        **kw: Any,
    ) -> Any:
        """Convert a SELECT to a subquery when ``explicit_subquery`` is
        set; reject everything else.

        """
        wants_subquery = resolved._is_select_base and explicit_subquery
        if not wants_subquery:
            self._raise_for_expected(element, argname, resolved)
        else:
            return resolved.subquery()

    def _post_coercion(self, element, *, deannotate=False, **kw):
        return element._deannotate() if deannotate else element
1294
1295
class AnonymizedFromClauseImpl(FromClauseImpl):
    """FROM clause coercion whose result is passed through
    ``_anonymous_fromclause()``.

    """

    __slots__ = ()

    def _post_coercion(self, element, *, flat=False, name=None, **kw):
        # an explicit name is not supported on this path
        assert name is None

        anonymized = element._anonymous_fromclause(flat=flat)
        return anonymized
1303
1304
class DMLTableImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Coercion for the target table of a DML statement."""

    __slots__ = ()

    def _post_coercion(self, element, **kw):
        """Return the object stored under the ``"dml_table"`` annotation
        when there is one, else ``element`` itself.

        """
        if "dml_table" not in element._annotations:
            return element
        return element._annotations["dml_table"]
1313
1314
class DMLSelectImpl(_NoTextCoercion, RoleImpl):
    """Coercion for a SELECT used inside a DML statement; a FROM clause
    is converted to a SELECT.

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        if not resolved._is_from_clause:
            self._raise_for_expected(element, argname, resolved)
        elif (
            isinstance(resolved, selectable.Alias)
            and resolved.element._is_select_base
        ):
            # an alias of a SELECT: use the SELECT it wraps
            return resolved.element
        else:
            return resolved.select()
1335
1336
class CompoundElementImpl(_NoTextCoercion, RoleImpl):
    """Coercion for members of a compound SELECT; adds a hint to the
    error raised when a FROM clause is passed.

    """

    __slots__ = ()

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        advice = None
        if isinstance(element, roles.FromClauseRole):
            if element._is_subquery:
                advice = (
                    "Use the plain select() object without "
                    "calling .subquery() or .alias()."
                )
            else:
                advice = (
                    "To SELECT from any FROM clause, use the .select() method."
                )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )
1356
1357
# Registry consulted by expect(): maps a role class to the single
# RoleImpl instance that performs coercion for that role.
_impl_lookup = {}


# Populate the registry by naming convention.  For every attribute of the
# ``roles`` module whose name ends in "Role", "Role" is replaced by "Impl"
# (str.replace() substitutes every occurrence in the name) and, if this
# module defines a global of that name, it is instantiated with the role
# class.  Roles without a matching implementation class are skipped.
# The loop variables (name, cls, impl) remain as module globals afterwards.
for name in dir(roles):
    cls = getattr(roles, name)
    if name.endswith("Role"):
        name = name.replace("Role", "Impl")
        if name in globals():
            impl = globals()[name](cls)
            _impl_lookup[cls] = impl

# Runtime only: also register the subscripted forms
# ExpressionElementRole[int|bool|str|float] under the same implementation,
# so that looking up one of those keys in _impl_lookup succeeds.
if not TYPE_CHECKING:
    ee_impl = _impl_lookup[roles.ExpressionElementRole]

    for py_type in (int, bool, str, float):
        _impl_lookup[roles.ExpressionElementRole[py_type]] = ee_impl