1# sql/coercions.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9from __future__ import annotations
10
11import collections.abc as collections_abc
12import numbers
13import re
14import typing
15from typing import Any
16from typing import Callable
17from typing import cast
18from typing import Dict
19from typing import Iterable
20from typing import Iterator
21from typing import List
22from typing import NoReturn
23from typing import Optional
24from typing import overload
25from typing import Sequence
26from typing import Tuple
27from typing import Type
28from typing import TYPE_CHECKING
29from typing import TypeVar
30from typing import Union
31
32from . import operators
33from . import roles
34from . import visitors
35from ._typing import is_from_clause
36from .base import ExecutableOption
37from .base import Options
38from .cache_key import HasCacheKey
39from .visitors import Visitable
40from .. import exc
41from .. import inspection
42from .. import util
43from ..util.typing import Literal
44
45if typing.TYPE_CHECKING:
46 # elements lambdas schema selectable are set by __init__
47 from . import elements
48 from . import lambdas
49 from . import schema
50 from . import selectable
51 from ._typing import _ColumnExpressionArgument
52 from ._typing import _ColumnsClauseArgument
53 from ._typing import _DDLColumnArgument
54 from ._typing import _DMLTableArgument
55 from ._typing import _FromClauseArgument
56 from .dml import _DMLTableElement
57 from .elements import BindParameter
58 from .elements import ClauseElement
59 from .elements import ColumnClause
60 from .elements import ColumnElement
61 from .elements import DQLDMLClauseElement
62 from .elements import NamedColumn
63 from .elements import SQLCoreOperations
64 from .schema import Column
65 from .selectable import _ColumnsClauseElement
66 from .selectable import _JoinTargetProtocol
67 from .selectable import FromClause
68 from .selectable import HasCTE
69 from .selectable import SelectBase
70 from .selectable import Subquery
71 from .visitors import _TraverseCallableType
72
# type variables used by the annotations in this module

# a role class; bound to roles.SQLRole
_SR = TypeVar("_SR", bound=roles.SQLRole)
# an arbitrary callable, used for decorator-style signatures
_F = TypeVar("_F", bound=Callable[..., Any])
# a role class bound to roles.StringRole
_StringOnlyR = TypeVar("_StringOnlyR", bound=roles.StringRole)
# a generic value type
_T = TypeVar("_T", bound=Any)
77
78
def _is_literal(element):
    """Tell whether ``element`` counts as a plain "literal" value in the
    context of a SQL expression construct.

    An object is not a literal when it is a :class:`.Visitable` or a
    ``schema.SchemaEventTarget``, or when it exposes a
    ``__clause_element__`` attribute.

    """

    if isinstance(element, (Visitable, schema.SchemaEventTarget)):
        return False

    return not hasattr(element, "__clause_element__")
89
90
91def _deep_is_literal(element):
92 """Return whether or not the element is a "literal" in the context
93 of a SQL expression construct.
94
95 does a deeper more esoteric check than _is_literal. is used
96 for lambda elements that have to distinguish values that would
97 be bound vs. not without any context.
98
99 """
100
101 if isinstance(element, collections_abc.Sequence) and not isinstance(
102 element, str
103 ):
104 for elem in element:
105 if not _deep_is_literal(elem):
106 return False
107 else:
108 return True
109
110 return (
111 not isinstance(
112 element,
113 (
114 Visitable,
115 schema.SchemaEventTarget,
116 HasCacheKey,
117 Options,
118 util.langhelpers.symbol,
119 ),
120 )
121 and not hasattr(element, "__clause_element__")
122 and (
123 not isinstance(element, type)
124 or not issubclass(element, HasCacheKey)
125 )
126 )
127
128
def _document_text_coercion(
    paramname: str, meth_rst: str, param_rst: str
) -> Callable[[_F], _F]:
    """Build the "trusted SQL text" warning for a parameter and hand it to
    ``util.add_parameter_text()``.

    :param paramname: name of the parameter, passed through to
     ``util.add_parameter_text()``.
    :param meth_rst: text naming the method, interpolated into the warning.
    :param param_rst: text naming the parameter, interpolated into the
     warning.
    :return: whatever ``util.add_parameter_text()`` returns.

    """

    warning_template = (
        ".. warning:: "
        "The %s argument to %s can be passed as a Python string argument, "
        "which will be treated "
        "as **trusted SQL text** and rendered as given. **DO NOT PASS "
        "UNTRUSTED INPUT TO THIS PARAMETER**."
    )
    warning_text = warning_template % (param_rst, meth_rst)
    return util.add_parameter_text(paramname, warning_text)
143
144
145def _expression_collection_was_a_list(
146 attrname: str,
147 fnname: str,
148 args: Union[Sequence[_T], Sequence[Sequence[_T]]],
149) -> Sequence[_T]:
150 if args and isinstance(args[0], (list, set, dict)) and len(args) == 1:
151 if isinstance(args[0], list):
152 raise exc.ArgumentError(
153 f'The "{attrname}" argument to {fnname}(), when '
154 "referring to a sequence "
155 "of items, is now passed as a series of positional "
156 "elements, rather than as a list. "
157 )
158 return cast("Sequence[_T]", args[0])
159
160 return cast("Sequence[_T]", args)
161
162
# typing-only overloads for expect().  each one pairs a role class (and, for
# some, the type of the incoming element or a keyword flag) with the type
# that expect() is annotated to return for it; the runtime implementation
# follows the last overload.


# TruncatedLabelRole -> str
@overload
def expect(
    role: Type[roles.TruncatedLabelRole],
    element: Any,
    **kw: Any,
) -> str: ...


# DMLColumnRole with the as_key flag -> str
@overload
def expect(
    role: Type[roles.DMLColumnRole],
    element: Any,
    *,
    as_key: Literal[True] = ...,
    **kw: Any,
) -> str: ...


# LiteralValueRole -> BindParameter
@overload
def expect(
    role: Type[roles.LiteralValueRole],
    element: Any,
    **kw: Any,
) -> BindParameter[Any]: ...


# DDLReferredColumnRole -> Column
@overload
def expect(
    role: Type[roles.DDLReferredColumnRole],
    element: Any,
    **kw: Any,
) -> Column[Any]: ...


# DDLConstraintColumnRole -> Column or str
@overload
def expect(
    role: Type[roles.DDLConstraintColumnRole],
    element: Any,
    **kw: Any,
) -> Union[Column[Any], str]: ...


# StatementOptionRole -> DQLDMLClauseElement
@overload
def expect(
    role: Type[roles.StatementOptionRole],
    element: Any,
    **kw: Any,
) -> DQLDMLClauseElement: ...


