1# sql/coercions.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9from __future__ import annotations
10
11import collections.abc as collections_abc
12import numbers
13import re
14import typing
15from typing import Any
16from typing import Callable
17from typing import cast
18from typing import Dict
19from typing import Iterable
20from typing import Iterator
21from typing import List
22from typing import NoReturn
23from typing import Optional
24from typing import overload
25from typing import Sequence
26from typing import Tuple
27from typing import Type
28from typing import TYPE_CHECKING
29from typing import TypeVar
30from typing import Union
31
32from . import operators
33from . import roles
34from . import visitors
35from ._typing import is_from_clause
36from .base import ExecutableOption
37from .base import Options
38from .cache_key import HasCacheKey
39from .visitors import Visitable
40from .. import exc
41from .. import inspection
42from .. import util
43from ..util.typing import Literal
44
45if typing.TYPE_CHECKING:
46 # elements lambdas schema selectable are set by __init__
47 from . import elements
48 from . import lambdas
49 from . import schema
50 from . import selectable
51 from ._typing import _ColumnExpressionArgument
52 from ._typing import _ColumnsClauseArgument
53 from ._typing import _DDLColumnArgument
54 from ._typing import _DMLTableArgument
55 from ._typing import _FromClauseArgument
56 from .dml import _DMLTableElement
57 from .elements import BindParameter
58 from .elements import ClauseElement
59 from .elements import ColumnClause
60 from .elements import ColumnElement
61 from .elements import DQLDMLClauseElement
62 from .elements import NamedColumn
63 from .elements import SQLCoreOperations
64 from .schema import Column
65 from .selectable import _ColumnsClauseElement
66 from .selectable import _JoinTargetProtocol
67 from .selectable import FromClause
68 from .selectable import HasCTE
69 from .selectable import SelectBase
70 from .selectable import Subquery
71 from .visitors import _TraverseCallableType
72
# Type variable for the role class handed to expect(); bounded by
# roles.SQLRole.
_SR = TypeVar("_SR", bound=roles.SQLRole)
# Type variable for an arbitrary callable; lets a decorator be typed as
# returning the same callable type it received.
_F = TypeVar("_F", bound=Callable[..., Any])
# Type variable bounded by roles.StringRole.
_StringOnlyR = TypeVar("_StringOnlyR", bound=roles.StringRole)
# General-purpose type variable.
_T = TypeVar("_T", bound=Any)
77
78
def _is_literal(element):
    """Tell whether ``element`` should be treated as a plain "literal"
    value when it appears inside a SQL expression construct.

    An element is *not* a literal when it is a ``Visitable`` or a
    ``schema.SchemaEventTarget``, or when it exposes a
    ``__clause_element__`` attribute.

    """

    # the isinstance() test comes first so that hasattr() is only
    # consulted for objects that are not already known SQL constructs
    if isinstance(element, (Visitable, schema.SchemaEventTarget)):
        return False

    return not hasattr(element, "__clause_element__")
89
90
91def _deep_is_literal(element):
92 """Return whether or not the element is a "literal" in the context
93 of a SQL expression construct.
94
95 does a deeper more esoteric check than _is_literal. is used
96 for lambda elements that have to distinguish values that would
97 be bound vs. not without any context.
98
99 """
100
101 if isinstance(element, collections_abc.Sequence) and not isinstance(
102 element, str
103 ):
104 for elem in element:
105 if not _deep_is_literal(elem):
106 return False
107 else:
108 return True
109
110 return (
111 not isinstance(
112 element,
113 (
114 Visitable,
115 schema.SchemaEventTarget,
116 HasCacheKey,
117 Options,
118 util.langhelpers.symbol,
119 ),
120 )
121 and not hasattr(element, "__clause_element__")
122 and (
123 not isinstance(element, type)
124 or not issubclass(element, HasCacheKey)
125 )
126 )
127
128
def _document_text_coercion(
    paramname: str, meth_rst: str, param_rst: str
) -> Callable[[_F], _F]:
    """Build a decorator that adds a "trusted SQL text" warning to the
    documentation of parameter ``paramname``.

    ``param_rst`` and ``meth_rst`` are interpolated into the warning
    text as the parameter and method references respectively; the
    finished text is handed to ``util.add_parameter_text``.

    """
    warning_template = (
        ".. warning:: "
        "The %s argument to %s can be passed as a Python string argument, "
        "which will be treated "
        "as **trusted SQL text** and rendered as given.  **DO NOT PASS "
        "UNTRUSTED INPUT TO THIS PARAMETER**."
    )
    warning_text = warning_template % (param_rst, meth_rst)
    return util.add_parameter_text(paramname, warning_text)
143
144
145def _expression_collection_was_a_list(
146 attrname: str,
147 fnname: str,
148 args: Union[Sequence[_T], Sequence[Sequence[_T]]],
149) -> Sequence[_T]:
150 if args and isinstance(args[0], (list, set, dict)) and len(args) == 1:
151 if isinstance(args[0], list):
152 raise exc.ArgumentError(
153 f'The "{attrname}" argument to {fnname}(), when '
154 "referring to a sequence "
155 "of items, is now passed as a series of positional "
156 "elements, rather than as a list. "
157 )
158 return cast("Sequence[_T]", args[0])
159
160 return cast("Sequence[_T]", args)
161
162
# Typing-only overloads for expect().  Each stub pairs a role class (and in
# some cases the static type of ``element`` or a keyword-only flag) with the
# return type a type checker should infer; the bodies are ``...`` only.
@overload
def expect(
    role: Type[roles.TruncatedLabelRole],
    element: Any,
    **kw: Any,
) -> str: ...


# DMLColumnRole with the ``as_key`` flag yields a string key.
@overload
def expect(
    role: Type[roles.DMLColumnRole],
    element: Any,
    *,
    as_key: Literal[True] = ...,
    **kw: Any,
) -> str: ...


@overload
def expect(
    role: Type[roles.LiteralValueRole],
    element: Any,
    **kw: Any,
) -> BindParameter[Any]: ...


@overload
def expect(
    role: Type[roles.DDLReferredColumnRole],
    element: Any,
    **kw: Any,
) -> Column[Any]: ...


