1# sql/coercions.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9from __future__ import annotations
10
11import collections.abc as collections_abc
12import numbers
13import re
14import typing
15from typing import Any
16from typing import Callable
17from typing import cast
18from typing import Dict
19from typing import Iterable
20from typing import Iterator
21from typing import List
22from typing import NoReturn
23from typing import Optional
24from typing import overload
25from typing import Sequence
26from typing import Tuple
27from typing import Type
28from typing import TYPE_CHECKING
29from typing import TypeVar
30from typing import Union
31
32from . import roles
33from . import visitors
34from ._typing import is_from_clause
35from .base import ExecutableOption
36from .base import Options
37from .cache_key import HasCacheKey
38from .visitors import Visitable
39from .. import exc
40from .. import inspection
41from .. import util
42from ..util.typing import Literal
43
44if typing.TYPE_CHECKING:
45 # elements lambdas schema selectable are set by __init__
46 from . import elements
47 from . import lambdas
48 from . import schema
49 from . import selectable
50 from ._typing import _ColumnExpressionArgument
51 from ._typing import _ColumnsClauseArgument
52 from ._typing import _DDLColumnArgument
53 from ._typing import _DMLTableArgument
54 from ._typing import _FromClauseArgument
55 from ._typing import _OnlyColumnArgument
56 from .dml import _DMLTableElement
57 from .elements import BindParameter
58 from .elements import ClauseElement
59 from .elements import ColumnClause
60 from .elements import ColumnElement
61 from .elements import NamedColumn
62 from .elements import SQLCoreOperations
63 from .elements import TextClause
64 from .schema import Column
65 from .selectable import _ColumnsClauseElement
66 from .selectable import _JoinTargetProtocol
67 from .selectable import FromClause
68 from .selectable import HasCTE
69 from .selectable import SelectBase
70 from .selectable import Subquery
71 from .visitors import _TraverseCallableType
72
# Type variables used by the annotations in this module.

# a role class; bound to roles.SQLRole and used by expect()
_SR = TypeVar("_SR", bound=roles.SQLRole)
# an arbitrary callable; used by _document_text_coercion()
_F = TypeVar("_F", bound=Callable[..., Any])
# a role class bound to roles.StringRole
_StringOnlyR = TypeVar("_StringOnlyR", bound=roles.StringRole)
# generic value type
_T = TypeVar("_T", bound=Any)
77
78
def _is_literal(element: Any) -> bool:
    """Report whether ``element`` is a plain "literal" value in the
    context of a SQL expression construct.

    An object is a literal when it is neither a ``Visitable`` nor a
    ``schema.SchemaEventTarget`` and has no ``__clause_element__``
    attribute.

    """
    if isinstance(element, (Visitable, schema.SchemaEventTarget)):
        return False

    # the attribute probe only runs for objects that passed the
    # isinstance() test above
    return not hasattr(element, "__clause_element__")
89
90
91def _deep_is_literal(element):
92 """Return whether or not the element is a "literal" in the context
93 of a SQL expression construct.
94
95 does a deeper more esoteric check than _is_literal. is used
96 for lambda elements that have to distinguish values that would
97 be bound vs. not without any context.
98
99 """
100
101 if isinstance(element, collections_abc.Sequence) and not isinstance(
102 element, str
103 ):
104 for elem in element:
105 if not _deep_is_literal(elem):
106 return False
107 else:
108 return True
109
110 return (
111 not isinstance(
112 element,
113 (
114 Visitable,
115 schema.SchemaEventTarget,
116 HasCacheKey,
117 Options,
118 util.langhelpers.symbol,
119 ),
120 )
121 and not hasattr(element, "__clause_element__")
122 and (
123 not isinstance(element, type)
124 or not issubclass(element, HasCacheKey)
125 )
126 )
127
128
def _document_text_coercion(
    paramname: str, meth_rst: str, param_rst: str
) -> Callable[[_F], _F]:
    """Return the result of ``util.add_parameter_text()`` for a warning
    about trusted SQL text.

    :param paramname: parameter name handed to
     ``util.add_parameter_text()``.
    :param meth_rst: text naming the method, interpolated into the warning.
    :param param_rst: text naming the parameter, interpolated into the
     warning.

    """
    warning_template = (
        ".. warning:: "
        "The %s argument to %s can be passed as a Python string argument, "
        "which will be treated "
        "as **trusted SQL text** and rendered as given. **DO NOT PASS "
        "UNTRUSTED INPUT TO THIS PARAMETER**."
    )
    warning_text = warning_template % (param_rst, meth_rst)
    return util.add_parameter_text(paramname, warning_text)
143
144
145def _expression_collection_was_a_list(
146 attrname: str,
147 fnname: str,
148 args: Union[Sequence[_T], Sequence[Sequence[_T]]],
149) -> Sequence[_T]:
150 if args and isinstance(args[0], (list, set, dict)) and len(args) == 1:
151 if isinstance(args[0], list):
152 raise exc.ArgumentError(
153 f'The "{attrname}" argument to {fnname}(), when '
154 "referring to a sequence "
155 "of items, is now passed as a series of positional "
156 "elements, rather than as a list. "
157 )
158 return cast("Sequence[_T]", args[0])
159
160 return cast("Sequence[_T]", args)
161
162
# Typing-only overloads for expect().  Each stub pairs a role class (and in
# some cases an argument type or a keyword flag) with the return type
# reported to the type checker; the bodies are "..." placeholders.
@overload
def expect(
    role: Type[roles.TruncatedLabelRole],
    element: Any,
    **kw: Any,
) -> str: ...


# DMLColumnRole with as_key -> str
@overload
def expect(
    role: Type[roles.DMLColumnRole],
    element: Any,
    *,
    as_key: Literal[True] = ...,
    **kw: Any,
) -> str: ...


@overload
def expect(
    role: Type[roles.LiteralValueRole],
    element: Any,
    **kw: Any,
) -> BindParameter[Any]: ...


@overload
def expect(
    role: Type[roles.DDLReferredColumnRole],
    element: Any,
    **kw: Any,
) -> Union[Column[Any], str]: ...


@overload
def expect(
    role: Type[roles.DDLConstraintColumnRole],
    element: Any,
    **kw: Any,
) -> Union[Column[Any], str]: ...


