1# sql/coercions.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9from __future__ import annotations
10
11import collections.abc as collections_abc
12import numbers
13import re
14import typing
15from typing import Any
16from typing import Callable
17from typing import cast
18from typing import Dict
19from typing import Iterable
20from typing import Iterator
21from typing import List
22from typing import NoReturn
23from typing import Optional
24from typing import overload
25from typing import Sequence
26from typing import Tuple
27from typing import Type
28from typing import TYPE_CHECKING
29from typing import TypeVar
30from typing import Union
31
32from . import roles
33from . import visitors
34from ._typing import is_from_clause
35from .base import ExecutableOption
36from .base import Options
37from .cache_key import HasCacheKey
38from .visitors import Visitable
39from .. import exc
40from .. import inspection
41from .. import util
42from ..util.typing import Literal
43
44if typing.TYPE_CHECKING:
45 # elements lambdas schema selectable are set by __init__
46 from . import elements
47 from . import lambdas
48 from . import schema
49 from . import selectable
50 from ._typing import _ColumnExpressionArgument
51 from ._typing import _ColumnsClauseArgument
52 from ._typing import _DDLColumnArgument
53 from ._typing import _DMLTableArgument
54 from ._typing import _FromClauseArgument
55 from .dml import _DMLTableElement
56 from .elements import BindParameter
57 from .elements import ClauseElement
58 from .elements import ColumnClause
59 from .elements import ColumnElement
60 from .elements import NamedColumn
61 from .elements import SQLCoreOperations
62 from .elements import TextClause
63 from .schema import Column
64 from .selectable import _ColumnsClauseElement
65 from .selectable import _JoinTargetProtocol
66 from .selectable import FromClause
67 from .selectable import HasCTE
68 from .selectable import SelectBase
69 from .selectable import Subquery
70 from .visitors import _TraverseCallableType
71
72_SR = TypeVar("_SR", bound=roles.SQLRole)
73_F = TypeVar("_F", bound=Callable[..., Any])
74_StringOnlyR = TypeVar("_StringOnlyR", bound=roles.StringRole)
75_T = TypeVar("_T", bound=Any)
76
77
def _is_literal(element: Any) -> bool:
    """Report whether ``element`` is to be treated as a plain "literal"
    value in the context of a SQL expression construct.

    An object is a literal unless it is a :class:`.Visitable`, a
    ``schema.SchemaEventTarget``, or exposes a ``__clause_element__``
    attribute.

    """

    if isinstance(element, (Visitable, schema.SchemaEventTarget)):
        return False

    return not hasattr(element, "__clause_element__")
88
89
def _deep_is_literal(element):
    """Report whether ``element`` is a "literal" in the context of a SQL
    expression construct, looking inside sequences as well.

    This is a deeper, more esoteric check than :func:`._is_literal`; it is
    used for lambda elements that have to distinguish values that would be
    bound vs. not, without any context.

    A non-string sequence is a literal only if every member is one (an
    empty sequence therefore counts as a literal).

    """

    is_sequence = isinstance(element, collections_abc.Sequence)
    if is_sequence and not isinstance(element, str):
        return all(_deep_is_literal(member) for member in element)

    non_literal_types = (
        Visitable,
        schema.SchemaEventTarget,
        HasCacheKey,
        Options,
        util.langhelpers.symbol,
    )
    if isinstance(element, non_literal_types):
        return False

    if hasattr(element, "__clause_element__"):
        return False

    # a class object (not an instance) that derives from HasCacheKey is
    # not a literal either
    if isinstance(element, type) and issubclass(element, HasCacheKey):
        return False

    return True
126
127
def _document_text_coercion(
    paramname: str, meth_rst: str, param_rst: str
) -> Callable[[_F], _F]:
    """Return the result of ``util.add_parameter_text()`` for a standard
    "trusted SQL text" warning.

    :param paramname: parameter name handed to
     ``util.add_parameter_text()``.
    :param meth_rst: text naming the method, interpolated into the warning.
    :param param_rst: text naming the parameter, interpolated into the
     warning.

    """
    warning_template = (
        ".. warning:: "
        "The %s argument to %s can be passed as a Python string argument, "
        "which will be treated "
        "as **trusted SQL text** and rendered as given. **DO NOT PASS "
        "UNTRUSTED INPUT TO THIS PARAMETER**."
    )
    warning_text = warning_template % (param_rst, meth_rst)
    return util.add_parameter_text(paramname, warning_text)
142
143
144def _expression_collection_was_a_list(
145 attrname: str,
146 fnname: str,
147 args: Union[Sequence[_T], Sequence[Sequence[_T]]],
148) -> Sequence[_T]:
149 if args and isinstance(args[0], (list, set, dict)) and len(args) == 1:
150 if isinstance(args[0], list):
151 raise exc.ArgumentError(
152 f'The "{attrname}" argument to {fnname}(), when '
153 "referring to a sequence "
154 "of items, is now passed as a series of positional "
155 "elements, rather than as a list. "
156 )
157 return cast("Sequence[_T]", args[0])
158
159 return cast("Sequence[_T]", args)
160
161
# Typing-only overloads for expect().  Each stub maps a role class (and,
# for some, the static type of ``element`` or a literal keyword flag) to
# the return type a type checker should infer.  The runtime implementation
# follows the final "catchall" overload.


# TruncatedLabelRole -> str
@overload
def expect(
    role: Type[roles.TruncatedLabelRole],
    element: Any,
    **kw: Any,
) -> str: ...


# DMLColumnRole with the ``as_key`` keyword -> str
@overload
def expect(
    role: Type[roles.DMLColumnRole],
    element: Any,
    *,
    as_key: Literal[True] = ...,
    **kw: Any,
) -> str: ...


# LiteralValueRole -> BindParameter
@overload
def expect(
    role: Type[roles.LiteralValueRole],
    element: Any,
    **kw: Any,
) -> BindParameter[Any]: ...


# DDLReferredColumnRole -> Column or str
@overload
def expect(
    role: Type[roles.DDLReferredColumnRole],
    element: Any,
    **kw: Any,
) -> Union[Column[Any], str]: ...


# DDLConstraintColumnRole -> Column or str
@overload
def expect(
    role: Type[roles.DDLConstraintColumnRole],
    element: Any,
    **kw: Any,
) -> Union[Column[Any], str]: ...


# StatementOptionRole -> ColumnElement or TextClause
@overload
def expect(
    role: Type[roles.StatementOptionRole],
    element: Any,
    **kw: Any,
) -> Union[ColumnElement[Any], TextClause]: ...