@overload
def expect(
    role: Type[roles.DDLConstraintColumnRole],
    element: Any,
    **kw: Any,
) -> Union[Column[Any], str]: ...


@overload
def expect(
    role: Type[roles.StatementOptionRole],
    element: Any,
    **kw: Any,
) -> DQLDMLClauseElement: ...


@overload
def expect(
    role: Type[roles.LabeledColumnExprRole[Any]],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> NamedColumn[_T]: ...


# column-expression roles given a typed argument: the element type ``_T``
# is carried through to the returned ColumnElement.
@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
    ],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> ColumnElement[_T]: ...


# column-expression roles given an untyped argument.
@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
        Type[roles.OnClauseRole],
        Type[roles.ColumnArgumentRole],
    ],
    element: Any,
    **kw: Any,
) -> ColumnElement[Any]: ...


@overload
def expect(
    role: Type[roles.DMLTableRole],
    element: _DMLTableArgument,
    **kw: Any,
) -> _DMLTableElement: ...


@overload
def expect(
    role: Type[roles.HasCTERole],
    element: HasCTE,
    **kw: Any,
) -> HasCTE: ...


@overload
def expect(
    role: Type[roles.SelectStatementRole],
    element: SelectBase,
    **kw: Any,
) -> SelectBase: ...


@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: _FromClauseArgument,
    **kw: Any,
) -> FromClause: ...


# FromClauseRole with the ``explicit_subquery`` flag is typed as Subquery.
@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: SelectBase,
    *,
    explicit_subquery: Literal[True] = ...,
    **kw: Any,
) -> Subquery: ...


@overload
def expect(
    role: Type[roles.ColumnsClauseRole],
    element: _ColumnsClauseArgument[Any],
    **kw: Any,
) -> _ColumnsClauseElement: ...


@overload
def expect(
    role: Type[roles.JoinTargetRole],
    element: _JoinTargetProtocol,
    **kw: Any,
) -> _JoinTargetProtocol: ...


# catchall for not-yet-implemented overloads
@overload
def expect(
    role: Type[_SR],
    element: Any,
    **kw: Any,
) -> Any: ...
312
313
def expect(
    role: Type[_SR],
    element: Any,
    *,
    apply_propagate_attrs: Optional[ClauseElement] = None,
    argname: Optional[str] = None,
    post_inspect: bool = False,
    disable_inspection: bool = False,
    **kw: Any,
) -> Any:
    """Coerce ``element`` into an object acceptable for ``role``.

    :param role: the role class; selects the coercion implementation
     from ``_impl_lookup``.
    :param element: the object to be coerced.
    :param apply_propagate_attrs: optional construct; when its
     ``_propagate_attrs`` is empty and the resolved element has a
     non-empty ``_propagate_attrs``, the latter is copied onto it.
    :param argname: optional argument name, passed along to the role
     implementation, which includes it in error messages.
    :param post_inspect: when True, the ``_post_inspect`` attribute of an
     inspected object is accessed before its ``__clause_element__()``
     is called.
    :param disable_inspection: when True, ``inspection.inspect()`` is not
     attempted even if the role implementation uses inspection.
    :param kw: extra keyword arguments handed to the role implementation
     hooks (or to ``lambdas.LambdaOptions`` when a lambda is given).
    :return: a ``lambdas.LambdaElement`` when the role allows lambdas and
     ``element`` is a callable with ``__code__``; otherwise the resolved
     object, passed through the implementation's ``_post_coercion`` hook
     if one is set, or the result of the implementation's
     ``_implicit_coercions`` when the resolved object's class does not
     include the role class in its MRO.

    """
    if (
        role.allows_lambda
        # note callable() will not invoke a __getattr__() method, whereas
        # hasattr(obj, "__call__") will. by keeping the callable() check here
        # we prevent most needless calls to hasattr() and therefore
        # __getattr__(), which is present on ColumnElement.
        and callable(element)
        and hasattr(element, "__code__")
    ):
        return lambdas.LambdaElement(
            element,
            role,
            lambdas.LambdaOptions(**kw),
            apply_propagate_attrs=apply_propagate_attrs,
        )

    # major case is that we are given a ClauseElement already, skip more
    # elaborate logic up front if possible
    impl = _impl_lookup[role]

    # kept so error messages and hooks can refer to what the caller passed,
    # even after ``element`` is replaced by its __clause_element__()
    original_element = element

    if not isinstance(
        element,
        (
            elements.CompilerElement,
            schema.SchemaItem,
            schema.FetchedValue,
            lambdas.PyWrapper,
        ),
    ):
        resolved = None

        if impl._resolve_literal_only:
            # implementations flagged this way go straight to literal
            # coercion; no __clause_element__() / inspection is attempted
            resolved = impl._literal_coercion(element, **kw)
        else:
            original_element = element

            is_clause_element = False

            # this is a special performance optimization for ORM
            # joins used by JoinTargetImpl that we don't go through the
            # work of creating __clause_element__() when we only need the
            # original QueryableAttribute, as the former will do clause
            # adaption and all that which is just thrown away here.
            if (
                impl._skip_clauseelement_for_target_match
                and isinstance(element, role)
                and hasattr(element, "__clause_element__")
            ):
                is_clause_element = True
            else:
                # unwrap nested __clause_element__() providers until an
                # object flagged ``is_clause_element`` is reached
                while hasattr(element, "__clause_element__"):
                    is_clause_element = True

                    if not getattr(element, "is_clause_element", False):
                        element = element.__clause_element__()
                    else:
                        break

            if not is_clause_element:
                if impl._use_inspection and not disable_inspection:
                    insp = inspection.inspect(element, raiseerr=False)
                    if insp is not None:
                        if post_inspect:
                            # attribute access only; the value is unused
                            insp._post_inspect
                        try:
                            resolved = insp.__clause_element__()
                        except AttributeError:
                            impl._raise_for_expected(original_element, argname)

                if resolved is None:
                    # nothing resolved via inspection; treat as a literal
                    resolved = impl._literal_coercion(
                        element, argname=argname, **kw
                    )
            else:
                resolved = element
    elif isinstance(element, lambdas.PyWrapper):
        resolved = element._sa__py_wrapper_literal(**kw)
    else:
        resolved = element

    if apply_propagate_attrs is not None:
        if typing.TYPE_CHECKING:
            assert isinstance(resolved, (SQLCoreOperations, ClauseElement))

        # only fill in propagate attrs when the target has none yet
        if not apply_propagate_attrs._propagate_attrs and getattr(
            resolved, "_propagate_attrs", None
        ):
            apply_propagate_attrs._propagate_attrs = resolved._propagate_attrs

    if impl._role_class in resolved.__class__.__mro__:
        # resolved object already satisfies the role
        if impl._post_coercion:
            resolved = impl._post_coercion(
                resolved,
                argname=argname,
                original_element=original_element,
                **kw,
            )
        return resolved
    else:
        # role not satisfied; let the implementation coerce or raise
        return impl._implicit_coercions(
            original_element, resolved, argname=argname, **kw
        )