# LabeledColumnExprRole, typed element -> NamedColumn of the same type
@overload
def expect(
    role: Type[roles.LabeledColumnExprRole[Any]],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> NamedColumn[_T]: ...


# column expression roles, typed element -> ColumnElement of the same type
@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
    ],
    element: _ColumnExpressionArgument[_T],
    **kw: Any,
) -> ColumnElement[_T]: ...


# column expression roles, untyped element -> ColumnElement[Any]
@overload
def expect(
    role: Union[
        Type[roles.ExpressionElementRole[Any]],
        Type[roles.LimitOffsetRole],
        Type[roles.WhereHavingRole],
        Type[roles.OnClauseRole],
        Type[roles.ColumnArgumentRole],
    ],
    element: Any,
    **kw: Any,
) -> ColumnElement[Any]: ...


# DMLTableRole -> _DMLTableElement
@overload
def expect(
    role: Type[roles.DMLTableRole],
    element: _DMLTableArgument,
    **kw: Any,
) -> _DMLTableElement: ...


# HasCTERole -> HasCTE
@overload
def expect(
    role: Type[roles.HasCTERole],
    element: HasCTE,
    **kw: Any,
) -> HasCTE: ...


# SelectStatementRole -> SelectBase
@overload
def expect(
    role: Type[roles.SelectStatementRole],
    element: SelectBase,
    **kw: Any,
) -> SelectBase: ...


# FromClauseRole -> FromClause
@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: _FromClauseArgument,
    **kw: Any,
) -> FromClause: ...


# FromClauseRole with a SelectBase and the explicit_subquery flag -> Subquery
@overload
def expect(
    role: Type[roles.FromClauseRole],
    element: SelectBase,
    *,
    explicit_subquery: Literal[True] = ...,
    **kw: Any,
) -> Subquery: ...


# ColumnsClauseRole -> _ColumnsClauseElement
@overload
def expect(
    role: Type[roles.ColumnsClauseRole],
    element: _ColumnsClauseArgument[Any],
    **kw: Any,
) -> _ColumnsClauseElement: ...


# JoinTargetRole -> _JoinTargetProtocol
@overload
def expect(
    role: Type[roles.JoinTargetRole],
    element: _JoinTargetProtocol,
    **kw: Any,
) -> _JoinTargetProtocol: ...


# catchall for not-yet-implemented overloads
@overload
def expect(
    role: Type[_SR],
    element: Any,
    **kw: Any,
) -> Any: ...
312
313
def expect(
    role: Type[_SR],
    element: Any,
    *,
    apply_propagate_attrs: Optional[ClauseElement] = None,
    argname: Optional[str] = None,
    post_inspect: bool = False,
    disable_inspection: bool = False,
    **kw: Any,
) -> Any:
    """Coerce ``element`` into an object that satisfies ``role``.

    :param role: the role class; it is the key used to look up the
     :class:`.RoleImpl` in ``_impl_lookup``.
    :param element: the object to be coerced.
    :param apply_propagate_attrs: optional construct; when given and its
     ``_propagate_attrs`` is empty, it receives the ``_propagate_attrs``
     of the resolved object, if that object has a non-empty one.
    :param argname: optional argument name, passed along to the impl; the
     impl uses it when building error messages.
    :param post_inspect: when True, the ``_post_inspect`` attribute of an
     inspected object is accessed before its ``__clause_element__()``
     method is called.
    :param disable_inspection: when True, ``inspection.inspect()`` is not
     attempted even if the impl uses inspection.
    :param kw: additional options, passed to ``lambdas.LambdaOptions`` for
     a lambda and to the impl's coercion hooks otherwise.
    :return: a ``lambdas.LambdaElement`` for a lambda; otherwise the
     resolved object, after the impl's ``_post_coercion`` hook if one is
     set, or the result of the impl's ``_implicit_coercions()``.

    """
    if (
        role.allows_lambda
        # note callable() will not invoke a __getattr__() method, whereas
        # hasattr(obj, "__call__") will.  by keeping the callable() check here
        # we prevent most needless calls to hasattr() and therefore
        # __getattr__(), which is present on ColumnElement.
        and callable(element)
        and hasattr(element, "__code__")
    ):
        return lambdas.LambdaElement(
            element,
            role,
            lambdas.LambdaOptions(**kw),
            apply_propagate_attrs=apply_propagate_attrs,
        )

    # major case is that we are given a ClauseElement already, skip more
    # elaborate logic up front if possible
    impl = _impl_lookup[role]

    # kept so that error messages and hooks can refer to what the caller
    # actually passed, as "element" may be rebound below
    original_element = element

    if not isinstance(
        element,
        (
            elements.CompilerElement,
            schema.SchemaItem,
            schema.FetchedValue,
            lambdas.PyWrapper,
        ),
    ):
        resolved = None

        if impl._resolve_literal_only:
            # impls with this flag go straight to literal coercion;
            # note argname is not passed on this path
            resolved = impl._literal_coercion(element, **kw)
        else:
            original_element = element

            is_clause_element = False

            # this is a special performance optimization for ORM
            # joins used by JoinTargetImpl that we don't go through the
            # work of creating __clause_element__() when we only need the
            # original QueryableAttribute, as the former will do clause
            # adaption and all that which is just thrown away here.
            if (
                impl._skip_clauseelement_for_target_match
                and isinstance(element, role)
                and hasattr(element, "__clause_element__")
            ):
                is_clause_element = True
            else:
                # unwrap __clause_element__() repeatedly until an object
                # that reports is_clause_element is reached, or the
                # attribute is no longer present
                while hasattr(element, "__clause_element__"):
                    is_clause_element = True

                    if not getattr(element, "is_clause_element", False):
                        element = element.__clause_element__()
                    else:
                        break

            if not is_clause_element:
                if impl._use_inspection and not disable_inspection:
                    insp = inspection.inspect(element, raiseerr=False)
                    if insp is not None:
                        if post_inspect:
                            # bare attribute access; done only for
                            # whatever effect the access itself has
                            insp._post_inspect
                        try:
                            resolved = insp.__clause_element__()
                        except AttributeError:
                            impl._raise_for_expected(original_element, argname)

                # nothing was produced by inspection; treat as a literal
                if resolved is None:
                    resolved = impl._literal_coercion(
                        element, argname=argname, **kw
                    )
            else:
                resolved = element
    elif isinstance(element, lambdas.PyWrapper):
        resolved = element._sa__py_wrapper_literal(**kw)
    else:
        resolved = element

    if apply_propagate_attrs is not None:
        if typing.TYPE_CHECKING:
            assert isinstance(resolved, (SQLCoreOperations, ClauseElement))

        # only fill in _propagate_attrs when the target has none yet
        if not apply_propagate_attrs._propagate_attrs and getattr(
            resolved, "_propagate_attrs", None
        ):
            apply_propagate_attrs._propagate_attrs = resolved._propagate_attrs

    # the resolved object satisfies the role when the impl's role class is
    # in its MRO; otherwise the impl decides via _implicit_coercions()
    if impl._role_class in resolved.__class__.__mro__:
        if impl._post_coercion:
            resolved = impl._post_coercion(
                resolved,
                argname=argname,
                original_element=original_element,
                **kw,
            )
        return resolved
    else:
        return impl._implicit_coercions(
            original_element, resolved, argname=argname, **kw
        )
