1# sql/coercions.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9from __future__ import annotations
10
11import collections.abc as collections_abc
12import numbers
13import re
14import typing
15from typing import Any
16from typing import Callable
17from typing import cast
18from typing import Dict
19from typing import Iterable
20from typing import Iterator
21from typing import List
22from typing import NoReturn
23from typing import Optional
24from typing import overload
25from typing import Sequence
26from typing import Tuple
27from typing import Type
28from typing import TYPE_CHECKING
29from typing import TypeVar
30from typing import Union
31
32from . import roles
33from . import visitors
34from ._typing import is_from_clause
35from .base import ExecutableOption
36from .base import Options
37from .cache_key import HasCacheKey
38from .visitors import Visitable
39from .. import exc
40from .. import inspection
41from .. import util
42from ..util.typing import Literal
43
44if typing.TYPE_CHECKING:
45 # elements lambdas schema selectable are set by __init__
46 from . import elements
47 from . import lambdas
48 from . import schema
49 from . import selectable
50 from ._typing import _ColumnExpressionArgument
51 from ._typing import _ColumnsClauseArgument
52 from ._typing import _DDLColumnArgument
53 from ._typing import _DMLTableArgument
54 from ._typing import _FromClauseArgument
55 from .base import SyntaxExtension
56 from .dml import _DMLTableElement
57 from .elements import BindParameter
58 from .elements import ClauseElement
59 from .elements import ColumnClause
60 from .elements import ColumnElement
61 from .elements import NamedColumn
62 from .elements import SQLCoreOperations
63 from .elements import TextClause
64 from .schema import Column
65 from .selectable import _ColumnsClauseElement
66 from .selectable import _JoinTargetProtocol
67 from .selectable import FromClause
68 from .selectable import HasCTE
69 from .selectable import SelectBase
70 from .selectable import Subquery
71 from .visitors import _TraverseCallableType
72
# Type variable for a role class; bound to roles.SQLRole.
_SR = TypeVar("_SR", bound=roles.SQLRole)
# Type variable for an arbitrary callable; used where a decorator returns
# the decorated callable with its type unchanged.
_F = TypeVar("_F", bound=Callable[..., Any])
# Type variable for a role class; bound to roles.StringRole.
_StringOnlyR = TypeVar("_StringOnlyR", bound=roles.StringRole)
# General-purpose type variable.
_T = TypeVar("_T", bound=Any)
77
78
def _is_literal(element: Any) -> bool:
    """Report whether ``element`` is a plain "literal" value as far as
    SQL expression constructs are concerned.

    An object is considered a literal when it is neither a
    ``Visitable`` nor a ``SchemaEventTarget`` and does not expose a
    ``__clause_element__`` attribute.

    """

    if isinstance(element, (Visitable, schema.SchemaEventTarget)):
        return False

    return not hasattr(element, "__clause_element__")
89
90
91def _deep_is_literal(element):
92 """Return whether or not the element is a "literal" in the context
93 of a SQL expression construct.
94
95 does a deeper more esoteric check than _is_literal. is used
96 for lambda elements that have to distinguish values that would
97 be bound vs. not without any context.
98
99 """
100
101 if isinstance(element, collections_abc.Sequence) and not isinstance(
102 element, str
103 ):
104 for elem in element:
105 if not _deep_is_literal(elem):
106 return False
107 else:
108 return True
109
110 return (
111 not isinstance(
112 element,
113 (
114 Visitable,
115 schema.SchemaEventTarget,
116 HasCacheKey,
117 Options,
118 util.langhelpers.symbol,
119 ),
120 )
121 and not hasattr(element, "__clause_element__")
122 and (
123 not isinstance(element, type)
124 or not issubclass(element, HasCacheKey)
125 )
126 )
127
128
def _document_text_coercion(
    paramname: str, meth_rst: str, param_rst: str
) -> Callable[[_F], _F]:
    """Return the result of ``util.add_parameter_text()`` for a warning
    stating that a parameter accepts trusted SQL text.

    :param paramname: name of the parameter, passed through to
     ``util.add_parameter_text()``.
    :param meth_rst: text naming the method, interpolated into the warning.
    :param param_rst: text naming the parameter, interpolated into the
     warning.

    """
    warning_template = (
        ".. warning:: "
        "The %s argument to %s can be passed as a Python string argument, "
        "which will be treated "
        "as **trusted SQL text** and rendered as given. **DO NOT PASS "
        "UNTRUSTED INPUT TO THIS PARAMETER**."
    )
    warning_text = warning_template % (param_rst, meth_rst)
    return util.add_parameter_text(paramname, warning_text)
143
144
145def _expression_collection_was_a_list(
146 attrname: str,
147 fnname: str,
148 args: Union[Sequence[_T], Sequence[Sequence[_T]]],
149) -> Sequence[_T]:
150 if args and isinstance(args[0], (list, set, dict)) and len(args) == 1:
151 if isinstance(args[0], list):
152 raise exc.ArgumentError(
153 f'The "{attrname}" argument to {fnname}(), when '
154 "referring to a sequence "
155 "of items, is now passed as a series of positional "
156 "elements, rather than as a list. "
157 )
158 return cast("Sequence[_T]", args[0])
159
160 return cast("Sequence[_T]", args)
161
162
# The @overload stubs below exist for static typing only; each one maps a
# role class (and, for some, the static type of ``element`` or a keyword
# flag) to the return type of expect().  The runtime implementation is the
# final, undecorated ``expect`` definition that follows the stubs.


# TruncatedLabelRole -> str
@overload
def expect(
    role: Type[roles.TruncatedLabelRole],
    element: Any,
    **kw: Any,
) -> str: ...


# DMLColumnRole with as_key=True -> str
@overload
def expect(
    role: Type[roles.DMLColumnRole],
    element: Any,
    *,
    as_key: Literal[True] = ...,
    **kw: Any,
) -> str: ...


# LiteralValueRole -> BindParameter
@overload
def expect(
    role: Type[roles.LiteralValueRole],
    element: Any,
    **kw: Any,
) -> BindParameter[Any]: ...