428
429
def expect_as_key(
    role: Type[roles.DMLColumnRole], element: Any, **kw: Any
) -> str:
    """Call :func:`expect` with ``as_key=True``.

    Any ``as_key`` value supplied by the caller in ``kw`` is overridden.

    """
    kw["as_key"] = True
    return expect(role, element, **kw)
435
436
def expect_col_expression_collection(
    role: Type[roles.DDLConstraintColumnRole],
    expressions: Iterable[_DDLColumnArgument],
) -> Iterator[
    Tuple[
        Union[str, Column[Any]],
        Optional[ColumnClause[Any]],
        Optional[str],
        Optional[Union[Column[Any], str]],
    ]
]:
    """Coerce each item of ``expressions`` against ``role`` and yield a
    4-tuple ``(resolved, column, strname, add_element)`` per item.

    * When the coerced value is a string, ``resolved`` and ``strname``
      are the original string, ``column`` is ``None`` and
      ``add_element`` is the string.
    * Otherwise the coerced expression is traversed for ``"column"``
      nodes; ``column`` is the first one found (or ``None``),
      ``strname`` is ``None`` and ``add_element`` equals ``column``.

    """
    for expr in expressions:
        resolved: Union[Column[Any], str] = expect(role, expr)

        if isinstance(resolved, str):
            assert isinstance(expr, str)
            # string names pass through unchanged
            yield expr, None, expr, expr
            continue

        found_columns: List[Column[Any]] = []
        collect: _TraverseCallableType[Column[Any]] = found_columns.append
        visitors.traverse(resolved, {}, {"column": collect})

        first_column = found_columns[0] if found_columns else None
        yield resolved, first_column, None, first_column
465
466
class RoleImpl:
    """Base class for per-role coercion implementations.

    An instance is built from a role class and records that class, its
    ``_role_name`` and whether it derives from ``roles.UsesInspection``.
    Subclasses customize coercion through ``_literal_coercion``,
    ``_implicit_coercions`` and the optional ``_post_coercion`` hook.

    """

    __slots__ = ("_role_class", "name", "_use_inspection")

    # optional hook; None means "no post-coercion step"
    _post_coercion: Any = None
    # when True, expect() goes straight to _literal_coercion()
    _resolve_literal_only = False
    # when True, expect() may skip calling __clause_element__()
    _skip_clauseelement_for_target_match = False

    def __init__(self, role_class):
        self._role_class = role_class
        self.name = role_class._role_name
        self._use_inspection = issubclass(role_class, roles.UsesInspection)

    def _literal_coercion(self, element, **kw):
        """Coerce a literal value; not implemented on the base class."""
        raise NotImplementedError()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Default behavior: no implicit coercion is available, so raise."""
        self._raise_for_expected(element, argname, resolved)

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise :class:`.ArgumentError` describing what was expected.

        The message names this role, the argument name (if given) and
        the offending object; ``advice`` is appended when present, and
        ``err`` becomes the exception's cause.

        """
        resolved_differs = resolved is not None and resolved is not element
        if resolved_differs:
            got = "%r object resolved from %r object" % (resolved, element)
        else:
            got = repr(element)

        msg = (
            "%s expected for argument %r; got %s."
            % (
                self.name,
                argname,
                got,
            )
            if argname
            else "%s expected, got %s." % (self.name, got)
        )

        if advice:
            msg = msg + " " + advice

        raise exc.ArgumentError(msg, code=code) from err
519
520
class _Deannotate:
    """Mixin supplying a ``_post_coercion`` hook that returns the
    resolved element passed through ``_deep_deannotate``.

    """

    __slots__ = ()

    def _post_coercion(self, resolved, **kw):
        # imported inside the method rather than at module level
        from .util import _deep_deannotate

        return _deep_deannotate(resolved)
528
529
class _StringOnly:
    """Mixin that sets ``_resolve_literal_only``, so that ``expect()``
    sends non-construct elements directly to ``_literal_coercion()``.

    """

    __slots__ = ()

    _resolve_literal_only = True
534
535
class _ReturnsStringKey(RoleImpl):
    """Role implementation that lets plain strings through unchanged.

    Literal coercion is a pass-through; implicit coercion returns the
    original element when it is a ``str`` and raises otherwise.

    """

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        return element

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        if isinstance(element, str):
            return element

        self._raise_for_expected(element, argname, resolved)
547
548
class _ColumnCoercions(RoleImpl):
    """Implicit coercions shared by column-expression roles.

    SELECT constructs and ``Subquery`` objects are turned into scalar
    subqueries (with a warning); lambda elements are accepted when the
    role allows lambdas; anything else raises.

    """

    __slots__ = ()

    def _warn_for_scalar_subquery_coercion(self):
        message = (
            "implicitly coercing SELECT object to scalar subquery; "
            "please use the .scalar_subquery() method to produce a scalar "
            "subquery."
        )
        util.warn(message)

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        if getattr(resolved, "is_clause_element", False):
            if resolved._is_select_base:
                self._warn_for_scalar_subquery_coercion()
                return resolved.scalar_subquery()

            if resolved._is_from_clause and isinstance(
                resolved, selectable.Subquery
            ):
                # coerce the SELECT wrapped by the subquery
                self._warn_for_scalar_subquery_coercion()
                return resolved.element.scalar_subquery()

            if self._role_class.allows_lambda and resolved._is_lambda_element:
                return resolved

        # not a clause element, or no coercion applies
        self._raise_for_expected(element, argname, resolved)