428
429
def expect_as_key(
    role: Type[roles.DMLColumnRole], element: Any, **kw: Any
) -> str:
    """Call :func:`.expect` with ``as_key=True``.

    Any ``as_key`` value present in ``kw`` is discarded; all other keyword
    arguments are passed through.

    """
    passthrough = {
        option: value for option, value in kw.items() if option != "as_key"
    }
    return expect(role, element, as_key=True, **passthrough)
435
436
def expect_col_expression_collection(
    role: Type[roles.DDLConstraintColumnRole],
    expressions: Iterable[_DDLColumnArgument],
) -> Iterator[
    Tuple[
        Union[str, Column[Any]],
        Optional[ColumnClause[Any]],
        Optional[str],
        Optional[Union[Column[Any], str]],
    ]
]:
    """Coerce each of ``expressions`` with :func:`.expect` and yield a
    4-tuple of ``(resolved, column, strname, add_element)`` for it.

    When the coerced value is a string, ``resolved``, ``strname`` and
    ``add_element`` are all the given string and ``column`` is ``None``.
    Otherwise ``strname`` is ``None``, and ``column`` / ``add_element``
    are the first object collected by a ``"column"`` traversal of the
    coerced value, or ``None`` if nothing was collected.

    """
    for expr in expressions:
        coerced: Union[Column[Any], str] = expect(role, expr)

        if isinstance(coerced, str):
            assert isinstance(expr, str)
            # the originally given string is what gets yielded
            yield expr, None, expr, expr
            continue

        found: List[Column[Any]] = []
        col_append: _TraverseCallableType[Column[Any]] = found.append
        visitors.traverse(coerced, {}, {"column": col_append})

        first_column = found[0] if found else None
        yield coerced, first_column, None, first_column
465
466
class RoleImpl:
    """Base for the per-role coercion implementations used by
    :func:`.expect`.

    An instance records the role class it was created for, the role's
    ``_role_name`` and whether the role class derives from
    ``roles.UsesInspection``.

    """

    __slots__ = ("_role_class", "name", "_use_inspection")

    # hook invoked by expect() when truthy; None here means "no hook"
    _post_coercion: Any = None
    # when True, expect() goes directly to _literal_coercion()
    _resolve_literal_only = False
    # when True, expect() may skip calling __clause_element__()
    _skip_clauseelement_for_target_match = False

    def __init__(self, role_class):
        self._role_class = role_class
        self.name = role_class._role_name
        self._use_inspection = issubclass(role_class, roles.UsesInspection)

    def _literal_coercion(self, element, **kw):
        """Coerce a literal value; not implemented at this level."""
        raise NotImplementedError()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Handle a resolved object that does not satisfy the role.

        This base version performs no coercion and raises via
        :meth:`._raise_for_expected`.

        """
        self._raise_for_expected(element, argname, resolved)

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        *,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise :class:`.ArgumentError` describing what was expected.

        :param element: the object originally passed.
        :param argname: optional argument name included in the message.
        :param resolved: optional object that ``element`` resolved to; it
         is mentioned only when it is a different object than ``element``.
        :param advice: optional text appended to the message.
        :param code: passed to the exception as ``code``.
        :param err: exception used as the ``from`` cause.

        """
        got = (
            "%r object resolved from %r object" % (resolved, element)
            if resolved is not None and resolved is not element
            else repr(element)
        )

        if argname:
            message = "%s expected for argument %r; got %s." % (
                self.name,
                argname,
                got,
            )
        else:
            message = "%s expected, got %s." % (self.name, got)

        if advice:
            message = message + " " + advice

        raise exc.ArgumentError(message, code=code) from err
520
521
522class _Deannotate:
523 __slots__ = ()
524
525 def _post_coercion(self, resolved, **kw):
526 from .util import _deep_deannotate
527
528 return _deep_deannotate(resolved)
529
530
531class _StringOnly:
532 __slots__ = ()
533
534 _resolve_literal_only = True
535
536
class _ReturnsStringKey(RoleImpl):
    """Role implementation that lets plain strings through unchanged."""

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        """Return ``element`` as is."""
        return element

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        """Return ``element`` when it is a string; raise otherwise."""
        if isinstance(element, str):
            return element

        self._raise_for_expected(element, argname, resolved)
548
549
class _ColumnCoercions(RoleImpl):
    """Role implementation that implicitly coerces SELECT constructs and
    subqueries into scalar subqueries, emitting a warning.

    """

    __slots__ = ()

    def _warn_for_scalar_subquery_coercion(self):
        """Warn that a SELECT is being implicitly coerced."""
        util.warn(
            "implicitly coercing SELECT object to scalar subquery; "
            "please use the .scalar_subquery() method to produce a scalar "
            "subquery.",
        )

    def _implicit_coercions(self, element, resolved, argname=None, **kw):
        """Coerce ``resolved`` to a column expression, or raise.

        * a select base is returned as ``resolved.scalar_subquery()``,
          with a warning
        * a ``selectable.Subquery`` is returned as
          ``resolved.element.scalar_subquery()``, with a warning
        * a lambda element is returned unchanged when the role allows
          lambdas
        * anything else raises via ``_raise_for_expected()``

        """
        # objects that are not clause elements can't be coerced at all
        if not getattr(resolved, "is_clause_element", False):
            self._raise_for_expected(element, argname, resolved)

        if resolved._is_select_base:
            self._warn_for_scalar_subquery_coercion()
            return resolved.scalar_subquery()

        if resolved._is_from_clause and isinstance(
            resolved, selectable.Subquery
        ):
            self._warn_for_scalar_subquery_coercion()
            return resolved.element.scalar_subquery()

        if self._role_class.allows_lambda and resolved._is_lambda_element:
            return resolved

        self._raise_for_expected(element, argname, resolved)