# DDLReferredColumnRole -> Column or str
@overload
def expect(
    role: Type[roles.DDLReferredColumnRole],
    element: Any,
    **kw: Any,
) -> Union[Column[Any], str]: ...


# DDLConstraintColumnRole -> Column or str
@overload
def expect(
    role: Type[roles.DDLConstraintColumnRole],
    element: Any,
    **kw: Any,
) -> Union[Column[Any], str]: ...


# StatementOptionRole -> ColumnElement or TextClause
@overload
def expect(
    role: Type[roles.StatementOptionRole],
    element: Any,
    **kw: Any,
) -> Union[ColumnElement[Any], TextClause]: ...


# SyntaxExtensionRole -> SyntaxExtension
@overload
def expect(
    role: Type[roles.SyntaxExtensionRole],
    element: Any,
    **kw: Any,
) -> SyntaxExtension: ...


# LabeledColumnExprRole with a typed column argument -> NamedColumn[_T]
@overload
def expect(
    role: Type[roles.LabeledColumnExprRole[Any]],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> NamedColumn[_T]: ...


# expression-like roles with a typed column argument -> ColumnElement[_T]
@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
    ],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> ColumnElement[_T]: ...


# expression-like roles with an untyped argument -> ColumnElement[Any]
@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
        Type[roles.OnClauseRole],
        Type[roles.ColumnArgumentRole],
    ],
    element: Any,
    **kw: Any,
) -> ColumnElement[Any]: ...


# DMLTableRole -> _DMLTableElement
@overload
def expect(
    role: Type[roles.DMLTableRole],
    element: _DMLTableArgument,
    **kw: Any,
) -> _DMLTableElement: ...


# HasCTERole -> HasCTE
@overload
def expect(
    role: Type[roles.HasCTERole],
    element: HasCTE,
    **kw: Any,
) -> HasCTE: ...


# SelectStatementRole -> SelectBase
@overload
def expect(
    role: Type[roles.SelectStatementRole],
    element: SelectBase,
    **kw: Any,
) -> SelectBase: ...


# FromClauseRole -> FromClause
@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: _FromClauseArgument,
    **kw: Any,
) -> FromClause: ...


# FromClauseRole given a SelectBase with explicit_subquery=True -> Subquery
@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: SelectBase,
    *,
    explicit_subquery: Literal[True] = ...,
    **kw: Any,
) -> Subquery: ...


# ColumnsClauseRole -> _ColumnsClauseElement
@overload
def expect(
    role: Type[roles.ColumnsClauseRole],
    element: _ColumnsClauseArgument[Any],
    **kw: Any,
) -> _ColumnsClauseElement: ...


# JoinTargetRole -> _JoinTargetProtocol
@overload
def expect(
    role: Type[roles.JoinTargetRole],
    element: _JoinTargetProtocol,
    **kw: Any,
) -> _JoinTargetProtocol: ...


# catchall for not-yet-implemented overloads
@overload
def expect(
    role: Type[_SR],
    element: Any,
    **kw: Any,
) -> Any: ...
320
321
def expect(
    role: Type[_SR],
    element: Any,
    *,
    apply_propagate_attrs: Optional[ClauseElement] = None,
    argname: Optional[str] = None,
    post_inspect: bool = False,
    disable_inspection: bool = False,
    **kw: Any,
) -> Any:
    """Coerce ``element`` into an object suitable for ``role``.

    The role class is looked up in ``_impl_lookup`` to locate its
    ``RoleImpl``, which supplies the literal coercion, implicit coercion
    and post-coercion hooks used below.

    :param role: the role class being requested.
    :param element: the object to be coerced.
    :param apply_propagate_attrs: optional element; if its
     ``_propagate_attrs`` is empty and the resolved element has a non-empty
     ``_propagate_attrs``, the latter is assigned to it.
    :param argname: optional argument name, passed along to the coercion
     hooks for use in error messages.
    :param post_inspect: when true, the ``_post_inspect`` attribute of an
     inspected object is accessed before ``__clause_element__()`` is called
     on it.
    :param disable_inspection: when true, ``inspection.inspect()`` is not
     attempted even if the role implementation uses inspection.
    :param \\**kw: additional keyword arguments, passed to the coercion
     hooks of the implementation (or to ``lambdas.LambdaOptions`` when a
     lambda is given).
    :return: the coerced object, as produced by the role implementation.

    """
    if (
        role.allows_lambda
        # note callable() will not invoke a __getattr__() method, whereas
        # hasattr(obj, "__call__") will. by keeping the callable() check here
        # we prevent most needless calls to hasattr() and therefore
        # __getattr__(), which is present on ColumnElement.
        and callable(element)
        and hasattr(element, "__code__")
    ):
        return lambdas.LambdaElement(
            element,
            role,
            lambdas.LambdaOptions(**kw),
            apply_propagate_attrs=apply_propagate_attrs,
        )

    # major case is that we are given a ClauseElement already, skip more
    # elaborate logic up front if possible
    impl = _impl_lookup[role]

    original_element = element

    if not isinstance(
        element,
        (
            elements.CompilerElement,
            schema.SchemaItem,
            schema.FetchedValue,
            lambdas.PyWrapper,
        ),
    ):
        resolved = None

        if impl._resolve_literal_only:
            # implementations flagged this way skip __clause_element__()
            # and inspection entirely; note argname is not passed here
            resolved = impl._literal_coercion(element, **kw)
        else:
            original_element = element

            is_clause_element = False

            # this is a special performance optimization for ORM
            # joins used by JoinTargetImpl that we don't go through the
            # work of creating __clause_element__() when we only need the
            # original QueryableAttribute, as the former will do clause
            # adaption and all that which is just thrown away here.
            if (
                impl._skip_clauseelement_for_target_match
                and isinstance(element, role)
                and hasattr(element, "__clause_element__")
            ):
                is_clause_element = True
            else:
                # unwrap nested __clause_element__() providers until an
                # object that reports is_clause_element is reached
                while hasattr(element, "__clause_element__"):
                    is_clause_element = True

                    if not getattr(element, "is_clause_element", False):
                        element = element.__clause_element__()
                    else:
                        break

            if not is_clause_element:
                if impl._use_inspection and not disable_inspection:
                    insp = inspection.inspect(element, raiseerr=False)
                    if insp is not None:
                        if post_inspect:
                            # bare attribute access; evaluated only for
                            # whatever effect the attribute lookup has
                            insp._post_inspect
                        try:
                            resolved = insp.__clause_element__()
                        except AttributeError:
                            impl._raise_for_expected(original_element, argname)

                if resolved is None:
                    # inspection was not used or produced nothing; fall
                    # back to the implementation's literal coercion
                    resolved = impl._literal_coercion(
                        element, argname=argname, **kw
                    )
            else:
                resolved = element
    elif isinstance(element, lambdas.PyWrapper):
        resolved = element._sa__py_wrapper_literal(**kw)
    else:
        resolved = element

    if apply_propagate_attrs is not None:
        if typing.TYPE_CHECKING:
            assert isinstance(resolved, (SQLCoreOperations, ClauseElement))

        # only fill in _propagate_attrs when the target has none yet
        if not apply_propagate_attrs._propagate_attrs and getattr(
            resolved, "_propagate_attrs", None
        ):
            apply_propagate_attrs._propagate_attrs = resolved._propagate_attrs

    if impl._role_class in resolved.__class__.__mro__:
        # the resolved object already satisfies the role; apply the
        # optional post-coercion hook and return it
        if impl._post_coercion:
            resolved = impl._post_coercion(
                resolved,
                argname=argname,
                original_element=original_element,
                **kw,
            )
        return resolved
    else:
        # the resolved object is not of the role; let the implementation
        # attempt an implicit coercion or raise
        return impl._implicit_coercions(
            original_element, resolved, argname=argname, **kw
        )