@overload
def expect(
    role: Type[roles.StatementOptionRole],
    element: Any,
    **kw: Any,
) -> Union[ColumnElement[Any], TextClause]: ...


@overload
def expect(
    role: Type[roles.LabeledColumnExprRole[Any]],
    element: Union[_ColumnExpressionArgument[_T], _OnlyColumnArgument[_T]],
    **kw: Any,
) -> NamedColumn[_T]: ...


# typed column-expression argument -> ColumnElement[_T]
@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
    ],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> ColumnElement[_T]: ...


# same roles plus OnClauseRole / ColumnArgumentRole with an arbitrary
# element -> ColumnElement[Any]
@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
        Type[roles.OnClauseRole],
        Type[roles.ColumnArgumentRole],
    ],
    element: Any,
    **kw: Any,
) -> ColumnElement[Any]: ...


@overload
def expect(
    role: Type[roles.DMLTableRole],
    element: _DMLTableArgument,
    **kw: Any,
) -> _DMLTableElement: ...


@overload
def expect(
    role: Type[roles.HasCTERole],
    element: HasCTE,
    **kw: Any,
) -> HasCTE: ...


@overload
def expect(
    role: Type[roles.SelectStatementRole],
    element: SelectBase,
    **kw: Any,
) -> SelectBase: ...


@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: _FromClauseArgument,
    **kw: Any,
) -> FromClause: ...


# FromClauseRole with a SelectBase and explicit_subquery -> Subquery
@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: SelectBase,
    *,
    explicit_subquery: Literal[True] = ...,
    **kw: Any,
) -> Subquery: ...


@overload
def expect(
    role: Type[roles.ColumnsClauseRole],
    element: _ColumnsClauseArgument[Any],
    **kw: Any,
) -> _ColumnsClauseElement: ...


@overload
def expect(
    role: Type[roles.JoinTargetRole],
    element: _JoinTargetProtocol,
    **kw: Any,
) -> _JoinTargetProtocol: ...


# catchall for not-yet-implemented overloads
@overload
def expect(
    role: Type[_SR],
    element: Any,
    **kw: Any,
) -> Any: ...
312
313
def expect(
    role: Type[_SR],
    element: Any,
    *,
    apply_propagate_attrs: Optional[ClauseElement] = None,
    argname: Optional[str] = None,
    post_inspect: bool = False,
    disable_inspection: bool = False,
    **kw: Any,
) -> Any:
    """Coerce ``element`` into an object acceptable for ``role``.

    :param role: role class; it is the key used to look up the
     implementation object in ``_impl_lookup``.
    :param element: the object to coerce.
    :param apply_propagate_attrs: optional construct; when its
     ``_propagate_attrs`` is empty and the resolved object has a non-empty
     ``_propagate_attrs``, the latter is assigned to it.
    :param argname: optional argument name, handed to the implementation's
     hooks (``_literal_coercion``, ``_raise_for_expected``,
     ``_post_coercion``, ``_implicit_coercions``).
    :param post_inspect: when True, the ``_post_inspect`` attribute of an
     inspected object is accessed before ``__clause_element__()`` is
     called on it.
    :param disable_inspection: when True, ``inspection.inspect()`` is not
     attempted even if the implementation has ``_use_inspection`` set.
    :param kw: additional keyword arguments, passed to the
     implementation's hooks, or to ``lambdas.LambdaOptions`` when a
     lambda is wrapped.
    :return: a ``lambdas.LambdaElement`` when the role allows lambdas and
     ``element`` is a callable with a ``__code__`` attribute; otherwise
     the resolved object, run through the implementation's
     ``_post_coercion`` (if any) when the implementation's role class is
     in its MRO, or else the result of the implementation's
     ``_implicit_coercions``.

    """
    if (
        role.allows_lambda
        # note callable() will not invoke a __getattr__() method, whereas
        # hasattr(obj, "__call__") will. by keeping the callable() check here
        # we prevent most needless calls to hasattr() and therefore
        # __getattr__(), which is present on ColumnElement.
        and callable(element)
        and hasattr(element, "__code__")
    ):
        return lambdas.LambdaElement(
            element,
            role,
            lambdas.LambdaOptions(**kw),
            apply_propagate_attrs=apply_propagate_attrs,
        )

    # major case is that we are given a ClauseElement already, skip more
    # elaborate logic up front if possible
    impl = _impl_lookup[role]

    # remembered for error messages and for the implementation hooks below
    original_element = element

    if not isinstance(
        element,
        (
            elements.CompilerElement,
            schema.SchemaItem,
            schema.FetchedValue,
            lambdas.PyWrapper,
        ),
    ):
        # not one of the known construct types; needs to be resolved
        resolved = None

        if impl._resolve_literal_only:
            # the implementation treats any such input purely as a literal
            resolved = impl._literal_coercion(element, **kw)
        else:
            original_element = element

            is_clause_element = False

            # this is a special performance optimization for ORM
            # joins used by JoinTargetImpl that we don't go through the
            # work of creating __clause_element__() when we only need the
            # original QueryableAttribute, as the former will do clause
            # adaption and all that which is just thrown away here.
            if (
                impl._skip_clauseelement_for_target_match
                and isinstance(element, role)
                and hasattr(element, "__clause_element__")
            ):
                is_clause_element = True
            else:
                # unwrap __clause_element__() repeatedly until an object
                # flagged as is_clause_element is reached, or the
                # attribute is no longer present
                while hasattr(element, "__clause_element__"):
                    is_clause_element = True

                    if not getattr(element, "is_clause_element", False):
                        element = element.__clause_element__()
                    else:
                        break

            if not is_clause_element:
                if impl._use_inspection and not disable_inspection:
                    insp = inspection.inspect(element, raiseerr=False)
                    if insp is not None:
                        if post_inspect:
                            # attribute access only; the value is unused
                            insp._post_inspect
                        try:
                            resolved = insp.__clause_element__()
                        except AttributeError:
                            impl._raise_for_expected(original_element, argname)

                # inspection was skipped or produced nothing; fall back
                # to literal coercion
                if resolved is None:
                    resolved = impl._literal_coercion(
                        element, argname=argname, **kw
                    )
            else:
                resolved = element
    elif isinstance(element, lambdas.PyWrapper):
        resolved = element._sa__py_wrapper_literal(**kw)
    else:
        resolved = element

    if apply_propagate_attrs is not None:
        if typing.TYPE_CHECKING:
            assert isinstance(resolved, (SQLCoreOperations, ClauseElement))

        # only fill in propagate attrs when the target has none of its own
        if not apply_propagate_attrs._propagate_attrs and getattr(
            resolved, "_propagate_attrs", None
        ):
            apply_propagate_attrs._propagate_attrs = resolved._propagate_attrs

    if impl._role_class in resolved.__class__.__mro__:
        # the resolved object's class includes the role class; apply the
        # optional post-coercion hook only
        if impl._post_coercion:
            resolved = impl._post_coercion(
                resolved,
                argname=argname,
                original_element=original_element,
                **kw,
            )
        return resolved
    else:
        # role mismatch; the implementation may coerce implicitly or raise
        return impl._implicit_coercions(
            original_element, resolved, argname=argname, **kw
        )