576
577
def _no_text_coercion(
    element: Any,
    argname: Optional[str] = None,
    exc_cls: Type[exc.SQLAlchemyError] = exc.ArgumentError,
    extra: Optional[str] = None,
    err: Optional[Exception] = None,
) -> NoReturn:
    """Raise an error stating that a textual SQL expression has to be
    declared explicitly using ``text()``.

    :param element: the offending string; it is shown in the message
     after being passed through ``util.ellipses_string()``.
    :param argname: optional name of the argument the string was passed
     to; included in the message when given.
    :param exc_cls: exception class to raise.
    :param extra: optional text placed at the start of the message.
    :param err: exception used as the ``from`` cause.

    """
    raise exc_cls(
        "%(extra)sTextual SQL expression %(expr)r %(argname)sshould be "
        "explicitly declared as text(%(expr)r)"
        % {
            "expr": util.ellipses_string(element),
            # the trailing space separates the argument name from the
            # "should be" that directly follows it in the template
            "argname": "for argument %s " % (argname,) if argname else "",
            "extra": "%s " % extra if extra else "",
        }
    ) from err
594
595
class _NoTextCoercion(RoleImpl):
    """Role implementation that rejects every literal value.

    Strings get the more specific "should be explicitly declared as
    text()" error when ``TextClause`` is a subclass of the role class.

    """

    __slots__ = ()

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Always raise; no literal is acceptable for this role."""
        wants_text_hint = isinstance(element, str) and issubclass(
            elements.TextClause, self._role_class
        )

        if not wants_text_hint:
            self._raise_for_expected(element, argname)

        _no_text_coercion(element, argname)
606
607
class _CoerceLiterals(RoleImpl):
    """Role implementation that coerces selected Python literals.

    Subclasses opt in using the class-level flags and may override
    ``_text_coercion()`` to decide what a plain string becomes.

    """

    __slots__ = ()

    # None / False / True -> Null() / False_() / True_()
    _coerce_consts = False
    # the string "*" -> a literal "*" column
    _coerce_star = False
    # numbers.Number instances -> a literal column of str(value)
    _coerce_numerics = False

    def _text_coercion(self, element, argname=None):
        """Handle a plain string; the default is to raise."""
        return _no_text_coercion(element, argname)

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Coerce ``element`` according to the class-level flags, raising
        via ``_raise_for_expected()`` when nothing applies.

        """
        if isinstance(element, str):
            if self._coerce_star and element == "*":
                return elements.ColumnClause("*", is_literal=True)

            # note that any remaining keyword arguments are forwarded
            return self._text_coercion(element, argname, **kw)

        if self._coerce_consts:
            # identity comparison; 0 / 1 are not treated as False / True
            for constant, construct in (
                (None, elements.Null),
                (False, elements.False_),
                (True, elements.True_),
            ):
                if element is constant:
                    return construct()

        if self._coerce_numerics and isinstance(element, (numbers.Number)):
            return elements.ColumnClause(str(element), is_literal=True)

        self._raise_for_expected(element, argname)
636
637
class LiteralValueImpl(RoleImpl):
    """Role implementation that wraps a literal value in a unique
    ``BindParameter``.

    """

    _resolve_literal_only = True

    def _literal_coercion(self, element, **kw):
        """Return ``element`` unchanged."""
        return element

    def _implicit_coercions(
        self,
        element,
        resolved,
        argname=None,
        *,
        type_=None,
        literal_execute=False,
        **kw,
    ):
        """Return an anonymous, unique ``BindParameter`` for ``element``.

        Raises via ``_raise_for_expected()`` when ``resolved`` is not a
        literal per :func:`._is_literal`.

        """
        if not _is_literal(resolved):
            self._raise_for_expected(
                element, resolved=resolved, argname=argname, **kw
            )

        bind_options = dict(
            type_=type_,
            unique=True,
            literal_execute=literal_execute,
        )
        return elements.BindParameter(None, element, **bind_options)
666
667
class _SelectIsNotFrom(RoleImpl):
    # role implementation whose error message suggests .subquery() when a
    # SELECT statement was given
    __slots__ = ()

    def _raise_for_expected(
        self,
        element: Any,
        argname: Optional[str] = None,
        resolved: Optional[Any] = None,
        *,
        advice: Optional[str] = None,
        code: Optional[str] = None,
        err: Optional[Exception] = None,
        **kw: Any,
    ) -> NoReturn:
        """Raise via the superclass, adding ".subquery()" advice and the
        error code ``"89ve"`` when a SELECT statement is involved.

        """
        # NOTE(review): "and" binds tighter than "or", so this condition
        # groups as (not advice and element-is-select) or
        # resolved-is-select; advice supplied by the caller is therefore
        # replaced whenever resolved is a SelectStatementRole - confirm
        # that this is intended
        if (
            not advice
            and isinstance(element, roles.SelectStatementRole)
            or isinstance(resolved, roles.SelectStatementRole)
        ):
            # the class of resolved is interpolated when resolved is
            # present; otherwise element itself is interpolated
            advice = (
                "To create a "
                "FROM clause from a %s object, use the .subquery() method."
                % (resolved.__class__ if resolved is not None else element,)
            )
            code = "89ve"
        else:
            # NOTE(review): a code passed in by the caller is discarded
            # here - confirm
            code = None

        super()._raise_for_expected(
            element,
            argname=argname,
            resolved=resolved,
            advice=advice,
            code=code,
            err=err,
            **kw,
        )
        # never reached
        assert False
707
708
class HasCacheKeyImpl(RoleImpl):
    """Role implementation accepting any :class:`.HasCacheKey` object."""

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        """Return ``element`` unchanged."""
        return element

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``element`` when it is a ``HasCacheKey``; raise
        otherwise.

        """
        if not isinstance(element, HasCacheKey):
            self._raise_for_expected(element, argname, resolved)

        return element
726
727
class ExecutableOptionImpl(RoleImpl):
    """Role implementation accepting any :class:`.ExecutableOption`."""

    __slots__ = ()

    def _literal_coercion(self, element, **kw):
        """Return ``element`` unchanged."""
        return element

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``element`` when it is an ``ExecutableOption``; raise
        otherwise.

        """
        if not isinstance(element, ExecutableOption):
            self._raise_for_expected(element, argname, resolved)

        return element