436
437
def expect_as_key(
    role: Type[roles.DMLColumnRole], element: Any, **kw: Any
) -> str:
    """Call :func:`.expect` with ``as_key=True``.

    Any ``as_key`` value present in ``kw`` is replaced by ``True``.

    """
    kw["as_key"] = True
    return expect(role, element, **kw)
443
444
def expect_col_expression_collection(
    role: Type[roles.DDLConstraintColumnRole],
    expressions: Iterable[_DDLColumnArgument],
) -> Iterator[
    Tuple[
        Union[str, Column[Any]],
        Optional[ColumnClause[Any]],
        Optional[str],
        Optional[Union[Column[Any], str]],
    ]
]:
    """Coerce each of ``expressions`` against ``role`` and yield a
    4-tuple describing it.

    Each yielded tuple is ``(resolved, column, strname, add_element)``:

    * ``resolved`` - the result of :func:`.expect`; when that result is a
      string, the original string expression is used.
    * ``column`` - the first object collected when traversing ``resolved``
      with a ``"column"`` visitor, or ``None`` if none was found or
      ``resolved`` is a string.
    * ``strname`` - the original string expression when ``resolved`` is a
      string, else ``None``.
    * ``add_element`` - ``column`` if it is not ``None``, else ``strname``.

    """
    for expr in expressions:
        resolved: Union[Column[Any], str] = expect(role, expr)

        strname = None
        column = None

        if not isinstance(resolved, str):
            # collect every "column" element found inside the expression;
            # only the first one is reported
            found: List[Column[Any]] = []
            collect: _TraverseCallableType[Column[Any]] = found.append
            visitors.traverse(resolved, {}, {"column": collect})
            if found:
                column = found[0]
        else:
            assert isinstance(expr, str)
            resolved = expr
            strname = expr

        if column is not None:
            add_element = column
        else:
            add_element = strname

        yield resolved, column, strname, add_element
473
474
class RoleImpl:
    """Base class for the per-role coercion implementations used by
    :func:`.expect`.

    An instance is constructed with a role class and records that class,
    its ``_role_name`` and whether it is a subclass of
    ``roles.UsesInspection``.  Subclasses override the coercion hooks.

    """

    __slots__ = ("_role_class", "name", "_use_inspection")

    # optional hook applied to an object that already satisfies the role;
    # None means no post-coercion step
    _post_coercion: Any = None
    # when True, expect() calls only _literal_coercion() to resolve objects
    # that are not already SQL constructs
    _resolve_literal_only = False
    # when True, expect() does not call __clause_element__() on an object
    # that is already an instance of the requested role
    _skip_clauseelement_for_target_match = False

    def __init__(self, role_class):
        self._role_class = role_class
        self.name = role_class._role_name
        self._use_inspection = issubclass(role_class, roles.UsesInspection)

    def _literal_coercion(self, element, **kw):
        """Coerce a non-SQL object; not implemented in the base class."""
        raise NotImplementedError()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Coerce a resolved object that is not of the role; the base
        implementation always raises via ``_raise_for_expected()``."""
        self._raise_for_expected(element, argname, resolved)

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        *,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise :class:`.ArgumentError` describing what was expected.

        :param element: the object originally passed.
        :param argname: optional argument name to include in the message.
        :param resolved: optional object that ``element`` resolved to; it
         is mentioned only when it is a different object than ``element``.
        :param advice: optional text appended to the message.
        :param code: optional error code passed to the exception.
        :param err: optional exception used as the cause of the one raised.

        """
        if resolved is None or resolved is element:
            got = repr(element)
        else:
            got = "%r object resolved from %r object" % (resolved, element)

        if not argname:
            msg = "%s expected, got %s." % (self.name, got)
        else:
            msg = "%s expected for argument %r; got %s." % (
                self.name,
                argname,
                got,
            )

        if advice:
            msg += " " + advice

        raise exc.ArgumentError(msg, code=code) from err
528
529
class _Deannotate:
    """Mixin whose post-coercion step returns the result of
    ``_deep_deannotate()`` applied to the resolved element."""

    __slots__ = ()

    def _post_coercion(self, resolved, **kw):
        # imported at call time rather than at module import time
        from .util import _deep_deannotate

        return _deep_deannotate(resolved)