428
429
def expect_as_key(
    role: Type[roles.DMLColumnRole], element: Any, **kw: Any
) -> str:
    """Call ``expect()`` with ``as_key=True``.

    Any ``as_key`` value supplied by the caller is replaced.

    """
    # force the flag, overriding whatever the caller passed
    kw["as_key"] = True
    return expect(role, element, **kw)
435
436
def expect_col_expression_collection(
    role: Type[roles.DDLConstraintColumnRole],
    expressions: Iterable[_DDLColumnArgument],
) -> Iterator[
    Tuple[
        Union[str, Column[Any]],
        Optional[ColumnClause[Any]],
        Optional[str],
        Optional[Union[Column[Any], str]],
    ]
]:
    """Coerce each expression with ``expect()`` and yield a 4-tuple for it.

    The tuple is ``(resolved, column, strname, add_element)``:

    * when ``expect()`` returns a string, ``resolved`` and ``strname`` are
      the original expression and ``column`` is None;
    * otherwise ``resolved`` is the coerced object, ``strname`` is None
      and ``column`` is the first object reported by traversing
      ``resolved`` with a ``"column"`` visitor, or None if there is none.

    ``add_element`` is ``column`` when it is not None, else ``strname``.

    """
    for expr in expressions:
        strname: Optional[str] = None
        column = None

        resolved: Union[Column[Any], str] = expect(role, expr)

        if not isinstance(resolved, str):
            found: List[Column[Any]] = []
            on_column: _TraverseCallableType[Column[Any]] = found.append
            visitors.traverse(resolved, {}, {"column": on_column})
            if found:
                column = found[0]
        else:
            assert isinstance(expr, str)
            strname = resolved = expr

        add_element = strname if column is None else column

        yield resolved, column, strname, add_element
465
466
class RoleImpl:
    """Base class for the per-role coercion implementations.

    An instance records the role class it serves, that role's
    ``_role_name`` and whether the role derives from
    ``roles.UsesInspection``.

    """

    __slots__ = ("_role_class", "name", "_use_inspection")

    # optional hook; expect() calls it only when it is truthy
    _post_coercion: Any = None
    _resolve_literal_only = False
    _skip_clauseelement_for_target_match = False

    def __init__(self, role_class):
        self._role_class = role_class
        self.name = role_class._role_name
        self._use_inspection = issubclass(role_class, roles.UsesInspection)

    def _literal_coercion(self, element, **kw):
        """Coerce a literal value; not implemented on the base class."""
        raise NotImplementedError()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Default implicit coercion: none is available, so raise."""
        self._raise_for_expected(element, argname, resolved)

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        *,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise ``ArgumentError`` describing what was expected and what
        was received.

        :param element: the object originally passed in.
        :param argname: optional argument name to mention in the message.
        :param resolved: optional object that ``element`` resolved to; it
         is mentioned only when it is not ``element`` itself.
        :param advice: optional text appended to the message.
        :param code: optional error code passed to the exception.
        :param err: optional exception used as the ``from`` cause.

        """
        if resolved is None or resolved is element:
            got = repr(element)
        else:
            got = "%r object resolved from %r object" % (resolved, element)

        if not argname:
            msg = "%s expected, got %s." % (self.name, got)
        else:
            msg = "%s expected for argument %r; got %s." % (
                self.name,
                argname,
                got,
            )

        if advice:
            msg = msg + " " + advice

        raise exc.ArgumentError(msg, code=code) from err
520
521
class _Deannotate:
    """Mixin supplying a ``_post_coercion`` hook that returns the result
    of ``_deep_deannotate()`` for the resolved object.

    """

    __slots__ = ()

    def _post_coercion(self, resolved, **kw):
        # imported inside the method rather than at module level
        from .util import _deep_deannotate

        return _deep_deannotate(resolved)
529
530
class _StringOnly:
    """Mixin that sets ``_resolve_literal_only`` to True."""

    __slots__ = ()

    # expect() sends input that is not already a construct straight to
    # _literal_coercion() when this flag is set
    _resolve_literal_only = True
535
536
class _ReturnsStringKey(RoleImpl):
    """Role implementation that accepts a plain string as-is."""

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        # returned unchanged; the string check is in _implicit_coercions()
        return element

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        """Return ``element`` if it is a string, otherwise raise."""
        if not isinstance(element, str):
            self._raise_for_expected(element, argname, resolved)
        return element
548
549
class _ColumnCoercions(RoleImpl):
    """Role implementation that implicitly turns SELECT constructs and
    ``Subquery`` objects into scalar subqueries, emitting a warning.

    """

    __slots__ = ()

    def _warn_for_scalar_subquery_coercion(self):
        util.warn(
            "implicitly coercing SELECT object to scalar subquery; "
            "please use the .scalar_subquery() method to produce a scalar "
            "subquery.",
        )

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        """Coerce ``resolved`` to a column-oriented object or raise.

        Lambda elements are returned unchanged when the role class allows
        lambdas.  Anything else raises via ``_raise_for_expected()``.

        """
        if getattr(resolved, "is_clause_element", False):
            if resolved._is_select_base:
                self._warn_for_scalar_subquery_coercion()
                return resolved.scalar_subquery()

            if resolved._is_from_clause and isinstance(
                resolved, selectable.Subquery
            ):
                # unwrap the subquery and use its inner element
                self._warn_for_scalar_subquery_coercion()
                return resolved.element.scalar_subquery()

            if self._role_class.allows_lambda and resolved._is_lambda_element:
                return resolved

        self._raise_for_expected(element, argname, resolved)