# LabeledColumnExprRole with a typed column argument -> NamedColumn[_T]
@overload
def expect(
    role: Type[roles.LabeledColumnExprRole[Any]],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> NamedColumn[_T]: ...


# expression-like roles with a typed column argument -> ColumnElement[_T]
@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
    ],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> ColumnElement[_T]: ...


# expression-like roles with an untyped argument -> ColumnElement[Any]
@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
        Type[roles.OnClauseRole],
        Type[roles.ColumnArgumentRole],
    ],
    element: Any,
    **kw: Any,
) -> ColumnElement[Any]: ...


# DMLTableRole -> _DMLTableElement
@overload
def expect(
    role: Type[roles.DMLTableRole],
    element: _DMLTableArgument,
    **kw: Any,
) -> _DMLTableElement: ...


# HasCTERole -> HasCTE
@overload
def expect(
    role: Type[roles.HasCTERole],
    element: HasCTE,
    **kw: Any,
) -> HasCTE: ...


# SelectStatementRole -> SelectBase
@overload
def expect(
    role: Type[roles.SelectStatementRole],
    element: SelectBase,
    **kw: Any,
) -> SelectBase: ...


# FromClauseRole with a FROM-clause argument -> FromClause
@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: _FromClauseArgument,
    **kw: Any,
) -> FromClause: ...


# FromClauseRole with a SelectBase and ``explicit_subquery`` -> Subquery
@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: SelectBase,
    *,
    explicit_subquery: Literal[True] = ...,
    **kw: Any,
) -> Subquery: ...


# ColumnsClauseRole -> _ColumnsClauseElement
@overload
def expect(
    role: Type[roles.ColumnsClauseRole],
    element: _ColumnsClauseArgument[Any],
    **kw: Any,
) -> _ColumnsClauseElement: ...


# JoinTargetRole -> _JoinTargetProtocol
@overload
def expect(
    role: Type[roles.JoinTargetRole],
    element: _JoinTargetProtocol,
    **kw: Any,
) -> _JoinTargetProtocol: ...


# catchall for not-yet-implemented overloads
@overload
def expect(
    role: Type[_SR],
    element: Any,
    **kw: Any,
) -> Any: ...
311
312
def expect(
    role: Type[_SR],
    element: Any,
    *,
    apply_propagate_attrs: Optional[ClauseElement] = None,
    argname: Optional[str] = None,
    post_inspect: bool = False,
    disable_inspection: bool = False,
    **kw: Any,
) -> Any:
    """Coerce ``element`` into an object suitable for ``role``.

    The role's implementation object is looked up in ``_impl_lookup`` and
    its ``_literal_coercion()``, ``_post_coercion()`` and
    ``_implicit_coercions()`` hooks are applied as described in the inline
    comments below.

    :param role: role class; used as the key into ``_impl_lookup``.
    :param element: the object to coerce.
    :param apply_propagate_attrs: optional object; when its
     ``_propagate_attrs`` is empty and the resolved object has a non-empty
     ``_propagate_attrs``, the latter is assigned onto it.
    :param argname: passed through to the implementation's coercion and
     error-raising hooks.
    :param post_inspect: when true, the ``_post_inspect`` attribute of an
     inspected object is accessed before ``__clause_element__()`` is called
     on it.
    :param disable_inspection: when true, ``inspection.inspect()`` is not
     attempted.
    :param kw: forwarded to the implementation hooks, or to
     ``lambdas.LambdaOptions`` when ``element`` is handled as a lambda.
    :return: a ``lambdas.LambdaElement`` for lambda input; otherwise the
     resolved object, possibly passed through ``_post_coercion()``, or the
     result of ``_implicit_coercions()``.

    """
    if (
        role.allows_lambda
        # note callable() will not invoke a __getattr__() method, whereas
        # hasattr(obj, "__call__") will. by keeping the callable() check here
        # we prevent most needless calls to hasattr() and therefore
        # __getattr__(), which is present on ColumnElement.
        and callable(element)
        and hasattr(element, "__code__")
    ):
        return lambdas.LambdaElement(
            element,
            role,
            lambdas.LambdaOptions(**kw),
            apply_propagate_attrs=apply_propagate_attrs,
        )

    # major case is that we are given a ClauseElement already, skip more
    # elaborate logic up front if possible
    impl = _impl_lookup[role]

    # kept so that error messages and the post-coercion hooks can refer to
    # what the caller actually passed, even after ``element`` is rebound
    original_element = element

    if not isinstance(
        element,
        (
            elements.CompilerElement,
            schema.SchemaItem,
            schema.FetchedValue,
            lambdas.PyWrapper,
        ),
    ):
        resolved = None

        if impl._resolve_literal_only:
            # implementations flagged this way skip the
            # __clause_element__() / inspection steps entirely
            resolved = impl._literal_coercion(element, **kw)
        else:
            original_element = element

            is_clause_element = False

            # this is a special performance optimization for ORM
            # joins used by JoinTargetImpl that we don't go through the
            # work of creating __clause_element__() when we only need the
            # original QueryableAttribute, as the former will do clause
            # adaption and all that which is just thrown away here.
            if (
                impl._skip_clauseelement_for_target_match
                and isinstance(element, role)
                and hasattr(element, "__clause_element__")
            ):
                is_clause_element = True
            else:
                # unwrap repeatedly until an object reporting
                # ``is_clause_element`` is reached, or until
                # __clause_element__ is no longer present
                while hasattr(element, "__clause_element__"):
                    is_clause_element = True

                    if not getattr(element, "is_clause_element", False):
                        element = element.__clause_element__()
                    else:
                        break

            if not is_clause_element:
                if impl._use_inspection and not disable_inspection:
                    insp = inspection.inspect(element, raiseerr=False)
                    if insp is not None:
                        if post_inspect:
                            # bare attribute access; presumably done for
                            # its side effect -- confirm against the
                            # inspected object's implementation
                            insp._post_inspect
                        try:
                            resolved = insp.__clause_element__()
                        except AttributeError:
                            impl._raise_for_expected(original_element, argname)

                # nothing came from inspection; fall back to the
                # implementation's literal coercion
                if resolved is None:
                    resolved = impl._literal_coercion(
                        element, argname=argname, **kw
                    )
            else:
                resolved = element
    elif isinstance(element, lambdas.PyWrapper):
        resolved = element._sa__py_wrapper_literal(**kw)
    else:
        resolved = element

    if apply_propagate_attrs is not None:
        if typing.TYPE_CHECKING:
            assert isinstance(resolved, (SQLCoreOperations, ClauseElement))

        # only fill in propagate attrs when the target has none of its own
        if not apply_propagate_attrs._propagate_attrs and getattr(
            resolved, "_propagate_attrs", None
        ):
            apply_propagate_attrs._propagate_attrs = resolved._propagate_attrs

    # the resolved object already belongs to the role: apply the optional
    # post-coercion hook; otherwise give the implementation a chance to
    # coerce it implicitly (or raise)
    if impl._role_class in resolved.__class__.__mro__:
        if impl._post_coercion:
            resolved = impl._post_coercion(
                resolved,
                argname=argname,
                original_element=original_element,
                **kw,
            )
        return resolved
    else:
        return impl._implicit_coercions(
            original_element, resolved, argname=argname, **kw
        )