537
538
class _StringOnly:
    """Mixin that sets ``_resolve_literal_only``, so that ``expect()``
    resolves non-SQL objects using ``_literal_coercion()`` only."""

    __slots__ = ()

    _resolve_literal_only = True
543
544
class _ReturnsStringKey(RoleImpl):
    """Role implementation that lets plain strings through unchanged."""

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        # no conversion here; _implicit_coercions() decides whether the
        # value is acceptable
        return element

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        """Return ``element`` if it is a string, otherwise raise."""
        if isinstance(element, str):
            return element

        self._raise_for_expected(element, argname, resolved)
556
557
class _ColumnCoercions(RoleImpl):
    """Role implementation providing implicit coercion of SELECT
    constructs and subqueries into scalar subqueries, with a warning."""

    __slots__ = ()

    def _warn_for_scalar_subquery_coercion(self):
        util.warn(
            "implicitly coercing SELECT object to scalar subquery; "
            "please use the .scalar_subquery() method to produce a scalar "
            "subquery.",
        )

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        """Coerce ``resolved`` into a column expression or raise.

        * a select-base object becomes ``resolved.scalar_subquery()``,
          with a warning
        * a ``Subquery`` becomes ``resolved.element.scalar_subquery()``,
          with a warning
        * a lambda element is returned as is if the role allows lambdas
        * anything else raises via ``_raise_for_expected()``

        """
        if getattr(resolved, "is_clause_element", False):
            if resolved._is_select_base:
                self._warn_for_scalar_subquery_coercion()
                return resolved.scalar_subquery()

            if resolved._is_from_clause and isinstance(
                resolved, selectable.Subquery
            ):
                self._warn_for_scalar_subquery_coercion()
                return resolved.element.scalar_subquery()

            if self._role_class.allows_lambda and resolved._is_lambda_element:
                return resolved

        self._raise_for_expected(element, argname, resolved)
584
585
def _no_text_coercion(
    element: Any,
    argname: Optional[str] = None,
    exc_cls: Type[exc.SQLAlchemyError] = exc.ArgumentError,
    extra: Optional[str] = None,
    err: Optional[Exception] = None,
) -> NoReturn:
    """Raise an error stating that a plain string must be explicitly
    declared as ``text()``.

    :param element: the string that was passed; it is abbreviated with
     ``util.ellipses_string()`` for the message.
    :param argname: optional argument name to mention in the message.
    :param exc_cls: exception class to raise; defaults to
     :class:`.ArgumentError`.
    :param extra: optional text placed at the start of the message.
    :param err: optional exception used as the cause of the one raised.

    """
    raise exc_cls(
        "%(extra)sTextual SQL expression %(expr)r %(argname)sshould be "
        "explicitly declared as text(%(expr)r)"
        % {
            "expr": util.ellipses_string(element),
            # the trailing space keeps the argument name separated from
            # the "should be" which directly follows it in the template
            "argname": "for argument %s " % (argname,) if argname else "",
            "extra": "%s " % extra if extra else "",
        }
    ) from err
602
603
class _NoTextCoercion(RoleImpl):
    """Role implementation that refuses to coerce literal values."""

    __slots__ = ()

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Always raise for a literal value.

        A string given for a role that ``TextClause`` is a subclass of
        gets the "should be explicitly declared as text()" error; anything
        else gets the generic "expected" error.

        """
        text_would_satisfy_role = isinstance(element, str) and issubclass(
            elements.TextClause, self._role_class
        )

        if not text_would_satisfy_role:
            self._raise_for_expected(element, argname)
        else:
            _no_text_coercion(element, argname)
614
615
class _CoerceLiterals(RoleImpl):
    """Role implementation that coerces selected Python literals into SQL
    constructs, controlled by the class-level flags below."""

    __slots__ = ()

    # coerce None / True / False into Null() / True_() / False_()
    _coerce_consts = False
    # coerce the string "*" into a literal ColumnClause
    _coerce_star = False
    # coerce numbers.Number instances into a literal ColumnClause
    _coerce_numerics = False

    def _text_coercion(self, element, argname=None):
        # default handling of plain strings: refuse them
        return _no_text_coercion(element, argname)

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Coerce a Python literal according to the class flags, raising
        via ``_raise_for_expected()`` when no coercion applies."""
        if isinstance(element, str):
            if self._coerce_star and element == "*":
                return elements.ColumnClause("*", is_literal=True)
            return self._text_coercion(element, argname, **kw)

        if self._coerce_consts:
            if element is None:
                return elements.Null()
            if element is True:
                return elements.True_()
            if element is False:
                return elements.False_()

        if self._coerce_numerics and isinstance(element, numbers.Number):
            return elements.ColumnClause(str(element), is_literal=True)

        self._raise_for_expected(element, argname)
644
645
class LiteralValueImpl(RoleImpl):
    """Role implementation that wraps a literal Python value in a
    ``BindParameter``.

    Because ``_resolve_literal_only`` is set, ``expect()`` resolves non-SQL
    objects through ``_literal_coercion()`` alone, which hands the value
    back unchanged.

    """

    # declared empty like the other RoleImpl subclasses, so that instances
    # do not acquire a per-instance __dict__
    __slots__ = ()

    _resolve_literal_only = True

    def _implicit_coercions(
        self,
        element,
        resolved,
        argname=None,
        *,
        type_=None,
        literal_execute=False,
        **kw,
    ):
        """Return a unique, anonymous ``BindParameter`` holding
        ``element``.

        :param element: the value originally passed.
        :param resolved: the resolved value; must be a literal according
         to ``_is_literal()``, otherwise an error is raised.
        :param argname: optional argument name for the error message.
        :param type\\_: optional type passed to ``BindParameter``.
        :param literal_execute: passed to ``BindParameter``.

        """
        if not _is_literal(resolved):
            self._raise_for_expected(
                element, resolved=resolved, argname=argname, **kw
            )

        return elements.BindParameter(
            None,
            element,
            type_=type_,
            unique=True,
            literal_execute=literal_execute,
        )

    def _literal_coercion(self, element, **kw):
        # no conversion at this stage; see _implicit_coercions()
        return element