576
577
def _no_text_coercion(
    element: Any,
    argname: Optional[str] = None,
    exc_cls: Type[exc.SQLAlchemyError] = exc.ArgumentError,
    extra: Optional[str] = None,
    err: Optional[Exception] = None,
) -> NoReturn:
    """Raise an error stating that a plain string must be declared
    explicitly with ``text()``.

    :param element: the offending string; it is abbreviated with
     ``util.ellipses_string()`` for the message.
    :param argname: optional argument name to mention in the message.
    :param exc_cls: exception class to raise.
    :param extra: optional text placed at the start of the message.
    :param err: optional exception used as the ``from`` cause.
    :raises exc_cls: always.

    """
    # both optional fragments carry their own trailing space so that the
    # message reads correctly whether or not they are present; the argname
    # fragment previously lacked it, producing "for argument xshould be"
    argname_text = "for argument %s " % (argname,) if argname else ""
    extra_text = "%s " % extra if extra else ""

    raise exc_cls(
        "%(extra)sTextual SQL expression %(expr)r %(argname)sshould be "
        "explicitly declared as text(%(expr)r)"
        % {
            "expr": util.ellipses_string(element),
            "argname": argname_text,
            "extra": extra_text,
        }
    ) from err
594
595
class _NoTextCoercion(RoleImpl):
    """Role implementation that rejects every literal value."""

    __slots__ = ()

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Always raise.

        A string given where ``TextClause`` is a subclass of the role
        class gets the ``text()`` hint from ``_no_text_coercion()``;
        anything else gets the generic message.

        """
        wants_text_hint = isinstance(element, str) and issubclass(
            elements.TextClause, self._role_class
        )
        if wants_text_hint:
            _no_text_coercion(element, argname)

        self._raise_for_expected(element, argname)
606
607
class _CoerceLiterals(RoleImpl):
    """Role implementation that coerces selected Python literals.

    The class-level flags decide which literals are accepted:

    * ``_coerce_star`` - the string ``"*"`` becomes a literal column
    * ``_coerce_consts`` - ``None`` / ``False`` / ``True`` become
      ``Null`` / ``False_`` / ``True_``
    * ``_coerce_numerics`` - ``numbers.Number`` instances become a literal
      column holding ``str(element)``

    Other strings go to ``_text_coercion()``.

    """

    __slots__ = ()
    _coerce_consts = False
    _coerce_star = False
    _coerce_numerics = False

    def _text_coercion(self, element, argname=None):
        return _no_text_coercion(element, argname)

    def _literal_coercion(self, element, *, argname=None, **kw):
        if isinstance(element, str):
            if not (self._coerce_star and element == "*"):
                return self._text_coercion(element, argname, **kw)
            return elements.ColumnClause("*", is_literal=True)

        if self._coerce_consts:
            # identity checks, so 0 and 1 are not treated as False / True
            if element is None:
                return elements.Null()
            if element is False:
                return elements.False_()
            if element is True:
                return elements.True_()

        if self._coerce_numerics and isinstance(element, numbers.Number):
            return elements.ColumnClause(str(element), is_literal=True)

        self._raise_for_expected(element, argname)
636
637
class LiteralValueImpl(RoleImpl):
    """Role implementation that wraps a literal value in a
    ``BindParameter``.

    ``_resolve_literal_only`` is set, so ``expect()`` hands input that is
    not already a construct directly to ``_literal_coercion()``.

    """

    # declared for consistency with the other implementation classes, so
    # that instances carry no per-instance __dict__
    __slots__ = ()

    _resolve_literal_only = True

    def _implicit_coercions(
        self,
        element,
        resolved,
        argname=None,
        *,
        type_=None,
        literal_execute=False,
        **kw,
    ):
        """Return a unique, anonymous ``BindParameter`` for ``element``.

        :param type_: optional type passed to the ``BindParameter``.
        :param literal_execute: passed to the ``BindParameter``.
        :raises ArgumentError: when ``resolved`` is not a literal as
         determined by ``_is_literal()``.

        """
        if not _is_literal(resolved):
            self._raise_for_expected(
                element, resolved=resolved, argname=argname, **kw
            )

        return elements.BindParameter(
            None,
            element,
            type_=type_,
            unique=True,
            literal_execute=literal_execute,
        )

    def _literal_coercion(self, element, **kw):
        # returned unchanged; wrapping happens in _implicit_coercions()
        return element
666
667
class _SelectIsNotFrom(RoleImpl):
    """Role implementation mixin that adds a ``.subquery()`` hint to the
    error raised when a SELECT statement is given.

    """

    __slots__ = ()

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        *,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise ``ArgumentError``, suggesting ``.subquery()`` when
        applicable.

        The hint and the ``"89ve"`` error code are applied only when the
        caller supplied no ``advice`` and either ``element`` or
        ``resolved`` is a ``roles.SelectStatementRole``.  In every other
        case the caller's ``advice`` and ``code`` are passed through
        unchanged.

        """
        # the "or" is parenthesized explicitly: without the parentheses
        # "not advice" applied only to the first isinstance() test, so
        # caller-supplied advice could be overwritten
        if not advice and (
            isinstance(element, roles.SelectStatementRole)
            or isinstance(resolved, roles.SelectStatementRole)
        ):
            advice = (
                "To create a "
                "FROM clause from a %s object, use the .subquery() method."
                % (resolved.__class__ if resolved is not None else element,)
            )
            code = "89ve"

        super()._raise_for_expected(
            element,
            argname=argname,
            resolved=resolved,
            advice=advice,
            code=code,
            err=err,
            **kw,
        )
        # never reached
        assert False