427
428
def expect_as_key(
    role: Type[roles.DMLColumnRole], element: Any, **kw: Any
) -> str:
    """Call :func:`.expect` with ``as_key=True``.

    Any ``as_key`` value supplied by the caller is discarded; all other
    keyword arguments are forwarded unchanged.

    """
    forwarded = {"as_key": True}
    forwarded.update(
        (key, value) for key, value in kw.items() if key != "as_key"
    )
    return expect(role, element, **forwarded)
434
435
def expect_col_expression_collection(
    role: Type[roles.DDLConstraintColumnRole],
    expressions: Iterable[_DDLColumnArgument],
) -> Iterator[
    Tuple[
        Union[str, Column[Any]],
        Optional[ColumnClause[Any]],
        Optional[str],
        Optional[Union[Column[Any], str]],
    ]
]:
    """Coerce each expression with :func:`.expect` and yield a 4-tuple
    ``(resolved, column, strname, add_element)`` for it.

    * When the coerced value is a string: ``resolved``, ``strname`` and
      ``add_element`` are all the original string and ``column`` is
      ``None``.
    * Otherwise: ``column`` is the first object delivered to the
      ``"column"`` visitor while traversing the coerced value (``None`` if
      there is none), ``strname`` is ``None`` and ``add_element`` equals
      ``column``.

    """
    for expr in expressions:
        resolved: Union[Column[Any], str] = expect(role, expr)

        if isinstance(resolved, str):
            assert isinstance(expr, str)
            yield expr, None, expr, expr
            continue

        found: List[Column[Any]] = []
        visitors.traverse(resolved, {}, {"column": found.append})
        first_column = found[0] if found else None

        yield resolved, first_column, None, first_column
464
465
class RoleImpl:
    """Base class for per-role coercion implementations.

    An instance records the role class it serves, that role's
    ``_role_name`` and whether the role derives from
    ``roles.UsesInspection``.  Subclasses override the coercion hooks.

    """

    __slots__ = ("_role_class", "name", "_use_inspection")

    def _literal_coercion(self, element, **kw):
        """Coerce a literal value; not implemented on the base class."""
        raise NotImplementedError()

    # optional hook; when set it is called as
    # ``_post_coercion(resolved, **kw)``
    _post_coercion: Any = None

    # class-level switches read by expect()
    _resolve_literal_only = False
    _skip_clauseelement_for_target_match = False

    def __init__(self, role_class):
        self._role_class = role_class
        self.name = role_class._role_name
        self._use_inspection = issubclass(role_class, roles.UsesInspection)

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Default implicit coercion: none is available, so raise."""
        self._raise_for_expected(element, argname, resolved)

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        *,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise :class:`.ArgumentError` describing what was expected.

        :param element: the object originally passed.
        :param argname: optional argument name to mention in the message.
        :param resolved: optional object that ``element`` resolved to; it
         is mentioned only when it is a different object than ``element``.
        :param advice: optional text appended to the message.
        :param code: passed to the exception as ``code``.
        :param err: exception used as the ``from`` cause.

        """
        if resolved is None or resolved is element:
            got = repr(element)
        else:
            got = f"{resolved!r} object resolved from {element!r} object"

        if argname:
            msg = f"{self.name} expected for argument {argname!r}; got {got}."
        else:
            msg = f"{self.name} expected, got {got}."

        if advice:
            msg += " " + advice

        raise exc.ArgumentError(msg, code=code) from err
519
520
class _Deannotate:
    """Mixin supplying a ``_post_coercion()`` hook that returns the
    resolved object passed through ``_deep_deannotate()``.
    """

    __slots__ = ()

    def _post_coercion(self, resolved, **kw):
        # imported at call time rather than at module level (presumably to
        # avoid an import cycle -- confirm)
        from .util import _deep_deannotate

        return _deep_deannotate(resolved)
528
529
class _StringOnly:
    """Mixin that turns on ``_resolve_literal_only``, the flag that makes
    :func:`.expect` go straight to ``_literal_coercion()`` for objects
    that are not already SQL constructs.
    """

    __slots__ = ()

    _resolve_literal_only = True
534
535
class _ReturnsStringKey(RoleImpl):
    """Role implementation that lets plain strings through unchanged."""

    __slots__ = ()

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        """Return ``element`` when it is a string; raise otherwise."""
        if isinstance(element, str):
            return element

        self._raise_for_expected(element, argname, resolved)

    def _literal_coercion(self, element, **kw):
        """Hand the value back untouched."""
        return element
547
548
class _ColumnCoercions(RoleImpl):
    """Role implementation that implicitly turns SELECT constructs and
    subqueries into scalar subqueries, emitting a warning when it does.
    """

    __slots__ = ()

    def _warn_for_scalar_subquery_coercion(self):
        """Warn that a SELECT is being coerced to a scalar subquery."""
        util.warn(
            "implicitly coercing SELECT object to scalar subquery; "
            "please use the .scalar_subquery() method to produce a scalar "
            "subquery.",
        )

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        """Coerce ``resolved`` to a column-like construct.

        * a select base becomes ``resolved.scalar_subquery()`` (warns)
        * a ``selectable.Subquery`` becomes
          ``resolved.element.scalar_subquery()`` (warns)
        * a lambda element is returned as-is when the role allows lambdas
        * anything else raises via ``_raise_for_expected()``

        """
        if getattr(resolved, "is_clause_element", False):
            if resolved._is_select_base:
                self._warn_for_scalar_subquery_coercion()
                return resolved.scalar_subquery()

            if resolved._is_from_clause and isinstance(
                resolved, selectable.Subquery
            ):
                self._warn_for_scalar_subquery_coercion()
                return resolved.element.scalar_subquery()

            if (
                self._role_class.allows_lambda
                and resolved._is_lambda_element
            ):
                return resolved

        self._raise_for_expected(element, argname, resolved)