674
675
class _SelectIsNotFrom(RoleImpl):
    """Role implementation mixin that adds ".subquery()" advice to the
    error raised when a SELECT statement is given where it is not
    accepted."""

    __slots__ = ()

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        *,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise :class:`.ArgumentError`, suggesting ``.subquery()`` when
        a ``SelectStatementRole`` object was passed.

        The suggestion and its error code ``"89ve"`` are applied only when
        the caller supplied no ``advice`` and either ``element`` or
        ``resolved`` is a ``SelectStatementRole``.  Otherwise the
        caller's ``advice`` and ``code`` are passed along unchanged.

        """
        # the parenthesis is required: "not a and b or c" would group as
        # "(not a and b) or c" and replace advice given by the caller
        if not advice and (
            isinstance(element, roles.SelectStatementRole)
            or isinstance(resolved, roles.SelectStatementRole)
        ):
            advice = (
                "To create a "
                "FROM clause from a %s object, use the .subquery() method."
                % (resolved.__class__ if resolved is not None else element,)
            )
            code = "89ve"

        super()._raise_for_expected(
            element,
            argname=argname,
            resolved=resolved,
            advice=advice,
            code=code,
            err=err,
            **kw,
        )
        # never reached
        assert False
715
716
class HasCacheKeyImpl(RoleImpl):
    """Role implementation that accepts any ``HasCacheKey`` object as
    is."""

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        # no conversion; acceptance is decided in _implicit_coercions()
        return element

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``element`` if it is a ``HasCacheKey``, otherwise
        raise."""
        if not isinstance(element, HasCacheKey):
            self._raise_for_expected(element, argname, resolved)
        else:
            return element
734
735
class ExecutableOptionImpl(RoleImpl):
    """Role implementation that accepts any ``ExecutableOption`` object
    as is."""

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        # no conversion; acceptance is decided in _implicit_coercions()
        return element

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``element`` if it is an ``ExecutableOption``, otherwise
        raise."""
        if not isinstance(element, ExecutableOption):
            self._raise_for_expected(element, argname, resolved)
        else:
            return element
753
754
class ExpressionElementImpl(_ColumnCoercions, RoleImpl):
    """Role implementation for column expressions; literal Python values
    become ``BindParameter`` objects, or ``Null()`` for ``None``."""

    __slots__ = ()

    def _literal_coercion(
        self, element, *, name=None, type_=None, is_crud=False, **kw
    ):
        """Coerce a literal value into ``Null()`` or a unique
        ``BindParameter``.

        ``None`` becomes ``Null()`` unless ``is_crud`` is set or the given
        type has ``should_evaluate_none`` set.  An
        :class:`.ArgumentError` raised while building the bind parameter
        is re-raised through ``_raise_for_expected()``.

        """
        renders_as_null = (
            element is None
            and not is_crud
            and (type_ is None or not type_.should_evaluate_none)
        )
        if renders_as_null:
            # TODO: there's no test coverage now for the
            # "should_evaluate_none" part of this, as outside of "crud" this
            # codepath is not normally used except in some special cases
            return elements.Null()

        try:
            return elements.BindParameter(
                name, element, type_, unique=True, _is_crud=is_crud
            )
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Raise the "expected" error, adding advice when a VALUES
        construct or another anonymized FROM clause was passed."""
        # select uses implicit coercion with warning instead of raising
        advice = None
        if isinstance(element, selectable.Values):
            advice = (
                "To create a column expression from a VALUES clause, "
                "use the .scalar_values() method."
            )
        elif isinstance(element, roles.AnonymizedFromClauseRole):
            advice = (
                "To create a column expression from a FROM clause row "
                "as a whole, use the .table_valued() method."
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )
796
797
class BinaryElementImpl(ExpressionElementImpl, RoleImpl):
    """Role implementation for the operand of a binary expression;
    literal values are bound using the other side of the expression."""

    __slots__ = ()

    def _literal_coercion(  # type: ignore[override]
        self,
        element,
        *,
        expr,
        operator,
        bindparam_type=None,
        argname=None,
        **kw,
    ):
        """Return ``expr._bind_param(operator, element, ...)``.

        :param element: the literal value.
        :param expr: the expression whose ``_bind_param()`` is invoked.
        :param operator: the operator passed to ``_bind_param()``.
        :param bindparam_type: optional type passed as ``type_``.
        :param argname: optional argument name, included in the error
         message if the value cannot be bound.
        :raises ArgumentError: when ``_bind_param()`` raises
         :class:`.ArgumentError`; the original error is kept as the cause.

        """
        try:
            return expr._bind_param(operator, element, type_=bindparam_type)
        except exc.ArgumentError as err:
            # pass argname along so the message names the argument
            self._raise_for_expected(element, argname, err=err)

    def _post_coercion(self, resolved, *, expr, bindparam_type=None, **kw):
        """Give a null-typed ``resolved`` the type of the expression it
        is compared against.

        Applies only when ``resolved.type._isnull`` is set and
        ``expr.type._isnull`` is not; ``bindparam_type`` takes precedence
        over ``expr.type`` when given.

        """
        if resolved.type._isnull and not expr.type._isnull:
            resolved = resolved._with_binary_element_type(
                bindparam_type if bindparam_type is not None else expr.type
            )
        return resolved