707
708
class HasCacheKeyImpl(RoleImpl):
    """Role implementation that accepts ``HasCacheKey`` objects as-is."""

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        # returned unchanged; the type check is in _implicit_coercions()
        return element

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``element`` if it is a ``HasCacheKey``, else raise."""
        if not isinstance(element, HasCacheKey):
            self._raise_for_expected(element, argname, resolved)
        return element
726
727
class ExecutableOptionImpl(RoleImpl):
    """Role implementation that accepts ``ExecutableOption`` objects
    as-is.

    """

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        # returned unchanged; the type check is in _implicit_coercions()
        return element

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``element`` if it is an ``ExecutableOption``, else
        raise.

        """
        if not isinstance(element, ExecutableOption):
            self._raise_for_expected(element, argname, resolved)
        return element
745
746
class ExpressionElementImpl(_ColumnCoercions, RoleImpl):
    """Role implementation that turns literal values into bound
    parameters, or into ``Null`` for ``None``.

    """

    __slots__ = ()

    def _literal_coercion(
        self, element, *, name=None, type_=None, is_crud=False, **kw
    ):
        """Return ``Null()`` or a unique ``BindParameter`` for ``element``.

        :param name: name passed to the ``BindParameter``.
        :param type_: optional type; passed to the ``BindParameter`` and
         consulted for ``should_evaluate_none``.
        :param is_crud: passed to the ``BindParameter`` as ``_is_crud``;
         when set, ``None`` is not converted to ``Null()``.

        """
        renders_as_null = (
            element is None
            and not is_crud
            and (type_ is None or not type_.should_evaluate_none)
        )
        if renders_as_null:
            # TODO: there's no test coverage now for the
            # "should_evaluate_none" part of this, as outside of "crud" this
            # codepath is not normally used except in some special cases
            return elements.Null()

        try:
            return elements.BindParameter(
                name, element, type_, unique=True, _is_crud=is_crud
            )
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Raise, adding advice for ``Values`` and anonymized FROM
        clause objects.

        """
        # select uses implicit coercion with warning instead of raising
        advice = None
        if isinstance(element, selectable.Values):
            advice = (
                "To create a column expression from a VALUES clause, "
                "use the .scalar_values() method."
            )
        elif isinstance(element, roles.AnonymizedFromClauseRole):
            advice = (
                "To create a column expression from a FROM clause row "
                "as a whole, use the .table_valued() method."
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )
788
789
class BinaryElementImpl(ExpressionElementImpl, RoleImpl):
    """Role implementation for one side of a binary expression; literals
    are bound through the opposing expression's ``_bind_param()``.

    """

    __slots__ = ()

    def _literal_coercion(  # type: ignore[override]
        self,
        element,
        *,
        expr,
        operator,
        bindparam_type=None,
        argname=None,
        **kw,
    ):
        """Return ``expr._bind_param(operator, element, ...)``; an
        ``ArgumentError`` from that call is re-raised through
        ``_raise_for_expected()``.

        """
        try:
            bound = expr._bind_param(operator, element, type_=bindparam_type)
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)
        else:
            return bound

    def _post_coercion(self, resolved, *, expr, bindparam_type=None, **kw):
        """Give ``resolved`` a type when its own is null but ``expr``'s is
        not; ``bindparam_type`` takes precedence over ``expr.type``.

        """
        if not resolved.type._isnull or expr.type._isnull:
            return resolved

        target_type = expr.type if bindparam_type is None else bindparam_type
        return resolved._with_binary_element_type(target_type)
814
815
class InElementImpl(RoleImpl):
    """Role implementation for the right-hand side of an IN expression."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Coerce a FROM clause into a SELECT for use in IN, with a
        warning; anything that is not a FROM clause raises.

        """
        if resolved._is_from_clause:
            if (
                isinstance(resolved, selectable.Alias)
                and resolved.element._is_select_base
            ):
                # an alias of a SELECT: use the inner SELECT itself
                self._warn_for_implicit_coercion(resolved)
                return self._post_coercion(resolved.element, **kw)
            else:
                # any other FROM clause: select from it
                self._warn_for_implicit_coercion(resolved)
                return self._post_coercion(resolved.select(), **kw)
        else:
            self._raise_for_expected(element, argname, resolved)

    def _warn_for_implicit_coercion(self, elem):
        util.warn(
            "Coercing %s object into a select() for use in IN(); "
            "please pass a select() construct explicitly"
            % (elem.__class__.__name__)
        )

    @util.preload_module("sqlalchemy.sql.elements")
    def _literal_coercion(self, element, *, expr, operator, **kw):  # type: ignore[override] # noqa: E501
        """Coerce a non-string iterable of values for IN.

        When every member is a literal, a single bound parameter with
        ``expanding=True`` is returned.  When at least one member is a
        column expression (or has ``__clause_element__``), a ``ClauseList``
        is returned in which the literal members are bound individually.
        Input that is not a non-string iterable, or a non-literal member
        of another kind, raises via ``_raise_for_expected()``.

        """
        if util.is_non_string_iterable(element):
            non_literal_expressions: Dict[
                Optional[_ColumnExpressionArgument[Any]],
                _ColumnExpressionArgument[Any],
            ] = {}
            # materialized because it is iterated more than once below
            element = list(element)
            for o in element:
                if not _is_literal(o):
                    if not isinstance(
                        o, util.preloaded.sql_elements.ColumnElement
                    ) and not hasattr(o, "__clause_element__"):
                        self._raise_for_expected(element, **kw)

                    else:
                        non_literal_expressions[o] = o

            if non_literal_expressions:
                return elements.ClauseList(
                    *[
                        (
                            non_literal_expressions[o]
                            if o in non_literal_expressions
                            else expr._bind_param(operator, o)
                        )
                        for o in element
                    ]
                )
            else:
                return expr._bind_param(operator, element, expanding=True)

        else:
            self._raise_for_expected(element, **kw)

    def _post_coercion(self, element, *, expr, operator, **kw):
        """Adapt an already-resolved construct for use in IN."""
        if element._is_select_base:
            # for IN, we are doing scalar_subquery() coercion without
            # a warning
            return element.scalar_subquery()
        elif isinstance(element, elements.ClauseList):
            assert not len(element.clauses) == 0
            return element.self_group(against=operator)

        elif isinstance(element, elements.BindParameter):
            # work on a clone so the caller's parameter is not modified
            element = element._clone(maintain_key=True)
            element.expanding = True
            element.expand_op = operator

            return element
        elif isinstance(element, selectable.Values):
            return element.scalar_values()
        else:
            return element
900
901
class OnClauseImpl(_ColumnCoercions, RoleImpl):
    """Role implementation for ON clauses; literal values are rejected."""

    __slots__ = ()

    _coerce_consts = True

    def _literal_coercion(self, element, **kw):
        self._raise_for_expected(element)

    def _post_coercion(self, resolved, *, original_element=None, **kw):
        """Return the original object when it is a
        ``roles.JoinTargetRole``, otherwise ``resolved``.

        """
        # this is a hack right now as we want to use coercion on an
        # ORM InstrumentedAttribute, but we want to return the object
        # itself if it is one, not its clause element.
        # ORM context _join and _legacy_join() would need to be improved
        # to look for annotations in a clause element form.
        is_join_target = isinstance(original_element, roles.JoinTargetRole)
        return original_element if is_join_target else resolved
919
920
class WhereHavingImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl):
    """Role implementation built from ``_CoerceLiterals`` and
    ``_ColumnCoercions``.

    Constant coercion is enabled through ``_coerce_consts``; plain strings
    are handed to ``_no_text_coercion()``.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        return _no_text_coercion(element, argname)