575
576
def _no_text_coercion(
    element: Any,
    argname: Optional[str] = None,
    exc_cls: Type[exc.SQLAlchemyError] = exc.ArgumentError,
    extra: Optional[str] = None,
    err: Optional[Exception] = None,
) -> NoReturn:
    """Raise an error stating that a textual SQL expression must be
    declared explicitly with ``text()``.

    :param element: the offending string; abbreviated with
     ``util.ellipses_string`` for the message.
    :param argname: optional name of the argument that received the
     string; included in the message when given.
    :param exc_cls: exception class to raise.
    :param extra: optional text placed at the start of the message.
    :param err: optional exception to chain as the cause.

    """
    raise exc_cls(
        "%(extra)sTextual SQL expression %(expr)r %(argname)sshould be "
        "explicitly declared as text(%(expr)r)"
        % {
            "expr": util.ellipses_string(element),
            # the template has no space between %(argname)s and "should",
            # so the fragment must carry its own trailing space; without
            # it the message reads "for argument fooshould be ..."
            "argname": "for argument %s " % (argname,) if argname else "",
            "extra": "%s " % extra if extra else "",
        }
    ) from err
593
594
class _NoTextCoercion(RoleImpl):
    """Role implementation that refuses to coerce literal values.

    A string given to a role that ``TextClause`` is a subclass of
    produces the "should be explicitly declared as text()" error; every
    other literal produces the generic "expected" error.

    """

    __slots__ = ()

    def _literal_coercion(self, element, argname=None, **kw):
        is_textual = isinstance(element, str) and issubclass(
            elements.TextClause, self._role_class
        )
        if is_textual:
            _no_text_coercion(element, argname)
        else:
            self._raise_for_expected(element, argname)
605
606
class _CoerceLiterals(RoleImpl):
    """Role implementation that coerces selected Python literals.

    Behavior is switched on by class-level flags:

    * ``_coerce_star`` - the string ``"*"`` becomes a literal column.
    * ``_coerce_consts`` - ``None`` / ``False`` / ``True`` become
      ``Null`` / ``False_`` / ``True_`` constructs.
    * ``_coerce_numerics`` - ``numbers.Number`` values become literal
      columns of their string form.

    Other strings are handed to ``_text_coercion()``; anything left
    over raises.

    """

    __slots__ = ()
    _coerce_consts = False
    _coerce_star = False
    _coerce_numerics = False

    def _text_coercion(self, element, argname=None):
        return _no_text_coercion(element, argname)

    def _literal_coercion(self, element, argname=None, **kw):
        if isinstance(element, str):
            if self._coerce_star and element == "*":
                return elements.ColumnClause("*", is_literal=True)
            return self._text_coercion(element, argname, **kw)

        if self._coerce_consts:
            # identity checks, so that only the actual singletons match
            if element is None:
                return elements.Null()
            if element is False:
                return elements.False_()
            if element is True:
                return elements.True_()

        if self._coerce_numerics and isinstance(element, numbers.Number):
            return elements.ColumnClause(str(element), is_literal=True)

        self._raise_for_expected(element, argname)
635
636
class LiteralValueImpl(RoleImpl):
    """Role implementation that wraps a literal value in a unique,
    anonymous ``BindParameter``.

    Non-construct elements bypass inspection (``_resolve_literal_only``)
    and are returned as is by ``_literal_coercion()``; the bind
    parameter is then created in ``_implicit_coercions()``.

    """

    # declared for consistency with the other implementations, so that
    # instances do not gain a per-instance __dict__ on top of the slots
    # defined by RoleImpl
    __slots__ = ()

    _resolve_literal_only = True

    def _implicit_coercions(
        self,
        element,
        resolved,
        argname=None,
        type_=None,
        literal_execute=False,
        **kw,
    ):
        """Return a ``BindParameter`` for ``element``.

        :param element: the original value; becomes the parameter value.
        :param resolved: the value as resolved by ``expect()``; must be
         a literal according to ``_is_literal()``.
        :param argname: optional argument name for error messages;
         defaults to ``None`` to match the base class signature.
        :param type_: optional type passed to the bind parameter.
        :param literal_execute: passed to the bind parameter.
        :raises ArgumentError: if ``resolved`` is not a literal.

        """
        if not _is_literal(resolved):
            self._raise_for_expected(
                element, resolved=resolved, argname=argname, **kw
            )

        return elements.BindParameter(
            None,
            element,
            type_=type_,
            unique=True,
            literal_execute=literal_execute,
        )

    def _literal_coercion(self, element, argname=None, type_=None, **kw):
        """Return ``element`` unchanged."""
        return element