822
823
class InElementImpl(RoleImpl):
    """Role implementation for the right-hand side of an IN
    expression."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Coerce a FROM clause into a SELECT for use in IN, with a
        warning; raise for anything else.

        An ``Alias`` of a select-base object yields that inner element;
        any other FROM clause yields ``resolved.select()``.  The result is
        then passed through ``_post_coercion()``.

        """
        if not resolved._is_from_clause:
            self._raise_for_expected(element, argname, resolved)
        else:
            wraps_select = (
                isinstance(resolved, selectable.Alias)
                and resolved.element._is_select_base
            )
            self._warn_for_implicit_coercion(resolved)
            target = resolved.element if wraps_select else resolved.select()
            return self._post_coercion(target, **kw)

    def _warn_for_implicit_coercion(self, elem):
        util.warn(
            "Coercing %s object into a select() for use in IN(); "
            "please pass a select() construct explicitly"
            % (elem.__class__.__name__,)
        )

    @util.preload_module("sqlalchemy.sql.elements")
    def _literal_coercion(self, element, *, expr, operator, **kw):  # type: ignore[override] # noqa: E501
        """Coerce a non-string iterable of values for use in IN.

        If every member is a literal, a single expanding bind parameter
        holding the whole list is returned.  If some members are SQL
        expressions, a ``ClauseList`` is returned in which those
        expressions are kept and each literal gets its own bind parameter.
        Anything that is not a non-string iterable, or a non-literal
        member that is not a column expression, raises.

        """
        if util.is_non_string_iterable(element):
            non_literal_expressions: Dict[
                Optional[_ColumnExpressionArgument[Any]],
                _ColumnExpressionArgument[Any],
            ] = {}
            element = list(element)
            for member in element:
                if _is_literal(member):
                    continue

                is_column_expression = isinstance(
                    member, util.preloaded.sql_elements.ColumnElement
                ) or hasattr(member, "__clause_element__")

                if not is_column_expression:
                    self._raise_for_expected(element, **kw)
                else:
                    non_literal_expressions[member] = member

            if not non_literal_expressions:
                return expr._bind_param(operator, element, expanding=True)

            return elements.ClauseList(
                *[
                    (
                        non_literal_expressions[member]
                        if member in non_literal_expressions
                        else expr._bind_param(operator, member)
                    )
                    for member in element
                ]
            )
        else:
            self._raise_for_expected(element, **kw)

    def _post_coercion(self, element, *, expr, operator, **kw):
        """Adapt an already-resolved element for use in IN."""
        if element._is_select_base:
            # for IN, we are doing scalar_subquery() coercion without
            # a warning
            return element.scalar_subquery()

        if isinstance(element, elements.ClauseList):
            assert len(element.clauses) != 0
            return element.self_group(against=operator)

        if isinstance(element, elements.BindParameter):
            # work on a copy so the given parameter is not modified
            expanding_param = element._clone(maintain_key=True)
            expanding_param.expanding = True
            expanding_param.expand_op = operator
            return expanding_param

        if isinstance(element, selectable.Values):
            return element.scalar_values()

        return element
908
909
class OnClauseImpl(_ColumnCoercions, RoleImpl):
    """Role implementation for ON clauses; literal values are never
    accepted."""

    __slots__ = ()

    _coerce_consts = True

    def _literal_coercion(self, element, **kw):
        self._raise_for_expected(element)

    def _post_coercion(self, resolved, *, original_element=None, **kw):
        """Return the original object when it is a ``JoinTargetRole``,
        otherwise the resolved element."""
        # this is a hack right now: coercion is applied to an ORM
        # InstrumentedAttribute, yet the attribute object itself, not its
        # clause element, is what should be returned when one is given.
        # ORM context _join and _legacy_join() would need to be improved
        # to look for annotations in a clause element form.
        return (
            original_element
            if isinstance(original_element, roles.JoinTargetRole)
            else resolved
        )
927
928
class WhereHavingImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl):
    """Role implementation for WHERE / HAVING criteria; None, True and
    False are coerced to SQL constants and plain strings are refused."""

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # plain strings raise the "should be explicitly declared as
        # text()" error
        return _no_text_coercion(element, argname)
936
937
class SyntaxExtensionImpl(RoleImpl):
    """Role implementation using the base ``RoleImpl`` behavior
    unchanged."""

    __slots__ = ()
940
941
class StatementOptionImpl(_CoerceLiterals, RoleImpl):
    """Role implementation for statement options; None, True and False
    are coerced to SQL constants and plain strings become ``TextClause``
    objects."""

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # plain strings are accepted here and wrapped as text
        return elements.TextClause(element)
949
950
class ColumnArgumentImpl(_NoTextCoercion, RoleImpl):
    """Role implementation that refuses literal values, per
    ``_NoTextCoercion``."""

    __slots__ = ()
953
954
class ColumnArgumentOrKeyImpl(_ReturnsStringKey, RoleImpl):
    """Role implementation that additionally accepts plain strings, per
    ``_ReturnsStringKey``."""

    __slots__ = ()
957
958
class StrAsPlainColumnImpl(_CoerceLiterals, RoleImpl):
    """Role implementation in which a plain string becomes a
    ``ColumnClause`` of that name."""

    __slots__ = ()

    def _text_coercion(self, element, argname=None):
        # argname is accepted for signature compatibility and is unused
        return elements.ColumnClause(element)
964
965
class ByOfImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl, roles.ByOfRole):
    """Base role implementation shared by ``OrderByImpl`` and
    ``GroupByImpl``; None, True and False are coerced to SQL constants and
    plain strings become textual label references."""

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # plain strings are accepted and wrapped as a
        # _textual_label_reference
        return elements._textual_label_reference(element)
973
974
class OrderByImpl(ByOfImpl, RoleImpl):
    """Role implementation for ORDER BY expressions."""

    __slots__ = ()

    def _post_coercion(self, resolved, **kw):
        """Wrap ``resolved`` in a ``_label_reference`` when it is of the
        role and has an ``_order_by_label_element``; otherwise return it
        unchanged."""
        refers_to_label = (
            isinstance(resolved, self._role_class)
            and resolved._order_by_label_element is not None
        )
        return elements._label_reference(resolved) if refers_to_label else resolved
986
987
class GroupByImpl(ByOfImpl, RoleImpl):
    """Role implementation for GROUP BY expressions."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Expand a FROM clause into a ``ClauseList`` of its columns;
        return any other resolved object unchanged."""
        return (
            elements.ClauseList(*resolved.c)
            if is_from_clause(resolved)
            else resolved
        )