745
746
class ExpressionElementImpl(_ColumnCoercions, RoleImpl):
    """Role implementation for column expressions; literal values become
    ``Null()`` or a unique ``BindParameter``.

    """

    __slots__ = ()

    def _literal_coercion(
        self, element, *, name=None, type_=None, is_crud=False, **kw
    ):
        """Coerce a literal into ``Null()`` or a ``BindParameter``.

        ``None`` becomes ``Null()`` unless ``is_crud`` is set or the given
        type has ``should_evaluate_none`` set.  An ``ArgumentError`` from
        constructing the ``BindParameter`` is re-raised via
        ``_raise_for_expected()`` with the original error as its cause.

        """
        if element is None and not is_crud:
            # TODO: there's no test coverage now for the
            # "should_evaluate_none" part of this, as outside of "crud" this
            # codepath is not normally used except in some special cases
            if type_ is None or not type_.should_evaluate_none:
                return elements.Null()

        try:
            return elements.BindParameter(
                name, element, type_, unique=True, _is_crud=is_crud
            )
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Raise via the superclass, adding advice for VALUES constructs
        and anonymized FROM clauses.

        """
        # select uses implicit coercion with warning instead of raising
        advice = None
        if isinstance(element, selectable.Values):
            advice = (
                "To create a column expression from a VALUES clause, "
                "use the .scalar_values() method."
            )
        elif isinstance(element, roles.AnonymizedFromClauseRole):
            advice = (
                "To create a column expression from a FROM clause row "
                "as a whole, use the .table_valued() method."
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )
788
789
class BinaryElementImpl(ExpressionElementImpl, RoleImpl):
    """Role implementation for the other side of a binary expression;
    literals are bound using the ``_bind_param()`` method of ``expr``.

    """

    __slots__ = ()

    def _literal_coercion(  # type: ignore[override]
        self,
        element,
        *,
        expr,
        operator,
        bindparam_type=None,
        argname=None,
        **kw,
    ):
        """Return ``expr._bind_param(operator, element, ...)``.

        An ``ArgumentError`` is re-raised via ``_raise_for_expected()``
        with the original error as its cause.

        """
        try:
            bound = expr._bind_param(operator, element, type_=bindparam_type)
        except exc.ArgumentError as err:
            self._raise_for_expected(element, err=err)
        else:
            return bound

    def _post_coercion(self, resolved, *, expr, bindparam_type=None, **kw):
        """Give a null-typed ``resolved`` the type of the expression.

        ``bindparam_type`` is used in preference to ``expr.type`` when it
        is given.

        """
        # nothing to do unless resolved is null-typed and expr is not
        if not resolved.type._isnull or expr.type._isnull:
            return resolved

        target_type = expr.type if bindparam_type is None else bindparam_type
        return resolved._with_binary_element_type(target_type)
814
815
class InElementImpl(RoleImpl):
    """Role implementation for the right hand side of an IN expression."""

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Coerce a FROM clause into a SELECT, emitting a warning.

        An ``Alias`` of a select base yields that select; any other FROM
        clause yields ``resolved.select()``.  The result goes through
        ``_post_coercion()``.  Objects which aren't FROM clauses raise.

        """
        if not resolved._is_from_clause:
            self._raise_for_expected(element, argname, resolved)

        is_aliased_select = (
            isinstance(resolved, selectable.Alias)
            and resolved.element._is_select_base
        )

        self._warn_for_implicit_coercion(resolved)

        if is_aliased_select:
            select_stmt = resolved.element
        else:
            select_stmt = resolved.select()

        return self._post_coercion(select_stmt, **kw)

    def _warn_for_implicit_coercion(self, elem):
        """Warn that ``elem`` is being coerced into a select()."""
        util.warn(
            "Coercing %s object into a select() for use in IN(); "
            "please pass a select() construct explicitly"
            % (elem.__class__.__name__)
        )

    def _literal_coercion(  # type: ignore[override]
        self, element, *, expr, operator, **kw
    ):
        """Coerce a non-string iterable of values for use with IN.

        When every member is a non-``None`` literal, a single expanding
        bound parameter is returned.  Otherwise a ``ClauseList`` is
        returned in which SQL expressions are used as is, ``None``
        becomes ``Null()`` and the remaining members are bound one by
        one.  Anything that is not a non-string iterable, or a member
        that is neither a literal nor a ``ColumnOperators``, raises.

        """
        if not util.is_non_string_iterable(element):
            self._raise_for_expected(element, **kw)

        values = list(element)

        # members that can't go into an expanding parameter, keyed by the
        # member itself
        replacements: Dict[
            Optional[operators.ColumnOperators],
            operators.ColumnOperators,
        ] = {}

        for member in values:
            if not _is_literal(member):
                if not isinstance(member, operators.ColumnOperators):
                    self._raise_for_expected(values, **kw)

                replacements[member] = member
            elif member is None:
                replacements[member] = elements.Null()

        if not replacements:
            return expr._bind_param(operator, values, expanding=True)

        return elements.ClauseList(
            *[
                (
                    replacements[member]
                    if member in replacements
                    else expr._bind_param(operator, member)
                )
                for member in values
            ]
        )

    def _post_coercion(self, element, *, expr, operator, **kw):
        """Adapt an already-resolved element for use with IN.

        A select base becomes a scalar subquery, a ``ClauseList`` is
        grouped against the operator, a ``BindParameter`` is cloned and
        marked as expanding, and ``Values`` becomes scalar values.  Any
        other element is returned unchanged.

        """
        if element._is_select_base:
            # for IN, we are doing scalar_subquery() coercion without
            # a warning
            return element.scalar_subquery()

        if isinstance(element, elements.ClauseList):
            assert not len(element.clauses) == 0
            return element.self_group(against=operator)

        if isinstance(element, elements.BindParameter):
            # work on a clone; the given parameter is not modified
            expanding_param = element._clone(maintain_key=True)
            expanding_param.expanding = True
            expanding_param.expand_op = operator
            return expanding_param

        if isinstance(element, selectable.Values):
            return element.scalar_values()

        return element
901
902
class OnClauseImpl(_ColumnCoercions, RoleImpl):
    """Role implementation that accepts no literals and hands back the
    originally given object when it is a ``JoinTargetRole``.

    """

    __slots__ = ()

    _coerce_consts = True

    def _literal_coercion(self, element, **kw):
        """Always raise; literals are not accepted."""
        self._raise_for_expected(element)

    def _post_coercion(self, resolved, *, original_element=None, **kw):
        """Return ``original_element`` when it is a ``JoinTargetRole``,
        else ``resolved``.

        """
        # this is a hack right now as we want to use coercion on an
        # ORM InstrumentedAttribute, but we want to return the object
        # itself if it is one, not its clause element.
        # ORM context _join and _legacy_join() would need to be improved
        # to look for annotations in a clause element form.
        keep_original = isinstance(original_element, roles.JoinTargetRole)
        return original_element if keep_original else resolved