575
576
def _no_text_coercion(
    element: Any,
    argname: Optional[str] = None,
    exc_cls: Type[exc.SQLAlchemyError] = exc.ArgumentError,
    extra: Optional[str] = None,
    err: Optional[Exception] = None,
) -> NoReturn:
    """Raise an error stating that a textual SQL expression must be
    declared explicitly with ``text()``.

    :param element: the offending value; it is shortened with
     ``util.ellipses_string()`` for the message.
    :param argname: optional argument name mentioned in the message.
    :param exc_cls: exception class to raise.
    :param extra: optional text placed at the start of the message.
    :param err: exception used as the ``from`` cause.
    :raises exc_cls: always.

    """
    raise exc_cls(
        "%(extra)sTextual SQL expression %(expr)r %(argname)sshould be "
        "explicitly declared as text(%(expr)r)"
        % {
            "expr": util.ellipses_string(element),
            # the trailing space keeps the argument name from running
            # straight into the "should be" that follows it
            "argname": "for argument %s " % (argname,) if argname else "",
            "extra": "%s " % extra if extra else "",
        }
    ) from err
593
594
class _NoTextCoercion(RoleImpl):
    """Role implementation that refuses every literal value."""

    __slots__ = ()

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Always raise.

        A string passed for a role that ``elements.TextClause`` is a
        subclass of gets the "declare as text()" error from
        :func:`._no_text_coercion`; everything else gets the generic
        "expected" error.

        """
        text_would_satisfy_role = isinstance(element, str) and issubclass(
            elements.TextClause, self._role_class
        )

        if text_would_satisfy_role:
            _no_text_coercion(element, argname)
        else:
            self._raise_for_expected(element, argname)
605
606
class _CoerceLiterals(RoleImpl):
    """Role implementation that can coerce selected Python literals.

    Subclasses opt in through the class-level flags below and override
    ``_text_coercion()`` to decide what happens with strings.

    """

    __slots__ = ()

    # coerce None / False / True to Null() / False_() / True_()
    _coerce_consts = False
    # coerce the string "*" to a literal ColumnClause
    _coerce_star = False
    # coerce numbers.Number instances to a literal ColumnClause
    _coerce_numerics = False

    def _text_coercion(self, element, argname=None):
        """Handle a string value; by default this raises."""
        return _no_text_coercion(element, argname)

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Coerce ``element`` according to the enabled flags, raising via
        ``_raise_for_expected()`` when no rule applies.
        """
        if isinstance(element, str):
            if self._coerce_star and element == "*":
                return elements.ColumnClause("*", is_literal=True)
            return self._text_coercion(element, argname, **kw)

        if self._coerce_consts:
            # identity checks on purpose: 0 and 1 must not match here
            if element is None:
                return elements.Null()
            if element is False:
                return elements.False_()
            if element is True:
                return elements.True_()

        if self._coerce_numerics and isinstance(element, numbers.Number):
            return elements.ColumnClause(str(element), is_literal=True)

        self._raise_for_expected(element, argname)
635
636
class LiteralValueImpl(RoleImpl):
    """Role implementation that wraps a literal value in a
    ``BindParameter``.
    """

    # declared for consistency with the other RoleImpl subclasses in this
    # module, so that instances carry no per-instance ``__dict__``
    __slots__ = ()

    _resolve_literal_only = True

    def _implicit_coercions(
        self,
        element,
        resolved,
        argname=None,
        *,
        type_=None,
        literal_execute=False,
        **kw,
    ):
        """Return a unique, anonymous ``BindParameter`` holding ``element``.

        :param element: the original value; becomes the parameter value.
        :param resolved: the value as resolved by ``_literal_coercion()``;
         it must still be a literal according to :func:`._is_literal`.
        :param argname: argument name used in the error message.
        :param type_: passed to ``BindParameter`` as ``type_``.
        :param literal_execute: passed to ``BindParameter`` as
         ``literal_execute``.
        :raises ArgumentError: when ``resolved`` is not a literal.

        """
        if not _is_literal(resolved):
            self._raise_for_expected(
                element, resolved=resolved, argname=argname, **kw
            )

        return elements.BindParameter(
            None,
            element,
            type_=type_,
            unique=True,
            literal_execute=literal_execute,
        )

    def _literal_coercion(self, element, **kw):
        """Hand the value back untouched."""
        return element
665
666
class _SelectIsNotFrom(RoleImpl):
    """Role implementation that adds ".subquery()" advice to coercion
    errors when a SELECT statement was passed.
    """

    __slots__ = ()

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        *,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise the "expected" error, suggesting ``.subquery()`` when
        either ``element`` or ``resolved`` is a
        ``roles.SelectStatementRole``.

        Advice supplied by the caller always wins: the subquery advice
        (and its error code ``"89ve"``) is applied only when no advice was
        given.  A ``code`` supplied by the caller is passed through
        unchanged in that case.

        """
        involves_select = isinstance(
            element, roles.SelectStatementRole
        ) or isinstance(resolved, roles.SelectStatementRole)

        # parenthesized on purpose: "not advice and A or B" would parse as
        # "(not advice and A) or B" and overwrite the caller's advice
        if not advice and involves_select:
            subject = resolved if resolved is not None else element
            advice = (
                "To create a "
                "FROM clause from a %s object, use the .subquery() method."
                # always report the class, never the statement itself
                % (subject.__class__,)
            )
            code = "89ve"

        super()._raise_for_expected(
            element,
            argname=argname,
            resolved=resolved,
            advice=advice,
            code=code,
            err=err,
            **kw,
        )
        # never reached
        assert False
706
707
class HasCacheKeyImpl(RoleImpl):
    """Role implementation that accepts :class:`.HasCacheKey` objects."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``element`` when it is a ``HasCacheKey``; else raise."""
        if isinstance(element, HasCacheKey):
            return element

        self._raise_for_expected(element, argname, resolved)

    def _literal_coercion(self, element, **kw):
        """Hand the value back untouched."""
        return element
725
726
class ExecutableOptionImpl(RoleImpl):
    """Role implementation that accepts :class:`.ExecutableOption`
    objects.
    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``element`` when it is an ``ExecutableOption``; else
        raise.
        """
        if isinstance(element, ExecutableOption):
            return element

        self._raise_for_expected(element, argname, resolved)

    def _literal_coercion(self, element, **kw):
        """Hand the value back untouched."""
        return element