928
929
class StatementOptionImpl(_CoerceLiterals, RoleImpl):
    """Role implementation built from ``_CoerceLiterals``.

    Constant coercion is enabled through ``_coerce_consts``; plain strings
    are wrapped in a ``TextClause``.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        return elements.TextClause(element)
937
938
class ColumnArgumentImpl(_NoTextCoercion, RoleImpl):
    """Role implementation that takes all behavior from
    ``_NoTextCoercion``.

    """

    __slots__ = ()
941
942
class ColumnArgumentOrKeyImpl(_ReturnsStringKey, RoleImpl):
    """Role implementation that takes all behavior from
    ``_ReturnsStringKey``.

    """

    __slots__ = ()
945
946
class StrAsPlainColumnImpl(_CoerceLiterals, RoleImpl):
    """Role implementation built from ``_CoerceLiterals`` in which a plain
    string becomes a ``ColumnClause`` of that name.

    """

    __slots__ = ()

    def _text_coercion(self, element, argname=None):
        return elements.ColumnClause(element)
952
953
class ByOfImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl, roles.ByOfRole):
    """Role implementation built from ``_CoerceLiterals`` and
    ``_ColumnCoercions``.

    Constant coercion is enabled through ``_coerce_consts``; plain strings
    are passed to ``elements._textual_label_reference()``.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        return elements._textual_label_reference(element)
961
962
class OrderByImpl(ByOfImpl, RoleImpl):
    """``ByOfImpl`` variant whose post-coercion step wraps elements that
    have an ``_order_by_label_element`` in a label reference.

    """

    __slots__ = ()

    def _post_coercion(self, resolved, **kw):
        refers_to_label = (
            isinstance(resolved, self._role_class)
            and resolved._order_by_label_element is not None
        )
        if not refers_to_label:
            return resolved
        return elements._label_reference(resolved)
974
975
class GroupByImpl(ByOfImpl, RoleImpl):
    """``ByOfImpl`` variant that expands a FROM clause into a list of its
    columns.

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return a ``ClauseList`` of ``resolved.c`` for a FROM clause;
        any other object is returned unchanged.

        """
        if not is_from_clause(resolved):
            return resolved
        return elements.ClauseList(*resolved.c)
990
991
class DMLColumnImpl(_ReturnsStringKey, RoleImpl):
    """``_ReturnsStringKey`` variant that can return the element's
    ``key`` instead of the element.

    """

    __slots__ = ()

    def _post_coercion(self, element, *, as_key=False, **kw):
        # as_key selects the element's .key attribute
        return element.key if as_key else element
1000
1001
class ConstExprImpl(RoleImpl):
    """Role implementation that accepts only ``None``, ``False`` and
    ``True`` as literals.

    """

    __slots__ = ()

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Return ``Null`` / ``False_`` / ``True_`` for the matching
        singleton; raise for anything else.

        """
        # identity checks, so 0 and 1 are not treated as False / True
        if element is None:
            return elements.Null()
        if element is False:
            return elements.False_()
        if element is True:
            return elements.True_()

        self._raise_for_expected(element, argname)
1014
1015
class TruncatedLabelImpl(_StringOnly, RoleImpl):
    """Role implementation that produces ``_truncated_label`` strings."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``resolved`` when the original element is a string;
        otherwise raise.

        """
        if not isinstance(element, str):
            self._raise_for_expected(element, argname, resolved)
        return resolved

    def _literal_coercion(self, element, **kw):
        """coerce the given value to :class:`._truncated_label`.

        Existing :class:`._truncated_label` and
        :class:`._anonymous_label` objects are passed
        unchanged.
        """

        already_truncated = isinstance(element, elements._truncated_label)
        if already_truncated:
            return element
        return elements._truncated_label(element)
1043
1044
class DDLExpressionImpl(_Deannotate, _CoerceLiterals, RoleImpl):
    """Role implementation built from ``_Deannotate`` and
    ``_CoerceLiterals``.

    Constant coercion is enabled through ``_coerce_consts``; plain strings
    are wrapped in a ``TextClause``.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # see #5754 for why we can't easily deprecate this coercion.
        # essentially expressions like postgresql_where would have to be
        # text() as they come back from reflection and we don't want to
        # have text() elements wired into the inspection dictionaries.
        return elements.TextClause(element)
1056
1057
class DDLConstraintColumnImpl(_Deannotate, _ReturnsStringKey, RoleImpl):
    """Role implementation that takes all behavior from ``_Deannotate``
    and ``_ReturnsStringKey``.

    """

    __slots__ = ()
1060
1061
class DDLReferredColumnImpl(DDLConstraintColumnImpl):
    """Role implementation that takes all behavior from
    ``DDLConstraintColumnImpl``.

    """

    __slots__ = ()
