1# sql/compiler.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Base SQL and DDL compiler implementations.
10
11Classes provided include:
12
13:class:`.compiler.SQLCompiler` - renders SQL
14strings
15
16:class:`.compiler.DDLCompiler` - renders DDL
17(data definition language) strings
18
19:class:`.compiler.GenericTypeCompiler` - renders
20type specification strings.
21
22To generate user-defined SQL strings, see
23:doc:`/ext/compiler`.
24
25"""
26from __future__ import annotations
27
28import collections
29import collections.abc as collections_abc
30import contextlib
31from enum import IntEnum
32import functools
33import itertools
34import operator
35import re
36from time import perf_counter
37import typing
38from typing import Any
39from typing import Callable
40from typing import cast
41from typing import ClassVar
42from typing import Dict
43from typing import FrozenSet
44from typing import Iterable
45from typing import Iterator
46from typing import List
47from typing import Mapping
48from typing import MutableMapping
49from typing import NamedTuple
50from typing import NoReturn
51from typing import Optional
52from typing import Pattern
53from typing import Sequence
54from typing import Set
55from typing import Tuple
56from typing import Type
57from typing import TYPE_CHECKING
58from typing import Union
59
60from . import base
61from . import coercions
62from . import crud
63from . import elements
64from . import functions
65from . import operators
66from . import roles
67from . import schema
68from . import selectable
69from . import sqltypes
70from . import util as sql_util
71from ._typing import is_column_element
72from ._typing import is_dml
73from .base import _de_clone
74from .base import _from_objects
75from .base import _NONE_NAME
76from .base import _SentinelDefaultCharacterization
77from .base import NO_ARG
78from .elements import quoted_name
79from .sqltypes import TupleType
80from .visitors import prefix_anon_map
81from .. import exc
82from .. import util
83from ..util import FastIntFlag
84from ..util.typing import Literal
85from ..util.typing import Protocol
86from ..util.typing import Self
87from ..util.typing import TypedDict
88
89if typing.TYPE_CHECKING:
90 from .annotation import _AnnotationDict
91 from .base import _AmbiguousTableNameMap
92 from .base import CompileState
93 from .base import Executable
94 from .cache_key import CacheKey
95 from .ddl import ExecutableDDLElement
96 from .dml import Insert
97 from .dml import Update
98 from .dml import UpdateBase
99 from .dml import UpdateDMLState
100 from .dml import ValuesBase
101 from .elements import _truncated_label
102 from .elements import BinaryExpression
103 from .elements import BindParameter
104 from .elements import ClauseElement
105 from .elements import ColumnClause
106 from .elements import ColumnElement
107 from .elements import False_
108 from .elements import Label
109 from .elements import Null
110 from .elements import True_
111 from .functions import Function
112 from .schema import CheckConstraint
113 from .schema import Column
114 from .schema import Constraint
115 from .schema import ForeignKeyConstraint
116 from .schema import IdentityOptions
117 from .schema import Index
118 from .schema import PrimaryKeyConstraint
119 from .schema import Table
120 from .schema import UniqueConstraint
121 from .selectable import _ColumnsClauseElement
122 from .selectable import AliasedReturnsRows
123 from .selectable import CompoundSelectState
124 from .selectable import CTE
125 from .selectable import FromClause
126 from .selectable import NamedFromClause
127 from .selectable import ReturnsRows
128 from .selectable import Select
129 from .selectable import SelectState
130 from .type_api import _BindProcessorType
131 from .type_api import TypeDecorator
132 from .type_api import TypeEngine
133 from .type_api import UserDefinedType
134 from .visitors import Visitable
135 from ..engine.cursor import CursorResultMetaData
136 from ..engine.interfaces import _CoreSingleExecuteParams
137 from ..engine.interfaces import _DBAPIAnyExecuteParams
138 from ..engine.interfaces import _DBAPIMultiExecuteParams
139 from ..engine.interfaces import _DBAPISingleExecuteParams
140 from ..engine.interfaces import _ExecuteOptions
141 from ..engine.interfaces import _GenericSetInputSizesType
142 from ..engine.interfaces import _MutableCoreSingleExecuteParams
143 from ..engine.interfaces import Dialect
144 from ..engine.interfaces import SchemaTranslateMapType
145
146
147_FromHintsType = Dict["FromClause", str]
148
149RESERVED_WORDS = {
150 "all",
151 "analyse",
152 "analyze",
153 "and",
154 "any",
155 "array",
156 "as",
157 "asc",
158 "asymmetric",
159 "authorization",
160 "between",
161 "binary",
162 "both",
163 "case",
164 "cast",
165 "check",
166 "collate",
167 "column",
168 "constraint",
169 "create",
170 "cross",
171 "current_date",
172 "current_role",
173 "current_time",
174 "current_timestamp",
175 "current_user",
176 "default",
177 "deferrable",
178 "desc",
179 "distinct",
180 "do",
181 "else",
182 "end",
183 "except",
184 "false",
185 "for",
186 "foreign",
187 "freeze",
188 "from",
189 "full",
190 "grant",
191 "group",
192 "having",
193 "ilike",
194 "in",
195 "initially",
196 "inner",
197 "intersect",
198 "into",
199 "is",
200 "isnull",
201 "join",
202 "leading",
203 "left",
204 "like",
205 "limit",
206 "localtime",
207 "localtimestamp",
208 "natural",
209 "new",
210 "not",
211 "notnull",
212 "null",
213 "off",
214 "offset",
215 "old",
216 "on",
217 "only",
218 "or",
219 "order",
220 "outer",
221 "overlaps",
222 "placing",
223 "primary",
224 "references",
225 "right",
226 "select",
227 "session_user",
228 "set",
229 "similar",
230 "some",
231 "symmetric",
232 "table",
233 "then",
234 "to",
235 "trailing",
236 "true",
237 "union",
238 "unique",
239 "user",
240 "using",
241 "verbose",
242 "when",
243 "where",
244}
245
246LEGAL_CHARACTERS = re.compile(r"^[A-Z0-9_$]+$", re.I)
247LEGAL_CHARACTERS_PLUS_SPACE = re.compile(r"^[A-Z0-9_ $]+$", re.I)
248ILLEGAL_INITIAL_CHARACTERS = {str(x) for x in range(0, 10)}.union(["$"])
249
250FK_ON_DELETE = re.compile(
251 r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
252)
253FK_ON_UPDATE = re.compile(
254 r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
255)
256FK_INITIALLY = re.compile(r"^(?:DEFERRED|IMMEDIATE)$", re.I)
257BIND_PARAMS = re.compile(r"(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])", re.UNICODE)
258BIND_PARAMS_ESC = re.compile(r"\x5c(:[\w\$]*)(?![:\w\$])", re.UNICODE)
259
260_pyformat_template = "%%(%(name)s)s"
261BIND_TEMPLATES = {
262 "pyformat": _pyformat_template,
263 "qmark": "?",
264 "format": "%%s",
265 "numeric": ":[_POSITION]",
266 "numeric_dollar": "$[_POSITION]",
267 "named": ":%(name)s",
268}
269
270
271OPERATORS = {
272 # binary
273 operators.and_: " AND ",
274 operators.or_: " OR ",
275 operators.add: " + ",
276 operators.mul: " * ",
277 operators.sub: " - ",
278 operators.mod: " % ",
279 operators.neg: "-",
280 operators.lt: " < ",
281 operators.le: " <= ",
282 operators.ne: " != ",
283 operators.gt: " > ",
284 operators.ge: " >= ",
285 operators.eq: " = ",
286 operators.is_distinct_from: " IS DISTINCT FROM ",
287 operators.is_not_distinct_from: " IS NOT DISTINCT FROM ",
288 operators.concat_op: " || ",
289 operators.match_op: " MATCH ",
290 operators.not_match_op: " NOT MATCH ",
291 operators.in_op: " IN ",
292 operators.not_in_op: " NOT IN ",
293 operators.comma_op: ", ",
294 operators.from_: " FROM ",
295 operators.as_: " AS ",
296 operators.is_: " IS ",
297 operators.is_not: " IS NOT ",
298 operators.collate: " COLLATE ",
299 # unary
300 operators.exists: "EXISTS ",
301 operators.distinct_op: "DISTINCT ",
302 operators.inv: "NOT ",
303 operators.any_op: "ANY ",
304 operators.all_op: "ALL ",
305 # modifiers
306 operators.desc_op: " DESC",
307 operators.asc_op: " ASC",
308 operators.nulls_first_op: " NULLS FIRST",
309 operators.nulls_last_op: " NULLS LAST",
310 # bitwise
311 operators.bitwise_xor_op: " ^ ",
312 operators.bitwise_or_op: " | ",
313 operators.bitwise_and_op: " & ",
314 operators.bitwise_not_op: "~",
315 operators.bitwise_lshift_op: " << ",
316 operators.bitwise_rshift_op: " >> ",
317}
318
319FUNCTIONS: Dict[Type[Function[Any]], str] = {
320 functions.coalesce: "coalesce",
321 functions.current_date: "CURRENT_DATE",
322 functions.current_time: "CURRENT_TIME",
323 functions.current_timestamp: "CURRENT_TIMESTAMP",
324 functions.current_user: "CURRENT_USER",
325 functions.localtime: "LOCALTIME",
326 functions.localtimestamp: "LOCALTIMESTAMP",
327 functions.random: "random",
328 functions.sysdate: "sysdate",
329 functions.session_user: "SESSION_USER",
330 functions.user: "USER",
331 functions.cube: "CUBE",
332 functions.rollup: "ROLLUP",
333 functions.grouping_sets: "GROUPING SETS",
334}
335
336
337EXTRACT_MAP = {
338 "month": "month",
339 "day": "day",
340 "year": "year",
341 "second": "second",
342 "hour": "hour",
343 "doy": "doy",
344 "minute": "minute",
345 "quarter": "quarter",
346 "dow": "dow",
347 "week": "week",
348 "epoch": "epoch",
349 "milliseconds": "milliseconds",
350 "microseconds": "microseconds",
351 "timezone_hour": "timezone_hour",
352 "timezone_minute": "timezone_minute",
353}
354
355COMPOUND_KEYWORDS = {
356 selectable._CompoundSelectKeyword.UNION: "UNION",
357 selectable._CompoundSelectKeyword.UNION_ALL: "UNION ALL",
358 selectable._CompoundSelectKeyword.EXCEPT: "EXCEPT",
359 selectable._CompoundSelectKeyword.EXCEPT_ALL: "EXCEPT ALL",
360 selectable._CompoundSelectKeyword.INTERSECT: "INTERSECT",
361 selectable._CompoundSelectKeyword.INTERSECT_ALL: "INTERSECT ALL",
362}
363
364
365class ResultColumnsEntry(NamedTuple):
366 """Tracks a column expression that is expected to be represented
367 in the result rows for this statement.
368
369 This normally refers to the columns clause of a SELECT statement
370 but may also refer to a RETURNING clause, as well as for dialect-specific
371 emulations.
372
373 """
374
375 keyname: str
376 """string name that's expected in cursor.description"""
377
378 name: str
379 """column name, may be labeled"""
380
381 objects: Tuple[Any, ...]
382 """sequence of objects that should be able to locate this column
383 in a RowMapping. This is typically string names and aliases
384 as well as Column objects.
385
386 """
387
388 type: TypeEngine[Any]
389 """Datatype to be associated with this column. This is where
390 the "result processing" logic directly links the compiled statement
391 to the rows that come back from the cursor.
392
393 """
394
395
396class _ResultMapAppender(Protocol):
397 def __call__(
398 self,
399 keyname: str,
400 name: str,
401 objects: Sequence[Any],
402 type_: TypeEngine[Any],
403 ) -> None: ...
404
405
406# integer indexes into ResultColumnsEntry used by cursor.py.
407# some profiling showed integer access faster than named tuple
408RM_RENDERED_NAME: Literal[0] = 0
409RM_NAME: Literal[1] = 1
410RM_OBJECTS: Literal[2] = 2
411RM_TYPE: Literal[3] = 3
412
413
414class _BaseCompilerStackEntry(TypedDict):
415 asfrom_froms: Set[FromClause]
416 correlate_froms: Set[FromClause]
417 selectable: ReturnsRows
418
419
420class _CompilerStackEntry(_BaseCompilerStackEntry, total=False):
421 compile_state: CompileState
422 need_result_map_for_nested: bool
423 need_result_map_for_compound: bool
424 select_0: ReturnsRows
425 insert_from_select: Select[Any]
426
427
428class ExpandedState(NamedTuple):
429 """represents state to use when producing "expanded" and
430 "post compile" bound parameters for a statement.
431
432 "expanded" parameters are parameters that are generated at
433 statement execution time to suit a number of parameters passed, the most
434 prominent example being the individual elements inside of an IN expression.
435
436 "post compile" parameters are parameters where the SQL literal value
437 will be rendered into the SQL statement at execution time, rather than
438 being passed as separate parameters to the driver.
439
440 To create an :class:`.ExpandedState` instance, use the
441 :meth:`.SQLCompiler.construct_expanded_state` method on any
442 :class:`.SQLCompiler` instance.
443
444 """
445
446 statement: str
447 """String SQL statement with parameters fully expanded"""
448
449 parameters: _CoreSingleExecuteParams
450 """Parameter dictionary with parameters fully expanded.
451
452 For a statement that uses named parameters, this dictionary will map
453 exactly to the names in the statement. For a statement that uses
454 positional parameters, the :attr:`.ExpandedState.positional_parameters`
455 will yield a tuple with the positional parameter set.
456
457 """
458
459 processors: Mapping[str, _BindProcessorType[Any]]
460 """mapping of bound value processors"""
461
462 positiontup: Optional[Sequence[str]]
463 """Sequence of string names indicating the order of positional
464 parameters"""
465
466 parameter_expansion: Mapping[str, List[str]]
467 """Mapping representing the intermediary link from original parameter
468 name to list of "expanded" parameter names, for those parameters that
469 were expanded."""
470
471 @property
472 def positional_parameters(self) -> Tuple[Any, ...]:
473 """Tuple of positional parameters, for statements that were compiled
474 using a positional paramstyle.
475
476 """
477 if self.positiontup is None:
478 raise exc.InvalidRequestError(
479 "statement does not use a positional paramstyle"
480 )
481 return tuple(self.parameters[key] for key in self.positiontup)
482
483 @property
484 def additional_parameters(self) -> _CoreSingleExecuteParams:
485 """synonym for :attr:`.ExpandedState.parameters`."""
486 return self.parameters
487
488
489class _InsertManyValues(NamedTuple):
490 """represents state to use for executing an "insertmanyvalues" statement.
491
492 The primary consumers of this object are the
493 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
494 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods.
495
496 .. versionadded:: 2.0
497
498 """
499
500 is_default_expr: bool
501 """if True, the statement is of the form
502 ``INSERT INTO TABLE DEFAULT VALUES``, and can't be rewritten as a "batch"
503
504 """
505
506 single_values_expr: str
507 """The rendered "values" clause of the INSERT statement.
508
509 This is typically the parenthesized section e.g. "(?, ?, ?)" or similar.
510 The insertmanyvalues logic uses this string as a search and replace
511 target.
512
513 """
514
515 insert_crud_params: List[crud._CrudParamElementStr]
516 """List of Column / bind names etc. used while rewriting the statement"""
517
518 num_positional_params_counted: int
519 """the number of bound parameters in a single-row statement.
520
521 This count may be larger or smaller than the actual number of columns
522 targeted in the INSERT, as it accommodates for SQL expressions
523 in the values list that may have zero or more parameters embedded
524 within them.
525
526 This count is part of what's used to organize rewritten parameter lists
527 when batching.
528
529 """
530
531 sort_by_parameter_order: bool = False
532 """if the deterministic_returnined_order parameter were used on the
533 insert.
534
535 All of the attributes following this will only be used if this is True.
536
537 """
538
539 includes_upsert_behaviors: bool = False
540 """if True, we have to accommodate for upsert behaviors.
541
542 This will in some cases downgrade "insertmanyvalues" that requests
543 deterministic ordering.
544
545 """
546
547 sentinel_columns: Optional[Sequence[Column[Any]]] = None
548 """List of sentinel columns that were located.
549
550 This list is only here if the INSERT asked for
551 sort_by_parameter_order=True,
552 and dialect-appropriate sentinel columns were located.
553
554 .. versionadded:: 2.0.10
555
556 """
557
558 num_sentinel_columns: int = 0
559 """how many sentinel columns are in the above list, if any.
560
561 This is the same as
562 ``len(sentinel_columns) if sentinel_columns is not None else 0``
563
564 """
565
566 sentinel_param_keys: Optional[Sequence[str]] = None
567 """parameter str keys in each param dictionary / tuple
568 that would link to the client side "sentinel" values for that row, which
569 we can use to match up parameter sets to result rows.
570
571 This is only present if sentinel_columns is present and the INSERT
572 statement actually refers to client side values for these sentinel
573 columns.
574
575 .. versionadded:: 2.0.10
576
577 .. versionchanged:: 2.0.29 - the sequence is now string dictionary keys
578 only, used against the "compiled parameteters" collection before
579 the parameters were converted by bound parameter processors
580
581 """
582
583 implicit_sentinel: bool = False
584 """if True, we have exactly one sentinel column and it uses a server side
585 value, currently has to generate an incrementing integer value.
586
587 The dialect in question would have asserted that it supports receiving
588 these values back and sorting on that value as a means of guaranteeing
589 correlation with the incoming parameter list.
590
591 .. versionadded:: 2.0.10
592
593 """
594
595 has_upsert_bound_parameters: bool = False
596 """if True, the upsert SET clause contains bound parameters that will
597 receive their values from the parameters dict (i.e., parametrized
598 bindparams where value is None and callable is None).
599
600 This means we can't batch multiple rows in a single statement, since
601 each row would need different values in the SET clause but there's only
602 one SET clause per statement. See issue #13130.
603
604 .. versionadded:: 2.0.37
605
606 """
607
608 embed_values_counter: bool = False
609 """Whether to embed an incrementing integer counter in each parameter
610 set within the VALUES clause as parameters are batched over.
611
612 This is only used for a specific INSERT..SELECT..VALUES..RETURNING syntax
613 where a subquery is used to produce value tuples. Current support
614 includes PostgreSQL, Microsoft SQL Server.
615
616 .. versionadded:: 2.0.10
617
618 """
619
620
621class _InsertManyValuesBatch(NamedTuple):
622 """represents an individual batch SQL statement for insertmanyvalues.
623
624 This is passed through the
625 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
626 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods out
627 to the :class:`.Connection` within the
628 :meth:`.Connection._exec_insertmany_context` method.
629
630 .. versionadded:: 2.0.10
631
632 """
633
634 replaced_statement: str
635 replaced_parameters: _DBAPIAnyExecuteParams
636 processed_setinputsizes: Optional[_GenericSetInputSizesType]
637 batch: Sequence[_DBAPISingleExecuteParams]
638 sentinel_values: Sequence[Tuple[Any, ...]]
639 current_batch_size: int
640 batchnum: int
641 total_batches: int
642 rows_sorted: bool
643 is_downgraded: bool
644
645
646class InsertmanyvaluesSentinelOpts(FastIntFlag):
647 """bitflag enum indicating styles of PK defaults
648 which can work as implicit sentinel columns
649
650 """
651
652 NOT_SUPPORTED = 1
653 AUTOINCREMENT = 2
654 IDENTITY = 4
655 SEQUENCE = 8
656
657 ANY_AUTOINCREMENT = AUTOINCREMENT | IDENTITY | SEQUENCE
658 _SUPPORTED_OR_NOT = NOT_SUPPORTED | ANY_AUTOINCREMENT
659
660 USE_INSERT_FROM_SELECT = 16
661 RENDER_SELECT_COL_CASTS = 64
662
663
664class CompilerState(IntEnum):
665 COMPILING = 0
666 """statement is present, compilation phase in progress"""
667
668 STRING_APPLIED = 1
669 """statement is present, string form of the statement has been applied.
670
671 Additional processors by subclasses may still be pending.
672
673 """
674
675 NO_STATEMENT = 2
676 """compiler does not have a statement to compile, is used
677 for method access"""
678
679
680class Linting(IntEnum):
681 """represent preferences for the 'SQL linting' feature.
682
683 this feature currently includes support for flagging cartesian products
684 in SQL statements.
685
686 """
687
688 NO_LINTING = 0
689 "Disable all linting."
690
691 COLLECT_CARTESIAN_PRODUCTS = 1
692 """Collect data on FROMs and cartesian products and gather into
693 'self.from_linter'"""
694
695 WARN_LINTING = 2
696 "Emit warnings for linters that find problems"
697
698 FROM_LINTING = COLLECT_CARTESIAN_PRODUCTS | WARN_LINTING
699 """Warn for cartesian products; combines COLLECT_CARTESIAN_PRODUCTS
700 and WARN_LINTING"""
701
702
703NO_LINTING, COLLECT_CARTESIAN_PRODUCTS, WARN_LINTING, FROM_LINTING = tuple(
704 Linting
705)
706
707
708class FromLinter(collections.namedtuple("FromLinter", ["froms", "edges"])):
709 """represents current state for the "cartesian product" detection
710 feature."""
711
712 def lint(self, start=None):
713 froms = self.froms
714 if not froms:
715 return None, None
716
717 edges = set(self.edges)
718 the_rest = set(froms)
719
720 if start is not None:
721 start_with = start
722 the_rest.remove(start_with)
723 else:
724 start_with = the_rest.pop()
725
726 stack = collections.deque([start_with])
727
728 while stack and the_rest:
729 node = stack.popleft()
730 the_rest.discard(node)
731
732 # comparison of nodes in edges here is based on hash equality, as
733 # there are "annotated" elements that match the non-annotated ones.
734 # to remove the need for in-python hash() calls, use native
735 # containment routines (e.g. "node in edge", "edge.index(node)")
736 to_remove = {edge for edge in edges if node in edge}
737
738 # appendleft the node in each edge that is not
739 # the one that matched.
740 stack.extendleft(edge[not edge.index(node)] for edge in to_remove)
741 edges.difference_update(to_remove)
742
743 # FROMS left over? boom
744 if the_rest:
745 return the_rest, start_with
746 else:
747 return None, None
748
749 def warn(self, stmt_type="SELECT"):
750 the_rest, start_with = self.lint()
751
752 # FROMS left over? boom
753 if the_rest:
754 froms = the_rest
755 if froms:
756 template = (
757 "{stmt_type} statement has a cartesian product between "
758 "FROM element(s) {froms} and "
759 'FROM element "{start}". Apply join condition(s) '
760 "between each element to resolve."
761 )
762 froms_str = ", ".join(
763 f'"{self.froms[from_]}"' for from_ in froms
764 )
765 message = template.format(
766 stmt_type=stmt_type,
767 froms=froms_str,
768 start=self.froms[start_with],
769 )
770
771 util.warn(message)
772
773
774class Compiled:
775 """Represent a compiled SQL or DDL expression.
776
777 The ``__str__`` method of the ``Compiled`` object should produce
778 the actual text of the statement. ``Compiled`` objects are
779 specific to their underlying database dialect, and also may
780 or may not be specific to the columns referenced within a
781 particular set of bind parameters. In no case should the
782 ``Compiled`` object be dependent on the actual values of those
783 bind parameters, even though it may reference those values as
784 defaults.
785 """
786
787 statement: Optional[ClauseElement] = None
788 "The statement to compile."
789 string: str = ""
790 "The string representation of the ``statement``"
791
792 state: CompilerState
793 """description of the compiler's state"""
794
795 is_sql = False
796 is_ddl = False
797
798 _cached_metadata: Optional[CursorResultMetaData] = None
799
800 _result_columns: Optional[List[ResultColumnsEntry]] = None
801
802 schema_translate_map: Optional[SchemaTranslateMapType] = None
803
804 execution_options: _ExecuteOptions = util.EMPTY_DICT
805 """
806 Execution options propagated from the statement. In some cases,
807 sub-elements of the statement can modify these.
808 """
809
810 preparer: IdentifierPreparer
811
812 _annotations: _AnnotationDict = util.EMPTY_DICT
813
814 compile_state: Optional[CompileState] = None
815 """Optional :class:`.CompileState` object that maintains additional
816 state used by the compiler.
817
818 Major executable objects such as :class:`_expression.Insert`,
819 :class:`_expression.Update`, :class:`_expression.Delete`,
820 :class:`_expression.Select` will generate this
821 state when compiled in order to calculate additional information about the
822 object. For the top level object that is to be executed, the state can be
823 stored here where it can also have applicability towards result set
824 processing.
825
826 .. versionadded:: 1.4
827
828 """
829
830 dml_compile_state: Optional[CompileState] = None
831 """Optional :class:`.CompileState` assigned at the same point that
832 .isinsert, .isupdate, or .isdelete is assigned.
833
834 This will normally be the same object as .compile_state, with the
835 exception of cases like the :class:`.ORMFromStatementCompileState`
836 object.
837
838 .. versionadded:: 1.4.40
839
840 """
841
842 cache_key: Optional[CacheKey] = None
843 """The :class:`.CacheKey` that was generated ahead of creating this
844 :class:`.Compiled` object.
845
846 This is used for routines that need access to the original
847 :class:`.CacheKey` instance generated when the :class:`.Compiled`
848 instance was first cached, typically in order to reconcile
849 the original list of :class:`.BindParameter` objects with a
850 per-statement list that's generated on each call.
851
852 """
853
854 _gen_time: float
855 """Generation time of this :class:`.Compiled`, used for reporting
856 cache stats."""
857
858 def __init__(
859 self,
860 dialect: Dialect,
861 statement: Optional[ClauseElement],
862 schema_translate_map: Optional[SchemaTranslateMapType] = None,
863 render_schema_translate: bool = False,
864 compile_kwargs: Mapping[str, Any] = util.immutabledict(),
865 ):
866 """Construct a new :class:`.Compiled` object.
867
868 :param dialect: :class:`.Dialect` to compile against.
869
870 :param statement: :class:`_expression.ClauseElement` to be compiled.
871
872 :param schema_translate_map: dictionary of schema names to be
873 translated when forming the resultant SQL
874
875 .. seealso::
876
877 :ref:`schema_translating`
878
879 :param compile_kwargs: additional kwargs that will be
880 passed to the initial call to :meth:`.Compiled.process`.
881
882
883 """
884 self.dialect = dialect
885 self.preparer = self.dialect.identifier_preparer
886 if schema_translate_map:
887 self.schema_translate_map = schema_translate_map
888 self.preparer = self.preparer._with_schema_translate(
889 schema_translate_map
890 )
891
892 if statement is not None:
893 self.state = CompilerState.COMPILING
894 self.statement = statement
895 self.can_execute = statement.supports_execution
896 self._annotations = statement._annotations
897 if self.can_execute:
898 if TYPE_CHECKING:
899 assert isinstance(statement, Executable)
900 self.execution_options = statement._execution_options
901 self.string = self.process(self.statement, **compile_kwargs)
902
903 if render_schema_translate:
904 assert schema_translate_map is not None
905 self.string = self.preparer._render_schema_translates(
906 self.string, schema_translate_map
907 )
908
909 self.state = CompilerState.STRING_APPLIED
910 else:
911 self.state = CompilerState.NO_STATEMENT
912
913 self._gen_time = perf_counter()
914
915 def __init_subclass__(cls) -> None:
916 cls._init_compiler_cls()
917 return super().__init_subclass__()
918
919 @classmethod
920 def _init_compiler_cls(cls):
921 pass
922
923 def _execute_on_connection(
924 self, connection, distilled_params, execution_options
925 ):
926 if self.can_execute:
927 return connection._execute_compiled(
928 self, distilled_params, execution_options
929 )
930 else:
931 raise exc.ObjectNotExecutableError(self.statement)
932
933 def visit_unsupported_compilation(self, element, err, **kw):
934 raise exc.UnsupportedCompilationError(self, type(element)) from err
935
936 @property
937 def sql_compiler(self) -> SQLCompiler:
938 """Return a Compiled that is capable of processing SQL expressions.
939
940 If this compiler is one, it would likely just return 'self'.
941
942 """
943
944 raise NotImplementedError()
945
946 def process(self, obj: Visitable, **kwargs: Any) -> str:
947 return obj._compiler_dispatch(self, **kwargs)
948
949 def __str__(self) -> str:
950 """Return the string text of the generated SQL or DDL."""
951
952 if self.state is CompilerState.STRING_APPLIED:
953 return self.string
954 else:
955 return ""
956
957 def construct_params(
958 self,
959 params: Optional[_CoreSingleExecuteParams] = None,
960 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
961 escape_names: bool = True,
962 ) -> Optional[_MutableCoreSingleExecuteParams]:
963 """Return the bind params for this compiled object.
964
965 :param params: a dict of string/object pairs whose values will
966 override bind values compiled in to the
967 statement.
968 """
969
970 raise NotImplementedError()
971
972 @property
973 def params(self):
974 """Return the bind params for this compiled object."""
975 return self.construct_params()
976
977
978class TypeCompiler(util.EnsureKWArg):
979 """Produces DDL specification for TypeEngine objects."""
980
981 ensure_kwarg = r"visit_\w+"
982
983 def __init__(self, dialect: Dialect):
984 self.dialect = dialect
985
986 def process(self, type_: TypeEngine[Any], **kw: Any) -> str:
987 if (
988 type_._variant_mapping
989 and self.dialect.name in type_._variant_mapping
990 ):
991 type_ = type_._variant_mapping[self.dialect.name]
992 return type_._compiler_dispatch(self, **kw)
993
994 def visit_unsupported_compilation(
995 self, element: Any, err: Exception, **kw: Any
996 ) -> NoReturn:
997 raise exc.UnsupportedCompilationError(self, element) from err
998
999
1000# this was a Visitable, but to allow accurate detection of
1001# column elements this is actually a column element
1002class _CompileLabel(
1003 roles.BinaryElementRole[Any], elements.CompilerColumnElement
1004):
1005 """lightweight label object which acts as an expression.Label."""
1006
1007 __visit_name__ = "label"
1008 __slots__ = "element", "name", "_alt_names"
1009
1010 def __init__(self, col, name, alt_names=()):
1011 self.element = col
1012 self.name = name
1013 self._alt_names = (col,) + alt_names
1014
1015 @property
1016 def proxy_set(self):
1017 return self.element.proxy_set
1018
1019 @property
1020 def type(self):
1021 return self.element.type
1022
1023 def self_group(self, **kw):
1024 return self
1025
1026
1027class ilike_case_insensitive(
1028 roles.BinaryElementRole[Any], elements.CompilerColumnElement
1029):
1030 """produce a wrapping element for a case-insensitive portion of
1031 an ILIKE construct.
1032
1033 The construct usually renders the ``lower()`` function, but on
1034 PostgreSQL will pass silently with the assumption that "ILIKE"
1035 is being used.
1036
1037 .. versionadded:: 2.0
1038
1039 """
1040
1041 __visit_name__ = "ilike_case_insensitive_operand"
1042 __slots__ = "element", "comparator"
1043
1044 def __init__(self, element):
1045 self.element = element
1046 self.comparator = element.comparator
1047
1048 @property
1049 def proxy_set(self):
1050 return self.element.proxy_set
1051
1052 @property
1053 def type(self):
1054 return self.element.type
1055
1056 def self_group(self, **kw):
1057 return self
1058
1059 def _with_binary_element_type(self, type_):
1060 return ilike_case_insensitive(
1061 self.element._with_binary_element_type(type_)
1062 )
1063
1064
1065class SQLCompiler(Compiled):
1066 """Default implementation of :class:`.Compiled`.
1067
1068 Compiles :class:`_expression.ClauseElement` objects into SQL strings.
1069
1070 """
1071
1072 extract_map = EXTRACT_MAP
1073
1074 bindname_escape_characters: ClassVar[Mapping[str, str]] = (
1075 util.immutabledict(
1076 {
1077 "%": "P",
1078 "(": "A",
1079 ")": "Z",
1080 ":": "C",
1081 ".": "_",
1082 "[": "_",
1083 "]": "_",
1084 " ": "_",
1085 }
1086 )
1087 )
1088 """A mapping (e.g. dict or similar) containing a lookup of
1089 characters keyed to replacement characters which will be applied to all
1090 'bind names' used in SQL statements as a form of 'escaping'; the given
1091 characters are replaced entirely with the 'replacement' character when
1092 rendered in the SQL statement, and a similar translation is performed
1093 on the incoming names used in parameter dictionaries passed to methods
1094 like :meth:`_engine.Connection.execute`.
1095
1096 This allows bound parameter names used in :func:`_sql.bindparam` and
1097 other constructs to have any arbitrary characters present without any
1098 concern for characters that aren't allowed at all on the target database.
1099
1100 Third party dialects can establish their own dictionary here to replace the
1101 default mapping, which will ensure that the particular characters in the
1102 mapping will never appear in a bound parameter name.
1103
1104 The dictionary is evaluated at **class creation time**, so cannot be
1105 modified at runtime; it must be present on the class when the class
1106 is first declared.
1107
1108 Note that for dialects that have additional bound parameter rules such
1109 as additional restrictions on leading characters, the
1110 :meth:`_sql.SQLCompiler.bindparam_string` method may need to be augmented.
1111 See the cx_Oracle compiler for an example of this.
1112
1113 .. versionadded:: 2.0.0rc1
1114
1115 """
1116
1117 _bind_translate_re: ClassVar[Pattern[str]]
1118 _bind_translate_chars: ClassVar[Mapping[str, str]]
1119
1120 is_sql = True
1121
1122 compound_keywords = COMPOUND_KEYWORDS
1123
1124 isdelete: bool = False
1125 isinsert: bool = False
1126 isupdate: bool = False
1127 """class-level defaults which can be set at the instance
1128 level to define if this Compiled instance represents
1129 INSERT/UPDATE/DELETE
1130 """
1131
1132 postfetch: Optional[List[Column[Any]]]
1133 """list of columns that can be post-fetched after INSERT or UPDATE to
1134 receive server-updated values"""
1135
1136 insert_prefetch: Sequence[Column[Any]] = ()
1137 """list of columns for which default values should be evaluated before
1138 an INSERT takes place"""
1139
1140 update_prefetch: Sequence[Column[Any]] = ()
1141 """list of columns for which onupdate default values should be evaluated
1142 before an UPDATE takes place"""
1143
1144 implicit_returning: Optional[Sequence[ColumnElement[Any]]] = None
1145 """list of "implicit" returning columns for a toplevel INSERT or UPDATE
1146 statement, used to receive newly generated values of columns.
1147
1148 .. versionadded:: 2.0 ``implicit_returning`` replaces the previous
1149 ``returning`` collection, which was not a generalized RETURNING
1150 collection and instead was in fact specific to the "implicit returning"
1151 feature.
1152
1153 """
1154
1155 isplaintext: bool = False
1156
1157 binds: Dict[str, BindParameter[Any]]
1158 """a dictionary of bind parameter keys to BindParameter instances."""
1159
1160 bind_names: Dict[BindParameter[Any], str]
1161 """a dictionary of BindParameter instances to "compiled" names
1162 that are actually present in the generated SQL"""
1163
1164 stack: List[_CompilerStackEntry]
1165 """major statements such as SELECT, INSERT, UPDATE, DELETE are
1166 tracked in this stack using an entry format."""
1167
1168 returning_precedes_values: bool = False
1169 """set to True classwide to generate RETURNING
1170 clauses before the VALUES or WHERE clause (i.e. MSSQL)
1171 """
1172
1173 render_table_with_column_in_update_from: bool = False
1174 """set to True classwide to indicate the SET clause
1175 in a multi-table UPDATE statement should qualify
1176 columns with the table name (i.e. MySQL only)
1177 """
1178
1179 ansi_bind_rules: bool = False
1180 """SQL 92 doesn't allow bind parameters to be used
1181 in the columns clause of a SELECT, nor does it allow
1182 ambiguous expressions like "? = ?". A compiler
1183 subclass can set this flag to False if the target
1184 driver/DB enforces this
1185 """
1186
1187 bindtemplate: str
1188 """template to render bound parameters based on paramstyle."""
1189
1190 compilation_bindtemplate: str
1191 """template used by compiler to render parameters before positional
1192 paramstyle application"""
1193
1194 _numeric_binds_identifier_char: str
1195 """Character that's used to as the identifier of a numerical bind param.
1196 For example if this char is set to ``$``, numerical binds will be rendered
1197 in the form ``$1, $2, $3``.
1198 """
1199
1200 _result_columns: List[ResultColumnsEntry]
1201 """relates label names in the final SQL to a tuple of local
1202 column/label name, ColumnElement object (if any) and
1203 TypeEngine. CursorResult uses this for type processing and
1204 column targeting"""
1205
1206 _textual_ordered_columns: bool = False
1207 """tell the result object that the column names as rendered are important,
1208 but they are also "ordered" vs. what is in the compiled object here.
1209
1210 As of 1.4.42 this condition is only present when the statement is a
1211 TextualSelect, e.g. text("....").columns(...), where it is required
1212 that the columns are considered positionally and not by name.
1213
1214 """
1215
1216 _ad_hoc_textual: bool = False
1217 """tell the result that we encountered text() or '*' constructs in the
1218 middle of the result columns, but we also have compiled columns, so
1219 if the number of columns in cursor.description does not match how many
1220 expressions we have, that means we can't rely on positional at all and
1221 should match on name.
1222
1223 """
1224
1225 _ordered_columns: bool = True
1226 """
1227 if False, means we can't be sure the list of entries
1228 in _result_columns is actually the rendered order. Usually
1229 True unless using an unordered TextualSelect.
1230 """
1231
1232 _loose_column_name_matching: bool = False
1233 """tell the result object that the SQL statement is textual, wants to match
1234 up to Column objects, and may be using the ._tq_label in the SELECT rather
1235 than the base name.
1236
1237 """
1238
1239 _numeric_binds: bool = False
1240 """
1241 True if paramstyle is "numeric". This paramstyle is trickier than
1242 all the others.
1243
1244 """
1245
1246 _render_postcompile: bool = False
1247 """
1248 whether to render out POSTCOMPILE params during the compile phase.
1249
1250 This attribute is used only for end-user invocation of stmt.compile();
1251 it's never used for actual statement execution, where instead the
1252 dialect internals access and render the internal postcompile structure
1253 directly.
1254
1255 """
1256
1257 _post_compile_expanded_state: Optional[ExpandedState] = None
1258 """When render_postcompile is used, the ``ExpandedState`` used to create
1259 the "expanded" SQL is assigned here, and then used by the ``.params``
1260 accessor and ``.construct_params()`` methods for their return values.
1261
1262 .. versionadded:: 2.0.0rc1
1263
1264 """
1265
1266 _pre_expanded_string: Optional[str] = None
1267 """Stores the original string SQL before 'post_compile' is applied,
1268 for cases where 'post_compile' were used.
1269
1270 """
1271
1272 _pre_expanded_positiontup: Optional[List[str]] = None
1273
1274 _insertmanyvalues: Optional[_InsertManyValues] = None
1275
1276 _insert_crud_params: Optional[crud._CrudParamSequence] = None
1277
1278 literal_execute_params: FrozenSet[BindParameter[Any]] = frozenset()
1279 """bindparameter objects that are rendered as literal values at statement
1280 execution time.
1281
1282 """
1283
1284 post_compile_params: FrozenSet[BindParameter[Any]] = frozenset()
1285 """bindparameter objects that are rendered as bound parameter placeholders
1286 at statement execution time.
1287
1288 """
1289
1290 escaped_bind_names: util.immutabledict[str, str] = util.EMPTY_DICT
1291 """Late escaping of bound parameter names that has to be converted
1292 to the original name when looking in the parameter dictionary.
1293
1294 """
1295
1296 has_out_parameters = False
1297 """if True, there are bindparam() objects that have the isoutparam
1298 flag set."""
1299
1300 postfetch_lastrowid = False
1301 """if True, and this in insert, use cursor.lastrowid to populate
1302 result.inserted_primary_key. """
1303
1304 _cache_key_bind_match: Optional[
1305 Tuple[
1306 Dict[
1307 BindParameter[Any],
1308 List[BindParameter[Any]],
1309 ],
1310 Dict[
1311 str,
1312 BindParameter[Any],
1313 ],
1314 ]
1315 ] = None
1316 """a mapping that will relate the BindParameter object we compile
1317 to those that are part of the extracted collection of parameters
1318 in the cache key, if we were given a cache key.
1319
1320 """
1321
1322 positiontup: Optional[List[str]] = None
1323 """for a compiled construct that uses a positional paramstyle, will be
1324 a sequence of strings, indicating the names of bound parameters in order.
1325
1326 This is used in order to render bound parameters in their correct order,
1327 and is combined with the :attr:`_sql.Compiled.params` dictionary to
1328 render parameters.
1329
1330 This sequence always contains the unescaped name of the parameters.
1331
1332 .. seealso::
1333
1334 :ref:`faq_sql_expression_string` - includes a usage example for
1335 debugging use cases.
1336
1337 """
1338 _values_bindparam: Optional[List[str]] = None
1339
1340 _visited_bindparam: Optional[List[str]] = None
1341
1342 inline: bool = False
1343
1344 ctes: Optional[MutableMapping[CTE, str]]
1345
1346 # Detect same CTE references - Dict[(level, name), cte]
1347 # Level is required for supporting nesting
1348 ctes_by_level_name: Dict[Tuple[int, str], CTE]
1349
1350 # To retrieve key/level in ctes_by_level_name -
1351 # Dict[cte_reference, (level, cte_name, cte_opts)]
1352 level_name_by_cte: Dict[CTE, Tuple[int, str, selectable._CTEOpts]]
1353
1354 ctes_recursive: bool
1355
1356 _post_compile_pattern = re.compile(r"__\[POSTCOMPILE_(\S+?)(~~.+?~~)?\]")
1357 _pyformat_pattern = re.compile(r"%\(([^)]+?)\)s")
1358 _positional_pattern = re.compile(
1359 f"{_pyformat_pattern.pattern}|{_post_compile_pattern.pattern}"
1360 )
1361
1362 @classmethod
1363 def _init_compiler_cls(cls):
1364 cls._init_bind_translate()
1365
1366 @classmethod
1367 def _init_bind_translate(cls):
1368 reg = re.escape("".join(cls.bindname_escape_characters))
1369 cls._bind_translate_re = re.compile(f"[{reg}]")
1370 cls._bind_translate_chars = cls.bindname_escape_characters
1371
1372 def __init__(
1373 self,
1374 dialect: Dialect,
1375 statement: Optional[ClauseElement],
1376 cache_key: Optional[CacheKey] = None,
1377 column_keys: Optional[Sequence[str]] = None,
1378 for_executemany: bool = False,
1379 linting: Linting = NO_LINTING,
1380 _supporting_against: Optional[SQLCompiler] = None,
1381 **kwargs: Any,
1382 ):
1383 """Construct a new :class:`.SQLCompiler` object.
1384
1385 :param dialect: :class:`.Dialect` to be used
1386
1387 :param statement: :class:`_expression.ClauseElement` to be compiled
1388
1389 :param column_keys: a list of column names to be compiled into an
1390 INSERT or UPDATE statement.
1391
1392 :param for_executemany: whether INSERT / UPDATE statements should
1393 expect that they are to be invoked in an "executemany" style,
1394 which may impact how the statement will be expected to return the
1395 values of defaults and autoincrement / sequences and similar.
1396 Depending on the backend and driver in use, support for retrieving
1397 these values may be disabled which means SQL expressions may
1398 be rendered inline, RETURNING may not be rendered, etc.
1399
1400 :param kwargs: additional keyword arguments to be consumed by the
1401 superclass.
1402
1403 """
1404 self.column_keys = column_keys
1405
1406 self.cache_key = cache_key
1407
1408 if cache_key:
1409 cksm = {b.key: b for b in cache_key[1]}
1410 ckbm = {b: [b] for b in cache_key[1]}
1411 self._cache_key_bind_match = (ckbm, cksm)
1412
1413 # compile INSERT/UPDATE defaults/sequences to expect executemany
1414 # style execution, which may mean no pre-execute of defaults,
1415 # or no RETURNING
1416 self.for_executemany = for_executemany
1417
1418 self.linting = linting
1419
1420 # a dictionary of bind parameter keys to BindParameter
1421 # instances.
1422 self.binds = {}
1423
1424 # a dictionary of BindParameter instances to "compiled" names
1425 # that are actually present in the generated SQL
1426 self.bind_names = util.column_dict()
1427
1428 # stack which keeps track of nested SELECT statements
1429 self.stack = []
1430
1431 self._result_columns = []
1432
1433 # true if the paramstyle is positional
1434 self.positional = dialect.positional
1435 if self.positional:
1436 self._numeric_binds = nb = dialect.paramstyle.startswith("numeric")
1437 if nb:
1438 self._numeric_binds_identifier_char = (
1439 "$" if dialect.paramstyle == "numeric_dollar" else ":"
1440 )
1441
1442 self.compilation_bindtemplate = _pyformat_template
1443 else:
1444 self.compilation_bindtemplate = BIND_TEMPLATES[dialect.paramstyle]
1445
1446 self.ctes = None
1447
1448 self.label_length = (
1449 dialect.label_length or dialect.max_identifier_length
1450 )
1451
1452 # a map which tracks "anonymous" identifiers that are created on
1453 # the fly here
1454 self.anon_map = prefix_anon_map()
1455
1456 # a map which tracks "truncated" names based on
1457 # dialect.label_length or dialect.max_identifier_length
1458 self.truncated_names: Dict[Tuple[str, str], str] = {}
1459 self._truncated_counters: Dict[str, int] = {}
1460
1461 Compiled.__init__(self, dialect, statement, **kwargs)
1462
1463 if self.isinsert or self.isupdate or self.isdelete:
1464 if TYPE_CHECKING:
1465 assert isinstance(statement, UpdateBase)
1466
1467 if self.isinsert or self.isupdate:
1468 if TYPE_CHECKING:
1469 assert isinstance(statement, ValuesBase)
1470 if statement._inline:
1471 self.inline = True
1472 elif self.for_executemany and (
1473 not self.isinsert
1474 or (
1475 self.dialect.insert_executemany_returning
1476 and statement._return_defaults
1477 )
1478 ):
1479 self.inline = True
1480
1481 self.bindtemplate = BIND_TEMPLATES[dialect.paramstyle]
1482
1483 if _supporting_against:
1484 self.__dict__.update(
1485 {
1486 k: v
1487 for k, v in _supporting_against.__dict__.items()
1488 if k
1489 not in {
1490 "state",
1491 "dialect",
1492 "preparer",
1493 "positional",
1494 "_numeric_binds",
1495 "compilation_bindtemplate",
1496 "bindtemplate",
1497 }
1498 }
1499 )
1500
1501 if self.state is CompilerState.STRING_APPLIED:
1502 if self.positional:
1503 if self._numeric_binds:
1504 self._process_numeric()
1505 else:
1506 self._process_positional()
1507
1508 if self._render_postcompile:
1509 parameters = self.construct_params(
1510 escape_names=False,
1511 _no_postcompile=True,
1512 )
1513
1514 self._process_parameters_for_postcompile(
1515 parameters, _populate_self=True
1516 )
1517
1518 @property
1519 def insert_single_values_expr(self) -> Optional[str]:
1520 """When an INSERT is compiled with a single set of parameters inside
1521 a VALUES expression, the string is assigned here, where it can be
1522 used for insert batching schemes to rewrite the VALUES expression.
1523
1524 .. versionadded:: 1.3.8
1525
1526 .. versionchanged:: 2.0 This collection is no longer used by
1527 SQLAlchemy's built-in dialects, in favor of the currently
1528 internal ``_insertmanyvalues`` collection that is used only by
1529 :class:`.SQLCompiler`.
1530
1531 """
1532 if self._insertmanyvalues is None:
1533 return None
1534 else:
1535 return self._insertmanyvalues.single_values_expr
1536
1537 @util.ro_memoized_property
1538 def effective_returning(self) -> Optional[Sequence[ColumnElement[Any]]]:
1539 """The effective "returning" columns for INSERT, UPDATE or DELETE.
1540
1541 This is either the so-called "implicit returning" columns which are
1542 calculated by the compiler on the fly, or those present based on what's
1543 present in ``self.statement._returning`` (expanded into individual
1544 columns using the ``._all_selected_columns`` attribute) i.e. those set
1545 explicitly using the :meth:`.UpdateBase.returning` method.
1546
1547 .. versionadded:: 2.0
1548
1549 """
1550 if self.implicit_returning:
1551 return self.implicit_returning
1552 elif self.statement is not None and is_dml(self.statement):
1553 return [
1554 c
1555 for c in self.statement._all_selected_columns
1556 if is_column_element(c)
1557 ]
1558
1559 else:
1560 return None
1561
1562 @property
1563 def returning(self):
1564 """backwards compatibility; returns the
1565 effective_returning collection.
1566
1567 """
1568 return self.effective_returning
1569
1570 @property
1571 def current_executable(self):
1572 """Return the current 'executable' that is being compiled.
1573
1574 This is currently the :class:`_sql.Select`, :class:`_sql.Insert`,
1575 :class:`_sql.Update`, :class:`_sql.Delete`,
1576 :class:`_sql.CompoundSelect` object that is being compiled.
1577 Specifically it's assigned to the ``self.stack`` list of elements.
1578
1579 When a statement like the above is being compiled, it normally
1580 is also assigned to the ``.statement`` attribute of the
1581 :class:`_sql.Compiler` object. However, all SQL constructs are
1582 ultimately nestable, and this attribute should never be consulted
1583 by a ``visit_`` method, as it is not guaranteed to be assigned
1584 nor guaranteed to correspond to the current statement being compiled.
1585
1586 .. versionadded:: 1.3.21
1587
1588 For compatibility with previous versions, use the following
1589 recipe::
1590
1591 statement = getattr(self, "current_executable", False)
1592 if statement is False:
1593 statement = self.stack[-1]["selectable"]
1594
1595 For versions 1.4 and above, ensure only .current_executable
1596 is used; the format of "self.stack" may change.
1597
1598
1599 """
1600 try:
1601 return self.stack[-1]["selectable"]
1602 except IndexError as ie:
1603 raise IndexError("Compiler does not have a stack entry") from ie
1604
1605 @property
1606 def prefetch(self):
1607 return list(self.insert_prefetch) + list(self.update_prefetch)
1608
1609 @util.memoized_property
1610 def _global_attributes(self) -> Dict[Any, Any]:
1611 return {}
1612
1613 @util.memoized_instancemethod
1614 def _init_cte_state(self) -> MutableMapping[CTE, str]:
1615 """Initialize collections related to CTEs only if
1616 a CTE is located, to save on the overhead of
1617 these collections otherwise.
1618
1619 """
1620 # collect CTEs to tack on top of a SELECT
1621 # To store the query to print - Dict[cte, text_query]
1622 ctes: MutableMapping[CTE, str] = util.OrderedDict()
1623 self.ctes = ctes
1624
1625 # Detect same CTE references - Dict[(level, name), cte]
1626 # Level is required for supporting nesting
1627 self.ctes_by_level_name = {}
1628
1629 # To retrieve key/level in ctes_by_level_name -
1630 # Dict[cte_reference, (level, cte_name, cte_opts)]
1631 self.level_name_by_cte = {}
1632
1633 self.ctes_recursive = False
1634
1635 return ctes
1636
1637 @contextlib.contextmanager
1638 def _nested_result(self):
1639 """special API to support the use case of 'nested result sets'"""
1640 result_columns, ordered_columns = (
1641 self._result_columns,
1642 self._ordered_columns,
1643 )
1644 self._result_columns, self._ordered_columns = [], False
1645
1646 try:
1647 if self.stack:
1648 entry = self.stack[-1]
1649 entry["need_result_map_for_nested"] = True
1650 else:
1651 entry = None
1652 yield self._result_columns, self._ordered_columns
1653 finally:
1654 if entry:
1655 entry.pop("need_result_map_for_nested")
1656 self._result_columns, self._ordered_columns = (
1657 result_columns,
1658 ordered_columns,
1659 )
1660
1661 def _process_positional(self):
1662 assert not self.positiontup
1663 assert self.state is CompilerState.STRING_APPLIED
1664 assert not self._numeric_binds
1665
1666 if self.dialect.paramstyle == "format":
1667 placeholder = "%s"
1668 else:
1669 assert self.dialect.paramstyle == "qmark"
1670 placeholder = "?"
1671
1672 positions = []
1673
1674 def find_position(m: re.Match[str]) -> str:
1675 normal_bind = m.group(1)
1676 if normal_bind:
1677 positions.append(normal_bind)
1678 return placeholder
1679 else:
1680 # this a post-compile bind
1681 positions.append(m.group(2))
1682 return m.group(0)
1683
1684 self.string = re.sub(
1685 self._positional_pattern, find_position, self.string
1686 )
1687
1688 if self.escaped_bind_names:
1689 reverse_escape = {v: k for k, v in self.escaped_bind_names.items()}
1690 assert len(self.escaped_bind_names) == len(reverse_escape)
1691 self.positiontup = [
1692 reverse_escape.get(name, name) for name in positions
1693 ]
1694 else:
1695 self.positiontup = positions
1696
1697 if self._insertmanyvalues:
1698 positions = []
1699
1700 single_values_expr = re.sub(
1701 self._positional_pattern,
1702 find_position,
1703 self._insertmanyvalues.single_values_expr,
1704 )
1705 insert_crud_params = [
1706 (
1707 v[0],
1708 v[1],
1709 re.sub(self._positional_pattern, find_position, v[2]),
1710 v[3],
1711 )
1712 for v in self._insertmanyvalues.insert_crud_params
1713 ]
1714
1715 self._insertmanyvalues = self._insertmanyvalues._replace(
1716 single_values_expr=single_values_expr,
1717 insert_crud_params=insert_crud_params,
1718 )
1719
1720 def _process_numeric(self):
1721 assert self._numeric_binds
1722 assert self.state is CompilerState.STRING_APPLIED
1723
1724 num = 1
1725 param_pos: Dict[str, str] = {}
1726 order: Iterable[str]
1727 if self._insertmanyvalues and self._values_bindparam is not None:
1728 # bindparams that are not in values are always placed first.
1729 # this avoids the need of changing them when using executemany
1730 # values () ()
1731 order = itertools.chain(
1732 (
1733 name
1734 for name in self.bind_names.values()
1735 if name not in self._values_bindparam
1736 ),
1737 self.bind_names.values(),
1738 )
1739 else:
1740 order = self.bind_names.values()
1741
1742 for bind_name in order:
1743 if bind_name in param_pos:
1744 continue
1745 bind = self.binds[bind_name]
1746 if (
1747 bind in self.post_compile_params
1748 or bind in self.literal_execute_params
1749 ):
1750 # set to None to just mark the in positiontup, it will not
1751 # be replaced below.
1752 param_pos[bind_name] = None # type: ignore
1753 else:
1754 ph = f"{self._numeric_binds_identifier_char}{num}"
1755 num += 1
1756 param_pos[bind_name] = ph
1757
1758 self.next_numeric_pos = num
1759
1760 self.positiontup = list(param_pos)
1761 if self.escaped_bind_names:
1762 len_before = len(param_pos)
1763 param_pos = {
1764 self.escaped_bind_names.get(name, name): pos
1765 for name, pos in param_pos.items()
1766 }
1767 assert len(param_pos) == len_before
1768
1769 # Can't use format here since % chars are not escaped.
1770 self.string = self._pyformat_pattern.sub(
1771 lambda m: param_pos[m.group(1)], self.string
1772 )
1773
1774 if self._insertmanyvalues:
1775 single_values_expr = (
1776 # format is ok here since single_values_expr includes only
1777 # place-holders
1778 self._insertmanyvalues.single_values_expr
1779 % param_pos
1780 )
1781 insert_crud_params = [
1782 (v[0], v[1], "%s", v[3])
1783 for v in self._insertmanyvalues.insert_crud_params
1784 ]
1785
1786 self._insertmanyvalues = self._insertmanyvalues._replace(
1787 # This has the numbers (:1, :2)
1788 single_values_expr=single_values_expr,
1789 # The single binds are instead %s so they can be formatted
1790 insert_crud_params=insert_crud_params,
1791 )
1792
1793 @util.memoized_property
1794 def _bind_processors(
1795 self,
1796 ) -> MutableMapping[
1797 str, Union[_BindProcessorType[Any], Sequence[_BindProcessorType[Any]]]
1798 ]:
1799 # mypy is not able to see the two value types as the above Union,
1800 # it just sees "object". don't know how to resolve
1801 return {
1802 key: value # type: ignore
1803 for key, value in (
1804 (
1805 self.bind_names[bindparam],
1806 (
1807 bindparam.type._cached_bind_processor(self.dialect)
1808 if not bindparam.type._is_tuple_type
1809 else tuple(
1810 elem_type._cached_bind_processor(self.dialect)
1811 for elem_type in cast(
1812 TupleType, bindparam.type
1813 ).types
1814 )
1815 ),
1816 )
1817 for bindparam in self.bind_names
1818 )
1819 if value is not None
1820 }
1821
1822 def is_subquery(self):
1823 return len(self.stack) > 1
1824
1825 @property
1826 def sql_compiler(self) -> Self:
1827 return self
1828
1829 def construct_expanded_state(
1830 self,
1831 params: Optional[_CoreSingleExecuteParams] = None,
1832 escape_names: bool = True,
1833 ) -> ExpandedState:
1834 """Return a new :class:`.ExpandedState` for a given parameter set.
1835
1836 For queries that use "expanding" or other late-rendered parameters,
1837 this method will provide for both the finalized SQL string as well
1838 as the parameters that would be used for a particular parameter set.
1839
1840 .. versionadded:: 2.0.0rc1
1841
1842 """
1843 parameters = self.construct_params(
1844 params,
1845 escape_names=escape_names,
1846 _no_postcompile=True,
1847 )
1848 return self._process_parameters_for_postcompile(
1849 parameters,
1850 )
1851
1852 def construct_params(
1853 self,
1854 params: Optional[_CoreSingleExecuteParams] = None,
1855 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
1856 escape_names: bool = True,
1857 _group_number: Optional[int] = None,
1858 _check: bool = True,
1859 _no_postcompile: bool = False,
1860 ) -> _MutableCoreSingleExecuteParams:
1861 """return a dictionary of bind parameter keys and values"""
1862
1863 if self._render_postcompile and not _no_postcompile:
1864 assert self._post_compile_expanded_state is not None
1865 if not params:
1866 return dict(self._post_compile_expanded_state.parameters)
1867 else:
1868 raise exc.InvalidRequestError(
1869 "can't construct new parameters when render_postcompile "
1870 "is used; the statement is hard-linked to the original "
1871 "parameters. Use construct_expanded_state to generate a "
1872 "new statement and parameters."
1873 )
1874
1875 has_escaped_names = escape_names and bool(self.escaped_bind_names)
1876
1877 if extracted_parameters:
1878 # related the bound parameters collected in the original cache key
1879 # to those collected in the incoming cache key. They will not have
1880 # matching names but they will line up positionally in the same
1881 # way. The parameters present in self.bind_names may be clones of
1882 # these original cache key params in the case of DML but the .key
1883 # will be guaranteed to match.
1884 if self.cache_key is None:
1885 raise exc.CompileError(
1886 "This compiled object has no original cache key; "
1887 "can't pass extracted_parameters to construct_params"
1888 )
1889 else:
1890 orig_extracted = self.cache_key[1]
1891
1892 ckbm_tuple = self._cache_key_bind_match
1893 assert ckbm_tuple is not None
1894 ckbm, _ = ckbm_tuple
1895 resolved_extracted = {
1896 bind: extracted
1897 for b, extracted in zip(orig_extracted, extracted_parameters)
1898 for bind in ckbm[b]
1899 }
1900 else:
1901 resolved_extracted = None
1902
1903 if params:
1904 pd = {}
1905 for bindparam, name in self.bind_names.items():
1906 escaped_name = (
1907 self.escaped_bind_names.get(name, name)
1908 if has_escaped_names
1909 else name
1910 )
1911
1912 if bindparam.key in params:
1913 pd[escaped_name] = params[bindparam.key]
1914 elif name in params:
1915 pd[escaped_name] = params[name]
1916
1917 elif _check and bindparam.required:
1918 if _group_number:
1919 raise exc.InvalidRequestError(
1920 "A value is required for bind parameter %r, "
1921 "in parameter group %d"
1922 % (bindparam.key, _group_number),
1923 code="cd3x",
1924 )
1925 else:
1926 raise exc.InvalidRequestError(
1927 "A value is required for bind parameter %r"
1928 % bindparam.key,
1929 code="cd3x",
1930 )
1931 else:
1932 if resolved_extracted:
1933 value_param = resolved_extracted.get(
1934 bindparam, bindparam
1935 )
1936 else:
1937 value_param = bindparam
1938
1939 if bindparam.callable:
1940 pd[escaped_name] = value_param.effective_value
1941 else:
1942 pd[escaped_name] = value_param.value
1943 return pd
1944 else:
1945 pd = {}
1946 for bindparam, name in self.bind_names.items():
1947 escaped_name = (
1948 self.escaped_bind_names.get(name, name)
1949 if has_escaped_names
1950 else name
1951 )
1952
1953 if _check and bindparam.required:
1954 if _group_number:
1955 raise exc.InvalidRequestError(
1956 "A value is required for bind parameter %r, "
1957 "in parameter group %d"
1958 % (bindparam.key, _group_number),
1959 code="cd3x",
1960 )
1961 else:
1962 raise exc.InvalidRequestError(
1963 "A value is required for bind parameter %r"
1964 % bindparam.key,
1965 code="cd3x",
1966 )
1967
1968 if resolved_extracted:
1969 value_param = resolved_extracted.get(bindparam, bindparam)
1970 else:
1971 value_param = bindparam
1972
1973 if bindparam.callable:
1974 pd[escaped_name] = value_param.effective_value
1975 else:
1976 pd[escaped_name] = value_param.value
1977
1978 return pd
1979
1980 @util.memoized_instancemethod
1981 def _get_set_input_sizes_lookup(self):
1982 dialect = self.dialect
1983
1984 include_types = dialect.include_set_input_sizes
1985 exclude_types = dialect.exclude_set_input_sizes
1986
1987 dbapi = dialect.dbapi
1988
1989 def lookup_type(typ):
1990 dbtype = typ._unwrapped_dialect_impl(dialect).get_dbapi_type(dbapi)
1991
1992 if (
1993 dbtype is not None
1994 and (exclude_types is None or dbtype not in exclude_types)
1995 and (include_types is None or dbtype in include_types)
1996 ):
1997 return dbtype
1998 else:
1999 return None
2000
2001 inputsizes = {}
2002
2003 literal_execute_params = self.literal_execute_params
2004
2005 for bindparam in self.bind_names:
2006 if bindparam in literal_execute_params:
2007 continue
2008
2009 if bindparam.type._is_tuple_type:
2010 inputsizes[bindparam] = [
2011 lookup_type(typ)
2012 for typ in cast(TupleType, bindparam.type).types
2013 ]
2014 else:
2015 inputsizes[bindparam] = lookup_type(bindparam.type)
2016
2017 return inputsizes
2018
2019 @property
2020 def params(self):
2021 """Return the bind param dictionary embedded into this
2022 compiled object, for those values that are present.
2023
2024 .. seealso::
2025
2026 :ref:`faq_sql_expression_string` - includes a usage example for
2027 debugging use cases.
2028
2029 """
2030 return self.construct_params(_check=False)
2031
2032 def _process_parameters_for_postcompile(
2033 self,
2034 parameters: _MutableCoreSingleExecuteParams,
2035 _populate_self: bool = False,
2036 ) -> ExpandedState:
2037 """handle special post compile parameters.
2038
2039 These include:
2040
2041 * "expanding" parameters -typically IN tuples that are rendered
2042 on a per-parameter basis for an otherwise fixed SQL statement string.
2043
2044 * literal_binds compiled with the literal_execute flag. Used for
2045 things like SQL Server "TOP N" where the driver does not accommodate
2046 N as a bound parameter.
2047
2048 """
2049
2050 expanded_parameters = {}
2051 new_positiontup: Optional[List[str]]
2052
2053 pre_expanded_string = self._pre_expanded_string
2054 if pre_expanded_string is None:
2055 pre_expanded_string = self.string
2056
2057 if self.positional:
2058 new_positiontup = []
2059
2060 pre_expanded_positiontup = self._pre_expanded_positiontup
2061 if pre_expanded_positiontup is None:
2062 pre_expanded_positiontup = self.positiontup
2063
2064 else:
2065 new_positiontup = pre_expanded_positiontup = None
2066
2067 processors = self._bind_processors
2068 single_processors = cast(
2069 "Mapping[str, _BindProcessorType[Any]]", processors
2070 )
2071 tuple_processors = cast(
2072 "Mapping[str, Sequence[_BindProcessorType[Any]]]", processors
2073 )
2074
2075 new_processors: Dict[str, _BindProcessorType[Any]] = {}
2076
2077 replacement_expressions: Dict[str, Any] = {}
2078 to_update_sets: Dict[str, Any] = {}
2079
2080 # notes:
2081 # *unescaped* parameter names in:
2082 # self.bind_names, self.binds, self._bind_processors, self.positiontup
2083 #
2084 # *escaped* parameter names in:
2085 # construct_params(), replacement_expressions
2086
2087 numeric_positiontup: Optional[List[str]] = None
2088
2089 if self.positional and pre_expanded_positiontup is not None:
2090 names: Iterable[str] = pre_expanded_positiontup
2091 if self._numeric_binds:
2092 numeric_positiontup = []
2093 else:
2094 names = self.bind_names.values()
2095
2096 ebn = self.escaped_bind_names
2097 for name in names:
2098 escaped_name = ebn.get(name, name) if ebn else name
2099 parameter = self.binds[name]
2100
2101 if parameter in self.literal_execute_params:
2102 if escaped_name not in replacement_expressions:
2103 replacement_expressions[escaped_name] = (
2104 self.render_literal_bindparam(
2105 parameter,
2106 render_literal_value=parameters.pop(escaped_name),
2107 )
2108 )
2109 continue
2110
2111 if parameter in self.post_compile_params:
2112 if escaped_name in replacement_expressions:
2113 to_update = to_update_sets[escaped_name]
2114 values = None
2115 else:
2116 # we are removing the parameter from parameters
2117 # because it is a list value, which is not expected by
2118 # TypeEngine objects that would otherwise be asked to
2119 # process it. the single name is being replaced with
2120 # individual numbered parameters for each value in the
2121 # param.
2122 #
2123 # note we are also inserting *escaped* parameter names
2124 # into the given dictionary. default dialect will
2125 # use these param names directly as they will not be
2126 # in the escaped_bind_names dictionary.
2127 values = parameters.pop(name)
2128
2129 leep_res = self._literal_execute_expanding_parameter(
2130 escaped_name, parameter, values
2131 )
2132 (to_update, replacement_expr) = leep_res
2133
2134 to_update_sets[escaped_name] = to_update
2135 replacement_expressions[escaped_name] = replacement_expr
2136
2137 if not parameter.literal_execute:
2138 parameters.update(to_update)
2139 if parameter.type._is_tuple_type:
2140 assert values is not None
2141 new_processors.update(
2142 (
2143 "%s_%s_%s" % (name, i, j),
2144 tuple_processors[name][j - 1],
2145 )
2146 for i, tuple_element in enumerate(values, 1)
2147 for j, _ in enumerate(tuple_element, 1)
2148 if name in tuple_processors
2149 and tuple_processors[name][j - 1] is not None
2150 )
2151 else:
2152 new_processors.update(
2153 (key, single_processors[name])
2154 for key, _ in to_update
2155 if name in single_processors
2156 )
2157 if numeric_positiontup is not None:
2158 numeric_positiontup.extend(
2159 name for name, _ in to_update
2160 )
2161 elif new_positiontup is not None:
2162 # to_update has escaped names, but that's ok since
2163 # these are new names, that aren't in the
2164 # escaped_bind_names dict.
2165 new_positiontup.extend(name for name, _ in to_update)
2166 expanded_parameters[name] = [
2167 expand_key for expand_key, _ in to_update
2168 ]
2169 elif new_positiontup is not None:
2170 new_positiontup.append(name)
2171
2172 def process_expanding(m):
2173 key = m.group(1)
2174 expr = replacement_expressions[key]
2175
2176 # if POSTCOMPILE included a bind_expression, render that
2177 # around each element
2178 if m.group(2):
2179 tok = m.group(2).split("~~")
2180 be_left, be_right = tok[1], tok[3]
2181 expr = ", ".join(
2182 "%s%s%s" % (be_left, exp, be_right)
2183 for exp in expr.split(", ")
2184 )
2185 return expr
2186
2187 statement = re.sub(
2188 self._post_compile_pattern, process_expanding, pre_expanded_string
2189 )
2190
2191 if numeric_positiontup is not None:
2192 assert new_positiontup is not None
2193 param_pos = {
2194 key: f"{self._numeric_binds_identifier_char}{num}"
2195 for num, key in enumerate(
2196 numeric_positiontup, self.next_numeric_pos
2197 )
2198 }
2199 # Can't use format here since % chars are not escaped.
2200 statement = self._pyformat_pattern.sub(
2201 lambda m: param_pos[m.group(1)], statement
2202 )
2203 new_positiontup.extend(numeric_positiontup)
2204
2205 expanded_state = ExpandedState(
2206 statement,
2207 parameters,
2208 new_processors,
2209 new_positiontup,
2210 expanded_parameters,
2211 )
2212
2213 if _populate_self:
2214 # this is for the "render_postcompile" flag, which is not
2215 # otherwise used internally and is for end-user debugging and
2216 # special use cases.
2217 self._pre_expanded_string = pre_expanded_string
2218 self._pre_expanded_positiontup = pre_expanded_positiontup
2219 self.string = expanded_state.statement
2220 self.positiontup = (
2221 list(expanded_state.positiontup or ())
2222 if self.positional
2223 else None
2224 )
2225 self._post_compile_expanded_state = expanded_state
2226
2227 return expanded_state
2228
2229 @util.preload_module("sqlalchemy.engine.cursor")
2230 def _create_result_map(self):
2231 """utility method used for unit tests only."""
2232 cursor = util.preloaded.engine_cursor
2233 return cursor.CursorResultMetaData._create_description_match_map(
2234 self._result_columns
2235 )
2236
2237 # assigned by crud.py for insert/update statements
2238 _get_bind_name_for_col: _BindNameForColProtocol
2239
2240 @util.memoized_property
2241 def _within_exec_param_key_getter(self) -> Callable[[Any], str]:
2242 getter = self._get_bind_name_for_col
2243 return getter
2244
2245 @util.memoized_property
2246 @util.preload_module("sqlalchemy.engine.result")
2247 def _inserted_primary_key_from_lastrowid_getter(self):
2248 result = util.preloaded.engine_result
2249
2250 param_key_getter = self._within_exec_param_key_getter
2251
2252 assert self.compile_state is not None
2253 statement = self.compile_state.statement
2254
2255 if TYPE_CHECKING:
2256 assert isinstance(statement, Insert)
2257
2258 table = statement.table
2259
2260 getters = [
2261 (operator.methodcaller("get", param_key_getter(col), None), col)
2262 for col in table.primary_key
2263 ]
2264
2265 autoinc_getter = None
2266 autoinc_col = table._autoincrement_column
2267 if autoinc_col is not None:
2268 # apply type post processors to the lastrowid
2269 lastrowid_processor = autoinc_col.type._cached_result_processor(
2270 self.dialect, None
2271 )
2272 autoinc_key = param_key_getter(autoinc_col)
2273
2274 # if a bind value is present for the autoincrement column
2275 # in the parameters, we need to do the logic dictated by
2276 # #7998; honor a non-None user-passed parameter over lastrowid.
2277 # previously in the 1.4 series we weren't fetching lastrowid
2278 # at all if the key were present in the parameters
2279 if autoinc_key in self.binds:
2280
2281 def _autoinc_getter(lastrowid, parameters):
2282 param_value = parameters.get(autoinc_key, lastrowid)
2283 if param_value is not None:
2284 # they supplied non-None parameter, use that.
2285 # SQLite at least is observed to return the wrong
2286 # cursor.lastrowid for INSERT..ON CONFLICT so it
2287 # can't be used in all cases
2288 return param_value
2289 else:
2290 # use lastrowid
2291 return lastrowid
2292
2293 # work around mypy https://github.com/python/mypy/issues/14027
2294 autoinc_getter = _autoinc_getter
2295
2296 else:
2297 lastrowid_processor = None
2298
2299 row_fn = result.result_tuple([col.key for col in table.primary_key])
2300
2301 def get(lastrowid, parameters):
2302 """given cursor.lastrowid value and the parameters used for INSERT,
2303 return a "row" that represents the primary key, either by
2304 using the "lastrowid" or by extracting values from the parameters
2305 that were sent along with the INSERT.
2306
2307 """
2308 if lastrowid_processor is not None:
2309 lastrowid = lastrowid_processor(lastrowid)
2310
2311 if lastrowid is None:
2312 return row_fn(getter(parameters) for getter, col in getters)
2313 else:
2314 return row_fn(
2315 (
2316 (
2317 autoinc_getter(lastrowid, parameters)
2318 if autoinc_getter is not None
2319 else lastrowid
2320 )
2321 if col is autoinc_col
2322 else getter(parameters)
2323 )
2324 for getter, col in getters
2325 )
2326
2327 return get
2328
2329 @util.memoized_property
2330 @util.preload_module("sqlalchemy.engine.result")
2331 def _inserted_primary_key_from_returning_getter(self):
2332 result = util.preloaded.engine_result
2333
2334 assert self.compile_state is not None
2335 statement = self.compile_state.statement
2336
2337 if TYPE_CHECKING:
2338 assert isinstance(statement, Insert)
2339
2340 param_key_getter = self._within_exec_param_key_getter
2341 table = statement.table
2342
2343 returning = self.implicit_returning
2344 assert returning is not None
2345 ret = {col: idx for idx, col in enumerate(returning)}
2346
2347 getters = cast(
2348 "List[Tuple[Callable[[Any], Any], bool]]",
2349 [
2350 (
2351 (operator.itemgetter(ret[col]), True)
2352 if col in ret
2353 else (
2354 operator.methodcaller(
2355 "get", param_key_getter(col), None
2356 ),
2357 False,
2358 )
2359 )
2360 for col in table.primary_key
2361 ],
2362 )
2363
2364 row_fn = result.result_tuple([col.key for col in table.primary_key])
2365
2366 def get(row, parameters):
2367 return row_fn(
2368 getter(row) if use_row else getter(parameters)
2369 for getter, use_row in getters
2370 )
2371
2372 return get
2373
2374 def default_from(self) -> str:
2375 """Called when a SELECT statement has no froms, and no FROM clause is
2376 to be appended.
2377
2378 Gives Oracle Database a chance to tack on a ``FROM DUAL`` to the string
2379 output.
2380
2381 """
2382 return ""
2383
2384 def visit_override_binds(self, override_binds, **kw):
2385 """SQL compile the nested element of an _OverrideBinds with
2386 bindparams swapped out.
2387
2388 The _OverrideBinds is not normally expected to be compiled; it
2389 is meant to be used when an already cached statement is to be used,
2390 the compilation was already performed, and only the bound params should
2391 be swapped in at execution time.
2392
2393 However, there are test cases that exericise this object, and
2394 additionally the ORM subquery loader is known to feed in expressions
2395 which include this construct into new queries (discovered in #11173),
2396 so it has to do the right thing at compile time as well.
2397
2398 """
2399
2400 # get SQL text first
2401 sqltext = override_binds.element._compiler_dispatch(self, **kw)
2402
2403 # for a test compile that is not for caching, change binds after the
2404 # fact. note that we don't try to
2405 # swap the bindparam as we compile, because our element may be
2406 # elsewhere in the statement already (e.g. a subquery or perhaps a
2407 # CTE) and was already visited / compiled. See
2408 # test_relationship_criteria.py ->
2409 # test_selectinload_local_criteria_subquery
2410 for k in override_binds.translate:
2411 if k not in self.binds:
2412 continue
2413 bp = self.binds[k]
2414
2415 # so this would work, just change the value of bp in place.
2416 # but we dont want to mutate things outside.
2417 # bp.value = override_binds.translate[bp.key]
2418 # continue
2419
2420 # instead, need to replace bp with new_bp or otherwise accommodate
2421 # in all internal collections
2422 new_bp = bp._with_value(
2423 override_binds.translate[bp.key],
2424 maintain_key=True,
2425 required=False,
2426 )
2427
2428 name = self.bind_names[bp]
2429 self.binds[k] = self.binds[name] = new_bp
2430 self.bind_names[new_bp] = name
2431 self.bind_names.pop(bp, None)
2432
2433 if bp in self.post_compile_params:
2434 self.post_compile_params |= {new_bp}
2435 if bp in self.literal_execute_params:
2436 self.literal_execute_params |= {new_bp}
2437
2438 ckbm_tuple = self._cache_key_bind_match
2439 if ckbm_tuple:
2440 ckbm, cksm = ckbm_tuple
2441 for bp in bp._cloned_set:
2442 if bp.key in cksm:
2443 cb = cksm[bp.key]
2444 ckbm[cb].append(new_bp)
2445
2446 return sqltext
2447
2448 def visit_grouping(self, grouping, asfrom=False, **kwargs):
2449 return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")"
2450
2451 def visit_select_statement_grouping(self, grouping, **kwargs):
2452 return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")"
2453
2454 def visit_label_reference(
2455 self, element, within_columns_clause=False, **kwargs
2456 ):
2457 if self.stack and self.dialect.supports_simple_order_by_label:
2458 try:
2459 compile_state = cast(
2460 "Union[SelectState, CompoundSelectState]",
2461 self.stack[-1]["compile_state"],
2462 )
2463 except KeyError as ke:
2464 raise exc.CompileError(
2465 "Can't resolve label reference for ORDER BY / "
2466 "GROUP BY / DISTINCT etc."
2467 ) from ke
2468
2469 (
2470 with_cols,
2471 only_froms,
2472 only_cols,
2473 ) = compile_state._label_resolve_dict
2474 if within_columns_clause:
2475 resolve_dict = only_froms
2476 else:
2477 resolve_dict = only_cols
2478
2479 # this can be None in the case that a _label_reference()
2480 # were subject to a replacement operation, in which case
2481 # the replacement of the Label element may have changed
2482 # to something else like a ColumnClause expression.
2483 order_by_elem = element.element._order_by_label_element
2484
2485 if (
2486 order_by_elem is not None
2487 and order_by_elem.name in resolve_dict
2488 and order_by_elem.shares_lineage(
2489 resolve_dict[order_by_elem.name]
2490 )
2491 ):
2492 kwargs["render_label_as_label"] = (
2493 element.element._order_by_label_element
2494 )
2495 return self.process(
2496 element.element,
2497 within_columns_clause=within_columns_clause,
2498 **kwargs,
2499 )
2500
2501 def visit_textual_label_reference(
2502 self, element, within_columns_clause=False, **kwargs
2503 ):
2504 if not self.stack:
2505 # compiling the element outside of the context of a SELECT
2506 return self.process(element._text_clause)
2507
2508 try:
2509 compile_state = cast(
2510 "Union[SelectState, CompoundSelectState]",
2511 self.stack[-1]["compile_state"],
2512 )
2513 except KeyError as ke:
2514 coercions._no_text_coercion(
2515 element.element,
2516 extra=(
2517 "Can't resolve label reference for ORDER BY / "
2518 "GROUP BY / DISTINCT etc."
2519 ),
2520 exc_cls=exc.CompileError,
2521 err=ke,
2522 )
2523
2524 with_cols, only_froms, only_cols = compile_state._label_resolve_dict
2525 try:
2526 if within_columns_clause:
2527 col = only_froms[element.element]
2528 else:
2529 col = with_cols[element.element]
2530 except KeyError as err:
2531 coercions._no_text_coercion(
2532 element.element,
2533 extra=(
2534 "Can't resolve label reference for ORDER BY / "
2535 "GROUP BY / DISTINCT etc."
2536 ),
2537 exc_cls=exc.CompileError,
2538 err=err,
2539 )
2540 else:
2541 kwargs["render_label_as_label"] = col
2542 return self.process(
2543 col, within_columns_clause=within_columns_clause, **kwargs
2544 )
2545
2546 def visit_label(
2547 self,
2548 label,
2549 add_to_result_map=None,
2550 within_label_clause=False,
2551 within_columns_clause=False,
2552 render_label_as_label=None,
2553 result_map_targets=(),
2554 **kw,
2555 ):
2556 # only render labels within the columns clause
2557 # or ORDER BY clause of a select. dialect-specific compilers
2558 # can modify this behavior.
2559 render_label_with_as = (
2560 within_columns_clause and not within_label_clause
2561 )
2562 render_label_only = render_label_as_label is label
2563
2564 if render_label_only or render_label_with_as:
2565 if isinstance(label.name, elements._truncated_label):
2566 labelname = self._truncated_identifier("colident", label.name)
2567 else:
2568 labelname = label.name
2569
2570 if render_label_with_as:
2571 if add_to_result_map is not None:
2572 add_to_result_map(
2573 labelname,
2574 label.name,
2575 (label, labelname) + label._alt_names + result_map_targets,
2576 label.type,
2577 )
2578 return (
2579 label.element._compiler_dispatch(
2580 self,
2581 within_columns_clause=True,
2582 within_label_clause=True,
2583 **kw,
2584 )
2585 + OPERATORS[operators.as_]
2586 + self.preparer.format_label(label, labelname)
2587 )
2588 elif render_label_only:
2589 return self.preparer.format_label(label, labelname)
2590 else:
2591 return label.element._compiler_dispatch(
2592 self, within_columns_clause=False, **kw
2593 )
2594
2595 def _fallback_column_name(self, column):
2596 raise exc.CompileError(
2597 "Cannot compile Column object until its 'name' is assigned."
2598 )
2599
2600 def visit_lambda_element(self, element, **kw):
2601 sql_element = element._resolved
2602 return self.process(sql_element, **kw)
2603
2604 def visit_column(
2605 self,
2606 column: ColumnClause[Any],
2607 add_to_result_map: Optional[_ResultMapAppender] = None,
2608 include_table: bool = True,
2609 result_map_targets: Tuple[Any, ...] = (),
2610 ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] = None,
2611 **kwargs: Any,
2612 ) -> str:
2613 name = orig_name = column.name
2614 if name is None:
2615 name = self._fallback_column_name(column)
2616
2617 is_literal = column.is_literal
2618 if not is_literal and isinstance(name, elements._truncated_label):
2619 name = self._truncated_identifier("colident", name)
2620
2621 if add_to_result_map is not None:
2622 targets = (column, name, column.key) + result_map_targets
2623 if column._tq_label:
2624 targets += (column._tq_label,)
2625
2626 add_to_result_map(name, orig_name, targets, column.type)
2627
2628 if is_literal:
2629 # note we are not currently accommodating for
2630 # literal_column(quoted_name('ident', True)) here
2631 name = self.escape_literal_column(name)
2632 else:
2633 name = self.preparer.quote(name)
2634 table = column.table
2635 if table is None or not include_table or not table.named_with_column:
2636 return name
2637 else:
2638 effective_schema = self.preparer.schema_for_object(table)
2639
2640 if effective_schema:
2641 schema_prefix = (
2642 self.preparer.quote_schema(effective_schema) + "."
2643 )
2644 else:
2645 schema_prefix = ""
2646
2647 if TYPE_CHECKING:
2648 assert isinstance(table, NamedFromClause)
2649 tablename = table.name
2650
2651 if (
2652 not effective_schema
2653 and ambiguous_table_name_map
2654 and tablename in ambiguous_table_name_map
2655 ):
2656 tablename = ambiguous_table_name_map[tablename]
2657
2658 if isinstance(tablename, elements._truncated_label):
2659 tablename = self._truncated_identifier("alias", tablename)
2660
2661 return schema_prefix + self.preparer.quote(tablename) + "." + name
2662
2663 def visit_collation(self, element, **kw):
2664 return self.preparer.format_collation(element.collation)
2665
2666 def visit_fromclause(self, fromclause, **kwargs):
2667 return fromclause.name
2668
2669 def visit_index(self, index, **kwargs):
2670 return index.name
2671
2672 def visit_typeclause(self, typeclause, **kw):
2673 kw["type_expression"] = typeclause
2674 kw["identifier_preparer"] = self.preparer
2675 return self.dialect.type_compiler_instance.process(
2676 typeclause.type, **kw
2677 )
2678
2679 def post_process_text(self, text):
2680 if self.preparer._double_percents:
2681 text = text.replace("%", "%%")
2682 return text
2683
2684 def escape_literal_column(self, text):
2685 if self.preparer._double_percents:
2686 text = text.replace("%", "%%")
2687 return text
2688
2689 def visit_textclause(self, textclause, add_to_result_map=None, **kw):
2690 def do_bindparam(m):
2691 name = m.group(1)
2692 if name in textclause._bindparams:
2693 return self.process(textclause._bindparams[name], **kw)
2694 else:
2695 return self.bindparam_string(name, **kw)
2696
2697 if not self.stack:
2698 self.isplaintext = True
2699
2700 if add_to_result_map:
2701 # text() object is present in the columns clause of a
2702 # select(). Add a no-name entry to the result map so that
2703 # row[text()] produces a result
2704 add_to_result_map(None, None, (textclause,), sqltypes.NULLTYPE)
2705
2706 # un-escape any \:params
2707 return BIND_PARAMS_ESC.sub(
2708 lambda m: m.group(1),
2709 BIND_PARAMS.sub(
2710 do_bindparam, self.post_process_text(textclause.text)
2711 ),
2712 )
2713
2714 def visit_textual_select(
2715 self, taf, compound_index=None, asfrom=False, **kw
2716 ):
2717 toplevel = not self.stack
2718 entry = self._default_stack_entry if toplevel else self.stack[-1]
2719
2720 new_entry: _CompilerStackEntry = {
2721 "correlate_froms": set(),
2722 "asfrom_froms": set(),
2723 "selectable": taf,
2724 }
2725 self.stack.append(new_entry)
2726
2727 if taf._independent_ctes:
2728 self._dispatch_independent_ctes(taf, kw)
2729
2730 populate_result_map = (
2731 toplevel
2732 or (
2733 compound_index == 0
2734 and entry.get("need_result_map_for_compound", False)
2735 )
2736 or entry.get("need_result_map_for_nested", False)
2737 )
2738
2739 if populate_result_map:
2740 self._ordered_columns = self._textual_ordered_columns = (
2741 taf.positional
2742 )
2743
2744 # enable looser result column matching when the SQL text links to
2745 # Column objects by name only
2746 self._loose_column_name_matching = not taf.positional and bool(
2747 taf.column_args
2748 )
2749
2750 for c in taf.column_args:
2751 self.process(
2752 c,
2753 within_columns_clause=True,
2754 add_to_result_map=self._add_to_result_map,
2755 )
2756
2757 text = self.process(taf.element, **kw)
2758 if self.ctes:
2759 nesting_level = len(self.stack) if not toplevel else None
2760 text = self._render_cte_clause(nesting_level=nesting_level) + text
2761
2762 self.stack.pop(-1)
2763
2764 return text
2765
2766 def visit_null(self, expr: Null, **kw: Any) -> str:
2767 return "NULL"
2768
2769 def visit_true(self, expr: True_, **kw: Any) -> str:
2770 if self.dialect.supports_native_boolean:
2771 return "true"
2772 else:
2773 return "1"
2774
2775 def visit_false(self, expr: False_, **kw: Any) -> str:
2776 if self.dialect.supports_native_boolean:
2777 return "false"
2778 else:
2779 return "0"
2780
2781 def _generate_delimited_list(self, elements, separator, **kw):
2782 return separator.join(
2783 s
2784 for s in (c._compiler_dispatch(self, **kw) for c in elements)
2785 if s
2786 )
2787
2788 def _generate_delimited_and_list(self, clauses, **kw):
2789 lcc, clauses = elements.BooleanClauseList._process_clauses_for_boolean(
2790 operators.and_,
2791 elements.True_._singleton,
2792 elements.False_._singleton,
2793 clauses,
2794 )
2795 if lcc == 1:
2796 return clauses[0]._compiler_dispatch(self, **kw)
2797 else:
2798 separator = OPERATORS[operators.and_]
2799 return separator.join(
2800 s
2801 for s in (c._compiler_dispatch(self, **kw) for c in clauses)
2802 if s
2803 )
2804
2805 def visit_tuple(self, clauselist, **kw):
2806 return "(%s)" % self.visit_clauselist(clauselist, **kw)
2807
2808 def visit_clauselist(self, clauselist, **kw):
2809 sep = clauselist.operator
2810 if sep is None:
2811 sep = " "
2812 else:
2813 sep = OPERATORS[clauselist.operator]
2814
2815 return self._generate_delimited_list(clauselist.clauses, sep, **kw)
2816
2817 def visit_expression_clauselist(self, clauselist, **kw):
2818 operator_ = clauselist.operator
2819
2820 disp = self._get_operator_dispatch(
2821 operator_, "expression_clauselist", None
2822 )
2823 if disp:
2824 return disp(clauselist, operator_, **kw)
2825
2826 try:
2827 opstring = OPERATORS[operator_]
2828 except KeyError as err:
2829 raise exc.UnsupportedCompilationError(self, operator_) from err
2830 else:
2831 kw["_in_operator_expression"] = True
2832 return self._generate_delimited_list(
2833 clauselist.clauses, opstring, **kw
2834 )
2835
2836 def visit_case(self, clause, **kwargs):
2837 x = "CASE "
2838 if clause.value is not None:
2839 x += clause.value._compiler_dispatch(self, **kwargs) + " "
2840 for cond, result in clause.whens:
2841 x += (
2842 "WHEN "
2843 + cond._compiler_dispatch(self, **kwargs)
2844 + " THEN "
2845 + result._compiler_dispatch(self, **kwargs)
2846 + " "
2847 )
2848 if clause.else_ is not None:
2849 x += (
2850 "ELSE " + clause.else_._compiler_dispatch(self, **kwargs) + " "
2851 )
2852 x += "END"
2853 return x
2854
2855 def visit_type_coerce(self, type_coerce, **kw):
2856 return type_coerce.typed_expression._compiler_dispatch(self, **kw)
2857
2858 def visit_cast(self, cast, **kwargs):
2859 type_clause = cast.typeclause._compiler_dispatch(self, **kwargs)
2860 match = re.match("(.*)( COLLATE .*)", type_clause)
2861 return "CAST(%s AS %s)%s" % (
2862 cast.clause._compiler_dispatch(self, **kwargs),
2863 match.group(1) if match else type_clause,
2864 match.group(2) if match else "",
2865 )
2866
2867 def _format_frame_clause(self, range_, **kw):
2868 return "%s AND %s" % (
2869 (
2870 "UNBOUNDED PRECEDING"
2871 if range_[0] is elements.RANGE_UNBOUNDED
2872 else (
2873 "CURRENT ROW"
2874 if range_[0] is elements.RANGE_CURRENT
2875 else (
2876 "%s PRECEDING"
2877 % (
2878 self.process(
2879 elements.literal(abs(range_[0])), **kw
2880 ),
2881 )
2882 if range_[0] < 0
2883 else "%s FOLLOWING"
2884 % (self.process(elements.literal(range_[0]), **kw),)
2885 )
2886 )
2887 ),
2888 (
2889 "UNBOUNDED FOLLOWING"
2890 if range_[1] is elements.RANGE_UNBOUNDED
2891 else (
2892 "CURRENT ROW"
2893 if range_[1] is elements.RANGE_CURRENT
2894 else (
2895 "%s PRECEDING"
2896 % (
2897 self.process(
2898 elements.literal(abs(range_[1])), **kw
2899 ),
2900 )
2901 if range_[1] < 0
2902 else "%s FOLLOWING"
2903 % (self.process(elements.literal(range_[1]), **kw),)
2904 )
2905 )
2906 ),
2907 )
2908
2909 def visit_over(self, over, **kwargs):
2910 text = over.element._compiler_dispatch(self, **kwargs)
2911 if over.range_ is not None:
2912 range_ = "RANGE BETWEEN %s" % self._format_frame_clause(
2913 over.range_, **kwargs
2914 )
2915 elif over.rows is not None:
2916 range_ = "ROWS BETWEEN %s" % self._format_frame_clause(
2917 over.rows, **kwargs
2918 )
2919 elif over.groups is not None:
2920 range_ = "GROUPS BETWEEN %s" % self._format_frame_clause(
2921 over.groups, **kwargs
2922 )
2923 else:
2924 range_ = None
2925
2926 return "%s OVER (%s)" % (
2927 text,
2928 " ".join(
2929 [
2930 "%s BY %s"
2931 % (word, clause._compiler_dispatch(self, **kwargs))
2932 for word, clause in (
2933 ("PARTITION", over.partition_by),
2934 ("ORDER", over.order_by),
2935 )
2936 if clause is not None and len(clause)
2937 ]
2938 + ([range_] if range_ else [])
2939 ),
2940 )
2941
2942 def visit_withingroup(self, withingroup, **kwargs):
2943 return "%s WITHIN GROUP (ORDER BY %s)" % (
2944 withingroup.element._compiler_dispatch(self, **kwargs),
2945 withingroup.order_by._compiler_dispatch(self, **kwargs),
2946 )
2947
2948 def visit_funcfilter(self, funcfilter, **kwargs):
2949 return "%s FILTER (WHERE %s)" % (
2950 funcfilter.func._compiler_dispatch(self, **kwargs),
2951 funcfilter.criterion._compiler_dispatch(self, **kwargs),
2952 )
2953
2954 def visit_extract(self, extract, **kwargs):
2955 field = self.extract_map.get(extract.field, extract.field)
2956 return "EXTRACT(%s FROM %s)" % (
2957 field,
2958 extract.expr._compiler_dispatch(self, **kwargs),
2959 )
2960
2961 def visit_scalar_function_column(self, element, **kw):
2962 compiled_fn = self.visit_function(element.fn, **kw)
2963 compiled_col = self.visit_column(element, **kw)
2964 return "(%s).%s" % (compiled_fn, compiled_col)
2965
2966 def visit_function(
2967 self,
2968 func: Function[Any],
2969 add_to_result_map: Optional[_ResultMapAppender] = None,
2970 **kwargs: Any,
2971 ) -> str:
2972 if add_to_result_map is not None:
2973 add_to_result_map(func.name, func.name, (func.name,), func.type)
2974
2975 disp = getattr(self, "visit_%s_func" % func.name.lower(), None)
2976
2977 text: str
2978
2979 if disp:
2980 text = disp(func, **kwargs)
2981 else:
2982 name = FUNCTIONS.get(func._deannotate().__class__, None)
2983 if name:
2984 if func._has_args:
2985 name += "%(expr)s"
2986 else:
2987 name = func.name
2988 name = (
2989 self.preparer.quote(name)
2990 if self.preparer._requires_quotes_illegal_chars(name)
2991 or isinstance(name, elements.quoted_name)
2992 else name
2993 )
2994 name = name + "%(expr)s"
2995 text = ".".join(
2996 [
2997 (
2998 self.preparer.quote(tok)
2999 if self.preparer._requires_quotes_illegal_chars(tok)
3000 or isinstance(name, elements.quoted_name)
3001 else tok
3002 )
3003 for tok in func.packagenames
3004 ]
3005 + [name]
3006 ) % {"expr": self.function_argspec(func, **kwargs)}
3007
3008 if func._with_ordinality:
3009 text += " WITH ORDINALITY"
3010 return text
3011
3012 def visit_next_value_func(self, next_value, **kw):
3013 return self.visit_sequence(next_value.sequence)
3014
3015 def visit_sequence(self, sequence, **kw):
3016 raise NotImplementedError(
3017 "Dialect '%s' does not support sequence increments."
3018 % self.dialect.name
3019 )
3020
3021 def function_argspec(self, func: Function[Any], **kwargs: Any) -> str:
3022 return func.clause_expr._compiler_dispatch(self, **kwargs)
3023
3024 def visit_compound_select(
3025 self, cs, asfrom=False, compound_index=None, **kwargs
3026 ):
3027 toplevel = not self.stack
3028
3029 compile_state = cs._compile_state_factory(cs, self, **kwargs)
3030
3031 if toplevel and not self.compile_state:
3032 self.compile_state = compile_state
3033
3034 compound_stmt = compile_state.statement
3035
3036 entry = self._default_stack_entry if toplevel else self.stack[-1]
3037 need_result_map = toplevel or (
3038 not compound_index
3039 and entry.get("need_result_map_for_compound", False)
3040 )
3041
3042 # indicates there is already a CompoundSelect in play
3043 if compound_index == 0:
3044 entry["select_0"] = cs
3045
3046 self.stack.append(
3047 {
3048 "correlate_froms": entry["correlate_froms"],
3049 "asfrom_froms": entry["asfrom_froms"],
3050 "selectable": cs,
3051 "compile_state": compile_state,
3052 "need_result_map_for_compound": need_result_map,
3053 }
3054 )
3055
3056 if compound_stmt._independent_ctes:
3057 self._dispatch_independent_ctes(compound_stmt, kwargs)
3058
3059 keyword = self.compound_keywords[cs.keyword]
3060
3061 text = (" " + keyword + " ").join(
3062 (
3063 c._compiler_dispatch(
3064 self, asfrom=asfrom, compound_index=i, **kwargs
3065 )
3066 for i, c in enumerate(cs.selects)
3067 )
3068 )
3069
3070 kwargs["include_table"] = False
3071 text += self.group_by_clause(cs, **dict(asfrom=asfrom, **kwargs))
3072 text += self.order_by_clause(cs, **kwargs)
3073 if cs._has_row_limiting_clause:
3074 text += self._row_limit_clause(cs, **kwargs)
3075
3076 if self.ctes:
3077 nesting_level = len(self.stack) if not toplevel else None
3078 text = (
3079 self._render_cte_clause(
3080 nesting_level=nesting_level,
3081 include_following_stack=True,
3082 )
3083 + text
3084 )
3085
3086 self.stack.pop(-1)
3087 return text
3088
3089 def _row_limit_clause(self, cs, **kwargs):
3090 if cs._fetch_clause is not None:
3091 return self.fetch_clause(cs, **kwargs)
3092 else:
3093 return self.limit_clause(cs, **kwargs)
3094
3095 def _get_operator_dispatch(self, operator_, qualifier1, qualifier2):
3096 attrname = "visit_%s_%s%s" % (
3097 operator_.__name__,
3098 qualifier1,
3099 "_" + qualifier2 if qualifier2 else "",
3100 )
3101 return getattr(self, attrname, None)
3102
3103 def visit_unary(
3104 self, unary, add_to_result_map=None, result_map_targets=(), **kw
3105 ):
3106 if add_to_result_map is not None:
3107 result_map_targets += (unary,)
3108 kw["add_to_result_map"] = add_to_result_map
3109 kw["result_map_targets"] = result_map_targets
3110
3111 if unary.operator:
3112 if unary.modifier:
3113 raise exc.CompileError(
3114 "Unary expression does not support operator "
3115 "and modifier simultaneously"
3116 )
3117 disp = self._get_operator_dispatch(
3118 unary.operator, "unary", "operator"
3119 )
3120 if disp:
3121 return disp(unary, unary.operator, **kw)
3122 else:
3123 return self._generate_generic_unary_operator(
3124 unary, OPERATORS[unary.operator], **kw
3125 )
3126 elif unary.modifier:
3127 disp = self._get_operator_dispatch(
3128 unary.modifier, "unary", "modifier"
3129 )
3130 if disp:
3131 return disp(unary, unary.modifier, **kw)
3132 else:
3133 return self._generate_generic_unary_modifier(
3134 unary, OPERATORS[unary.modifier], **kw
3135 )
3136 else:
3137 raise exc.CompileError(
3138 "Unary expression has no operator or modifier"
3139 )
3140
3141 def visit_truediv_binary(self, binary, operator, **kw):
3142 if self.dialect.div_is_floordiv:
3143 return (
3144 self.process(binary.left, **kw)
3145 + " / "
3146 # TODO: would need a fast cast again here,
3147 # unless we want to use an implicit cast like "+ 0.0"
3148 + self.process(
3149 elements.Cast(
3150 binary.right,
3151 (
3152 binary.right.type
3153 if binary.right.type._type_affinity
3154 is sqltypes.Numeric
3155 else sqltypes.Numeric()
3156 ),
3157 ),
3158 **kw,
3159 )
3160 )
3161 else:
3162 return (
3163 self.process(binary.left, **kw)
3164 + " / "
3165 + self.process(binary.right, **kw)
3166 )
3167
3168 def visit_floordiv_binary(self, binary, operator, **kw):
3169 if (
3170 self.dialect.div_is_floordiv
3171 and binary.right.type._type_affinity is sqltypes.Integer
3172 ):
3173 return (
3174 self.process(binary.left, **kw)
3175 + " / "
3176 + self.process(binary.right, **kw)
3177 )
3178 else:
3179 return "FLOOR(%s)" % (
3180 self.process(binary.left, **kw)
3181 + " / "
3182 + self.process(binary.right, **kw)
3183 )
3184
3185 def visit_is_true_unary_operator(self, element, operator, **kw):
3186 if (
3187 element._is_implicitly_boolean
3188 or self.dialect.supports_native_boolean
3189 ):
3190 return self.process(element.element, **kw)
3191 else:
3192 return "%s = 1" % self.process(element.element, **kw)
3193
3194 def visit_is_false_unary_operator(self, element, operator, **kw):
3195 if (
3196 element._is_implicitly_boolean
3197 or self.dialect.supports_native_boolean
3198 ):
3199 return "NOT %s" % self.process(element.element, **kw)
3200 else:
3201 return "%s = 0" % self.process(element.element, **kw)
3202
3203 def visit_not_match_op_binary(self, binary, operator, **kw):
3204 return "NOT %s" % self.visit_binary(
3205 binary, override_operator=operators.match_op
3206 )
3207
3208 def visit_not_in_op_binary(self, binary, operator, **kw):
3209 # The brackets are required in the NOT IN operation because the empty
3210 # case is handled using the form "(col NOT IN (null) OR 1 = 1)".
3211 # The presence of the OR makes the brackets required.
3212 return "(%s)" % self._generate_generic_binary(
3213 binary, OPERATORS[operator], **kw
3214 )
3215
3216 def visit_empty_set_op_expr(self, type_, expand_op, **kw):
3217 if expand_op is operators.not_in_op:
3218 if len(type_) > 1:
3219 return "(%s)) OR (1 = 1" % (
3220 ", ".join("NULL" for element in type_)
3221 )
3222 else:
3223 return "NULL) OR (1 = 1"
3224 elif expand_op is operators.in_op:
3225 if len(type_) > 1:
3226 return "(%s)) AND (1 != 1" % (
3227 ", ".join("NULL" for element in type_)
3228 )
3229 else:
3230 return "NULL) AND (1 != 1"
3231 else:
3232 return self.visit_empty_set_expr(type_)
3233
3234 def visit_empty_set_expr(self, element_types, **kw):
3235 raise NotImplementedError(
3236 "Dialect '%s' does not support empty set expression."
3237 % self.dialect.name
3238 )
3239
3240 def _literal_execute_expanding_parameter_literal_binds(
3241 self, parameter, values, bind_expression_template=None
3242 ):
3243 typ_dialect_impl = parameter.type._unwrapped_dialect_impl(self.dialect)
3244
3245 if not values:
3246 # empty IN expression. note we don't need to use
3247 # bind_expression_template here because there are no
3248 # expressions to render.
3249
3250 if typ_dialect_impl._is_tuple_type:
3251 replacement_expression = (
3252 "VALUES " if self.dialect.tuple_in_values else ""
3253 ) + self.visit_empty_set_op_expr(
3254 parameter.type.types, parameter.expand_op
3255 )
3256
3257 else:
3258 replacement_expression = self.visit_empty_set_op_expr(
3259 [parameter.type], parameter.expand_op
3260 )
3261
3262 elif typ_dialect_impl._is_tuple_type or (
3263 typ_dialect_impl._isnull
3264 and isinstance(values[0], collections_abc.Sequence)
3265 and not isinstance(values[0], (str, bytes))
3266 ):
3267 if typ_dialect_impl._has_bind_expression:
3268 raise NotImplementedError(
3269 "bind_expression() on TupleType not supported with "
3270 "literal_binds"
3271 )
3272
3273 replacement_expression = (
3274 "VALUES " if self.dialect.tuple_in_values else ""
3275 ) + ", ".join(
3276 "(%s)"
3277 % (
3278 ", ".join(
3279 self.render_literal_value(value, param_type)
3280 for value, param_type in zip(
3281 tuple_element, parameter.type.types
3282 )
3283 )
3284 )
3285 for i, tuple_element in enumerate(values)
3286 )
3287 else:
3288 if bind_expression_template:
3289 post_compile_pattern = self._post_compile_pattern
3290 m = post_compile_pattern.search(bind_expression_template)
3291 assert m and m.group(
3292 2
3293 ), "unexpected format for expanding parameter"
3294
3295 tok = m.group(2).split("~~")
3296 be_left, be_right = tok[1], tok[3]
3297 replacement_expression = ", ".join(
3298 "%s%s%s"
3299 % (
3300 be_left,
3301 self.render_literal_value(value, parameter.type),
3302 be_right,
3303 )
3304 for value in values
3305 )
3306 else:
3307 replacement_expression = ", ".join(
3308 self.render_literal_value(value, parameter.type)
3309 for value in values
3310 )
3311
3312 return (), replacement_expression
3313
3314 def _literal_execute_expanding_parameter(self, name, parameter, values):
3315 if parameter.literal_execute:
3316 return self._literal_execute_expanding_parameter_literal_binds(
3317 parameter, values
3318 )
3319
3320 dialect = self.dialect
3321 typ_dialect_impl = parameter.type._unwrapped_dialect_impl(dialect)
3322
3323 if self._numeric_binds:
3324 bind_template = self.compilation_bindtemplate
3325 else:
3326 bind_template = self.bindtemplate
3327
3328 if (
3329 self.dialect._bind_typing_render_casts
3330 and typ_dialect_impl.render_bind_cast
3331 ):
3332
3333 def _render_bindtemplate(name):
3334 return self.render_bind_cast(
3335 parameter.type,
3336 typ_dialect_impl,
3337 bind_template % {"name": name},
3338 )
3339
3340 else:
3341
3342 def _render_bindtemplate(name):
3343 return bind_template % {"name": name}
3344
3345 if not values:
3346 to_update = []
3347 if typ_dialect_impl._is_tuple_type:
3348 replacement_expression = self.visit_empty_set_op_expr(
3349 parameter.type.types, parameter.expand_op
3350 )
3351 else:
3352 replacement_expression = self.visit_empty_set_op_expr(
3353 [parameter.type], parameter.expand_op
3354 )
3355
3356 elif typ_dialect_impl._is_tuple_type or (
3357 typ_dialect_impl._isnull
3358 and isinstance(values[0], collections_abc.Sequence)
3359 and not isinstance(values[0], (str, bytes))
3360 ):
3361 assert not typ_dialect_impl._is_array
3362 to_update = [
3363 ("%s_%s_%s" % (name, i, j), value)
3364 for i, tuple_element in enumerate(values, 1)
3365 for j, value in enumerate(tuple_element, 1)
3366 ]
3367
3368 replacement_expression = (
3369 "VALUES " if dialect.tuple_in_values else ""
3370 ) + ", ".join(
3371 "(%s)"
3372 % (
3373 ", ".join(
3374 _render_bindtemplate(
3375 to_update[i * len(tuple_element) + j][0]
3376 )
3377 for j, value in enumerate(tuple_element)
3378 )
3379 )
3380 for i, tuple_element in enumerate(values)
3381 )
3382 else:
3383 to_update = [
3384 ("%s_%s" % (name, i), value)
3385 for i, value in enumerate(values, 1)
3386 ]
3387 replacement_expression = ", ".join(
3388 _render_bindtemplate(key) for key, value in to_update
3389 )
3390
3391 return to_update, replacement_expression
3392
3393 def visit_binary(
3394 self,
3395 binary,
3396 override_operator=None,
3397 eager_grouping=False,
3398 from_linter=None,
3399 lateral_from_linter=None,
3400 **kw,
3401 ):
3402 if from_linter and operators.is_comparison(binary.operator):
3403 if lateral_from_linter is not None:
3404 enclosing_lateral = kw["enclosing_lateral"]
3405 lateral_from_linter.edges.update(
3406 itertools.product(
3407 _de_clone(
3408 binary.left._from_objects + [enclosing_lateral]
3409 ),
3410 _de_clone(
3411 binary.right._from_objects + [enclosing_lateral]
3412 ),
3413 )
3414 )
3415 else:
3416 from_linter.edges.update(
3417 itertools.product(
3418 _de_clone(binary.left._from_objects),
3419 _de_clone(binary.right._from_objects),
3420 )
3421 )
3422
3423 # don't allow "? = ?" to render
3424 if (
3425 self.ansi_bind_rules
3426 and isinstance(binary.left, elements.BindParameter)
3427 and isinstance(binary.right, elements.BindParameter)
3428 ):
3429 kw["literal_execute"] = True
3430
3431 operator_ = override_operator or binary.operator
3432 disp = self._get_operator_dispatch(operator_, "binary", None)
3433 if disp:
3434 return disp(binary, operator_, **kw)
3435 else:
3436 try:
3437 opstring = OPERATORS[operator_]
3438 except KeyError as err:
3439 raise exc.UnsupportedCompilationError(self, operator_) from err
3440 else:
3441 return self._generate_generic_binary(
3442 binary,
3443 opstring,
3444 from_linter=from_linter,
3445 lateral_from_linter=lateral_from_linter,
3446 **kw,
3447 )
3448
3449 def visit_function_as_comparison_op_binary(self, element, operator, **kw):
3450 return self.process(element.sql_function, **kw)
3451
3452 def visit_mod_binary(self, binary, operator, **kw):
3453 if self.preparer._double_percents:
3454 return (
3455 self.process(binary.left, **kw)
3456 + " %% "
3457 + self.process(binary.right, **kw)
3458 )
3459 else:
3460 return (
3461 self.process(binary.left, **kw)
3462 + " % "
3463 + self.process(binary.right, **kw)
3464 )
3465
3466 def visit_custom_op_binary(self, element, operator, **kw):
3467 kw["eager_grouping"] = operator.eager_grouping
3468 return self._generate_generic_binary(
3469 element,
3470 " " + self.escape_literal_column(operator.opstring) + " ",
3471 **kw,
3472 )
3473
3474 def visit_custom_op_unary_operator(self, element, operator, **kw):
3475 return self._generate_generic_unary_operator(
3476 element, self.escape_literal_column(operator.opstring) + " ", **kw
3477 )
3478
3479 def visit_custom_op_unary_modifier(self, element, operator, **kw):
3480 return self._generate_generic_unary_modifier(
3481 element, " " + self.escape_literal_column(operator.opstring), **kw
3482 )
3483
3484 def _generate_generic_binary(
3485 self,
3486 binary: BinaryExpression[Any],
3487 opstring: str,
3488 eager_grouping: bool = False,
3489 **kw: Any,
3490 ) -> str:
3491 _in_operator_expression = kw.get("_in_operator_expression", False)
3492
3493 kw["_in_operator_expression"] = True
3494 kw["_binary_op"] = binary.operator
3495 text = (
3496 binary.left._compiler_dispatch(
3497 self, eager_grouping=eager_grouping, **kw
3498 )
3499 + opstring
3500 + binary.right._compiler_dispatch(
3501 self, eager_grouping=eager_grouping, **kw
3502 )
3503 )
3504
3505 if _in_operator_expression and eager_grouping:
3506 text = "(%s)" % text
3507 return text
3508
3509 def _generate_generic_unary_operator(self, unary, opstring, **kw):
3510 return opstring + unary.element._compiler_dispatch(self, **kw)
3511
3512 def _generate_generic_unary_modifier(self, unary, opstring, **kw):
3513 return unary.element._compiler_dispatch(self, **kw) + opstring
3514
3515 @util.memoized_property
3516 def _like_percent_literal(self):
3517 return elements.literal_column("'%'", type_=sqltypes.STRINGTYPE)
3518
3519 def visit_ilike_case_insensitive_operand(self, element, **kw):
3520 return f"lower({element.element._compiler_dispatch(self, **kw)})"
3521
3522 def visit_contains_op_binary(self, binary, operator, **kw):
3523 binary = binary._clone()
3524 percent = self._like_percent_literal
3525 binary.right = percent.concat(binary.right).concat(percent)
3526 return self.visit_like_op_binary(binary, operator, **kw)
3527
3528 def visit_not_contains_op_binary(self, binary, operator, **kw):
3529 binary = binary._clone()
3530 percent = self._like_percent_literal
3531 binary.right = percent.concat(binary.right).concat(percent)
3532 return self.visit_not_like_op_binary(binary, operator, **kw)
3533
3534 def visit_icontains_op_binary(self, binary, operator, **kw):
3535 binary = binary._clone()
3536 percent = self._like_percent_literal
3537 binary.left = ilike_case_insensitive(binary.left)
3538 binary.right = percent.concat(
3539 ilike_case_insensitive(binary.right)
3540 ).concat(percent)
3541 return self.visit_ilike_op_binary(binary, operator, **kw)
3542
3543 def visit_not_icontains_op_binary(self, binary, operator, **kw):
3544 binary = binary._clone()
3545 percent = self._like_percent_literal
3546 binary.left = ilike_case_insensitive(binary.left)
3547 binary.right = percent.concat(
3548 ilike_case_insensitive(binary.right)
3549 ).concat(percent)
3550 return self.visit_not_ilike_op_binary(binary, operator, **kw)
3551
3552 def visit_startswith_op_binary(self, binary, operator, **kw):
3553 binary = binary._clone()
3554 percent = self._like_percent_literal
3555 binary.right = percent._rconcat(binary.right)
3556 return self.visit_like_op_binary(binary, operator, **kw)
3557
3558 def visit_not_startswith_op_binary(self, binary, operator, **kw):
3559 binary = binary._clone()
3560 percent = self._like_percent_literal
3561 binary.right = percent._rconcat(binary.right)
3562 return self.visit_not_like_op_binary(binary, operator, **kw)
3563
3564 def visit_istartswith_op_binary(self, binary, operator, **kw):
3565 binary = binary._clone()
3566 percent = self._like_percent_literal
3567 binary.left = ilike_case_insensitive(binary.left)
3568 binary.right = percent._rconcat(ilike_case_insensitive(binary.right))
3569 return self.visit_ilike_op_binary(binary, operator, **kw)
3570
3571 def visit_not_istartswith_op_binary(self, binary, operator, **kw):
3572 binary = binary._clone()
3573 percent = self._like_percent_literal
3574 binary.left = ilike_case_insensitive(binary.left)
3575 binary.right = percent._rconcat(ilike_case_insensitive(binary.right))
3576 return self.visit_not_ilike_op_binary(binary, operator, **kw)
3577
3578 def visit_endswith_op_binary(self, binary, operator, **kw):
3579 binary = binary._clone()
3580 percent = self._like_percent_literal
3581 binary.right = percent.concat(binary.right)
3582 return self.visit_like_op_binary(binary, operator, **kw)
3583
3584 def visit_not_endswith_op_binary(self, binary, operator, **kw):
3585 binary = binary._clone()
3586 percent = self._like_percent_literal
3587 binary.right = percent.concat(binary.right)
3588 return self.visit_not_like_op_binary(binary, operator, **kw)
3589
3590 def visit_iendswith_op_binary(self, binary, operator, **kw):
3591 binary = binary._clone()
3592 percent = self._like_percent_literal
3593 binary.left = ilike_case_insensitive(binary.left)
3594 binary.right = percent.concat(ilike_case_insensitive(binary.right))
3595 return self.visit_ilike_op_binary(binary, operator, **kw)
3596
3597 def visit_not_iendswith_op_binary(self, binary, operator, **kw):
3598 binary = binary._clone()
3599 percent = self._like_percent_literal
3600 binary.left = ilike_case_insensitive(binary.left)
3601 binary.right = percent.concat(ilike_case_insensitive(binary.right))
3602 return self.visit_not_ilike_op_binary(binary, operator, **kw)
3603
3604 def visit_like_op_binary(self, binary, operator, **kw):
3605 escape = binary.modifiers.get("escape", None)
3606
3607 return "%s LIKE %s" % (
3608 binary.left._compiler_dispatch(self, **kw),
3609 binary.right._compiler_dispatch(self, **kw),
3610 ) + (
3611 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
3612 if escape is not None
3613 else ""
3614 )
3615
3616 def visit_not_like_op_binary(self, binary, operator, **kw):
3617 escape = binary.modifiers.get("escape", None)
3618 return "%s NOT LIKE %s" % (
3619 binary.left._compiler_dispatch(self, **kw),
3620 binary.right._compiler_dispatch(self, **kw),
3621 ) + (
3622 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
3623 if escape is not None
3624 else ""
3625 )
3626
3627 def visit_ilike_op_binary(self, binary, operator, **kw):
3628 if operator is operators.ilike_op:
3629 binary = binary._clone()
3630 binary.left = ilike_case_insensitive(binary.left)
3631 binary.right = ilike_case_insensitive(binary.right)
3632 # else we assume ilower() has been applied
3633
3634 return self.visit_like_op_binary(binary, operator, **kw)
3635
3636 def visit_not_ilike_op_binary(self, binary, operator, **kw):
3637 if operator is operators.not_ilike_op:
3638 binary = binary._clone()
3639 binary.left = ilike_case_insensitive(binary.left)
3640 binary.right = ilike_case_insensitive(binary.right)
3641 # else we assume ilower() has been applied
3642
3643 return self.visit_not_like_op_binary(binary, operator, **kw)
3644
3645 def visit_between_op_binary(self, binary, operator, **kw):
3646 symmetric = binary.modifiers.get("symmetric", False)
3647 return self._generate_generic_binary(
3648 binary, " BETWEEN SYMMETRIC " if symmetric else " BETWEEN ", **kw
3649 )
3650
3651 def visit_not_between_op_binary(self, binary, operator, **kw):
3652 symmetric = binary.modifiers.get("symmetric", False)
3653 return self._generate_generic_binary(
3654 binary,
3655 " NOT BETWEEN SYMMETRIC " if symmetric else " NOT BETWEEN ",
3656 **kw,
3657 )
3658
3659 def visit_regexp_match_op_binary(
3660 self, binary: BinaryExpression[Any], operator: Any, **kw: Any
3661 ) -> str:
3662 raise exc.CompileError(
3663 "%s dialect does not support regular expressions"
3664 % self.dialect.name
3665 )
3666
3667 def visit_not_regexp_match_op_binary(
3668 self, binary: BinaryExpression[Any], operator: Any, **kw: Any
3669 ) -> str:
3670 raise exc.CompileError(
3671 "%s dialect does not support regular expressions"
3672 % self.dialect.name
3673 )
3674
3675 def visit_regexp_replace_op_binary(
3676 self, binary: BinaryExpression[Any], operator: Any, **kw: Any
3677 ) -> str:
3678 raise exc.CompileError(
3679 "%s dialect does not support regular expression replacements"
3680 % self.dialect.name
3681 )
3682
3683 def visit_bindparam(
3684 self,
3685 bindparam,
3686 within_columns_clause=False,
3687 literal_binds=False,
3688 skip_bind_expression=False,
3689 literal_execute=False,
3690 render_postcompile=False,
3691 is_upsert_set=False,
3692 **kwargs,
3693 ):
3694 # Detect parametrized bindparams in upsert SET clause for issue #13130
3695 if (
3696 is_upsert_set
3697 and bindparam.value is None
3698 and bindparam.callable is None
3699 and self._insertmanyvalues is not None
3700 ):
3701 self._insertmanyvalues = self._insertmanyvalues._replace(
3702 has_upsert_bound_parameters=True
3703 )
3704
3705 if not skip_bind_expression:
3706 impl = bindparam.type.dialect_impl(self.dialect)
3707 if impl._has_bind_expression:
3708 bind_expression = impl.bind_expression(bindparam)
3709 wrapped = self.process(
3710 bind_expression,
3711 skip_bind_expression=True,
3712 within_columns_clause=within_columns_clause,
3713 literal_binds=literal_binds and not bindparam.expanding,
3714 literal_execute=literal_execute,
3715 render_postcompile=render_postcompile,
3716 **kwargs,
3717 )
3718 if bindparam.expanding:
3719 # for postcompile w/ expanding, move the "wrapped" part
3720 # of this into the inside
3721
3722 m = re.match(
3723 r"^(.*)\(__\[POSTCOMPILE_(\S+?)\]\)(.*)$", wrapped
3724 )
3725 assert m, "unexpected format for expanding parameter"
3726 wrapped = "(__[POSTCOMPILE_%s~~%s~~REPL~~%s~~])" % (
3727 m.group(2),
3728 m.group(1),
3729 m.group(3),
3730 )
3731
3732 if literal_binds:
3733 ret = self.render_literal_bindparam(
3734 bindparam,
3735 within_columns_clause=True,
3736 bind_expression_template=wrapped,
3737 **kwargs,
3738 )
3739 return "(%s)" % ret
3740
3741 return wrapped
3742
3743 if not literal_binds:
3744 literal_execute = (
3745 literal_execute
3746 or bindparam.literal_execute
3747 or (within_columns_clause and self.ansi_bind_rules)
3748 )
3749 post_compile = literal_execute or bindparam.expanding
3750 else:
3751 post_compile = False
3752
3753 if literal_binds:
3754 ret = self.render_literal_bindparam(
3755 bindparam, within_columns_clause=True, **kwargs
3756 )
3757 if bindparam.expanding:
3758 ret = "(%s)" % ret
3759 return ret
3760
3761 name = self._truncate_bindparam(bindparam)
3762
3763 if name in self.binds:
3764 existing = self.binds[name]
3765 if existing is not bindparam:
3766 if (
3767 (existing.unique or bindparam.unique)
3768 and not existing.proxy_set.intersection(
3769 bindparam.proxy_set
3770 )
3771 and not existing._cloned_set.intersection(
3772 bindparam._cloned_set
3773 )
3774 ):
3775 raise exc.CompileError(
3776 "Bind parameter '%s' conflicts with "
3777 "unique bind parameter of the same name" % name
3778 )
3779 elif existing.expanding != bindparam.expanding:
3780 raise exc.CompileError(
3781 "Can't reuse bound parameter name '%s' in both "
3782 "'expanding' (e.g. within an IN expression) and "
3783 "non-expanding contexts. If this parameter is to "
3784 "receive a list/array value, set 'expanding=True' on "
3785 "it for expressions that aren't IN, otherwise use "
3786 "a different parameter name." % (name,)
3787 )
3788 elif existing._is_crud or bindparam._is_crud:
3789 if existing._is_crud and bindparam._is_crud:
3790 # TODO: this condition is not well understood.
3791 # see tests in test/sql/test_update.py
3792 raise exc.CompileError(
3793 "Encountered unsupported case when compiling an "
3794 "INSERT or UPDATE statement. If this is a "
3795 "multi-table "
3796 "UPDATE statement, please provide string-named "
3797 "arguments to the "
3798 "values() method with distinct names; support for "
3799 "multi-table UPDATE statements that "
3800 "target multiple tables for UPDATE is very "
3801 "limited",
3802 )
3803 else:
3804 raise exc.CompileError(
3805 f"bindparam() name '{bindparam.key}' is reserved "
3806 "for automatic usage in the VALUES or SET "
3807 "clause of this "
3808 "insert/update statement. Please use a "
3809 "name other than column name when using "
3810 "bindparam() "
3811 "with insert() or update() (for example, "
3812 f"'b_{bindparam.key}')."
3813 )
3814
3815 self.binds[bindparam.key] = self.binds[name] = bindparam
3816
3817 # if we are given a cache key that we're going to match against,
3818 # relate the bindparam here to one that is most likely present
3819 # in the "extracted params" portion of the cache key. this is used
3820 # to set up a positional mapping that is used to determine the
3821 # correct parameters for a subsequent use of this compiled with
3822 # a different set of parameter values. here, we accommodate for
3823 # parameters that may have been cloned both before and after the cache
3824 # key was been generated.
3825 ckbm_tuple = self._cache_key_bind_match
3826
3827 if ckbm_tuple:
3828 ckbm, cksm = ckbm_tuple
3829 for bp in bindparam._cloned_set:
3830 if bp.key in cksm:
3831 cb = cksm[bp.key]
3832 ckbm[cb].append(bindparam)
3833
3834 if bindparam.isoutparam:
3835 self.has_out_parameters = True
3836
3837 if post_compile:
3838 if render_postcompile:
3839 self._render_postcompile = True
3840
3841 if literal_execute:
3842 self.literal_execute_params |= {bindparam}
3843 else:
3844 self.post_compile_params |= {bindparam}
3845
3846 ret = self.bindparam_string(
3847 name,
3848 post_compile=post_compile,
3849 expanding=bindparam.expanding,
3850 bindparam_type=bindparam.type,
3851 **kwargs,
3852 )
3853
3854 if bindparam.expanding:
3855 ret = "(%s)" % ret
3856
3857 return ret
3858
3859 def render_bind_cast(self, type_, dbapi_type, sqltext):
3860 raise NotImplementedError()
3861
3862 def render_literal_bindparam(
3863 self,
3864 bindparam,
3865 render_literal_value=NO_ARG,
3866 bind_expression_template=None,
3867 **kw,
3868 ):
3869 if render_literal_value is not NO_ARG:
3870 value = render_literal_value
3871 else:
3872 if bindparam.value is None and bindparam.callable is None:
3873 op = kw.get("_binary_op", None)
3874 if op and op not in (operators.is_, operators.is_not):
3875 util.warn_limited(
3876 "Bound parameter '%s' rendering literal NULL in a SQL "
3877 "expression; comparisons to NULL should not use "
3878 "operators outside of 'is' or 'is not'",
3879 (bindparam.key,),
3880 )
3881 return self.process(sqltypes.NULLTYPE, **kw)
3882 value = bindparam.effective_value
3883
3884 if bindparam.expanding:
3885 leep = self._literal_execute_expanding_parameter_literal_binds
3886 to_update, replacement_expr = leep(
3887 bindparam,
3888 value,
3889 bind_expression_template=bind_expression_template,
3890 )
3891 return replacement_expr
3892 else:
3893 return self.render_literal_value(value, bindparam.type)
3894
3895 def render_literal_value(
3896 self, value: Any, type_: sqltypes.TypeEngine[Any]
3897 ) -> str:
3898 """Render the value of a bind parameter as a quoted literal.
3899
3900 This is used for statement sections that do not accept bind parameters
3901 on the target driver/database.
3902
3903 This should be implemented by subclasses using the quoting services
3904 of the DBAPI.
3905
3906 """
3907
3908 if value is None and not type_.should_evaluate_none:
3909 # issue #10535 - handle NULL in the compiler without placing
3910 # this onto each type, except for "evaluate None" types
3911 # (e.g. JSON)
3912 return self.process(elements.Null._instance())
3913
3914 processor = type_._cached_literal_processor(self.dialect)
3915 if processor:
3916 try:
3917 return processor(value)
3918 except Exception as e:
3919 raise exc.CompileError(
3920 f"Could not render literal value "
3921 f'"{sql_util._repr_single_value(value)}" '
3922 f"with datatype "
3923 f"{type_}; see parent stack trace for "
3924 "more detail."
3925 ) from e
3926
3927 else:
3928 raise exc.CompileError(
3929 f"No literal value renderer is available for literal value "
3930 f'"{sql_util._repr_single_value(value)}" '
3931 f"with datatype {type_}"
3932 )
3933
3934 def _truncate_bindparam(self, bindparam):
3935 if bindparam in self.bind_names:
3936 return self.bind_names[bindparam]
3937
3938 bind_name = bindparam.key
3939 if isinstance(bind_name, elements._truncated_label):
3940 bind_name = self._truncated_identifier("bindparam", bind_name)
3941
3942 # add to bind_names for translation
3943 self.bind_names[bindparam] = bind_name
3944
3945 return bind_name
3946
3947 def _truncated_identifier(
3948 self, ident_class: str, name: _truncated_label
3949 ) -> str:
3950 if (ident_class, name) in self.truncated_names:
3951 return self.truncated_names[(ident_class, name)]
3952
3953 anonname = name.apply_map(self.anon_map)
3954
3955 if len(anonname) > self.label_length - 6:
3956 counter = self._truncated_counters.get(ident_class, 1)
3957 truncname = (
3958 anonname[0 : max(self.label_length - 6, 0)]
3959 + "_"
3960 + hex(counter)[2:]
3961 )
3962 self._truncated_counters[ident_class] = counter + 1
3963 else:
3964 truncname = anonname
3965 self.truncated_names[(ident_class, name)] = truncname
3966 return truncname
3967
3968 def _anonymize(self, name: str) -> str:
3969 return name % self.anon_map
3970
3971 def bindparam_string(
3972 self,
3973 name: str,
3974 post_compile: bool = False,
3975 expanding: bool = False,
3976 escaped_from: Optional[str] = None,
3977 bindparam_type: Optional[TypeEngine[Any]] = None,
3978 accumulate_bind_names: Optional[Set[str]] = None,
3979 visited_bindparam: Optional[List[str]] = None,
3980 **kw: Any,
3981 ) -> str:
3982 # TODO: accumulate_bind_names is passed by crud.py to gather
3983 # names on a per-value basis, visited_bindparam is passed by
3984 # visit_insert() to collect all parameters in the statement.
3985 # see if this gathering can be simplified somehow
3986 if accumulate_bind_names is not None:
3987 accumulate_bind_names.add(name)
3988 if visited_bindparam is not None:
3989 visited_bindparam.append(name)
3990
3991 if not escaped_from:
3992 if self._bind_translate_re.search(name):
3993 # not quite the translate use case as we want to
3994 # also get a quick boolean if we even found
3995 # unusual characters in the name
3996 new_name = self._bind_translate_re.sub(
3997 lambda m: self._bind_translate_chars[m.group(0)],
3998 name,
3999 )
4000 escaped_from = name
4001 name = new_name
4002
4003 if escaped_from:
4004 self.escaped_bind_names = self.escaped_bind_names.union(
4005 {escaped_from: name}
4006 )
4007 if post_compile:
4008 ret = "__[POSTCOMPILE_%s]" % name
4009 if expanding:
4010 # for expanding, bound parameters or literal values will be
4011 # rendered per item
4012 return ret
4013
4014 # otherwise, for non-expanding "literal execute", apply
4015 # bind casts as determined by the datatype
4016 if bindparam_type is not None:
4017 type_impl = bindparam_type._unwrapped_dialect_impl(
4018 self.dialect
4019 )
4020 if type_impl.render_literal_cast:
4021 ret = self.render_bind_cast(bindparam_type, type_impl, ret)
4022 return ret
4023 elif self.state is CompilerState.COMPILING:
4024 ret = self.compilation_bindtemplate % {"name": name}
4025 else:
4026 ret = self.bindtemplate % {"name": name}
4027
4028 if (
4029 bindparam_type is not None
4030 and self.dialect._bind_typing_render_casts
4031 ):
4032 type_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
4033 if type_impl.render_bind_cast:
4034 ret = self.render_bind_cast(bindparam_type, type_impl, ret)
4035
4036 return ret
4037
4038 def _dispatch_independent_ctes(self, stmt, kw):
4039 local_kw = kw.copy()
4040 local_kw.pop("cte_opts", None)
4041 for cte, opt in zip(
4042 stmt._independent_ctes, stmt._independent_ctes_opts
4043 ):
4044 cte._compiler_dispatch(self, cte_opts=opt, **local_kw)
4045
4046 def visit_cte(
4047 self,
4048 cte: CTE,
4049 asfrom: bool = False,
4050 ashint: bool = False,
4051 fromhints: Optional[_FromHintsType] = None,
4052 visiting_cte: Optional[CTE] = None,
4053 from_linter: Optional[FromLinter] = None,
4054 cte_opts: selectable._CTEOpts = selectable._CTEOpts(False),
4055 **kwargs: Any,
4056 ) -> Optional[str]:
4057 self_ctes = self._init_cte_state()
4058 assert self_ctes is self.ctes
4059
4060 kwargs["visiting_cte"] = cte
4061
4062 cte_name = cte.name
4063
4064 if isinstance(cte_name, elements._truncated_label):
4065 cte_name = self._truncated_identifier("alias", cte_name)
4066
4067 is_new_cte = True
4068 embedded_in_current_named_cte = False
4069
4070 _reference_cte = cte._get_reference_cte()
4071
4072 nesting = cte.nesting or cte_opts.nesting
4073
4074 # check for CTE already encountered
4075 if _reference_cte in self.level_name_by_cte:
4076 cte_level, _, existing_cte_opts = self.level_name_by_cte[
4077 _reference_cte
4078 ]
4079 assert _ == cte_name
4080
4081 cte_level_name = (cte_level, cte_name)
4082 existing_cte = self.ctes_by_level_name[cte_level_name]
4083
4084 # check if we are receiving it here with a specific
4085 # "nest_here" location; if so, move it to this location
4086
4087 if cte_opts.nesting:
4088 if existing_cte_opts.nesting:
4089 raise exc.CompileError(
4090 "CTE is stated as 'nest_here' in "
4091 "more than one location"
4092 )
4093
4094 old_level_name = (cte_level, cte_name)
4095 cte_level = len(self.stack) if nesting else 1
4096 cte_level_name = new_level_name = (cte_level, cte_name)
4097
4098 del self.ctes_by_level_name[old_level_name]
4099 self.ctes_by_level_name[new_level_name] = existing_cte
4100 self.level_name_by_cte[_reference_cte] = new_level_name + (
4101 cte_opts,
4102 )
4103
4104 else:
4105 cte_level = len(self.stack) if nesting else 1
4106 cte_level_name = (cte_level, cte_name)
4107
4108 if cte_level_name in self.ctes_by_level_name:
4109 existing_cte = self.ctes_by_level_name[cte_level_name]
4110 else:
4111 existing_cte = None
4112
4113 if existing_cte is not None:
4114 embedded_in_current_named_cte = visiting_cte is existing_cte
4115
4116 # we've generated a same-named CTE that we are enclosed in,
4117 # or this is the same CTE. just return the name.
4118 if cte is existing_cte._restates or cte is existing_cte:
4119 is_new_cte = False
4120 elif existing_cte is cte._restates:
4121 # we've generated a same-named CTE that is
4122 # enclosed in us - we take precedence, so
4123 # discard the text for the "inner".
4124 del self_ctes[existing_cte]
4125
4126 existing_cte_reference_cte = existing_cte._get_reference_cte()
4127
4128 assert existing_cte_reference_cte is _reference_cte
4129 assert existing_cte_reference_cte is existing_cte
4130
4131 del self.level_name_by_cte[existing_cte_reference_cte]
4132 else:
4133 if (
4134 # if the two CTEs have the same hash, which we expect
4135 # here means that one/both is an annotated of the other
4136 (hash(cte) == hash(existing_cte))
4137 # or...
4138 or (
4139 (
4140 # if they are clones, i.e. they came from the ORM
4141 # or some other visit method
4142 cte._is_clone_of is not None
4143 or existing_cte._is_clone_of is not None
4144 )
4145 # and are deep-copy identical
4146 and cte.compare(existing_cte)
4147 )
4148 ):
4149 # then consider these two CTEs the same
4150 is_new_cte = False
4151 else:
4152 # otherwise these are two CTEs that either will render
4153 # differently, or were indicated separately by the user,
4154 # with the same name
4155 raise exc.CompileError(
4156 "Multiple, unrelated CTEs found with "
4157 "the same name: %r" % cte_name
4158 )
4159
4160 if not asfrom and not is_new_cte:
4161 return None
4162
4163 if cte._cte_alias is not None:
4164 pre_alias_cte = cte._cte_alias
4165 cte_pre_alias_name = cte._cte_alias.name
4166 if isinstance(cte_pre_alias_name, elements._truncated_label):
4167 cte_pre_alias_name = self._truncated_identifier(
4168 "alias", cte_pre_alias_name
4169 )
4170 else:
4171 pre_alias_cte = cte
4172 cte_pre_alias_name = None
4173
4174 if is_new_cte:
4175 self.ctes_by_level_name[cte_level_name] = cte
4176 self.level_name_by_cte[_reference_cte] = cte_level_name + (
4177 cte_opts,
4178 )
4179
4180 if pre_alias_cte not in self.ctes:
4181 self.visit_cte(pre_alias_cte, **kwargs)
4182
4183 if not cte_pre_alias_name and cte not in self_ctes:
4184 if cte.recursive:
4185 self.ctes_recursive = True
4186 text = self.preparer.format_alias(cte, cte_name)
4187 if cte.recursive or cte.element.name_cte_columns:
4188 col_source = cte.element
4189
4190 # TODO: can we get at the .columns_plus_names collection
4191 # that is already (or will be?) generated for the SELECT
4192 # rather than calling twice?
4193 recur_cols = [
4194 # TODO: proxy_name is not technically safe,
4195 # see test_cte->
4196 # test_with_recursive_no_name_currently_buggy. not
4197 # clear what should be done with such a case
4198 fallback_label_name or proxy_name
4199 for (
4200 _,
4201 proxy_name,
4202 fallback_label_name,
4203 c,
4204 repeated,
4205 ) in (col_source._generate_columns_plus_names(True))
4206 if not repeated
4207 ]
4208
4209 text += "(%s)" % (
4210 ", ".join(
4211 self.preparer.format_label_name(
4212 ident, anon_map=self.anon_map
4213 )
4214 for ident in recur_cols
4215 )
4216 )
4217
4218 assert kwargs.get("subquery", False) is False
4219
4220 if not self.stack:
4221 # toplevel, this is a stringify of the
4222 # cte directly. just compile the inner
4223 # the way alias() does.
4224 return cte.element._compiler_dispatch(
4225 self, asfrom=asfrom, **kwargs
4226 )
4227 else:
4228 prefixes = self._generate_prefixes(
4229 cte, cte._prefixes, **kwargs
4230 )
4231 inner = cte.element._compiler_dispatch(
4232 self, asfrom=True, **kwargs
4233 )
4234
4235 text += " AS %s\n(%s)" % (prefixes, inner)
4236
4237 if cte._suffixes:
4238 text += " " + self._generate_prefixes(
4239 cte, cte._suffixes, **kwargs
4240 )
4241
4242 self_ctes[cte] = text
4243
4244 if asfrom:
4245 if from_linter:
4246 from_linter.froms[cte._de_clone()] = cte_name
4247
4248 if not is_new_cte and embedded_in_current_named_cte:
4249 return self.preparer.format_alias(cte, cte_name)
4250
4251 if cte_pre_alias_name:
4252 text = self.preparer.format_alias(cte, cte_pre_alias_name)
4253 if self.preparer._requires_quotes(cte_name):
4254 cte_name = self.preparer.quote(cte_name)
4255 text += self.get_render_as_alias_suffix(cte_name)
4256 return text # type: ignore[no-any-return]
4257 else:
4258 return self.preparer.format_alias(cte, cte_name)
4259
4260 return None
4261
4262 def visit_table_valued_alias(self, element, **kw):
4263 if element.joins_implicitly:
4264 kw["from_linter"] = None
4265 if element._is_lateral:
4266 return self.visit_lateral(element, **kw)
4267 else:
4268 return self.visit_alias(element, **kw)
4269
4270 def visit_table_valued_column(self, element, **kw):
4271 return self.visit_column(element, **kw)
4272
4273 def visit_alias(
4274 self,
4275 alias,
4276 asfrom=False,
4277 ashint=False,
4278 iscrud=False,
4279 fromhints=None,
4280 subquery=False,
4281 lateral=False,
4282 enclosing_alias=None,
4283 from_linter=None,
4284 **kwargs,
4285 ):
4286 if lateral:
4287 if "enclosing_lateral" not in kwargs:
4288 # if lateral is set and enclosing_lateral is not
4289 # present, we assume we are being called directly
4290 # from visit_lateral() and we need to set enclosing_lateral.
4291 assert alias._is_lateral
4292 kwargs["enclosing_lateral"] = alias
4293
4294 # for lateral objects, we track a second from_linter that is...
4295 # lateral! to the level above us.
4296 if (
4297 from_linter
4298 and "lateral_from_linter" not in kwargs
4299 and "enclosing_lateral" in kwargs
4300 ):
4301 kwargs["lateral_from_linter"] = from_linter
4302
4303 if enclosing_alias is not None and enclosing_alias.element is alias:
4304 inner = alias.element._compiler_dispatch(
4305 self,
4306 asfrom=asfrom,
4307 ashint=ashint,
4308 iscrud=iscrud,
4309 fromhints=fromhints,
4310 lateral=lateral,
4311 enclosing_alias=alias,
4312 **kwargs,
4313 )
4314 if subquery and (asfrom or lateral):
4315 inner = "(%s)" % (inner,)
4316 return inner
4317 else:
4318 kwargs["enclosing_alias"] = alias
4319
4320 if asfrom or ashint:
4321 if isinstance(alias.name, elements._truncated_label):
4322 alias_name = self._truncated_identifier("alias", alias.name)
4323 else:
4324 alias_name = alias.name
4325
4326 if ashint:
4327 return self.preparer.format_alias(alias, alias_name)
4328 elif asfrom:
4329 if from_linter:
4330 from_linter.froms[alias._de_clone()] = alias_name
4331
4332 inner = alias.element._compiler_dispatch(
4333 self, asfrom=True, lateral=lateral, **kwargs
4334 )
4335 if subquery:
4336 inner = "(%s)" % (inner,)
4337
4338 ret = inner + self.get_render_as_alias_suffix(
4339 self.preparer.format_alias(alias, alias_name)
4340 )
4341
4342 if alias._supports_derived_columns and alias._render_derived:
4343 ret += "(%s)" % (
4344 ", ".join(
4345 "%s%s"
4346 % (
4347 self.preparer.quote(col.name),
4348 (
4349 " %s"
4350 % self.dialect.type_compiler_instance.process(
4351 col.type, **kwargs
4352 )
4353 if alias._render_derived_w_types
4354 else ""
4355 ),
4356 )
4357 for col in alias.c
4358 )
4359 )
4360
4361 if fromhints and alias in fromhints:
4362 ret = self.format_from_hint_text(
4363 ret, alias, fromhints[alias], iscrud
4364 )
4365
4366 return ret
4367 else:
4368 # note we cancel the "subquery" flag here as well
4369 return alias.element._compiler_dispatch(
4370 self, lateral=lateral, **kwargs
4371 )
4372
4373 def visit_subquery(self, subquery, **kw):
4374 kw["subquery"] = True
4375 return self.visit_alias(subquery, **kw)
4376
4377 def visit_lateral(self, lateral_, **kw):
4378 kw["lateral"] = True
4379 return "LATERAL %s" % self.visit_alias(lateral_, **kw)
4380
4381 def visit_tablesample(self, tablesample, asfrom=False, **kw):
4382 text = "%s TABLESAMPLE %s" % (
4383 self.visit_alias(tablesample, asfrom=True, **kw),
4384 tablesample._get_method()._compiler_dispatch(self, **kw),
4385 )
4386
4387 if tablesample.seed is not None:
4388 text += " REPEATABLE (%s)" % (
4389 tablesample.seed._compiler_dispatch(self, **kw)
4390 )
4391
4392 return text
4393
4394 def _render_values(self, element, **kw):
4395 kw.setdefault("literal_binds", element.literal_binds)
4396 tuples = ", ".join(
4397 self.process(
4398 elements.Tuple(
4399 types=element._column_types, *elem
4400 ).self_group(),
4401 **kw,
4402 )
4403 for chunk in element._data
4404 for elem in chunk
4405 )
4406 return f"VALUES {tuples}"
4407
4408 def visit_values(
4409 self, element, asfrom=False, from_linter=None, visiting_cte=None, **kw
4410 ):
4411
4412 if element._independent_ctes:
4413 self._dispatch_independent_ctes(element, kw)
4414
4415 v = self._render_values(element, **kw)
4416
4417 if element._unnamed:
4418 name = None
4419 elif isinstance(element.name, elements._truncated_label):
4420 name = self._truncated_identifier("values", element.name)
4421 else:
4422 name = element.name
4423
4424 if element._is_lateral:
4425 lateral = "LATERAL "
4426 else:
4427 lateral = ""
4428
4429 if asfrom:
4430 if from_linter:
4431 from_linter.froms[element._de_clone()] = (
4432 name if name is not None else "(unnamed VALUES element)"
4433 )
4434
4435 if visiting_cte is not None and visiting_cte.element is element:
4436 if element._is_lateral:
4437 raise exc.CompileError(
4438 "Can't use a LATERAL VALUES expression inside of a CTE"
4439 )
4440 elif name:
4441 kw["include_table"] = False
4442 v = "%s(%s)%s (%s)" % (
4443 lateral,
4444 v,
4445 self.get_render_as_alias_suffix(self.preparer.quote(name)),
4446 (
4447 ", ".join(
4448 c._compiler_dispatch(self, **kw)
4449 for c in element.columns
4450 )
4451 ),
4452 )
4453 else:
4454 v = "%s(%s)" % (lateral, v)
4455 return v
4456
4457 def visit_scalar_values(self, element, **kw):
4458 return f"({self._render_values(element, **kw)})"
4459
4460 def get_render_as_alias_suffix(self, alias_name_text):
4461 return " AS " + alias_name_text
4462
4463 def _add_to_result_map(
4464 self,
4465 keyname: str,
4466 name: str,
4467 objects: Tuple[Any, ...],
4468 type_: TypeEngine[Any],
4469 ) -> None:
4470
4471 # note objects must be non-empty for cursor.py to handle the
4472 # collection properly
4473 assert objects
4474
4475 if keyname is None or keyname == "*":
4476 self._ordered_columns = False
4477 self._ad_hoc_textual = True
4478 if type_._is_tuple_type:
4479 raise exc.CompileError(
4480 "Most backends don't support SELECTing "
4481 "from a tuple() object. If this is an ORM query, "
4482 "consider using the Bundle object."
4483 )
4484 self._result_columns.append(
4485 ResultColumnsEntry(keyname, name, objects, type_)
4486 )
4487
4488 def _label_returning_column(
4489 self, stmt, column, populate_result_map, column_clause_args=None, **kw
4490 ):
4491 """Render a column with necessary labels inside of a RETURNING clause.
4492
4493 This method is provided for individual dialects in place of calling
4494 the _label_select_column method directly, so that the two use cases
4495 of RETURNING vs. SELECT can be disambiguated going forward.
4496
4497 .. versionadded:: 1.4.21
4498
4499 """
4500 return self._label_select_column(
4501 None,
4502 column,
4503 populate_result_map,
4504 False,
4505 {} if column_clause_args is None else column_clause_args,
4506 **kw,
4507 )
4508
4509 def _label_select_column(
4510 self,
4511 select,
4512 column,
4513 populate_result_map,
4514 asfrom,
4515 column_clause_args,
4516 name=None,
4517 proxy_name=None,
4518 fallback_label_name=None,
4519 within_columns_clause=True,
4520 column_is_repeated=False,
4521 need_column_expressions=False,
4522 include_table=True,
4523 ):
4524 """produce labeled columns present in a select()."""
4525 impl = column.type.dialect_impl(self.dialect)
4526
4527 if impl._has_column_expression and (
4528 need_column_expressions or populate_result_map
4529 ):
4530 col_expr = impl.column_expression(column)
4531 else:
4532 col_expr = column
4533
4534 if populate_result_map:
4535 # pass an "add_to_result_map" callable into the compilation
4536 # of embedded columns. this collects information about the
4537 # column as it will be fetched in the result and is coordinated
4538 # with cursor.description when the query is executed.
4539 add_to_result_map = self._add_to_result_map
4540
4541 # if the SELECT statement told us this column is a repeat,
4542 # wrap the callable with one that prevents the addition of the
4543 # targets
4544 if column_is_repeated:
4545 _add_to_result_map = add_to_result_map
4546
4547 def add_to_result_map(keyname, name, objects, type_):
4548 _add_to_result_map(keyname, name, (keyname,), type_)
4549
4550 # if we redefined col_expr for type expressions, wrap the
4551 # callable with one that adds the original column to the targets
4552 elif col_expr is not column:
4553 _add_to_result_map = add_to_result_map
4554
4555 def add_to_result_map(keyname, name, objects, type_):
4556 _add_to_result_map(
4557 keyname, name, (column,) + objects, type_
4558 )
4559
4560 else:
4561 add_to_result_map = None
4562
4563 # this method is used by some of the dialects for RETURNING,
4564 # which has different inputs. _label_returning_column was added
4565 # as the better target for this now however for 1.4 we will keep
4566 # _label_select_column directly compatible with this use case.
4567 # these assertions right now set up the current expected inputs
4568 assert within_columns_clause, (
4569 "_label_select_column is only relevant within "
4570 "the columns clause of a SELECT or RETURNING"
4571 )
4572 if isinstance(column, elements.Label):
4573 if col_expr is not column:
4574 result_expr = _CompileLabel(
4575 col_expr, column.name, alt_names=(column.element,)
4576 )
4577 else:
4578 result_expr = col_expr
4579
4580 elif name:
4581 # here, _columns_plus_names has determined there's an explicit
4582 # label name we need to use. this is the default for
4583 # tablenames_plus_columnnames as well as when columns are being
4584 # deduplicated on name
4585
4586 assert (
4587 proxy_name is not None
4588 ), "proxy_name is required if 'name' is passed"
4589
4590 result_expr = _CompileLabel(
4591 col_expr,
4592 name,
4593 alt_names=(
4594 proxy_name,
4595 # this is a hack to allow legacy result column lookups
4596 # to work as they did before; this goes away in 2.0.
4597 # TODO: this only seems to be tested indirectly
4598 # via test/orm/test_deprecations.py. should be a
4599 # resultset test for this
4600 column._tq_label,
4601 ),
4602 )
4603 else:
4604 # determine here whether this column should be rendered in
4605 # a labelled context or not, as we were given no required label
4606 # name from the caller. Here we apply heuristics based on the kind
4607 # of SQL expression involved.
4608
4609 if col_expr is not column:
4610 # type-specific expression wrapping the given column,
4611 # so we render a label
4612 render_with_label = True
4613 elif isinstance(column, elements.ColumnClause):
4614 # table-bound column, we render its name as a label if we are
4615 # inside of a subquery only
4616 render_with_label = (
4617 asfrom
4618 and not column.is_literal
4619 and column.table is not None
4620 )
4621 elif isinstance(column, elements.TextClause):
4622 render_with_label = False
4623 elif isinstance(column, elements.UnaryExpression):
4624 # unary expression. notes added as of #12681
4625 #
4626 # By convention, the visit_unary() method
4627 # itself does not add an entry to the result map, and relies
4628 # upon either the inner expression creating a result map
4629 # entry, or if not, by creating a label here that produces
4630 # the result map entry. Where that happens is based on whether
4631 # or not the element immediately inside the unary is a
4632 # NamedColumn subclass or not.
4633 #
4634 # Now, this also impacts how the SELECT is written; if
4635 # we decide to generate a label here, we get the usual
4636 # "~(x+y) AS anon_1" thing in the columns clause. If we
4637 # don't, we don't get an AS at all, we get like
4638 # "~table.column".
4639 #
4640 # But here is the important thing as of modernish (like 1.4)
4641 # versions of SQLAlchemy - **whether or not the AS <label>
4642 # is present in the statement is not actually important**.
4643 # We target result columns **positionally** for a fully
4644 # compiled ``Select()`` object; before 1.4 we needed those
4645 # labels to match in cursor.description etc etc but now it
4646 # really doesn't matter.
4647 # So really, we could set render_with_label True in all cases.
4648 # Or we could just have visit_unary() populate the result map
4649 # in all cases.
4650 #
4651 # What we're doing here is strictly trying to not rock the
4652 # boat too much with when we do/don't render "AS label";
4653 # labels being present helps in the edge cases that we
4654 # "fall back" to named cursor.description matching, labels
4655 # not being present for columns keeps us from having awkward
4656 # phrases like "SELECT DISTINCT table.x AS x".
4657 render_with_label = (
4658 (
4659 # exception case to detect if we render "not boolean"
4660 # as "not <col>" for native boolean or "<col> = 1"
4661 # for non-native boolean. this is controlled by
4662 # visit_is_<true|false>_unary_operator
4663 column.operator
4664 in (operators.is_false, operators.is_true)
4665 and not self.dialect.supports_native_boolean
4666 )
4667 or column._wraps_unnamed_column()
4668 or asfrom
4669 )
4670 elif (
4671 # general class of expressions that don't have a SQL-column
4672 # addressable name. includes scalar selects, bind parameters,
4673 # SQL functions, others
4674 not isinstance(column, elements.NamedColumn)
4675 # deeper check that indicates there's no natural "name" to
4676 # this element, which accommodates for custom SQL constructs
4677 # that might have a ".name" attribute (but aren't SQL
4678 # functions) but are not implementing this more recently added
4679 # base class. in theory the "NamedColumn" check should be
4680 # enough, however here we seek to maintain legacy behaviors
4681 # as well.
4682 and column._non_anon_label is None
4683 ):
4684 render_with_label = True
4685 else:
4686 render_with_label = False
4687
4688 if render_with_label:
4689 if not fallback_label_name:
4690 # used by the RETURNING case right now. we generate it
4691 # here as 3rd party dialects may be referring to
4692 # _label_select_column method directly instead of the
4693 # just-added _label_returning_column method
4694 assert not column_is_repeated
4695 fallback_label_name = column._anon_name_label
4696
4697 fallback_label_name = (
4698 elements._truncated_label(fallback_label_name)
4699 if not isinstance(
4700 fallback_label_name, elements._truncated_label
4701 )
4702 else fallback_label_name
4703 )
4704
4705 result_expr = _CompileLabel(
4706 col_expr, fallback_label_name, alt_names=(proxy_name,)
4707 )
4708 else:
4709 result_expr = col_expr
4710
4711 column_clause_args.update(
4712 within_columns_clause=within_columns_clause,
4713 add_to_result_map=add_to_result_map,
4714 include_table=include_table,
4715 )
4716 return result_expr._compiler_dispatch(self, **column_clause_args)
4717
4718 def format_from_hint_text(self, sqltext, table, hint, iscrud):
4719 hinttext = self.get_from_hint_text(table, hint)
4720 if hinttext:
4721 sqltext += " " + hinttext
4722 return sqltext
4723
4724 def get_select_hint_text(self, byfroms):
4725 return None
4726
4727 def get_from_hint_text(
4728 self, table: FromClause, text: Optional[str]
4729 ) -> Optional[str]:
4730 return None
4731
4732 def get_crud_hint_text(self, table, text):
4733 return None
4734
4735 def get_statement_hint_text(self, hint_texts):
4736 return " ".join(hint_texts)
4737
4738 _default_stack_entry: _CompilerStackEntry
4739
4740 if not typing.TYPE_CHECKING:
4741 _default_stack_entry = util.immutabledict(
4742 [("correlate_froms", frozenset()), ("asfrom_froms", frozenset())]
4743 )
4744
4745 def _display_froms_for_select(
4746 self, select_stmt, asfrom, lateral=False, **kw
4747 ):
4748 # utility method to help external dialects
4749 # get the correct from list for a select.
4750 # specifically the oracle dialect needs this feature
4751 # right now.
4752 toplevel = not self.stack
4753 entry = self._default_stack_entry if toplevel else self.stack[-1]
4754
4755 compile_state = select_stmt._compile_state_factory(select_stmt, self)
4756
4757 correlate_froms = entry["correlate_froms"]
4758 asfrom_froms = entry["asfrom_froms"]
4759
4760 if asfrom and not lateral:
4761 froms = compile_state._get_display_froms(
4762 explicit_correlate_froms=correlate_froms.difference(
4763 asfrom_froms
4764 ),
4765 implicit_correlate_froms=(),
4766 )
4767 else:
4768 froms = compile_state._get_display_froms(
4769 explicit_correlate_froms=correlate_froms,
4770 implicit_correlate_froms=asfrom_froms,
4771 )
4772 return froms
4773
4774 translate_select_structure: Any = None
4775 """if not ``None``, should be a callable which accepts ``(select_stmt,
4776 **kw)`` and returns a select object. this is used for structural changes
4777 mostly to accommodate for LIMIT/OFFSET schemes
4778
4779 """
4780
4781 def visit_select(
4782 self,
4783 select_stmt,
4784 asfrom=False,
4785 insert_into=False,
4786 fromhints=None,
4787 compound_index=None,
4788 select_wraps_for=None,
4789 lateral=False,
4790 from_linter=None,
4791 **kwargs,
4792 ):
4793 assert select_wraps_for is None, (
4794 "SQLAlchemy 1.4 requires use of "
4795 "the translate_select_structure hook for structural "
4796 "translations of SELECT objects"
4797 )
4798
4799 # initial setup of SELECT. the compile_state_factory may now
4800 # be creating a totally different SELECT from the one that was
4801 # passed in. for ORM use this will convert from an ORM-state
4802 # SELECT to a regular "Core" SELECT. other composed operations
4803 # such as computation of joins will be performed.
4804
4805 kwargs["within_columns_clause"] = False
4806
4807 compile_state = select_stmt._compile_state_factory(
4808 select_stmt, self, **kwargs
4809 )
4810 kwargs["ambiguous_table_name_map"] = (
4811 compile_state._ambiguous_table_name_map
4812 )
4813
4814 select_stmt = compile_state.statement
4815
4816 toplevel = not self.stack
4817
4818 if toplevel and not self.compile_state:
4819 self.compile_state = compile_state
4820
4821 is_embedded_select = compound_index is not None or insert_into
4822
4823 # translate step for Oracle, SQL Server which often need to
4824 # restructure the SELECT to allow for LIMIT/OFFSET and possibly
4825 # other conditions
4826 if self.translate_select_structure:
4827 new_select_stmt = self.translate_select_structure(
4828 select_stmt, asfrom=asfrom, **kwargs
4829 )
4830
4831 # if SELECT was restructured, maintain a link to the originals
4832 # and assemble a new compile state
4833 if new_select_stmt is not select_stmt:
4834 compile_state_wraps_for = compile_state
4835 select_wraps_for = select_stmt
4836 select_stmt = new_select_stmt
4837
4838 compile_state = select_stmt._compile_state_factory(
4839 select_stmt, self, **kwargs
4840 )
4841 select_stmt = compile_state.statement
4842
4843 entry = self._default_stack_entry if toplevel else self.stack[-1]
4844
4845 populate_result_map = need_column_expressions = (
4846 toplevel
4847 or entry.get("need_result_map_for_compound", False)
4848 or entry.get("need_result_map_for_nested", False)
4849 )
4850
4851 # indicates there is a CompoundSelect in play and we are not the
4852 # first select
4853 if compound_index:
4854 populate_result_map = False
4855
4856 # this was first proposed as part of #3372; however, it is not
4857 # reached in current tests and could possibly be an assertion
4858 # instead.
4859 if not populate_result_map and "add_to_result_map" in kwargs:
4860 del kwargs["add_to_result_map"]
4861
4862 froms = self._setup_select_stack(
4863 select_stmt, compile_state, entry, asfrom, lateral, compound_index
4864 )
4865
4866 column_clause_args = kwargs.copy()
4867 column_clause_args.update(
4868 {"within_label_clause": False, "within_columns_clause": False}
4869 )
4870
4871 text = "SELECT " # we're off to a good start !
4872
4873 if select_stmt._hints:
4874 hint_text, byfrom = self._setup_select_hints(select_stmt)
4875 if hint_text:
4876 text += hint_text + " "
4877 else:
4878 byfrom = None
4879
4880 if select_stmt._independent_ctes:
4881 self._dispatch_independent_ctes(select_stmt, kwargs)
4882
4883 if select_stmt._prefixes:
4884 text += self._generate_prefixes(
4885 select_stmt, select_stmt._prefixes, **kwargs
4886 )
4887
4888 text += self.get_select_precolumns(select_stmt, **kwargs)
4889 # the actual list of columns to print in the SELECT column list.
4890 inner_columns = [
4891 c
4892 for c in [
4893 self._label_select_column(
4894 select_stmt,
4895 column,
4896 populate_result_map,
4897 asfrom,
4898 column_clause_args,
4899 name=name,
4900 proxy_name=proxy_name,
4901 fallback_label_name=fallback_label_name,
4902 column_is_repeated=repeated,
4903 need_column_expressions=need_column_expressions,
4904 )
4905 for (
4906 name,
4907 proxy_name,
4908 fallback_label_name,
4909 column,
4910 repeated,
4911 ) in compile_state.columns_plus_names
4912 ]
4913 if c is not None
4914 ]
4915
4916 if populate_result_map and select_wraps_for is not None:
4917 # if this select was generated from translate_select,
4918 # rewrite the targeted columns in the result map
4919
4920 translate = dict(
4921 zip(
4922 [
4923 name
4924 for (
4925 key,
4926 proxy_name,
4927 fallback_label_name,
4928 name,
4929 repeated,
4930 ) in compile_state.columns_plus_names
4931 ],
4932 [
4933 name
4934 for (
4935 key,
4936 proxy_name,
4937 fallback_label_name,
4938 name,
4939 repeated,
4940 ) in compile_state_wraps_for.columns_plus_names
4941 ],
4942 )
4943 )
4944
4945 self._result_columns = [
4946 ResultColumnsEntry(
4947 key, name, tuple(translate.get(o, o) for o in obj), type_
4948 )
4949 for key, name, obj, type_ in self._result_columns
4950 ]
4951
4952 text = self._compose_select_body(
4953 text,
4954 select_stmt,
4955 compile_state,
4956 inner_columns,
4957 froms,
4958 byfrom,
4959 toplevel,
4960 kwargs,
4961 )
4962
4963 if select_stmt._statement_hints:
4964 per_dialect = [
4965 ht
4966 for (dialect_name, ht) in select_stmt._statement_hints
4967 if dialect_name in ("*", self.dialect.name)
4968 ]
4969 if per_dialect:
4970 text += " " + self.get_statement_hint_text(per_dialect)
4971
4972 # In compound query, CTEs are shared at the compound level
4973 if self.ctes and (not is_embedded_select or toplevel):
4974 nesting_level = len(self.stack) if not toplevel else None
4975 text = self._render_cte_clause(nesting_level=nesting_level) + text
4976
4977 if select_stmt._suffixes:
4978 text += " " + self._generate_prefixes(
4979 select_stmt, select_stmt._suffixes, **kwargs
4980 )
4981
4982 self.stack.pop(-1)
4983
4984 return text
4985
4986 def _setup_select_hints(
4987 self, select: Select[Any]
4988 ) -> Tuple[str, _FromHintsType]:
4989 byfrom = {
4990 from_: hinttext
4991 % {"name": from_._compiler_dispatch(self, ashint=True)}
4992 for (from_, dialect), hinttext in select._hints.items()
4993 if dialect in ("*", self.dialect.name)
4994 }
4995 hint_text = self.get_select_hint_text(byfrom)
4996 return hint_text, byfrom
4997
4998 def _setup_select_stack(
4999 self, select, compile_state, entry, asfrom, lateral, compound_index
5000 ):
5001 correlate_froms = entry["correlate_froms"]
5002 asfrom_froms = entry["asfrom_froms"]
5003
5004 if compound_index == 0:
5005 entry["select_0"] = select
5006 elif compound_index:
5007 select_0 = entry["select_0"]
5008 numcols = len(select_0._all_selected_columns)
5009
5010 if len(compile_state.columns_plus_names) != numcols:
5011 raise exc.CompileError(
5012 "All selectables passed to "
5013 "CompoundSelect must have identical numbers of "
5014 "columns; select #%d has %d columns, select "
5015 "#%d has %d"
5016 % (
5017 1,
5018 numcols,
5019 compound_index + 1,
5020 len(select._all_selected_columns),
5021 )
5022 )
5023
5024 if asfrom and not lateral:
5025 froms = compile_state._get_display_froms(
5026 explicit_correlate_froms=correlate_froms.difference(
5027 asfrom_froms
5028 ),
5029 implicit_correlate_froms=(),
5030 )
5031 else:
5032 froms = compile_state._get_display_froms(
5033 explicit_correlate_froms=correlate_froms,
5034 implicit_correlate_froms=asfrom_froms,
5035 )
5036
5037 new_correlate_froms = set(_from_objects(*froms))
5038 all_correlate_froms = new_correlate_froms.union(correlate_froms)
5039
5040 new_entry: _CompilerStackEntry = {
5041 "asfrom_froms": new_correlate_froms,
5042 "correlate_froms": all_correlate_froms,
5043 "selectable": select,
5044 "compile_state": compile_state,
5045 }
5046 self.stack.append(new_entry)
5047
5048 return froms
5049
5050 def _compose_select_body(
5051 self,
5052 text,
5053 select,
5054 compile_state,
5055 inner_columns,
5056 froms,
5057 byfrom,
5058 toplevel,
5059 kwargs,
5060 ):
5061 text += ", ".join(inner_columns)
5062
5063 if self.linting & COLLECT_CARTESIAN_PRODUCTS:
5064 from_linter = FromLinter({}, set())
5065 warn_linting = self.linting & WARN_LINTING
5066 if toplevel:
5067 self.from_linter = from_linter
5068 else:
5069 from_linter = None
5070 warn_linting = False
5071
5072 # adjust the whitespace for no inner columns, part of #9440,
5073 # so that a no-col SELECT comes out as "SELECT WHERE..." or
5074 # "SELECT FROM ...".
5075 # while it would be better to have built the SELECT starting string
5076 # without trailing whitespace first, then add whitespace only if inner
5077 # cols were present, this breaks compatibility with various custom
5078 # compilation schemes that are currently being tested.
5079 if not inner_columns:
5080 text = text.rstrip()
5081
5082 if froms:
5083 text += " \nFROM "
5084
5085 if select._hints:
5086 text += ", ".join(
5087 [
5088 f._compiler_dispatch(
5089 self,
5090 asfrom=True,
5091 fromhints=byfrom,
5092 from_linter=from_linter,
5093 **kwargs,
5094 )
5095 for f in froms
5096 ]
5097 )
5098 else:
5099 text += ", ".join(
5100 [
5101 f._compiler_dispatch(
5102 self,
5103 asfrom=True,
5104 from_linter=from_linter,
5105 **kwargs,
5106 )
5107 for f in froms
5108 ]
5109 )
5110 else:
5111 text += self.default_from()
5112
5113 if select._where_criteria:
5114 t = self._generate_delimited_and_list(
5115 select._where_criteria, from_linter=from_linter, **kwargs
5116 )
5117 if t:
5118 text += " \nWHERE " + t
5119
5120 if warn_linting:
5121 assert from_linter is not None
5122 from_linter.warn()
5123
5124 if select._group_by_clauses:
5125 text += self.group_by_clause(select, **kwargs)
5126
5127 if select._having_criteria:
5128 t = self._generate_delimited_and_list(
5129 select._having_criteria, **kwargs
5130 )
5131 if t:
5132 text += " \nHAVING " + t
5133
5134 if select._order_by_clauses:
5135 text += self.order_by_clause(select, **kwargs)
5136
5137 if select._has_row_limiting_clause:
5138 text += self._row_limit_clause(select, **kwargs)
5139
5140 if select._for_update_arg is not None:
5141 text += self.for_update_clause(select, **kwargs)
5142
5143 return text
5144
5145 def _generate_prefixes(self, stmt, prefixes, **kw):
5146 clause = " ".join(
5147 prefix._compiler_dispatch(self, **kw)
5148 for prefix, dialect_name in prefixes
5149 if dialect_name in (None, "*") or dialect_name == self.dialect.name
5150 )
5151 if clause:
5152 clause += " "
5153 return clause
5154
5155 def _render_cte_clause(
5156 self,
5157 nesting_level=None,
5158 include_following_stack=False,
5159 ):
5160 """
5161 include_following_stack
5162 Also render the nesting CTEs on the next stack. Useful for
5163 SQL structures like UNION or INSERT that can wrap SELECT
5164 statements containing nesting CTEs.
5165 """
5166 if not self.ctes:
5167 return ""
5168
5169 ctes: MutableMapping[CTE, str]
5170
5171 if nesting_level and nesting_level > 1:
5172 ctes = util.OrderedDict()
5173 for cte in list(self.ctes.keys()):
5174 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5175 cte._get_reference_cte()
5176 ]
5177 nesting = cte.nesting or cte_opts.nesting
5178 is_rendered_level = cte_level == nesting_level or (
5179 include_following_stack and cte_level == nesting_level + 1
5180 )
5181 if not (nesting and is_rendered_level):
5182 continue
5183
5184 ctes[cte] = self.ctes[cte]
5185
5186 else:
5187 ctes = self.ctes
5188
5189 if not ctes:
5190 return ""
5191 ctes_recursive = any([cte.recursive for cte in ctes])
5192
5193 cte_text = self.get_cte_preamble(ctes_recursive) + " "
5194 cte_text += ", \n".join([txt for txt in ctes.values()])
5195 cte_text += "\n "
5196
5197 if nesting_level and nesting_level > 1:
5198 for cte in list(ctes.keys()):
5199 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5200 cte._get_reference_cte()
5201 ]
5202 del self.ctes[cte]
5203 del self.ctes_by_level_name[(cte_level, cte_name)]
5204 del self.level_name_by_cte[cte._get_reference_cte()]
5205
5206 return cte_text
5207
5208 def get_cte_preamble(self, recursive):
5209 if recursive:
5210 return "WITH RECURSIVE"
5211 else:
5212 return "WITH"
5213
5214 def get_select_precolumns(self, select: Select[Any], **kw: Any) -> str:
5215 """Called when building a ``SELECT`` statement, position is just
5216 before column list.
5217
5218 """
5219 if select._distinct_on:
5220 util.warn_deprecated(
5221 "DISTINCT ON is currently supported only by the PostgreSQL "
5222 "dialect. Use of DISTINCT ON for other backends is currently "
5223 "silently ignored, however this usage is deprecated, and will "
5224 "raise CompileError in a future release for all backends "
5225 "that do not support this syntax.",
5226 version="1.4",
5227 )
5228 return "DISTINCT " if select._distinct else ""
5229
5230 def group_by_clause(self, select, **kw):
5231 """allow dialects to customize how GROUP BY is rendered."""
5232
5233 group_by = self._generate_delimited_list(
5234 select._group_by_clauses, OPERATORS[operators.comma_op], **kw
5235 )
5236 if group_by:
5237 return " GROUP BY " + group_by
5238 else:
5239 return ""
5240
5241 def order_by_clause(self, select, **kw):
5242 """allow dialects to customize how ORDER BY is rendered."""
5243
5244 order_by = self._generate_delimited_list(
5245 select._order_by_clauses, OPERATORS[operators.comma_op], **kw
5246 )
5247
5248 if order_by:
5249 return " ORDER BY " + order_by
5250 else:
5251 return ""
5252
5253 def for_update_clause(self, select, **kw):
5254 return " FOR UPDATE"
5255
5256 def returning_clause(
5257 self,
5258 stmt: UpdateBase,
5259 returning_cols: Sequence[_ColumnsClauseElement],
5260 *,
5261 populate_result_map: bool,
5262 **kw: Any,
5263 ) -> str:
5264 columns = [
5265 self._label_returning_column(
5266 stmt,
5267 column,
5268 populate_result_map,
5269 fallback_label_name=fallback_label_name,
5270 column_is_repeated=repeated,
5271 name=name,
5272 proxy_name=proxy_name,
5273 **kw,
5274 )
5275 for (
5276 name,
5277 proxy_name,
5278 fallback_label_name,
5279 column,
5280 repeated,
5281 ) in stmt._generate_columns_plus_names(
5282 True, cols=base._select_iterables(returning_cols)
5283 )
5284 ]
5285
5286 return "RETURNING " + ", ".join(columns)
5287
5288 def limit_clause(self, select, **kw):
5289 text = ""
5290 if select._limit_clause is not None:
5291 text += "\n LIMIT " + self.process(select._limit_clause, **kw)
5292 if select._offset_clause is not None:
5293 if select._limit_clause is None:
5294 text += "\n LIMIT -1"
5295 text += " OFFSET " + self.process(select._offset_clause, **kw)
5296 return text
5297
5298 def fetch_clause(
5299 self,
5300 select,
5301 fetch_clause=None,
5302 require_offset=False,
5303 use_literal_execute_for_simple_int=False,
5304 **kw,
5305 ):
5306 if fetch_clause is None:
5307 fetch_clause = select._fetch_clause
5308 fetch_clause_options = select._fetch_clause_options
5309 else:
5310 fetch_clause_options = {"percent": False, "with_ties": False}
5311
5312 text = ""
5313
5314 if select._offset_clause is not None:
5315 offset_clause = select._offset_clause
5316 if (
5317 use_literal_execute_for_simple_int
5318 and select._simple_int_clause(offset_clause)
5319 ):
5320 offset_clause = offset_clause.render_literal_execute()
5321 offset_str = self.process(offset_clause, **kw)
5322 text += "\n OFFSET %s ROWS" % offset_str
5323 elif require_offset:
5324 text += "\n OFFSET 0 ROWS"
5325
5326 if fetch_clause is not None:
5327 if (
5328 use_literal_execute_for_simple_int
5329 and select._simple_int_clause(fetch_clause)
5330 ):
5331 fetch_clause = fetch_clause.render_literal_execute()
5332 text += "\n FETCH FIRST %s%s ROWS %s" % (
5333 self.process(fetch_clause, **kw),
5334 " PERCENT" if fetch_clause_options["percent"] else "",
5335 "WITH TIES" if fetch_clause_options["with_ties"] else "ONLY",
5336 )
5337 return text
5338
5339 def visit_table(
5340 self,
5341 table,
5342 asfrom=False,
5343 iscrud=False,
5344 ashint=False,
5345 fromhints=None,
5346 use_schema=True,
5347 from_linter=None,
5348 ambiguous_table_name_map=None,
5349 enclosing_alias=None,
5350 **kwargs,
5351 ):
5352 if from_linter:
5353 from_linter.froms[table] = table.fullname
5354
5355 if asfrom or ashint:
5356 effective_schema = self.preparer.schema_for_object(table)
5357
5358 if use_schema and effective_schema:
5359 ret = (
5360 self.preparer.quote_schema(effective_schema)
5361 + "."
5362 + self.preparer.quote(table.name)
5363 )
5364 else:
5365 ret = self.preparer.quote(table.name)
5366
5367 if (
5368 (
5369 enclosing_alias is None
5370 or enclosing_alias.element is not table
5371 )
5372 and not effective_schema
5373 and ambiguous_table_name_map
5374 and table.name in ambiguous_table_name_map
5375 ):
5376 anon_name = self._truncated_identifier(
5377 "alias", ambiguous_table_name_map[table.name]
5378 )
5379
5380 ret = ret + self.get_render_as_alias_suffix(
5381 self.preparer.format_alias(None, anon_name)
5382 )
5383
5384 if fromhints and table in fromhints:
5385 ret = self.format_from_hint_text(
5386 ret, table, fromhints[table], iscrud
5387 )
5388 return ret
5389 else:
5390 return ""
5391
5392 def visit_join(self, join, asfrom=False, from_linter=None, **kwargs):
5393 if from_linter:
5394 from_linter.edges.update(
5395 itertools.product(
5396 _de_clone(join.left._from_objects),
5397 _de_clone(join.right._from_objects),
5398 )
5399 )
5400
5401 if join.full:
5402 join_type = " FULL OUTER JOIN "
5403 elif join.isouter:
5404 join_type = " LEFT OUTER JOIN "
5405 else:
5406 join_type = " JOIN "
5407 return (
5408 join.left._compiler_dispatch(
5409 self, asfrom=True, from_linter=from_linter, **kwargs
5410 )
5411 + join_type
5412 + join.right._compiler_dispatch(
5413 self, asfrom=True, from_linter=from_linter, **kwargs
5414 )
5415 + " ON "
5416 # TODO: likely need asfrom=True here?
5417 + join.onclause._compiler_dispatch(
5418 self, from_linter=from_linter, **kwargs
5419 )
5420 )
5421
5422 def _setup_crud_hints(self, stmt, table_text):
5423 dialect_hints = {
5424 table: hint_text
5425 for (table, dialect), hint_text in stmt._hints.items()
5426 if dialect in ("*", self.dialect.name)
5427 }
5428 if stmt.table in dialect_hints:
5429 table_text = self.format_from_hint_text(
5430 table_text, stmt.table, dialect_hints[stmt.table], True
5431 )
5432 return dialect_hints, table_text
5433
5434 # within the realm of "insertmanyvalues sentinel columns",
5435 # these lookups match different kinds of Column() configurations
5436 # to specific backend capabilities. they are broken into two
5437 # lookups, one for autoincrement columns and the other for non
5438 # autoincrement columns
5439 _sentinel_col_non_autoinc_lookup = util.immutabledict(
5440 {
5441 _SentinelDefaultCharacterization.CLIENTSIDE: (
5442 InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
5443 ),
5444 _SentinelDefaultCharacterization.SENTINEL_DEFAULT: (
5445 InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
5446 ),
5447 _SentinelDefaultCharacterization.NONE: (
5448 InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
5449 ),
5450 _SentinelDefaultCharacterization.IDENTITY: (
5451 InsertmanyvaluesSentinelOpts.IDENTITY
5452 ),
5453 _SentinelDefaultCharacterization.SEQUENCE: (
5454 InsertmanyvaluesSentinelOpts.SEQUENCE
5455 ),
5456 }
5457 )
5458 _sentinel_col_autoinc_lookup = _sentinel_col_non_autoinc_lookup.union(
5459 {
5460 _SentinelDefaultCharacterization.NONE: (
5461 InsertmanyvaluesSentinelOpts.AUTOINCREMENT
5462 ),
5463 }
5464 )
5465
5466 def _get_sentinel_column_for_table(
5467 self, table: Table
5468 ) -> Optional[Sequence[Column[Any]]]:
5469 """given a :class:`.Table`, return a usable sentinel column or
5470 columns for this dialect if any.
5471
5472 Return None if no sentinel columns could be identified, or raise an
5473 error if a column was marked as a sentinel explicitly but isn't
5474 compatible with this dialect.
5475
5476 """
5477
5478 sentinel_opts = self.dialect.insertmanyvalues_implicit_sentinel
5479 sentinel_characteristics = table._sentinel_column_characteristics
5480
5481 sent_cols = sentinel_characteristics.columns
5482
5483 if sent_cols is None:
5484 return None
5485
5486 if sentinel_characteristics.is_autoinc:
5487 bitmask = self._sentinel_col_autoinc_lookup.get(
5488 sentinel_characteristics.default_characterization, 0
5489 )
5490 else:
5491 bitmask = self._sentinel_col_non_autoinc_lookup.get(
5492 sentinel_characteristics.default_characterization, 0
5493 )
5494
5495 if sentinel_opts & bitmask:
5496 return sent_cols
5497
5498 if sentinel_characteristics.is_explicit:
5499 # a column was explicitly marked as insert_sentinel=True,
5500 # however it is not compatible with this dialect. they should
5501 # not indicate this column as a sentinel if they need to include
5502 # this dialect.
5503
5504 # TODO: do we want non-primary key explicit sentinel cols
5505 # that can gracefully degrade for some backends?
5506 # insert_sentinel="degrade" perhaps. not for the initial release.
5507 # I am hoping people are generally not dealing with this sentinel
5508 # business at all.
5509
5510 # if is_explicit is True, there will be only one sentinel column.
5511
5512 raise exc.InvalidRequestError(
5513 f"Column {sent_cols[0]} can't be explicitly "
5514 "marked as a sentinel column when using the "
5515 f"{self.dialect.name} dialect, as the "
5516 "particular type of default generation on this column is "
5517 "not currently compatible with this dialect's specific "
5518 f"INSERT..RETURNING syntax which can receive the "
5519 "server-generated value in "
5520 "a deterministic way. To remove this error, remove "
5521 "insert_sentinel=True from primary key autoincrement "
5522 "columns; these columns are automatically used as "
5523 "sentinels for supported dialects in any case."
5524 )
5525
5526 return None
5527
5528 def _deliver_insertmanyvalues_batches(
5529 self,
5530 statement: str,
5531 parameters: _DBAPIMultiExecuteParams,
5532 compiled_parameters: List[_MutableCoreSingleExecuteParams],
5533 generic_setinputsizes: Optional[_GenericSetInputSizesType],
5534 batch_size: int,
5535 sort_by_parameter_order: bool,
5536 schema_translate_map: Optional[SchemaTranslateMapType],
5537 ) -> Iterator[_InsertManyValuesBatch]:
5538 imv = self._insertmanyvalues
5539 assert imv is not None
5540
5541 if not imv.sentinel_param_keys:
5542 _sentinel_from_params = None
5543 else:
5544 _sentinel_from_params = operator.itemgetter(
5545 *imv.sentinel_param_keys
5546 )
5547
5548 lenparams = len(parameters)
5549 if imv.is_default_expr and not self.dialect.supports_default_metavalue:
5550 # backend doesn't support
5551 # INSERT INTO table (pk_col) VALUES (DEFAULT), (DEFAULT), ...
5552 # at the moment this is basically SQL Server due to
5553 # not being able to use DEFAULT for identity column
5554 # just yield out that many single statements! still
5555 # faster than a whole connection.execute() call ;)
5556 #
5557 # note we still are taking advantage of the fact that we know
5558 # we are using RETURNING. The generalized approach of fetching
5559 # cursor.lastrowid etc. still goes through the more heavyweight
5560 # "ExecutionContext per statement" system as it isn't usable
5561 # as a generic "RETURNING" approach
5562 use_row_at_a_time = True
5563 downgraded = False
5564 elif not self.dialect.supports_multivalues_insert or (
5565 sort_by_parameter_order
5566 and self._result_columns
5567 and (
5568 imv.sentinel_columns is None
5569 or (
5570 imv.includes_upsert_behaviors
5571 and not imv.embed_values_counter
5572 )
5573 )
5574 ):
5575 # deterministic order was requested and the compiler could
5576 # not organize sentinel columns for this dialect/statement.
5577 # use row at a time. Note: if embed_values_counter is True,
5578 # the counter itself provides the ordering capability we need,
5579 # so we can use batch mode even with upsert behaviors.
5580 use_row_at_a_time = True
5581 downgraded = True
5582 elif (
5583 imv.has_upsert_bound_parameters
5584 and not imv.embed_values_counter
5585 and self._result_columns
5586 ):
5587 # For upsert behaviors (ON CONFLICT DO UPDATE, etc.) with RETURNING
5588 # and parametrized bindparams in the SET clause, we must use
5589 # row-at-a-time. Batching multiple rows in a single statement
5590 # doesn't work when the SET clause contains bound parameters that
5591 # will receive different values per row, as there's only one SET
5592 # clause per statement. See issue #13130.
5593 use_row_at_a_time = True
5594 downgraded = True
5595 else:
5596 use_row_at_a_time = False
5597 downgraded = False
5598
5599 if use_row_at_a_time:
5600 for batchnum, (param, compiled_param) in enumerate(
5601 cast(
5602 "Sequence[Tuple[_DBAPISingleExecuteParams, _MutableCoreSingleExecuteParams]]", # noqa: E501
5603 zip(parameters, compiled_parameters),
5604 ),
5605 1,
5606 ):
5607 yield _InsertManyValuesBatch(
5608 statement,
5609 param,
5610 generic_setinputsizes,
5611 [param],
5612 (
5613 [_sentinel_from_params(compiled_param)]
5614 if _sentinel_from_params
5615 else []
5616 ),
5617 1,
5618 batchnum,
5619 lenparams,
5620 sort_by_parameter_order,
5621 downgraded,
5622 )
5623 return
5624
5625 if schema_translate_map:
5626 rst = functools.partial(
5627 self.preparer._render_schema_translates,
5628 schema_translate_map=schema_translate_map,
5629 )
5630 else:
5631 rst = None
5632
5633 imv_single_values_expr = imv.single_values_expr
5634 if rst:
5635 imv_single_values_expr = rst(imv_single_values_expr)
5636
5637 executemany_values = f"({imv_single_values_expr})"
5638 statement = statement.replace(executemany_values, "__EXECMANY_TOKEN__")
5639
5640 # Use optional insertmanyvalues_max_parameters
5641 # to further shrink the batch size so that there are no more than
5642 # insertmanyvalues_max_parameters params.
5643 # Currently used by SQL Server, which limits statements to 2100 bound
5644 # parameters (actually 2099).
5645 max_params = self.dialect.insertmanyvalues_max_parameters
5646 if max_params:
5647 total_num_of_params = len(self.bind_names)
5648 num_params_per_batch = len(imv.insert_crud_params)
5649 num_params_outside_of_batch = (
5650 total_num_of_params - num_params_per_batch
5651 )
5652 batch_size = min(
5653 batch_size,
5654 (
5655 (max_params - num_params_outside_of_batch)
5656 // num_params_per_batch
5657 ),
5658 )
5659
5660 batches = cast("List[Sequence[Any]]", list(parameters))
5661 compiled_batches = cast(
5662 "List[Sequence[Any]]", list(compiled_parameters)
5663 )
5664
5665 processed_setinputsizes: Optional[_GenericSetInputSizesType] = None
5666 batchnum = 1
5667 total_batches = lenparams // batch_size + (
5668 1 if lenparams % batch_size else 0
5669 )
5670
5671 insert_crud_params = imv.insert_crud_params
5672 assert insert_crud_params is not None
5673
5674 if rst:
5675 insert_crud_params = [
5676 (col, key, rst(expr), st)
5677 for col, key, expr, st in insert_crud_params
5678 ]
5679
5680 escaped_bind_names: Mapping[str, str]
5681 expand_pos_lower_index = expand_pos_upper_index = 0
5682
5683 if not self.positional:
5684 if self.escaped_bind_names:
5685 escaped_bind_names = self.escaped_bind_names
5686 else:
5687 escaped_bind_names = {}
5688
5689 all_keys = set(parameters[0])
5690
5691 def apply_placeholders(keys, formatted):
5692 for key in keys:
5693 key = escaped_bind_names.get(key, key)
5694 formatted = formatted.replace(
5695 self.bindtemplate % {"name": key},
5696 self.bindtemplate
5697 % {"name": f"{key}__EXECMANY_INDEX__"},
5698 )
5699 return formatted
5700
5701 if imv.embed_values_counter:
5702 imv_values_counter = ", _IMV_VALUES_COUNTER"
5703 else:
5704 imv_values_counter = ""
5705 formatted_values_clause = f"""({', '.join(
5706 apply_placeholders(bind_keys, formatted)
5707 for _, _, formatted, bind_keys in insert_crud_params
5708 )}{imv_values_counter})"""
5709
5710 keys_to_replace = all_keys.intersection(
5711 escaped_bind_names.get(key, key)
5712 for _, _, _, bind_keys in insert_crud_params
5713 for key in bind_keys
5714 )
5715 base_parameters = {
5716 key: parameters[0][key]
5717 for key in all_keys.difference(keys_to_replace)
5718 }
5719
5720 executemany_values_w_comma = ""
5721 else:
5722 formatted_values_clause = ""
5723 keys_to_replace = set()
5724 base_parameters = {}
5725
5726 if imv.embed_values_counter:
5727 executemany_values_w_comma = (
5728 f"({imv_single_values_expr}, _IMV_VALUES_COUNTER), "
5729 )
5730 else:
5731 executemany_values_w_comma = f"({imv_single_values_expr}), "
5732
5733 all_names_we_will_expand: Set[str] = set()
5734 for elem in imv.insert_crud_params:
5735 all_names_we_will_expand.update(elem[3])
5736
5737 # get the start and end position in a particular list
5738 # of parameters where we will be doing the "expanding".
5739 # statements can have params on either side or both sides,
5740 # given RETURNING and CTEs
5741 if all_names_we_will_expand:
5742 positiontup = self.positiontup
5743 assert positiontup is not None
5744
5745 all_expand_positions = {
5746 idx
5747 for idx, name in enumerate(positiontup)
5748 if name in all_names_we_will_expand
5749 }
5750 expand_pos_lower_index = min(all_expand_positions)
5751 expand_pos_upper_index = max(all_expand_positions) + 1
5752 assert (
5753 len(all_expand_positions)
5754 == expand_pos_upper_index - expand_pos_lower_index
5755 )
5756
5757 if self._numeric_binds:
5758 escaped = re.escape(self._numeric_binds_identifier_char)
5759 executemany_values_w_comma = re.sub(
5760 rf"{escaped}\d+", "%s", executemany_values_w_comma
5761 )
5762
5763 while batches:
5764 batch = batches[0:batch_size]
5765 compiled_batch = compiled_batches[0:batch_size]
5766
5767 batches[0:batch_size] = []
5768 compiled_batches[0:batch_size] = []
5769
5770 if batches:
5771 current_batch_size = batch_size
5772 else:
5773 current_batch_size = len(batch)
5774
5775 if generic_setinputsizes:
5776 # if setinputsizes is present, expand this collection to
5777 # suit the batch length as well
5778 # currently this will be mssql+pyodbc for internal dialects
5779 processed_setinputsizes = [
5780 (new_key, len_, typ)
5781 for new_key, len_, typ in (
5782 (f"{key}_{index}", len_, typ)
5783 for index in range(current_batch_size)
5784 for key, len_, typ in generic_setinputsizes
5785 )
5786 ]
5787
5788 replaced_parameters: Any
5789 if self.positional:
5790 num_ins_params = imv.num_positional_params_counted
5791
5792 batch_iterator: Iterable[Sequence[Any]]
5793 extra_params_left: Sequence[Any]
5794 extra_params_right: Sequence[Any]
5795
5796 if num_ins_params == len(batch[0]):
5797 extra_params_left = extra_params_right = ()
5798 batch_iterator = batch
5799 else:
5800 extra_params_left = batch[0][:expand_pos_lower_index]
5801 extra_params_right = batch[0][expand_pos_upper_index:]
5802 batch_iterator = (
5803 b[expand_pos_lower_index:expand_pos_upper_index]
5804 for b in batch
5805 )
5806
5807 if imv.embed_values_counter:
5808 expanded_values_string = (
5809 "".join(
5810 executemany_values_w_comma.replace(
5811 "_IMV_VALUES_COUNTER", str(i)
5812 )
5813 for i, _ in enumerate(batch)
5814 )
5815 )[:-2]
5816 else:
5817 expanded_values_string = (
5818 (executemany_values_w_comma * current_batch_size)
5819 )[:-2]
5820
5821 if self._numeric_binds and num_ins_params > 0:
5822 # numeric will always number the parameters inside of
5823 # VALUES (and thus order self.positiontup) to be higher
5824 # than non-VALUES parameters, no matter where in the
5825 # statement those non-VALUES parameters appear (this is
5826 # ensured in _process_numeric by numbering first all
5827 # params that are not in _values_bindparam)
5828 # therefore all extra params are always
5829 # on the left side and numbered lower than the VALUES
5830 # parameters
5831 assert not extra_params_right
5832
5833 start = expand_pos_lower_index + 1
5834 end = num_ins_params * (current_batch_size) + start
5835
5836 # need to format here, since statement may contain
5837 # unescaped %, while values_string contains just (%s, %s)
5838 positions = tuple(
5839 f"{self._numeric_binds_identifier_char}{i}"
5840 for i in range(start, end)
5841 )
5842 expanded_values_string = expanded_values_string % positions
5843
5844 replaced_statement = statement.replace(
5845 "__EXECMANY_TOKEN__", expanded_values_string
5846 )
5847
5848 replaced_parameters = tuple(
5849 itertools.chain.from_iterable(batch_iterator)
5850 )
5851
5852 replaced_parameters = (
5853 extra_params_left
5854 + replaced_parameters
5855 + extra_params_right
5856 )
5857
5858 else:
5859 replaced_values_clauses = []
5860 replaced_parameters = base_parameters.copy()
5861
5862 for i, param in enumerate(batch):
5863 fmv = formatted_values_clause.replace(
5864 "EXECMANY_INDEX__", str(i)
5865 )
5866 if imv.embed_values_counter:
5867 fmv = fmv.replace("_IMV_VALUES_COUNTER", str(i))
5868
5869 replaced_values_clauses.append(fmv)
5870 replaced_parameters.update(
5871 {f"{key}__{i}": param[key] for key in keys_to_replace}
5872 )
5873
5874 replaced_statement = statement.replace(
5875 "__EXECMANY_TOKEN__",
5876 ", ".join(replaced_values_clauses),
5877 )
5878
5879 yield _InsertManyValuesBatch(
5880 replaced_statement,
5881 replaced_parameters,
5882 processed_setinputsizes,
5883 batch,
5884 (
5885 [_sentinel_from_params(cb) for cb in compiled_batch]
5886 if _sentinel_from_params
5887 else []
5888 ),
5889 current_batch_size,
5890 batchnum,
5891 total_batches,
5892 sort_by_parameter_order,
5893 False,
5894 )
5895 batchnum += 1
5896
5897 def visit_insert(
5898 self, insert_stmt, visited_bindparam=None, visiting_cte=None, **kw
5899 ):
5900 compile_state = insert_stmt._compile_state_factory(
5901 insert_stmt, self, **kw
5902 )
5903 insert_stmt = compile_state.statement
5904
5905 if visiting_cte is not None:
5906 kw["visiting_cte"] = visiting_cte
5907 toplevel = False
5908 else:
5909 toplevel = not self.stack
5910
5911 if toplevel:
5912 self.isinsert = True
5913 if not self.dml_compile_state:
5914 self.dml_compile_state = compile_state
5915 if not self.compile_state:
5916 self.compile_state = compile_state
5917
5918 self.stack.append(
5919 {
5920 "correlate_froms": set(),
5921 "asfrom_froms": set(),
5922 "selectable": insert_stmt,
5923 }
5924 )
5925
5926 counted_bindparam = 0
5927
5928 # reset any incoming "visited_bindparam" collection
5929 visited_bindparam = None
5930
5931 # for positional, insertmanyvalues needs to know how many
5932 # bound parameters are in the VALUES sequence; there's no simple
5933 # rule because default expressions etc. can have zero or more
5934 # params inside them. After multiple attempts to figure this out,
5935 # this very simplistic "count after" works and is
5936 # likely the least amount of callcounts, though looks clumsy
5937 if self.positional and visiting_cte is None:
5938 # if we are inside a CTE, don't count parameters
5939 # here since they won't be for insertmanyvalues. keep
5940 # visited_bindparam at None so no counting happens.
5941 # see #9173
5942 visited_bindparam = []
5943
5944 crud_params_struct = crud._get_crud_params(
5945 self,
5946 insert_stmt,
5947 compile_state,
5948 toplevel,
5949 visited_bindparam=visited_bindparam,
5950 **kw,
5951 )
5952
5953 if self.positional and visited_bindparam is not None:
5954 counted_bindparam = len(visited_bindparam)
5955 if self._numeric_binds:
5956 if self._values_bindparam is not None:
5957 self._values_bindparam += visited_bindparam
5958 else:
5959 self._values_bindparam = visited_bindparam
5960
5961 crud_params_single = crud_params_struct.single_params
5962
5963 if (
5964 not crud_params_single
5965 and not self.dialect.supports_default_values
5966 and not self.dialect.supports_default_metavalue
5967 and not self.dialect.supports_empty_insert
5968 ):
5969 raise exc.CompileError(
5970 "The '%s' dialect with current database "
5971 "version settings does not support empty "
5972 "inserts." % self.dialect.name
5973 )
5974
5975 if compile_state._has_multi_parameters:
5976 if not self.dialect.supports_multivalues_insert:
5977 raise exc.CompileError(
5978 "The '%s' dialect with current database "
5979 "version settings does not support "
5980 "in-place multirow inserts." % self.dialect.name
5981 )
5982 elif (
5983 self.implicit_returning or insert_stmt._returning
5984 ) and insert_stmt._sort_by_parameter_order:
5985 raise exc.CompileError(
5986 "RETURNING cannot be deterministically sorted when "
5987 "using an INSERT which includes multi-row values()."
5988 )
5989 crud_params_single = crud_params_struct.single_params
5990 else:
5991 crud_params_single = crud_params_struct.single_params
5992
5993 preparer = self.preparer
5994 supports_default_values = self.dialect.supports_default_values
5995
5996 text = "INSERT "
5997
5998 if insert_stmt._prefixes:
5999 text += self._generate_prefixes(
6000 insert_stmt, insert_stmt._prefixes, **kw
6001 )
6002
6003 text += "INTO "
6004 table_text = preparer.format_table(insert_stmt.table)
6005
6006 if insert_stmt._hints:
6007 _, table_text = self._setup_crud_hints(insert_stmt, table_text)
6008
6009 if insert_stmt._independent_ctes:
6010 self._dispatch_independent_ctes(insert_stmt, kw)
6011
6012 text += table_text
6013
6014 if crud_params_single or not supports_default_values:
6015 text += " (%s)" % ", ".join(
6016 [expr for _, expr, _, _ in crud_params_single]
6017 )
6018
6019 # look for insertmanyvalues attributes that would have been configured
6020 # by crud.py as it scanned through the columns to be part of the
6021 # INSERT
6022 use_insertmanyvalues = crud_params_struct.use_insertmanyvalues
6023 named_sentinel_params: Optional[Sequence[str]] = None
6024 add_sentinel_cols = None
6025 implicit_sentinel = False
6026
6027 returning_cols = self.implicit_returning or insert_stmt._returning
6028 if returning_cols:
6029 add_sentinel_cols = crud_params_struct.use_sentinel_columns
6030 if add_sentinel_cols is not None:
6031 assert use_insertmanyvalues
6032
6033 # search for the sentinel column explicitly present
6034 # in the INSERT columns list, and additionally check that
6035 # this column has a bound parameter name set up that's in the
6036 # parameter list. If both of these cases are present, it means
6037 # we will have a client side value for the sentinel in each
6038 # parameter set.
6039
6040 _params_by_col = {
6041 col: param_names
6042 for col, _, _, param_names in crud_params_single
6043 }
6044 named_sentinel_params = []
6045 for _add_sentinel_col in add_sentinel_cols:
6046 if _add_sentinel_col not in _params_by_col:
6047 named_sentinel_params = None
6048 break
6049 param_name = self._within_exec_param_key_getter(
6050 _add_sentinel_col
6051 )
6052 if param_name not in _params_by_col[_add_sentinel_col]:
6053 named_sentinel_params = None
6054 break
6055 named_sentinel_params.append(param_name)
6056
6057 if named_sentinel_params is None:
6058 # if we are not going to have a client side value for
6059 # the sentinel in the parameter set, that means it's
6060 # an autoincrement, an IDENTITY, or a server-side SQL
6061 # expression like nextval('seqname'). So this is
6062 # an "implicit" sentinel; we will look for it in
6063 # RETURNING
6064 # only, and then sort on it. For this case on PG,
6065 # SQL Server we have to use a special INSERT form
6066 # that guarantees the server side function lines up with
6067 # the entries in the VALUES.
6068 if (
6069 self.dialect.insertmanyvalues_implicit_sentinel
6070 & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
6071 ):
6072 implicit_sentinel = True
6073 else:
6074 # here, we are not using a sentinel at all
6075 # and we are likely the SQLite dialect.
6076 # The first add_sentinel_col that we have should not
6077 # be marked as "insert_sentinel=True". if it was,
6078 # an error should have been raised in
6079 # _get_sentinel_column_for_table.
6080 assert not add_sentinel_cols[0]._insert_sentinel, (
6081 "sentinel selection rules should have prevented "
6082 "us from getting here for this dialect"
6083 )
6084
6085 # always put the sentinel columns last. even if they are
6086 # in the returning list already, they will be there twice
6087 # then.
6088 returning_cols = list(returning_cols) + list(add_sentinel_cols)
6089
6090 returning_clause = self.returning_clause(
6091 insert_stmt,
6092 returning_cols,
6093 populate_result_map=toplevel,
6094 )
6095
6096 if self.returning_precedes_values:
6097 text += " " + returning_clause
6098
6099 else:
6100 returning_clause = None
6101
6102 if insert_stmt.select is not None:
6103 # placed here by crud.py
6104 select_text = self.process(
6105 self.stack[-1]["insert_from_select"], insert_into=True, **kw
6106 )
6107
6108 if self.ctes and self.dialect.cte_follows_insert:
6109 nesting_level = len(self.stack) if not toplevel else None
6110 text += " %s%s" % (
6111 self._render_cte_clause(
6112 nesting_level=nesting_level,
6113 include_following_stack=True,
6114 ),
6115 select_text,
6116 )
6117 else:
6118 text += " %s" % select_text
6119 elif not crud_params_single and supports_default_values:
6120 text += " DEFAULT VALUES"
6121 if use_insertmanyvalues:
6122 self._insertmanyvalues = _InsertManyValues(
6123 True,
6124 self.dialect.default_metavalue_token,
6125 crud_params_single,
6126 counted_bindparam,
6127 sort_by_parameter_order=(
6128 insert_stmt._sort_by_parameter_order
6129 ),
6130 includes_upsert_behaviors=(
6131 insert_stmt._post_values_clause is not None
6132 ),
6133 sentinel_columns=add_sentinel_cols,
6134 num_sentinel_columns=(
6135 len(add_sentinel_cols) if add_sentinel_cols else 0
6136 ),
6137 implicit_sentinel=implicit_sentinel,
6138 )
6139 elif compile_state._has_multi_parameters:
6140 text += " VALUES %s" % (
6141 ", ".join(
6142 "(%s)"
6143 % (", ".join(value for _, _, value, _ in crud_param_set))
6144 for crud_param_set in crud_params_struct.all_multi_params
6145 ),
6146 )
6147 elif use_insertmanyvalues:
6148 if (
6149 implicit_sentinel
6150 and (
6151 self.dialect.insertmanyvalues_implicit_sentinel
6152 & InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
6153 )
6154 # this is checking if we have
6155 # INSERT INTO table (id) VALUES (DEFAULT).
6156 and not (crud_params_struct.is_default_metavalue_only)
6157 ):
6158 # if we have a sentinel column that is server generated,
6159 # then for selected backends render the VALUES list as a
6160 # subquery. This is the orderable form supported by
6161 # PostgreSQL and in fewer cases SQL Server
6162 embed_sentinel_value = True
6163
6164 render_bind_casts = (
6165 self.dialect.insertmanyvalues_implicit_sentinel
6166 & InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
6167 )
6168
6169 add_sentinel_set = add_sentinel_cols or ()
6170
6171 insert_single_values_expr = ", ".join(
6172 [
6173 value
6174 for col, _, value, _ in crud_params_single
6175 if col not in add_sentinel_set
6176 ]
6177 )
6178
6179 colnames = ", ".join(
6180 f"p{i}"
6181 for i, cp in enumerate(crud_params_single)
6182 if cp[0] not in add_sentinel_set
6183 )
6184
6185 if render_bind_casts:
6186 # render casts for the SELECT list. For PG, we are
6187 # already rendering bind casts in the parameter list,
6188 # selectively for the more "tricky" types like ARRAY.
6189 # however, even for the "easy" types, if the parameter
6190 # is NULL for every entry, PG gives up and says
6191 # "it must be TEXT", which fails for other easy types
6192 # like ints. So we cast on this side too.
6193 colnames_w_cast = ", ".join(
6194 (
6195 self.render_bind_cast(
6196 col.type,
6197 col.type._unwrapped_dialect_impl(self.dialect),
6198 f"p{i}",
6199 )
6200 if col not in add_sentinel_set
6201 else expr
6202 )
6203 for i, (col, _, expr, _) in enumerate(
6204 crud_params_single
6205 )
6206 )
6207 else:
6208 colnames_w_cast = ", ".join(
6209 (f"p{i}" if col not in add_sentinel_set else expr)
6210 for i, (col, _, expr, _) in enumerate(
6211 crud_params_single
6212 )
6213 )
6214
6215 insert_crud_params = [
6216 elem
6217 for elem in crud_params_single
6218 if elem[0] not in add_sentinel_set
6219 ]
6220
6221 text += (
6222 f" SELECT {colnames_w_cast} FROM "
6223 f"(VALUES ({insert_single_values_expr})) "
6224 f"AS imp_sen({colnames}, sen_counter) "
6225 "ORDER BY sen_counter"
6226 )
6227
6228 else:
6229 # otherwise, if no sentinel or backend doesn't support
6230 # orderable subquery form, use a plain VALUES list
6231 embed_sentinel_value = False
6232 insert_crud_params = crud_params_single
6233 insert_single_values_expr = ", ".join(
6234 [value for _, _, value, _ in crud_params_single]
6235 )
6236
6237 text += f" VALUES ({insert_single_values_expr})"
6238
6239 self._insertmanyvalues = _InsertManyValues(
6240 is_default_expr=False,
6241 single_values_expr=insert_single_values_expr,
6242 insert_crud_params=insert_crud_params,
6243 num_positional_params_counted=counted_bindparam,
6244 sort_by_parameter_order=(insert_stmt._sort_by_parameter_order),
6245 includes_upsert_behaviors=(
6246 insert_stmt._post_values_clause is not None
6247 ),
6248 sentinel_columns=add_sentinel_cols,
6249 num_sentinel_columns=(
6250 len(add_sentinel_cols) if add_sentinel_cols else 0
6251 ),
6252 sentinel_param_keys=named_sentinel_params,
6253 implicit_sentinel=implicit_sentinel,
6254 embed_values_counter=embed_sentinel_value,
6255 )
6256
6257 else:
6258 insert_single_values_expr = ", ".join(
6259 [value for _, _, value, _ in crud_params_single]
6260 )
6261
6262 text += f" VALUES ({insert_single_values_expr})"
6263
6264 if insert_stmt._post_values_clause is not None:
6265 post_values_clause = self.process(
6266 insert_stmt._post_values_clause, **kw
6267 )
6268 if post_values_clause:
6269 text += " " + post_values_clause
6270
6271 if returning_clause and not self.returning_precedes_values:
6272 text += " " + returning_clause
6273
6274 if self.ctes and not self.dialect.cte_follows_insert:
6275 nesting_level = len(self.stack) if not toplevel else None
6276 text = (
6277 self._render_cte_clause(
6278 nesting_level=nesting_level,
6279 include_following_stack=True,
6280 )
6281 + text
6282 )
6283
6284 self.stack.pop(-1)
6285
6286 return text
6287
6288 def update_limit_clause(self, update_stmt):
6289 """Provide a hook for MySQL to add LIMIT to the UPDATE"""
6290 return None
6291
6292 def delete_limit_clause(self, delete_stmt):
6293 """Provide a hook for MySQL to add LIMIT to the DELETE"""
6294 return None
6295
6296 def update_tables_clause(self, update_stmt, from_table, extra_froms, **kw):
6297 """Provide a hook to override the initial table clause
6298 in an UPDATE statement.
6299
6300 MySQL overrides this.
6301
6302 """
6303 kw["asfrom"] = True
6304 return from_table._compiler_dispatch(self, iscrud=True, **kw)
6305
6306 def update_from_clause(
6307 self, update_stmt, from_table, extra_froms, from_hints, **kw
6308 ):
6309 """Provide a hook to override the generation of an
6310 UPDATE..FROM clause.
6311
6312 MySQL and MSSQL override this.
6313
6314 """
6315 raise NotImplementedError(
6316 "This backend does not support multiple-table "
6317 "criteria within UPDATE"
6318 )
6319
6320 def visit_update(
6321 self,
6322 update_stmt: Update,
6323 visiting_cte: Optional[CTE] = None,
6324 **kw: Any,
6325 ) -> str:
6326 compile_state = update_stmt._compile_state_factory(
6327 update_stmt, self, **kw
6328 )
6329 if TYPE_CHECKING:
6330 assert isinstance(compile_state, UpdateDMLState)
6331 update_stmt = compile_state.statement # type: ignore[assignment]
6332
6333 if visiting_cte is not None:
6334 kw["visiting_cte"] = visiting_cte
6335 toplevel = False
6336 else:
6337 toplevel = not self.stack
6338
6339 if toplevel:
6340 self.isupdate = True
6341 if not self.dml_compile_state:
6342 self.dml_compile_state = compile_state
6343 if not self.compile_state:
6344 self.compile_state = compile_state
6345
6346 if self.linting & COLLECT_CARTESIAN_PRODUCTS:
6347 from_linter = FromLinter({}, set())
6348 warn_linting = self.linting & WARN_LINTING
6349 if toplevel:
6350 self.from_linter = from_linter
6351 else:
6352 from_linter = None
6353 warn_linting = False
6354
6355 extra_froms = compile_state._extra_froms
6356 is_multitable = bool(extra_froms)
6357
6358 if is_multitable:
6359 # main table might be a JOIN
6360 main_froms = set(_from_objects(update_stmt.table))
6361 render_extra_froms = [
6362 f for f in extra_froms if f not in main_froms
6363 ]
6364 correlate_froms = main_froms.union(extra_froms)
6365 else:
6366 render_extra_froms = []
6367 correlate_froms = {update_stmt.table}
6368
6369 self.stack.append(
6370 {
6371 "correlate_froms": correlate_froms,
6372 "asfrom_froms": correlate_froms,
6373 "selectable": update_stmt,
6374 }
6375 )
6376
6377 text = "UPDATE "
6378
6379 if update_stmt._prefixes:
6380 text += self._generate_prefixes(
6381 update_stmt, update_stmt._prefixes, **kw
6382 )
6383
6384 table_text = self.update_tables_clause(
6385 update_stmt,
6386 update_stmt.table,
6387 render_extra_froms,
6388 from_linter=from_linter,
6389 **kw,
6390 )
6391 crud_params_struct = crud._get_crud_params(
6392 self, update_stmt, compile_state, toplevel, **kw
6393 )
6394 crud_params = crud_params_struct.single_params
6395
6396 if update_stmt._hints:
6397 dialect_hints, table_text = self._setup_crud_hints(
6398 update_stmt, table_text
6399 )
6400 else:
6401 dialect_hints = None
6402
6403 if update_stmt._independent_ctes:
6404 self._dispatch_independent_ctes(update_stmt, kw)
6405
6406 text += table_text
6407
6408 text += " SET "
6409 text += ", ".join(
6410 expr + "=" + value
6411 for _, expr, value, _ in cast(
6412 "List[Tuple[Any, str, str, Any]]", crud_params
6413 )
6414 )
6415
6416 if self.implicit_returning or update_stmt._returning:
6417 if self.returning_precedes_values:
6418 text += " " + self.returning_clause(
6419 update_stmt,
6420 self.implicit_returning or update_stmt._returning,
6421 populate_result_map=toplevel,
6422 )
6423
6424 if extra_froms:
6425 extra_from_text = self.update_from_clause(
6426 update_stmt,
6427 update_stmt.table,
6428 render_extra_froms,
6429 dialect_hints,
6430 from_linter=from_linter,
6431 **kw,
6432 )
6433 if extra_from_text:
6434 text += " " + extra_from_text
6435
6436 if update_stmt._where_criteria:
6437 t = self._generate_delimited_and_list(
6438 update_stmt._where_criteria, from_linter=from_linter, **kw
6439 )
6440 if t:
6441 text += " WHERE " + t
6442
6443 limit_clause = self.update_limit_clause(update_stmt)
6444 if limit_clause:
6445 text += " " + limit_clause
6446
6447 if (
6448 self.implicit_returning or update_stmt._returning
6449 ) and not self.returning_precedes_values:
6450 text += " " + self.returning_clause(
6451 update_stmt,
6452 self.implicit_returning or update_stmt._returning,
6453 populate_result_map=toplevel,
6454 )
6455
6456 if self.ctes:
6457 nesting_level = len(self.stack) if not toplevel else None
6458 text = self._render_cte_clause(nesting_level=nesting_level) + text
6459
6460 if warn_linting:
6461 assert from_linter is not None
6462 from_linter.warn(stmt_type="UPDATE")
6463
6464 self.stack.pop(-1)
6465
6466 return text # type: ignore[no-any-return]
6467
6468 def delete_extra_from_clause(
6469 self, delete_stmt, from_table, extra_froms, from_hints, **kw
6470 ):
6471 """Provide a hook to override the generation of an
6472 DELETE..FROM clause.
6473
6474 This can be used to implement DELETE..USING for example.
6475
6476 MySQL and MSSQL override this.
6477
6478 """
6479 raise NotImplementedError(
6480 "This backend does not support multiple-table "
6481 "criteria within DELETE"
6482 )
6483
6484 def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw):
6485 return from_table._compiler_dispatch(
6486 self, asfrom=True, iscrud=True, **kw
6487 )
6488
6489 def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
6490 compile_state = delete_stmt._compile_state_factory(
6491 delete_stmt, self, **kw
6492 )
6493 delete_stmt = compile_state.statement
6494
6495 if visiting_cte is not None:
6496 kw["visiting_cte"] = visiting_cte
6497 toplevel = False
6498 else:
6499 toplevel = not self.stack
6500
6501 if toplevel:
6502 self.isdelete = True
6503 if not self.dml_compile_state:
6504 self.dml_compile_state = compile_state
6505 if not self.compile_state:
6506 self.compile_state = compile_state
6507
6508 if self.linting & COLLECT_CARTESIAN_PRODUCTS:
6509 from_linter = FromLinter({}, set())
6510 warn_linting = self.linting & WARN_LINTING
6511 if toplevel:
6512 self.from_linter = from_linter
6513 else:
6514 from_linter = None
6515 warn_linting = False
6516
6517 extra_froms = compile_state._extra_froms
6518
6519 correlate_froms = {delete_stmt.table}.union(extra_froms)
6520 self.stack.append(
6521 {
6522 "correlate_froms": correlate_froms,
6523 "asfrom_froms": correlate_froms,
6524 "selectable": delete_stmt,
6525 }
6526 )
6527
6528 text = "DELETE "
6529
6530 if delete_stmt._prefixes:
6531 text += self._generate_prefixes(
6532 delete_stmt, delete_stmt._prefixes, **kw
6533 )
6534
6535 text += "FROM "
6536
6537 try:
6538 table_text = self.delete_table_clause(
6539 delete_stmt,
6540 delete_stmt.table,
6541 extra_froms,
6542 from_linter=from_linter,
6543 )
6544 except TypeError:
6545 # anticipate 3rd party dialects that don't include **kw
6546 # TODO: remove in 2.1
6547 table_text = self.delete_table_clause(
6548 delete_stmt, delete_stmt.table, extra_froms
6549 )
6550 if from_linter:
6551 _ = self.process(delete_stmt.table, from_linter=from_linter)
6552
6553 crud._get_crud_params(self, delete_stmt, compile_state, toplevel, **kw)
6554
6555 if delete_stmt._hints:
6556 dialect_hints, table_text = self._setup_crud_hints(
6557 delete_stmt, table_text
6558 )
6559 else:
6560 dialect_hints = None
6561
6562 if delete_stmt._independent_ctes:
6563 self._dispatch_independent_ctes(delete_stmt, kw)
6564
6565 text += table_text
6566
6567 if (
6568 self.implicit_returning or delete_stmt._returning
6569 ) and self.returning_precedes_values:
6570 text += " " + self.returning_clause(
6571 delete_stmt,
6572 self.implicit_returning or delete_stmt._returning,
6573 populate_result_map=toplevel,
6574 )
6575
6576 if extra_froms:
6577 extra_from_text = self.delete_extra_from_clause(
6578 delete_stmt,
6579 delete_stmt.table,
6580 extra_froms,
6581 dialect_hints,
6582 from_linter=from_linter,
6583 **kw,
6584 )
6585 if extra_from_text:
6586 text += " " + extra_from_text
6587
6588 if delete_stmt._where_criteria:
6589 t = self._generate_delimited_and_list(
6590 delete_stmt._where_criteria, from_linter=from_linter, **kw
6591 )
6592 if t:
6593 text += " WHERE " + t
6594
6595 limit_clause = self.delete_limit_clause(delete_stmt)
6596 if limit_clause:
6597 text += " " + limit_clause
6598
6599 if (
6600 self.implicit_returning or delete_stmt._returning
6601 ) and not self.returning_precedes_values:
6602 text += " " + self.returning_clause(
6603 delete_stmt,
6604 self.implicit_returning or delete_stmt._returning,
6605 populate_result_map=toplevel,
6606 )
6607
6608 if self.ctes:
6609 nesting_level = len(self.stack) if not toplevel else None
6610 text = self._render_cte_clause(nesting_level=nesting_level) + text
6611
6612 if warn_linting:
6613 assert from_linter is not None
6614 from_linter.warn(stmt_type="DELETE")
6615
6616 self.stack.pop(-1)
6617
6618 return text
6619
6620 def visit_savepoint(self, savepoint_stmt, **kw):
6621 return "SAVEPOINT %s" % self.preparer.format_savepoint(savepoint_stmt)
6622
6623 def visit_rollback_to_savepoint(self, savepoint_stmt, **kw):
6624 return "ROLLBACK TO SAVEPOINT %s" % self.preparer.format_savepoint(
6625 savepoint_stmt
6626 )
6627
6628 def visit_release_savepoint(self, savepoint_stmt, **kw):
6629 return "RELEASE SAVEPOINT %s" % self.preparer.format_savepoint(
6630 savepoint_stmt
6631 )
6632
6633
6634class StrSQLCompiler(SQLCompiler):
6635 """A :class:`.SQLCompiler` subclass which allows a small selection
6636 of non-standard SQL features to render into a string value.
6637
6638 The :class:`.StrSQLCompiler` is invoked whenever a Core expression
6639 element is directly stringified without calling upon the
6640 :meth:`_expression.ClauseElement.compile` method.
6641 It can render a limited set
6642 of non-standard SQL constructs to assist in basic stringification,
6643 however for more substantial custom or dialect-specific SQL constructs,
6644 it will be necessary to make use of
6645 :meth:`_expression.ClauseElement.compile`
6646 directly.
6647
6648 .. seealso::
6649
6650 :ref:`faq_sql_expression_string`
6651
6652 """
6653
6654 def _fallback_column_name(self, column):
6655 return "<name unknown>"
6656
6657 @util.preload_module("sqlalchemy.engine.url")
6658 def visit_unsupported_compilation(self, element, err, **kw):
6659 if element.stringify_dialect != "default":
6660 url = util.preloaded.engine_url
6661 dialect = url.URL.create(element.stringify_dialect).get_dialect()()
6662
6663 compiler = dialect.statement_compiler(
6664 dialect, None, _supporting_against=self
6665 )
6666 if not isinstance(compiler, StrSQLCompiler):
6667 return compiler.process(element, **kw)
6668
6669 return super().visit_unsupported_compilation(element, err)
6670
6671 def visit_getitem_binary(self, binary, operator, **kw):
6672 return "%s[%s]" % (
6673 self.process(binary.left, **kw),
6674 self.process(binary.right, **kw),
6675 )
6676
6677 def visit_json_getitem_op_binary(self, binary, operator, **kw):
6678 return self.visit_getitem_binary(binary, operator, **kw)
6679
6680 def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
6681 return self.visit_getitem_binary(binary, operator, **kw)
6682
6683 def visit_sequence(self, sequence, **kw):
6684 return (
6685 f"<next sequence value: {self.preparer.format_sequence(sequence)}>"
6686 )
6687
6688 def returning_clause(
6689 self,
6690 stmt: UpdateBase,
6691 returning_cols: Sequence[_ColumnsClauseElement],
6692 *,
6693 populate_result_map: bool,
6694 **kw: Any,
6695 ) -> str:
6696 columns = [
6697 self._label_select_column(None, c, True, False, {})
6698 for c in base._select_iterables(returning_cols)
6699 ]
6700 return "RETURNING " + ", ".join(columns)
6701
6702 def update_from_clause(
6703 self, update_stmt, from_table, extra_froms, from_hints, **kw
6704 ):
6705 kw["asfrom"] = True
6706 return "FROM " + ", ".join(
6707 t._compiler_dispatch(self, fromhints=from_hints, **kw)
6708 for t in extra_froms
6709 )
6710
6711 def delete_extra_from_clause(
6712 self, delete_stmt, from_table, extra_froms, from_hints, **kw
6713 ):
6714 kw["asfrom"] = True
6715 return ", " + ", ".join(
6716 t._compiler_dispatch(self, fromhints=from_hints, **kw)
6717 for t in extra_froms
6718 )
6719
6720 def visit_empty_set_expr(self, element_types, **kw):
6721 return "SELECT 1 WHERE 1!=1"
6722
6723 def get_from_hint_text(self, table, text):
6724 return "[%s]" % text
6725
6726 def visit_regexp_match_op_binary(self, binary, operator, **kw):
6727 return self._generate_generic_binary(binary, " <regexp> ", **kw)
6728
6729 def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
6730 return self._generate_generic_binary(binary, " <not regexp> ", **kw)
6731
6732 def visit_regexp_replace_op_binary(self, binary, operator, **kw):
6733 return "<regexp replace>(%s, %s)" % (
6734 binary.left._compiler_dispatch(self, **kw),
6735 binary.right._compiler_dispatch(self, **kw),
6736 )
6737
6738 def visit_try_cast(self, cast, **kwargs):
6739 return "TRY_CAST(%s AS %s)" % (
6740 cast.clause._compiler_dispatch(self, **kwargs),
6741 cast.typeclause._compiler_dispatch(self, **kwargs),
6742 )
6743
6744
6745class DDLCompiler(Compiled):
6746 is_ddl = True
6747
6748 if TYPE_CHECKING:
6749
6750 def __init__(
6751 self,
6752 dialect: Dialect,
6753 statement: ExecutableDDLElement,
6754 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
6755 render_schema_translate: bool = ...,
6756 compile_kwargs: Mapping[str, Any] = ...,
6757 ): ...
6758
6759 @util.ro_memoized_property
6760 def sql_compiler(self) -> SQLCompiler:
6761 return self.dialect.statement_compiler(
6762 self.dialect, None, schema_translate_map=self.schema_translate_map
6763 )
6764
6765 @util.memoized_property
6766 def type_compiler(self):
6767 return self.dialect.type_compiler_instance
6768
6769 def construct_params(
6770 self,
6771 params: Optional[_CoreSingleExecuteParams] = None,
6772 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
6773 escape_names: bool = True,
6774 ) -> Optional[_MutableCoreSingleExecuteParams]:
6775 return None
6776
6777 def visit_ddl(self, ddl, **kwargs):
6778 # table events can substitute table and schema name
6779 context = ddl.context
6780 if isinstance(ddl.target, schema.Table):
6781 context = context.copy()
6782
6783 preparer = self.preparer
6784 path = preparer.format_table_seq(ddl.target)
6785 if len(path) == 1:
6786 table, sch = path[0], ""
6787 else:
6788 table, sch = path[-1], path[0]
6789
6790 context.setdefault("table", table)
6791 context.setdefault("schema", sch)
6792 context.setdefault("fullname", preparer.format_table(ddl.target))
6793
6794 return self.sql_compiler.post_process_text(ddl.statement % context)
6795
6796 def visit_create_schema(self, create, **kw):
6797 text = "CREATE SCHEMA "
6798 if create.if_not_exists:
6799 text += "IF NOT EXISTS "
6800 return text + self.preparer.format_schema(create.element)
6801
6802 def visit_drop_schema(self, drop, **kw):
6803 text = "DROP SCHEMA "
6804 if drop.if_exists:
6805 text += "IF EXISTS "
6806 text += self.preparer.format_schema(drop.element)
6807 if drop.cascade:
6808 text += " CASCADE"
6809 return text
6810
6811 def visit_create_table(self, create, **kw):
6812 table = create.element
6813 preparer = self.preparer
6814
6815 text = "\nCREATE "
6816 if table._prefixes:
6817 text += " ".join(table._prefixes) + " "
6818
6819 text += "TABLE "
6820 if create.if_not_exists:
6821 text += "IF NOT EXISTS "
6822
6823 text += preparer.format_table(table) + " "
6824
6825 create_table_suffix = self.create_table_suffix(table)
6826 if create_table_suffix:
6827 text += create_table_suffix + " "
6828
6829 text += "("
6830
6831 separator = "\n"
6832
6833 # if only one primary key, specify it along with the column
6834 first_pk = False
6835 for create_column in create.columns:
6836 column = create_column.element
6837 try:
6838 processed = self.process(
6839 create_column, first_pk=column.primary_key and not first_pk
6840 )
6841 if processed is not None:
6842 text += separator
6843 separator = ", \n"
6844 text += "\t" + processed
6845 if column.primary_key:
6846 first_pk = True
6847 except exc.CompileError as ce:
6848 raise exc.CompileError(
6849 "(in table '%s', column '%s'): %s"
6850 % (table.description, column.name, ce.args[0])
6851 ) from ce
6852
6853 const = self.create_table_constraints(
6854 table,
6855 _include_foreign_key_constraints=create.include_foreign_key_constraints, # noqa
6856 )
6857 if const:
6858 text += separator + "\t" + const
6859
6860 text += "\n)%s\n\n" % self.post_create_table(table)
6861 return text
6862
6863 def visit_create_column(self, create, first_pk=False, **kw):
6864 column = create.element
6865
6866 if column.system:
6867 return None
6868
6869 text = self.get_column_specification(column, first_pk=first_pk)
6870 const = " ".join(
6871 self.process(constraint) for constraint in column.constraints
6872 )
6873 if const:
6874 text += " " + const
6875
6876 return text
6877
6878 def create_table_constraints(
6879 self, table, _include_foreign_key_constraints=None, **kw
6880 ):
6881 # On some DB order is significant: visit PK first, then the
6882 # other constraints (engine.ReflectionTest.testbasic failed on FB2)
6883 constraints = []
6884 if table.primary_key:
6885 constraints.append(table.primary_key)
6886
6887 all_fkcs = table.foreign_key_constraints
6888 if _include_foreign_key_constraints is not None:
6889 omit_fkcs = all_fkcs.difference(_include_foreign_key_constraints)
6890 else:
6891 omit_fkcs = set()
6892
6893 constraints.extend(
6894 [
6895 c
6896 for c in table._sorted_constraints
6897 if c is not table.primary_key and c not in omit_fkcs
6898 ]
6899 )
6900
6901 return ", \n\t".join(
6902 p
6903 for p in (
6904 self.process(constraint)
6905 for constraint in constraints
6906 if (constraint._should_create_for_compiler(self))
6907 and (
6908 not self.dialect.supports_alter
6909 or not getattr(constraint, "use_alter", False)
6910 )
6911 )
6912 if p is not None
6913 )
6914
6915 def visit_drop_table(self, drop, **kw):
6916 text = "\nDROP TABLE "
6917 if drop.if_exists:
6918 text += "IF EXISTS "
6919 return text + self.preparer.format_table(drop.element)
6920
6921 def visit_drop_view(self, drop, **kw):
6922 return "\nDROP VIEW " + self.preparer.format_table(drop.element)
6923
6924 def _verify_index_table(self, index: Index) -> None:
6925 if index.table is None:
6926 raise exc.CompileError(
6927 "Index '%s' is not associated with any table." % index.name
6928 )
6929
6930 def visit_create_index(
6931 self, create, include_schema=False, include_table_schema=True, **kw
6932 ):
6933 index = create.element
6934 self._verify_index_table(index)
6935 preparer = self.preparer
6936 text = "CREATE "
6937 if index.unique:
6938 text += "UNIQUE "
6939 if index.name is None:
6940 raise exc.CompileError(
6941 "CREATE INDEX requires that the index have a name"
6942 )
6943
6944 text += "INDEX "
6945 if create.if_not_exists:
6946 text += "IF NOT EXISTS "
6947
6948 text += "%s ON %s (%s)" % (
6949 self._prepared_index_name(index, include_schema=include_schema),
6950 preparer.format_table(
6951 index.table, use_schema=include_table_schema
6952 ),
6953 ", ".join(
6954 self.sql_compiler.process(
6955 expr, include_table=False, literal_binds=True
6956 )
6957 for expr in index.expressions
6958 ),
6959 )
6960 return text
6961
6962 def visit_drop_index(self, drop, **kw):
6963 index = drop.element
6964
6965 if index.name is None:
6966 raise exc.CompileError(
6967 "DROP INDEX requires that the index have a name"
6968 )
6969 text = "\nDROP INDEX "
6970 if drop.if_exists:
6971 text += "IF EXISTS "
6972
6973 return text + self._prepared_index_name(index, include_schema=True)
6974
6975 def _prepared_index_name(
6976 self, index: Index, include_schema: bool = False
6977 ) -> str:
6978 if index.table is not None:
6979 effective_schema = self.preparer.schema_for_object(index.table)
6980 else:
6981 effective_schema = None
6982 if include_schema and effective_schema:
6983 schema_name = self.preparer.quote_schema(effective_schema)
6984 else:
6985 schema_name = None
6986
6987 index_name: str = self.preparer.format_index(index)
6988
6989 if schema_name:
6990 index_name = schema_name + "." + index_name
6991 return index_name
6992
6993 def visit_add_constraint(self, create, **kw):
6994 return "ALTER TABLE %s ADD %s" % (
6995 self.preparer.format_table(create.element.table),
6996 self.process(create.element),
6997 )
6998
6999 def visit_set_table_comment(self, create, **kw):
7000 return "COMMENT ON TABLE %s IS %s" % (
7001 self.preparer.format_table(create.element),
7002 self.sql_compiler.render_literal_value(
7003 create.element.comment, sqltypes.String()
7004 ),
7005 )
7006
7007 def visit_drop_table_comment(self, drop, **kw):
7008 return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table(
7009 drop.element
7010 )
7011
7012 def visit_set_column_comment(self, create, **kw):
7013 return "COMMENT ON COLUMN %s IS %s" % (
7014 self.preparer.format_column(
7015 create.element, use_table=True, use_schema=True
7016 ),
7017 self.sql_compiler.render_literal_value(
7018 create.element.comment, sqltypes.String()
7019 ),
7020 )
7021
7022 def visit_drop_column_comment(self, drop, **kw):
7023 return "COMMENT ON COLUMN %s IS NULL" % self.preparer.format_column(
7024 drop.element, use_table=True
7025 )
7026
7027 def visit_set_constraint_comment(self, create, **kw):
7028 raise exc.UnsupportedCompilationError(self, type(create))
7029
7030 def visit_drop_constraint_comment(self, drop, **kw):
7031 raise exc.UnsupportedCompilationError(self, type(drop))
7032
7033 def get_identity_options(self, identity_options: IdentityOptions) -> str:
7034 text = []
7035 if identity_options.increment is not None:
7036 text.append("INCREMENT BY %d" % identity_options.increment)
7037 if identity_options.start is not None:
7038 text.append("START WITH %d" % identity_options.start)
7039 if identity_options.minvalue is not None:
7040 text.append("MINVALUE %d" % identity_options.minvalue)
7041 if identity_options.maxvalue is not None:
7042 text.append("MAXVALUE %d" % identity_options.maxvalue)
7043 if identity_options.nominvalue is not None:
7044 text.append("NO MINVALUE")
7045 if identity_options.nomaxvalue is not None:
7046 text.append("NO MAXVALUE")
7047 if identity_options.cache is not None:
7048 text.append("CACHE %d" % identity_options.cache)
7049 if identity_options.cycle is not None:
7050 text.append("CYCLE" if identity_options.cycle else "NO CYCLE")
7051 return " ".join(text)
7052
7053 def visit_create_sequence(self, create, prefix=None, **kw):
7054 text = "CREATE SEQUENCE "
7055 if create.if_not_exists:
7056 text += "IF NOT EXISTS "
7057 text += self.preparer.format_sequence(create.element)
7058
7059 if prefix:
7060 text += prefix
7061 options = self.get_identity_options(create.element)
7062 if options:
7063 text += " " + options
7064 return text
7065
7066 def visit_drop_sequence(self, drop, **kw):
7067 text = "DROP SEQUENCE "
7068 if drop.if_exists:
7069 text += "IF EXISTS "
7070 return text + self.preparer.format_sequence(drop.element)
7071
7072 def visit_drop_constraint(self, drop, **kw):
7073 constraint = drop.element
7074 if constraint.name is not None:
7075 formatted_name = self.preparer.format_constraint(constraint)
7076 else:
7077 formatted_name = None
7078
7079 if formatted_name is None:
7080 raise exc.CompileError(
7081 "Can't emit DROP CONSTRAINT for constraint %r; "
7082 "it has no name" % drop.element
7083 )
7084 return "ALTER TABLE %s DROP CONSTRAINT %s%s%s" % (
7085 self.preparer.format_table(drop.element.table),
7086 "IF EXISTS " if drop.if_exists else "",
7087 formatted_name,
7088 " CASCADE" if drop.cascade else "",
7089 )
7090
7091 def get_column_specification(self, column, **kwargs):
7092 colspec = (
7093 self.preparer.format_column(column)
7094 + " "
7095 + self.dialect.type_compiler_instance.process(
7096 column.type, type_expression=column
7097 )
7098 )
7099 default = self.get_column_default_string(column)
7100 if default is not None:
7101 colspec += " DEFAULT " + default
7102
7103 if column.computed is not None:
7104 colspec += " " + self.process(column.computed)
7105
7106 if (
7107 column.identity is not None
7108 and self.dialect.supports_identity_columns
7109 ):
7110 colspec += " " + self.process(column.identity)
7111
7112 if not column.nullable and (
7113 not column.identity or not self.dialect.supports_identity_columns
7114 ):
7115 colspec += " NOT NULL"
7116 return colspec
7117
7118 def create_table_suffix(self, table):
7119 return ""
7120
7121 def post_create_table(self, table):
7122 return ""
7123
7124 def get_column_default_string(self, column: Column[Any]) -> Optional[str]:
7125 if isinstance(column.server_default, schema.DefaultClause):
7126 return self.render_default_string(column.server_default.arg)
7127 else:
7128 return None
7129
7130 def render_default_string(self, default: Union[Visitable, str]) -> str:
7131 if isinstance(default, str):
7132 return self.sql_compiler.render_literal_value(
7133 default, sqltypes.STRINGTYPE
7134 )
7135 else:
7136 return self.sql_compiler.process(default, literal_binds=True)
7137
7138 def visit_table_or_column_check_constraint(self, constraint, **kw):
7139 if constraint.is_column_level:
7140 return self.visit_column_check_constraint(constraint)
7141 else:
7142 return self.visit_check_constraint(constraint)
7143
7144 def visit_check_constraint(self, constraint, **kw):
7145 text = self.define_constraint_preamble(constraint, **kw)
7146 text += self.define_check_body(constraint, **kw)
7147 text += self.define_constraint_deferrability(constraint)
7148 return text
7149
7150 def visit_column_check_constraint(self, constraint, **kw):
7151 text = self.define_constraint_preamble(constraint, **kw)
7152 text += self.define_check_body(constraint, **kw)
7153 text += self.define_constraint_deferrability(constraint)
7154 return text
7155
7156 def visit_primary_key_constraint(
7157 self, constraint: PrimaryKeyConstraint, **kw: Any
7158 ) -> str:
7159 if len(constraint) == 0:
7160 return ""
7161 text = self.define_constraint_preamble(constraint, **kw)
7162 text += self.define_primary_key_body(constraint, **kw)
7163 text += self.define_constraint_deferrability(constraint)
7164 return text
7165
7166 def visit_foreign_key_constraint(
7167 self, constraint: ForeignKeyConstraint, **kw: Any
7168 ) -> str:
7169 text = self.define_constraint_preamble(constraint, **kw)
7170 text += self.define_foreign_key_body(constraint, **kw)
7171 text += self.define_constraint_match(constraint)
7172 text += self.define_constraint_cascades(constraint)
7173 text += self.define_constraint_deferrability(constraint)
7174 return text
7175
7176 def define_constraint_remote_table(self, constraint, table, preparer):
7177 """Format the remote table clause of a CREATE CONSTRAINT clause."""
7178
7179 return preparer.format_table(table)
7180
7181 def visit_unique_constraint(
7182 self, constraint: UniqueConstraint, **kw: Any
7183 ) -> str:
7184 if len(constraint) == 0:
7185 return ""
7186 text = self.define_constraint_preamble(constraint, **kw)
7187 text += self.define_unique_body(constraint, **kw)
7188 text += self.define_constraint_deferrability(constraint)
7189 return text
7190
7191 def define_constraint_preamble(
7192 self, constraint: Constraint, **kw: Any
7193 ) -> str:
7194 text = ""
7195 if constraint.name is not None:
7196 formatted_name = self.preparer.format_constraint(constraint)
7197 if formatted_name is not None:
7198 text += "CONSTRAINT %s " % formatted_name
7199 return text
7200
7201 def define_primary_key_body(
7202 self, constraint: PrimaryKeyConstraint, **kw: Any
7203 ) -> str:
7204 text = ""
7205 text += "PRIMARY KEY "
7206 text += "(%s)" % ", ".join(
7207 self.preparer.quote(c.name)
7208 for c in (
7209 constraint.columns_autoinc_first
7210 if constraint._implicit_generated
7211 else constraint.columns
7212 )
7213 )
7214 return text
7215
7216 def define_foreign_key_body(
7217 self, constraint: ForeignKeyConstraint, **kw: Any
7218 ) -> str:
7219 preparer = self.preparer
7220 remote_table = list(constraint.elements)[0].column.table
7221 text = "FOREIGN KEY(%s) REFERENCES %s (%s)" % (
7222 ", ".join(
7223 preparer.quote(f.parent.name) for f in constraint.elements
7224 ),
7225 self.define_constraint_remote_table(
7226 constraint, remote_table, preparer
7227 ),
7228 ", ".join(
7229 preparer.quote(f.column.name) for f in constraint.elements
7230 ),
7231 )
7232 return text
7233
7234 def define_unique_body(
7235 self, constraint: UniqueConstraint, **kw: Any
7236 ) -> str:
7237 text = "UNIQUE %s(%s)" % (
7238 self.define_unique_constraint_distinct(constraint, **kw),
7239 ", ".join(self.preparer.quote(c.name) for c in constraint),
7240 )
7241 return text
7242
7243 def define_check_body(self, constraint: CheckConstraint, **kw: Any) -> str:
7244 text = "CHECK (%s)" % self.sql_compiler.process(
7245 constraint.sqltext, include_table=False, literal_binds=True
7246 )
7247 return text
7248
7249 def define_unique_constraint_distinct(
7250 self, constraint: UniqueConstraint, **kw: Any
7251 ) -> str:
7252 return ""
7253
7254 def define_constraint_cascades(
7255 self, constraint: ForeignKeyConstraint
7256 ) -> str:
7257 text = ""
7258 if constraint.ondelete is not None:
7259 text += self.define_constraint_ondelete_cascade(constraint)
7260
7261 if constraint.onupdate is not None:
7262 text += self.define_constraint_onupdate_cascade(constraint)
7263 return text
7264
7265 def define_constraint_ondelete_cascade(
7266 self, constraint: ForeignKeyConstraint
7267 ) -> str:
7268 return " ON DELETE %s" % self.preparer.validate_sql_phrase(
7269 constraint.ondelete, FK_ON_DELETE
7270 )
7271
7272 def define_constraint_onupdate_cascade(
7273 self, constraint: ForeignKeyConstraint
7274 ) -> str:
7275 return " ON UPDATE %s" % self.preparer.validate_sql_phrase(
7276 constraint.onupdate, FK_ON_UPDATE
7277 )
7278
7279 def define_constraint_deferrability(self, constraint: Constraint) -> str:
7280 text = ""
7281 if constraint.deferrable is not None:
7282 if constraint.deferrable:
7283 text += " DEFERRABLE"
7284 else:
7285 text += " NOT DEFERRABLE"
7286 if constraint.initially is not None:
7287 text += " INITIALLY %s" % self.preparer.validate_sql_phrase(
7288 constraint.initially, FK_INITIALLY
7289 )
7290 return text
7291
7292 def define_constraint_match(self, constraint: ForeignKeyConstraint) -> str:
7293 text = ""
7294 if constraint.match is not None:
7295 text += " MATCH %s" % constraint.match
7296 return text
7297
7298 def visit_computed_column(self, generated, **kw):
7299 text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
7300 generated.sqltext, include_table=False, literal_binds=True
7301 )
7302 if generated.persisted is True:
7303 text += " STORED"
7304 elif generated.persisted is False:
7305 text += " VIRTUAL"
7306 return text
7307
7308 def visit_identity_column(self, identity, **kw):
7309 text = "GENERATED %s AS IDENTITY" % (
7310 "ALWAYS" if identity.always else "BY DEFAULT",
7311 )
7312 options = self.get_identity_options(identity)
7313 if options:
7314 text += " (%s)" % options
7315 return text
7316
7317
7318class GenericTypeCompiler(TypeCompiler):
7319 def visit_FLOAT(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
7320 return "FLOAT"
7321
7322 def visit_DOUBLE(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
7323 return "DOUBLE"
7324
7325 def visit_DOUBLE_PRECISION(
7326 self, type_: sqltypes.DOUBLE_PRECISION[Any], **kw: Any
7327 ) -> str:
7328 return "DOUBLE PRECISION"
7329
7330 def visit_REAL(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
7331 return "REAL"
7332
7333 def visit_NUMERIC(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
7334 if type_.precision is None:
7335 return "NUMERIC"
7336 elif type_.scale is None:
7337 return "NUMERIC(%(precision)s)" % {"precision": type_.precision}
7338 else:
7339 return "NUMERIC(%(precision)s, %(scale)s)" % {
7340 "precision": type_.precision,
7341 "scale": type_.scale,
7342 }
7343
7344 def visit_DECIMAL(self, type_: sqltypes.DECIMAL[Any], **kw: Any) -> str:
7345 if type_.precision is None:
7346 return "DECIMAL"
7347 elif type_.scale is None:
7348 return "DECIMAL(%(precision)s)" % {"precision": type_.precision}
7349 else:
7350 return "DECIMAL(%(precision)s, %(scale)s)" % {
7351 "precision": type_.precision,
7352 "scale": type_.scale,
7353 }
7354
7355 def visit_INTEGER(self, type_: sqltypes.Integer, **kw: Any) -> str:
7356 return "INTEGER"
7357
7358 def visit_SMALLINT(self, type_: sqltypes.SmallInteger, **kw: Any) -> str:
7359 return "SMALLINT"
7360
7361 def visit_BIGINT(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
7362 return "BIGINT"
7363
7364 def visit_TIMESTAMP(self, type_: sqltypes.TIMESTAMP, **kw: Any) -> str:
7365 return "TIMESTAMP"
7366
7367 def visit_DATETIME(self, type_: sqltypes.DateTime, **kw: Any) -> str:
7368 return "DATETIME"
7369
7370 def visit_DATE(self, type_: sqltypes.Date, **kw: Any) -> str:
7371 return "DATE"
7372
7373 def visit_TIME(self, type_: sqltypes.Time, **kw: Any) -> str:
7374 return "TIME"
7375
7376 def visit_CLOB(self, type_: sqltypes.CLOB, **kw: Any) -> str:
7377 return "CLOB"
7378
7379 def visit_NCLOB(self, type_: sqltypes.Text, **kw: Any) -> str:
7380 return "NCLOB"
7381
7382 def _render_string_type(
7383 self, name: str, length: Optional[int], collation: Optional[str]
7384 ) -> str:
7385 text = name
7386 if length:
7387 text += f"({length})"
7388 if collation:
7389 text += f' COLLATE "{collation}"'
7390 return text
7391
7392 def visit_CHAR(self, type_: sqltypes.CHAR, **kw: Any) -> str:
7393 return self._render_string_type("CHAR", type_.length, type_.collation)
7394
7395 def visit_NCHAR(self, type_: sqltypes.NCHAR, **kw: Any) -> str:
7396 return self._render_string_type("NCHAR", type_.length, type_.collation)
7397
7398 def visit_VARCHAR(self, type_: sqltypes.String, **kw: Any) -> str:
7399 return self._render_string_type(
7400 "VARCHAR", type_.length, type_.collation
7401 )
7402
7403 def visit_NVARCHAR(self, type_: sqltypes.NVARCHAR, **kw: Any) -> str:
7404 return self._render_string_type(
7405 "NVARCHAR", type_.length, type_.collation
7406 )
7407
7408 def visit_TEXT(self, type_: sqltypes.Text, **kw: Any) -> str:
7409 return self._render_string_type("TEXT", type_.length, type_.collation)
7410
7411 def visit_UUID(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
7412 return "UUID"
7413
7414 def visit_BLOB(self, type_: sqltypes.LargeBinary, **kw: Any) -> str:
7415 return "BLOB"
7416
7417 def visit_BINARY(self, type_: sqltypes.BINARY, **kw: Any) -> str:
7418 return "BINARY" + (type_.length and "(%d)" % type_.length or "")
7419
7420 def visit_VARBINARY(self, type_: sqltypes.VARBINARY, **kw: Any) -> str:
7421 return "VARBINARY" + (type_.length and "(%d)" % type_.length or "")
7422
7423 def visit_BOOLEAN(self, type_: sqltypes.Boolean, **kw: Any) -> str:
7424 return "BOOLEAN"
7425
7426 def visit_uuid(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
7427 if not type_.native_uuid or not self.dialect.supports_native_uuid:
7428 return self._render_string_type("CHAR", length=32, collation=None)
7429 else:
7430 return self.visit_UUID(type_, **kw)
7431
7432 def visit_large_binary(
7433 self, type_: sqltypes.LargeBinary, **kw: Any
7434 ) -> str:
7435 return self.visit_BLOB(type_, **kw)
7436
7437 def visit_boolean(self, type_: sqltypes.Boolean, **kw: Any) -> str:
7438 return self.visit_BOOLEAN(type_, **kw)
7439
7440 def visit_time(self, type_: sqltypes.Time, **kw: Any) -> str:
7441 return self.visit_TIME(type_, **kw)
7442
7443 def visit_datetime(self, type_: sqltypes.DateTime, **kw: Any) -> str:
7444 return self.visit_DATETIME(type_, **kw)
7445
7446 def visit_date(self, type_: sqltypes.Date, **kw: Any) -> str:
7447 return self.visit_DATE(type_, **kw)
7448
7449 def visit_big_integer(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
7450 return self.visit_BIGINT(type_, **kw)
7451
7452 def visit_small_integer(
7453 self, type_: sqltypes.SmallInteger, **kw: Any
7454 ) -> str:
7455 return self.visit_SMALLINT(type_, **kw)
7456
7457 def visit_integer(self, type_: sqltypes.Integer, **kw: Any) -> str:
7458 return self.visit_INTEGER(type_, **kw)
7459
7460 def visit_real(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
7461 return self.visit_REAL(type_, **kw)
7462
7463 def visit_float(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
7464 return self.visit_FLOAT(type_, **kw)
7465
7466 def visit_double(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
7467 return self.visit_DOUBLE(type_, **kw)
7468
7469 def visit_numeric(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
7470 return self.visit_NUMERIC(type_, **kw)
7471
7472 def visit_string(self, type_: sqltypes.String, **kw: Any) -> str:
7473 return self.visit_VARCHAR(type_, **kw)
7474
7475 def visit_unicode(self, type_: sqltypes.Unicode, **kw: Any) -> str:
7476 return self.visit_VARCHAR(type_, **kw)
7477
7478 def visit_text(self, type_: sqltypes.Text, **kw: Any) -> str:
7479 return self.visit_TEXT(type_, **kw)
7480
7481 def visit_unicode_text(
7482 self, type_: sqltypes.UnicodeText, **kw: Any
7483 ) -> str:
7484 return self.visit_TEXT(type_, **kw)
7485
7486 def visit_enum(self, type_: sqltypes.Enum, **kw: Any) -> str:
7487 return self.visit_VARCHAR(type_, **kw)
7488
7489 def visit_null(self, type_, **kw):
7490 raise exc.CompileError(
7491 "Can't generate DDL for %r; "
7492 "did you forget to specify a "
7493 "type on this Column?" % type_
7494 )
7495
7496 def visit_type_decorator(
7497 self, type_: TypeDecorator[Any], **kw: Any
7498 ) -> str:
7499 return self.process(type_.type_engine(self.dialect), **kw)
7500
7501 def visit_user_defined(
7502 self, type_: UserDefinedType[Any], **kw: Any
7503 ) -> str:
7504 return type_.get_col_spec(**kw)
7505
7506
7507class StrSQLTypeCompiler(GenericTypeCompiler):
7508 def process(self, type_, **kw):
7509 try:
7510 _compiler_dispatch = type_._compiler_dispatch
7511 except AttributeError:
7512 return self._visit_unknown(type_, **kw)
7513 else:
7514 return _compiler_dispatch(self, **kw)
7515
7516 def __getattr__(self, key):
7517 if key.startswith("visit_"):
7518 return self._visit_unknown
7519 else:
7520 raise AttributeError(key)
7521
7522 def _visit_unknown(self, type_, **kw):
7523 if type_.__class__.__name__ == type_.__class__.__name__.upper():
7524 return type_.__class__.__name__
7525 else:
7526 return repr(type_)
7527
7528 def visit_null(self, type_, **kw):
7529 return "NULL"
7530
7531 def visit_user_defined(self, type_, **kw):
7532 try:
7533 get_col_spec = type_.get_col_spec
7534 except AttributeError:
7535 return repr(type_)
7536 else:
7537 return get_col_spec(**kw)
7538
7539
7540class _SchemaForObjectCallable(Protocol):
7541 def __call__(self, __obj: Any) -> str: ...
7542
7543
7544class _BindNameForColProtocol(Protocol):
7545 def __call__(self, col: ColumnClause[Any]) -> str: ...
7546
7547
7548class IdentifierPreparer:
7549 """Handle quoting and case-folding of identifiers based on options."""
7550
7551 reserved_words = RESERVED_WORDS
7552
7553 legal_characters = LEGAL_CHARACTERS
7554
7555 illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS
7556
7557 initial_quote: str
7558
7559 final_quote: str
7560
7561 _strings: MutableMapping[str, str]
7562
7563 schema_for_object: _SchemaForObjectCallable = operator.attrgetter("schema")
7564 """Return the .schema attribute for an object.
7565
7566 For the default IdentifierPreparer, the schema for an object is always
7567 the value of the ".schema" attribute. if the preparer is replaced
7568 with one that has a non-empty schema_translate_map, the value of the
7569 ".schema" attribute is rendered a symbol that will be converted to a
7570 real schema name from the mapping post-compile.
7571
7572 """
7573
7574 _includes_none_schema_translate: bool = False
7575
7576 def __init__(
7577 self,
7578 dialect: Dialect,
7579 initial_quote: str = '"',
7580 final_quote: Optional[str] = None,
7581 escape_quote: str = '"',
7582 quote_case_sensitive_collations: bool = True,
7583 omit_schema: bool = False,
7584 ):
7585 """Construct a new ``IdentifierPreparer`` object.
7586
7587 initial_quote
7588 Character that begins a delimited identifier.
7589
7590 final_quote
7591 Character that ends a delimited identifier. Defaults to
7592 `initial_quote`.
7593
7594 omit_schema
7595 Prevent prepending schema name. Useful for databases that do
7596 not support schemae.
7597 """
7598
7599 self.dialect = dialect
7600 self.initial_quote = initial_quote
7601 self.final_quote = final_quote or self.initial_quote
7602 self.escape_quote = escape_quote
7603 self.escape_to_quote = self.escape_quote * 2
7604 self.omit_schema = omit_schema
7605 self.quote_case_sensitive_collations = quote_case_sensitive_collations
7606 self._strings = {}
7607 self._double_percents = self.dialect.paramstyle in (
7608 "format",
7609 "pyformat",
7610 )
7611
7612 def _with_schema_translate(self, schema_translate_map):
7613 prep = self.__class__.__new__(self.__class__)
7614 prep.__dict__.update(self.__dict__)
7615
7616 includes_none = None in schema_translate_map
7617
7618 def symbol_getter(obj):
7619 name = obj.schema
7620 if obj._use_schema_map and (name is not None or includes_none):
7621 if name is not None and ("[" in name or "]" in name):
7622 raise exc.CompileError(
7623 "Square bracket characters ([]) not supported "
7624 "in schema translate name '%s'" % name
7625 )
7626 return quoted_name(
7627 "__[SCHEMA_%s]" % (name or "_none"), quote=False
7628 )
7629 else:
7630 return obj.schema
7631
7632 prep.schema_for_object = symbol_getter
7633 prep._includes_none_schema_translate = includes_none
7634 return prep
7635
7636 def _render_schema_translates(
7637 self, statement: str, schema_translate_map: SchemaTranslateMapType
7638 ) -> str:
7639 d = schema_translate_map
7640 if None in d:
7641 if not self._includes_none_schema_translate:
7642 raise exc.InvalidRequestError(
7643 "schema translate map which previously did not have "
7644 "`None` present as a key now has `None` present; compiled "
7645 "statement may lack adequate placeholders. Please use "
7646 "consistent keys in successive "
7647 "schema_translate_map dictionaries."
7648 )
7649
7650 d["_none"] = d[None] # type: ignore[index]
7651
7652 def replace(m):
7653 name = m.group(2)
7654 if name in d:
7655 effective_schema = d[name]
7656 else:
7657 if name in (None, "_none"):
7658 raise exc.InvalidRequestError(
7659 "schema translate map which previously had `None` "
7660 "present as a key now no longer has it present; don't "
7661 "know how to apply schema for compiled statement. "
7662 "Please use consistent keys in successive "
7663 "schema_translate_map dictionaries."
7664 )
7665 effective_schema = name
7666
7667 if not effective_schema:
7668 effective_schema = self.dialect.default_schema_name
7669 if not effective_schema:
7670 # TODO: no coverage here
7671 raise exc.CompileError(
7672 "Dialect has no default schema name; can't "
7673 "use None as dynamic schema target."
7674 )
7675 return self.quote_schema(effective_schema)
7676
7677 return re.sub(r"(__\[SCHEMA_([^\]]+)\])", replace, statement)
7678
7679 def _escape_identifier(self, value: str) -> str:
7680 """Escape an identifier.
7681
7682 Subclasses should override this to provide database-dependent
7683 escaping behavior.
7684 """
7685
7686 value = value.replace(self.escape_quote, self.escape_to_quote)
7687 if self._double_percents:
7688 value = value.replace("%", "%%")
7689 return value
7690
7691 def _unescape_identifier(self, value: str) -> str:
7692 """Canonicalize an escaped identifier.
7693
7694 Subclasses should override this to provide database-dependent
7695 unescaping behavior that reverses _escape_identifier.
7696 """
7697
7698 return value.replace(self.escape_to_quote, self.escape_quote)
7699
7700 def validate_sql_phrase(self, element, reg):
7701 """keyword sequence filter.
7702
7703 a filter for elements that are intended to represent keyword sequences,
7704 such as "INITIALLY", "INITIALLY DEFERRED", etc. no special characters
7705 should be present.
7706
7707 .. versionadded:: 1.3
7708
7709 """
7710
7711 if element is not None and not reg.match(element):
7712 raise exc.CompileError(
7713 "Unexpected SQL phrase: %r (matching against %r)"
7714 % (element, reg.pattern)
7715 )
7716 return element
7717
7718 def quote_identifier(self, value: str) -> str:
7719 """Quote an identifier.
7720
7721 Subclasses should override this to provide database-dependent
7722 quoting behavior.
7723 """
7724
7725 return (
7726 self.initial_quote
7727 + self._escape_identifier(value)
7728 + self.final_quote
7729 )
7730
7731 def _requires_quotes(self, value: str) -> bool:
7732 """Return True if the given identifier requires quoting."""
7733 lc_value = value.lower()
7734 return (
7735 lc_value in self.reserved_words
7736 or value[0] in self.illegal_initial_characters
7737 or not self.legal_characters.match(str(value))
7738 or (lc_value != value)
7739 )
7740
7741 def _requires_quotes_illegal_chars(self, value):
7742 """Return True if the given identifier requires quoting, but
7743 not taking case convention into account."""
7744 return not self.legal_characters.match(str(value))
7745
7746 def quote_schema(self, schema: str, force: Any = None) -> str:
7747 """Conditionally quote a schema name.
7748
7749
7750 The name is quoted if it is a reserved word, contains quote-necessary
7751 characters, or is an instance of :class:`.quoted_name` which includes
7752 ``quote`` set to ``True``.
7753
7754 Subclasses can override this to provide database-dependent
7755 quoting behavior for schema names.
7756
7757 :param schema: string schema name
7758 :param force: unused
7759
7760 .. deprecated:: 0.9
7761
7762 The :paramref:`.IdentifierPreparer.quote_schema.force`
7763 parameter is deprecated and will be removed in a future
7764 release. This flag has no effect on the behavior of the
7765 :meth:`.IdentifierPreparer.quote` method; please refer to
7766 :class:`.quoted_name`.
7767
7768 """
7769 if force is not None:
7770 # not using the util.deprecated_params() decorator in this
7771 # case because of the additional function call overhead on this
7772 # very performance-critical spot.
7773 util.warn_deprecated(
7774 "The IdentifierPreparer.quote_schema.force parameter is "
7775 "deprecated and will be removed in a future release. This "
7776 "flag has no effect on the behavior of the "
7777 "IdentifierPreparer.quote method; please refer to "
7778 "quoted_name().",
7779 # deprecated 0.9. warning from 1.3
7780 version="0.9",
7781 )
7782
7783 return self.quote(schema)
7784
7785 def quote(self, ident: str, force: Any = None) -> str:
7786 """Conditionally quote an identifier.
7787
7788 The identifier is quoted if it is a reserved word, contains
7789 quote-necessary characters, or is an instance of
7790 :class:`.quoted_name` which includes ``quote`` set to ``True``.
7791
7792 Subclasses can override this to provide database-dependent
7793 quoting behavior for identifier names.
7794
7795 :param ident: string identifier
7796 :param force: unused
7797
7798 .. deprecated:: 0.9
7799
7800 The :paramref:`.IdentifierPreparer.quote.force`
7801 parameter is deprecated and will be removed in a future
7802 release. This flag has no effect on the behavior of the
7803 :meth:`.IdentifierPreparer.quote` method; please refer to
7804 :class:`.quoted_name`.
7805
7806 """
7807 if force is not None:
7808 # not using the util.deprecated_params() decorator in this
7809 # case because of the additional function call overhead on this
7810 # very performance-critical spot.
7811 util.warn_deprecated(
7812 "The IdentifierPreparer.quote.force parameter is "
7813 "deprecated and will be removed in a future release. This "
7814 "flag has no effect on the behavior of the "
7815 "IdentifierPreparer.quote method; please refer to "
7816 "quoted_name().",
7817 # deprecated 0.9. warning from 1.3
7818 version="0.9",
7819 )
7820
7821 force = getattr(ident, "quote", None)
7822
7823 if force is None:
7824 if ident in self._strings:
7825 return self._strings[ident]
7826 else:
7827 if self._requires_quotes(ident):
7828 self._strings[ident] = self.quote_identifier(ident)
7829 else:
7830 self._strings[ident] = ident
7831 return self._strings[ident]
7832 elif force:
7833 return self.quote_identifier(ident)
7834 else:
7835 return ident
7836
7837 def format_collation(self, collation_name):
7838 if self.quote_case_sensitive_collations:
7839 return self.quote(collation_name)
7840 else:
7841 return collation_name
7842
7843 def format_sequence(
7844 self, sequence: schema.Sequence, use_schema: bool = True
7845 ) -> str:
7846 name = self.quote(sequence.name)
7847
7848 effective_schema = self.schema_for_object(sequence)
7849
7850 if (
7851 not self.omit_schema
7852 and use_schema
7853 and effective_schema is not None
7854 ):
7855 name = self.quote_schema(effective_schema) + "." + name
7856 return name
7857
7858 def format_label(
7859 self, label: Label[Any], name: Optional[str] = None
7860 ) -> str:
7861 return self.quote(name or label.name)
7862
7863 def format_alias(
7864 self, alias: Optional[AliasedReturnsRows], name: Optional[str] = None
7865 ) -> str:
7866 if name is None:
7867 assert alias is not None
7868 return self.quote(alias.name)
7869 else:
7870 return self.quote(name)
7871
7872 def format_savepoint(self, savepoint, name=None):
7873 # Running the savepoint name through quoting is unnecessary
7874 # for all known dialects. This is here to support potential
7875 # third party use cases
7876 ident = name or savepoint.ident
7877 if self._requires_quotes(ident):
7878 ident = self.quote_identifier(ident)
7879 return ident
7880
7881 @util.preload_module("sqlalchemy.sql.naming")
7882 def format_constraint(
7883 self, constraint: Union[Constraint, Index], _alembic_quote: bool = True
7884 ) -> Optional[str]:
7885 naming = util.preloaded.sql_naming
7886
7887 if constraint.name is _NONE_NAME:
7888 name = naming._constraint_name_for_table(
7889 constraint, constraint.table
7890 )
7891
7892 if name is None:
7893 return None
7894 else:
7895 name = constraint.name
7896
7897 assert name is not None
7898 if constraint.__visit_name__ == "index":
7899 return self.truncate_and_render_index_name(
7900 name, _alembic_quote=_alembic_quote
7901 )
7902 else:
7903 return self.truncate_and_render_constraint_name(
7904 name, _alembic_quote=_alembic_quote
7905 )
7906
7907 def truncate_and_render_index_name(
7908 self, name: str, _alembic_quote: bool = True
7909 ) -> str:
7910 # calculate these at format time so that ad-hoc changes
7911 # to dialect.max_identifier_length etc. can be reflected
7912 # as IdentifierPreparer is long lived
7913 max_ = (
7914 self.dialect.max_index_name_length
7915 or self.dialect.max_identifier_length
7916 )
7917 return self._truncate_and_render_maxlen_name(
7918 name, max_, _alembic_quote
7919 )
7920
7921 def truncate_and_render_constraint_name(
7922 self, name: str, _alembic_quote: bool = True
7923 ) -> str:
7924 # calculate these at format time so that ad-hoc changes
7925 # to dialect.max_identifier_length etc. can be reflected
7926 # as IdentifierPreparer is long lived
7927 max_ = (
7928 self.dialect.max_constraint_name_length
7929 or self.dialect.max_identifier_length
7930 )
7931 return self._truncate_and_render_maxlen_name(
7932 name, max_, _alembic_quote
7933 )
7934
7935 def _truncate_and_render_maxlen_name(
7936 self, name: str, max_: int, _alembic_quote: bool
7937 ) -> str:
7938 if isinstance(name, elements._truncated_label):
7939 if len(name) > max_:
7940 name = name[0 : max_ - 8] + "_" + util.md5_hex(name)[-4:]
7941 else:
7942 self.dialect.validate_identifier(name)
7943
7944 if not _alembic_quote:
7945 return name
7946 else:
7947 return self.quote(name)
7948
7949 def format_index(self, index: Index) -> str:
7950 name = self.format_constraint(index)
7951 assert name is not None
7952 return name
7953
7954 def format_table(
7955 self,
7956 table: FromClause,
7957 use_schema: bool = True,
7958 name: Optional[str] = None,
7959 ) -> str:
7960 """Prepare a quoted table and schema name."""
7961 if name is None:
7962 if TYPE_CHECKING:
7963 assert isinstance(table, NamedFromClause)
7964 name = table.name
7965
7966 result = self.quote(name)
7967
7968 effective_schema = self.schema_for_object(table)
7969
7970 if not self.omit_schema and use_schema and effective_schema:
7971 result = self.quote_schema(effective_schema) + "." + result
7972 return result
7973
7974 def format_schema(self, name):
7975 """Prepare a quoted schema name."""
7976
7977 return self.quote(name)
7978
7979 def format_label_name(
7980 self,
7981 name,
7982 anon_map=None,
7983 ):
7984 """Prepare a quoted column name."""
7985
7986 if anon_map is not None and isinstance(
7987 name, elements._truncated_label
7988 ):
7989 name = name.apply_map(anon_map)
7990
7991 return self.quote(name)
7992
7993 def format_column(
7994 self,
7995 column: ColumnElement[Any],
7996 use_table: bool = False,
7997 name: Optional[str] = None,
7998 table_name: Optional[str] = None,
7999 use_schema: bool = False,
8000 anon_map: Optional[Mapping[str, Any]] = None,
8001 ) -> str:
8002 """Prepare a quoted column name."""
8003
8004 if name is None:
8005 name = column.name
8006 assert name is not None
8007
8008 if anon_map is not None and isinstance(
8009 name, elements._truncated_label
8010 ):
8011 name = name.apply_map(anon_map)
8012
8013 if not getattr(column, "is_literal", False):
8014 if use_table:
8015 return (
8016 self.format_table(
8017 column.table, use_schema=use_schema, name=table_name
8018 )
8019 + "."
8020 + self.quote(name)
8021 )
8022 else:
8023 return self.quote(name)
8024 else:
8025 # literal textual elements get stuck into ColumnClause a lot,
8026 # which shouldn't get quoted
8027
8028 if use_table:
8029 return (
8030 self.format_table(
8031 column.table, use_schema=use_schema, name=table_name
8032 )
8033 + "."
8034 + name
8035 )
8036 else:
8037 return name
8038
8039 def format_table_seq(self, table, use_schema=True):
8040 """Format table name and schema as a tuple."""
8041
8042 # Dialects with more levels in their fully qualified references
8043 # ('database', 'owner', etc.) could override this and return
8044 # a longer sequence.
8045
8046 effective_schema = self.schema_for_object(table)
8047
8048 if not self.omit_schema and use_schema and effective_schema:
8049 return (
8050 self.quote_schema(effective_schema),
8051 self.format_table(table, use_schema=False),
8052 )
8053 else:
8054 return (self.format_table(table, use_schema=False),)
8055
8056 @util.memoized_property
8057 def _r_identifiers(self):
8058 initial, final, escaped_final = (
8059 re.escape(s)
8060 for s in (
8061 self.initial_quote,
8062 self.final_quote,
8063 self._escape_identifier(self.final_quote),
8064 )
8065 )
8066 r = re.compile(
8067 r"(?:"
8068 r"(?:%(initial)s((?:%(escaped)s|[^%(final)s])+)%(final)s"
8069 r"|([^\.]+))(?=\.|$))+"
8070 % {"initial": initial, "final": final, "escaped": escaped_final}
8071 )
8072 return r
8073
8074 def unformat_identifiers(self, identifiers: str) -> Sequence[str]:
8075 """Unpack 'schema.table.column'-like strings into components."""
8076
8077 r = self._r_identifiers
8078 return [
8079 self._unescape_identifier(i)
8080 for i in [a or b for a, b in r.findall(identifiers)]
8081 ]