744
745
class ExpressionElementImpl(_ColumnCoercions, RoleImpl):
    """Role implementation that turns literal values into bound
    parameters (or ``Null()``).
    """

    __slots__ = ()

    def _literal_coercion(
        self, element, *, name=None, type_=None, is_crud=False, **kw
    ):
        """Coerce a literal into ``Null()`` or a unique ``BindParameter``.

        ``None`` becomes ``Null()`` unless ``is_crud`` is set or ``type_``
        has ``should_evaluate_none``; every other value is wrapped in a
        ``BindParameter``.  An ``ArgumentError`` raised while building the
        parameter is re-raised through ``_raise_for_expected()``.

        """
        renders_as_null = (
            element is None
            and not is_crud
            and (type_ is None or not type_.should_evaluate_none)
        )
        if renders_as_null:
            # TODO: there's no test coverage now for the
            # "should_evaluate_none" part of this, as outside of "crud" this
            # codepath is not normally used except in some special cases
            return elements.Null()

        try:
            return elements.BindParameter(
                name, element, type_, unique=True, _is_crud=is_crud
            )
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Raise the "expected" error, adding advice for VALUES constructs
        and for ``roles.AnonymizedFromClauseRole`` objects.
        """
        # select uses implicit coercion with warning instead of raising
        advice = None
        if isinstance(element, selectable.Values):
            advice = (
                "To create a column expression from a VALUES clause, "
                "use the .scalar_values() method."
            )
        elif isinstance(element, roles.AnonymizedFromClauseRole):
            advice = (
                "To create a column expression from a FROM clause row "
                "as a whole, use the .table_valued() method."
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )
787
788
class BinaryElementImpl(ExpressionElementImpl, RoleImpl):
    """Role implementation for the other side of a binary expression:
    literals are bound using the expression they are compared against.
    """

    __slots__ = ()

    def _literal_coercion(  # type: ignore[override]
        self,
        element,
        *,
        expr,
        operator,
        bindparam_type=None,
        argname=None,
        **kw,
    ):
        """Bind ``element`` via ``expr._bind_param()``.

        :param element: the literal value.
        :param expr: expression whose ``_bind_param()`` creates the
         parameter.
        :param operator: operator passed to ``expr._bind_param()``.
        :param bindparam_type: passed to ``_bind_param()`` as ``type_``.
        :param argname: argument name included in the error message when
         the value cannot be bound.
        :raises ArgumentError: when ``_bind_param()`` raises
         ``ArgumentError``; the original error is chained as the cause.

        """
        try:
            return expr._bind_param(operator, element, type_=bindparam_type)
        except exc.ArgumentError as err:
            # forward argname so the message names the offending argument
            self._raise_for_expected(element, argname, err=err)

    def _post_coercion(self, resolved, *, expr, bindparam_type=None, **kw):
        """Give a null-typed ``resolved`` the type of the binary
        expression.

        Applies only when ``resolved.type._isnull`` is set and
        ``expr.type._isnull`` is not; ``bindparam_type`` is preferred over
        ``expr.type`` when given.

        """
        if resolved.type._isnull and not expr.type._isnull:
            resolved = resolved._with_binary_element_type(
                bindparam_type if bindparam_type is not None else expr.type
            )
        return resolved
813
814
class InElementImpl(RoleImpl):
    """Role implementation for the right-hand side of an IN expression."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Coerce a FROM clause into a SELECT for use in IN, with a
        warning; anything that is not a FROM clause raises.
        """
        if resolved._is_from_clause:
            if (
                isinstance(resolved, selectable.Alias)
                and resolved.element._is_select_base
            ):
                # an alias of a select: use the wrapped select directly
                self._warn_for_implicit_coercion(resolved)
                return self._post_coercion(resolved.element, **kw)
            else:
                # any other FROM clause: select from it
                self._warn_for_implicit_coercion(resolved)
                return self._post_coercion(resolved.select(), **kw)
        else:
            self._raise_for_expected(element, argname, resolved)

    def _warn_for_implicit_coercion(self, elem):
        """Warn that ``elem`` is being coerced into a select() for IN."""
        util.warn(
            "Coercing %s object into a select() for use in IN(); "
            "please pass a select() construct explicitly"
            % (elem.__class__.__name__)
        )

    @util.preload_module("sqlalchemy.sql.elements")
    def _literal_coercion(self, element, *, expr, operator, **kw):
        """Coerce a non-string iterable of values for use in IN.

        When every member is a literal, a single "expanding" bound
        parameter holding the whole list is returned.  When at least one
        member is a SQL expression, a ``ClauseList`` is returned in which
        the expressions are kept and each literal is bound individually.
        A non-iterable (or string) value raises, as does a non-literal
        member that is neither a ``ColumnElement`` nor has
        ``__clause_element__``.

        """
        if util.is_non_string_iterable(element):
            # maps each non-literal member to itself; used below for
            # membership tests
            non_literal_expressions: Dict[
                Optional[_ColumnExpressionArgument[Any]],
                _ColumnExpressionArgument[Any],
            ] = {}
            # materialize once since the members are iterated twice
            element = list(element)
            for o in element:
                if not _is_literal(o):
                    if not isinstance(
                        o, util.preloaded.sql_elements.ColumnElement
                    ) and not hasattr(o, "__clause_element__"):
                        self._raise_for_expected(element, **kw)

                    else:
                        non_literal_expressions[o] = o

            if non_literal_expressions:
                return elements.ClauseList(
                    *[
                        (
                            non_literal_expressions[o]
                            if o in non_literal_expressions
                            else expr._bind_param(operator, o)
                        )
                        for o in element
                    ]
                )
            else:
                return expr._bind_param(operator, element, expanding=True)

        else:
            self._raise_for_expected(element, **kw)

    def _post_coercion(self, element, *, expr, operator, **kw):
        """Adapt an already-resolved construct for use in IN.

        Selects become scalar subqueries, clause lists are grouped against
        ``operator``, bound parameters are cloned and marked expanding,
        and VALUES constructs become scalar values; anything else is
        returned unchanged.

        """
        if element._is_select_base:
            # for IN, we are doing scalar_subquery() coercion without
            # a warning
            return element.scalar_subquery()
        elif isinstance(element, elements.ClauseList):
            assert not len(element.clauses) == 0
            return element.self_group(against=operator)

        elif isinstance(element, elements.BindParameter):
            # work on a clone so the caller's parameter is not modified
            element = element._clone(maintain_key=True)
            element.expanding = True
            element.expand_op = operator

            return element
        elif isinstance(element, selectable.Values):
            return element.scalar_values()
        else:
            return element
899
900
class OnClauseImpl(_ColumnCoercions, RoleImpl):
    """Role implementation for ON clauses; literal values are refused."""

    __slots__ = ()

    # NOTE(review): neither _ColumnCoercions nor RoleImpl, as visible in
    # this module, reads this flag -- confirm whether it is still needed
    _coerce_consts = True

    def _literal_coercion(self, element, **kw):
        """Always raise: a literal is never valid here."""
        self._raise_for_expected(element)

    def _post_coercion(self, resolved, *, original_element=None, **kw):
        """Return the original object when it is a
        ``roles.JoinTargetRole``, otherwise the resolved construct.
        """
        # this is a hack right now as we want to use coercion on an
        # ORM InstrumentedAttribute, but we want to return the object
        # itself if it is one, not its clause element.
        # ORM context _join and _legacy_join() would need to be improved
        # to look for annotations in a clause element form.
        keep_original = isinstance(original_element, roles.JoinTargetRole)
        return original_element if keep_original else resolved
918
919
class WhereHavingImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl):
    """Role implementation combining literal coercion of ``None`` /
    ``True`` / ``False`` with the scalar-subquery column coercions;
    plain strings are refused.
    """

    __slots__ = ()

    # None / False / True are coerced by _CoerceLiterals._literal_coercion()
    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # strings must be declared explicitly with text(); this raises
        return _no_text_coercion(element, argname)
927
928
class StatementOptionImpl(_CoerceLiterals, RoleImpl):
    """Role implementation that coerces ``None`` / ``True`` / ``False``
    and turns plain strings into ``TextClause`` objects.
    """

    __slots__ = ()

    # None / False / True are coerced by _CoerceLiterals._literal_coercion()
    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # strings are accepted here and wrapped as textual SQL
        return elements.TextClause(element)
936
937
class ColumnArgumentImpl(_NoTextCoercion, RoleImpl):
    """Role implementation built from :class:`._NoTextCoercion`; it adds
    no behavior of its own.
    """

    __slots__ = ()
940
941
class ColumnArgumentOrKeyImpl(_ReturnsStringKey, RoleImpl):
    """Role implementation built from :class:`._ReturnsStringKey`; it adds
    no behavior of its own.
    """

    __slots__ = ()
944
945
class StrAsPlainColumnImpl(_CoerceLiterals, RoleImpl):
    """Role implementation that turns a plain string into a
    ``ColumnClause`` of that name.
    """

    __slots__ = ()

    def _text_coercion(self, element, argname=None):
        # the string is used as the ColumnClause's first argument
        return elements.ColumnClause(element)
951
952
class ByOfImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl, roles.ByOfRole):
    """Role implementation that coerces ``None`` / ``True`` / ``False``
    and turns plain strings into textual label references.
    """

    __slots__ = ()

    # None / False / True are coerced by _CoerceLiterals._literal_coercion()
    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # a string is wrapped by elements._textual_label_reference()
        return elements._textual_label_reference(element)
960
961
class OrderByImpl(ByOfImpl, RoleImpl):
    """Role implementation for ORDER BY elements."""

    __slots__ = ()

    def _post_coercion(self, resolved, **kw):
        """Wrap ``resolved`` in ``elements._label_reference`` when it is
        an instance of the role class and has a non-``None``
        ``_order_by_label_element``; otherwise return it unchanged.
        """
        wants_label_reference = (
            isinstance(resolved, self._role_class)
            and resolved._order_by_label_element is not None
        )
        if wants_label_reference:
            return elements._label_reference(resolved)
        return resolved
973
974
class GroupByImpl(ByOfImpl, RoleImpl):
    """Role implementation for GROUP BY elements."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Expand a FROM clause into a ``ClauseList`` of its ``.c``
        collection; return anything else unchanged.
        """
        if not is_from_clause(resolved):
            return resolved

        return elements.ClauseList(*resolved.c)
989
990
class DMLColumnImpl(_ReturnsStringKey, RoleImpl):
    """Role implementation for columns named in DML statements."""

    __slots__ = ()

    def _post_coercion(self, element, *, as_key=False, **kw):
        """Return ``element.key`` when ``as_key`` is true, else the
        element itself.
        """
        return element.key if as_key else element
999
1000
class ConstExprImpl(RoleImpl):
    """Role implementation that accepts only the constants ``None``,
    ``False`` and ``True``.
    """

    __slots__ = ()

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Map ``None`` / ``False`` / ``True`` to ``Null()`` / ``False_()``
        / ``True_()``; raise for any other value.
        """
        # identity checks on purpose: 0 and 1 must not match here
        if element is None:
            return elements.Null()
        if element is False:
            return elements.False_()
        if element is True:
            return elements.True_()

        self._raise_for_expected(element, argname)