1064
1065
class LimitOffsetImpl(RoleImpl):
    """Role implementation for LIMIT / OFFSET values; ``None`` passes
    through and other literals become ``_OffsetLimitParam`` objects.

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return None when ``resolved`` is None; otherwise raise."""
        if resolved is not None:
            self._raise_for_expected(element, argname, resolved)
        return None

    def _literal_coercion(  # type: ignore[override]
        self, element, *, name, type_, **kw
    ):
        """Return None for None, else a unique ``_OffsetLimitParam``
        holding ``util.asint(element)``.

        """
        if element is None:
            return None

        int_value = util.asint(element)
        return selectable._OffsetLimitParam(
            name, int_value, type_=type_, unique=True
        )
1091
1092
class LabeledColumnExprImpl(ExpressionElementImpl):
    """``ExpressionElementImpl`` variant that returns its result with
    ``.label(None)`` applied.

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Label ``resolved`` directly when it is an expression element;
        otherwise label the result of the superclass coercion, raising if
        that result is not an expression element either.

        """
        if isinstance(resolved, roles.ExpressionElementRole):
            return resolved.label(None)

        coerced = super()._implicit_coercions(
            element, resolved, argname=argname, **kw
        )
        if not isinstance(coerced, roles.ExpressionElementRole):
            self._raise_for_expected(element, argname, resolved)
        return coerced.label(None)
1113
1114
class ColumnsClauseImpl(_SelectIsNotFrom, _CoerceLiterals, RoleImpl):
    """Role implementation for the columns clause.

    Constants, numerics and ``"*"`` are coerced (see the
    ``_CoerceLiterals`` flags); any other plain string is rejected with a
    message suggesting ``text()``, ``column()`` or ``literal_column()``.

    """

    __slots__ = ()

    _coerce_consts = True
    _coerce_numerics = True
    _coerce_star = True

    # a string matching this pattern gets the "column" suggestion in
    # _text_coercion(); anything else gets "literal_column"
    _guess_straight_column = re.compile(r"^\w\S*$", re.I)

    def _raise_for_expected(
        self, element, argname=None, resolved=None, *, advice=None, **kw
    ):
        """Raise, adding a "did you mean" hint when ``element`` is a list
        and no other advice was supplied.

        """
        if not advice and isinstance(element, list):
            advice = (
                f"Did you mean to say select("
                f"{', '.join(repr(e) for e in element)})?"
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

    def _text_coercion(self, element, argname=None):
        """Always raise ``ArgumentError`` for a plain string.

        :param element: the offending string.
        :param argname: optional argument name to mention in the message.

        """
        element = str(element)

        guess_is_literal = not self._guess_straight_column.match(element)

        # the fragment carries its own trailing space; it previously
        # lacked one, producing "for argument xshould be"
        argname_text = "for argument %s " % (argname,) if argname else ""

        raise exc.ArgumentError(
            "Textual column expression %(column)r %(argname)sshould be "
            "explicitly declared with text(%(column)r), "
            "or use %(literal_column)s(%(column)r) "
            "for more specificity"
            % {
                "column": util.ellipses_string(element),
                "argname": argname_text,
                "literal_column": (
                    "literal_column" if guess_is_literal else "column"
                ),
            }
        )
1154
1155
class ReturnsRowsImpl(RoleImpl):
    """Role implementation that takes all behavior from ``RoleImpl``."""

    __slots__ = ()
1158
1159
class StatementImpl(_CoerceLiterals, RoleImpl):
    """Role implementation for statement objects."""

    __slots__ = ()

    def _post_coercion(
        self, resolved, *, original_element, argname=None, **kw
    ):
        """Return ``resolved``, warning when the original object was
        resolved to something else and has no ``_execute_on_connection``
        attribute.

        Strings are exempt from the check.

        """
        if resolved is not original_element and not isinstance(
            original_element, str
        ):
            # use same method as Connection uses; this will later raise
            # ObjectNotExecutableError
            try:
                original_element._execute_on_connection
            except AttributeError:
                util.warn_deprecated(
                    "Object %r should not be used directly in a SQL statement "
                    "context, such as passing to methods such as "
                    "session.execute(). This usage will be disallowed in a "
                    "future release. "
                    "Please use Core select() / update() / delete() etc. "
                    "with Session.execute() and other statement execution "
                    # wrapped in a 1-tuple so that a tuple-valued object
                    # is formatted as a single %r argument
                    "methods." % (original_element,),
                    "1.4",
                )

        return resolved

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return lambda elements unchanged; defer everything else to the
        superclass.

        """
        if resolved._is_lambda_element:
            return resolved
        else:
            return super()._implicit_coercions(
                element, resolved, argname=argname, **kw
            )
1200
1201
class SelectStatementImpl(_NoTextCoercion, RoleImpl):
    """Role implementation for SELECT statements; a text clause is
    accepted through its ``columns()`` method.

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``resolved.columns()`` for a text clause; otherwise
        raise.

        """
        if not resolved._is_text_clause:
            self._raise_for_expected(element, argname, resolved)
        return resolved.columns()
1216
1217
class HasCTEImpl(ReturnsRowsImpl):
    """Role implementation that takes all behavior from
    ``ReturnsRowsImpl``.

    """

    __slots__ = ()
1220
1221
class IsCTEImpl(RoleImpl):
    """Role implementation that inherits all behavior from ``RoleImpl``
    without overriding anything.
    """

    __slots__ = ()
1224
1225
class JoinTargetImpl(RoleImpl):
    """Role implementation used when coercing the target of a join."""

    __slots__ = ()

    _skip_clauseelement_for_target_match = True

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Reject literal values by delegating to ``_raise_for_expected``."""
        self._raise_for_expected(element, argname)

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        legacy: bool = False,
        **kw: Any,
    ) -> Any:
        """Coerce ``element`` / ``resolved`` into a join target.

        * An ``element`` that is already a ``roles.JoinTargetRole`` is
          returned unchanged.
        * With ``legacy=True``, a select base is returned as-is after a
          deprecation warning.
        * Anything else is handed to ``_raise_for_expected``.
        """
        if isinstance(element, roles.JoinTargetRole):
            # note that this codepath no longer occurs as of
            # #6550, unless JoinTargetImpl._skip_clauseelement_for_target_match
            # were set to False.
            return element

        if legacy and resolved._is_select_base:
            util.warn_deprecated(
                "Implicit coercion of SELECT and textual SELECT "
                "constructs into FROM clauses is deprecated; please call "
                ".subquery() on any Core select or ORM Query object in "
                "order to produce a subquery object.",
                version="1.4",
            )
            # TODO: doing _implicit_subquery here causes tests to fail,
            # how was this working before?  probably that ORM
            # join logic treated it as a select and subquery would happen
            # in _ORMJoin->Join
            return resolved

        self._raise_for_expected(element, argname, resolved)