1002
1003
class DMLColumnImpl(_ReturnsStringKey, RoleImpl):
    """Role implementation for columns named in DML statements."""

    __slots__ = ()

    def _post_coercion(self, element, *, as_key=False, **kw):
        """Return ``element.key`` when ``as_key`` is set, otherwise the
        element itself."""
        return element.key if as_key else element
1012
1013
class ConstExprImpl(RoleImpl):
    """Role implementation accepting only the constants None, True and
    False as literal values."""

    __slots__ = ()

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Map None / False / True to ``Null()`` / ``False_()`` /
        ``True_()``; raise for any other value."""
        if element is None:
            return elements.Null()
        if element is False:
            return elements.False_()
        if element is True:
            return elements.True_()

        self._raise_for_expected(element, argname)
1026
1027
class TruncatedLabelImpl(_StringOnly, RoleImpl):
    """Role implementation that turns strings into ``_truncated_label``
    objects."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``resolved`` when the original element is a string;
        raise otherwise."""
        if not isinstance(element, str):
            self._raise_for_expected(element, argname, resolved)
        else:
            return resolved

    def _literal_coercion(self, element, **kw):
        """coerce the given value to :class:`._truncated_label`.

        Existing :class:`._truncated_label` and
        :class:`._anonymous_label` objects are passed
        unchanged.
        """

        if isinstance(element, elements._truncated_label):
            return element

        return elements._truncated_label(element)
1055
1056
class DDLExpressionImpl(_Deannotate, _CoerceLiterals, RoleImpl):
    """Role implementation for expressions used in DDL; None, True and
    False are coerced to SQL constants and plain strings become
    ``TextClause`` objects."""

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # see #5754 for why we can't easily deprecate this coercion.
        # essentially expressions like postgresql_where would have to be
        # text() as they come back from reflection and we don't want to
        # have text() elements wired into the inspection dictionaries.
        return elements.TextClause(element)
1068
1069
class DDLConstraintColumnImpl(_Deannotate, _ReturnsStringKey, RoleImpl):
    """Role implementation combining the ``_Deannotate`` post-coercion
    step with the string pass-through of ``_ReturnsStringKey``."""

    __slots__ = ()
1072
1073
class DDLReferredColumnImpl(DDLConstraintColumnImpl):
    """Role implementation with the same behavior as
    ``DDLConstraintColumnImpl``."""

    __slots__ = ()
1076
1077
class LimitOffsetImpl(RoleImpl):
    """Role implementation for LIMIT / OFFSET values."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Let a resolved value of ``None`` through as ``None``; raise
        for anything else."""
        if resolved is not None:
            self._raise_for_expected(element, argname, resolved)
        else:
            return None

    def _literal_coercion(  # type: ignore[override]
        self, element, *, name, type_, **kw
    ):
        """Return ``None`` for ``None``; otherwise convert the value with
        ``util.asint()`` and wrap it in a unique ``_OffsetLimitParam``."""
        if element is None:
            return None

        return selectable._OffsetLimitParam(
            name, util.asint(element), type_=type_, unique=True
        )
1103
1104
class LabeledColumnExprImpl(ExpressionElementImpl):
    """Role implementation that produces anonymously labeled column
    expressions."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``.label(None)`` of the resolved expression.

        If ``resolved`` is not an ``ExpressionElementRole``, the
        superclass coercion is attempted first; if that does not produce
        an ``ExpressionElementRole`` either, an error is raised.

        """
        if isinstance(resolved, roles.ExpressionElementRole):
            return resolved.label(None)

        coerced = super()._implicit_coercions(
            element, resolved, argname=argname, **kw
        )
        if isinstance(coerced, roles.ExpressionElementRole):
            return coerced.label(None)

        self._raise_for_expected(element, argname, resolved)
1125
1126
class ColumnsClauseImpl(_SelectIsNotFrom, _CoerceLiterals, RoleImpl):
    """Role implementation for elements of a columns clause.

    None, True, False, numbers and the string ``"*"`` are coerced into
    SQL constructs; any other plain string is refused with an error that
    suggests ``text()`` and either ``column()`` or ``literal_column()``.

    """

    __slots__ = ()

    _coerce_consts = True
    _coerce_numerics = True
    _coerce_star = True

    # matches strings that start with a word character and contain no
    # whitespace; used only to choose which function the error message
    # suggests
    _guess_straight_column = re.compile(r"^\w\S*$", re.I)

    def _raise_for_expected(
        self, element, argname=None, resolved=None, *, advice=None, **kw
    ):
        """Raise the "expected" error; when a list was passed and no
        other advice was given, suggest passing its members
        positionally."""
        if not advice and isinstance(element, list):
            advice = (
                f"Did you mean to say select("
                f"{', '.join(repr(e) for e in element)})?"
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

    def _text_coercion(self, element, argname=None):
        """Refuse a plain string by raising :class:`.ArgumentError`.

        The message suggests ``column()`` when the string matches
        ``_guess_straight_column`` and ``literal_column()`` otherwise.

        """
        element = str(element)

        guess_is_literal = not self._guess_straight_column.match(element)
        raise exc.ArgumentError(
            "Textual column expression %(column)r %(argname)sshould be "
            "explicitly declared with text(%(column)r), "
            "or use %(literal_column)s(%(column)r) "
            "for more specificity"
            % {
                "column": util.ellipses_string(element),
                # the trailing space keeps the argument name separated
                # from the "should be" which directly follows it
                "argname": "for argument %s " % (argname,) if argname else "",
                "literal_column": (
                    "literal_column" if guess_is_literal else "column"
                ),
            }
        )
1166
1167
class ReturnsRowsImpl(RoleImpl):
    """Role implementation using the base ``RoleImpl`` behavior
    unchanged."""

    __slots__ = ()
1170
1171
class StatementImpl(_CoerceLiterals, RoleImpl):
    """Role implementation for executable statements."""

    __slots__ = ()

    def _post_coercion(
        self, resolved, *, original_element, argname=None, **kw
    ):
        """Return ``resolved`` after checking that the original object is
        executable.

        The check applies only when ``resolved`` is a different object
        than ``original_element`` and the latter is not a string; in that
        case the original must have an ``_execute_on_connection``
        attribute, else ``ObjectNotExecutableError`` is raised.

        """
        was_converted = resolved is not original_element
        if was_converted and not isinstance(original_element, str):
            # use same method as Connection uses
            try:
                original_element._execute_on_connection
            except AttributeError as err:
                raise exc.ObjectNotExecutableError(original_element) from err

        return resolved

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Accept a lambda element as is; defer to the superclass for
        anything else."""
        if resolved._is_lambda_element:
            return resolved

        return super()._implicit_coercions(
            element, resolved, argname=argname, **kw
        )