664
665
class _SelectIsNotFrom(RoleImpl):
    """Mixin that adds ".subquery()" advice to the error raised when a
    SELECT statement is used where a FROM clause is expected.

    """

    __slots__ = ()

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise :class:`.ArgumentError`, suggesting ``.subquery()`` when
        either ``element`` or ``resolved`` is a SELECT statement.

        Advice and error code supplied by the caller take precedence:
        the subquery advice (with code ``"89ve"``) is only filled in
        when no ``advice`` was given.

        """
        # parenthesized so that "not advice" guards both isinstance()
        # tests; ``not a and b or c`` would group as ``(not a and b) or c``
        # and overwrite caller-supplied advice whenever ``resolved`` is a
        # SELECT
        if not advice and (
            isinstance(element, roles.SelectStatementRole)
            or isinstance(resolved, roles.SelectStatementRole)
        ):
            advice = (
                "To create a "
                "FROM clause from a %s object, use the .subquery() method."
                % (resolved.__class__ if resolved is not None else element,)
            )
            code = "89ve"

        # when the advice above does not apply, the caller's ``code`` is
        # passed through as given rather than being reset to None
        super()._raise_for_expected(
            element,
            argname=argname,
            resolved=resolved,
            advice=advice,
            code=code,
            err=err,
            **kw,
        )
        # never reached
        assert False
704
705
class HasCacheKeyImpl(RoleImpl):
    """Role implementation that accepts any ``HasCacheKey`` object as is
    and raises for everything else.

    """

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        # pass-through; validation happens in _implicit_coercions()
        return element

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        if isinstance(element, HasCacheKey):
            return element

        self._raise_for_expected(element, argname, resolved)
723
724
class ExecutableOptionImpl(RoleImpl):
    """Role implementation that accepts any ``ExecutableOption`` object
    as is and raises for everything else.

    """

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        # pass-through; validation happens in _implicit_coercions()
        return element

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        if isinstance(element, ExecutableOption):
            return element

        self._raise_for_expected(element, argname, resolved)
742
743
class ExpressionElementImpl(_ColumnCoercions, RoleImpl):
    """Role implementation for general column expressions.

    Literal values become unique ``BindParameter`` objects, except that
    ``None`` becomes ``Null`` outside of "crud" use unless the given
    type has ``should_evaluate_none`` set.

    """

    __slots__ = ()

    def _literal_coercion(
        self, element, name=None, type_=None, argname=None, is_crud=False, **kw
    ):
        if (
            element is None
            and not is_crud
            and (type_ is None or not type_.should_evaluate_none)
        ):
            # TODO: there's no test coverage now for the
            # "should_evaluate_none" part of this, as outside of "crud" this
            # codepath is not normally used except in some special cases
            return elements.Null()

        try:
            return elements.BindParameter(
                name, element, type_, unique=True, _is_crud=is_crud
            )
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        # select uses implicit coercion with warning instead of raising
        advice = None
        if isinstance(element, selectable.Values):
            advice = (
                "To create a column expression from a VALUES clause, "
                "use the .scalar_values() method."
            )
        elif isinstance(element, roles.AnonymizedFromClauseRole):
            advice = (
                "To create a column expression from a FROM clause row "
                "as a whole, use the .table_valued() method."
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )
785
786
class BinaryElementImpl(ExpressionElementImpl, RoleImpl):
    """Role implementation for the operand of a binary expression.

    Literal operands are bound via ``expr._bind_param()``; a resolved
    element whose type is null is given the type of ``expr`` (or
    ``bindparam_type`` when supplied) if ``expr`` itself is typed.

    """

    __slots__ = ()

    def _literal_coercion(
        self, element, expr, operator, bindparam_type=None, argname=None, **kw
    ):
        try:
            bound = expr._bind_param(operator, element, type_=bindparam_type)
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)
        else:
            return bound

    def _post_coercion(self, resolved, expr, bindparam_type=None, **kw):
        if resolved.type._isnull and not expr.type._isnull:
            target_type = (
                bindparam_type if bindparam_type is not None else expr.type
            )
            resolved = resolved._with_binary_element_type(target_type)
        return resolved
804
805
class InElementImpl(RoleImpl):
    """Role implementation for the right-hand side of an IN expression.

    FROM clauses are coerced to SELECTs with a warning; non-string
    iterables become either a single expanding bind parameter or, when
    they contain SQL expressions or ``None``, a ``ClauseList``.

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        if not resolved._is_from_clause:
            self._raise_for_expected(element, argname, resolved)
        else:
            wraps_select = (
                isinstance(resolved, selectable.Alias)
                and resolved.element._is_select_base
            )
            self._warn_for_implicit_coercion(resolved)
            # an alias of a SELECT yields the SELECT itself; any other
            # FROM clause is turned into a SELECT against it
            target = resolved.element if wraps_select else resolved.select()
            return self._post_coercion(target, **kw)

    def _warn_for_implicit_coercion(self, elem):
        class_name = elem.__class__.__name__
        util.warn(
            "Coercing %s object into a select() for use in IN(); "
            "please pass a select() construct explicitly" % (class_name)
        )

    def _literal_coercion(self, element, expr, operator, **kw):
        if not util.is_non_string_iterable(element):
            self._raise_for_expected(element, **kw)
        else:
            # members that must not become plain bind parameters, keyed
            # by the member itself
            non_literal_expressions: Dict[
                Optional[operators.ColumnOperators],
                operators.ColumnOperators,
            ] = {}
            element = list(element)
            for member in element:
                if _is_literal(member):
                    if member is None:
                        non_literal_expressions[member] = elements.Null()
                elif isinstance(member, operators.ColumnOperators):
                    non_literal_expressions[member] = member
                else:
                    self._raise_for_expected(element, **kw)

            if not non_literal_expressions:
                # all plain values: one expanding parameter for the list
                return expr._bind_param(operator, element, expanding=True)

            return elements.ClauseList(
                *[
                    (
                        non_literal_expressions[o]
                        if o in non_literal_expressions
                        else expr._bind_param(operator, o)
                    )
                    for o in element
                ]
            )

    def _post_coercion(self, element, expr, operator, **kw):
        if element._is_select_base:
            # for IN, we are doing scalar_subquery() coercion without
            # a warning
            return element.scalar_subquery()

        if isinstance(element, elements.ClauseList):
            assert not len(element.clauses) == 0
            return element.self_group(against=operator)

        if isinstance(element, elements.BindParameter):
            # work on a copy so the caller's parameter is not modified
            expanding_param = element._clone(maintain_key=True)
            expanding_param.expanding = True
            expanding_param.expand_op = operator
            return expanding_param

        if isinstance(element, selectable.Values):
            return element.scalar_values()

        return element
889
890
class OnClauseImpl(_ColumnCoercions, RoleImpl):
    """Role implementation for the ON clause of a join.

    Literal values are rejected; an original element that is a
    ``JoinTargetRole`` is returned in place of its resolved form.

    """

    __slots__ = ()

    _coerce_consts = True

    def _literal_coercion(
        self, element, name=None, type_=None, argname=None, is_crud=False, **kw
    ):
        self._raise_for_expected(element)

    def _post_coercion(self, resolved, original_element=None, **kw):
        # this is a hack right now as we want to use coercion on an
        # ORM InstrumentedAttribute, but we want to return the object
        # itself if it is one, not its clause element.
        # ORM context _join and _legacy_join() would need to be improved
        # to look for annotations in a clause element form.
        keep_original = isinstance(original_element, roles.JoinTargetRole)
        return original_element if keep_original else resolved