1013
1014
class TruncatedLabelImpl(_StringOnly, RoleImpl):
    """Role implementation that produces ``_truncated_label`` strings."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``resolved`` when the original element was a string;
        raise otherwise.
        """
        if isinstance(element, str):
            return resolved

        self._raise_for_expected(element, argname, resolved)

    def _literal_coercion(self, element, **kw):
        """Coerce the given value to :class:`._truncated_label`.

        A value that already is a :class:`._truncated_label` is returned
        unchanged.
        """
        already_truncated = isinstance(element, elements._truncated_label)
        if already_truncated:
            return element
        return elements._truncated_label(element)
1042
1043
class DDLExpressionImpl(_Deannotate, _CoerceLiterals, RoleImpl):
    """Role implementation that coerces ``None`` / ``True`` / ``False``,
    accepts plain strings as ``TextClause`` and deannotates its result.
    """

    __slots__ = ()

    # None / False / True are coerced by _CoerceLiterals._literal_coercion()
    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # see #5754 for why we can't easily deprecate this coercion.
        # essentially expressions like postgresql_where would have to be
        # text() as they come back from reflection and we don't want to
        # have text() elements wired into the inspection dictionaries.
        return elements.TextClause(element)
1055
1056
class DDLConstraintColumnImpl(_Deannotate, _ReturnsStringKey, RoleImpl):
    """Role implementation combining :class:`._Deannotate` and
    :class:`._ReturnsStringKey`; it adds no behavior of its own.
    """

    __slots__ = ()
1059
1060
class DDLReferredColumnImpl(DDLConstraintColumnImpl):
    """Role implementation identical in behavior to
    :class:`.DDLConstraintColumnImpl`.
    """

    __slots__ = ()
1063
1064
class LimitOffsetImpl(RoleImpl):
    """Role implementation for LIMIT / OFFSET values."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Let ``None`` through as ``None``; raise for anything else."""
        if resolved is None:
            return None

        self._raise_for_expected(element, argname, resolved)

    def _literal_coercion(  # type: ignore[override]
        self, element, *, name, type_, **kw
    ):
        """Convert a literal to a unique ``_OffsetLimitParam``.

        ``None`` is returned as ``None``; any other value is first passed
        through ``util.asint()``.
        """
        if element is None:
            return None

        return selectable._OffsetLimitParam(
            name, util.asint(element), type_=type_, unique=True
        )