1202
1203
class SelectStatementImpl(_NoTextCoercion, RoleImpl):
    """Role implementation for SELECT statements."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Convert a text clause using its ``columns()`` method; raise
        for anything else."""
        if not resolved._is_text_clause:
            self._raise_for_expected(element, argname, resolved)
        else:
            return resolved.columns()
1218
1219
class HasCTEImpl(ReturnsRowsImpl):
    """Role implementation with the same behavior as
    ``ReturnsRowsImpl``."""

    __slots__ = ()
1222
1223
class IsCTEImpl(RoleImpl):
    """Role implementation using the base ``RoleImpl`` behavior
    unchanged."""

    __slots__ = ()
1226
1227
class JoinTargetImpl(RoleImpl):
    """Role implementation for the target of a JOIN."""

    __slots__ = ()

    # lets expect() skip __clause_element__() for objects that are
    # already of the role
    _skip_clauseelement_for_target_match = True

    def _literal_coercion(self, element, *, argname=None, **kw):
        # literal values are never valid join targets
        self._raise_for_expected(element, argname)

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        legacy: bool = False,
        **kw: Any,
    ) -> Any:
        """Accept a ``JoinTargetRole`` original as is; in legacy mode
        accept a select-base object with a deprecation warning; raise for
        anything else."""
        if isinstance(element, roles.JoinTargetRole):
            # note that this codepath no longer occurs as of
            # #6550, unless JoinTargetImpl._skip_clauseelement_for_target_match
            # were set to False.
            return element

        if legacy and resolved._is_select_base:
            util.warn_deprecated(
                "Implicit coercion of SELECT and textual SELECT "
                "constructs into FROM clauses is deprecated; please call "
                ".subquery() on any Core select or ORM Query object in "
                "order to produce a subquery object.",
                version="1.4",
            )
            # TODO: doing _implicit_subquery here causes tests to fail,
            # how was this working before?  probably that ORM
            # join logic treated it as a select and subquery would happen
            # in _ORMJoin->Join
            return resolved

        self._raise_for_expected(element, argname, resolved)
1265
1266
class FromClauseImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Role implementation for FROM clause elements."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        explicit_subquery: bool = False,
        **kw: Any,
    ) -> Any:
        """Return ``resolved.subquery()`` for a select-base object when
        ``explicit_subquery`` is set; raise otherwise."""
        make_subquery = resolved._is_select_base and explicit_subquery
        if not make_subquery:
            self._raise_for_expected(element, argname, resolved)
        else:
            return resolved.subquery()

    def _post_coercion(self, element, *, deannotate=False, **kw):
        """Return ``element._deannotate()`` when ``deannotate`` is set,
        otherwise the element itself."""
        return element._deannotate() if deannotate else element
1289
1290
class AnonymizedFromClauseImpl(FromClauseImpl):
    """FROM clause role implementation whose post-coercion step returns
    ``element._anonymous_fromclause()``."""

    __slots__ = ()

    def _post_coercion(self, element, *, flat=False, name=None, **kw):
        # an explicit name is not supported by this coercion
        assert name is None

        return element._anonymous_fromclause(flat=flat)
1298
1299
class DMLTableImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Role implementation for the target table of a DML statement."""

    __slots__ = ()

    def _post_coercion(self, element, **kw):
        """Return the ``"dml_table"`` annotation of ``element`` when it
        has one, otherwise the element itself."""
        if "dml_table" not in element._annotations:
            return element

        return element._annotations["dml_table"]
1308
1309
class DMLSelectImpl(_NoTextCoercion, RoleImpl):
    """Role implementation for a SELECT used inside a DML statement."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Coerce a FROM clause into a SELECT; raise for anything else.

        An ``Alias`` of a select-base object yields that inner element;
        any other FROM clause yields ``resolved.select()``.

        """
        if not resolved._is_from_clause:
            self._raise_for_expected(element, argname, resolved)
        else:
            wraps_select = (
                isinstance(resolved, selectable.Alias)
                and resolved.element._is_select_base
            )
            return resolved.element if wraps_select else resolved.select()
1330
1331
class CompoundElementImpl(_NoTextCoercion, RoleImpl):
    """Role implementation for members of a compound SELECT."""

    __slots__ = ()

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Raise the "expected" error, adding advice when a FROM clause
        was passed in place of a SELECT."""
        advice = None
        if isinstance(element, roles.FromClauseRole):
            if element._is_subquery:
                advice = (
                    "Use the plain select() object without "
                    "calling .subquery() or .alias()."
                )
            else:
                advice = (
                    "To SELECT from any FROM clause, use the .select() method."
                )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )
1351
1352
# registry mapping a role class to the RoleImpl instance that performs
# coercion for it; consulted by expect()
_impl_lookup = {}


# for every attribute of the roles module whose name ends in "Role", look
# in this module for a name in which "Role" is replaced by "Impl"; when
# one exists, it is called with the role class and the result is registered
# under that role class.
for name in dir(roles):
    cls = getattr(roles, name)
    if name.endswith("Role"):
        name = name.replace("Role", "Impl")
        if name in globals():
            impl = globals()[name](cls)
            _impl_lookup[cls] = impl

if not TYPE_CHECKING:
    ee_impl = _impl_lookup[roles.ExpressionElementRole]

    # register the same implementation under the role subscripted with
    # each of these Python types
    for py_type in (int, bool, str, float):
        _impl_lookup[roles.ExpressionElementRole[py_type]] = ee_impl