1# sql/coercions.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8import numbers
9import re
10
11from . import operators
12from . import roles
13from . import visitors
14from .base import ExecutableOption
15from .base import Options
16from .traversals import HasCacheKey
17from .visitors import Visitable
18from .. import exc
19from .. import inspection
20from .. import util
21from ..util import collections_abc
22
23
24elements = None
25lambdas = None
26schema = None
27selectable = None
28sqltypes = None
29traversals = None
30
31
32def _is_literal(element):
33 """Return whether or not the element is a "literal" in the context
34 of a SQL expression construct.
35
36 """
37
38 return (
39 not isinstance(
40 element,
41 (Visitable, schema.SchemaEventTarget),
42 )
43 and not hasattr(element, "__clause_element__")
44 )
45
46
47def _deep_is_literal(element):
48 """Return whether or not the element is a "literal" in the context
49 of a SQL expression construct.
50
51 does a deeper more esoteric check than _is_literal. is used
52 for lambda elements that have to distinguish values that would
53 be bound vs. not without any context.
54
55 """
56
57 if isinstance(element, collections_abc.Sequence) and not isinstance(
58 element, str
59 ):
60 for elem in element:
61 if not _deep_is_literal(elem):
62 return False
63 else:
64 return True
65
66 return (
67 not isinstance(
68 element,
69 (
70 Visitable,
71 schema.SchemaEventTarget,
72 HasCacheKey,
73 Options,
74 util.langhelpers._symbol,
75 ),
76 )
77 and not hasattr(element, "__clause_element__")
78 and (
79 not isinstance(element, type)
80 or not issubclass(element, HasCacheKey)
81 )
82 )
83
84
85def _document_text_coercion(paramname, meth_rst, param_rst):
86 return util.add_parameter_text(
87 paramname,
88 (
89 ".. warning:: "
90 "The %s argument to %s can be passed as a Python string argument, "
91 "which will be treated "
92 "as **trusted SQL text** and rendered as given. **DO NOT PASS "
93 "UNTRUSTED INPUT TO THIS PARAMETER**."
94 )
95 % (param_rst, meth_rst),
96 )
97
98
99def _expression_collection_was_a_list(attrname, fnname, args):
100 if args and isinstance(args[0], (list, set, dict)) and len(args) == 1:
101 if isinstance(args[0], list):
102 util.warn_deprecated_20(
103 'The "%s" argument to %s(), when referring to a sequence '
104 "of items, is now passed as a series of positional "
105 "elements, rather than as a list. " % (attrname, fnname)
106 )
107 return args[0]
108 else:
109 return args
110
111
112def expect(
113 role,
114 element,
115 apply_propagate_attrs=None,
116 argname=None,
117 post_inspect=False,
118 **kw
119):
120 if (
121 role.allows_lambda
122 # note callable() will not invoke a __getattr__() method, whereas
123 # hasattr(obj, "__call__") will. by keeping the callable() check here
124 # we prevent most needless calls to hasattr() and therefore
125 # __getattr__(), which is present on ColumnElement.
126 and callable(element)
127 and hasattr(element, "__code__")
128 ):
129 return lambdas.LambdaElement(
130 element,
131 role,
132 lambdas.LambdaOptions(**kw),
133 apply_propagate_attrs=apply_propagate_attrs,
134 )
135
136 # major case is that we are given a ClauseElement already, skip more
137 # elaborate logic up front if possible
138 impl = _impl_lookup[role]
139
140 original_element = element
141
142 if not isinstance(
143 element,
144 (
145 elements.ClauseElement,
146 schema.SchemaItem,
147 schema.FetchedValue,
148 lambdas.PyWrapper,
149 ),
150 ):
151 resolved = None
152
153 if impl._resolve_literal_only:
154 resolved = impl._literal_coercion(element, **kw)
155 else:
156
157 original_element = element
158
159 is_clause_element = False
160
161 # this is a special performance optimization for ORM
162 # joins used by JoinTargetImpl that we don't go through the
163 # work of creating __clause_element__() when we only need the
164 # original QueryableAttribute, as the former will do clause
165 # adaption and all that which is just thrown away here.
166 if (
167 impl._skip_clauseelement_for_target_match
168 and isinstance(element, role)
169 and hasattr(element, "__clause_element__")
170 ):
171 is_clause_element = True
172 else:
173 while hasattr(element, "__clause_element__"):
174 is_clause_element = True
175
176 if not getattr(element, "is_clause_element", False):
177 element = element.__clause_element__()
178 else:
179 break
180
181 if not is_clause_element:
182 if impl._use_inspection:
183 insp = inspection.inspect(element, raiseerr=False)
184 if insp is not None:
185 if post_inspect:
186 insp._post_inspect
187 try:
188 resolved = insp.__clause_element__()
189 except AttributeError:
190 impl._raise_for_expected(original_element, argname)
191
192 if resolved is None:
193 resolved = impl._literal_coercion(
194 element, argname=argname, **kw
195 )
196 else:
197 resolved = element
198 elif isinstance(element, lambdas.PyWrapper):
199 resolved = element._sa__py_wrapper_literal(**kw)
200 else:
201 resolved = element
202 if (
203 apply_propagate_attrs is not None
204 and not apply_propagate_attrs._propagate_attrs
205 and resolved._propagate_attrs
206 ):
207 apply_propagate_attrs._propagate_attrs = resolved._propagate_attrs
208
209 if impl._role_class in resolved.__class__.__mro__:
210 if impl._post_coercion:
211 resolved = impl._post_coercion(
212 resolved,
213 argname=argname,
214 original_element=original_element,
215 **kw
216 )
217 return resolved
218 else:
219 return impl._implicit_coercions(
220 original_element, resolved, argname=argname, **kw
221 )
222
223
224def expect_as_key(role, element, **kw):
225 kw["as_key"] = True
226 return expect(role, element, **kw)
227
228
229def expect_col_expression_collection(role, expressions):
230 for expr in expressions:
231 strname = None
232 column = None
233
234 resolved = expect(role, expr)
235 if isinstance(resolved, util.string_types):
236 strname = resolved = expr
237 else:
238 cols = []
239 visitors.traverse(resolved, {}, {"column": cols.append})
240 if cols:
241 column = cols[0]
242 add_element = column if column is not None else strname
243 yield resolved, column, strname, add_element
244
245
246class RoleImpl(object):
247 __slots__ = ("_role_class", "name", "_use_inspection")
248
249 def _literal_coercion(self, element, **kw):
250 raise NotImplementedError()
251
252 _post_coercion = None
253 _resolve_literal_only = False
254 _skip_clauseelement_for_target_match = False
255
256 def __init__(self, role_class):
257 self._role_class = role_class
258 self.name = role_class._role_name
259 self._use_inspection = issubclass(role_class, roles.UsesInspection)
260
261 def _implicit_coercions(self, element, resolved, argname=None, **kw):
262 self._raise_for_expected(element, argname, resolved)
263
264 def _raise_for_expected(
265 self,
266 element,
267 argname=None,
268 resolved=None,
269 advice=None,
270 code=None,
271 err=None,
272 ):
273 if resolved is not None and resolved is not element:
274 got = "%r object resolved from %r object" % (resolved, element)
275 else:
276 got = repr(element)
277
278 if argname:
279 msg = "%s expected for argument %r; got %s." % (
280 self.name,
281 argname,
282 got,
283 )
284 else:
285 msg = "%s expected, got %s." % (self.name, got)
286
287 if advice:
288 msg += " " + advice
289
290 util.raise_(exc.ArgumentError(msg, code=code), replace_context=err)
291
292
293class _Deannotate(object):
294 __slots__ = ()
295
296 def _post_coercion(self, resolved, **kw):
297 from .util import _deep_deannotate
298
299 return _deep_deannotate(resolved)
300
301
302class _StringOnly(object):
303 __slots__ = ()
304
305 _resolve_literal_only = True
306
307
308class _ReturnsStringKey(object):
309 __slots__ = ()
310
311 def _implicit_coercions(
312 self, original_element, resolved, argname=None, **kw
313 ):
314 if isinstance(original_element, util.string_types):
315 return original_element
316 else:
317 self._raise_for_expected(original_element, argname, resolved)
318
319 def _literal_coercion(self, element, **kw):
320 return element
321
322
323class _ColumnCoercions(object):
324 __slots__ = ()
325
326 def _warn_for_scalar_subquery_coercion(self):
327 util.warn(
328 "implicitly coercing SELECT object to scalar subquery; "
329 "please use the .scalar_subquery() method to produce a scalar "
330 "subquery.",
331 )
332
333 def _implicit_coercions(
334 self, original_element, resolved, argname=None, **kw
335 ):
336 if not getattr(resolved, "is_clause_element", False):
337 self._raise_for_expected(original_element, argname, resolved)
338 elif resolved._is_select_statement:
339 self._warn_for_scalar_subquery_coercion()
340 return resolved.scalar_subquery()
341 elif resolved._is_from_clause and isinstance(
342 resolved, selectable.Subquery
343 ):
344 self._warn_for_scalar_subquery_coercion()
345 return resolved.element.scalar_subquery()
346 elif self._role_class.allows_lambda and resolved._is_lambda_element:
347 return resolved
348 else:
349 self._raise_for_expected(original_element, argname, resolved)
350
351
352def _no_text_coercion(
353 element, argname=None, exc_cls=exc.ArgumentError, extra=None, err=None
354):
355 util.raise_(
356 exc_cls(
357 "%(extra)sTextual SQL expression %(expr)r %(argname)sshould be "
358 "explicitly declared as text(%(expr)r)"
359 % {
360 "expr": util.ellipses_string(element),
361 "argname": "for argument %s" % (argname,) if argname else "",
362 "extra": "%s " % extra if extra else "",
363 }
364 ),
365 replace_context=err,
366 )
367
368
369class _NoTextCoercion(object):
370 __slots__ = ()
371
372 def _literal_coercion(self, element, argname=None, **kw):
373 if isinstance(element, util.string_types) and issubclass(
374 elements.TextClause, self._role_class
375 ):
376 _no_text_coercion(element, argname)
377 else:
378 self._raise_for_expected(element, argname)
379
380
381class _CoerceLiterals(object):
382 __slots__ = ()
383 _coerce_consts = False
384 _coerce_star = False
385 _coerce_numerics = False
386
387 def _text_coercion(self, element, argname=None):
388 return _no_text_coercion(element, argname)
389
390 def _literal_coercion(self, element, argname=None, **kw):
391 if isinstance(element, util.string_types):
392 if self._coerce_star and element == "*":
393 return elements.ColumnClause("*", is_literal=True)
394 else:
395 return self._text_coercion(element, argname, **kw)
396
397 if self._coerce_consts:
398 if element is None:
399 return elements.Null()
400 elif element is False:
401 return elements.False_()
402 elif element is True:
403 return elements.True_()
404
405 if self._coerce_numerics and isinstance(element, (numbers.Number)):
406 return elements.ColumnClause(str(element), is_literal=True)
407
408 self._raise_for_expected(element, argname)
409
410
411class LiteralValueImpl(RoleImpl):
412 _resolve_literal_only = True
413
414 def _implicit_coercions(
415 self, element, resolved, argname, type_=None, **kw
416 ):
417 if not _is_literal(resolved):
418 self._raise_for_expected(
419 element, resolved=resolved, argname=argname, **kw
420 )
421
422 return elements.BindParameter(None, element, type_=type_, unique=True)
423
424 def _literal_coercion(self, element, argname=None, type_=None, **kw):
425 return element
426
427
428class _SelectIsNotFrom(object):
429 __slots__ = ()
430
431 def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
432 if isinstance(element, roles.SelectStatementRole) or isinstance(
433 resolved, roles.SelectStatementRole
434 ):
435 advice = (
436 "To create a "
437 "FROM clause from a %s object, use the .subquery() method."
438 % (resolved.__class__ if resolved is not None else element,)
439 )
440 code = "89ve"
441 else:
442 advice = code = None
443
444 return super(_SelectIsNotFrom, self)._raise_for_expected(
445 element,
446 argname=argname,
447 resolved=resolved,
448 advice=advice,
449 code=code,
450 **kw
451 )
452
453
454class HasCacheKeyImpl(RoleImpl):
455 __slots__ = ()
456
457 def _implicit_coercions(
458 self, original_element, resolved, argname=None, **kw
459 ):
460 if isinstance(original_element, traversals.HasCacheKey):
461 return original_element
462 else:
463 self._raise_for_expected(original_element, argname, resolved)
464
465 def _literal_coercion(self, element, **kw):
466 return element
467
468
469class ExecutableOptionImpl(RoleImpl):
470 __slots__ = ()
471
472 def _implicit_coercions(
473 self, original_element, resolved, argname=None, **kw
474 ):
475 if isinstance(original_element, ExecutableOption):
476 return original_element
477 else:
478 self._raise_for_expected(original_element, argname, resolved)
479
480 def _literal_coercion(self, element, **kw):
481 return element
482
483
484class ExpressionElementImpl(_ColumnCoercions, RoleImpl):
485 __slots__ = ()
486
487 def _literal_coercion(
488 self, element, name=None, type_=None, argname=None, is_crud=False, **kw
489 ):
490 if (
491 element is None
492 and not is_crud
493 and (type_ is None or not type_.should_evaluate_none)
494 ):
495 # TODO: there's no test coverage now for the
496 # "should_evaluate_none" part of this, as outside of "crud" this
497 # codepath is not normally used except in some special cases
498 return elements.Null()
499 else:
500 try:
501 return elements.BindParameter(
502 name, element, type_, unique=True, _is_crud=is_crud
503 )
504 except exc.ArgumentError as err:
505 self._raise_for_expected(element, err=err)
506
507 def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
508 if isinstance(element, roles.AnonymizedFromClauseRole):
509 advice = (
510 "To create a "
511 "column expression from a FROM clause row "
512 "as a whole, use the .table_valued() method."
513 )
514 else:
515 advice = None
516
517 return super(ExpressionElementImpl, self)._raise_for_expected(
518 element, argname=argname, resolved=resolved, advice=advice, **kw
519 )
520
521
522class BinaryElementImpl(ExpressionElementImpl, RoleImpl):
523
524 __slots__ = ()
525
526 def _literal_coercion(
527 self, element, expr, operator, bindparam_type=None, argname=None, **kw
528 ):
529 try:
530 return expr._bind_param(operator, element, type_=bindparam_type)
531 except exc.ArgumentError as err:
532 self._raise_for_expected(element, err=err)
533
534 def _post_coercion(self, resolved, expr, bindparam_type=None, **kw):
535 if resolved.type._isnull and not expr.type._isnull:
536 resolved = resolved._with_binary_element_type(
537 bindparam_type if bindparam_type is not None else expr.type
538 )
539 return resolved
540
541
542class InElementImpl(RoleImpl):
543 __slots__ = ()
544
545 def _implicit_coercions(
546 self, original_element, resolved, argname=None, **kw
547 ):
548 if resolved._is_from_clause:
549 if (
550 isinstance(resolved, selectable.Alias)
551 and resolved.element._is_select_statement
552 ):
553 self._warn_for_implicit_coercion(resolved)
554 return self._post_coercion(resolved.element, **kw)
555 else:
556 self._warn_for_implicit_coercion(resolved)
557 return self._post_coercion(resolved.select(), **kw)
558 else:
559 self._raise_for_expected(original_element, argname, resolved)
560
561 def _warn_for_implicit_coercion(self, elem):
562 util.warn(
563 "Coercing %s object into a select() for use in IN(); "
564 "please pass a select() construct explicitly"
565 % (elem.__class__.__name__)
566 )
567
568 def _literal_coercion(self, element, expr, operator, **kw):
569 if isinstance(element, collections_abc.Iterable) and not isinstance(
570 element, util.string_types
571 ):
572 non_literal_expressions = {}
573 element = list(element)
574 for o in element:
575 if not _is_literal(o):
576 if not isinstance(o, operators.ColumnOperators):
577 self._raise_for_expected(element, **kw)
578 else:
579 non_literal_expressions[o] = o
580 elif o is None:
581 non_literal_expressions[o] = elements.Null()
582
583 if non_literal_expressions:
584 return elements.ClauseList(
585 *[
586 non_literal_expressions[o]
587 if o in non_literal_expressions
588 else expr._bind_param(operator, o)
589 for o in element
590 ]
591 )
592 else:
593 return expr._bind_param(operator, element, expanding=True)
594
595 else:
596 self._raise_for_expected(element, **kw)
597
598 def _post_coercion(self, element, expr, operator, **kw):
599 if element._is_select_statement:
600 # for IN, we are doing scalar_subquery() coercion without
601 # a warning
602 return element.scalar_subquery()
603 elif isinstance(element, elements.ClauseList):
604 assert not len(element.clauses) == 0
605 return element.self_group(against=operator)
606
607 elif isinstance(element, elements.BindParameter):
608 element = element._clone(maintain_key=True)
609 element.expanding = True
610 element.expand_op = operator
611
612 return element
613 else:
614 return element
615
616
617class OnClauseImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl):
618 __slots__ = ()
619
620 _coerce_consts = True
621
622 def _implicit_coercions(
623 self, original_element, resolved, argname=None, legacy=False, **kw
624 ):
625 if legacy and isinstance(resolved, str):
626 return resolved
627 else:
628 return super(OnClauseImpl, self)._implicit_coercions(
629 original_element,
630 resolved,
631 argname=argname,
632 legacy=legacy,
633 **kw
634 )
635
636 def _text_coercion(self, element, argname=None, legacy=False):
637 if legacy and isinstance(element, str):
638 util.warn_deprecated_20(
639 "Using strings to indicate relationship names in "
640 "Query.join() is deprecated and will be removed in "
641 "SQLAlchemy 2.0. Please use the class-bound attribute "
642 "directly."
643 )
644 return element
645
646 return super(OnClauseImpl, self)._text_coercion(element, argname)
647
648 def _post_coercion(self, resolved, original_element=None, **kw):
649 # this is a hack right now as we want to use coercion on an
650 # ORM InstrumentedAttribute, but we want to return the object
651 # itself if it is one, not its clause element.
652 # ORM context _join and _legacy_join() would need to be improved
653 # to look for annotations in a clause element form.
654 if isinstance(original_element, roles.JoinTargetRole):
655 return original_element
656 return resolved
657
658
659class WhereHavingImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl):
660 __slots__ = ()
661
662 _coerce_consts = True
663
664 def _text_coercion(self, element, argname=None):
665 return _no_text_coercion(element, argname)
666
667
668class StatementOptionImpl(_CoerceLiterals, RoleImpl):
669 __slots__ = ()
670
671 _coerce_consts = True
672
673 def _text_coercion(self, element, argname=None):
674 return elements.TextClause(element)
675
676
677class ColumnArgumentImpl(_NoTextCoercion, RoleImpl):
678 __slots__ = ()
679
680
681class ColumnArgumentOrKeyImpl(_ReturnsStringKey, RoleImpl):
682 __slots__ = ()
683
684
685class StrAsPlainColumnImpl(_CoerceLiterals, RoleImpl):
686 __slots__ = ()
687
688 def _text_coercion(self, element, argname=None):
689 return elements.ColumnClause(element)
690
691
692class ByOfImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl, roles.ByOfRole):
693
694 __slots__ = ()
695
696 _coerce_consts = True
697
698 def _text_coercion(self, element, argname=None):
699 return elements._textual_label_reference(element)
700
701
702class OrderByImpl(ByOfImpl, RoleImpl):
703 __slots__ = ()
704
705 def _post_coercion(self, resolved, **kw):
706 if (
707 isinstance(resolved, self._role_class)
708 and resolved._order_by_label_element is not None
709 ):
710 return elements._label_reference(resolved)
711 else:
712 return resolved
713
714
715class GroupByImpl(ByOfImpl, RoleImpl):
716 __slots__ = ()
717
718 def _implicit_coercions(
719 self, original_element, resolved, argname=None, **kw
720 ):
721 if isinstance(resolved, roles.StrictFromClauseRole):
722 return elements.ClauseList(*resolved.c)
723 else:
724 return resolved
725
726
727class DMLColumnImpl(_ReturnsStringKey, RoleImpl):
728 __slots__ = ()
729
730 def _post_coercion(self, element, as_key=False, **kw):
731 if as_key:
732 return element.key
733 else:
734 return element
735
736
737class ConstExprImpl(RoleImpl):
738 __slots__ = ()
739
740 def _literal_coercion(self, element, argname=None, **kw):
741 if element is None:
742 return elements.Null()
743 elif element is False:
744 return elements.False_()
745 elif element is True:
746 return elements.True_()
747 else:
748 self._raise_for_expected(element, argname)
749
750
751class TruncatedLabelImpl(_StringOnly, RoleImpl):
752 __slots__ = ()
753
754 def _implicit_coercions(
755 self, original_element, resolved, argname=None, **kw
756 ):
757 if isinstance(original_element, util.string_types):
758 return resolved
759 else:
760 self._raise_for_expected(original_element, argname, resolved)
761
762 def _literal_coercion(self, element, argname=None, **kw):
763 """coerce the given value to :class:`._truncated_label`.
764
765 Existing :class:`._truncated_label` and
766 :class:`._anonymous_label` objects are passed
767 unchanged.
768 """
769
770 if isinstance(element, elements._truncated_label):
771 return element
772 else:
773 return elements._truncated_label(element)
774
775
776class DDLExpressionImpl(_Deannotate, _CoerceLiterals, RoleImpl):
777
778 __slots__ = ()
779
780 _coerce_consts = True
781
782 def _text_coercion(self, element, argname=None):
783 # see #5754 for why we can't easily deprecate this coercion.
784 # essentially expressions like postgresql_where would have to be
785 # text() as they come back from reflection and we don't want to
786 # have text() elements wired into the inspection dictionaries.
787 return elements.TextClause(element)
788
789
790class DDLConstraintColumnImpl(_Deannotate, _ReturnsStringKey, RoleImpl):
791 __slots__ = ()
792
793
794class DDLReferredColumnImpl(DDLConstraintColumnImpl):
795 __slots__ = ()
796
797
798class LimitOffsetImpl(RoleImpl):
799 __slots__ = ()
800
801 def _implicit_coercions(self, element, resolved, argname=None, **kw):
802 if resolved is None:
803 return None
804 else:
805 self._raise_for_expected(element, argname, resolved)
806
807 def _literal_coercion(self, element, name, type_, **kw):
808 if element is None:
809 return None
810 else:
811 value = util.asint(element)
812 return selectable._OffsetLimitParam(
813 name, value, type_=type_, unique=True
814 )
815
816
817class LabeledColumnExprImpl(ExpressionElementImpl):
818 __slots__ = ()
819
820 def _implicit_coercions(
821 self, original_element, resolved, argname=None, **kw
822 ):
823 if isinstance(resolved, roles.ExpressionElementRole):
824 return resolved.label(None)
825 else:
826 new = super(LabeledColumnExprImpl, self)._implicit_coercions(
827 original_element, resolved, argname=argname, **kw
828 )
829 if isinstance(new, roles.ExpressionElementRole):
830 return new.label(None)
831 else:
832 self._raise_for_expected(original_element, argname, resolved)
833
834
835class ColumnsClauseImpl(_SelectIsNotFrom, _CoerceLiterals, RoleImpl):
836 __slots__ = ()
837
838 _coerce_consts = True
839 _coerce_numerics = True
840 _coerce_star = True
841
842 _guess_straight_column = re.compile(r"^\w\S*$", re.I)
843
844 def _text_coercion(self, element, argname=None):
845 element = str(element)
846
847 guess_is_literal = not self._guess_straight_column.match(element)
848 raise exc.ArgumentError(
849 "Textual column expression %(column)r %(argname)sshould be "
850 "explicitly declared with text(%(column)r), "
851 "or use %(literal_column)s(%(column)r) "
852 "for more specificity"
853 % {
854 "column": util.ellipses_string(element),
855 "argname": "for argument %s" % (argname,) if argname else "",
856 "literal_column": "literal_column"
857 if guess_is_literal
858 else "column",
859 }
860 )
861
862
863class ReturnsRowsImpl(RoleImpl):
864 __slots__ = ()
865
866
867class StatementImpl(_CoerceLiterals, RoleImpl):
868 __slots__ = ()
869
870 def _post_coercion(self, resolved, original_element, argname=None, **kw):
871 if resolved is not original_element and not isinstance(
872 original_element, util.string_types
873 ):
874 # use same method as Connection uses; this will later raise
875 # ObjectNotExecutableError
876 try:
877 original_element._execute_on_connection
878 except AttributeError:
879 util.warn_deprecated(
880 "Object %r should not be used directly in a SQL statement "
881 "context, such as passing to methods such as "
882 "session.execute(). This usage will be disallowed in a "
883 "future release. "
884 "Please use Core select() / update() / delete() etc. "
885 "with Session.execute() and other statement execution "
886 "methods." % original_element,
887 "1.4",
888 )
889
890 return resolved
891
892 def _implicit_coercions(
893 self, original_element, resolved, argname=None, **kw
894 ):
895 if resolved._is_lambda_element:
896 return resolved
897 else:
898 return super(StatementImpl, self)._implicit_coercions(
899 original_element, resolved, argname=argname, **kw
900 )
901
902 def _text_coercion(self, element, argname=None):
903 util.warn_deprecated_20(
904 "Using plain strings to indicate SQL statements without using "
905 "the text() construct is "
906 "deprecated and will be removed in version 2.0. Ensure plain "
907 "SQL statements are passed using the text() construct."
908 )
909 return elements.TextClause(element)
910
911
912class SelectStatementImpl(_NoTextCoercion, RoleImpl):
913 __slots__ = ()
914
915 def _implicit_coercions(
916 self, original_element, resolved, argname=None, **kw
917 ):
918 if resolved._is_text_clause:
919 return resolved.columns()
920 else:
921 self._raise_for_expected(original_element, argname, resolved)
922
923
924class HasCTEImpl(ReturnsRowsImpl):
925 __slots__ = ()
926
927
928class IsCTEImpl(RoleImpl):
929 __slots__ = ()
930
931
932class JoinTargetImpl(RoleImpl):
933 __slots__ = ()
934
935 _skip_clauseelement_for_target_match = True
936
937 def _literal_coercion(self, element, legacy=False, **kw):
938 if isinstance(element, str):
939 return element
940
941 def _implicit_coercions(
942 self, original_element, resolved, argname=None, legacy=False, **kw
943 ):
944 if isinstance(original_element, roles.JoinTargetRole):
945 # note that this codepath no longer occurs as of
946 # #6550, unless JoinTargetImpl._skip_clauseelement_for_target_match
947 # were set to False.
948 return original_element
949 elif legacy and isinstance(resolved, str):
950 util.warn_deprecated_20(
951 "Using strings to indicate relationship names in "
952 "Query.join() is deprecated and will be removed in "
953 "SQLAlchemy 2.0. Please use the class-bound attribute "
954 "directly."
955 )
956 return resolved
957 elif legacy and isinstance(resolved, roles.WhereHavingRole):
958 return resolved
959 elif legacy and resolved._is_select_statement:
960 util.warn_deprecated(
961 "Implicit coercion of SELECT and textual SELECT "
962 "constructs into FROM clauses is deprecated; please call "
963 ".subquery() on any Core select or ORM Query object in "
964 "order to produce a subquery object.",
965 version="1.4",
966 )
967 # TODO: doing _implicit_subquery here causes tests to fail,
968 # how was this working before? probably that ORM
969 # join logic treated it as a select and subquery would happen
970 # in _ORMJoin->Join
971 return resolved
972 else:
973 self._raise_for_expected(original_element, argname, resolved)
974
975
976class FromClauseImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
977 __slots__ = ()
978
979 def _implicit_coercions(
980 self,
981 original_element,
982 resolved,
983 argname=None,
984 explicit_subquery=False,
985 allow_select=True,
986 **kw
987 ):
988 if resolved._is_select_statement:
989 if explicit_subquery:
990 return resolved.subquery()
991 elif allow_select:
992 util.warn_deprecated(
993 "Implicit coercion of SELECT and textual SELECT "
994 "constructs into FROM clauses is deprecated; please call "
995 ".subquery() on any Core select or ORM Query object in "
996 "order to produce a subquery object.",
997 version="1.4",
998 )
999 return resolved._implicit_subquery
1000 elif resolved._is_text_clause:
1001 return resolved
1002 else:
1003 self._raise_for_expected(original_element, argname, resolved)
1004
1005 def _post_coercion(self, element, deannotate=False, **kw):
1006 if deannotate:
1007 return element._deannotate()
1008 else:
1009 return element
1010
1011
1012class StrictFromClauseImpl(FromClauseImpl):
1013 __slots__ = ()
1014
1015 def _implicit_coercions(
1016 self,
1017 original_element,
1018 resolved,
1019 argname=None,
1020 allow_select=False,
1021 **kw
1022 ):
1023 if resolved._is_select_statement and allow_select:
1024 util.warn_deprecated(
1025 "Implicit coercion of SELECT and textual SELECT constructs "
1026 "into FROM clauses is deprecated; please call .subquery() "
1027 "on any Core select or ORM Query object in order to produce a "
1028 "subquery object.",
1029 version="1.4",
1030 )
1031 return resolved._implicit_subquery
1032 else:
1033 self._raise_for_expected(original_element, argname, resolved)
1034
1035
1036class AnonymizedFromClauseImpl(StrictFromClauseImpl):
1037 __slots__ = ()
1038
1039 def _post_coercion(self, element, flat=False, name=None, **kw):
1040 assert name is None
1041
1042 return element._anonymous_fromclause(flat=flat)
1043
1044
1045class DMLTableImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
1046 __slots__ = ()
1047
1048 def _post_coercion(self, element, **kw):
1049 if "dml_table" in element._annotations:
1050 return element._annotations["dml_table"]
1051 else:
1052 return element
1053
1054
1055class DMLSelectImpl(_NoTextCoercion, RoleImpl):
1056 __slots__ = ()
1057
1058 def _implicit_coercions(
1059 self, original_element, resolved, argname=None, **kw
1060 ):
1061 if resolved._is_from_clause:
1062 if (
1063 isinstance(resolved, selectable.Alias)
1064 and resolved.element._is_select_statement
1065 ):
1066 return resolved.element
1067 else:
1068 return resolved.select()
1069 else:
1070 self._raise_for_expected(original_element, argname, resolved)
1071
1072
1073class CompoundElementImpl(_NoTextCoercion, RoleImpl):
1074 __slots__ = ()
1075
1076 def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
1077 if isinstance(element, roles.FromClauseRole):
1078 if element._is_subquery:
1079 advice = (
1080 "Use the plain select() object without "
1081 "calling .subquery() or .alias()."
1082 )
1083 else:
1084 advice = (
1085 "To SELECT from any FROM clause, use the .select() method."
1086 )
1087 else:
1088 advice = None
1089 return super(CompoundElementImpl, self)._raise_for_expected(
1090 element, argname=argname, resolved=resolved, advice=advice, **kw
1091 )
1092
1093
1094_impl_lookup = {}
1095
1096
1097for name in dir(roles):
1098 cls = getattr(roles, name)
1099 if name.endswith("Role"):
1100 name = name.replace("Role", "Impl")
1101 if name in globals():
1102 impl = globals()[name](cls)
1103 _impl_lookup[cls] = impl