910
911
class WhereHavingImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl):
    """Role implementation for WHERE / HAVING criteria.

    Enables constant coercion (``_coerce_consts``) and rejects plain
    strings via ``_no_text_coercion``.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        return _no_text_coercion(element, argname)
919
920
class StatementOptionImpl(_CoerceLiterals, RoleImpl):
    """Role implementation for statement options.

    Enables constant coercion (``_coerce_consts``) and turns plain
    strings into ``TextClause`` objects.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        return elements.TextClause(element)
928
929
class ColumnArgumentImpl(_NoTextCoercion, RoleImpl):
    """Role implementation built on ``_NoTextCoercion``; adds no behavior
    of its own.

    """

    __slots__ = ()
932
933
class ColumnArgumentOrKeyImpl(_ReturnsStringKey, RoleImpl):
    """Role implementation built on ``_ReturnsStringKey``; adds no
    behavior of its own.

    """

    __slots__ = ()
936
937
class StrAsPlainColumnImpl(_CoerceLiterals, RoleImpl):
    """Role implementation that turns a plain string into a
    ``ColumnClause`` of that name.

    """

    __slots__ = ()

    def _text_coercion(self, element, argname=None):
        return elements.ColumnClause(element)
943
944
class ByOfImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl, roles.ByOfRole):
    """Shared base of ``OrderByImpl`` and ``GroupByImpl``.

    Enables constant coercion (``_coerce_consts``) and turns plain
    strings into ``_textual_label_reference`` objects.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        return elements._textual_label_reference(element)
952
953
class OrderByImpl(ByOfImpl, RoleImpl):
    """Role implementation for ORDER BY elements.

    A resolved element of this role that has an
    ``_order_by_label_element`` is wrapped in ``_label_reference``.

    """

    __slots__ = ()

    def _post_coercion(self, resolved, **kw):
        refers_to_label = (
            isinstance(resolved, self._role_class)
            and resolved._order_by_label_element is not None
        )
        if refers_to_label:
            return elements._label_reference(resolved)
        return resolved
965
966
class GroupByImpl(ByOfImpl, RoleImpl):
    """Role implementation for GROUP BY elements.

    A FROM clause is expanded into a ``ClauseList`` of its columns; any
    other resolved element is returned unchanged.

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        if not is_from_clause(resolved):
            return resolved

        return elements.ClauseList(*resolved.c)
981
982
class DMLColumnImpl(_ReturnsStringKey, RoleImpl):
    """Role implementation for columns named in DML statements.

    With ``as_key`` set, the element's ``.key`` is returned in place of
    the element.

    """

    __slots__ = ()

    def _post_coercion(self, element, as_key=False, **kw):
        return element.key if as_key else element
991
992
class ConstExprImpl(RoleImpl):
    """Role implementation that accepts only the ``None``, ``False`` and
    ``True`` singletons as literals, mapping them to ``Null``,
    ``False_`` and ``True_`` constructs.

    """

    __slots__ = ()

    def _literal_coercion(self, element, argname=None, **kw):
        # identity checks, so that e.g. 0 and 1 are not accepted
        if element is None:
            return elements.Null()
        if element is False:
            return elements.False_()
        if element is True:
            return elements.True_()

        self._raise_for_expected(element, argname)
1005
1006
class TruncatedLabelImpl(_StringOnly, RoleImpl):
    """Role implementation producing ``_truncated_label`` strings."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        if isinstance(element, str):
            return resolved

        self._raise_for_expected(element, argname, resolved)

    def _literal_coercion(self, element, argname=None, **kw):
        """coerce the given value to :class:`._truncated_label`.

        Existing :class:`._truncated_label` and
        :class:`._anonymous_label` objects are passed
        unchanged.
        """

        already_truncated = isinstance(element, elements._truncated_label)
        return element if already_truncated else elements._truncated_label(
            element
        )
1034
1035
class DDLExpressionImpl(_Deannotate, _CoerceLiterals, RoleImpl):
    """Role implementation for expressions used in DDL.

    Enables constant coercion (``_coerce_consts``), coerces plain
    strings to ``TextClause`` and, through ``_Deannotate``, deannotates
    the resolved expression.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        # see #5754 for why we can't easily deprecate this coercion.
        # essentially expressions like postgresql_where would have to be
        # text() as they come back from reflection and we don't want to
        # have text() elements wired into the inspection dictionaries.
        return elements.TextClause(element)
1047
1048
class DDLConstraintColumnImpl(_Deannotate, _ReturnsStringKey, RoleImpl):
    """Role implementation combining ``_Deannotate`` and
    ``_ReturnsStringKey``; adds no behavior of its own.

    """

    __slots__ = ()
1051
1052
class DDLReferredColumnImpl(DDLConstraintColumnImpl):
    """Same behavior as ``DDLConstraintColumnImpl``; a distinct class
    with nothing overridden.

    """

    __slots__ = ()
1055
1056
class LimitOffsetImpl(RoleImpl):
    """Role implementation for LIMIT / OFFSET values.

    ``None`` is passed through as ``None``; other literals are converted
    with ``util.asint`` and wrapped in a unique ``_OffsetLimitParam``.

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        if resolved is not None:
            self._raise_for_expected(element, argname, resolved)
        else:
            return None

    def _literal_coercion(self, element, name, type_, **kw):
        if element is None:
            return None

        int_value = util.asint(element)
        return selectable._OffsetLimitParam(
            name, int_value, type_=type_, unique=True
        )
1080
1081
class LabeledColumnExprImpl(ExpressionElementImpl):
    """Role implementation that yields labeled column expressions.

    An ``ExpressionElementRole`` object is given an anonymous label via
    ``.label(None)``, either directly or after the superclass's implicit
    coercion; anything else raises.

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        if isinstance(resolved, roles.ExpressionElementRole):
            return resolved.label(None)

        coerced = super()._implicit_coercions(
            element, resolved, argname=argname, **kw
        )
        if isinstance(coerced, roles.ExpressionElementRole):
            return coerced.label(None)

        self._raise_for_expected(element, argname, resolved)