1090
1091
class LabeledColumnExprImpl(ExpressionElementImpl):
    """Role implementation that makes sure a column expression carries a
    label.
    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``.label(None)`` of the expression.

        When ``resolved`` is not itself a ``roles.ExpressionElementRole``
        the parent class's implicit coercion is tried first; if that does
        not produce one either, the "expected" error is raised.
        """
        if isinstance(resolved, roles.ExpressionElementRole):
            return resolved.label(None)

        coerced = super()._implicit_coercions(
            element, resolved, argname=argname, **kw
        )
        if isinstance(coerced, roles.ExpressionElementRole):
            return coerced.label(None)

        self._raise_for_expected(element, argname, resolved)
1112
1113
class ColumnsClauseImpl(_SelectIsNotFrom, _CoerceLiterals, RoleImpl):
    """Role implementation for elements of a columns clause.

    ``None`` / ``True`` / ``False``, numbers and the string ``"*"`` are
    coerced; any other plain string is refused with a message suggesting
    ``text()``, ``column()`` or ``literal_column()``.

    """

    __slots__ = ()

    _coerce_consts = True
    _coerce_numerics = True
    _coerce_star = True

    # a string with a word character first and no whitespace is guessed to
    # be a plain column name; anything else a literal column expression
    _guess_straight_column = re.compile(r"^\w\S*$", re.I)

    def _raise_for_expected(
        self, element, argname=None, resolved=None, *, advice=None, **kw
    ):
        """Raise the "expected" error; when a list was passed and no other
        advice was given, suggest passing its members positionally.
        """
        if not advice and isinstance(element, list):
            advice = (
                f"Did you mean to say select("
                f"{', '.join(repr(e) for e in element)})?"
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

    def _text_coercion(self, element, argname=None):
        """Refuse a plain string.

        :raises ArgumentError: always; the message recommends ``column()``
         or ``literal_column()`` depending on what the string looks like.
        """
        element = str(element)

        guess_is_literal = not self._guess_straight_column.match(element)
        raise exc.ArgumentError(
            "Textual column expression %(column)r %(argname)sshould be "
            "explicitly declared with text(%(column)r), "
            "or use %(literal_column)s(%(column)r) "
            "for more specificity"
            % {
                "column": util.ellipses_string(element),
                # the trailing space keeps the argument name from running
                # straight into the "should be" that follows it
                "argname": "for argument %s " % (argname,) if argname else "",
                "literal_column": (
                    "literal_column" if guess_is_literal else "column"
                ),
            }
        )
1153
1154
class ReturnsRowsImpl(RoleImpl):
    """Role implementation that adds nothing to :class:`.RoleImpl`."""

    __slots__ = ()
1157
1158
class StatementImpl(_CoerceLiterals, RoleImpl):
    """Role implementation for objects used as executable statements."""

    __slots__ = ()

    def _post_coercion(
        self, resolved, *, original_element, argname=None, **kw
    ):
        """Return ``resolved``, emitting a deprecation warning when it was
        derived from a non-string object that has no
        ``_execute_on_connection`` attribute.
        """
        if resolved is not original_element and not isinstance(
            original_element, str
        ):
            # use same method as Connection uses; this will later raise
            # ObjectNotExecutableError
            try:
                original_element._execute_on_connection
            except AttributeError:
                util.warn_deprecated(
                    "Object %r should not be used directly in a SQL statement "
                    "context, such as passing to methods such as "
                    "session.execute(). This usage will be disallowed in a "
                    "future release. "
                    "Please use Core select() / update() / delete() etc. "
                    "with Session.execute() and other statement execution "
                    "methods."
                    # wrapped in a 1-tuple so that a tuple-like object is
                    # formatted as a single value instead of being
                    # unpacked as the argument list
                    % (original_element,),
                    "1.4",
                )

        return resolved

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Accept a lambda element as-is; defer everything else to the
        parent implementation.
        """
        if resolved._is_lambda_element:
            return resolved
        else:
            return super()._implicit_coercions(
                element, resolved, argname=argname, **kw
            )
1199
1200
class SelectStatementImpl(_NoTextCoercion, RoleImpl):
    """Role implementation for SELECT statements."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Turn a text clause into ``resolved.columns()``; raise for
        anything else.
        """
        if resolved._is_text_clause:
            return resolved.columns()

        self._raise_for_expected(element, argname, resolved)
1215
1216
class HasCTEImpl(ReturnsRowsImpl):
    """Role implementation that adds nothing to
    :class:`.ReturnsRowsImpl`.
    """

    __slots__ = ()
1219
1220
class IsCTEImpl(RoleImpl):
    """Role implementation that adds nothing to :class:`.RoleImpl`."""

    __slots__ = ()
1223
1224
class JoinTargetImpl(RoleImpl):
    """Role implementation for the target of a JOIN."""

    __slots__ = ()

    # lets expect() skip calling __clause_element__() when the object
    # passed is already an instance of the role
    _skip_clauseelement_for_target_match = True

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Always raise: a literal is never a valid join target."""
        self._raise_for_expected(element, argname)

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        legacy: bool = False,
        **kw: Any,
    ) -> Any:
        """Return the join target for ``element``.

        In ``legacy`` mode a select base is returned unchanged after a
        deprecation warning; anything else that is not already a
        ``roles.JoinTargetRole`` raises.
        """
        if isinstance(element, roles.JoinTargetRole):
            # note that this codepath no longer occurs as of
            # #6550, unless JoinTargetImpl._skip_clauseelement_for_target_match
            # were set to False.
            return element

        if legacy and resolved._is_select_base:
            util.warn_deprecated(
                "Implicit coercion of SELECT and textual SELECT "
                "constructs into FROM clauses is deprecated; please call "
                ".subquery() on any Core select or ORM Query object in "
                "order to produce a subquery object.",
                version="1.4",
            )
            # TODO: doing _implicit_subquery here causes tests to fail,
            # how was this working before? probably that ORM
            # join logic treated it as a select and subquery would happen
            # in _ORMJoin->Join
            return resolved

        self._raise_for_expected(element, argname, resolved)