1263
1264
class FromClauseImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Role implementation that coerces select bases and text clauses
    into objects usable as a FROM clause.
    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        explicit_subquery: bool = False,
        allow_select: bool = True,
        **kw: Any,
    ) -> Any:
        """Coerce ``resolved`` into a FROM clause or reject it.

        :param element: the object originally passed by the caller; used
         for error reporting.
        :param resolved: the object to coerce.
        :param argname: optional argument name for error reporting.
        :param explicit_subquery: when true, a select base is converted
         with ``.subquery()`` and no warning is emitted.
        :param allow_select: when true (and ``explicit_subquery`` is
         false), a select base is converted through ``_implicit_subquery``
         after a deprecation warning.
        :return: the subquery for a select base, or ``resolved`` itself for
         a text clause.

        Every other case is handed to ``_raise_for_expected``.  That
        includes a select base when neither ``explicit_subquery`` nor
        ``allow_select`` is set, which previously fell through and
        returned ``None``.
        """
        if resolved._is_select_base:
            if explicit_subquery:
                return resolved.subquery()
            if allow_select:
                util.warn_deprecated(
                    "Implicit coercion of SELECT and textual SELECT "
                    "constructs into FROM clauses is deprecated; please call "
                    ".subquery() on any Core select or ORM Query object in "
                    "order to produce a subquery object.",
                    version="1.4",
                )
                return resolved._implicit_subquery
            # select given, but neither conversion is permitted: reject it
            # below rather than silently returning None.
        elif resolved._is_text_clause:
            return resolved

        self._raise_for_expected(element, argname, resolved)

    def _post_coercion(self, element, *, deannotate=False, **kw):
        """Return ``element``, passed through ``_deannotate()`` when
        ``deannotate`` is true.
        """
        return element._deannotate() if deannotate else element
1300
1301
class StrictFromClauseImpl(FromClauseImpl):
    """Variant of ``FromClauseImpl`` whose ``allow_select`` defaults to
    false and which does not accept text clauses implicitly.
    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        allow_select: bool = False,
        **kw: Any,
    ) -> Any:
        """Convert a select base to its implicit subquery only when
        ``allow_select`` is true; hand everything else to
        ``_raise_for_expected``.
        """
        select_permitted = resolved._is_select_base and allow_select
        if select_permitted:
            util.warn_deprecated(
                "Implicit coercion of SELECT and textual SELECT constructs "
                "into FROM clauses is deprecated; please call .subquery() "
                "on any Core select or ORM Query object in order to produce a "
                "subquery object.",
                version="1.4",
            )
            return resolved._implicit_subquery

        self._raise_for_expected(element, argname, resolved)
1325
1326
class AnonymizedFromClauseImpl(StrictFromClauseImpl):
    """Strict FROM-clause implementation whose post-coercion step returns
    the element's anonymous FROM clause.
    """

    __slots__ = ()

    def _post_coercion(self, element, *, flat=False, name=None, **kw):
        """Return ``element._anonymous_fromclause(flat=flat)``.

        ``name`` is accepted in the signature but asserted to be ``None``.
        """
        assert name is None

        anonymized = element._anonymous_fromclause(flat=flat)
        return anonymized
1334
1335
class DMLTableImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Role implementation that swaps an element for its ``"dml_table"``
    annotation when one is present.
    """

    __slots__ = ()

    def _post_coercion(self, element, **kw):
        """Return the ``"dml_table"`` annotation of ``element`` if it has
        one, otherwise ``element`` itself.
        """
        annotations = element._annotations
        return (
            annotations["dml_table"]
            if "dml_table" in annotations
            else element
        )
1344
1345
class DMLSelectImpl(_NoTextCoercion, RoleImpl):
    """Role implementation that turns FROM clauses into select objects."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Coerce a FROM clause into a select.

        * A ``selectable.Alias`` wrapping a select base yields the wrapped
          ``.element``.
        * Any other FROM clause yields ``resolved.select()``.
        * Anything that is not a FROM clause is handed to
          ``_raise_for_expected``.
        """
        if resolved._is_from_clause:
            wraps_select = (
                isinstance(resolved, selectable.Alias)
                and resolved.element._is_select_base
            )
            return resolved.element if wraps_select else resolved.select()

        self._raise_for_expected(element, argname, resolved)
1366
1367
class CompoundElementImpl(_NoTextCoercion, RoleImpl):
    """Role implementation that adds FROM-clause-specific advice to the
    error raised for unexpected elements.
    """

    __slots__ = ()

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Delegate to the parent ``_raise_for_expected`` with an ``advice``
        string chosen from ``element``.

        ``advice`` stays ``None`` unless ``element`` is a
        ``roles.FromClauseRole``; in that case the wording depends on
        ``element._is_subquery``.
        """
        advice = None
        if isinstance(element, roles.FromClauseRole):
            advice = (
                (
                    "Use the plain select() object without "
                    "calling .subquery() or .alias()."
                )
                if element._is_subquery
                else (
                    "To SELECT from any FROM clause, use the .select() method."
                )
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )
1387
1388
# Maps a role class to an instance of the matching ``*Impl`` class.
_impl_lookup = {}


# For every attribute of ``roles`` whose name ends in "Role", look for a
# module-level name here with "Role" replaced by "Impl"; when found, call it
# with the role class and register the result.
for name in dir(roles):
    cls = getattr(roles, name)
    if not name.endswith("Role"):
        continue
    name = name.replace("Role", "Impl")
    if name in globals():
        impl = globals()[name](cls)
        _impl_lookup[cls] = impl
1399
if not TYPE_CHECKING:
    # Runtime only: make the subscripted forms of ExpressionElementRole for
    # these Python types resolve to the same impl as the unsubscripted role.
    ee_impl = _impl_lookup[roles.ExpressionElementRole]

    for py_type in (int, bool, str, float):
        _impl_lookup[roles.ExpressionElementRole[py_type]] = ee_impl