920
921
class WhereHavingImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl):
    """Role implementation that coerces ``None`` / ``True`` / ``False``
    constants and rejects plain strings.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        """Raise the "should be explicitly declared as text()" error."""
        return _no_text_coercion(element, argname)
929
930
class StatementOptionImpl(_CoerceLiterals, RoleImpl):
    """Role implementation that coerces ``None`` / ``True`` / ``False``
    constants and turns plain strings into ``TextClause`` objects.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        """Return a ``TextClause`` for the string; ``argname`` is unused."""
        return elements.TextClause(element)
938
939
class ColumnArgumentImpl(_NoTextCoercion, RoleImpl):
    """Role implementation built from :class:`._NoTextCoercion`; it adds
    no behavior of its own.

    """

    __slots__ = ()
942
943
class ColumnArgumentOrKeyImpl(_ReturnsStringKey, RoleImpl):
    """Role implementation built from :class:`._ReturnsStringKey`; it adds
    no behavior of its own.

    """

    __slots__ = ()
946
947
class StrAsPlainColumnImpl(_CoerceLiterals, RoleImpl):
    """Role implementation that turns a plain string into a
    ``ColumnClause`` of that name.

    """

    __slots__ = ()

    def _text_coercion(self, element, argname=None):
        """Return ``ColumnClause(element)``; ``argname`` is unused."""
        return elements.ColumnClause(element)
953
954
class ByOfImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl, roles.ByOfRole):
    """Role implementation that coerces ``None`` / ``True`` / ``False``
    constants and turns plain strings into textual label references.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        """Return a ``_textual_label_reference`` for the string;
        ``argname`` is unused.

        """
        return elements._textual_label_reference(element)
962
963
class OrderByImpl(ByOfImpl, RoleImpl):
    """:class:`.ByOfImpl` variant that wraps elements which have an
    ORDER BY label element in a ``_label_reference``.

    """

    __slots__ = ()

    def _post_coercion(self, resolved, **kw):
        """Return ``_label_reference(resolved)`` when ``resolved`` is an
        instance of the role class and has a non-``None``
        ``_order_by_label_element``; otherwise return ``resolved``.

        """
        has_label_element = (
            isinstance(resolved, self._role_class)
            and resolved._order_by_label_element is not None
        )
        if has_label_element:
            return elements._label_reference(resolved)

        return resolved
975
976
class GroupByImpl(ByOfImpl, RoleImpl):
    """:class:`.ByOfImpl` variant that expands a FROM clause into the
    list of its columns.

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return a ``ClauseList`` of ``resolved.c`` for a FROM clause;
        any other object is returned unchanged.

        """
        if not is_from_clause(resolved):
            return resolved

        return elements.ClauseList(*resolved.c)
991
992
class DMLColumnImpl(_ReturnsStringKey, RoleImpl):
    """:class:`._ReturnsStringKey` variant that can return the ``.key``
    of the element in place of the element.

    """

    __slots__ = ()

    def _post_coercion(self, element, *, as_key=False, **kw):
        """Return ``element.key`` when ``as_key`` is set, else the element
        itself.

        """
        return element.key if as_key else element
1001
1002
class ConstExprImpl(RoleImpl):
    """Role implementation accepting only the constants ``None``,
    ``False`` and ``True`` as literals.

    """

    __slots__ = ()

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Return ``Null()``, ``False_()`` or ``True_()`` for the matching
        constant; raise for anything else.

        """
        # identity comparison; 0 / 1 are not treated as False / True
        for constant, construct in (
            (None, elements.Null),
            (False, elements.False_),
            (True, elements.True_),
        ):
            if element is constant:
                return construct()

        self._raise_for_expected(element, argname)
1015
1016
class TruncatedLabelImpl(_StringOnly, RoleImpl):
    """Role implementation that coerces strings into
    ``_truncated_label`` objects.

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``resolved`` when the given element is a string; raise
        otherwise.

        """
        if not isinstance(element, str):
            self._raise_for_expected(element, argname, resolved)

        return resolved

    def _literal_coercion(self, element, **kw):
        """coerce the given value to :class:`._truncated_label`.

        Existing :class:`._truncated_label` and
        :class:`._anonymous_label` objects are passed
        unchanged.
        """

        already_truncated = isinstance(element, elements._truncated_label)
        if already_truncated:
            return element

        return elements._truncated_label(element)
1044
1045
class DDLExpressionImpl(_Deannotate, _CoerceLiterals, RoleImpl):
    """Role implementation that coerces ``None`` / ``True`` / ``False``
    constants and turns plain strings into ``TextClause`` objects.

    """

    __slots__ = ()

    _coerce_consts = True

    def _text_coercion(self, element, argname=None):
        """Return a ``TextClause`` for the string; ``argname`` is unused."""
        # see #5754 for why we can't easily deprecate this coercion.
        # essentially expressions like postgresql_where would have to be
        # text() as they come back from reflection and we don't want to
        # have text() elements wired into the inspection dictionaries.
        text_clause = elements.TextClause(element)
        return text_clause
1057
1058
class DDLConstraintColumnImpl(_Deannotate, _ReturnsStringKey, RoleImpl):
    """Role implementation combining :class:`._Deannotate` with
    :class:`._ReturnsStringKey`; it adds no behavior of its own.

    """

    __slots__ = ()
1061
1062
class DDLReferredColumnImpl(DDLConstraintColumnImpl):
    """Subclass of :class:`.DDLConstraintColumnImpl` that adds no
    behavior of its own.

    """

    __slots__ = ()
1065
1066
class LimitOffsetImpl(RoleImpl):
    """Role implementation that coerces a literal into an
    ``_OffsetLimitParam``, letting ``None`` pass through.

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``None`` when ``resolved`` is ``None``; raise otherwise."""
        if resolved is not None:
            self._raise_for_expected(element, argname, resolved)

        return None

    def _literal_coercion(  # type: ignore[override]
        self, element, *, name, type_, **kw
    ):
        """Return ``None`` for ``None``; otherwise pass the value through
        ``util.asint()`` and wrap it in a unique ``_OffsetLimitParam``.

        """
        if element is None:
            return None

        int_value = util.asint(element)
        return selectable._OffsetLimitParam(
            name, int_value, type_=type_, unique=True
        )