1102
1103
class ColumnsClauseImpl(_SelectIsNotFrom, _CoerceLiterals, RoleImpl):
    """Role implementation for members of a columns clause.

    Constants, numerics and ``"*"`` are coerced; other plain strings
    are rejected with a message suggesting ``text()``, ``column()`` or
    ``literal_column()``.

    """

    __slots__ = ()

    _coerce_consts = True
    _coerce_numerics = True
    _coerce_star = True

    # a string that starts with a word character and has no whitespace
    # is guessed to be a plain column name
    _guess_straight_column = re.compile(r"^\w\S*$", re.I)

    def _raise_for_expected(
        self, element, argname=None, resolved=None, advice=None, **kw
    ):
        """Raise the "expected" error; when a list was passed and no
        other advice was given, suggest passing its members
        positionally to ``select()``.

        """
        if not advice and isinstance(element, list):
            advice = (
                f"Did you mean to say select("
                f"{', '.join(repr(e) for e in element)})?"
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

    def _text_coercion(self, element, argname=None):
        """Reject a plain string column expression.

        :raises ArgumentError: always; the message recommends
         ``column()`` when the string looks like a plain name and
         ``literal_column()`` otherwise.

        """
        element = str(element)

        guess_is_literal = not self._guess_straight_column.match(element)
        raise exc.ArgumentError(
            "Textual column expression %(column)r %(argname)sshould be "
            "explicitly declared with text(%(column)r), "
            "or use %(literal_column)s(%(column)r) "
            "for more specificity"
            % {
                "column": util.ellipses_string(element),
                # the template has no space between %(argname)s and
                # "should", so the fragment carries its own trailing
                # space; without it the message reads
                # "for argument fooshould be ..."
                "argname": "for argument %s " % (argname,) if argname else "",
                "literal_column": (
                    "literal_column" if guess_is_literal else "column"
                ),
            }
        )
1143
1144
class ReturnsRowsImpl(RoleImpl):
    """Role implementation using the ``RoleImpl`` defaults unchanged."""

    __slots__ = ()
1147
1148
class StatementImpl(_CoerceLiterals, RoleImpl):
    """Role implementation for executable statements.

    Lambda elements are accepted as is.  An object that was resolved
    to a different construct and has no ``_execute_on_connection``
    attribute triggers a deprecation warning.

    """

    __slots__ = ()

    def _post_coercion(self, resolved, original_element, argname=None, **kw):
        """Warn when a non-executable object was resolved into a
        statement, then return ``resolved`` unchanged.

        """
        if resolved is not original_element and not isinstance(
            original_element, str
        ):
            # use same method as Connection uses; this will later raise
            # ObjectNotExecutableError
            try:
                original_element._execute_on_connection
            except AttributeError:
                util.warn_deprecated(
                    "Object %r should not be used directly in a SQL statement "
                    "context, such as passing to methods such as "
                    "session.execute().  This usage will be disallowed in a "
                    "future release.  "
                    "Please use Core select() / update() / delete() etc. "
                    "with Session.execute() and other statement execution "
                    # wrapped in a 1-tuple so that an object which is
                    # itself a tuple is formatted as a single value
                    # instead of being unpacked by the % operator
                    "methods." % (original_element,),
                    "1.4",
                )

        return resolved

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Accept lambda elements; defer everything else to the
        superclass.

        """
        if resolved._is_lambda_element:
            return resolved
        else:
            return super()._implicit_coercions(
                element, resolved, argname=argname, **kw
            )
1187
1188
class SelectStatementImpl(_NoTextCoercion, RoleImpl):
    """Role implementation for SELECT statements.

    A text clause is converted through its ``.columns()`` method; any
    other object not already of this role raises.

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        if not resolved._is_text_clause:
            self._raise_for_expected(element, argname, resolved)
        else:
            return resolved.columns()
1203
1204
class HasCTEImpl(ReturnsRowsImpl):
    """Same behavior as ``ReturnsRowsImpl``; a distinct class with
    nothing overridden.

    """

    __slots__ = ()
1207
1208
class IsCTEImpl(RoleImpl):
    """Role implementation using the ``RoleImpl`` defaults unchanged."""

    __slots__ = ()
1211
1212
class JoinTargetImpl(RoleImpl):
    """Role implementation for the target of a join.

    Sets ``_skip_clauseelement_for_target_match`` so that ``expect()``
    can skip calling ``__clause_element__()`` on an element that is
    already of this role.  Literals are rejected.

    """

    __slots__ = ()

    _skip_clauseelement_for_target_match = True

    def _literal_coercion(self, element, argname=None, **kw):
        self._raise_for_expected(element, argname)

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        legacy: bool = False,
        **kw: Any,
    ) -> Any:
        if isinstance(element, roles.JoinTargetRole):
            # note that this codepath no longer occurs as of
            # #6550, unless JoinTargetImpl._skip_clauseelement_for_target_match
            # were set to False.
            return element

        if legacy and resolved._is_select_base:
            deprecation_message = (
                "Implicit coercion of SELECT and textual SELECT "
                "constructs into FROM clauses is deprecated; please call "
                ".subquery() on any Core select or ORM Query object in "
                "order to produce a subquery object."
            )
            util.warn_deprecated(deprecation_message, version="1.4")
            # TODO: doing _implicit_subquery here causes tests to fail,
            # how was this working before?  probably that ORM
            # join logic treated it as a select and subquery would happen
            # in _ORMJoin->Join
            return resolved

        self._raise_for_expected(element, argname, resolved)