1262
1263
class FromClauseImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Role implementation paired by name with ``roles.FromClauseRole``.

    Handles SELECT-like objects (explicitly or implicitly turned into
    subqueries) and text clauses; everything else is rejected.
    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        explicit_subquery: bool = False,
        allow_select: bool = True,
        **kw: Any,
    ) -> Any:
        """Coerce ``resolved`` into something usable as a FROM clause.

        :param element: the object originally passed by the caller.
        :param resolved: the object after initial resolution.
        :param argname: optional argument name used in the error report.
        :param explicit_subquery: when true, a SELECT-like object is
         converted with ``.subquery()`` and no warning is emitted.
        :param allow_select: when true (and ``explicit_subquery`` is
         false), a SELECT-like object is converted through
         ``._implicit_subquery`` with a deprecation warning.
        :return: the coerced object.

        Any input that matches none of the accepted cases is passed to
        ``_raise_for_expected()``.
        """
        if resolved._is_select_base:
            if explicit_subquery:
                return resolved.subquery()
            if allow_select:
                util.warn_deprecated(
                    "Implicit coercion of SELECT and textual SELECT "
                    "constructs into FROM clauses is deprecated; please call "
                    ".subquery() on any Core select or ORM Query object in "
                    "order to produce a subquery object.",
                    version="1.4",
                )
                return resolved._implicit_subquery
            # A SELECT with neither conversion permitted previously fell
            # off the end of this method and returned None; it is now
            # reported like any other unacceptable argument, matching
            # StrictFromClauseImpl.
        elif resolved._is_text_clause:
            return resolved

        self._raise_for_expected(element, argname, resolved)

    def _post_coercion(self, element, *, deannotate=False, **kw):
        """Optionally strip annotations from the coerced element.

        :param element: the coerced element.
        :param deannotate: when true, return ``element._deannotate()``
         instead of ``element``.
        """
        if deannotate:
            return element._deannotate()
        else:
            return element
1299
1300
class StrictFromClauseImpl(FromClauseImpl):
    """Variant of ``FromClauseImpl`` where ``allow_select`` defaults to off.

    Only a SELECT-like object with ``allow_select`` enabled is accepted by
    the implicit coercion step; every other input is rejected.
    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        allow_select: bool = False,
        **kw: Any,
    ) -> Any:
        """Implicitly wrap a SELECT as a subquery if explicitly allowed.

        :param element: the object originally passed by the caller.
        :param resolved: the object after initial resolution.
        :param argname: optional argument name used in the error report.
        :param allow_select: when true, a ``resolved`` flagged as
         ``_is_select_base`` is returned as ``resolved._implicit_subquery``
         after a deprecation warning.
        """
        if resolved._is_select_base and allow_select:
            deprecation_text = (
                "Implicit coercion of SELECT and textual SELECT constructs "
                "into FROM clauses is deprecated; please call .subquery() "
                "on any Core select or ORM Query object in order to produce a "
                "subquery object."
            )
            util.warn_deprecated(deprecation_text, version="1.4")
            return resolved._implicit_subquery

        self._raise_for_expected(element, argname, resolved)
1324
1325
class AnonymizedFromClauseImpl(StrictFromClauseImpl):
    """``StrictFromClauseImpl`` whose result is always anonymized."""

    __slots__ = ()

    def _post_coercion(self, element, *, flat=False, name=None, **kw):
        """Return ``element._anonymous_fromclause(flat=flat)``.

        :param element: the coerced element.
        :param flat: forwarded to ``_anonymous_fromclause()``.
        :param name: must be ``None``; enforced with an assertion.
        """
        assert name is None

        anonymized = element._anonymous_fromclause(flat=flat)
        return anonymized
1333
1334
class DMLTableImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Role implementation paired by name with ``roles.DMLTableRole``."""

    __slots__ = ()

    def _post_coercion(self, element, **kw):
        """Swap ``element`` for its ``"dml_table"`` annotation, if present.

        :param element: the coerced element; its ``_annotations`` mapping
         is consulted.
        :return: ``element._annotations["dml_table"]`` when that key
         exists, otherwise ``element`` itself.
        """
        annotations = element._annotations
        if "dml_table" not in annotations:
            return element
        return annotations["dml_table"]
1343
1344
class DMLSelectImpl(_NoTextCoercion, RoleImpl):
    """Role implementation paired by name with ``roles.DMLSelectRole``.

    Turns a FROM clause back into a SELECT-like object.
    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Produce a SELECT from a FROM clause; reject anything else.

        :param element: the object originally passed by the caller.
        :param resolved: the object after initial resolution.
        :param argname: optional argument name used in the error report.
        :return: for an ``Alias`` whose inner element is flagged
         ``_is_select_base``, that inner element; for any other FROM
         clause, ``resolved.select()``.
        """
        if resolved._is_from_clause:
            wraps_select = (
                isinstance(resolved, selectable.Alias)
                and resolved.element._is_select_base
            )
            # unwrap an aliased SELECT rather than selecting from it again
            return resolved.element if wraps_select else resolved.select()

        self._raise_for_expected(element, argname, resolved)
1365
1366
class CompoundElementImpl(_NoTextCoercion, RoleImpl):
    """Role implementation paired by name with ``roles.CompoundElementRole``.

    Customizes the error raised for FROM-clause arguments with a hint on
    how to obtain a selectable statement instead.
    """

    __slots__ = ()

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Delegate to ``super()`` with ``advice`` tailored to FROM clauses.

        :param element: the rejected object.
        :param argname: optional argument name, forwarded to ``super()``.
        :param resolved: optional resolved form, forwarded to ``super()``.

        ``advice`` is ``None`` unless ``element`` is a
        ``roles.FromClauseRole``; in that case the hint depends on
        ``element._is_subquery``.
        """
        advice = None
        if isinstance(element, roles.FromClauseRole):
            advice = (
                "Use the plain select() object without "
                "calling .subquery() or .alias()."
                if element._is_subquery
                else "To SELECT from any FROM clause, use the .select() method."
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )
1386
1387
# Registry mapping a role class to the impl instance created for it below.
_impl_lookup = {}


# For every attribute of ``roles`` whose name ends in "Role", look for a
# same-named "...Impl" class among this module's globals; when one exists,
# instantiate it with the role class and register it under that role class.
# NOTE: ``name``, ``cls`` and ``impl`` remain bound as module globals after
# the loop finishes.
for name in dir(roles):
    cls = getattr(roles, name)
    if name.endswith("Role"):
        name = name.replace("Role", "Impl")
        if name in globals():
            impl = globals()[name](cls)
            _impl_lookup[cls] = impl

if not TYPE_CHECKING:
    # Register the subscripted forms ``ExpressionElementRole[<py_type>]``
    # for a few Python types, all sharing the impl already registered for
    # the plain ``ExpressionElementRole``.
    ee_impl = _impl_lookup[roles.ExpressionElementRole]

    for py_type in (int, bool, str, float):
        _impl_lookup[roles.ExpressionElementRole[py_type]] = ee_impl