1092
1093
class LabeledColumnExprImpl(ExpressionElementImpl):
    """:class:`.ExpressionElementImpl` variant that returns expressions
    with ``.label(None)`` applied.

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``resolved.label(None)`` when ``resolved`` is an
        ``ExpressionElementRole``.

        Otherwise the superclass coercion is attempted and its result is
        labeled when it is an ``ExpressionElementRole``; any other result
        raises.

        """
        if isinstance(resolved, roles.ExpressionElementRole):
            return resolved.label(None)

        coerced = super()._implicit_coercions(
            element, resolved, argname=argname, **kw
        )
        if isinstance(coerced, roles.ExpressionElementRole):
            return coerced.label(None)

        self._raise_for_expected(element, argname, resolved)
1114
1115
class ColumnsClauseImpl(_SelectIsNotFrom, _CoerceLiterals, RoleImpl):
    """Role implementation that coerces constants, numbers and ``"*"``,
    and rejects any other plain string with a suggestion of which
    construct to use.

    """

    __slots__ = ()

    _coerce_consts = True
    _coerce_numerics = True
    _coerce_star = True

    # a string that starts with a word character and has no whitespace
    # is guessed to be a plain column name
    _guess_straight_column = re.compile(r"^\w\S*$", re.I)

    def _raise_for_expected(
        self, element, argname=None, resolved=None, *, advice=None, **kw
    ):
        """Raise via the superclass; when a list was given and no advice
        was supplied, suggest passing its members positionally.

        """
        if not advice and isinstance(element, list):
            advice = (
                f"Did you mean to say select("
                f"{', '.join(repr(e) for e in element)})?"
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )

    def _text_coercion(self, element, argname=None):
        """Raise :class:`.ArgumentError` for a plain string.

        The message suggests ``text()``, along with ``column()`` when the
        string matches ``_guess_straight_column`` and ``literal_column()``
        when it does not.

        """
        element = str(element)

        guess_is_literal = not self._guess_straight_column.match(element)
        raise exc.ArgumentError(
            "Textual column expression %(column)r %(argname)sshould be "
            "explicitly declared with text(%(column)r), "
            "or use %(literal_column)s(%(column)r) "
            "for more specificity"
            % {
                "column": util.ellipses_string(element),
                # the trailing space separates the argument name from the
                # "should be" that directly follows it in the template
                "argname": "for argument %s " % (argname,) if argname else "",
                "literal_column": (
                    "literal_column" if guess_is_literal else "column"
                ),
            }
        )
1155
1156
class ReturnsRowsImpl(RoleImpl):
    """Role implementation that uses the :class:`.RoleImpl` behavior
    unchanged.

    """

    __slots__ = ()
1159
1160
class StatementImpl(_CoerceLiterals, RoleImpl):
    """Role implementation for statements; lambda elements are accepted
    as is, and a deprecation warning is emitted for objects that resolve
    to a statement without being executable themselves.

    """

    __slots__ = ()

    def _post_coercion(
        self, resolved, *, original_element, argname=None, **kw
    ):
        """Return ``resolved``, warning when it came from a non-string
        object that has no ``_execute_on_connection`` attribute.

        """
        if resolved is not original_element and not isinstance(
            original_element, str
        ):
            # use same method as Connection uses; this will later raise
            # ObjectNotExecutableError
            try:
                original_element._execute_on_connection
            except AttributeError:
                util.warn_deprecated(
                    "Object %r should not be used directly in a SQL statement "
                    "context, such as passing to methods such as "
                    "session.execute(). This usage will be disallowed in a "
                    "future release. "
                    "Please use Core select() / update() / delete() etc. "
                    "with Session.execute() and other statement execution "
                    "methods."
                    # wrapped in a 1-tuple so that an object which is
                    # itself a tuple is formatted as a single value
                    % (original_element,),
                    "1.4",
                )

        return resolved

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return a lambda element unchanged; defer to the superclass for
        anything else.

        """
        if resolved._is_lambda_element:
            return resolved
        else:
            return super()._implicit_coercions(
                element, resolved, argname=argname, **kw
            )
1201
1202
class SelectStatementImpl(_NoTextCoercion, RoleImpl):
    """Role implementation that rejects literals and coerces a text
    clause using its ``.columns()`` method.

    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Return ``resolved.columns()`` for a text clause; raise
        otherwise.

        """
        if not resolved._is_text_clause:
            self._raise_for_expected(element, argname, resolved)

        return resolved.columns()
1217
1218
class HasCTEImpl(ReturnsRowsImpl):
    """Subclass of :class:`.ReturnsRowsImpl` that adds no behavior of its
    own.

    """

    __slots__ = ()
1221
1222
class IsCTEImpl(RoleImpl):
    """Role implementation that uses the :class:`.RoleImpl` behavior
    unchanged.

    """

    __slots__ = ()
1225
1226
class JoinTargetImpl(RoleImpl):
    """Role implementation for join targets; literals are rejected and
    SELECT constructs are accepted only in legacy mode.

    """

    __slots__ = ()

    _skip_clauseelement_for_target_match = True

    def _literal_coercion(self, element, *, argname=None, **kw):
        """Always raise; literals are not accepted."""
        self._raise_for_expected(element, argname)

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        legacy: bool = False,
        **kw: Any,
    ) -> Any:
        """Return ``element`` when it is a ``JoinTargetRole``; in legacy
        mode, return a select base as is with a deprecation warning;
        raise otherwise.

        """
        if isinstance(element, roles.JoinTargetRole):
            # note that this codepath no longer occurs as of
            # #6550, unless JoinTargetImpl._skip_clauseelement_for_target_match
            # were set to False.
            return element

        if legacy and resolved._is_select_base:
            util.warn_deprecated(
                "Implicit coercion of SELECT and textual SELECT "
                "constructs into FROM clauses is deprecated; please call "
                ".subquery() on any Core select or ORM Query object in "
                "order to produce a subquery object.",
                version="1.4",
            )
            # TODO: doing _implicit_subquery here causes tests to fail,
            # how was this working before?  probably that ORM
            # join logic treated it as a select and subquery would happen
            # in _ORMJoin->Join
            return resolved

        self._raise_for_expected(element, argname, resolved)
1264
1265
class FromClauseImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Coercion implementation for arguments expected to be FROM clauses.

    SELECT constructs can be turned into subqueries, either explicitly
    or, with a deprecation warning, implicitly.  Text clauses are
    returned as they are.  Everything else is handed to
    ``_raise_for_expected``.
    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        explicit_subquery: bool = False,
        allow_select: bool = True,
        **kw: Any,
    ) -> Any:
        """Coerce ``resolved`` into a FROM clause.

        :param element: the object originally passed in.
        :param resolved: the object being coerced.
        :param argname: optional argument name used for error reporting.
        :param explicit_subquery: when true, a SELECT base is converted
         with ``.subquery()`` and no warning is emitted.
        :param allow_select: when true (and ``explicit_subquery`` is
         false), a SELECT base is converted through
         ``_implicit_subquery`` after a deprecation warning.
        :return: the subquery for a SELECT base, or ``resolved`` itself
         for a text clause.
        """
        if resolved._is_select_base:
            if explicit_subquery:
                return resolved.subquery()
            if allow_select:
                util.warn_deprecated(
                    "Implicit coercion of SELECT and textual SELECT "
                    "constructs into FROM clauses is deprecated; please call "
                    ".subquery() on any Core select or ORM Query object in "
                    "order to produce a subquery object.",
                    version="1.4",
                )
                return resolved._implicit_subquery
            # a SELECT with neither option enabled must be rejected below
            # rather than silently producing None
        elif resolved._is_text_clause:
            return resolved

        self._raise_for_expected(element, argname, resolved)

    def _post_coercion(self, element, *, deannotate=False, **kw):
        """Return ``element``, or ``element._deannotate()`` when
        ``deannotate`` is true.
        """
        if deannotate:
            return element._deannotate()
        else:
            return element
1301
1302
class StrictFromClauseImpl(FromClauseImpl):
    """Stricter variant of :class:`.FromClauseImpl`.

    Implicit conversion of a SELECT into a subquery is off by default
    and happens only when the caller passes ``allow_select=True``; every
    other object is handed to ``_raise_for_expected``.
    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        *,
        allow_select: bool = False,
        **kw: Any,
    ) -> Any:
        """Return the implicit subquery of a SELECT base when permitted.

        :param element: the object originally passed in.
        :param resolved: the object being coerced.
        :param argname: optional argument name used for error reporting.
        :param allow_select: enables the deprecated implicit conversion.
        """
        if not resolved._is_select_base or not allow_select:
            self._raise_for_expected(element, argname, resolved)
        else:
            util.warn_deprecated(
                "Implicit coercion of SELECT and textual SELECT constructs "
                "into FROM clauses is deprecated; please call .subquery() "
                "on any Core select or ORM Query object in order to produce a "
                "subquery object.",
                version="1.4",
            )
            return resolved._implicit_subquery