1249
1250
class FromClauseImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Coercion impl for contexts that call for a FROM clause.

    A SELECT base is turned into a subquery, either explicitly
    (``explicit_subquery``) or implicitly with a deprecation warning
    (``allow_select``).  A text clause is accepted as-is.  Anything else
    is rejected via ``_raise_for_expected()``.
    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        explicit_subquery: bool = False,
        allow_select: bool = True,
        **kw: Any,
    ) -> Any:
        """Coerce ``resolved`` into a FROM clause.

        :param element: the object originally passed in.
        :param resolved: the clause element that ``element`` resolved to.
        :param argname: optional argument name used for error reporting.
        :param explicit_subquery: when true, a SELECT base is converted
         with ``.subquery()`` and no warning is emitted.
        :param allow_select: when true (and ``explicit_subquery`` is
         false), a SELECT base is converted through
         ``_implicit_subquery`` with a deprecation warning.
        :return: the coerced FROM element.
        """
        if resolved._is_select_base:
            if explicit_subquery:
                return resolved.subquery()
            elif allow_select:
                util.warn_deprecated(
                    "Implicit coercion of SELECT and textual SELECT "
                    "constructs into FROM clauses is deprecated; please call "
                    ".subquery() on any Core select or ORM Query object in "
                    "order to produce a subquery object.",
                    version="1.4",
                )
                return resolved._implicit_subquery
            else:
                # Neither form of SELECT coercion was permitted.  Reject
                # the argument explicitly instead of falling off the end
                # and handing ``None`` back to the caller.
                self._raise_for_expected(element, argname, resolved)
        elif resolved._is_text_clause:
            return resolved
        else:
            self._raise_for_expected(element, argname, resolved)

    def _post_coercion(self, element, deannotate=False, **kw):
        """Return ``element``, deannotated first when ``deannotate`` is set.

        :param element: the coerced element.
        :param deannotate: when true, return ``element._deannotate()``
         instead of ``element`` itself.
        """
        if deannotate:
            return element._deannotate()
        else:
            return element
1285
1286
class StrictFromClauseImpl(FromClauseImpl):
    """Stricter variant of :class:`.FromClauseImpl`.

    A SELECT base is implicitly converted to a subquery (with a
    deprecation warning) only when ``allow_select`` is passed as true,
    and it defaults to false here.  Every other element is rejected via
    ``_raise_for_expected()``.
    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        explicit_subquery: bool = False,
        allow_select: bool = False,
        **kw: Any,
    ) -> Any:
        """Coerce a SELECT base to an implicit subquery, or reject.

        ``explicit_subquery`` is accepted for signature compatibility with
        the parent class but is not consulted by this implementation.
        """
        select_permitted = resolved._is_select_base and allow_select
        if not select_permitted:
            self._raise_for_expected(element, argname, resolved)
        else:
            util.warn_deprecated(
                "Implicit coercion of SELECT and textual SELECT constructs "
                "into FROM clauses is deprecated; please call .subquery() "
                "on any Core select or ORM Query object in order to produce a "
                "subquery object.",
                version="1.4",
            )
            return resolved._implicit_subquery
1310
1311
class AnonymizedFromClauseImpl(StrictFromClauseImpl):
    """Strict FROM coercion that returns an anonymized form of the element.

    After coercion the element is replaced by the result of its
    ``_anonymous_fromclause()`` method.
    """

    __slots__ = ()

    def _post_coercion(self, element, flat=False, name=None, **kw):
        """Return ``element._anonymous_fromclause(flat=flat)``.

        ``name`` must be left as ``None``; this is enforced with an
        assertion.
        """
        assert name is None

        anonymized = element._anonymous_fromclause(flat=flat)
        return anonymized
1319
1320
class DMLTableImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Coercion impl for the table argument of a DML construct.

    After coercion, an element whose ``_annotations`` carry a
    ``"dml_table"`` entry is swapped for that entry's value.
    """

    __slots__ = ()

    def _post_coercion(self, element, **kw):
        """Return the ``"dml_table"`` annotation if present, else ``element``."""
        annotations = element._annotations
        if "dml_table" not in annotations:
            return element
        return annotations["dml_table"]
1329
1330
class DMLSelectImpl(_NoTextCoercion, RoleImpl):
    """Coercion impl for a SELECT used inside a DML construct.

    A FROM clause is turned into a selectable: an ``Alias`` wrapping a
    SELECT base is unwrapped to that underlying element, and any other
    FROM clause is converted with ``.select()``.  Elements that are not
    FROM clauses are rejected via ``_raise_for_expected()``.
    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Unwrap or ``.select()`` a FROM clause; reject anything else."""
        if not resolved._is_from_clause:
            self._raise_for_expected(element, argname, resolved)
        elif (
            isinstance(resolved, selectable.Alias)
            and resolved.element._is_select_base
        ):
            # alias of a SELECT: hand back the SELECT it wraps
            return resolved.element
        else:
            return resolved.select()
1351
1352
class CompoundElementImpl(_NoTextCoercion, RoleImpl):
    """Coercion impl for members of a compound construct.

    Overrides only the error path, attaching advice to the message when
    the rejected object is a FROM clause.
    """

    __slots__ = ()

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Delegate to the superclass, adding FROM-clause specific advice.

        ``advice`` is ``None`` unless ``element`` is a
        ``roles.FromClauseRole``; in that case the wording depends on
        whether the element is a subquery.
        """
        advice = None
        if isinstance(element, roles.FromClauseRole):
            advice = (
                "Use the plain select() object without "
                "calling .subquery() or .alias()."
                if element._is_subquery
                else "To SELECT from any FROM clause, use the .select() method."
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )
1372
1373
# Registry mapping a role class from the ``roles`` module to the impl
# instance that handles coercion for it; populated by the loop below.
_impl_lookup = {}


# Pair roles with impls by naming convention: for each attribute of
# ``roles`` whose name ends in "Role", derive the impl name by swapping
# "Role" for "Impl" and, if this module defines something under that
# name, instantiate it with the role class and register the instance.
# Roles with no matching impl name in this module are skipped.
for name in dir(roles):
    cls = getattr(roles, name)
    if name.endswith("Role"):
        # NOTE: str.replace() substitutes every occurrence of "Role" in
        # the name, not only the trailing one.
        name = name.replace("Role", "Impl")
        if name in globals():
            impl = globals()[name](cls)
            _impl_lookup[cls] = impl

# At runtime only (skipped when TYPE_CHECKING is true), register the
# subscripted forms ExpressionElementRole[int], [bool], [str] and [float]
# as additional keys pointing at the same impl instance as the
# unsubscripted ExpressionElementRole.
if not TYPE_CHECKING:
    ee_impl = _impl_lookup[roles.ExpressionElementRole]

    for py_type in (int, bool, str, float):
        _impl_lookup[roles.ExpressionElementRole[py_type]] = ee_impl