1326
1327
class AnonymizedFromClauseImpl(StrictFromClauseImpl):
    """Variant of :class:`.StrictFromClauseImpl` whose post-coercion step
    replaces the element with the result of its
    ``_anonymous_fromclause()`` method.
    """

    __slots__ = ()

    def _post_coercion(self, element, *, flat=False, name=None, **kw):
        """Return ``element._anonymous_fromclause(flat=flat)``.

        :param element: the coerced FROM clause.
        :param flat: forwarded unchanged to ``_anonymous_fromclause()``.
        :param name: must be ``None``; enforced only by an ``assert``.
        """
        # an explicit name is not supported here; note this check is
        # stripped when Python runs with -O
        assert name is None

        return element._anonymous_fromclause(flat=flat)
1335
1336
class DMLTableImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
    """Coercion implementation for the table argument of a DML construct.

    After coercion, an element annotated with ``"dml_table"`` is replaced
    by the annotated value.
    """

    __slots__ = ()

    def _post_coercion(self, element, **kw):
        """Return the ``"dml_table"`` annotation of ``element`` when one
        is present, otherwise ``element`` itself.
        """
        if "dml_table" not in element._annotations:
            return element
        return element._annotations["dml_table"]
1345
1346
class DMLSelectImpl(_NoTextCoercion, RoleImpl):
    """Coercion implementation for the SELECT argument of a DML
    construct.

    A FROM clause is converted into a SELECT: an alias wrapping a SELECT
    base is unwrapped to that SELECT, and any other FROM clause has its
    ``.select()`` method called.
    """

    __slots__ = ()

    def _implicit_coercions(
        self,
        element: Any,
        resolved: Any,
        argname: Optional[str] = None,
        **kw: Any,
    ) -> Any:
        """Produce a SELECT from a FROM clause, or hand off to
        ``_raise_for_expected`` for anything else.
        """
        if not resolved._is_from_clause:
            self._raise_for_expected(element, argname, resolved)
        elif (
            isinstance(resolved, selectable.Alias)
            and resolved.element._is_select_base
        ):
            # the alias already wraps a SELECT; use it directly
            return resolved.element
        else:
            return resolved.select()
1367
1368
class CompoundElementImpl(_NoTextCoercion, RoleImpl):
    """Coercion implementation for compound-element arguments.

    It only customizes error reporting: when the rejected object is a
    FROM clause, a hint on how to obtain a SELECT from it is passed along
    as ``advice``.
    """

    __slots__ = ()

    def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
        """Delegate to the parent ``_raise_for_expected``, adding
        ``advice`` for FROM clause objects and ``advice=None`` otherwise.
        """
        advice = None
        if isinstance(element, roles.FromClauseRole):
            advice = (
                "Use the plain select() object without "
                "calling .subquery() or .alias()."
                if element._is_subquery
                else "To SELECT from any FROM clause, use the .select() method."
            )

        return super()._raise_for_expected(
            element, argname=argname, resolved=resolved, advice=advice, **kw
        )
1388
1389
# Registry mapping each role class from ``roles`` to the impl instance
# created for it; populated by the loop below.
_impl_lookup = {}


# Pair every ``roles.XyzRole`` with an ``XyzImpl`` class defined in this
# module, when one exists.  Roles with no matching impl name are skipped.
for name in dir(roles):
    cls = getattr(roles, name)
    if name.endswith("Role"):
        # str.replace swaps every "Role" occurrence in the name, not
        # only the trailing one
        name = name.replace("Role", "Impl")
        if name in globals():
            # each impl is instantiated once, receiving its role class
            impl = globals()[name](cls)
            _impl_lookup[cls] = impl

if not TYPE_CHECKING:
    # runtime only: register the subscripted forms of
    # ExpressionElementRole for these Python types under the same impl
    # instance as the unsubscripted role
    ee_impl = _impl_lookup[roles.ExpressionElementRole]

    for py_type in (int, bool, str, float):
        _impl_lookup[roles.ExpressionElementRole[py_type]] = ee_impl