1# sql/compiler.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Base SQL and DDL compiler implementations.
10
11Classes provided include:
12
13:class:`.compiler.SQLCompiler` - renders SQL
14strings
15
16:class:`.compiler.DDLCompiler` - renders DDL
17(data definition language) strings
18
19:class:`.compiler.GenericTypeCompiler` - renders
20type specification strings.
21
22To generate user-defined SQL strings, see
23:doc:`/ext/compiler`.
24
25"""
26from __future__ import annotations
27
28import collections
29import collections.abc as collections_abc
30import contextlib
31from enum import IntEnum
32import functools
33import itertools
34import operator
35import re
36from time import perf_counter
37import typing
38from typing import Any
39from typing import Callable
40from typing import cast
41from typing import ClassVar
42from typing import Dict
43from typing import FrozenSet
44from typing import Iterable
45from typing import Iterator
46from typing import List
47from typing import Literal
48from typing import Mapping
49from typing import MutableMapping
50from typing import NamedTuple
51from typing import NoReturn
52from typing import Optional
53from typing import Pattern
54from typing import Protocol
55from typing import Sequence
56from typing import Set
57from typing import Tuple
58from typing import Type
59from typing import TYPE_CHECKING
60from typing import TypedDict
61from typing import Union
62
63from . import base
64from . import coercions
65from . import crud
66from . import elements
67from . import functions
68from . import operators
69from . import roles
70from . import schema
71from . import selectable
72from . import sqltypes
73from . import util as sql_util
74from ._typing import is_column_element
75from ._typing import is_dml
76from .base import _de_clone
77from .base import _from_objects
78from .base import _NONE_NAME
79from .base import _SentinelDefaultCharacterization
80from .base import NO_ARG
81from .elements import quoted_name
82from .sqltypes import TupleType
83from .visitors import prefix_anon_map
84from .. import exc
85from .. import util
86from ..util import FastIntFlag
87from ..util.typing import Self
88from ..util.typing import TupleAny
89from ..util.typing import Unpack
90
91if typing.TYPE_CHECKING:
92 from .annotation import _AnnotationDict
93 from .base import _AmbiguousTableNameMap
94 from .base import CompileState
95 from .base import Executable
96 from .cache_key import CacheKey
97 from .ddl import CreateTableAs
98 from .ddl import ExecutableDDLElement
99 from .dml import Delete
100 from .dml import Insert
101 from .dml import Update
102 from .dml import UpdateBase
103 from .dml import UpdateDMLState
104 from .dml import ValuesBase
105 from .elements import _truncated_label
106 from .elements import BinaryExpression
107 from .elements import BindParameter
108 from .elements import ClauseElement
109 from .elements import ColumnClause
110 from .elements import ColumnElement
111 from .elements import False_
112 from .elements import Label
113 from .elements import Null
114 from .elements import True_
115 from .functions import Function
116 from .schema import Column
117 from .schema import Constraint
118 from .schema import ForeignKeyConstraint
119 from .schema import Index
120 from .schema import PrimaryKeyConstraint
121 from .schema import Table
122 from .schema import UniqueConstraint
123 from .selectable import _ColumnsClauseElement
124 from .selectable import AliasedReturnsRows
125 from .selectable import CompoundSelectState
126 from .selectable import CTE
127 from .selectable import FromClause
128 from .selectable import NamedFromClause
129 from .selectable import ReturnsRows
130 from .selectable import Select
131 from .selectable import SelectState
132 from .type_api import _BindProcessorType
133 from .type_api import TypeDecorator
134 from .type_api import TypeEngine
135 from .type_api import UserDefinedType
136 from .visitors import Visitable
137 from ..engine.cursor import CursorResultMetaData
138 from ..engine.interfaces import _CoreSingleExecuteParams
139 from ..engine.interfaces import _DBAPIAnyExecuteParams
140 from ..engine.interfaces import _DBAPIMultiExecuteParams
141 from ..engine.interfaces import _DBAPISingleExecuteParams
142 from ..engine.interfaces import _ExecuteOptions
143 from ..engine.interfaces import _GenericSetInputSizesType
144 from ..engine.interfaces import _MutableCoreSingleExecuteParams
145 from ..engine.interfaces import Dialect
146 from ..engine.interfaces import SchemaTranslateMapType
147
148
# Type alias: a dictionary keyed by FromClause objects with string values.
# NOTE(review): by its name this holds per-FROM hint text; the code that
# fills and reads it is outside this section -- confirm there.
_FromHintsType = Dict["FromClause", str]
150
# Set of SQL keywords, all spelled in lower case.
# NOTE(review): presumably consulted when deciding whether an identifier
# has to be quoted; the consumer is outside this section -- confirm.
RESERVED_WORDS = {
    "all",
    "analyse",
    "analyze",
    "and",
    "any",
    "array",
    "as",
    "asc",
    "asymmetric",
    "authorization",
    "between",
    "binary",
    "both",
    "case",
    "cast",
    "check",
    "collate",
    "column",
    "constraint",
    "create",
    "cross",
    "current_date",
    "current_role",
    "current_time",
    "current_timestamp",
    "current_user",
    "default",
    "deferrable",
    "desc",
    "distinct",
    "do",
    "else",
    "end",
    "except",
    "false",
    "for",
    "foreign",
    "freeze",
    "from",
    "full",
    "grant",
    "group",
    "having",
    "ilike",
    "in",
    "initially",
    "inner",
    "intersect",
    "into",
    "is",
    "isnull",
    "join",
    "leading",
    "left",
    "like",
    "limit",
    "localtime",
    "localtimestamp",
    "natural",
    "new",
    "not",
    "notnull",
    "null",
    "off",
    "offset",
    "old",
    "on",
    "only",
    "or",
    "order",
    "outer",
    "overlaps",
    "placing",
    "primary",
    "references",
    "right",
    "select",
    "session_user",
    "set",
    "similar",
    "some",
    "symmetric",
    "table",
    "then",
    "to",
    "trailing",
    "true",
    "union",
    "unique",
    "user",
    "using",
    "verbose",
    "when",
    "where",
}
247
# Matches a string made up solely of ASCII letters (either case), digits,
# underscores and dollar signs.
LEGAL_CHARACTERS = re.compile(r"^[A-Z0-9_$]+$", flags=re.IGNORECASE)

# Same character class as LEGAL_CHARACTERS, additionally allowing spaces.
LEGAL_CHARACTERS_PLUS_SPACE = re.compile(
    r"^[A-Z0-9_ $]+$", flags=re.IGNORECASE
)

# The ten decimal digits plus "$", each as a one-character string.
ILLEGAL_INITIAL_CHARACTERS = set("0123456789") | {"$"}

# Whole-string, case-insensitive matchers for the referential action
# keywords accepted after ON DELETE / ON UPDATE.
FK_ON_DELETE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$",
    flags=re.IGNORECASE,
)
FK_ON_UPDATE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$",
    flags=re.IGNORECASE,
)

# Whole-string, case-insensitive matcher for DEFERRED / IMMEDIATE.
FK_INITIALLY = re.compile(r"^(?:DEFERRED|IMMEDIATE)$", flags=re.IGNORECASE)

# Matches ":name" where the colon is not preceded by a colon, a word
# character, "$" or a backslash (\x5c), and the name is not followed by a
# colon, word character or "$".  Group 1 is the name without the colon.
BIND_PARAMS = re.compile(
    r"(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])", flags=re.UNICODE
)

# Matches a backslash (\x5c) followed by ":name" (the name may be empty);
# group 1 is everything after the backslash.
BIND_PARAMS_ESC = re.compile(
    r"\x5c(:[\w\$]*)(?![:\w\$])", flags=re.UNICODE
)
261
# A %-format template that itself renders a "pyformat" placeholder:
# formatting it with {"name": "x"} yields "%(x)s".
_pyformat_template = "%%(%(name)s)s"

# Bind parameter placeholder templates keyed by paramstyle name.
# NOTE(review): the doubled "%%" in the "pyformat" and "format" entries
# implies the templates are expanded with %-formatting, and "[_POSITION]"
# looks like a marker substituted in a later step; both happen outside this
# section -- confirm against the code that consumes BIND_TEMPLATES.
BIND_TEMPLATES = {
    "pyformat": _pyformat_template,
    "qmark": "?",
    "format": "%%s",
    "numeric": ":[_POSITION]",
    "numeric_dollar": "$[_POSITION]",
    "named": ":%(name)s",
}
271
272
# SQL text for each operator function in the ``operators`` module.  The
# surrounding whitespace is part of each value: binary operators carry a
# space on both sides, the keyword prefix operators carry a trailing space
# ("-" and "~" carry none), and modifiers carry a leading space.
OPERATORS = {
    # binary
    operators.and_: " AND ",
    operators.or_: " OR ",
    operators.add: " + ",
    operators.mul: " * ",
    operators.sub: " - ",
    operators.mod: " % ",
    operators.neg: "-",
    operators.lt: " < ",
    operators.le: " <= ",
    operators.ne: " != ",
    operators.gt: " > ",
    operators.ge: " >= ",
    operators.eq: " = ",
    operators.is_distinct_from: " IS DISTINCT FROM ",
    operators.is_not_distinct_from: " IS NOT DISTINCT FROM ",
    operators.concat_op: " || ",
    operators.match_op: " MATCH ",
    operators.not_match_op: " NOT MATCH ",
    operators.in_op: " IN ",
    operators.not_in_op: " NOT IN ",
    operators.comma_op: ", ",
    operators.from_: " FROM ",
    operators.as_: " AS ",
    operators.is_: " IS ",
    operators.is_not: " IS NOT ",
    operators.collate: " COLLATE ",
    # unary
    operators.exists: "EXISTS ",
    operators.distinct_op: "DISTINCT ",
    operators.inv: "NOT ",
    operators.any_op: "ANY ",
    operators.all_op: "ALL ",
    # modifiers
    operators.desc_op: " DESC",
    operators.asc_op: " ASC",
    operators.nulls_first_op: " NULLS FIRST",
    operators.nulls_last_op: " NULLS LAST",
    # bitwise
    operators.bitwise_xor_op: " ^ ",
    operators.bitwise_or_op: " | ",
    operators.bitwise_and_op: " & ",
    operators.bitwise_not_op: "~",
    operators.bitwise_lshift_op: " << ",
    operators.bitwise_rshift_op: " >> ",
}
320
# Fixed SQL text for specific function classes from the ``functions``
# module, keyed by the class itself.
# NOTE(review): how the text is combined with an argument list is decided
# by the code that consumes this table, outside this section.
FUNCTIONS: Dict[Type[Function[Any]], str] = {
    functions.coalesce: "coalesce",
    functions.current_date: "CURRENT_DATE",
    functions.current_time: "CURRENT_TIME",
    functions.current_timestamp: "CURRENT_TIMESTAMP",
    functions.current_user: "CURRENT_USER",
    functions.localtime: "LOCALTIME",
    functions.localtimestamp: "LOCALTIMESTAMP",
    functions.random: "random",
    functions.sysdate: "sysdate",
    functions.session_user: "SESSION_USER",
    functions.user: "USER",
    functions.cube: "CUBE",
    functions.rollup: "ROLLUP",
    functions.grouping_sets: "GROUPING SETS",
}
337
338
# Date/time field names, each mapped to itself.
# NOTE(review): presumably the lookup used when rendering the field of an
# EXTRACT expression, with subclasses free to supply a different mapping;
# the consumer is outside this section -- confirm.
EXTRACT_MAP = {
    field: field
    for field in (
        "month",
        "day",
        "year",
        "second",
        "hour",
        "doy",
        "minute",
        "quarter",
        "dow",
        "week",
        "epoch",
        "milliseconds",
        "microseconds",
        "timezone_hour",
        "timezone_minute",
    )
}
356
# SQL keyword text for each member of selectable._CompoundSelectKeyword
# (UNION / EXCEPT / INTERSECT, each with an ALL variant).
COMPOUND_KEYWORDS = {
    selectable._CompoundSelectKeyword.UNION: "UNION",
    selectable._CompoundSelectKeyword.UNION_ALL: "UNION ALL",
    selectable._CompoundSelectKeyword.EXCEPT: "EXCEPT",
    selectable._CompoundSelectKeyword.EXCEPT_ALL: "EXCEPT ALL",
    selectable._CompoundSelectKeyword.INTERSECT: "INTERSECT",
    selectable._CompoundSelectKeyword.INTERSECT_ALL: "INTERSECT ALL",
}
365
366
class ResultColumnsEntry(NamedTuple):
    """Tracks a column expression that is expected to be represented
    in the result rows for this statement.

    This normally refers to the columns clause of a SELECT statement
    but may also refer to a RETURNING clause, as well as for dialect-specific
    emulations.

    """

    # NOTE: field order is part of the contract.  The module-level
    # RM_RENDERED_NAME, RM_NAME, RM_OBJECTS and RM_TYPE constants are the
    # integer positions 0..3 of the fields below, in declaration order.

    keyname: str
    """string name that's expected in cursor.description"""

    name: str
    """column name, may be labeled"""

    objects: Tuple[Any, ...]
    """sequence of objects that should be able to locate this column
    in a RowMapping.  This is typically string names and aliases
    as well as Column objects.

    """

    type: TypeEngine[Any]
    """Datatype to be associated with this column.  This is where
    the "result processing" logic directly links the compiled statement
    to the rows that come back from the cursor.

    """
396
397
class _ResultMapAppender(Protocol):
    """Structural type for a callable that accepts one result-column
    description (key name, name, objects and type) and returns nothing.

    NOTE(review): the parameters parallel the fields of
    :class:`.ResultColumnsEntry`; presumably implementations record such an
    entry -- confirm against the implementers, which are outside this
    section.

    """

    def __call__(
        self,
        keyname: str,
        name: str,
        objects: Sequence[Any],
        type_: TypeEngine[Any],
    ) -> None: ...
406
407
# integer indexes into ResultColumnsEntry used by cursor.py.
# some profiling showed integer access faster than named tuple.
# each value is the position of the matching ResultColumnsEntry field and
# must be kept in step with that class's field order.
RM_RENDERED_NAME: Literal[0] = 0  # ResultColumnsEntry.keyname
RM_NAME: Literal[1] = 1  # ResultColumnsEntry.name
RM_OBJECTS: Literal[2] = 2  # ResultColumnsEntry.objects
RM_TYPE: Literal[3] = 3  # ResultColumnsEntry.type
414
415
class _BaseCompilerStackEntry(TypedDict):
    """Required keys of a compiler stack entry.

    NOTE(review): the stack that holds these entries is not visible in
    this section; confirm the meaning of each key against the code that
    pushes entries.

    """

    asfrom_froms: Set[FromClause]
    correlate_froms: Set[FromClause]
    selectable: ReturnsRows
420
421
class _CompilerStackEntry(_BaseCompilerStackEntry, total=False):
    """Compiler stack entry: the required keys inherited from
    :class:`._BaseCompilerStackEntry` plus the keys below, which are
    optional because this class is declared with ``total=False``.

    """

    compile_state: CompileState
    need_result_map_for_nested: bool
    need_result_map_for_compound: bool
    select_0: ReturnsRows
    insert_from_select: Select[Unpack[TupleAny]]
428
429
class ExpandedState(NamedTuple):
    """State used when producing "expanded" and "post compile" bound
    parameters for a statement.

    "Expanded" parameters are generated at statement execution time to
    suit the number of values passed; the individual elements inside an IN
    expression are the most prominent example.

    "Post compile" parameters are those whose SQL literal value is
    rendered into the SQL statement at execution time instead of being
    passed to the driver as a separate parameter.

    Instances are created with the
    :meth:`.SQLCompiler.construct_expanded_state` method of any
    :class:`.SQLCompiler` instance.

    """

    statement: str
    """String SQL statement with parameters fully expanded"""

    parameters: _CoreSingleExecuteParams
    """Parameter dictionary with parameters fully expanded.

    For a statement that uses named parameters, this dictionary will map
    exactly to the names in the statement.  For a statement that uses
    positional parameters, the :attr:`.ExpandedState.positional_parameters`
    will yield a tuple with the positional parameter set.

    """

    processors: Mapping[str, _BindProcessorType[Any]]
    """mapping of bound value processors"""

    positiontup: Optional[Sequence[str]]
    """Sequence of string names indicating the order of positional
    parameters"""

    parameter_expansion: Mapping[str, List[str]]
    """Mapping representing the intermediary link from original parameter
    name to list of "expanded" parameter names, for those parameters that
    were expanded."""

    @property
    def positional_parameters(self) -> Tuple[Any, ...]:
        """Parameter values as a tuple, ordered by ``positiontup``.

        Only available for statements compiled with a positional
        paramstyle.

        :raises InvalidRequestError: if ``positiontup`` is ``None``.

        """
        ordering = self.positiontup
        if ordering is None:
            raise exc.InvalidRequestError(
                "statement does not use a positional paramstyle"
            )

        values = self.parameters
        return tuple([values[name] for name in ordering])

    @property
    def additional_parameters(self) -> _CoreSingleExecuteParams:
        """synonym for :attr:`.ExpandedState.parameters`."""
        params = self.parameters
        return params
489
490
491class _InsertManyValues(NamedTuple):
492 """represents state to use for executing an "insertmanyvalues" statement.
493
494 The primary consumers of this object are the
495 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
496 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods.
497
498 .. versionadded:: 2.0
499
500 """
501
502 is_default_expr: bool
503 """if True, the statement is of the form
504 ``INSERT INTO TABLE DEFAULT VALUES``, and can't be rewritten as a "batch"
505
506 """
507
508 single_values_expr: str
509 """The rendered "values" clause of the INSERT statement.
510
511 This is typically the parenthesized section e.g. "(?, ?, ?)" or similar.
512 The insertmanyvalues logic uses this string as a search and replace
513 target.
514
515 """
516
517 insert_crud_params: List[crud._CrudParamElementStr]
518 """List of Column / bind names etc. used while rewriting the statement"""
519
520 num_positional_params_counted: int
521 """the number of bound parameters in a single-row statement.
522
523 This count may be larger or smaller than the actual number of columns
524 targeted in the INSERT, as it accommodates for SQL expressions
525 in the values list that may have zero or more parameters embedded
526 within them.
527
528 This count is part of what's used to organize rewritten parameter lists
529 when batching.
530
531 """
532
533 sort_by_parameter_order: bool = False
534 """if the deterministic_returnined_order parameter were used on the
535 insert.
536
537 All of the attributes following this will only be used if this is True.
538
539 """
540
541 includes_upsert_behaviors: bool = False
542 """if True, we have to accommodate for upsert behaviors.
543
544 This will in some cases downgrade "insertmanyvalues" that requests
545 deterministic ordering.
546
547 """
548
549 sentinel_columns: Optional[Sequence[Column[Any]]] = None
550 """List of sentinel columns that were located.
551
552 This list is only here if the INSERT asked for
553 sort_by_parameter_order=True,
554 and dialect-appropriate sentinel columns were located.
555
556 .. versionadded:: 2.0.10
557
558 """
559
560 num_sentinel_columns: int = 0
561 """how many sentinel columns are in the above list, if any.
562
563 This is the same as
564 ``len(sentinel_columns) if sentinel_columns is not None else 0``
565
566 """
567
568 sentinel_param_keys: Optional[Sequence[str]] = None
569 """parameter str keys in each param dictionary / tuple
570 that would link to the client side "sentinel" values for that row, which
571 we can use to match up parameter sets to result rows.
572
573 This is only present if sentinel_columns is present and the INSERT
574 statement actually refers to client side values for these sentinel
575 columns.
576
577 .. versionadded:: 2.0.10
578
579 .. versionchanged:: 2.0.29 - the sequence is now string dictionary keys
580 only, used against the "compiled parameteters" collection before
581 the parameters were converted by bound parameter processors
582
583 """
584
585 implicit_sentinel: bool = False
586 """if True, we have exactly one sentinel column and it uses a server side
587 value, currently has to generate an incrementing integer value.
588
589 The dialect in question would have asserted that it supports receiving
590 these values back and sorting on that value as a means of guaranteeing
591 correlation with the incoming parameter list.
592
593 .. versionadded:: 2.0.10
594
595 """
596
597 embed_values_counter: bool = False
598 """Whether to embed an incrementing integer counter in each parameter
599 set within the VALUES clause as parameters are batched over.
600
601 This is only used for a specific INSERT..SELECT..VALUES..RETURNING syntax
602 where a subquery is used to produce value tuples. Current support
603 includes PostgreSQL, Microsoft SQL Server.
604
605 .. versionadded:: 2.0.10
606
607 """
608
609
class _InsertManyValuesBatch(NamedTuple):
    """represents an individual batch SQL statement for insertmanyvalues.

    This is passed through the
    :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
    :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods out
    to the :class:`.Connection` within the
    :meth:`.Connection._exec_insertmany_context` method.

    .. versionadded:: 2.0.10

    """

    # NOTE: none of the fields has a default, so all ten must be supplied;
    # as with any named tuple, their order is part of the interface for
    # positional construction and unpacking.
    # NOTE(review): per-field meaning is established by the producing and
    # consuming methods named above, which are outside this section.
    replaced_statement: str
    replaced_parameters: _DBAPIAnyExecuteParams
    processed_setinputsizes: Optional[_GenericSetInputSizesType]
    batch: Sequence[_DBAPISingleExecuteParams]
    sentinel_values: Sequence[Tuple[Any, ...]]
    current_batch_size: int
    batchnum: int
    total_batches: int
    rows_sorted: bool
    is_downgraded: bool
633
634
class InsertmanyvaluesSentinelOpts(FastIntFlag):
    """bitflag enum indicating styles of PK defaults
    which can work as implicit sentinel columns

    """

    # single-bit flags
    NOT_SUPPORTED = 1
    AUTOINCREMENT = 2
    IDENTITY = 4
    SEQUENCE = 8

    # composite masks: ANY_AUTOINCREMENT is 2 | 4 | 8 (14) and
    # _SUPPORTED_OR_NOT additionally includes NOT_SUPPORTED (15)
    ANY_AUTOINCREMENT = AUTOINCREMENT | IDENTITY | SEQUENCE
    _SUPPORTED_OR_NOT = NOT_SUPPORTED | ANY_AUTOINCREMENT

    # further single-bit flags; note that bit value 32 is not assigned
    USE_INSERT_FROM_SELECT = 16
    RENDER_SELECT_COL_CASTS = 64
651
652
class AggregateOrderByStyle(IntEnum):
    """Describes a backend database's capabilities with ORDER BY for
    aggregate functions.

    .. versionadded:: 2.1

    """

    NONE = 0
    """database has no ORDER BY for aggregate functions"""

    INLINE = 1
    """ORDER BY is rendered inside the function's argument list, typically as
    the last element"""

    WITHIN_GROUP = 2
    """the WITHIN GROUP (ORDER BY ...) phrase is used for all aggregate
    functions (not just the ordered set ones)"""
671
672
class CompilerState(IntEnum):
    """Lifecycle states of a :class:`.Compiled` object, as stored in its
    ``state`` attribute.

    """

    COMPILING = 0
    """statement is present, compilation phase in progress"""

    STRING_APPLIED = 1
    """statement is present, string form of the statement has been applied.

    Additional processors by subclasses may still be pending.

    """

    NO_STATEMENT = 2
    """compiler does not have a statement to compile, is used
    for method access"""
687
688
class Linting(IntEnum):
    """represent preferences for the 'SQL linting' feature.

    this feature currently includes support for flagging cartesian products
    in SQL statements.

    """

    NO_LINTING = 0
    "Disable all linting."

    COLLECT_CARTESIAN_PRODUCTS = 1
    """Collect data on FROMs and cartesian products and gather into
    'self.from_linter'"""

    WARN_LINTING = 2
    "Emit warnings for linters that find problems"

    # inside the class body the two names are still plain ints, so this
    # evaluates to 1 | 2 == 3 and becomes a fourth, distinct member
    FROM_LINTING = COLLECT_CARTESIAN_PRODUCTS | WARN_LINTING
    """Warn for cartesian products; combines COLLECT_CARTESIAN_PRODUCTS
    and WARN_LINTING"""
710
711
# Re-export the Linting members as module-level names.  Iterating an enum
# class yields its members in definition order, so the four targets line up
# with the four members declared on Linting; adding, removing or reordering
# members there requires updating this unpacking as well.
NO_LINTING, COLLECT_CARTESIAN_PRODUCTS, WARN_LINTING, FROM_LINTING = tuple(
    Linting
)
715
716
class FromLinter(collections.namedtuple("FromLinter", ["froms", "edges"])):
    """State holder for the "cartesian product" detection feature.

    ``froms`` is looked up by FROM element and yields the text used to name
    that element in the warning message; ``edges`` is a collection of
    two-element sequences, each one linking a pair of FROM elements.

    """

    def lint(self, start=None):
        """Find the FROM elements that cannot be reached through edges.

        :param start: optional element of ``froms`` to begin the traversal
         from; when omitted an arbitrary element is used.
        :return: a tuple ``(unreached, origin)`` where ``unreached`` is the
         set of FROM elements not connected to ``origin``, or
         ``(None, None)`` when there are no FROMs or all are connected.

        """
        if not self.froms:
            return None, None

        remaining_edges = set(self.edges)
        unreached = set(self.froms)

        if start is None:
            origin = unreached.pop()
        else:
            origin = start
            unreached.remove(origin)

        pending = collections.deque([origin])

        while pending and unreached:
            current = pending.popleft()
            unreached.discard(current)

            # nodes are matched against edges by hash equality, because
            # "annotated" elements compare equal to their non-annotated
            # counterparts.  native containment operations ("in", .index())
            # are used so that hash() need not be called from Python.
            touching = {
                edge for edge in remaining_edges if current in edge
            }

            # queue up, at the left end, the opposite endpoint of every
            # edge that touches the current node.
            pending.extendleft(
                edge[not edge.index(current)] for edge in touching
            )
            remaining_edges.difference_update(touching)

        # anything still unreached forms a cartesian product
        if not unreached:
            return None, None
        return unreached, origin

    def warn(self, stmt_type="SELECT"):
        """Emit a warning if :meth:`.lint` finds disconnected FROM elements.

        :param stmt_type: statement keyword placed at the start of the
         warning message.

        """
        unreached, origin = self.lint()

        # nothing left over means nothing to report
        if not unreached:
            return

        quoted = ", ".join(f'"{self.froms[from_]}"' for from_ in unreached)
        message = (
            "{stmt_type} statement has a cartesian product between "
            "FROM element(s) {froms} and "
            'FROM element "{start}".  Apply join condition(s) '
            "between each element to resolve."
        ).format(
            stmt_type=stmt_type,
            froms=quoted,
            start=self.froms[origin],
        )

        util.warn(message)
781
782
class Compiled:
    """Represent a compiled SQL or DDL expression.

    The ``__str__`` method of the ``Compiled`` object should produce
    the actual text of the statement.  ``Compiled`` objects are
    specific to their underlying database dialect, and also may
    or may not be specific to the columns referenced within a
    particular set of bind parameters.  In no case should the
    ``Compiled`` object be dependent on the actual values of those
    bind parameters, even though it may reference those values as
    defaults.
    """

    statement: Optional[ClauseElement] = None
    "The statement to compile."
    string: str = ""
    "The string representation of the ``statement``"

    state: CompilerState
    """description of the compiler's state"""

    is_sql = False
    is_ddl = False

    _cached_metadata: Optional[CursorResultMetaData] = None

    _result_columns: Optional[List[ResultColumnsEntry]] = None

    schema_translate_map: Optional[SchemaTranslateMapType] = None

    execution_options: _ExecuteOptions = util.EMPTY_DICT
    """
    Execution options propagated from the statement.   In some cases,
    sub-elements of the statement can modify these.
    """

    preparer: IdentifierPreparer

    _annotations: _AnnotationDict = util.EMPTY_DICT

    compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` object that maintains additional
    state used by the compiler.

    Major executable objects such as :class:`_expression.Insert`,
    :class:`_expression.Update`, :class:`_expression.Delete`,
    :class:`_expression.Select` will generate this
    state when compiled in order to calculate additional information about the
    object.   For the top level object that is to be executed, the state can be
    stored here where it can also have applicability towards result set
    processing.

    .. versionadded:: 1.4

    """

    dml_compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` assigned at the same point that
    .isinsert, .isupdate, or .isdelete is assigned.

    This will normally be the same object as .compile_state, with the
    exception of cases like the :class:`.ORMFromStatementCompileState`
    object.

    .. versionadded:: 1.4.40

    """

    cache_key: Optional[CacheKey] = None
    """The :class:`.CacheKey` that was generated ahead of creating this
    :class:`.Compiled` object.

    This is used for routines that need access to the original
    :class:`.CacheKey` instance generated when the :class:`.Compiled`
    instance was first cached, typically in order to reconcile
    the original list of :class:`.BindParameter` objects with a
    per-statement list that's generated on each call.

    """

    _gen_time: float
    """Generation time of this :class:`.Compiled`, used for reporting
    cache stats."""

    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        schema_translate_map: Optional[SchemaTranslateMapType] = None,
        render_schema_translate: bool = False,
        compile_kwargs: Mapping[str, Any] = util.immutabledict(),
    ):
        """Construct a new :class:`.Compiled` object.

        When a statement is given it is compiled immediately and the result
        stored in ``string``; when ``statement`` is ``None`` the object is
        left in the :attr:`.CompilerState.NO_STATEMENT` state.

        :param dialect: :class:`.Dialect` to compile against.

        :param statement: :class:`_expression.ClauseElement` to be compiled.

        :param schema_translate_map: dictionary of schema names to be
         translated when forming the resultant SQL

         .. seealso::

            :ref:`schema_translating`

        :param render_schema_translate: if True, the compiled string is
         passed through the preparer's ``_render_schema_translates()``
         together with ``schema_translate_map``, which must be present in
         that case.

        :param compile_kwargs: additional kwargs that will be
         passed to the initial call to :meth:`.Compiled.process`.


        """
        self.dialect = dialect
        self.preparer = self.dialect.identifier_preparer
        if schema_translate_map:
            self.schema_translate_map = schema_translate_map
            self.preparer = self.preparer._with_schema_translate(
                schema_translate_map
            )

        if statement is None:
            # nothing to compile; the object is used for method access only
            self.state = CompilerState.NO_STATEMENT
        else:
            self.state = CompilerState.COMPILING
            self.statement = statement
            self.can_execute = statement.supports_execution
            self._annotations = statement._annotations
            if self.can_execute:
                if TYPE_CHECKING:
                    assert isinstance(statement, Executable)
                self.execution_options = statement._execution_options

            # compilation runs here, as part of construction
            self.string = self.process(self.statement, **compile_kwargs)

            if render_schema_translate:
                assert schema_translate_map is not None
                self.string = self.preparer._render_schema_translates(
                    self.string, schema_translate_map
                )

            self.state = CompilerState.STRING_APPLIED

        # timestamp taken after compilation has finished
        self._gen_time = perf_counter()

    def __init_subclass__(cls) -> None:
        """Run the per-class :meth:`._init_compiler_cls` hook whenever a
        subclass is created, then continue the normal subclass chain."""
        cls._init_compiler_cls()
        super().__init_subclass__()

    @classmethod
    def _init_compiler_cls(cls):
        """Hook called once for each subclass at class creation time.

        The base implementation does nothing.

        """

    def visit_unsupported_compilation(self, element, err, **kw):
        """Raise :class:`.UnsupportedCompilationError` for the type of
        ``element``, chained from ``err``."""
        element_type = type(element)
        raise exc.UnsupportedCompilationError(self, element_type) from err

    @property
    def sql_compiler(self) -> SQLCompiler:
        """Return a Compiled that is capable of processing SQL expressions.

        If this compiler is one, it would likely just return 'self'.

        """

        raise NotImplementedError()

    def process(self, obj: Visitable, **kwargs: Any) -> str:
        """Compile ``obj`` by dispatching it to this compiler, passing
        ``kwargs`` along, and return the resulting string."""
        dispatch = obj._compiler_dispatch
        return dispatch(self, **kwargs)

    def __str__(self) -> str:
        """Return the string text of the generated SQL or DDL.

        An empty string is returned unless the state is
        :attr:`.CompilerState.STRING_APPLIED`.

        """
        applied = self.state is CompilerState.STRING_APPLIED
        return self.string if applied else ""

    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        """Return the bind params for this compiled object.

        The base implementation raises ``NotImplementedError``.

        :param params: a dict of string/object pairs whose values will
                       override bind values compiled in to the
                       statement.
        """

        raise NotImplementedError()

    @property
    def params(self):
        """Return the bind params for this compiled object."""
        bind_params = self.construct_params()
        return bind_params
975
976
class TypeCompiler(util.EnsureKWArg):
    """Produces DDL specification for TypeEngine objects."""

    # regular expression naming the methods handled by util.EnsureKWArg.
    # NOTE(review): what EnsureKWArg does with matching methods is defined
    # outside this section -- confirm there.
    ensure_kwarg = r"visit_\w+"

    def __init__(self, dialect: Dialect):
        """Store the :class:`.Dialect` that types are compiled for."""
        self.dialect = dialect

    def process(self, type_: TypeEngine[Any], **kw: Any) -> str:
        """Return the string produced by dispatching ``type_`` to this
        compiler.

        If ``type_`` has a variant mapping containing an entry under this
        dialect's name, that entry is compiled in place of ``type_``.

        """
        variants = type_._variant_mapping
        if variants:
            dialect_name = self.dialect.name
            if dialect_name in variants:
                type_ = variants[dialect_name]
        return type_._compiler_dispatch(self, **kw)

    def visit_unsupported_compilation(
        self, element: Any, err: Exception, **kw: Any
    ) -> NoReturn:
        """Raise :class:`.UnsupportedCompilationError` for ``element``,
        chained from ``err``."""
        failure = exc.UnsupportedCompilationError(self, element)
        raise failure from err
997
998
999# this was a Visitable, but to allow accurate detection of
1000# column elements this is actually a column element
class _CompileLabel(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """lightweight label object which acts as an expression.Label."""

    # shares the "label" visit name, so it is dispatched like a label
    __visit_name__ = "label"
    __slots__ = "element", "name", "_alt_names"

    def __init__(self, col, name, alt_names=()):
        """Wrap ``col`` under the label ``name``.

        :param col: the element being labeled; stored as ``element``.
        :param name: the label name.
        :param alt_names: tuple of further objects; it is concatenated
         onto ``(col,)`` and the result stored as ``_alt_names``, so it
         must be a tuple.

        """
        self.element = col
        self.name = name
        self._alt_names = (col,) + alt_names

    @property
    def proxy_set(self):
        """The ``proxy_set`` of the wrapped element."""
        return self.element.proxy_set

    @property
    def type(self):
        """The ``type`` of the wrapped element."""
        return self.element.type

    def self_group(self, **kw):
        """Return this object unchanged; no grouping is applied."""
        return self
1024
1025
class aggregate_orderby_inline(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """produce ORDER BY inside of function argument lists"""

    __visit_name__ = "aggregate_orderby_inline"
    __slots__ = "element", "aggregate_order_by"

    def __init__(self, element, orderby):
        """Pair ``element`` with the ORDER BY construct ``orderby``.

        :param element: the wrapped element; stored as ``element``.
        :param orderby: stored as ``aggregate_order_by``.

        """
        self.element = element
        self.aggregate_order_by = orderby

    def __iter__(self):
        """Iterate over the wrapped element."""
        return iter(self.element)

    @property
    def proxy_set(self):
        """The ``proxy_set`` of the wrapped element."""
        return self.element.proxy_set

    @property
    def type(self):
        """The ``type`` of the wrapped element."""
        return self.element.type

    def self_group(self, **kw):
        """Return this object unchanged; no grouping is applied."""
        return self

    def _with_binary_element_type(self, type_):
        """Return a new wrapper around the element as converted by its own
        ``_with_binary_element_type(type_)``, keeping the same
        ``aggregate_order_by``."""
        return aggregate_orderby_inline(
            self.element._with_binary_element_type(type_),
            self.aggregate_order_by,
        )
1057
1058
class ilike_case_insensitive(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """produce a wrapping element for a case-insensitive portion of
    an ILIKE construct.

    The construct usually renders the ``lower()`` function, but on
    PostgreSQL will pass silently with the assumption that "ILIKE"
    is being used.

    .. versionadded:: 2.0

    """

    __visit_name__ = "ilike_case_insensitive_operand"
    __slots__ = "element", "comparator"

    def __init__(self, element):
        """Wrap ``element``, also taking on its ``comparator``."""
        self.element = element
        self.comparator = element.comparator

    @property
    def proxy_set(self):
        """The ``proxy_set`` of the wrapped element."""
        return self.element.proxy_set

    @property
    def type(self):
        """The ``type`` of the wrapped element."""
        return self.element.type

    def self_group(self, **kw):
        """Return this object unchanged; no grouping is applied."""
        return self

    def _with_binary_element_type(self, type_):
        """Return a new wrapper around the element as converted by its own
        ``_with_binary_element_type(type_)``."""
        return ilike_case_insensitive(
            self.element._with_binary_element_type(type_)
        )
1095
1096
1097class SQLCompiler(Compiled):
1098 """Default implementation of :class:`.Compiled`.
1099
1100 Compiles :class:`_expression.ClauseElement` objects into SQL strings.
1101
1102 """
1103
1104 extract_map = EXTRACT_MAP
1105
1106 bindname_escape_characters: ClassVar[Mapping[str, str]] = (
1107 util.immutabledict(
1108 {
1109 "%": "P",
1110 "(": "A",
1111 ")": "Z",
1112 ":": "C",
1113 ".": "_",
1114 "[": "_",
1115 "]": "_",
1116 " ": "_",
1117 }
1118 )
1119 )
1120 """A mapping (e.g. dict or similar) containing a lookup of
1121 characters keyed to replacement characters which will be applied to all
1122 'bind names' used in SQL statements as a form of 'escaping'; the given
1123 characters are replaced entirely with the 'replacement' character when
1124 rendered in the SQL statement, and a similar translation is performed
1125 on the incoming names used in parameter dictionaries passed to methods
1126 like :meth:`_engine.Connection.execute`.
1127
1128 This allows bound parameter names used in :func:`_sql.bindparam` and
1129 other constructs to have any arbitrary characters present without any
1130 concern for characters that aren't allowed at all on the target database.
1131
1132 Third party dialects can establish their own dictionary here to replace the
1133 default mapping, which will ensure that the particular characters in the
1134 mapping will never appear in a bound parameter name.
1135
1136 The dictionary is evaluated at **class creation time**, so cannot be
1137 modified at runtime; it must be present on the class when the class
1138 is first declared.
1139
1140 Note that for dialects that have additional bound parameter rules such
1141 as additional restrictions on leading characters, the
1142 :meth:`_sql.SQLCompiler.bindparam_string` method may need to be augmented.
1143 See the cx_Oracle compiler for an example of this.
1144
1145 .. versionadded:: 2.0.0rc1
1146
1147 """
1148
1149 _bind_translate_re: ClassVar[Pattern[str]]
1150 _bind_translate_chars: ClassVar[Mapping[str, str]]
1151
1152 is_sql = True
1153
1154 compound_keywords = COMPOUND_KEYWORDS
1155
1156 isdelete: bool = False
1157 isinsert: bool = False
1158 isupdate: bool = False
1159 """class-level defaults which can be set at the instance
1160 level to define if this Compiled instance represents
1161 INSERT/UPDATE/DELETE
1162 """
1163
1164 postfetch: Optional[List[Column[Any]]]
1165 """list of columns that can be post-fetched after INSERT or UPDATE to
1166 receive server-updated values"""
1167
1168 insert_prefetch: Sequence[Column[Any]] = ()
1169 """list of columns for which default values should be evaluated before
1170 an INSERT takes place"""
1171
1172 update_prefetch: Sequence[Column[Any]] = ()
1173 """list of columns for which onupdate default values should be evaluated
1174 before an UPDATE takes place"""
1175
1176 implicit_returning: Optional[Sequence[ColumnElement[Any]]] = None
1177 """list of "implicit" returning columns for a toplevel INSERT or UPDATE
1178 statement, used to receive newly generated values of columns.
1179
1180 .. versionadded:: 2.0 ``implicit_returning`` replaces the previous
1181 ``returning`` collection, which was not a generalized RETURNING
1182 collection and instead was in fact specific to the "implicit returning"
1183 feature.
1184
1185 """
1186
1187 isplaintext: bool = False
1188
1189 binds: Dict[str, BindParameter[Any]]
1190 """a dictionary of bind parameter keys to BindParameter instances."""
1191
1192 bind_names: Dict[BindParameter[Any], str]
1193 """a dictionary of BindParameter instances to "compiled" names
1194 that are actually present in the generated SQL"""
1195
1196 stack: List[_CompilerStackEntry]
1197 """major statements such as SELECT, INSERT, UPDATE, DELETE are
1198 tracked in this stack using an entry format."""
1199
1200 returning_precedes_values: bool = False
1201 """set to True classwide to generate RETURNING
1202 clauses before the VALUES or WHERE clause (i.e. MSSQL)
1203 """
1204
1205 render_table_with_column_in_update_from: bool = False
1206 """set to True classwide to indicate the SET clause
1207 in a multi-table UPDATE statement should qualify
1208 columns with the table name (i.e. MySQL only)
1209 """
1210
1211 ansi_bind_rules: bool = False
    """SQL 92 doesn't allow bind parameters to be used
    in the columns clause of a SELECT, nor does it allow
    ambiguous expressions like "? = ?". A compiler
    subclass can set this flag to True if the target
    driver/DB enforces this
    """
1218
1219 bindtemplate: str
1220 """template to render bound parameters based on paramstyle."""
1221
1222 compilation_bindtemplate: str
1223 """template used by compiler to render parameters before positional
1224 paramstyle application"""
1225
1226 _numeric_binds_identifier_char: str
    """Character that's used as the identifier of a numerical bind param.
    For example if this char is set to ``$``, numerical binds will be rendered
    in the form ``$1, $2, $3``.
    """
1231
1232 _result_columns: List[ResultColumnsEntry]
1233 """relates label names in the final SQL to a tuple of local
1234 column/label name, ColumnElement object (if any) and
1235 TypeEngine. CursorResult uses this for type processing and
1236 column targeting"""
1237
1238 _textual_ordered_columns: bool = False
1239 """tell the result object that the column names as rendered are important,
1240 but they are also "ordered" vs. what is in the compiled object here.
1241
1242 As of 1.4.42 this condition is only present when the statement is a
1243 TextualSelect, e.g. text("....").columns(...), where it is required
1244 that the columns are considered positionally and not by name.
1245
1246 """
1247
1248 _ad_hoc_textual: bool = False
1249 """tell the result that we encountered text() or '*' constructs in the
1250 middle of the result columns, but we also have compiled columns, so
1251 if the number of columns in cursor.description does not match how many
1252 expressions we have, that means we can't rely on positional at all and
1253 should match on name.
1254
1255 """
1256
1257 _ordered_columns: bool = True
1258 """
1259 if False, means we can't be sure the list of entries
1260 in _result_columns is actually the rendered order. Usually
1261 True unless using an unordered TextualSelect.
1262 """
1263
1264 _loose_column_name_matching: bool = False
1265 """tell the result object that the SQL statement is textual, wants to match
1266 up to Column objects, and may be using the ._tq_label in the SELECT rather
1267 than the base name.
1268
1269 """
1270
1271 _numeric_binds: bool = False
1272 """
1273 True if paramstyle is "numeric". This paramstyle is trickier than
1274 all the others.
1275
1276 """
1277
1278 _render_postcompile: bool = False
1279 """
1280 whether to render out POSTCOMPILE params during the compile phase.
1281
1282 This attribute is used only for end-user invocation of stmt.compile();
1283 it's never used for actual statement execution, where instead the
1284 dialect internals access and render the internal postcompile structure
1285 directly.
1286
1287 """
1288
1289 _post_compile_expanded_state: Optional[ExpandedState] = None
1290 """When render_postcompile is used, the ``ExpandedState`` used to create
1291 the "expanded" SQL is assigned here, and then used by the ``.params``
1292 accessor and ``.construct_params()`` methods for their return values.
1293
1294 .. versionadded:: 2.0.0rc1
1295
1296 """
1297
1298 _pre_expanded_string: Optional[str] = None
    """Stores the original string SQL before 'post_compile' is applied,
    for cases where 'post_compile' was used.

    """
1303
1304 _pre_expanded_positiontup: Optional[List[str]] = None
1305
1306 _insertmanyvalues: Optional[_InsertManyValues] = None
1307
1308 _insert_crud_params: Optional[crud._CrudParamSequence] = None
1309
1310 literal_execute_params: FrozenSet[BindParameter[Any]] = frozenset()
1311 """bindparameter objects that are rendered as literal values at statement
1312 execution time.
1313
1314 """
1315
1316 post_compile_params: FrozenSet[BindParameter[Any]] = frozenset()
1317 """bindparameter objects that are rendered as bound parameter placeholders
1318 at statement execution time.
1319
1320 """
1321
1322 escaped_bind_names: util.immutabledict[str, str] = util.EMPTY_DICT
1323 """Late escaping of bound parameter names that has to be converted
1324 to the original name when looking in the parameter dictionary.
1325
1326 """
1327
1328 has_out_parameters = False
1329 """if True, there are bindparam() objects that have the isoutparam
1330 flag set."""
1331
1332 postfetch_lastrowid = False
    """if True, and this is an INSERT, use cursor.lastrowid to populate
    result.inserted_primary_key. """
1335
1336 _cache_key_bind_match: Optional[
1337 Tuple[
1338 Dict[
1339 BindParameter[Any],
1340 List[BindParameter[Any]],
1341 ],
1342 Dict[
1343 str,
1344 BindParameter[Any],
1345 ],
1346 ]
1347 ] = None
1348 """a mapping that will relate the BindParameter object we compile
1349 to those that are part of the extracted collection of parameters
1350 in the cache key, if we were given a cache key.
1351
1352 """
1353
1354 positiontup: Optional[List[str]] = None
1355 """for a compiled construct that uses a positional paramstyle, will be
1356 a sequence of strings, indicating the names of bound parameters in order.
1357
1358 This is used in order to render bound parameters in their correct order,
1359 and is combined with the :attr:`_sql.Compiled.params` dictionary to
1360 render parameters.
1361
1362 This sequence always contains the unescaped name of the parameters.
1363
1364 .. seealso::
1365
1366 :ref:`faq_sql_expression_string` - includes a usage example for
1367 debugging use cases.
1368
1369 """
1370 _values_bindparam: Optional[List[str]] = None
1371
1372 _visited_bindparam: Optional[List[str]] = None
1373
1374 inline: bool = False
1375
1376 ctes: Optional[MutableMapping[CTE, str]]
1377
1378 # Detect same CTE references - Dict[(level, name), cte]
1379 # Level is required for supporting nesting
1380 ctes_by_level_name: Dict[Tuple[int, str], CTE]
1381
1382 # To retrieve key/level in ctes_by_level_name -
1383 # Dict[cte_reference, (level, cte_name, cte_opts)]
1384 level_name_by_cte: Dict[CTE, Tuple[int, str, selectable._CTEOpts]]
1385
1386 ctes_recursive: bool
1387
1388 _post_compile_pattern = re.compile(r"__\[POSTCOMPILE_(\S+?)(~~.+?~~)?\]")
1389 _pyformat_pattern = re.compile(r"%\(([^)]+?)\)s")
1390 _positional_pattern = re.compile(
1391 f"{_pyformat_pattern.pattern}|{_post_compile_pattern.pattern}"
1392 )
1393
    @classmethod
    def _init_compiler_cls(cls):
        """Run class-level setup for this compiler class.

        This currently consists of building the bind-name translation
        regular expression and character lookup via
        :meth:`_init_bind_translate`.
        """
        cls._init_bind_translate()
1397
1398 @classmethod
1399 def _init_bind_translate(cls):
1400 reg = re.escape("".join(cls.bindname_escape_characters))
1401 cls._bind_translate_re = re.compile(f"[{reg}]")
1402 cls._bind_translate_chars = cls.bindname_escape_characters
1403
    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        cache_key: Optional[CacheKey] = None,
        column_keys: Optional[Sequence[str]] = None,
        for_executemany: bool = False,
        linting: Linting = NO_LINTING,
        _supporting_against: Optional[SQLCompiler] = None,
        **kwargs: Any,
    ):
        """Construct a new :class:`.SQLCompiler` object.

        :param dialect: :class:`.Dialect` to be used

        :param statement: :class:`_expression.ClauseElement` to be compiled

        :param cache_key: optional cache key; stored as ``self.cache_key``.
         When given, the bound parameters in its second element
         (``cache_key[1]``) are used to populate
         ``self._cache_key_bind_match``.

        :param column_keys: a list of column names to be compiled into an
         INSERT or UPDATE statement.

        :param for_executemany: whether INSERT / UPDATE statements should
         expect that they are to be invoked in an "executemany" style,
         which may impact how the statement will be expected to return the
         values of defaults and autoincrement / sequences and similar.
         Depending on the backend and driver in use, support for retrieving
         these values may be disabled which means SQL expressions may
         be rendered inline, RETURNING may not be rendered, etc.

        :param linting: linting setting, stored as ``self.linting``.

        :param _supporting_against: optional other :class:`.SQLCompiler`;
         when given, the entries of its ``__dict__`` are copied onto this
         compiler after construction, other than ``state``, ``dialect``,
         ``preparer``, ``positional``, ``_numeric_binds``,
         ``compilation_bindtemplate`` and ``bindtemplate``.

        :param kwargs: additional keyword arguments to be consumed by the
         superclass.

        """
        self.column_keys = column_keys

        self.cache_key = cache_key

        if cache_key:
            # cksm: parameter key -> parameter;
            # ckbm: parameter -> list of parameters, initially just itself
            cksm = {b.key: b for b in cache_key[1]}
            ckbm = {b: [b] for b in cache_key[1]}
            self._cache_key_bind_match = (ckbm, cksm)

        # compile INSERT/UPDATE defaults/sequences to expect executemany
        # style execution, which may mean no pre-execute of defaults,
        # or no RETURNING
        self.for_executemany = for_executemany

        self.linting = linting

        # a dictionary of bind parameter keys to BindParameter
        # instances.
        self.binds = {}

        # a dictionary of BindParameter instances to "compiled" names
        # that are actually present in the generated SQL
        self.bind_names = util.column_dict()

        # stack which keeps track of nested SELECT statements
        self.stack = []

        self._result_columns = []

        # true if the paramstyle is positional
        self.positional = dialect.positional
        if self.positional:
            # any paramstyle whose name starts with "numeric" is handled
            # as numeric; "numeric_dollar" uses "$", the others use ":"
            self._numeric_binds = nb = dialect.paramstyle.startswith("numeric")
            if nb:
                self._numeric_binds_identifier_char = (
                    "$" if dialect.paramstyle == "numeric_dollar" else ":"
                )

            # positional styles are first rendered with the pyformat
            # template; the placeholders are rewritten afterwards by
            # _process_positional() / _process_numeric() below
            self.compilation_bindtemplate = _pyformat_template
        else:
            self.compilation_bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

        self.ctes = None

        self.label_length = (
            dialect.label_length or dialect.max_identifier_length
        )

        # a map which tracks "anonymous" identifiers that are created on
        # the fly here
        self.anon_map = prefix_anon_map()

        # a map which tracks "truncated" names based on
        # dialect.label_length or dialect.max_identifier_length
        self.truncated_names: Dict[Tuple[str, str], str] = {}
        self._truncated_counters: Dict[str, int] = {}

        # NOTE(review): the superclass constructor presumably performs the
        # actual compilation and sets self.state / self.string, which the
        # code below relies on - confirm against Compiled.__init__
        Compiled.__init__(self, dialect, statement, **kwargs)

        if self.isinsert or self.isupdate or self.isdelete:
            if TYPE_CHECKING:
                assert isinstance(statement, UpdateBase)

            if self.isinsert or self.isupdate:
                if TYPE_CHECKING:
                    assert isinstance(statement, ValuesBase)
                if statement._inline:
                    self.inline = True
                elif self.for_executemany and (
                    not self.isinsert
                    or (
                        self.dialect.insert_executemany_returning
                        and statement._return_defaults
                    )
                ):
                    # executemany: inline for UPDATE always, and for
                    # INSERT when the dialect reports
                    # insert_executemany_returning and the statement uses
                    # return_defaults
                    self.inline = True

        self.bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

        if _supporting_against:
            # take on the state of the other compiler, except for the
            # attributes that are specific to this compiler's own dialect
            # and paramstyle
            self.__dict__.update(
                {
                    k: v
                    for k, v in _supporting_against.__dict__.items()
                    if k
                    not in {
                        "state",
                        "dialect",
                        "preparer",
                        "positional",
                        "_numeric_binds",
                        "compilation_bindtemplate",
                        "bindtemplate",
                    }
                }
            )

        if self.state is CompilerState.STRING_APPLIED:
            # a SQL string is present; convert the pyformat placeholders
            # to the dialect's positional / numeric style if needed
            if self.positional:
                if self._numeric_binds:
                    self._process_numeric()
                else:
                    self._process_positional()

            if self._render_postcompile:
                # render post-compile parameters into the string right
                # away and store the expanded state on this compiler
                parameters = self.construct_params(
                    escape_names=False,
                    _no_postcompile=True,
                )

                self._process_parameters_for_postcompile(
                    parameters, _populate_self=True
                )
1549
1550 @property
1551 def insert_single_values_expr(self) -> Optional[str]:
1552 """When an INSERT is compiled with a single set of parameters inside
1553 a VALUES expression, the string is assigned here, where it can be
1554 used for insert batching schemes to rewrite the VALUES expression.
1555
1556 .. versionchanged:: 2.0 This collection is no longer used by
1557 SQLAlchemy's built-in dialects, in favor of the currently
1558 internal ``_insertmanyvalues`` collection that is used only by
1559 :class:`.SQLCompiler`.
1560
1561 """
1562 if self._insertmanyvalues is None:
1563 return None
1564 else:
1565 return self._insertmanyvalues.single_values_expr
1566
1567 @util.ro_memoized_property
1568 def effective_returning(self) -> Optional[Sequence[ColumnElement[Any]]]:
1569 """The effective "returning" columns for INSERT, UPDATE or DELETE.
1570
1571 This is either the so-called "implicit returning" columns which are
1572 calculated by the compiler on the fly, or those present based on what's
1573 present in ``self.statement._returning`` (expanded into individual
1574 columns using the ``._all_selected_columns`` attribute) i.e. those set
1575 explicitly using the :meth:`.UpdateBase.returning` method.
1576
1577 .. versionadded:: 2.0
1578
1579 """
1580 if self.implicit_returning:
1581 return self.implicit_returning
1582 elif self.statement is not None and is_dml(self.statement):
1583 return [
1584 c
1585 for c in self.statement._all_selected_columns
1586 if is_column_element(c)
1587 ]
1588
1589 else:
1590 return None
1591
    @property
    def returning(self):
        """backwards compatibility; returns the
        effective_returning collection.

        This is a plain alias: the value is whatever
        :attr:`.SQLCompiler.effective_returning` returns, which may be
        ``None``.

        """
        return self.effective_returning
1599
1600 @property
1601 def current_executable(self):
1602 """Return the current 'executable' that is being compiled.
1603
1604 This is currently the :class:`_sql.Select`, :class:`_sql.Insert`,
1605 :class:`_sql.Update`, :class:`_sql.Delete`,
1606 :class:`_sql.CompoundSelect` object that is being compiled.
1607 Specifically it's assigned to the ``self.stack`` list of elements.
1608
1609 When a statement like the above is being compiled, it normally
1610 is also assigned to the ``.statement`` attribute of the
1611 :class:`_sql.Compiler` object. However, all SQL constructs are
1612 ultimately nestable, and this attribute should never be consulted
1613 by a ``visit_`` method, as it is not guaranteed to be assigned
1614 nor guaranteed to correspond to the current statement being compiled.
1615
1616 """
1617 try:
1618 return self.stack[-1]["selectable"]
1619 except IndexError as ie:
1620 raise IndexError("Compiler does not have a stack entry") from ie
1621
1622 @property
1623 def prefetch(self):
1624 return list(self.insert_prefetch) + list(self.update_prefetch)
1625
    @util.memoized_property
    def _global_attributes(self) -> Dict[Any, Any]:
        """A dictionary that starts out empty.

        NOTE(review): given the ``memoized_property`` decorator, the same
        dictionary is presumably returned on each access for a given
        compiler - confirm against ``util.memoized_property``.
        """
        return {}
1629
1630 @util.memoized_instancemethod
1631 def _init_cte_state(self) -> MutableMapping[CTE, str]:
1632 """Initialize collections related to CTEs only if
1633 a CTE is located, to save on the overhead of
1634 these collections otherwise.
1635
1636 """
1637 # collect CTEs to tack on top of a SELECT
1638 # To store the query to print - Dict[cte, text_query]
1639 ctes: MutableMapping[CTE, str] = util.OrderedDict()
1640 self.ctes = ctes
1641
1642 # Detect same CTE references - Dict[(level, name), cte]
1643 # Level is required for supporting nesting
1644 self.ctes_by_level_name = {}
1645
1646 # To retrieve key/level in ctes_by_level_name -
1647 # Dict[cte_reference, (level, cte_name, cte_opts)]
1648 self.level_name_by_cte = {}
1649
1650 self.ctes_recursive = False
1651
1652 return ctes
1653
1654 @contextlib.contextmanager
1655 def _nested_result(self):
1656 """special API to support the use case of 'nested result sets'"""
1657 result_columns, ordered_columns = (
1658 self._result_columns,
1659 self._ordered_columns,
1660 )
1661 self._result_columns, self._ordered_columns = [], False
1662
1663 try:
1664 if self.stack:
1665 entry = self.stack[-1]
1666 entry["need_result_map_for_nested"] = True
1667 else:
1668 entry = None
1669 yield self._result_columns, self._ordered_columns
1670 finally:
1671 if entry:
1672 entry.pop("need_result_map_for_nested")
1673 self._result_columns, self._ordered_columns = (
1674 result_columns,
1675 ordered_columns,
1676 )
1677
1678 def _process_positional(self):
1679 assert not self.positiontup
1680 assert self.state is CompilerState.STRING_APPLIED
1681 assert not self._numeric_binds
1682
1683 if self.dialect.paramstyle == "format":
1684 placeholder = "%s"
1685 else:
1686 assert self.dialect.paramstyle == "qmark"
1687 placeholder = "?"
1688
1689 positions = []
1690
1691 def find_position(m: re.Match[str]) -> str:
1692 normal_bind = m.group(1)
1693 if normal_bind:
1694 positions.append(normal_bind)
1695 return placeholder
1696 else:
1697 # this a post-compile bind
1698 positions.append(m.group(2))
1699 return m.group(0)
1700
1701 self.string = re.sub(
1702 self._positional_pattern, find_position, self.string
1703 )
1704
1705 if self.escaped_bind_names:
1706 reverse_escape = {v: k for k, v in self.escaped_bind_names.items()}
1707 assert len(self.escaped_bind_names) == len(reverse_escape)
1708 self.positiontup = [
1709 reverse_escape.get(name, name) for name in positions
1710 ]
1711 else:
1712 self.positiontup = positions
1713
1714 if self._insertmanyvalues:
1715 positions = []
1716
1717 single_values_expr = re.sub(
1718 self._positional_pattern,
1719 find_position,
1720 self._insertmanyvalues.single_values_expr,
1721 )
1722 insert_crud_params = [
1723 (
1724 v[0],
1725 v[1],
1726 re.sub(self._positional_pattern, find_position, v[2]),
1727 v[3],
1728 )
1729 for v in self._insertmanyvalues.insert_crud_params
1730 ]
1731
1732 self._insertmanyvalues = self._insertmanyvalues._replace(
1733 single_values_expr=single_values_expr,
1734 insert_crud_params=insert_crud_params,
1735 )
1736
    def _process_numeric(self):
        """Convert the compiled string to a "numeric" paramstyle.

        Every bind name is assigned a placeholder made of
        ``self._numeric_binds_identifier_char`` followed by a number
        counting up from 1, and the pyformat placeholders in
        ``self.string`` are replaced with those.  Post-compile and
        literal-execute parameters receive no number.  Sets
        ``self.next_numeric_pos`` and ``self.positiontup``, and rewrites
        ``self._insertmanyvalues`` when present.
        """
        assert self._numeric_binds
        assert self.state is CompilerState.STRING_APPLIED

        num = 1
        # bind name -> placeholder string; insertion order of this
        # dictionary becomes the order of positiontup
        param_pos: Dict[str, str] = {}
        order: Iterable[str]
        if self._insertmanyvalues and self._values_bindparam is not None:
            # bindparams that are not in values are always placed first.
            # this avoids the need of changing them when using executemany
            # values () ()
            order = itertools.chain(
                (
                    name
                    for name in self.bind_names.values()
                    if name not in self._values_bindparam
                ),
                self.bind_names.values(),
            )
        else:
            order = self.bind_names.values()

        for bind_name in order:
            # names already seen (e.g. from the first part of the chain
            # above) keep their original position
            if bind_name in param_pos:
                continue
            bind = self.binds[bind_name]
            if (
                bind in self.post_compile_params
                or bind in self.literal_execute_params
            ):
                # set to None to just mark the name in positiontup; it
                # will not be replaced below.
                param_pos[bind_name] = None  # type: ignore
            else:
                ph = f"{self._numeric_binds_identifier_char}{num}"
                num += 1
                param_pos[bind_name] = ph

        self.next_numeric_pos = num

        # positiontup holds the unescaped names, so it is taken before
        # the keys are translated to their escaped form below
        self.positiontup = list(param_pos)
        if self.escaped_bind_names:
            len_before = len(param_pos)
            param_pos = {
                self.escaped_bind_names.get(name, name): pos
                for name, pos in param_pos.items()
            }
            assert len(param_pos) == len_before

        # Can't use format here since % chars are not escaped.
        self.string = self._pyformat_pattern.sub(
            lambda m: param_pos[m.group(1)], self.string
        )

        if self._insertmanyvalues:
            single_values_expr = (
                # format is ok here since single_values_expr includes only
                # place-holders
                self._insertmanyvalues.single_values_expr
                % param_pos
            )
            insert_crud_params = [
                (v[0], v[1], "%s", v[3])
                for v in self._insertmanyvalues.insert_crud_params
            ]

            self._insertmanyvalues = self._insertmanyvalues._replace(
                # This has the numbers (:1, :2)
                single_values_expr=single_values_expr,
                # The single binds are instead %s so they can be formatted
                insert_crud_params=insert_crud_params,
            )
1809
1810 @util.memoized_property
1811 def _bind_processors(
1812 self,
1813 ) -> MutableMapping[
1814 str, Union[_BindProcessorType[Any], Sequence[_BindProcessorType[Any]]]
1815 ]:
1816 # mypy is not able to see the two value types as the above Union,
1817 # it just sees "object". don't know how to resolve
1818 return {
1819 key: value # type: ignore
1820 for key, value in (
1821 (
1822 self.bind_names[bindparam],
1823 (
1824 bindparam.type._cached_bind_processor(self.dialect)
1825 if not bindparam.type._is_tuple_type
1826 else tuple(
1827 elem_type._cached_bind_processor(self.dialect)
1828 for elem_type in cast(
1829 TupleType, bindparam.type
1830 ).types
1831 )
1832 ),
1833 )
1834 for bindparam in self.bind_names
1835 )
1836 if value is not None
1837 }
1838
1839 def is_subquery(self):
1840 return len(self.stack) > 1
1841
    @property
    def sql_compiler(self) -> Self:
        """Return this compiler itself."""
        return self
1845
1846 def construct_expanded_state(
1847 self,
1848 params: Optional[_CoreSingleExecuteParams] = None,
1849 escape_names: bool = True,
1850 ) -> ExpandedState:
1851 """Return a new :class:`.ExpandedState` for a given parameter set.
1852
1853 For queries that use "expanding" or other late-rendered parameters,
1854 this method will provide for both the finalized SQL string as well
1855 as the parameters that would be used for a particular parameter set.
1856
1857 .. versionadded:: 2.0.0rc1
1858
1859 """
1860 parameters = self.construct_params(
1861 params,
1862 escape_names=escape_names,
1863 _no_postcompile=True,
1864 )
1865 return self._process_parameters_for_postcompile(
1866 parameters,
1867 )
1868
1869 def construct_params(
1870 self,
1871 params: Optional[_CoreSingleExecuteParams] = None,
1872 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
1873 escape_names: bool = True,
1874 _group_number: Optional[int] = None,
1875 _check: bool = True,
1876 _no_postcompile: bool = False,
1877 ) -> _MutableCoreSingleExecuteParams:
1878 """return a dictionary of bind parameter keys and values"""
1879
1880 if self._render_postcompile and not _no_postcompile:
1881 assert self._post_compile_expanded_state is not None
1882 if not params:
1883 return dict(self._post_compile_expanded_state.parameters)
1884 else:
1885 raise exc.InvalidRequestError(
1886 "can't construct new parameters when render_postcompile "
1887 "is used; the statement is hard-linked to the original "
1888 "parameters. Use construct_expanded_state to generate a "
1889 "new statement and parameters."
1890 )
1891
1892 has_escaped_names = escape_names and bool(self.escaped_bind_names)
1893
1894 if extracted_parameters:
1895 # related the bound parameters collected in the original cache key
1896 # to those collected in the incoming cache key. They will not have
1897 # matching names but they will line up positionally in the same
1898 # way. The parameters present in self.bind_names may be clones of
1899 # these original cache key params in the case of DML but the .key
1900 # will be guaranteed to match.
1901 if self.cache_key is None:
1902 raise exc.CompileError(
1903 "This compiled object has no original cache key; "
1904 "can't pass extracted_parameters to construct_params"
1905 )
1906 else:
1907 orig_extracted = self.cache_key[1]
1908
1909 ckbm_tuple = self._cache_key_bind_match
1910 assert ckbm_tuple is not None
1911 ckbm, _ = ckbm_tuple
1912 resolved_extracted = {
1913 bind: extracted
1914 for b, extracted in zip(orig_extracted, extracted_parameters)
1915 for bind in ckbm[b]
1916 }
1917 else:
1918 resolved_extracted = None
1919
1920 if params:
1921 pd = {}
1922 for bindparam, name in self.bind_names.items():
1923 escaped_name = (
1924 self.escaped_bind_names.get(name, name)
1925 if has_escaped_names
1926 else name
1927 )
1928
1929 if bindparam.key in params:
1930 pd[escaped_name] = params[bindparam.key]
1931 elif name in params:
1932 pd[escaped_name] = params[name]
1933
1934 elif _check and bindparam.required:
1935 if _group_number:
1936 raise exc.InvalidRequestError(
1937 "A value is required for bind parameter %r, "
1938 "in parameter group %d"
1939 % (bindparam.key, _group_number),
1940 code="cd3x",
1941 )
1942 else:
1943 raise exc.InvalidRequestError(
1944 "A value is required for bind parameter %r"
1945 % bindparam.key,
1946 code="cd3x",
1947 )
1948 else:
1949 if resolved_extracted:
1950 value_param = resolved_extracted.get(
1951 bindparam, bindparam
1952 )
1953 else:
1954 value_param = bindparam
1955
1956 if bindparam.callable:
1957 pd[escaped_name] = value_param.effective_value
1958 else:
1959 pd[escaped_name] = value_param.value
1960 return pd
1961 else:
1962 pd = {}
1963 for bindparam, name in self.bind_names.items():
1964 escaped_name = (
1965 self.escaped_bind_names.get(name, name)
1966 if has_escaped_names
1967 else name
1968 )
1969
1970 if _check and bindparam.required:
1971 if _group_number:
1972 raise exc.InvalidRequestError(
1973 "A value is required for bind parameter %r, "
1974 "in parameter group %d"
1975 % (bindparam.key, _group_number),
1976 code="cd3x",
1977 )
1978 else:
1979 raise exc.InvalidRequestError(
1980 "A value is required for bind parameter %r"
1981 % bindparam.key,
1982 code="cd3x",
1983 )
1984
1985 if resolved_extracted:
1986 value_param = resolved_extracted.get(bindparam, bindparam)
1987 else:
1988 value_param = bindparam
1989
1990 if bindparam.callable:
1991 pd[escaped_name] = value_param.effective_value
1992 else:
1993 pd[escaped_name] = value_param.value
1994
1995 return pd
1996
1997 @util.memoized_instancemethod
1998 def _get_set_input_sizes_lookup(self):
1999 dialect = self.dialect
2000
2001 include_types = dialect.include_set_input_sizes
2002 exclude_types = dialect.exclude_set_input_sizes
2003
2004 dbapi = dialect.dbapi
2005
2006 def lookup_type(typ):
2007 dbtype = typ._unwrapped_dialect_impl(dialect).get_dbapi_type(dbapi)
2008
2009 if (
2010 dbtype is not None
2011 and (exclude_types is None or dbtype not in exclude_types)
2012 and (include_types is None or dbtype in include_types)
2013 ):
2014 return dbtype
2015 else:
2016 return None
2017
2018 inputsizes = {}
2019
2020 literal_execute_params = self.literal_execute_params
2021
2022 for bindparam in self.bind_names:
2023 if bindparam in literal_execute_params:
2024 continue
2025
2026 if bindparam.type._is_tuple_type:
2027 inputsizes[bindparam] = [
2028 lookup_type(typ)
2029 for typ in cast(TupleType, bindparam.type).types
2030 ]
2031 else:
2032 inputsizes[bindparam] = lookup_type(bindparam.type)
2033
2034 return inputsizes
2035
    @property
    def params(self):
        """Return the bind param dictionary embedded into this
        compiled object, for those values that are present.

        This is the result of :meth:`.construct_params` with its
        ``_check`` flag turned off, so that required parameters lacking a
        value do not raise.

        .. seealso::

            :ref:`faq_sql_expression_string` - includes a usage example for
            debugging use cases.

        """
        return self.construct_params(_check=False)
2048
def _process_parameters_for_postcompile(
    self,
    parameters: _MutableCoreSingleExecuteParams,
    _populate_self: bool = False,
) -> ExpandedState:
    """handle special post compile parameters.

    These include:

    * "expanding" parameters - typically IN tuples that are rendered
      on a per-parameter basis for an otherwise fixed SQL statement string.

    * literal_binds compiled with the literal_execute flag.  Used for
      things like SQL Server "TOP N" where the driver does not accommodate
      N as a bound parameter.

    :param parameters: mutable parameter dictionary for one execution.
     It is modified in place: literal-execute and expanding entries are
     popped from it, and the per-element entries produced for an
     expanding parameter are added to it unless that parameter has
     ``literal_execute`` set.

    :param _populate_self: when True, the pre-expansion statement string
     and position tuple are saved on the compiler, ``self.string`` and
     ``self.positiontup`` are replaced with the expanded versions, and
     the resulting state is stored as
     ``self._post_compile_expanded_state``.

    :return: an ``ExpandedState`` built from the rewritten statement, the
     (mutated) ``parameters``, the processors collected for newly created
     parameter names, the new position list (``None`` when the compiler
     is not positional) and a mapping of each expanded parameter name to
     the list of names it was expanded into.

    """

    expanded_parameters = {}
    new_positiontup: Optional[List[str]]

    # always expand from the original (pre-expansion) string; if this
    # compiler was never populated with an expanded form, that is simply
    # self.string.
    pre_expanded_string = self._pre_expanded_string
    if pre_expanded_string is None:
        pre_expanded_string = self.string

    if self.positional:
        new_positiontup = []

        pre_expanded_positiontup = self._pre_expanded_positiontup
        if pre_expanded_positiontup is None:
            pre_expanded_positiontup = self.positiontup

    else:
        new_positiontup = pre_expanded_positiontup = None

    # the same _bind_processors mapping is viewed under two static types:
    # one processor per name, or a sequence of processors per name (the
    # latter is indexed per tuple element below).
    processors = self._bind_processors
    single_processors = cast(
        "Mapping[str, _BindProcessorType[Any]]", processors
    )
    tuple_processors = cast(
        "Mapping[str, Sequence[_BindProcessorType[Any]]]", processors
    )

    new_processors: Dict[str, _BindProcessorType[Any]] = {}

    # escaped name -> SQL text that replaces the placeholder
    replacement_expressions: Dict[str, Any] = {}
    # escaped name -> the (name, value) pairs generated for that parameter,
    # kept so a name seen a second time reuses the first expansion
    to_update_sets: Dict[str, Any] = {}

    # notes:
    # *unescaped* parameter names in:
    # self.bind_names, self.binds, self._bind_processors, self.positiontup
    #
    # *escaped* parameter names in:
    # construct_params(), replacement_expressions

    numeric_positiontup: Optional[List[str]] = None

    if self.positional and pre_expanded_positiontup is not None:
        names: Iterable[str] = pre_expanded_positiontup
        if self._numeric_binds:
            # expanded names are collected separately here and appended
            # to new_positiontup after the loop
            numeric_positiontup = []
    else:
        names = self.bind_names.values()

    ebn = self.escaped_bind_names
    for name in names:
        escaped_name = ebn.get(name, name) if ebn else name
        parameter = self.binds[name]

        if parameter in self.literal_execute_params:
            # rendered inline as a literal; the value is removed from
            # the parameter dictionary.  only the first occurrence of
            # a name is rendered.
            if escaped_name not in replacement_expressions:
                replacement_expressions[escaped_name] = (
                    self.render_literal_bindparam(
                        parameter,
                        render_literal_value=parameters.pop(escaped_name),
                    )
                )
            continue

        if parameter in self.post_compile_params:
            if escaped_name in replacement_expressions:
                # name already expanded earlier in this loop; reuse
                # the generated pairs.  ``values`` is no longer
                # available since it was popped the first time.
                to_update = to_update_sets[escaped_name]
                values = None
            else:
                # we are removing the parameter from parameters
                # because it is a list value, which is not expected by
                # TypeEngine objects that would otherwise be asked to
                # process it. the single name is being replaced with
                # individual numbered parameters for each value in the
                # param.
                #
                # note we are also inserting *escaped* parameter names
                # into the given dictionary.   default dialect will
                # use these param names directly as they will not be
                # in the escaped_bind_names dictionary.
                values = parameters.pop(name)

                leep_res = self._literal_execute_expanding_parameter(
                    escaped_name, parameter, values
                )
                (to_update, replacement_expr) = leep_res

                to_update_sets[escaped_name] = to_update
                replacement_expressions[escaped_name] = replacement_expr

            if not parameter.literal_execute:
                parameters.update(to_update)
                if parameter.type._is_tuple_type:
                    assert values is not None
                    # processor keys are built as "<name>_<i>_<j>" with
                    # 1-based row index i and element index j; the
                    # processor for element j is at position j - 1.
                    new_processors.update(
                        (
                            "%s_%s_%s" % (name, i, j),
                            tuple_processors[name][j - 1],
                        )
                        for i, tuple_element in enumerate(values, 1)
                        for j, _ in enumerate(tuple_element, 1)
                        if name in tuple_processors
                        and tuple_processors[name][j - 1] is not None
                    )
                else:
                    # every generated name shares the processor of the
                    # original parameter, if it has one
                    new_processors.update(
                        (key, single_processors[name])
                        for key, _ in to_update
                        if name in single_processors
                    )
                if numeric_positiontup is not None:
                    numeric_positiontup.extend(
                        name for name, _ in to_update
                    )
                elif new_positiontup is not None:
                    # to_update has escaped names, but that's ok since
                    # these are new names, that aren't in the
                    # escaped_bind_names dict.
                    new_positiontup.extend(name for name, _ in to_update)
                expanded_parameters[name] = [
                    expand_key for expand_key, _ in to_update
                ]
        elif new_positiontup is not None:
            # ordinary parameter: keeps its place in the position list
            new_positiontup.append(name)

    def process_expanding(m):
        # regex substitution callback: group(1) is the (escaped)
        # parameter name whose replacement text was computed above.
        key = m.group(1)
        expr = replacement_expressions[key]

        # if POSTCOMPILE included a bind_expression, render that
        # around each element
        if m.group(2):
            # group(2) is split on "~~"; tokens 1 and 3 are used as the
            # text placed before and after each element.
            tok = m.group(2).split("~~")
            be_left, be_right = tok[1], tok[3]
            expr = ", ".join(
                "%s%s%s" % (be_left, exp, be_right)
                for exp in expr.split(", ")
            )
        return expr

    statement = re.sub(
        self._post_compile_pattern, process_expanding, pre_expanded_string
    )

    if numeric_positiontup is not None:
        assert new_positiontup is not None
        # number the expanded names consecutively starting at
        # self.next_numeric_pos and substitute the numbered markers
        # into the statement.
        param_pos = {
            key: f"{self._numeric_binds_identifier_char}{num}"
            for num, key in enumerate(
                numeric_positiontup, self.next_numeric_pos
            )
        }
        # Can't use format here since % chars are not escaped.
        statement = self._pyformat_pattern.sub(
            lambda m: param_pos[m.group(1)], statement
        )
        new_positiontup.extend(numeric_positiontup)

    expanded_state = ExpandedState(
        statement,
        parameters,
        new_processors,
        new_positiontup,
        expanded_parameters,
    )

    if _populate_self:
        # this is for the "render_postcompile" flag, which is not
        # otherwise used internally and is for end-user debugging and
        # special use cases.
        self._pre_expanded_string = pre_expanded_string
        self._pre_expanded_positiontup = pre_expanded_positiontup
        self.string = expanded_state.statement
        self.positiontup = (
            list(expanded_state.positiontup or ())
            if self.positional
            else None
        )
        self._post_compile_expanded_state = expanded_state

    return expanded_state
2245
@util.preload_module("sqlalchemy.engine.cursor")
def _create_result_map(self):
    """utility method used for unit tests only.

    Returns whatever ``CursorResultMetaData._create_description_match_map``
    produces for this compiler's ``_result_columns``.

    """
    engine_cursor = util.preloaded.engine_cursor
    metadata_cls = engine_cursor.CursorResultMetaData
    return metadata_cls._create_description_match_map(self._result_columns)
2253
# populated by crud.py while compiling insert/update statements
_get_bind_name_for_col: _BindNameForColProtocol

@util.memoized_property
def _within_exec_param_key_getter(self) -> Callable[[Any], str]:
    """Return the ``_get_bind_name_for_col`` callable assigned to this
    compiler."""
    return self._get_bind_name_for_col
2261
@util.memoized_property
@util.preload_module("sqlalchemy.engine.result")
def _inserted_primary_key_from_lastrowid_getter(self):
    """Build the ``get(lastrowid, parameters)`` function that assembles
    the inserted primary key "row" for an INSERT using
    ``cursor.lastrowid``.

    """
    result = util.preloaded.engine_result

    key_for_col = self._within_exec_param_key_getter

    assert self.compile_state is not None
    statement = self.compile_state.statement

    if TYPE_CHECKING:
        assert isinstance(statement, Insert)

    table = statement.table

    # one (parameter-dictionary getter, column) pair per primary key column
    pk_getters = [
        (operator.methodcaller("get", key_for_col(col), None), col)
        for col in table.primary_key
    ]

    autoinc_getter = None
    lastrowid_processor = None
    autoinc_col = table._autoincrement_column
    if autoinc_col is not None:
        # apply type post processors to the lastrowid
        lastrowid_processor = autoinc_col.type._cached_result_processor(
            self.dialect, None
        )
        autoinc_key = key_for_col(autoinc_col)

        # if a bind value is present for the autoincrement column
        # in the parameters, we need to do the logic dictated by
        # #7998; honor a non-None user-passed parameter over lastrowid.
        # previously in the 1.4 series we weren't fetching lastrowid
        # at all if the key were present in the parameters
        if autoinc_key in self.binds:

            def _autoinc_getter(lastrowid, parameters):
                supplied = parameters.get(autoinc_key, lastrowid)
                # a non-None supplied parameter wins.
                # SQLite at least is observed to return the wrong
                # cursor.lastrowid for INSERT..ON CONFLICT so it
                # can't be used in all cases; otherwise fall back to
                # lastrowid.
                return lastrowid if supplied is None else supplied

            # work around mypy https://github.com/python/mypy/issues/14027
            autoinc_getter = _autoinc_getter

    row_fn = result.result_tuple([col.key for col in table.primary_key])

    def get(lastrowid, parameters):
        """given cursor.lastrowid value and the parameters used for INSERT,
        return a "row" that represents the primary key, either by
        using the "lastrowid" or by extracting values from the parameters
        that were sent along with the INSERT.

        """
        if lastrowid_processor is not None:
            lastrowid = lastrowid_processor(lastrowid)

        if lastrowid is None:
            return row_fn(fetch(parameters) for fetch, _ in pk_getters)

        def value_for(fetch, col):
            if col is not autoinc_col:
                return fetch(parameters)
            if autoinc_getter is None:
                return lastrowid
            return autoinc_getter(lastrowid, parameters)

        return row_fn(value_for(fetch, col) for fetch, col in pk_getters)

    return get
2345
@util.memoized_property
@util.preload_module("sqlalchemy.engine.result")
def _inserted_primary_key_from_returning_getter(self):
    """Build the ``get(row, parameters)`` function that assembles the
    inserted primary key "row" from an implicit RETURNING row, taking
    columns absent from RETURNING out of the INSERT parameters.

    """
    result = util.preloaded.engine_result

    assert self.compile_state is not None
    statement = self.compile_state.statement

    if TYPE_CHECKING:
        assert isinstance(statement, Insert)

    key_for_col = self._within_exec_param_key_getter
    table = statement.table

    returning = self.implicit_returning
    assert returning is not None
    position_of = {col: idx for idx, col in enumerate(returning)}

    def _getter_for(col):
        # (getter, True) reads from the returned row; (getter, False)
        # reads from the parameter dictionary
        if col in position_of:
            return operator.itemgetter(position_of[col]), True
        return (
            operator.methodcaller("get", key_for_col(col), None),
            False,
        )

    getters = cast(
        "List[Tuple[Callable[[Any], Any], bool]]",
        [_getter_for(col) for col in table.primary_key],
    )

    row_fn = result.result_tuple([col.key for col in table.primary_key])

    def get(row, parameters):
        return row_fn(
            getter(row if use_row else parameters)
            for getter, use_row in getters
        )

    return get
2390
def default_from(self) -> str:
    """Called when a SELECT statement has no froms, and no FROM clause is
    to be appended.

    Gives Oracle Database a chance to tack on a ``FROM DUAL`` to the string
    output.

    """
    # the base compiler contributes nothing
    nothing_to_add = ""
    return nothing_to_add
2400
def visit_override_binds(self, override_binds, **kw):
    """SQL compile the nested element of an _OverrideBinds with
    bindparams swapped out.

    The _OverrideBinds is not normally expected to be compiled; it
    is meant to be used when an already cached statement is to be used,
    the compilation was already performed, and only the bound params should
    be swapped in at execution time.

    However, there are test cases that exercise this object, and
    additionally the ORM subquery loader is known to feed in expressions
    which include this construct into new queries (discovered in #11173),
    so it has to do the right thing at compile time as well.

    """

    # get SQL text first
    sqltext = override_binds.element._compiler_dispatch(self, **kw)

    # for a test compile that is not for caching, change binds after the
    # fact.  note that we don't try to
    # swap the bindparam as we compile, because our element may be
    # elsewhere in the statement already (e.g. a subquery or perhaps a
    # CTE) and was already visited / compiled. See
    # test_relationship_criteria.py ->
    # test_selectinload_local_criteria_subquery
    for key in override_binds.translate:
        if key not in self.binds:
            continue
        old_bp = self.binds[key]

        # mutating old_bp.value in place would work, but we don't want to
        # mutate things outside.  instead, replace old_bp with a new
        # bindparam in all internal collections.
        new_bp = old_bp._with_value(
            override_binds.translate[old_bp.key],
            maintain_key=True,
            required=False,
        )

        name = self.bind_names[old_bp]
        self.binds[key] = new_bp
        self.binds[name] = new_bp
        self.bind_names[new_bp] = name
        self.bind_names.pop(old_bp, None)

        if old_bp in self.post_compile_params:
            self.post_compile_params |= {new_bp}
        if old_bp in self.literal_execute_params:
            self.literal_execute_params |= {new_bp}

        ckbm_tuple = self._cache_key_bind_match
        if ckbm_tuple:
            ckbm, cksm = ckbm_tuple
            for cloned in old_bp._cloned_set:
                if cloned.key in cksm:
                    ckbm[cksm[cloned.key]].append(new_bp)

    return sqltext
2464
def visit_grouping(self, grouping, asfrom=False, **kwargs):
    """Render the grouped element enclosed in parentheses."""
    inner = grouping.element._compiler_dispatch(self, **kwargs)
    return f"({inner})"
2467
def visit_select_statement_grouping(self, grouping, **kwargs):
    """Render the grouped SELECT element enclosed in parentheses."""
    inner = grouping.element._compiler_dispatch(self, **kwargs)
    return f"({inner})"
2470
def visit_label_reference(
    self, element, within_columns_clause=False, **kwargs
):
    """Render a label reference, asking for the label name alone to be
    rendered when it resolves against the enclosing statement's labels.

    """
    if self.stack and self.dialect.supports_simple_order_by_label:
        try:
            compile_state = cast(
                "Union[SelectState, CompoundSelectState]",
                self.stack[-1]["compile_state"],
            )
        except KeyError as ke:
            raise exc.CompileError(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ) from ke

        _, only_froms, only_cols = compile_state._label_resolve_dict
        resolve_dict = only_froms if within_columns_clause else only_cols

        # this can be None in the case that a _label_reference()
        # were subject to a replacement operation, in which case
        # the replacement of the Label element may have changed
        # to something else like a ColumnClause expression.
        order_by_elem = element.element._order_by_label_element

        resolvable = (
            order_by_elem is not None
            and order_by_elem.name in resolve_dict
            and order_by_elem.shares_lineage(
                resolve_dict[order_by_elem.name]
            )
        )
        if resolvable:
            kwargs["render_label_as_label"] = (
                element.element._order_by_label_element
            )

    return self.process(
        element.element,
        within_columns_clause=within_columns_clause,
        **kwargs,
    )
2517
def visit_textual_label_reference(
    self, element, within_columns_clause=False, **kwargs
):
    """Resolve a string label reference against the enclosing statement's
    labels and render the column it names.

    """
    if not self.stack:
        # compiling the element outside of the context of a SELECT
        return self.process(element._text_clause)

    try:
        compile_state = cast(
            "Union[SelectState, CompoundSelectState]",
            self.stack[-1]["compile_state"],
        )
    except KeyError as ke:
        coercions._no_text_coercion(
            element.element,
            extra=(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ),
            exc_cls=exc.CompileError,
            err=ke,
        )

    with_cols, only_froms, _ = compile_state._label_resolve_dict
    lookup = only_froms if within_columns_clause else with_cols
    try:
        col = lookup[element.element]
    except KeyError as err:
        coercions._no_text_coercion(
            element.element,
            extra=(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ),
            exc_cls=exc.CompileError,
            err=err,
        )
    else:
        kwargs["render_label_as_label"] = col
        return self.process(
            col, within_columns_clause=within_columns_clause, **kwargs
        )
2562
def visit_label(
    self,
    label,
    add_to_result_map=None,
    within_label_clause=False,
    within_columns_clause=False,
    render_label_as_label=None,
    result_map_targets=(),
    **kw,
):
    """Render a label as ``<expr> AS <name>``, as the bare label name, or
    as the labeled expression alone, depending on the rendering context.

    """
    # only render labels within the columns clause
    # or ORDER BY clause of a select.  dialect-specific compilers
    # can modify this behavior.
    render_label_with_as = (
        within_columns_clause and not within_label_clause
    )
    render_label_only = render_label_as_label is label

    if not (render_label_only or render_label_with_as):
        # no label rendering at all: just the inner expression
        return label.element._compiler_dispatch(
            self, within_columns_clause=False, **kw
        )

    if isinstance(label.name, elements._truncated_label):
        labelname = self._truncated_identifier("colident", label.name)
    else:
        labelname = label.name

    if not render_label_with_as:
        return self.preparer.format_label(label, labelname)

    if add_to_result_map is not None:
        add_to_result_map(
            labelname,
            label.name,
            (label, labelname) + label._alt_names + result_map_targets,
            label.type,
        )
    rendered_element = label.element._compiler_dispatch(
        self,
        within_columns_clause=True,
        within_label_clause=True,
        **kw,
    )
    return (
        rendered_element
        + OPERATORS[operators.as_]
        + self.preparer.format_label(label, labelname)
    )
2611
def _fallback_column_name(self, column):
    """Raise ``CompileError``; the base compiler has no fallback for a
    column without a name."""
    message = "Cannot compile Column object until its 'name' is assigned."
    raise exc.CompileError(message)
2616
def visit_lambda_element(self, element, **kw):
    """Render the element's ``_resolved`` expression."""
    return self.process(element._resolved, **kw)
2620
def visit_column(
    self,
    column: ColumnClause[Any],
    add_to_result_map: Optional[_ResultMapAppender] = None,
    include_table: bool = True,
    result_map_targets: Tuple[Any, ...] = (),
    ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] = None,
    **kwargs: Any,
) -> str:
    """Render a column name, qualified with its schema and table name
    where applicable.

    """
    name = orig_name = column.name
    if name is None:
        name = self._fallback_column_name(column)

    is_literal = column.is_literal
    if not is_literal and isinstance(name, elements._truncated_label):
        name = self._truncated_identifier("colident", name)

    if add_to_result_map is not None:
        targets = (column, name, column.key) + result_map_targets
        if column._tq_label:
            targets += (column._tq_label,)

        add_to_result_map(name, orig_name, targets, column.type)

    # note we are not currently accommodating for
    # literal_column(quoted_name('ident', True)) in the literal case
    name = (
        self.escape_literal_column(name)
        if is_literal
        else self.preparer.quote(name)
    )

    table = column.table
    if table is None or not include_table or not table.named_with_column:
        return name

    effective_schema = self.preparer.schema_for_object(table)
    schema_prefix = (
        self.preparer.quote_schema(effective_schema) + "."
        if effective_schema
        else ""
    )

    if TYPE_CHECKING:
        assert isinstance(table, NamedFromClause)
    tablename = table.name

    use_ambiguous_name = (
        not effective_schema
        and ambiguous_table_name_map
        and tablename in ambiguous_table_name_map
    )
    if use_ambiguous_name:
        tablename = ambiguous_table_name_map[tablename]

    if isinstance(tablename, elements._truncated_label):
        tablename = self._truncated_identifier("alias", tablename)

    return schema_prefix + self.preparer.quote(tablename) + "." + name
2679
def visit_collation(self, element, **kw):
    """Render the element's collation name via the identifier preparer."""
    preparer = self.preparer
    return preparer.format_collation(element.collation)
2682
def visit_fromclause(self, fromclause, **kwargs):
    """Render a FROM clause element as its ``name``, unmodified."""
    rendered = fromclause.name
    return rendered
2685
def visit_index(self, index, **kwargs):
    """Render an index as its ``name``, unmodified."""
    rendered = index.name
    return rendered
2688
def visit_typeclause(self, typeclause, **kw):
    """Render the type of a type clause using the dialect's type compiler.

    The type clause itself and this compiler's identifier preparer are
    passed along as the ``type_expression`` and ``identifier_preparer``
    keyword arguments.

    """
    kw.update(type_expression=typeclause, identifier_preparer=self.preparer)
    type_compiler = self.dialect.type_compiler_instance
    return type_compiler.process(typeclause.type, **kw)
2695
def post_process_text(self, text):
    """Double ``%`` characters in ``text`` when the preparer's
    ``_double_percents`` flag is set; otherwise return it unchanged."""
    if not self.preparer._double_percents:
        return text
    return text.replace("%", "%%")
2700
def escape_literal_column(self, text):
    """Double ``%`` characters in a literal column's text when the
    preparer's ``_double_percents`` flag is set; otherwise return it
    unchanged."""
    if not self.preparer._double_percents:
        return text
    return text.replace("%", "%%")
2705
def visit_textclause(self, textclause, add_to_result_map=None, **kw):
    """Render a text() construct, substituting its bind parameter
    markers and un-escaping escaped ones.

    """

    def render_bind(match):
        name = match.group(1)
        explicit = textclause._bindparams
        if name not in explicit:
            return self.bindparam_string(name, **kw)
        return self.process(explicit[name], **kw)

    if not self.stack:
        self.isplaintext = True

    if add_to_result_map:
        # text() object is present in the columns clause of a
        # select().   Add a no-name entry to the result map so that
        # row[text()] produces a result
        add_to_result_map(None, None, (textclause,), sqltypes.NULLTYPE)

    with_binds = BIND_PARAMS.sub(
        render_bind, self.post_process_text(textclause.text)
    )

    # un-escape any \:params
    return BIND_PARAMS_ESC.sub(lambda m: m.group(1), with_binds)
2730
def visit_textual_select(
    self, taf, compound_index=None, asfrom=False, **kw
):
    """Render a textual SELECT, populating the result map from its
    column arguments when this statement supplies the result columns.

    """
    toplevel = not self.stack
    entry = self._default_stack_entry if toplevel else self.stack[-1]

    new_entry: _CompilerStackEntry = {
        "correlate_froms": set(),
        "asfrom_froms": set(),
        "selectable": taf,
    }
    self.stack.append(new_entry)

    if taf._independent_ctes:
        self._dispatch_independent_ctes(taf, kw)

    if toplevel:
        populate_result_map = True
    elif compound_index == 0 and entry.get(
        "need_result_map_for_compound", False
    ):
        populate_result_map = True
    else:
        populate_result_map = entry.get("need_result_map_for_nested", False)

    if populate_result_map:
        positional = taf.positional
        self._ordered_columns = positional
        self._textual_ordered_columns = positional

        # enable looser result column matching when the SQL text links to
        # Column objects by name only
        self._loose_column_name_matching = not taf.positional and bool(
            taf.column_args
        )

        for column_arg in taf.column_args:
            self.process(
                column_arg,
                within_columns_clause=True,
                add_to_result_map=self._add_to_result_map,
            )

    text = self.process(taf.element, **kw)
    if self.ctes:
        nesting_level = None if toplevel else len(self.stack)
        text = self._render_cte_clause(nesting_level=nesting_level) + text

    self.stack.pop(-1)

    return text
2782
def visit_null(self, expr: Null, **kw: Any) -> str:
    """Render the SQL NULL keyword."""
    null_keyword = "NULL"
    return null_keyword
2785
def visit_true(self, expr: True_, **kw: Any) -> str:
    """Render boolean true: ``true`` when the dialect supports native
    booleans, ``1`` otherwise."""
    return "true" if self.dialect.supports_native_boolean else "1"
2791
def visit_false(self, expr: False_, **kw: Any) -> str:
    """Render boolean false: ``false`` when the dialect supports native
    booleans, ``0`` otherwise."""
    return "false" if self.dialect.supports_native_boolean else "0"
2797
2798 def _generate_delimited_list(self, elements, separator, **kw):
2799 return separator.join(
2800 s
2801 for s in (c._compiler_dispatch(self, **kw) for c in elements)
2802 if s
2803 )
2804
def _generate_delimited_and_list(self, clauses, **kw):
    """Render the given clauses joined by the AND operator, after they
    have been run through
    ``BooleanClauseList._process_clauses_for_boolean``.

    """
    lcc, clauses = elements.BooleanClauseList._process_clauses_for_boolean(
        operators.and_,
        elements.True_._singleton,
        elements.False_._singleton,
        clauses,
    )
    if lcc == 1:
        return clauses[0]._compiler_dispatch(self, **kw)

    separator = OPERATORS[operators.and_]
    rendered = (c._compiler_dispatch(self, **kw) for c in clauses)
    return separator.join(text for text in rendered if text)
2821
def visit_tuple(self, clauselist, **kw):
    """Render a tuple as its clause list enclosed in parentheses."""
    inner = self.visit_clauselist(clauselist, **kw)
    return f"({inner})"
2824
def visit_element_list(self, element, **kw):
    """Render the element's clauses separated by single spaces."""
    clauses = element.clauses
    return self._generate_delimited_list(clauses, " ", **kw)
2827
def visit_order_by_list(self, element, **kw):
    """Render the element's clauses separated by ``", "``."""
    clauses = element.clauses
    return self._generate_delimited_list(clauses, ", ", **kw)
2830
def visit_clauselist(self, clauselist, **kw):
    """Render a clause list, joining its clauses with the string for its
    operator, or with a single space when it has no operator."""
    if clauselist.operator is None:
        sep = " "
    else:
        sep = OPERATORS[clauselist.operator]
    return self._generate_delimited_list(clauselist.clauses, sep, **kw)
2839
def visit_expression_clauselist(self, clauselist, **kw):
    """Render an operator expression list, preferring an operator-specific
    ``visit_<operator>_expression_clauselist`` method when one exists.

    Raises ``UnsupportedCompilationError`` when the operator has no entry
    in ``OPERATORS``.

    """
    operator_ = clauselist.operator

    disp = self._get_operator_dispatch(
        operator_, "expression_clauselist", None
    )
    if disp:
        return disp(clauselist, operator_, **kw)

    try:
        opstring = OPERATORS[operator_]
    except KeyError as err:
        raise exc.UnsupportedCompilationError(self, operator_) from err

    kw["_in_operator_expression"] = True
    return self._generate_delimited_list(clauselist.clauses, opstring, **kw)
2858
def visit_case(self, clause, **kwargs):
    """Render a CASE expression with its optional value, WHEN / THEN
    pairs and optional ELSE."""
    parts = ["CASE "]
    if clause.value is not None:
        parts.append(clause.value._compiler_dispatch(self, **kwargs) + " ")
    for cond, result in clause.whens:
        parts.extend(
            (
                "WHEN ",
                cond._compiler_dispatch(self, **kwargs),
                " THEN ",
                result._compiler_dispatch(self, **kwargs),
                " ",
            )
        )
    if clause.else_ is not None:
        parts.append(
            "ELSE " + clause.else_._compiler_dispatch(self, **kwargs) + " "
        )
    parts.append("END")
    return "".join(parts)
2877
def visit_type_coerce(self, type_coerce, **kw):
    """Render only the ``typed_expression`` of a type_coerce()."""
    typed_expression = type_coerce.typed_expression
    return typed_expression._compiler_dispatch(self, **kw)
2880
def visit_cast(self, cast, **kwargs):
    """Render ``CAST(<expr> AS <type>)``.

    When the rendered type contains a ``COLLATE`` portion, that portion
    is placed after the closing parenthesis of the CAST.

    """
    type_clause = cast.typeclause._compiler_dispatch(self, **kwargs)
    collate_match = re.match("(.*)( COLLATE .*)", type_clause)
    expression = cast.clause._compiler_dispatch(self, **kwargs)
    if collate_match:
        type_text = collate_match.group(1)
        collation = collate_match.group(2)
    else:
        type_text, collation = type_clause, ""
    return f"CAST({expression} AS {type_text}){collation}"
2889
def visit_frame_clause(self, frameclause, **kw):
    """Render the ``<lower bound> AND <upper bound>`` text of a window
    frame."""
    frame_type = elements._FrameClauseType

    lower_type = frameclause.lower_type
    if lower_type is frame_type.RANGE_UNBOUNDED:
        left = "UNBOUNDED PRECEDING"
    elif lower_type is frame_type.RANGE_CURRENT:
        left = "CURRENT ROW"
    else:
        val = self.process(frameclause.lower_integer_bind, **kw)
        direction = (
            "PRECEDING"
            if frameclause.lower_type is frame_type.RANGE_PRECEDING
            else "FOLLOWING"
        )
        left = f"{val} {direction}"

    upper_type = frameclause.upper_type
    if upper_type is frame_type.RANGE_UNBOUNDED:
        right = "UNBOUNDED FOLLOWING"
    elif upper_type is frame_type.RANGE_CURRENT:
        right = "CURRENT ROW"
    else:
        val = self.process(frameclause.upper_integer_bind, **kw)
        direction = (
            "PRECEDING"
            if frameclause.upper_type is frame_type.RANGE_PRECEDING
            else "FOLLOWING"
        )
        right = f"{val} {direction}"

    return f"{left} AND {right}"
2921
def visit_over(self, over, **kwargs):
    """Render ``<element> OVER (...)`` with the optional PARTITION BY,
    ORDER BY and RANGE / ROWS / GROUPS frame portions."""
    text = over.element._compiler_dispatch(self, **kwargs)

    if over.range_ is not None:
        frame = f"RANGE BETWEEN {self.process(over.range_, **kwargs)}"
    elif over.rows is not None:
        frame = f"ROWS BETWEEN {self.process(over.rows, **kwargs)}"
    elif over.groups is not None:
        frame = f"GROUPS BETWEEN {self.process(over.groups, **kwargs)}"
    else:
        frame = None

    window_parts = []
    for word, clause in (
        ("PARTITION", over.partition_by),
        ("ORDER", over.order_by),
    ):
        # absent or empty clause lists contribute nothing
        if clause is not None and len(clause):
            window_parts.append(
                "%s BY %s"
                % (word, clause._compiler_dispatch(self, **kwargs))
            )
    if frame:
        window_parts.append(frame)

    return "%s OVER (%s)" % (text, " ".join(window_parts))
2948
def visit_withingroup(self, withingroup, **kwargs):
    """Render ``<element> WITHIN GROUP (ORDER BY <order_by>)``."""
    element_sql = withingroup.element._compiler_dispatch(self, **kwargs)
    order_by_sql = withingroup.order_by._compiler_dispatch(self, **kwargs)
    return f"{element_sql} WITHIN GROUP (ORDER BY {order_by_sql})"
2954
def visit_funcfilter(self, funcfilter, **kwargs):
    """Render ``<func> FILTER (WHERE <criterion>)``."""
    func_sql = funcfilter.func._compiler_dispatch(self, **kwargs)
    criterion_sql = funcfilter.criterion._compiler_dispatch(self, **kwargs)
    return f"{func_sql} FILTER (WHERE {criterion_sql})"
2960
def visit_aggregateorderby(self, aggregateorderby, **kwargs):
    """Render an aggregate with ORDER BY according to the dialect's
    ``aggregate_order_by_style``.

    Raises ``CompileError`` when the style is ``NONE``; renders the
    ordering inside the function's argument list when the style is
    ``INLINE``; otherwise renders it as WITHIN GROUP.

    """
    style = self.dialect.aggregate_order_by_style
    if style is AggregateOrderByStyle.NONE:
        raise exc.CompileError(
            "this dialect does not support "
            "ORDER BY within an aggregate function"
        )

    if style is not AggregateOrderByStyle.INLINE:
        return self.visit_withingroup(aggregateorderby, **kwargs)

    # work on a clone so the original function element is left untouched
    new_fn = aggregateorderby.element._clone()
    inline_args = aggregate_orderby_inline(
        new_fn.clause_expr.element, aggregateorderby.order_by
    )
    new_fn.clause_expr = elements.Grouping(inline_args)

    return new_fn._compiler_dispatch(self, **kwargs)
2981
def visit_aggregate_orderby_inline(self, element, **kw):
    """Render ``<element> ORDER BY <aggregate_order_by>``."""
    element_sql = self.process(element.element, **kw)
    order_by_sql = self.process(element.aggregate_order_by, **kw)
    return f"{element_sql} ORDER BY {order_by_sql}"
2987
def visit_aggregate_strings_func(self, fn, *, use_function_name, **kw):
    """Render a string aggregation call under ``use_function_name``, with
    the delimiter argument compiled using ``literal_execute=True``.

    """
    # aggregate_order_by attribute is present if visit_function
    # gave us a Function with aggregate_orderby_inline() as the inner
    # contents
    order_by = getattr(fn.clauses, "aggregate_order_by", None)

    literal_exec = {**kw, "literal_execute": True}

    # break up the function into its components so we can apply
    # literal_execute to the second argument (the delimiter)
    expr, delimeter = list(fn.clauses)[0:2]

    render_inline_order_by = (
        order_by is not None
        and self.dialect.aggregate_order_by_style
        is AggregateOrderByStyle.INLINE
    )

    rendered = (
        f"{use_function_name}({expr._compiler_dispatch(self, **kw)}, "
        f"{delimeter._compiler_dispatch(self, **literal_exec)}"
    )
    if render_inline_order_by:
        rendered += f" ORDER BY {order_by._compiler_dispatch(self, **kw)}"
    return rendered + ")"
3016
def visit_extract(self, extract, **kwargs):
    """Render ``EXTRACT(<field> FROM <expr>)``, translating the field
    through ``extract_map`` when it has an entry there."""
    field = self.extract_map.get(extract.field, extract.field)
    source = extract.expr._compiler_dispatch(self, **kwargs)
    return f"EXTRACT({field} FROM {source})"
3023
def visit_scalar_function_column(self, element, **kw):
    """Render ``(<function>).<column>`` for a column of a scalar
    function."""
    compiled_fn = self.visit_function(element.fn, **kw)
    compiled_col = self.visit_column(element, **kw)
    return f"({compiled_fn}).{compiled_col}"
3028
def visit_function(
    self,
    func: Function[Any],
    add_to_result_map: Optional[_ResultMapAppender] = None,
    **kwargs: Any,
) -> str:
    """Render a SQL function call.

    A ``visit_<name>_func`` method on the compiler, looked up by the
    lower-cased function name, takes precedence.  Otherwise the name
    comes from ``FUNCTIONS`` for the function's class, or from
    ``func.name`` itself, prefixed with any package names.

    """
    if add_to_result_map is not None:
        add_to_result_map(func.name, func.name, (func.name,), func.type)

    disp = getattr(self, f"visit_{func.name.lower()}_func", None)

    text: str

    if disp:
        text = disp(func, **kwargs)
    else:
        name = FUNCTIONS.get(func._deannotate().__class__, None)
        if name:
            if func._has_args:
                name += "%(expr)s"
        else:
            name = func.name
            if self.preparer._requires_quotes_illegal_chars(
                name
            ) or isinstance(name, elements.quoted_name):
                name = self.preparer.quote(name)
            name = name + "%(expr)s"

        # NOTE(review): the isinstance() check below tests ``name`` rather
        # than ``tok``; it looks like it may have been meant to test each
        # package token - confirm before changing.
        qualified = [
            (
                self.preparer.quote(tok)
                if self.preparer._requires_quotes_illegal_chars(tok)
                or isinstance(name, elements.quoted_name)
                else tok
            )
            for tok in func.packagenames
        ]
        qualified.append(name)
        text = ".".join(qualified) % {
            "expr": self.function_argspec(func, **kwargs)
        }

    if func._with_ordinality:
        text += " WITH ORDINALITY"
    return text
3074
def visit_next_value_func(self, next_value, **kw):
    """Render a next-value function by rendering its sequence."""
    sequence = next_value.sequence
    return self.visit_sequence(sequence)
3077
def visit_sequence(self, sequence, **kw):
    """Raise ``NotImplementedError``; the base compiler does not render
    sequence increments."""
    dialect_name = self.dialect.name
    raise NotImplementedError(
        f"Dialect '{dialect_name}' does not support sequence increments."
    )
3083
def function_argspec(self, func: Function[Any], **kwargs: Any) -> str:
    """Render the argument portion of a function call from its
    ``clause_expr``."""
    clause_expr = func.clause_expr
    return clause_expr._compiler_dispatch(self, **kwargs)
3086
def visit_compound_select(
    self, cs, asfrom=False, compound_index=None, **kwargs
):
    """Render a compound SELECT (the member SELECTs joined by the
    compound keyword) followed by its GROUP BY, ORDER BY and row limiting
    clauses, prefixed by any CTE clause.

    """
    toplevel = not self.stack

    compile_state = cs._compile_state_factory(cs, self, **kwargs)

    if toplevel and not self.compile_state:
        self.compile_state = compile_state

    compound_stmt = compile_state.statement

    entry = self._default_stack_entry if toplevel else self.stack[-1]
    need_result_map = toplevel or (
        not compound_index
        and entry.get("need_result_map_for_compound", False)
    )

    # indicates there is already a CompoundSelect in play
    if compound_index == 0:
        entry["select_0"] = cs

    stack_entry = {
        "correlate_froms": entry["correlate_froms"],
        "asfrom_froms": entry["asfrom_froms"],
        "selectable": cs,
        "compile_state": compile_state,
        "need_result_map_for_compound": need_result_map,
    }
    self.stack.append(stack_entry)

    if compound_stmt._independent_ctes:
        self._dispatch_independent_ctes(compound_stmt, kwargs)

    keyword = self.compound_keywords[cs.keyword]
    joiner = " " + keyword + " "

    text = joiner.join(
        member._compiler_dispatch(
            self, asfrom=asfrom, compound_index=position, **kwargs
        )
        for position, member in enumerate(cs.selects)
    )

    # the trailing clauses are rendered without table qualification
    kwargs["include_table"] = False
    text += self.group_by_clause(cs, **dict(asfrom=asfrom, **kwargs))
    text += self.order_by_clause(cs, **kwargs)
    if cs._has_row_limiting_clause:
        text += self._row_limit_clause(cs, **kwargs)

    if self.ctes:
        nesting_level = None if toplevel else len(self.stack)
        cte_clause = self._render_cte_clause(
            nesting_level=nesting_level,
            include_following_stack=True,
        )
        text = cte_clause + text

    self.stack.pop(-1)
    return text
3151
3152 def _row_limit_clause(self, cs, **kwargs):
3153 if cs._fetch_clause is not None:
3154 return self.fetch_clause(cs, **kwargs)
3155 else:
3156 return self.limit_clause(cs, **kwargs)
3157
3158 def _get_operator_dispatch(self, operator_, qualifier1, qualifier2):
3159 attrname = "visit_%s_%s%s" % (
3160 operator_.__name__,
3161 qualifier1,
3162 "_" + qualifier2 if qualifier2 else "",
3163 )
3164 return getattr(self, attrname, None)
3165
def visit_unary(
    self, unary, add_to_result_map=None, result_map_targets=(), **kw
):
    """Render a unary expression having either an operator or a modifier.

    A specific ``visit_<name>_unary_operator`` / ``visit_<name>_unary_modifier``
    method is used when one is found by ``_get_operator_dispatch()``;
    otherwise the string from ``OPERATORS`` is rendered generically.

    :raises CompileError: if ``unary`` has both an operator and a
     modifier, or neither.

    """
    if add_to_result_map is not None:
        # record this element as a result-map target and pass the
        # result-map arguments along to whatever renders the operand
        result_map_targets += (unary,)
        kw["add_to_result_map"] = add_to_result_map
        kw["result_map_targets"] = result_map_targets

    if unary.operator:
        if unary.modifier:
            raise exc.CompileError(
                "Unary expression does not support operator "
                "and modifier simultaneously"
            )
        kind, token = "operator", unary.operator
    elif unary.modifier:
        kind, token = "modifier", unary.modifier
    else:
        raise exc.CompileError(
            "Unary expression has no operator or modifier"
        )

    handler = self._get_operator_dispatch(token, "unary", kind)
    if handler:
        return handler(unary, token, **kw)

    if kind == "operator":
        return self._generate_generic_unary_operator(
            unary, OPERATORS[token], **kw
        )
    return self._generate_generic_unary_modifier(
        unary, OPERATORS[token], **kw
    )
3203
def visit_truediv_binary(self, binary, operator, **kw):
    """Render true division as ``<left> / <right>``.

    When the dialect reports ``div_is_floordiv``, the right operand is
    wrapped in a CAST: to its own type if that type has Numeric or Float
    affinity, otherwise to a new ``Numeric()``.

    """
    if not self.dialect.div_is_floordiv:
        numerator = self.process(binary.left, **kw)
        return numerator + " / " + self.process(binary.right, **kw)

    numerator = self.process(binary.left, **kw)

    # TODO: would need a fast cast again here,
    # unless we want to use an implicit cast like "+ 0.0"
    right_type = binary.right.type
    if right_type._type_affinity in (sqltypes.Numeric, sqltypes.Float):
        cast_type = right_type
    else:
        cast_type = sqltypes.Numeric()

    denominator = self.process(elements.Cast(binary.right, cast_type), **kw)
    return numerator + " / " + denominator
3230
def visit_floordiv_binary(self, binary, operator, **kw):
    """Render floor division.

    Plain ``<left> / <right>`` is rendered when the dialect reports
    ``div_is_floordiv`` and the right operand's type has Integer
    affinity; in every other case the division is wrapped in ``FLOOR()``.

    """
    plain_division_floors = (
        self.dialect.div_is_floordiv
        and binary.right.type._type_affinity is sqltypes.Integer
    )
    division = (
        self.process(binary.left, **kw)
        + " / "
        + self.process(binary.right, **kw)
    )
    if plain_division_floors:
        return division
    return "FLOOR(%s)" % division
3247
def visit_is_true_unary_operator(self, element, operator, **kw):
    """Render an "is true" test of ``element.element``.

    The operand is rendered bare when the element is implicitly boolean
    or the dialect supports native booleans; otherwise ``<operand> = 1``.

    """
    render_bare = (
        element._is_implicitly_boolean
        or self.dialect.supports_native_boolean
    )
    operand = self.process(element.element, **kw)
    if render_bare:
        return operand
    return "%s = 1" % operand
3256
def visit_is_false_unary_operator(self, element, operator, **kw):
    """Render an "is false" test of ``element.element``.

    ``NOT <operand>`` is rendered when the element is implicitly boolean
    or the dialect supports native booleans; otherwise ``<operand> = 0``.

    """
    use_not = (
        element._is_implicitly_boolean
        or self.dialect.supports_native_boolean
    )
    operand = self.process(element.element, **kw)
    template = "NOT %s" if use_not else "%s = 0"
    return template % operand
3265
def visit_not_match_op_binary(self, binary, operator, **kw):
    """Render a negated MATCH as ``NOT <match expression>``.

    The inner expression is produced by ``visit_binary()`` with the
    operator overridden to ``operators.match_op``.

    The compile keyword arguments are forwarded to ``visit_binary()`` so
    that flags such as ``literal_binds`` apply to the inner MATCH
    expression as they do for the other operator visitors; previously
    they were silently dropped.

    """
    match_text = self.visit_binary(
        binary, override_operator=operators.match_op, **kw
    )
    return "NOT %s" % match_text
3270
def visit_not_in_op_binary(self, binary, operator, **kw):
    """Render NOT IN generically, enclosed in parenthesis.

    The parenthesis are required because the empty-set case is rendered
    in the form "(col NOT IN (null) OR 1 = 1)"; the OR would otherwise
    be exposed to the surrounding expression.

    """
    inner = self._generate_generic_binary(binary, OPERATORS[operator], **kw)
    return f"({inner!s})"
3278
def visit_empty_set_op_expr(self, type_, expand_op, **kw):
    """Render the replacement text for an empty IN / NOT IN set.

    The text is designed to be spliced inside existing parenthesis: it
    closes them after a NULL (or a tuple of NULLs, one per entry in
    ``type_`` when there is more than one) and opens a constant
    condition, ``OR (1 = 1`` for NOT IN and ``AND (1 != 1`` for IN.
    Any other ``expand_op`` is handed to ``visit_empty_set_expr()``.

    """
    if expand_op is operators.not_in_op:
        constant_condition = "OR (1 = 1"
    elif expand_op is operators.in_op:
        constant_condition = "AND (1 != 1"
    else:
        return self.visit_empty_set_expr(type_)

    if len(type_) > 1:
        nulls = "(%s)" % (", ".join("NULL" for _ in type_))
    else:
        nulls = "NULL"

    return nulls + ") " + constant_condition
3296
def visit_empty_set_expr(self, element_types, **kw):
    """Base implementation for empty set expressions; always raises.

    :raises NotImplementedError: unconditionally, with a message naming
     ``self.dialect.name``.

    """
    dialect_name = self.dialect.name
    message = (
        "Dialect '%s' does not support empty set expression." % dialect_name
    )
    raise NotImplementedError(message)
3302
3303 def _literal_execute_expanding_parameter_literal_binds(
3304 self, parameter, values, bind_expression_template=None
3305 ):
3306 typ_dialect_impl = parameter.type._unwrapped_dialect_impl(self.dialect)
3307
3308 if not values:
3309 # empty IN expression. note we don't need to use
3310 # bind_expression_template here because there are no
3311 # expressions to render.
3312
3313 if typ_dialect_impl._is_tuple_type:
3314 replacement_expression = (
3315 "VALUES " if self.dialect.tuple_in_values else ""
3316 ) + self.visit_empty_set_op_expr(
3317 parameter.type.types, parameter.expand_op
3318 )
3319
3320 else:
3321 replacement_expression = self.visit_empty_set_op_expr(
3322 [parameter.type], parameter.expand_op
3323 )
3324
3325 elif typ_dialect_impl._is_tuple_type or (
3326 typ_dialect_impl._isnull
3327 and isinstance(values[0], collections_abc.Sequence)
3328 and not isinstance(values[0], (str, bytes))
3329 ):
3330 if typ_dialect_impl._has_bind_expression:
3331 raise NotImplementedError(
3332 "bind_expression() on TupleType not supported with "
3333 "literal_binds"
3334 )
3335
3336 replacement_expression = (
3337 "VALUES " if self.dialect.tuple_in_values else ""
3338 ) + ", ".join(
3339 "(%s)"
3340 % (
3341 ", ".join(
3342 self.render_literal_value(value, param_type)
3343 for value, param_type in zip(
3344 tuple_element, parameter.type.types
3345 )
3346 )
3347 )
3348 for i, tuple_element in enumerate(values)
3349 )
3350 else:
3351 if bind_expression_template:
3352 post_compile_pattern = self._post_compile_pattern
3353 m = post_compile_pattern.search(bind_expression_template)
3354 assert m and m.group(
3355 2
3356 ), "unexpected format for expanding parameter"
3357
3358 tok = m.group(2).split("~~")
3359 be_left, be_right = tok[1], tok[3]
3360 replacement_expression = ", ".join(
3361 "%s%s%s"
3362 % (
3363 be_left,
3364 self.render_literal_value(value, parameter.type),
3365 be_right,
3366 )
3367 for value in values
3368 )
3369 else:
3370 replacement_expression = ", ".join(
3371 self.render_literal_value(value, parameter.type)
3372 for value in values
3373 )
3374
3375 return (), replacement_expression
3376
3377 def _literal_execute_expanding_parameter(self, name, parameter, values):
3378 if parameter.literal_execute:
3379 return self._literal_execute_expanding_parameter_literal_binds(
3380 parameter, values
3381 )
3382
3383 dialect = self.dialect
3384 typ_dialect_impl = parameter.type._unwrapped_dialect_impl(dialect)
3385
3386 if self._numeric_binds:
3387 bind_template = self.compilation_bindtemplate
3388 else:
3389 bind_template = self.bindtemplate
3390
3391 if (
3392 self.dialect._bind_typing_render_casts
3393 and typ_dialect_impl.render_bind_cast
3394 ):
3395
3396 def _render_bindtemplate(name):
3397 return self.render_bind_cast(
3398 parameter.type,
3399 typ_dialect_impl,
3400 bind_template % {"name": name},
3401 )
3402
3403 else:
3404
3405 def _render_bindtemplate(name):
3406 return bind_template % {"name": name}
3407
3408 if not values:
3409 to_update = []
3410 if typ_dialect_impl._is_tuple_type:
3411 replacement_expression = self.visit_empty_set_op_expr(
3412 parameter.type.types, parameter.expand_op
3413 )
3414 else:
3415 replacement_expression = self.visit_empty_set_op_expr(
3416 [parameter.type], parameter.expand_op
3417 )
3418
3419 elif typ_dialect_impl._is_tuple_type or (
3420 typ_dialect_impl._isnull
3421 and isinstance(values[0], collections_abc.Sequence)
3422 and not isinstance(values[0], (str, bytes))
3423 ):
3424 assert not typ_dialect_impl._is_array
3425 to_update = [
3426 ("%s_%s_%s" % (name, i, j), value)
3427 for i, tuple_element in enumerate(values, 1)
3428 for j, value in enumerate(tuple_element, 1)
3429 ]
3430
3431 replacement_expression = (
3432 "VALUES " if dialect.tuple_in_values else ""
3433 ) + ", ".join(
3434 "(%s)"
3435 % (
3436 ", ".join(
3437 _render_bindtemplate(
3438 to_update[i * len(tuple_element) + j][0]
3439 )
3440 for j, value in enumerate(tuple_element)
3441 )
3442 )
3443 for i, tuple_element in enumerate(values)
3444 )
3445 else:
3446 to_update = [
3447 ("%s_%s" % (name, i), value)
3448 for i, value in enumerate(values, 1)
3449 ]
3450 replacement_expression = ", ".join(
3451 _render_bindtemplate(key) for key, value in to_update
3452 )
3453
3454 return to_update, replacement_expression
3455
def visit_binary(
    self,
    binary,
    override_operator=None,
    eager_grouping=False,
    from_linter=None,
    lateral_from_linter=None,
    **kw,
):
    """Render a binary expression.

    :param binary: the binary expression.
    :param override_operator: operator to render in place of
     ``binary.operator``.
    :param eager_grouping: accepted here but not passed on.
    :param from_linter: when given and the expression's operator is a
     comparison, edges between the FROM objects of the two sides are
     recorded on it (or on ``lateral_from_linter`` when that is given).
    :param lateral_from_linter: linter for an enclosing LATERAL; when
     given, ``kw["enclosing_lateral"]`` is added to both sides.
    :raises UnsupportedCompilationError: if no visit method exists for
     the operator and it is not present in ``OPERATORS``.

    """
    if from_linter and operators.is_comparison(binary.operator):
        if lateral_from_linter is not None:
            enclosing_lateral = kw["enclosing_lateral"]
            edge_target = lateral_from_linter
            left_froms = binary.left._from_objects + [enclosing_lateral]
            right_froms = binary.right._from_objects + [enclosing_lateral]
        else:
            edge_target = from_linter
            left_froms = binary.left._from_objects
            right_froms = binary.right._from_objects

        edge_target.edges.update(
            itertools.product(_de_clone(left_froms), _de_clone(right_froms))
        )

    # don't allow "? = ?" to render
    if (
        self.ansi_bind_rules
        and isinstance(binary.left, elements.BindParameter)
        and isinstance(binary.right, elements.BindParameter)
    ):
        kw["literal_execute"] = True

    operator_ = override_operator or binary.operator
    specific_visitor = self._get_operator_dispatch(operator_, "binary", None)
    if specific_visitor:
        return specific_visitor(binary, operator_, **kw)

    try:
        opstring = OPERATORS[operator_]
    except KeyError as err:
        raise exc.UnsupportedCompilationError(self, operator_) from err

    return self._generate_generic_binary(
        binary,
        opstring,
        from_linter=from_linter,
        lateral_from_linter=lateral_from_linter,
        **kw,
    )
3511
def visit_function_as_comparison_op_binary(self, element, operator, **kw):
    """Render the element by processing its ``sql_function``."""
    wrapped_function = element.sql_function
    return self.process(wrapped_function, **kw)
3514
def visit_mod_binary(self, binary, operator, **kw):
    """Render the modulus operator between the two operands.

    The percent sign is doubled when ``self.preparer._double_percents``
    is set.

    """
    modulus = " %% " if self.preparer._double_percents else " % "
    lhs = self.process(binary.left, **kw)
    return lhs + modulus + self.process(binary.right, **kw)
3528
def visit_custom_op_binary(self, element, operator, **kw):
    """Render a custom binary operator surrounded by single spaces.

    The operator string is passed through ``escape_literal_column()``
    and the operator's ``eager_grouping`` setting replaces any
    ``eager_grouping`` in ``kw``.

    """
    opstring = " " + self.escape_literal_column(operator.opstring) + " "
    options = dict(kw, eager_grouping=operator.eager_grouping)
    return self._generate_generic_binary(element, opstring, **options)
3536
def visit_custom_op_unary_operator(self, element, operator, **kw):
    """Render a custom prefix operator: escaped operator string, a
    space, then the operand."""
    prefix = self.escape_literal_column(operator.opstring) + " "
    return self._generate_generic_unary_operator(element, prefix, **kw)
3541
def visit_custom_op_unary_modifier(self, element, operator, **kw):
    """Render a custom postfix operator: the operand, a space, then
    the escaped operator string."""
    suffix = " " + self.escape_literal_column(operator.opstring)
    return self._generate_generic_unary_modifier(element, suffix, **kw)
3546
3547 def _generate_generic_binary(
3548 self,
3549 binary: BinaryExpression[Any],
3550 opstring: str,
3551 eager_grouping: bool = False,
3552 **kw: Any,
3553 ) -> str:
3554 _in_operator_expression = kw.get("_in_operator_expression", False)
3555
3556 kw["_in_operator_expression"] = True
3557 kw["_binary_op"] = binary.operator
3558 text = (
3559 binary.left._compiler_dispatch(
3560 self, eager_grouping=eager_grouping, **kw
3561 )
3562 + opstring
3563 + binary.right._compiler_dispatch(
3564 self, eager_grouping=eager_grouping, **kw
3565 )
3566 )
3567
3568 if _in_operator_expression and eager_grouping:
3569 text = "(%s)" % text
3570 return text
3571
3572 def _generate_generic_unary_operator(self, unary, opstring, **kw):
3573 return opstring + unary.element._compiler_dispatch(self, **kw)
3574
3575 def _generate_generic_unary_modifier(self, unary, opstring, **kw):
3576 return unary.element._compiler_dispatch(self, **kw) + opstring
3577
@util.memoized_property
def _like_percent_literal(self):
    """A literal column rendering ``'%'`` with the string type, used as
    the wildcard by the contains / startswith / endswith visitors."""
    wildcard = elements.literal_column("'%'", type_=sqltypes.STRINGTYPE)
    return wildcard
3581
def visit_ilike_case_insensitive_operand(self, element, **kw):
    """Render the wrapped element inside ``lower(...)``."""
    operand = element.element._compiler_dispatch(self, **kw)
    return "lower(%s)" % (operand,)
3584
def visit_contains_op_binary(self, binary, operator, **kw):
    """Render "contains" as a LIKE expression.

    Works on a clone of ``binary`` whose right side is replaced by the
    percent literal concatenated on both sides of the original right.

    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard.concat(like_expr.right).concat(wildcard)
    return self.visit_like_op_binary(like_expr, operator, **kw)
3590
def visit_not_contains_op_binary(self, binary, operator, **kw):
    """Render "not contains" as a NOT LIKE expression.

    Works on a clone of ``binary`` whose right side is replaced by the
    percent literal concatenated on both sides of the original right.

    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard.concat(like_expr.right).concat(wildcard)
    return self.visit_not_like_op_binary(like_expr, operator, **kw)
3596
def visit_icontains_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "contains" via ``visit_ilike_op_binary()``.

    Works on a clone of ``binary``: both sides are wrapped with
    ``ilike_case_insensitive()`` and the wrapped right side gets the
    percent literal concatenated on both sides.

    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_right).concat(wildcard)
    return self.visit_ilike_op_binary(like_expr, operator, **kw)
3605
def visit_not_icontains_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "not contains" via
    ``visit_not_ilike_op_binary()``.

    Works on a clone of ``binary``: both sides are wrapped with
    ``ilike_case_insensitive()`` and the wrapped right side gets the
    percent literal concatenated on both sides.

    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_right).concat(wildcard)
    return self.visit_not_ilike_op_binary(like_expr, operator, **kw)
3614
def visit_startswith_op_binary(self, binary, operator, **kw):
    """Render "startswith" as a LIKE expression.

    Works on a clone of ``binary`` whose right side is replaced by
    ``percent._rconcat(<original right>)``.

    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard._rconcat(like_expr.right)
    return self.visit_like_op_binary(like_expr, operator, **kw)
3620
def visit_not_startswith_op_binary(self, binary, operator, **kw):
    """Render "not startswith" as a NOT LIKE expression.

    Works on a clone of ``binary`` whose right side is replaced by
    ``percent._rconcat(<original right>)``.

    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard._rconcat(like_expr.right)
    return self.visit_not_like_op_binary(like_expr, operator, **kw)
3626
def visit_istartswith_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "startswith" via
    ``visit_ilike_op_binary()``.

    Works on a clone of ``binary``: both sides are wrapped with
    ``ilike_case_insensitive()`` and the right side becomes
    ``percent._rconcat(<wrapped right>)``.

    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard._rconcat(folded_right)
    return self.visit_ilike_op_binary(like_expr, operator, **kw)
3633
def visit_not_istartswith_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "not startswith" via
    ``visit_not_ilike_op_binary()``.

    Works on a clone of ``binary``: both sides are wrapped with
    ``ilike_case_insensitive()`` and the right side becomes
    ``percent._rconcat(<wrapped right>)``.

    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard._rconcat(folded_right)
    return self.visit_not_ilike_op_binary(like_expr, operator, **kw)
3640
def visit_endswith_op_binary(self, binary, operator, **kw):
    """Render "endswith" as a LIKE expression.

    Works on a clone of ``binary`` whose right side is replaced by
    ``percent.concat(<original right>)``.

    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard.concat(like_expr.right)
    return self.visit_like_op_binary(like_expr, operator, **kw)
3646
def visit_not_endswith_op_binary(self, binary, operator, **kw):
    """Render "not endswith" as a NOT LIKE expression.

    Works on a clone of ``binary`` whose right side is replaced by
    ``percent.concat(<original right>)``.

    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard.concat(like_expr.right)
    return self.visit_not_like_op_binary(like_expr, operator, **kw)
3652
def visit_iendswith_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "endswith" via ``visit_ilike_op_binary()``.

    Works on a clone of ``binary``: both sides are wrapped with
    ``ilike_case_insensitive()`` and the right side becomes
    ``percent.concat(<wrapped right>)``.

    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_right)
    return self.visit_ilike_op_binary(like_expr, operator, **kw)
3659
def visit_not_iendswith_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "not endswith" via
    ``visit_not_ilike_op_binary()``.

    Works on a clone of ``binary``: both sides are wrapped with
    ``ilike_case_insensitive()`` and the right side becomes
    ``percent.concat(<wrapped right>)``.

    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_right)
    return self.visit_not_ilike_op_binary(like_expr, operator, **kw)
3666
def visit_like_op_binary(self, binary, operator, **kw):
    """Render ``<left> LIKE <right>``.

    An ``ESCAPE`` clause is appended, rendered as a string literal, when
    ``binary.modifiers`` holds a non-None ``"escape"`` entry.

    """
    escape = binary.modifiers.get("escape", None)

    lhs = binary.left._compiler_dispatch(self, **kw)
    rhs = binary.right._compiler_dispatch(self, **kw)
    text = "%s LIKE %s" % (lhs, rhs)

    if escape is not None:
        text += " ESCAPE " + self.render_literal_value(
            escape, sqltypes.STRINGTYPE
        )
    return text
3678
def visit_not_like_op_binary(self, binary, operator, **kw):
    """Render ``<left> NOT LIKE <right>``.

    An ``ESCAPE`` clause is appended, rendered as a string literal, when
    ``binary.modifiers`` holds a non-None ``"escape"`` entry.

    """
    escape = binary.modifiers.get("escape", None)

    lhs = binary.left._compiler_dispatch(self, **kw)
    rhs = binary.right._compiler_dispatch(self, **kw)
    text = "%s NOT LIKE %s" % (lhs, rhs)

    if escape is not None:
        text += " ESCAPE " + self.render_literal_value(
            escape, sqltypes.STRINGTYPE
        )
    return text
3689
def visit_ilike_op_binary(self, binary, operator, **kw):
    """Render ILIKE as LIKE over case-folded operands.

    When called for ``operators.ilike_op`` itself, both sides of a clone
    of ``binary`` are wrapped with ``ilike_case_insensitive()``.  For
    any other operator the operands are rendered as given, on the
    assumption that the caller has applied the wrapping already.

    """
    if operator is operators.ilike_op:
        folded = binary._clone()
        folded.left = ilike_case_insensitive(folded.left)
        folded.right = ilike_case_insensitive(folded.right)
        binary = folded

    return self.visit_like_op_binary(binary, operator, **kw)
3698
def visit_not_ilike_op_binary(self, binary, operator, **kw):
    """Render NOT ILIKE as NOT LIKE over case-folded operands.

    When called for ``operators.not_ilike_op`` itself, both sides of a
    clone of ``binary`` are wrapped with ``ilike_case_insensitive()``.
    For any other operator the operands are rendered as given, on the
    assumption that the caller has applied the wrapping already.

    """
    if operator is operators.not_ilike_op:
        folded = binary._clone()
        folded.left = ilike_case_insensitive(folded.left)
        folded.right = ilike_case_insensitive(folded.right)
        binary = folded

    return self.visit_not_like_op_binary(binary, operator, **kw)
3707
def visit_between_op_binary(self, binary, operator, **kw):
    """Render BETWEEN, or BETWEEN SYMMETRIC when the ``"symmetric"``
    modifier of ``binary`` is set."""
    if binary.modifiers.get("symmetric", False):
        keyword = " BETWEEN SYMMETRIC "
    else:
        keyword = " BETWEEN "
    return self._generate_generic_binary(binary, keyword, **kw)
3713
def visit_not_between_op_binary(self, binary, operator, **kw):
    """Render NOT BETWEEN, or NOT BETWEEN SYMMETRIC when the
    ``"symmetric"`` modifier of ``binary`` is set."""
    if binary.modifiers.get("symmetric", False):
        keyword = " NOT BETWEEN SYMMETRIC "
    else:
        keyword = " NOT BETWEEN "
    return self._generate_generic_binary(binary, keyword, **kw)
3721
def visit_regexp_match_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Base implementation for regular expression match; always raises.

    :raises CompileError: unconditionally, with a message naming
     ``self.dialect.name``.

    """
    dialect_name = self.dialect.name
    raise exc.CompileError(
        "%s dialect does not support regular expressions" % dialect_name
    )
3729
def visit_not_regexp_match_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Base implementation for negated regular expression match; always
    raises.

    :raises CompileError: unconditionally, with a message naming
     ``self.dialect.name``.

    """
    dialect_name = self.dialect.name
    raise exc.CompileError(
        "%s dialect does not support regular expressions" % dialect_name
    )
3737
def visit_regexp_replace_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Base implementation for regular expression replace; always raises.

    :raises CompileError: unconditionally, with a message naming
     ``self.dialect.name``.

    """
    dialect_name = self.dialect.name
    raise exc.CompileError(
        "%s dialect does not support regular expression replacements"
        % dialect_name
    )
3745
def visit_dmltargetcopy(self, element, *, bindmarkers=None, **kw):
    """Render a placeholder marker for a DML target column.

    Stores ``element`` in ``bindmarkers`` under its column's key and
    returns the marker text ``__BINDMARKER_~~<key>~~``.

    :raises CompileError: if no ``bindmarkers`` collection was passed.

    """
    if bindmarkers is None:
        raise exc.CompileError(
            "DML target objects may only be used with "
            "compiled INSERT or UPDATE statements"
        )

    column_key = element.column.key
    bindmarkers[column_key] = element
    return f"__BINDMARKER_~~{column_key}~~"
3755
def visit_bindparam(
    self,
    bindparam,
    within_columns_clause=False,
    literal_binds=False,
    skip_bind_expression=False,
    literal_execute=False,
    render_postcompile=False,
    **kwargs,
):
    """Render a bound parameter and register it with this compiler.

    :param bindparam: the bind parameter to render.
    :param within_columns_clause: combined with ``self.ansi_bind_rules``
     to force "literal execute" rendering.
    :param literal_binds: render the parameter's value inline as a
     literal; in this mode the parameter is not registered in
     ``self.binds``.
    :param skip_bind_expression: when set, the type's
     ``bind_expression()`` is not applied; passed as True on the
     recursive call that renders such an expression.
    :param literal_execute: request "post compile" rendering of the value.
    :param render_postcompile: when post-compile rendering applies, sets
     ``self._render_postcompile``.
    :param kwargs: additional compile arguments, forwarded to nested
     processing and to ``bindparam_string()``.
    :return: the rendered text; enclosed in parenthesis for an
     expanding parameter.
    :raises CompileError: if a different parameter is already registered
     under the same name in one of the conflicting ways checked below.

    """

    if not skip_bind_expression:
        impl = bindparam.type.dialect_impl(self.dialect)
        if impl._has_bind_expression:
            # the type wraps the parameter in a SQL expression; render
            # that expression, which visits this parameter again with
            # skip_bind_expression=True
            bind_expression = impl.bind_expression(bindparam)
            wrapped = self.process(
                bind_expression,
                skip_bind_expression=True,
                within_columns_clause=within_columns_clause,
                literal_binds=literal_binds and not bindparam.expanding,
                literal_execute=literal_execute,
                render_postcompile=render_postcompile,
                **kwargs,
            )
            if bindparam.expanding:
                # for postcompile w/ expanding, move the "wrapped" part
                # of this into the inside

                m = re.match(
                    r"^(.*)\(__\[POSTCOMPILE_(\S+?)\]\)(.*)$", wrapped
                )
                assert m, "unexpected format for expanding parameter"
                # group 1 / group 3 are the text before / after the
                # parenthesized marker; they are embedded inside the
                # marker, separated by "~~"
                wrapped = "(__[POSTCOMPILE_%s~~%s~~REPL~~%s~~])" % (
                    m.group(2),
                    m.group(1),
                    m.group(3),
                )

                if literal_binds:
                    # literal rendering of an expanding parameter is done
                    # here, with the rewritten marker as the template
                    ret = self.render_literal_bindparam(
                        bindparam,
                        within_columns_clause=True,
                        bind_expression_template=wrapped,
                        **kwargs,
                    )
                    return f"({ret})"

            return wrapped

    if not literal_binds:
        literal_execute = (
            literal_execute
            or bindparam.literal_execute
            or (within_columns_clause and self.ansi_bind_rules)
        )
        post_compile = literal_execute or bindparam.expanding
    else:
        post_compile = False

    if literal_binds:
        # inline literal rendering; returns before any registration below
        ret = self.render_literal_bindparam(
            bindparam, within_columns_clause=True, **kwargs
        )
        if bindparam.expanding:
            ret = f"({ret})"
        return ret

    name = self._truncate_bindparam(bindparam)

    if name in self.binds:
        # a parameter is already registered under this name; decide
        # whether a different parameter object may share it
        existing = self.binds[name]
        if existing is not bindparam:
            if (
                (existing.unique or bindparam.unique)
                and not existing.proxy_set.intersection(
                    bindparam.proxy_set
                )
                and not existing._cloned_set.intersection(
                    bindparam._cloned_set
                )
            ):
                raise exc.CompileError(
                    "Bind parameter '%s' conflicts with "
                    "unique bind parameter of the same name" % name
                )
            elif existing.expanding != bindparam.expanding:
                raise exc.CompileError(
                    "Can't reuse bound parameter name '%s' in both "
                    "'expanding' (e.g. within an IN expression) and "
                    "non-expanding contexts.  If this parameter is to "
                    "receive a list/array value, set 'expanding=True' on "
                    "it for expressions that aren't IN, otherwise use "
                    "a different parameter name." % (name,)
                )
            elif existing._is_crud or bindparam._is_crud:
                if existing._is_crud and bindparam._is_crud:
                    # TODO: this condition is not well understood.
                    # see tests in test/sql/test_update.py
                    raise exc.CompileError(
                        "Encountered unsupported case when compiling an "
                        "INSERT or UPDATE statement.  If this is a "
                        "multi-table "
                        "UPDATE statement, please provide string-named "
                        "arguments to the "
                        "values() method with distinct names; support for "
                        "multi-table UPDATE statements that "
                        "target multiple tables for UPDATE is very "
                        "limited",
                    )
                else:
                    raise exc.CompileError(
                        f"bindparam() name '{bindparam.key}' is reserved "
                        "for automatic usage in the VALUES or SET "
                        "clause of this "
                        "insert/update statement.   Please use a "
                        "name other than column name when using "
                        "bindparam() "
                        "with insert() or update() (for example, "
                        f"'b_{bindparam.key}')."
                    )

    # register under both the original key and the (possibly truncated)
    # name
    self.binds[bindparam.key] = self.binds[name] = bindparam

    # if we are given a cache key that we're going to match against,
    # relate the bindparam here to one that is most likely present
    # in the "extracted params" portion of the cache key.  this is used
    # to set up a positional mapping that is used to determine the
    # correct parameters for a subsequent use of this compiled with
    # a different set of parameter values.   here, we accommodate for
    # parameters that may have been cloned both before and after the cache
    # key was generated.
    ckbm_tuple = self._cache_key_bind_match

    if ckbm_tuple:
        ckbm, cksm = ckbm_tuple
        for bp in bindparam._cloned_set:
            if bp.key in cksm:
                cb = cksm[bp.key]
                ckbm[cb].append(bindparam)

    if bindparam.isoutparam:
        self.has_out_parameters = True

    if post_compile:
        if render_postcompile:
            self._render_postcompile = True

        # track the parameter in the collection matching how its value
        # is to be rendered after compilation
        if literal_execute:
            self.literal_execute_params |= {bindparam}
        else:
            self.post_compile_params |= {bindparam}

    ret = self.bindparam_string(
        name,
        post_compile=post_compile,
        expanding=bindparam.expanding,
        bindparam_type=bindparam.type,
        **kwargs,
    )

    if bindparam.expanding:
        ret = f"({ret})"

    return ret
3920
def render_bind_cast(self, type_, dbapi_type, sqltext):
    """Base implementation for rendering a cast around a bound
    parameter; always raises.

    :raises NotImplementedError: unconditionally.

    """
    raise NotImplementedError()
3923
def render_literal_bindparam(
    self,
    bindparam,
    render_literal_value=NO_ARG,
    bind_expression_template=None,
    **kw,
):
    """Render the value of a bound parameter as an inline literal.

    :param bindparam: the bind parameter.
    :param render_literal_value: explicit value to render in place of
     the parameter's own effective value.
    :param bind_expression_template: passed through to the expanding
     parameter renderer.
    :return: the literal text; for an expanding parameter, the
     comma-separated replacement expression.

    """
    if render_literal_value is NO_ARG:
        if bindparam.value is None and bindparam.callable is None:
            # no value at all: render NULL, warning when the enclosing
            # binary operator is something other than IS / IS NOT
            op = kw.get("_binary_op", None)
            if op and op not in (operators.is_, operators.is_not):
                util.warn_limited(
                    "Bound parameter '%s' rendering literal NULL in a SQL "
                    "expression; comparisons to NULL should not use "
                    "operators outside of 'is' or 'is not'",
                    (bindparam.key,),
                )
            return self.process(sqltypes.NULLTYPE, **kw)
        value = bindparam.effective_value
    else:
        value = render_literal_value

    if not bindparam.expanding:
        return self.render_literal_value(value, bindparam.type)

    expand = self._literal_execute_expanding_parameter_literal_binds
    _, replacement_expr = expand(
        bindparam,
        value,
        bind_expression_template=bind_expression_template,
    )
    return replacement_expr
3956
def render_literal_value(
    self, value: Any, type_: sqltypes.TypeEngine[Any]
) -> str:
    """Render the value of a bind parameter as a quoted literal.

    This is used for statement sections that do not accept bind parameters
    on the target driver/database.

    This should be implemented by subclasses using the quoting services
    of the DBAPI.

    :raises CompileError: if the type provides no literal processor for
     this dialect, or if the processor raises for ``value``.

    """

    if value is None and not type_.should_evaluate_none:
        # issue #10535 - handle NULL in the compiler without placing
        # this onto each type, except for "evaluate None" types
        # (e.g. JSON)
        return self.process(elements.Null._instance())

    literal_processor = type_._cached_literal_processor(self.dialect)
    if not literal_processor:
        raise exc.CompileError(
            f"No literal value renderer is available for literal value "
            f'"{sql_util._repr_single_value(value)}" '
            f"with datatype {type_}"
        )

    try:
        return literal_processor(value)
    except Exception as e:
        raise exc.CompileError(
            f"Could not render literal value "
            f'"{sql_util._repr_single_value(value)}" '
            f"with datatype "
            f"{type_}; see parent stack trace for "
            "more detail."
        ) from e
3995
3996 def _truncate_bindparam(self, bindparam):
3997 if bindparam in self.bind_names:
3998 return self.bind_names[bindparam]
3999
4000 bind_name = bindparam.key
4001 if isinstance(bind_name, elements._truncated_label):
4002 bind_name = self._truncated_identifier("bindparam", bind_name)
4003
4004 # add to bind_names for translation
4005 self.bind_names[bindparam] = bind_name
4006
4007 return bind_name
4008
4009 def _truncated_identifier(
4010 self, ident_class: str, name: _truncated_label
4011 ) -> str:
4012 if (ident_class, name) in self.truncated_names:
4013 return self.truncated_names[(ident_class, name)]
4014
4015 anonname = name.apply_map(self.anon_map)
4016
4017 if len(anonname) > self.label_length - 6:
4018 counter = self._truncated_counters.get(ident_class, 1)
4019 truncname = (
4020 anonname[0 : max(self.label_length - 6, 0)]
4021 + "_"
4022 + hex(counter)[2:]
4023 )
4024 self._truncated_counters[ident_class] = counter + 1
4025 else:
4026 truncname = anonname
4027 self.truncated_names[(ident_class, name)] = truncname
4028 return truncname
4029
4030 def _anonymize(self, name: str) -> str:
4031 return name % self.anon_map
4032
def bindparam_string(
    self,
    name: str,
    post_compile: bool = False,
    expanding: bool = False,
    escaped_from: Optional[str] = None,
    bindparam_type: Optional[TypeEngine[Any]] = None,
    accumulate_bind_names: Optional[Set[str]] = None,
    visited_bindparam: Optional[List[str]] = None,
    **kw: Any,
) -> str:
    """Return the placeholder text for the bound parameter ``name``.

    :param name: the parameter name; characters matched by
     ``self._bind_translate_re`` are replaced, unless ``escaped_from``
     was given.
    :param post_compile: render a ``__[POSTCOMPILE_<name>]`` marker
     instead of a bind template.
    :param expanding: with ``post_compile``, return the bare marker
     without any cast.
    :param escaped_from: the original name when the caller has already
     escaped ``name``.
    :param bindparam_type: the parameter's type, used to decide whether
     a cast is rendered around the placeholder.
    :param accumulate_bind_names: optional set receiving ``name``.
    :param visited_bindparam: optional list receiving ``name``.

    """
    # TODO: accumulate_bind_names is passed by crud.py to gather
    # names on a per-value basis, visited_bindparam is passed by
    # visit_insert() to collect all parameters in the statement.
    # see if this gathering can be simplified somehow
    if accumulate_bind_names is not None:
        accumulate_bind_names.add(name)
    if visited_bindparam is not None:
        visited_bindparam.append(name)

    if not escaped_from and self._bind_translate_re.search(name):
        # not quite the translate use case as we want to
        # also get a quick boolean if we even found
        # unusual characters in the name
        escaped_from = name
        name = self._bind_translate_re.sub(
            lambda m: self._bind_translate_chars[m.group(0)],
            escaped_from,
        )

    if escaped_from:
        self.escaped_bind_names = self.escaped_bind_names.union(
            {escaped_from: name}
        )

    if post_compile:
        marker = "__[POSTCOMPILE_%s]" % name
        if expanding:
            # for expanding, bound parameters or literal values will be
            # rendered per item
            return marker

        # otherwise, for non-expanding "literal execute", apply
        # bind casts as determined by the datatype
        if bindparam_type is not None:
            type_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
            if type_impl.render_literal_cast:
                marker = self.render_bind_cast(
                    bindparam_type, type_impl, marker
                )
        return marker

    if self.state is CompilerState.COMPILING:
        ret = self.compilation_bindtemplate % {"name": name}
    else:
        ret = self.bindtemplate % {"name": name}

    if bindparam_type is not None and self.dialect._bind_typing_render_casts:
        type_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
        if type_impl.render_bind_cast:
            ret = self.render_bind_cast(bindparam_type, type_impl, ret)

    return ret
4099
4100 def _dispatch_independent_ctes(self, stmt, kw):
4101 local_kw = kw.copy()
4102 local_kw.pop("cte_opts", None)
4103 for cte, opt in zip(
4104 stmt._independent_ctes, stmt._independent_ctes_opts
4105 ):
4106 cte._compiler_dispatch(self, cte_opts=opt, **local_kw)
4107
def visit_cte(
    self,
    cte: CTE,
    asfrom: bool = False,
    ashint: bool = False,
    fromhints: Optional[_FromHintsType] = None,
    visiting_cte: Optional[CTE] = None,
    from_linter: Optional[FromLinter] = None,
    cte_opts: selectable._CTEOpts = selectable._CTEOpts(False),
    **kwargs: Any,
) -> Optional[str]:
    """Register a CTE with this compiler and render a reference to it.

    The text of the CTE definition itself
    (``<name>[(<cols>)] AS <prefixes>\\n(<inner>)[ <suffixes>]``) is not
    returned; it is stored in ``self.ctes`` keyed on the CTE object.  The
    CTE is also tracked in ``self.ctes_by_level_name`` under
    ``(level, name)`` and in ``self.level_name_by_cte`` under its
    reference CTE, which is how same-named and already-seen CTEs are
    detected on later visits.

    :param cte: the CTE construct being compiled.
    :param asfrom: when true, a string naming the CTE is returned: the
     formatted name, or ``<pre-alias name> AS <name>`` when the CTE is an
     alias of another CTE.
    :param ashint: accepted for signature compatibility; not consulted
     in this method.
    :param fromhints: accepted for signature compatibility; not consulted
     in this method.
    :param visiting_cte: the CTE received from the caller as the one
     currently being visited; compared by identity against an existing
     CTE registered under the same level and name.
    :param from_linter: if given and ``asfrom`` is true, the de-cloned
     CTE is recorded in ``from_linter.froms`` under its name.
    :param cte_opts: per-use options; only ``cte_opts.nesting`` is read
     here.
    :param kwargs: forwarded to nested dispatches, with ``visiting_cte``
     replaced by ``cte``.
    :return: the compiled inner element when the compiler stack is empty
     and a new, non-aliased CTE is being rendered; the name text when
     ``asfrom`` is true; otherwise ``None``.
    :raises CompileError: if the CTE is requested with nesting options
     in more than one location, or if an unrelated CTE with the same
     name exists at the same level.

    """
    self_ctes = self._init_cte_state()
    assert self_ctes is self.ctes

    # nested dispatches see this CTE as the one being visited
    kwargs["visiting_cte"] = cte

    cte_name = cte.name

    if isinstance(cte_name, elements._truncated_label):
        cte_name = self._truncated_identifier("alias", cte_name)

    is_new_cte = True
    embedded_in_current_named_cte = False

    _reference_cte = cte._get_reference_cte()

    # nesting may be requested on the CTE itself or by the options
    # passed for this particular use
    nesting = cte.nesting or cte_opts.nesting

    # check for CTE already encountered
    if _reference_cte in self.level_name_by_cte:
        cte_level, _, existing_cte_opts = self.level_name_by_cte[
            _reference_cte
        ]
        assert _ == cte_name

        cte_level_name = (cte_level, cte_name)
        existing_cte = self.ctes_by_level_name[cte_level_name]

        # check if we are receiving it here with a specific
        # "nest_here" location; if so, move it to this location

        if cte_opts.nesting:
            if existing_cte_opts.nesting:
                raise exc.CompileError(
                    "CTE is stated as 'nest_here' in "
                    "more than one location"
                )

            old_level_name = (cte_level, cte_name)
            cte_level = len(self.stack) if nesting else 1
            cte_level_name = new_level_name = (cte_level, cte_name)

            # re-key both registries from the old level to the new one
            del self.ctes_by_level_name[old_level_name]
            self.ctes_by_level_name[new_level_name] = existing_cte
            self.level_name_by_cte[_reference_cte] = new_level_name + (
                cte_opts,
            )

    else:
        # nesting CTEs are registered at the current stack depth,
        # all others at level 1
        cte_level = len(self.stack) if nesting else 1
        cte_level_name = (cte_level, cte_name)

        if cte_level_name in self.ctes_by_level_name:
            existing_cte = self.ctes_by_level_name[cte_level_name]
        else:
            existing_cte = None

    if existing_cte is not None:
        embedded_in_current_named_cte = visiting_cte is existing_cte

        # we've generated a same-named CTE that we are enclosed in,
        # or this is the same CTE. just return the name.
        if cte is existing_cte._restates or cte is existing_cte:
            is_new_cte = False
        elif existing_cte is cte._restates:
            # we've generated a same-named CTE that is
            # enclosed in us - we take precedence, so
            # discard the text for the "inner".
            del self_ctes[existing_cte]

            existing_cte_reference_cte = existing_cte._get_reference_cte()

            assert existing_cte_reference_cte is _reference_cte
            assert existing_cte_reference_cte is existing_cte

            del self.level_name_by_cte[existing_cte_reference_cte]
        else:
            if (
                # if the two CTEs have the same hash, which we expect
                # here means that one/both is an annotated of the other
                (hash(cte) == hash(existing_cte))
                # or...
                or (
                    (
                        # if they are clones, i.e. they came from the ORM
                        # or some other visit method
                        cte._is_clone_of is not None
                        or existing_cte._is_clone_of is not None
                    )
                    # and are deep-copy identical
                    and cte.compare(existing_cte)
                )
            ):
                # then consider these two CTEs the same
                is_new_cte = False
            else:
                # otherwise these are two CTEs that either will render
                # differently, or were indicated separately by the user,
                # with the same name
                raise exc.CompileError(
                    "Multiple, unrelated CTEs found with "
                    "the same name: %r" % cte_name
                )

    # an already-registered CTE that is not being rendered in a FROM
    # context has nothing further to produce
    if not asfrom and not is_new_cte:
        return None

    if cte._cte_alias is not None:
        pre_alias_cte = cte._cte_alias
        cte_pre_alias_name = cte._cte_alias.name
        if isinstance(cte_pre_alias_name, elements._truncated_label):
            cte_pre_alias_name = self._truncated_identifier(
                "alias", cte_pre_alias_name
            )
    else:
        pre_alias_cte = cte
        cte_pre_alias_name = None

    if is_new_cte:
        self.ctes_by_level_name[cte_level_name] = cte
        self.level_name_by_cte[_reference_cte] = cte_level_name + (
            cte_opts,
        )

        # make sure the CTE that this one aliases (or this CTE itself)
        # has had its text generated
        if pre_alias_cte not in self.ctes:
            self.visit_cte(pre_alias_cte, **kwargs)

        if not cte_pre_alias_name and cte not in self_ctes:
            if cte.recursive:
                self.ctes_recursive = True
            text = self.preparer.format_alias(cte, cte_name)
            if cte.recursive or cte.element.name_cte_columns:
                col_source = cte.element

                # TODO: can we get at the .columns_plus_names collection
                # that is already (or will be?) generated for the SELECT
                # rather than calling twice?
                recur_cols = [
                    # TODO: proxy_name is not technically safe,
                    # see test_cte->
                    # test_with_recursive_no_name_currently_buggy. not
                    # clear what should be done with such a case
                    fallback_label_name or proxy_name
                    for (
                        _,
                        proxy_name,
                        fallback_label_name,
                        c,
                        repeated,
                    ) in (col_source._generate_columns_plus_names(True))
                    if not repeated
                ]

                text += "(%s)" % (
                    ", ".join(
                        self.preparer.format_label_name(
                            ident, anon_map=self.anon_map
                        )
                        for ident in recur_cols
                    )
                )

            assert kwargs.get("subquery", False) is False

            if not self.stack:
                # toplevel, this is a stringify of the
                # cte directly. just compile the inner
                # the way alias() does.
                return cte.element._compiler_dispatch(
                    self, asfrom=asfrom, **kwargs
                )
            else:
                prefixes = self._generate_prefixes(
                    cte, cte._prefixes, **kwargs
                )
                inner = cte.element._compiler_dispatch(
                    self, asfrom=True, **kwargs
                )

                text += " AS %s\n(%s)" % (prefixes, inner)

            if cte._suffixes:
                text += " " + self._generate_prefixes(
                    cte, cte._suffixes, **kwargs
                )

            # store the definition text; it is not returned from here
            self_ctes[cte] = text

    if asfrom:
        if from_linter:
            from_linter.froms[cte._de_clone()] = cte_name

        if not is_new_cte and embedded_in_current_named_cte:
            return self.preparer.format_alias(cte, cte_name)

        if cte_pre_alias_name:
            # aliased CTE: "<original name> AS <alias name>"
            text = self.preparer.format_alias(cte, cte_pre_alias_name)
            if self.preparer._requires_quotes(cte_name):
                cte_name = self.preparer.quote(cte_name)
            text += self.get_render_as_alias_suffix(cte_name)
            return text  # type: ignore[no-any-return]
        else:
            return self.preparer.format_alias(cte, cte_name)

    return None
4323
def visit_table_valued_alias(self, element, **kw):
    """Compile a table-valued alias as either a LATERAL or a plain alias.

    When ``element.joins_implicitly`` is set, ``from_linter`` is forced to
    ``None`` in the arguments handed to the delegated visit method.

    """
    if element.joins_implicitly:
        kw["from_linter"] = None

    render = self.visit_lateral if element._is_lateral else self.visit_alias
    return render(element, **kw)
4331
def visit_table_valued_column(self, element, **kw):
    """Compile a table-valued column by delegating to ``visit_column``."""
    rendered = self.visit_column(element, **kw)
    return rendered
4334
def visit_alias(
    self,
    alias,
    asfrom=False,
    ashint=False,
    iscrud=False,
    fromhints=None,
    subquery=False,
    lateral=False,
    enclosing_alias=None,
    from_linter=None,
    **kwargs,
):
    """Compile an alias construct.

    Depending on the flags given, this renders one of:

    * ``ashint`` - only the formatted alias name.
    * ``asfrom`` - the inner element followed by the "AS <name>" suffix
      from ``get_render_as_alias_suffix()``, optionally followed by a
      derived column list and dialect hint text.
    * neither - the inner element alone, compiled without the ``asfrom``
      and ``subquery`` flags.

    :param alias: the alias construct being compiled.
    :param asfrom: render for use inside a FROM list.
    :param ashint: render only the alias name, for use in hint text.
    :param iscrud: passed through to ``format_from_hint_text()`` and to
     the inner dispatch in the "enclosing alias" case.
    :param fromhints: optional mapping of from-objects to hint text;
     consulted with ``alias`` as the key.
    :param subquery: when true, the inner text is wrapped in parenthesis.
    :param lateral: set by ``visit_lateral()``; forwarded to the inner
     element and used to establish ``enclosing_lateral`` /
     ``lateral_from_linter`` in ``kwargs``.
    :param enclosing_alias: the alias currently being rendered by an
     outer call, if any.
    :param from_linter: if given and ``asfrom`` is true, the de-cloned
     alias is recorded in ``from_linter.froms`` under its name.
    :param kwargs: forwarded to nested dispatches.

    """
    if lateral:
        if "enclosing_lateral" not in kwargs:
            # if lateral is set and enclosing_lateral is not
            # present, we assume we are being called directly
            # from visit_lateral() and we need to set enclosing_lateral.
            assert alias._is_lateral
            kwargs["enclosing_lateral"] = alias

        # for lateral objects, we track a second from_linter that is...
        # lateral! to the level above us.
        if (
            from_linter
            and "lateral_from_linter" not in kwargs
            and "enclosing_lateral" in kwargs
        ):
            kwargs["lateral_from_linter"] = from_linter

    # the alias we were called with is the element of the alias already
    # being rendered by the caller: render the inner element directly
    # with the caller's flags rather than producing a second alias
    if enclosing_alias is not None and enclosing_alias.element is alias:
        inner = alias.element._compiler_dispatch(
            self,
            asfrom=asfrom,
            ashint=ashint,
            iscrud=iscrud,
            fromhints=fromhints,
            lateral=lateral,
            enclosing_alias=alias,
            **kwargs,
        )
        if subquery and (asfrom or lateral):
            inner = "(%s)" % (inner,)
        return inner
    else:
        kwargs["enclosing_alias"] = alias

    # the alias name is only needed (and only assigned) for the
    # asfrom / ashint renderings below
    if asfrom or ashint:
        if isinstance(alias.name, elements._truncated_label):
            alias_name = self._truncated_identifier("alias", alias.name)
        else:
            alias_name = alias.name

    if ashint:
        return self.preparer.format_alias(alias, alias_name)
    elif asfrom:
        if from_linter:
            from_linter.froms[alias._de_clone()] = alias_name

        inner = alias.element._compiler_dispatch(
            self, asfrom=True, lateral=lateral, **kwargs
        )
        if subquery:
            inner = "(%s)" % (inner,)

        ret = inner + self.get_render_as_alias_suffix(
            self.preparer.format_alias(alias, alias_name)
        )

        # optional derived column list "(col1[ type], col2[ type], ...)"
        if alias._supports_derived_columns and alias._render_derived:
            ret += "(%s)" % (
                ", ".join(
                    "%s%s"
                    % (
                        self.preparer.quote(col.name),
                        (
                            " %s"
                            % self.dialect.type_compiler_instance.process(
                                col.type, **kwargs
                            )
                            if alias._render_derived_w_types
                            else ""
                        ),
                    )
                    for col in alias.c
                )
            )

        if fromhints and alias in fromhints:
            ret = self.format_from_hint_text(
                ret, alias, fromhints[alias], iscrud
            )

        return ret
    else:
        # note we cancel the "subquery" flag here as well
        return alias.element._compiler_dispatch(
            self, lateral=lateral, **kwargs
        )
4434
4435 def visit_subquery(self, subquery, **kw):
4436 kw["subquery"] = True
4437 return self.visit_alias(subquery, **kw)
4438
def visit_lateral(self, lateral_, **kw):
    """Compile a LATERAL construct.

    The element is rendered by ``visit_alias`` with ``lateral=True``
    (overriding any value passed in) and prefixed with ``LATERAL``.

    """
    alias_kw = {**kw, "lateral": True}
    aliased = self.visit_alias(lateral_, **alias_kw)
    return "LATERAL %s" % aliased
4442
def visit_tablesample(self, tablesample, asfrom=False, **kw):
    """Compile a TABLESAMPLE construct.

    The sampled element is always rendered through ``visit_alias`` with
    ``asfrom=True``; the ``asfrom`` argument received here is not
    forwarded.  A ``REPEATABLE (<seed>)`` clause is appended when the
    construct has a seed.

    """
    aliased = self.visit_alias(tablesample, asfrom=True, **kw)
    method = tablesample._get_method()._compiler_dispatch(self, **kw)
    pieces = ["%s TABLESAMPLE %s" % (aliased, method)]

    seed = tablesample.seed
    if seed is not None:
        pieces.append(
            " REPEATABLE (%s)" % (seed._compiler_dispatch(self, **kw))
        )

    return "".join(pieces)
4455
def _render_values(self, element, **kw):
    """Render the ``VALUES (...), (...)`` text for a VALUES construct.

    Every row found in the chunks of ``element._data`` is compiled as a
    grouped ``Tuple`` typed with ``element._column_types``.
    ``literal_binds`` defaults to ``element.literal_binds`` unless the
    caller passed it explicitly.

    """
    kw.setdefault("literal_binds", element.literal_binds)

    rendered_rows = []
    for chunk in element._data:
        for row in chunk:
            row_tuple = elements.Tuple(*row, types=element._column_types)
            rendered_rows.append(
                self.process(row_tuple.self_group(), **kw)
            )

    return "VALUES {}".format(", ".join(rendered_rows))
4469
def visit_values(
    self, element, asfrom=False, from_linter=None, visiting_cte=None, **kw
):
    """Compile a VALUES construct.

    Outside of a FROM context the bare ``VALUES ...`` text is returned.
    With ``asfrom`` set, the text is parenthesized (prefixed by
    ``LATERAL`` for a lateral element) and, for a named element, followed
    by the alias suffix and the column list.  When the element is the
    direct body of the CTE given as ``visiting_cte``, the bare text is
    returned unchanged.

    :raises CompileError: if a LATERAL VALUES element is the direct body
     of the CTE being visited.

    """
    if element._independent_ctes:
        self._dispatch_independent_ctes(element, kw)

    rendered = self._render_values(element, **kw)

    if element._unnamed:
        name = None
    else:
        name = element.name
        if isinstance(name, elements._truncated_label):
            name = self._truncated_identifier("values", name)

    lateral = "LATERAL " if element._is_lateral else ""

    if not asfrom:
        return rendered

    if from_linter:
        from_linter.froms[element._de_clone()] = (
            "(unnamed VALUES element)" if name is None else name
        )

    if visiting_cte is not None and visiting_cte.element is element:
        if element._is_lateral:
            raise exc.CompileError(
                "Can't use a LATERAL VALUES expression inside of a CTE"
            )
        return rendered

    if not name:
        return "%s(%s)" % (lateral, rendered)

    kw["include_table"] = False
    alias_suffix = self.get_render_as_alias_suffix(self.preparer.quote(name))
    column_list = ", ".join(
        c._compiler_dispatch(self, **kw) for c in element.columns
    )
    return "%s(%s)%s (%s)" % (lateral, rendered, alias_suffix, column_list)
4518
def visit_scalar_values(self, element, **kw):
    """Compile a scalar VALUES construct as parenthesized VALUES text."""
    values_text = self._render_values(element, **kw)
    return "({})".format(values_text)
4521
def get_render_as_alias_suffix(self, alias_name_text):
    """Return the text appended after an element to give it an alias.

    This implementation produces ``" AS <alias_name_text>"``.

    """
    suffix = " AS " + alias_name_text
    return suffix
4524
def _add_to_result_map(
    self,
    keyname: str,
    name: str,
    objects: Tuple[Any, ...],
    type_: TypeEngine[Any],
) -> None:
    """Append an entry describing one result column.

    A ``keyname`` of ``None`` or ``"*"`` switches off
    ``self._ordered_columns`` and switches on ``self._ad_hoc_textual``.

    :raises CompileError: if ``type_`` is a tuple type.

    """
    # note objects must be non-empty for cursor.py to handle the
    # collection properly
    assert objects

    is_wildcard = keyname is None or keyname == "*"
    if is_wildcard:
        self._ordered_columns = False
        self._ad_hoc_textual = True

    if type_._is_tuple_type:
        raise exc.CompileError(
            "Most backends don't support SELECTing "
            "from a tuple() object. If this is an ORM query, "
            "consider using the Bundle object."
        )

    entry = ResultColumnsEntry(keyname, name, objects, type_)
    self._result_columns.append(entry)
4549
4550 def _label_returning_column(
4551 self, stmt, column, populate_result_map, column_clause_args=None, **kw
4552 ):
4553 """Render a column with necessary labels inside of a RETURNING clause.
4554
4555 This method is provided for individual dialects in place of calling
4556 the _label_select_column method directly, so that the two use cases
4557 of RETURNING vs. SELECT can be disambiguated going forward.
4558
4559 .. versionadded:: 1.4.21
4560
4561 """
4562 return self._label_select_column(
4563 None,
4564 column,
4565 populate_result_map,
4566 False,
4567 {} if column_clause_args is None else column_clause_args,
4568 **kw,
4569 )
4570
def _label_select_column(
    self,
    select,
    column,
    populate_result_map,
    asfrom,
    column_clause_args,
    name=None,
    proxy_name=None,
    fallback_label_name=None,
    within_columns_clause=True,
    column_is_repeated=False,
    need_column_expressions=False,
    include_table=True,
):
    """produce labeled columns present in a select().

    Decides whether ``column`` is rendered as-is or wrapped in a
    ``_CompileLabel``, then compiles the result.

    :param select: not consulted in this method;
     ``_label_returning_column()`` passes ``None``.
    :param column: the column expression to render.
    :param populate_result_map: when true, an ``add_to_result_map``
     callable is passed to the column's compilation so that result
     column entries are collected.
    :param asfrom: whether the enclosing statement is being rendered as
     a FROM element; influences whether a label is rendered.
    :param column_clause_args: dictionary of compile arguments; it is
     updated in place with ``within_columns_clause``,
     ``add_to_result_map`` and ``include_table`` before dispatch.
    :param name: explicit label name to apply, if any; requires
     ``proxy_name``.
    :param proxy_name: added to the label's ``alt_names``.
    :param fallback_label_name: label name used when the heuristics
     below decide a label is needed; generated from
     ``column._anon_name_label`` when not given.
    :param within_columns_clause: must be true (asserted).
    :param column_is_repeated: when true, the result map entry for this
     column is recorded with ``(keyname,)`` as its objects instead of
     the objects supplied by the column's compilation.
    :param need_column_expressions: apply the type's
     ``column_expression()`` even when the result map is not populated.
    :param include_table: forwarded in ``column_clause_args``.
    :return: the value returned by compiling the resulting expression.

    """
    impl = column.type.dialect_impl(self.dialect)

    if impl._has_column_expression and (
        need_column_expressions or populate_result_map
    ):
        col_expr = impl.column_expression(column)
    else:
        col_expr = column

    if populate_result_map:
        # pass an "add_to_result_map" callable into the compilation
        # of embedded columns. this collects information about the
        # column as it will be fetched in the result and is coordinated
        # with cursor.description when the query is executed.
        add_to_result_map = self._add_to_result_map

        # if the SELECT statement told us this column is a repeat,
        # wrap the callable with one that prevents the addition of the
        # targets
        if column_is_repeated:
            _add_to_result_map = add_to_result_map

            def add_to_result_map(keyname, name, objects, type_):
                _add_to_result_map(keyname, name, (keyname,), type_)

        # if we redefined col_expr for type expressions, wrap the
        # callable with one that adds the original column to the targets
        elif col_expr is not column:
            _add_to_result_map = add_to_result_map

            def add_to_result_map(keyname, name, objects, type_):
                _add_to_result_map(
                    keyname, name, (column,) + objects, type_
                )

    else:
        add_to_result_map = None

    # this method is used by some of the dialects for RETURNING,
    # which has different inputs. _label_returning_column was added
    # as the better target for this now however for 1.4 we will keep
    # _label_select_column directly compatible with this use case.
    # these assertions right now set up the current expected inputs
    assert within_columns_clause, (
        "_label_select_column is only relevant within "
        "the columns clause of a SELECT or RETURNING"
    )
    if isinstance(column, elements.Label):
        # already a Label: only re-label when a type-level column
        # expression replaced the original
        if col_expr is not column:
            result_expr = _CompileLabel(
                col_expr, column.name, alt_names=(column.element,)
            )
        else:
            result_expr = col_expr

    elif name:
        # here, _columns_plus_names has determined there's an explicit
        # label name we need to use. this is the default for
        # tablenames_plus_columnnames as well as when columns are being
        # deduplicated on name

        assert (
            proxy_name is not None
        ), "proxy_name is required if 'name' is passed"

        result_expr = _CompileLabel(
            col_expr,
            name,
            alt_names=(
                proxy_name,
                # this is a hack to allow legacy result column lookups
                # to work as they did before; this goes away in 2.0.
                # TODO: this only seems to be tested indirectly
                # via test/orm/test_deprecations.py. should be a
                # resultset test for this
                column._tq_label,
            ),
        )
    else:
        # determine here whether this column should be rendered in
        # a labelled context or not, as we were given no required label
        # name from the caller. Here we apply heuristics based on the kind
        # of SQL expression involved.

        if col_expr is not column:
            # type-specific expression wrapping the given column,
            # so we render a label
            render_with_label = True
        elif isinstance(column, elements.ColumnClause):
            # table-bound column, we render its name as a label if we are
            # inside of a subquery only
            render_with_label = (
                asfrom
                and not column.is_literal
                and column.table is not None
            )
        elif isinstance(column, elements.TextClause):
            render_with_label = False
        elif isinstance(column, elements.UnaryExpression):
            # unary expression. notes added as of #12681
            #
            # By convention, the visit_unary() method
            # itself does not add an entry to the result map, and relies
            # upon either the inner expression creating a result map
            # entry, or if not, by creating a label here that produces
            # the result map entry. Where that happens is based on whether
            # or not the element immediately inside the unary is a
            # NamedColumn subclass or not.
            #
            # Now, this also impacts how the SELECT is written; if
            # we decide to generate a label here, we get the usual
            # "~(x+y) AS anon_1" thing in the columns clause. If we
            # don't, we don't get an AS at all, we get like
            # "~table.column".
            #
            # But here is the important thing as of modernish (like 1.4)
            # versions of SQLAlchemy - **whether or not the AS <label>
            # is present in the statement is not actually important**.
            # We target result columns **positionally** for a fully
            # compiled ``Select()`` object; before 1.4 we needed those
            # labels to match in cursor.description etc etc but now it
            # really doesn't matter.
            # So really, we could set render_with_label True in all cases.
            # Or we could just have visit_unary() populate the result map
            # in all cases.
            #
            # What we're doing here is strictly trying to not rock the
            # boat too much with when we do/don't render "AS label";
            # labels being present helps in the edge cases that we
            # "fall back" to named cursor.description matching, labels
            # not being present for columns keeps us from having awkward
            # phrases like "SELECT DISTINCT table.x AS x".
            render_with_label = (
                (
                    # exception case to detect if we render "not boolean"
                    # as "not <col>" for native boolean or "<col> = 1"
                    # for non-native boolean. this is controlled by
                    # visit_is_<true|false>_unary_operator
                    column.operator
                    in (operators.is_false, operators.is_true)
                    and not self.dialect.supports_native_boolean
                )
                or column._wraps_unnamed_column()
                or asfrom
            )
        elif (
            # general class of expressions that don't have a SQL-column
            # addressible name. includes scalar selects, bind parameters,
            # SQL functions, others
            not isinstance(column, elements.NamedColumn)
            # deeper check that indicates there's no natural "name" to
            # this element, which accommodates for custom SQL constructs
            # that might have a ".name" attribute (but aren't SQL
            # functions) but are not implementing this more recently added
            # base class. in theory the "NamedColumn" check should be
            # enough, however here we seek to maintain legacy behaviors
            # as well.
            and column._non_anon_label is None
        ):
            render_with_label = True
        else:
            render_with_label = False

        if render_with_label:
            if not fallback_label_name:
                # used by the RETURNING case right now. we generate it
                # here as 3rd party dialects may be referring to
                # _label_select_column method directly instead of the
                # just-added _label_returning_column method
                assert not column_is_repeated
                fallback_label_name = column._anon_name_label

            # make sure the label name is a _truncated_label instance
            fallback_label_name = (
                elements._truncated_label(fallback_label_name)
                if not isinstance(
                    fallback_label_name, elements._truncated_label
                )
                else fallback_label_name
            )

            result_expr = _CompileLabel(
                col_expr, fallback_label_name, alt_names=(proxy_name,)
            )
        else:
            result_expr = col_expr

    # note: mutates the dictionary passed in by the caller
    column_clause_args.update(
        within_columns_clause=within_columns_clause,
        add_to_result_map=add_to_result_map,
        include_table=include_table,
    )
    return result_expr._compiler_dispatch(self, **column_clause_args)
4779
def format_from_hint_text(self, sqltext, table, hint, iscrud):
    """Append the hint text for ``table`` to ``sqltext``.

    The hint text is obtained from ``get_from_hint_text()``; when that is
    empty or ``None``, ``sqltext`` is returned unchanged.  ``iscrud`` is
    not consulted by this implementation.

    """
    hinttext = self.get_from_hint_text(table, hint)
    if not hinttext:
        return sqltext
    return sqltext + " " + hinttext
4785
def get_select_hint_text(self, byfroms):
    """Return hint text to render directly after ``SELECT``.

    ``byfroms`` is the mapping of from-objects to formatted hint text
    built by ``_setup_select_hints()``.  This implementation renders
    nothing and always returns ``None``.

    """
    no_hint = None
    return no_hint
4788
def get_from_hint_text(
    self, table: FromClause, text: Optional[str]
) -> Optional[str]:
    """Return hint text to render after a FROM element.

    Called by ``format_from_hint_text()``.  This implementation renders
    nothing and always returns ``None``.

    """
    no_hint: Optional[str] = None
    return no_hint
4793
def get_crud_hint_text(self, table, text):
    """Return hint text for ``table`` in a CRUD statement.

    This implementation renders nothing and always returns ``None``.

    """
    no_hint = None
    return no_hint
4796
def get_statement_hint_text(self, hint_texts):
    """Combine statement-level hint strings into one space-separated string."""
    separator = " "
    return separator.join(hint_texts)
4799
# Stack entry used in place of ``self.stack[-1]`` when the compiler stack
# is empty (i.e. at the top level); it supplies empty "correlate_froms"
# and "asfrom_froms" collections.
_default_stack_entry: _CompilerStackEntry

# the value is assigned only at runtime; type checkers see just the
# annotation above
if not typing.TYPE_CHECKING:
    _default_stack_entry = util.immutabledict(
        [("correlate_froms", frozenset()), ("asfrom_froms", frozenset())]
    )
4806
4807 def _display_froms_for_select(
4808 self, select_stmt, asfrom, lateral=False, **kw
4809 ):
4810 # utility method to help external dialects
4811 # get the correct from list for a select.
4812 # specifically the oracle dialect needs this feature
4813 # right now.
4814 toplevel = not self.stack
4815 entry = self._default_stack_entry if toplevel else self.stack[-1]
4816
4817 compile_state = select_stmt._compile_state_factory(select_stmt, self)
4818
4819 correlate_froms = entry["correlate_froms"]
4820 asfrom_froms = entry["asfrom_froms"]
4821
4822 if asfrom and not lateral:
4823 froms = compile_state._get_display_froms(
4824 explicit_correlate_froms=correlate_froms.difference(
4825 asfrom_froms
4826 ),
4827 implicit_correlate_froms=(),
4828 )
4829 else:
4830 froms = compile_state._get_display_froms(
4831 explicit_correlate_froms=correlate_froms,
4832 implicit_correlate_froms=asfrom_froms,
4833 )
4834 return froms
4835
# Optional hook consulted by visit_select(): when set, it is called as
# ``translate_select_structure(select_stmt, asfrom=asfrom, **kwargs)``; if
# it returns a different statement object, that statement is compiled in
# place of the original.
translate_select_structure: Any = None
"""if not ``None``, should be a callable which accepts ``(select_stmt,
**kw)`` and returns a select object. this is used for structural changes
mostly to accommodate for LIMIT/OFFSET schemes

"""
4842
def visit_select(
    self,
    select_stmt,
    asfrom=False,
    insert_into=False,
    fromhints=None,
    compound_index=None,
    select_wraps_for=None,
    lateral=False,
    from_linter=None,
    **kwargs,
):
    """Compile a SELECT statement and return its SQL text.

    The statement is first run through its compile state factory (and
    optionally through ``translate_select_structure``), a new entry is
    pushed onto ``self.stack`` for the duration of the compilation, the
    columns clause and body are rendered, and any collected CTEs are
    prepended before the stack entry is popped again.

    :param select_stmt: the SELECT construct to compile.
    :param asfrom: true when the SELECT is rendered as a FROM element.
    :param insert_into: true when the SELECT is embedded in an INSERT;
     together with ``compound_index`` this marks the SELECT as
     "embedded" for the purpose of rendering the CTE clause.
    :param fromhints: accepted for signature compatibility; not consulted
     in this method.
    :param compound_index: position of this SELECT within an enclosing
     compound select, or ``None``.  A non-zero index disables result
     map population.
    :param select_wraps_for: must be ``None`` (asserted); it is assigned
     internally when ``translate_select_structure`` produces a new
     statement.
    :param lateral: forwarded to ``_setup_select_stack()``.
    :param from_linter: accepted for signature compatibility; a linter
     is created inside ``_compose_select_body()`` when linting is
     enabled.
    :param kwargs: forwarded to nested compilation.
    :return: the rendered SELECT statement.

    """
    assert select_wraps_for is None, (
        "SQLAlchemy 1.4 requires use of "
        "the translate_select_structure hook for structural "
        "translations of SELECT objects"
    )

    # initial setup of SELECT. the compile_state_factory may now
    # be creating a totally different SELECT from the one that was
    # passed in. for ORM use this will convert from an ORM-state
    # SELECT to a regular "Core" SELECT. other composed operations
    # such as computation of joins will be performed.

    kwargs["within_columns_clause"] = False

    compile_state = select_stmt._compile_state_factory(
        select_stmt, self, **kwargs
    )
    kwargs["ambiguous_table_name_map"] = (
        compile_state._ambiguous_table_name_map
    )

    select_stmt = compile_state.statement

    toplevel = not self.stack

    if toplevel and not self.compile_state:
        self.compile_state = compile_state

    is_embedded_select = compound_index is not None or insert_into

    # translate step for Oracle, SQL Server which often need to
    # restructure the SELECT to allow for LIMIT/OFFSET and possibly
    # other conditions
    if self.translate_select_structure:
        new_select_stmt = self.translate_select_structure(
            select_stmt, asfrom=asfrom, **kwargs
        )

        # if SELECT was restructured, maintain a link to the originals
        # and assemble a new compile state
        if new_select_stmt is not select_stmt:
            compile_state_wraps_for = compile_state
            select_wraps_for = select_stmt
            select_stmt = new_select_stmt

            compile_state = select_stmt._compile_state_factory(
                select_stmt, self, **kwargs
            )
            select_stmt = compile_state.statement

    entry = self._default_stack_entry if toplevel else self.stack[-1]

    populate_result_map = need_column_expressions = (
        toplevel
        or entry.get("need_result_map_for_compound", False)
        or entry.get("need_result_map_for_nested", False)
    )

    # indicates there is a CompoundSelect in play and we are not the
    # first select
    if compound_index:
        populate_result_map = False

    # this was first proposed as part of #3372; however, it is not
    # reached in current tests and could possibly be an assertion
    # instead.
    if not populate_result_map and "add_to_result_map" in kwargs:
        del kwargs["add_to_result_map"]

    # pushes the new entry onto self.stack; popped at the end of this
    # method
    froms = self._setup_select_stack(
        select_stmt, compile_state, entry, asfrom, lateral, compound_index
    )

    column_clause_args = kwargs.copy()
    column_clause_args.update(
        {"within_label_clause": False, "within_columns_clause": False}
    )

    text = "SELECT "  # we're off to a good start !

    if select_stmt._post_select_clause is not None:
        psc = self.process(select_stmt._post_select_clause, **kwargs)
        if psc is not None:
            text += psc + " "

    if select_stmt._hints:
        hint_text, byfrom = self._setup_select_hints(select_stmt)
        if hint_text:
            text += hint_text + " "
    else:
        byfrom = None

    if select_stmt._independent_ctes:
        self._dispatch_independent_ctes(select_stmt, kwargs)

    if select_stmt._prefixes:
        text += self._generate_prefixes(
            select_stmt, select_stmt._prefixes, **kwargs
        )

    text += self.get_select_precolumns(select_stmt, **kwargs)

    if select_stmt._pre_columns_clause is not None:
        pcc = self.process(select_stmt._pre_columns_clause, **kwargs)
        if pcc is not None:
            text += pcc + " "

    # the actual list of columns to print in the SELECT column list.
    inner_columns = [
        c
        for c in [
            self._label_select_column(
                select_stmt,
                column,
                populate_result_map,
                asfrom,
                column_clause_args,
                name=name,
                proxy_name=proxy_name,
                fallback_label_name=fallback_label_name,
                column_is_repeated=repeated,
                need_column_expressions=need_column_expressions,
            )
            for (
                name,
                proxy_name,
                fallback_label_name,
                column,
                repeated,
            ) in compile_state.columns_plus_names
        ]
        if c is not None
    ]

    if populate_result_map and select_wraps_for is not None:
        # if this select was generated from translate_select,
        # rewrite the targeted columns in the result map

        # maps each column object of the translated statement to the
        # column object at the same position in the original statement
        # (the loop variable called "name" here holds the fourth member
        # of each columns_plus_names entry)
        translate = dict(
            zip(
                [
                    name
                    for (
                        key,
                        proxy_name,
                        fallback_label_name,
                        name,
                        repeated,
                    ) in compile_state.columns_plus_names
                ],
                [
                    name
                    for (
                        key,
                        proxy_name,
                        fallback_label_name,
                        name,
                        repeated,
                    ) in compile_state_wraps_for.columns_plus_names
                ],
            )
        )

        self._result_columns = [
            ResultColumnsEntry(
                key, name, tuple(translate.get(o, o) for o in obj), type_
            )
            for key, name, obj, type_ in self._result_columns
        ]

    text = self._compose_select_body(
        text,
        select_stmt,
        compile_state,
        inner_columns,
        froms,
        byfrom,
        toplevel,
        kwargs,
    )

    if select_stmt._post_body_clause is not None:
        pbc = self.process(select_stmt._post_body_clause, **kwargs)
        if pbc:
            text += " " + pbc

    if select_stmt._statement_hints:
        # keep only hints registered for any dialect ("*") or for the
        # dialect in use
        per_dialect = [
            ht
            for (dialect_name, ht) in select_stmt._statement_hints
            if dialect_name in ("*", self.dialect.name)
        ]
        if per_dialect:
            text += " " + self.get_statement_hint_text(per_dialect)

    # In compound query, CTEs are shared at the compound level
    if self.ctes and (not is_embedded_select or toplevel):
        nesting_level = len(self.stack) if not toplevel else None
        text = self._render_cte_clause(nesting_level=nesting_level) + text

    if select_stmt._suffixes:
        text += " " + self._generate_prefixes(
            select_stmt, select_stmt._suffixes, **kwargs
        )

    self.stack.pop(-1)

    return text
5063
5064 def _setup_select_hints(
5065 self, select: Select[Unpack[TupleAny]]
5066 ) -> Tuple[str, _FromHintsType]:
5067 byfrom = {
5068 from_: hinttext
5069 % {"name": from_._compiler_dispatch(self, ashint=True)}
5070 for (from_, dialect), hinttext in select._hints.items()
5071 if dialect in ("*", self.dialect.name)
5072 }
5073 hint_text = self.get_select_hint_text(byfrom)
5074 return hint_text, byfrom
5075
def _setup_select_stack(
    self, select, compile_state, entry, asfrom, lateral, compound_index
):
    """Push a new stack entry for ``select`` and return its display FROMs.

    For the first member of a compound select (``compound_index == 0``)
    the statement is remembered in ``entry["select_0"]``; later members
    are checked against it for a matching number of columns.

    :raises CompileError: if a later member of a compound select has a
     different number of columns than the first.

    """
    enclosing_correlate = entry["correlate_froms"]
    enclosing_asfrom = entry["asfrom_froms"]

    if compound_index == 0:
        entry["select_0"] = select
    elif compound_index:
        first_select = entry["select_0"]
        numcols = len(first_select._all_selected_columns)

        if len(compile_state.columns_plus_names) != numcols:
            raise exc.CompileError(
                "All selectables passed to "
                "CompoundSelect must have identical numbers of "
                "columns; select #%d has %d columns, select "
                "#%d has %d"
                % (
                    1,
                    numcols,
                    compound_index + 1,
                    len(select._all_selected_columns),
                )
            )

    if asfrom and not lateral:
        explicit = enclosing_correlate.difference(enclosing_asfrom)
        implicit = ()
    else:
        explicit = enclosing_correlate
        implicit = enclosing_asfrom

    froms = compile_state._get_display_froms(
        explicit_correlate_froms=explicit,
        implicit_correlate_froms=implicit,
    )

    # the FROM objects rendered here become correlatable for anything
    # compiled while this entry is on the stack
    own_froms = set(_from_objects(*froms))
    combined_froms = own_froms.union(enclosing_correlate)

    new_entry: _CompilerStackEntry = {
        "asfrom_froms": own_froms,
        "correlate_froms": combined_froms,
        "selectable": select,
        "compile_state": compile_state,
    }
    self.stack.append(new_entry)

    return froms
5127
def _compose_select_body(
    self,
    text,
    select,
    compile_state,
    inner_columns,
    froms,
    byfrom,
    toplevel,
    kwargs,
):
    """Append the columns list and all following clauses to ``text``.

    Clauses are appended in this order: columns, FROM (or
    ``default_from()`` when there are no FROM objects), WHERE, GROUP BY,
    HAVING, the post-criteria clause, ORDER BY, the row limiting clause
    and the FOR UPDATE clause.

    :param text: the statement text rendered so far.
    :param select: the SELECT construct being compiled.
    :param compile_state: not consulted in this method.
    :param inner_columns: list of rendered column strings.
    :param froms: the FROM objects to render.
    :param byfrom: mapping of from-objects to hint text, passed as
     ``fromhints`` when the statement has hints.
    :param toplevel: when true and linting is enabled, the linter
     created here is stored on ``self.from_linter``.
    :param kwargs: dictionary of compile arguments, forwarded to nested
     compilation.
    :return: the extended statement text.

    """
    text += ", ".join(inner_columns)

    # a linter is only created when cartesian product collection is
    # switched on in self.linting
    if self.linting & COLLECT_CARTESIAN_PRODUCTS:
        from_linter = FromLinter({}, set())
        warn_linting = self.linting & WARN_LINTING
        if toplevel:
            self.from_linter = from_linter
    else:
        from_linter = None
        warn_linting = False

    # adjust the whitespace for no inner columns, part of #9440,
    # so that a no-col SELECT comes out as "SELECT WHERE..." or
    # "SELECT FROM ...".
    # while it would be better to have built the SELECT starting string
    # without trailing whitespace first, then add whitespace only if inner
    # cols were present, this breaks compatibility with various custom
    # compilation schemes that are currently being tested.
    if not inner_columns:
        text = text.rstrip()

    if froms:
        text += " \nFROM "

        # the two branches differ only in whether "fromhints" is passed
        if select._hints:
            text += ", ".join(
                [
                    f._compiler_dispatch(
                        self,
                        asfrom=True,
                        fromhints=byfrom,
                        from_linter=from_linter,
                        **kwargs,
                    )
                    for f in froms
                ]
            )
        else:
            text += ", ".join(
                [
                    f._compiler_dispatch(
                        self,
                        asfrom=True,
                        from_linter=from_linter,
                        **kwargs,
                    )
                    for f in froms
                ]
            )
    else:
        text += self.default_from()

    if select._where_criteria:
        t = self._generate_delimited_and_list(
            select._where_criteria, from_linter=from_linter, **kwargs
        )
        if t:
            text += " \nWHERE " + t

    # the linter is asked to warn once FROM and WHERE have been rendered
    if warn_linting:
        assert from_linter is not None
        from_linter.warn()

    if select._group_by_clauses:
        text += self.group_by_clause(select, **kwargs)

    if select._having_criteria:
        t = self._generate_delimited_and_list(
            select._having_criteria, **kwargs
        )
        if t:
            text += " \nHAVING " + t

    if select._post_criteria_clause is not None:
        pcc = self.process(select._post_criteria_clause, **kwargs)
        if pcc is not None:
            text += " \n" + pcc

    if select._order_by_clauses:
        text += self.order_by_clause(select, **kwargs)

    if select._has_row_limiting_clause:
        text += self._row_limit_clause(select, **kwargs)

    if select._for_update_arg is not None:
        text += self.for_update_clause(select, **kwargs)

    return text
5227
5228 def _generate_prefixes(self, stmt, prefixes, **kw):
5229 clause = " ".join(
5230 prefix._compiler_dispatch(self, **kw)
5231 for prefix, dialect_name in prefixes
5232 if dialect_name in (None, "*") or dialect_name == self.dialect.name
5233 )
5234 if clause:
5235 clause += " "
5236 return clause
5237
5238 def _render_cte_clause(
5239 self,
5240 nesting_level=None,
5241 include_following_stack=False,
5242 ):
5243 """
5244 include_following_stack
5245 Also render the nesting CTEs on the next stack. Useful for
5246 SQL structures like UNION or INSERT that can wrap SELECT
5247 statements containing nesting CTEs.
5248 """
5249 if not self.ctes:
5250 return ""
5251
5252 ctes: MutableMapping[CTE, str]
5253
5254 if nesting_level and nesting_level > 1:
5255 ctes = util.OrderedDict()
5256 for cte in list(self.ctes.keys()):
5257 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5258 cte._get_reference_cte()
5259 ]
5260 nesting = cte.nesting or cte_opts.nesting
5261 is_rendered_level = cte_level == nesting_level or (
5262 include_following_stack and cte_level == nesting_level + 1
5263 )
5264 if not (nesting and is_rendered_level):
5265 continue
5266
5267 ctes[cte] = self.ctes[cte]
5268
5269 else:
5270 ctes = self.ctes
5271
5272 if not ctes:
5273 return ""
5274 ctes_recursive = any([cte.recursive for cte in ctes])
5275
5276 cte_text = self.get_cte_preamble(ctes_recursive) + " "
5277 cte_text += ", \n".join([txt for txt in ctes.values()])
5278 cte_text += "\n "
5279
5280 if nesting_level and nesting_level > 1:
5281 for cte in list(ctes.keys()):
5282 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5283 cte._get_reference_cte()
5284 ]
5285 del self.ctes[cte]
5286 del self.ctes_by_level_name[(cte_level, cte_name)]
5287 del self.level_name_by_cte[cte._get_reference_cte()]
5288
5289 return cte_text
5290
def get_cte_preamble(self, recursive):
    """Return the keyword text that opens a CTE clause.

    ``"WITH RECURSIVE"`` is returned when ``recursive`` is truthy,
    otherwise ``"WITH"``.
    """
    return "WITH RECURSIVE" if recursive else "WITH"
5296
def get_select_precolumns(self, select: Select[Any], **kw: Any) -> str:
    """Return text rendered just before the column list of a ``SELECT``.

    Emits a deprecation warning when the statement carries DISTINCT ON
    expressions, then returns ``"DISTINCT "`` if the statement is
    distinct, else an empty string.

    """
    if select._distinct_on:
        message = (
            "DISTINCT ON is currently supported only by the PostgreSQL "
            "dialect.  Use of DISTINCT ON for other backends is currently "
            "silently ignored, however this usage is deprecated, and will "
            "raise CompileError in a future release for all backends "
            "that do not support this syntax."
        )
        util.warn_deprecated(message, version="1.4")

    if select._distinct:
        return "DISTINCT "
    return ""
5312
def group_by_clause(self, select, **kw):
    """Render the GROUP BY clause; a hook dialects can override.

    Returns ``" GROUP BY <exprs>"`` with the expressions comma-delimited,
    or an empty string when the delimited list renders as empty.
    """
    rendered = self._generate_delimited_list(
        select._group_by_clauses, OPERATORS[operators.comma_op], **kw
    )
    if not rendered:
        return ""
    return " GROUP BY " + rendered
5323
def order_by_clause(self, select, **kw):
    """Render the ORDER BY clause; a hook dialects can override.

    Returns ``" ORDER BY <exprs>"`` with the expressions comma-delimited,
    or an empty string when the delimited list renders as empty.
    """
    rendered = self._generate_delimited_list(
        select._order_by_clauses, OPERATORS[operators.comma_op], **kw
    )
    if not rendered:
        return ""
    return " ORDER BY " + rendered
5335
def for_update_clause(self, select, **kw):
    """Return the FOR UPDATE suffix for a SELECT.

    This implementation ignores ``select`` and always renders the plain
    ``" FOR UPDATE"`` text.
    """
    clause = " FOR UPDATE"
    return clause
5338
def returning_clause(
    self,
    stmt: UpdateBase,
    returning_cols: Sequence[_ColumnsClauseElement],
    *,
    populate_result_map: bool,
    **kw: Any,
) -> str:
    """Render ``RETURNING <col>, <col>, ...`` for a DML statement.

    ``returning_cols`` is expanded through ``base._select_iterables`` and
    ``stmt._generate_columns_plus_names``; every resulting column is
    rendered with ``self._label_returning_column``, which also receives
    ``populate_result_map``.
    """
    column_records = stmt._generate_columns_plus_names(
        True, cols=base._select_iterables(returning_cols)
    )

    rendered = []
    for record in column_records:
        name, proxy_name, fallback_label_name, column, repeated = record
        rendered.append(
            self._label_returning_column(
                stmt,
                column,
                populate_result_map,
                fallback_label_name=fallback_label_name,
                column_is_repeated=repeated,
                name=name,
                proxy_name=proxy_name,
                **kw,
            )
        )

    return "RETURNING " + ", ".join(rendered)
5370
def limit_clause(self, select, **kw):
    """Render the LIMIT / OFFSET portion of a SELECT.

    When only an offset is present, ``LIMIT -1`` is rendered ahead of the
    ``OFFSET`` so that the offset always follows a LIMIT keyword.
    Returns an empty string when neither limit nor offset is set.
    """
    limit = select._limit_clause
    offset = select._offset_clause

    pieces = []
    if limit is not None:
        pieces.append("\n LIMIT " + self.process(limit, **kw))
    if offset is not None:
        if limit is None:
            pieces.append("\n LIMIT -1")
        pieces.append(" OFFSET " + self.process(offset, **kw))
    return "".join(pieces)
5380
def fetch_clause(
    self,
    select,
    fetch_clause=None,
    require_offset=False,
    use_literal_execute_for_simple_int=False,
    **kw,
):
    """Render ``OFFSET n ROWS`` / ``FETCH FIRST n ROWS ...`` for a SELECT.

    :param select: the statement supplying the offset and, unless
     overridden, the fetch clause and its options.
    :param fetch_clause: optional explicit fetch expression; when given,
     the "percent" and "with_ties" options are both treated as False.
    :param require_offset: render ``OFFSET 0 ROWS`` when the statement
     has no offset clause.
    :param use_literal_execute_for_simple_int: when True, offset / fetch
     expressions for which ``select._simple_int_clause()`` is true are
     rendered via ``render_literal_execute()``.
    :return: the rendered text, possibly empty.
    """
    if fetch_clause is not None:
        options = {"percent": False, "with_ties": False}
    else:
        fetch_clause = select._fetch_clause
        options = select._fetch_clause_options

    segments = []

    offset = select._offset_clause
    if offset is not None:
        if use_literal_execute_for_simple_int and select._simple_int_clause(
            offset
        ):
            offset = offset.render_literal_execute()
        offset_str = self.process(offset, **kw)
        segments.append("\n OFFSET %s ROWS" % offset_str)
    elif require_offset:
        segments.append("\n OFFSET 0 ROWS")

    if fetch_clause is not None:
        if use_literal_execute_for_simple_int and select._simple_int_clause(
            fetch_clause
        ):
            fetch_clause = fetch_clause.render_literal_execute()

        rendered_fetch = self.process(fetch_clause, **kw)
        percent = " PERCENT" if options["percent"] else ""
        ties = "WITH TIES" if options["with_ties"] else "ONLY"
        segments.append(
            "\n FETCH FIRST %s%s ROWS %s" % (rendered_fetch, percent, ties)
        )

    return "".join(segments)
5421
def visit_table(
    self,
    table,
    asfrom=False,
    iscrud=False,
    ashint=False,
    fromhints=None,
    use_schema=True,
    from_linter=None,
    ambiguous_table_name_map=None,
    enclosing_alias=None,
    **kwargs,
):
    """Render a table reference.

    An empty string is returned unless ``asfrom`` or ``ashint`` is set.
    Otherwise the quoted table name is rendered, schema-qualified when
    ``use_schema`` is set and the preparer reports a schema for the
    table.  An anonymous alias suffix is appended for a schema-less
    table whose name appears in ``ambiguous_table_name_map`` (unless
    ``enclosing_alias`` already wraps this same table), and hint text is
    applied when ``fromhints`` has an entry for the table.

    When a ``from_linter`` is passed, the table is always recorded in its
    ``froms`` mapping under ``table.fullname``.
    """
    if from_linter:
        from_linter.froms[table] = table.fullname

    if not (asfrom or ashint):
        return ""

    preparer = self.preparer
    effective_schema = preparer.schema_for_object(table)

    if use_schema and effective_schema:
        quoted_schema = preparer.quote_schema(effective_schema)
        ret = quoted_schema + "." + preparer.quote(table.name)
    else:
        ret = preparer.quote(table.name)

    aliased_to_self = (
        enclosing_alias is not None and enclosing_alias.element is table
    )
    if (
        not aliased_to_self
        and not effective_schema
        and ambiguous_table_name_map
        and table.name in ambiguous_table_name_map
    ):
        anon_name = self._truncated_identifier(
            "alias", ambiguous_table_name_map[table.name]
        )
        alias_text = preparer.format_alias(None, anon_name)
        ret = ret + self.get_render_as_alias_suffix(alias_text)

    if fromhints and table in fromhints:
        ret = self.format_from_hint_text(
            ret, table, fromhints[table], iscrud
        )
    return ret
5474
def visit_join(self, join, asfrom=False, from_linter=None, **kwargs):
    """Render ``<left> [FULL OUTER|LEFT OUTER] JOIN <right> ON <onclause>``.

    When a ``from_linter`` is passed, every pairing of a FROM object on
    the left side with one on the right side is recorded as an edge, and
    the linter is passed along to the left, right and ON renderings.
    """
    if from_linter:
        left_froms = _de_clone(join.left._from_objects)
        right_froms = _de_clone(join.right._from_objects)
        from_linter.edges.update(itertools.product(left_froms, right_froms))

    # "full" takes precedence over "isouter"
    if join.full:
        keyword = " FULL OUTER JOIN "
    else:
        keyword = " LEFT OUTER JOIN " if join.isouter else " JOIN "

    left_sql = join.left._compiler_dispatch(
        self, asfrom=True, from_linter=from_linter, **kwargs
    )
    right_sql = join.right._compiler_dispatch(
        self, asfrom=True, from_linter=from_linter, **kwargs
    )
    # TODO: likely need asfrom=True here?
    on_sql = join.onclause._compiler_dispatch(
        self, from_linter=from_linter, **kwargs
    )

    return left_sql + keyword + right_sql + " ON " + on_sql
5504
5505 def _setup_crud_hints(self, stmt, table_text):
5506 dialect_hints = {
5507 table: hint_text
5508 for (table, dialect), hint_text in stmt._hints.items()
5509 if dialect in ("*", self.dialect.name)
5510 }
5511 if stmt.table in dialect_hints:
5512 table_text = self.format_from_hint_text(
5513 table_text, stmt.table, dialect_hints[stmt.table], True
5514 )
5515 return dialect_hints, table_text
5516
# within the realm of "insertmanyvalues sentinel columns",
# these lookups match different kinds of Column() configurations
# to specific backend capabilities.  they are broken into two
# lookups, one for autoincrement columns and the other for non
# autoincrement columns.
#
# keys are _SentinelDefaultCharacterization members and values are
# InsertmanyvaluesSentinelOpts flags.  _get_sentinel_column_for_table()
# ANDs the looked-up flag against the dialect's
# insertmanyvalues_implicit_sentinel setting to decide whether the
# table's sentinel column(s) are usable; a characterization that is
# absent from the lookup is treated there as a mask of 0.
_sentinel_col_non_autoinc_lookup = util.immutabledict(
    {
        _SentinelDefaultCharacterization.CLIENTSIDE: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.SENTINEL_DEFAULT: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.NONE: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.IDENTITY: (
            InsertmanyvaluesSentinelOpts.IDENTITY
        ),
        _SentinelDefaultCharacterization.SEQUENCE: (
            InsertmanyvaluesSentinelOpts.SEQUENCE
        ),
    }
)
# the autoincrement variant is the same table except that a column with
# the NONE characterization maps to the AUTOINCREMENT flag
_sentinel_col_autoinc_lookup = _sentinel_col_non_autoinc_lookup.union(
    {
        _SentinelDefaultCharacterization.NONE: (
            InsertmanyvaluesSentinelOpts.AUTOINCREMENT
        ),
    }
)
5548
5549 def _get_sentinel_column_for_table(
5550 self, table: Table
5551 ) -> Optional[Sequence[Column[Any]]]:
5552 """given a :class:`.Table`, return a usable sentinel column or
5553 columns for this dialect if any.
5554
5555 Return None if no sentinel columns could be identified, or raise an
5556 error if a column was marked as a sentinel explicitly but isn't
5557 compatible with this dialect.
5558
5559 """
5560
5561 sentinel_opts = self.dialect.insertmanyvalues_implicit_sentinel
5562 sentinel_characteristics = table._sentinel_column_characteristics
5563
5564 sent_cols = sentinel_characteristics.columns
5565
5566 if sent_cols is None:
5567 return None
5568
5569 if sentinel_characteristics.is_autoinc:
5570 bitmask = self._sentinel_col_autoinc_lookup.get(
5571 sentinel_characteristics.default_characterization, 0
5572 )
5573 else:
5574 bitmask = self._sentinel_col_non_autoinc_lookup.get(
5575 sentinel_characteristics.default_characterization, 0
5576 )
5577
5578 if sentinel_opts & bitmask:
5579 return sent_cols
5580
5581 if sentinel_characteristics.is_explicit:
5582 # a column was explicitly marked as insert_sentinel=True,
5583 # however it is not compatible with this dialect. they should
5584 # not indicate this column as a sentinel if they need to include
5585 # this dialect.
5586
5587 # TODO: do we want non-primary key explicit sentinel cols
5588 # that can gracefully degrade for some backends?
5589 # insert_sentinel="degrade" perhaps. not for the initial release.
5590 # I am hoping people are generally not dealing with this sentinel
5591 # business at all.
5592
5593 # if is_explicit is True, there will be only one sentinel column.
5594
5595 raise exc.InvalidRequestError(
5596 f"Column {sent_cols[0]} can't be explicitly "
5597 "marked as a sentinel column when using the "
5598 f"{self.dialect.name} dialect, as the "
5599 "particular type of default generation on this column is "
5600 "not currently compatible with this dialect's specific "
5601 f"INSERT..RETURNING syntax which can receive the "
5602 "server-generated value in "
5603 "a deterministic way. To remove this error, remove "
5604 "insert_sentinel=True from primary key autoincrement "
5605 "columns; these columns are automatically used as "
5606 "sentinels for supported dialects in any case."
5607 )
5608
5609 return None
5610
def _deliver_insertmanyvalues_batches(
    self,
    statement: str,
    parameters: _DBAPIMultiExecuteParams,
    compiled_parameters: List[_MutableCoreSingleExecuteParams],
    generic_setinputsizes: Optional[_GenericSetInputSizesType],
    batch_size: int,
    sort_by_parameter_order: bool,
    schema_translate_map: Optional[SchemaTranslateMapType],
) -> Iterator[_InsertManyValuesBatch]:
    """Generate the statement / parameter batches for an
    "insertmanyvalues" execution.

    This is a generator.  Depending on dialect capabilities and the
    ``_InsertManyValues`` state stored on the compiler, it either:

    * yields one batch per parameter set using the unmodified
      ``statement`` ("row at a time"), or
    * rewrites the single ``(...)`` VALUES expression inside
      ``statement`` into a repeated list of VALUES expressions, one per
      parameter set in the batch, and yields one batch for each slice
      of at most ``batch_size`` parameter sets.

    :param statement: the compiled INSERT string containing the single
     VALUES expression to be expanded.
    :param parameters: the DBAPI-level parameter sets.
    :param compiled_parameters: parameter sets parallel to
     ``parameters``; these are only consulted to extract sentinel
     values via ``imv.sentinel_param_keys``.
    :param generic_setinputsizes: optional ``(key, len, type)`` entries
     that are expanded to match each batch's length.
    :param batch_size: maximum number of parameter sets per batch; may
     be reduced further by the dialect's
     ``insertmanyvalues_max_parameters``.
    :param sort_by_parameter_order: whether deterministic ordering was
     requested; passed through to each yielded batch and also used to
     decide on the row-at-a-time downgrade.
    :param schema_translate_map: when present, schema translation is
     applied to the VALUES expression fragments before they are
     matched against ``statement``.

    """
    imv = self._insertmanyvalues
    assert imv is not None

    # build a getter that pulls the sentinel value(s) out of a compiled
    # parameter set, if client-side sentinel keys were established
    if not imv.sentinel_param_keys:
        _sentinel_from_params = None
    else:
        _sentinel_from_params = operator.itemgetter(
            *imv.sentinel_param_keys
        )

    lenparams = len(parameters)
    if imv.is_default_expr and not self.dialect.supports_default_metavalue:
        # backend doesn't support
        # INSERT INTO table (pk_col) VALUES (DEFAULT), (DEFAULT), ...
        # at the moment this is basically SQL Server due to
        # not being able to use DEFAULT for identity column
        # just yield out that many single statements!  still
        # faster than a whole connection.execute() call ;)
        #
        # note we still are taking advantage of the fact that we know
        # we are using RETURNING.   The generalized approach of fetching
        # cursor.lastrowid etc. still goes through the more heavyweight
        # "ExecutionContext per statement" system as it isn't usable
        # as a generic "RETURNING" approach
        use_row_at_a_time = True
        downgraded = False
    elif not self.dialect.supports_multivalues_insert or (
        sort_by_parameter_order
        and self._result_columns
        and (imv.sentinel_columns is None or imv.includes_upsert_behaviors)
    ):
        # deterministic order was requested and the compiler could
        # not organize sentinel columns for this dialect/statement.
        # use row at a time
        use_row_at_a_time = True
        downgraded = True
    else:
        use_row_at_a_time = False
        downgraded = False

    if use_row_at_a_time:
        # one batch per parameter set, statement left unmodified;
        # batch numbers start at 1
        for batchnum, (param, compiled_param) in enumerate(
            cast(
                "Sequence[Tuple[_DBAPISingleExecuteParams, _MutableCoreSingleExecuteParams]]",  # noqa: E501
                zip(parameters, compiled_parameters),
            ),
            1,
        ):
            yield _InsertManyValuesBatch(
                statement,
                param,
                generic_setinputsizes,
                [param],
                (
                    [_sentinel_from_params(compiled_param)]
                    if _sentinel_from_params
                    else []
                ),
                1,
                batchnum,
                lenparams,
                sort_by_parameter_order,
                downgraded,
            )
        return

    # "rst" applies schema translation to a SQL fragment, or is None
    # when no schema translate map is in use
    if schema_translate_map:
        rst = functools.partial(
            self.preparer._render_schema_translates,
            schema_translate_map=schema_translate_map,
        )
    else:
        rst = None

    imv_single_values_expr = imv.single_values_expr
    if rst:
        imv_single_values_expr = rst(imv_single_values_expr)

    # swap the single "(...)" VALUES expression for a token that each
    # batch below replaces with its own expanded VALUES list
    executemany_values = f"({imv_single_values_expr})"
    statement = statement.replace(executemany_values, "__EXECMANY_TOKEN__")

    # Use optional insertmanyvalues_max_parameters
    # to further shrink the batch size so that there are no more than
    # insertmanyvalues_max_parameters params.
    # Currently used by SQL Server, which limits statements to 2100 bound
    # parameters (actually 2099).
    max_params = self.dialect.insertmanyvalues_max_parameters
    if max_params:
        total_num_of_params = len(self.bind_names)
        num_params_per_batch = len(imv.insert_crud_params)
        num_params_outside_of_batch = (
            total_num_of_params - num_params_per_batch
        )
        batch_size = min(
            batch_size,
            (
                (max_params - num_params_outside_of_batch)
                // num_params_per_batch
            ),
        )

    # working copies; these lists are consumed from the front by the
    # batching loop below
    batches = cast("List[Sequence[Any]]", list(parameters))
    compiled_batches = cast(
        "List[Sequence[Any]]", list(compiled_parameters)
    )

    processed_setinputsizes: Optional[_GenericSetInputSizesType] = None
    batchnum = 1
    # ceiling division of lenparams by batch_size
    total_batches = lenparams // batch_size + (
        1 if lenparams % batch_size else 0
    )

    insert_crud_params = imv.insert_crud_params
    assert insert_crud_params is not None

    if rst:
        insert_crud_params = [
            (col, key, rst(expr), st)
            for col, key, expr, st in insert_crud_params
        ]

    escaped_bind_names: Mapping[str, str]
    expand_pos_lower_index = expand_pos_upper_index = 0

    if not self.positional:
        # named parameters: build a VALUES template in which every bind
        # name in the VALUES expression is suffixed with an index
        # placeholder, to be filled in per row
        if self.escaped_bind_names:
            escaped_bind_names = self.escaped_bind_names
        else:
            escaped_bind_names = {}

        all_keys = set(parameters[0])

        def apply_placeholders(keys, formatted):
            for key in keys:
                key = escaped_bind_names.get(key, key)
                formatted = formatted.replace(
                    self.bindtemplate % {"name": key},
                    self.bindtemplate
                    % {"name": f"{key}__EXECMANY_INDEX__"},
                )
            return formatted

        if imv.embed_values_counter:
            imv_values_counter = ", _IMV_VALUES_COUNTER"
        else:
            imv_values_counter = ""
        formatted_values_clause = f"""({', '.join(
            apply_placeholders(bind_keys, formatted)
            for _, _, formatted, bind_keys in insert_crud_params
        )}{imv_values_counter})"""

        # keys that belong to the VALUES expression get a per-row
        # suffix; all other keys are taken once, from the first
        # parameter set, as "base" parameters shared by every row
        keys_to_replace = all_keys.intersection(
            escaped_bind_names.get(key, key)
            for _, _, _, bind_keys in insert_crud_params
            for key in bind_keys
        )
        base_parameters = {
            key: parameters[0][key]
            for key in all_keys.difference(keys_to_replace)
        }
        executemany_values_w_comma = ""
    else:
        # positional parameters: the VALUES expression is repeated as a
        # string and parameter sequences are spliced together
        formatted_values_clause = ""
        keys_to_replace = set()
        base_parameters = {}

        if imv.embed_values_counter:
            executemany_values_w_comma = (
                f"({imv_single_values_expr}, _IMV_VALUES_COUNTER), "
            )
        else:
            executemany_values_w_comma = f"({imv_single_values_expr}), "

        all_names_we_will_expand: Set[str] = set()
        for elem in imv.insert_crud_params:
            all_names_we_will_expand.update(elem[3])

        # get the start and end position in a particular list
        # of parameters where we will be doing the "expanding".
        # statements can have params on either side or both sides,
        # given RETURNING and CTEs
        if all_names_we_will_expand:
            positiontup = self.positiontup
            assert positiontup is not None

            all_expand_positions = {
                idx
                for idx, name in enumerate(positiontup)
                if name in all_names_we_will_expand
            }
            expand_pos_lower_index = min(all_expand_positions)
            expand_pos_upper_index = max(all_expand_positions) + 1
            # the expanded positions must form one contiguous run
            assert (
                len(all_expand_positions)
                == expand_pos_upper_index - expand_pos_lower_index
            )

        if self._numeric_binds:
            # turn numbered binds inside the VALUES template into "%s"
            # so they can be renumbered per batch with string formatting
            escaped = re.escape(self._numeric_binds_identifier_char)
            executemany_values_w_comma = re.sub(
                rf"{escaped}\d+", "%s", executemany_values_w_comma
            )

    while batches:
        # slice the next batch off the front of both working lists
        batch = batches[0:batch_size]
        compiled_batch = compiled_batches[0:batch_size]

        batches[0:batch_size] = []
        compiled_batches[0:batch_size] = []

        # only the final batch can be shorter than batch_size
        if batches:
            current_batch_size = batch_size
        else:
            current_batch_size = len(batch)

        if generic_setinputsizes:
            # if setinputsizes is present, expand this collection to
            # suit the batch length as well
            # currently this will be mssql+pyodbc for internal dialects
            # NOTE(review): keys here are suffixed with a single
            # underscore ("key_N") whereas the named-parameter branch
            # below uses "key__N" -- confirm this is intended
            processed_setinputsizes = [
                (new_key, len_, typ)
                for new_key, len_, typ in (
                    (f"{key}_{index}", len_, typ)
                    for index in range(current_batch_size)
                    for key, len_, typ in generic_setinputsizes
                )
            ]

        replaced_parameters: Any
        if self.positional:
            num_ins_params = imv.num_positional_params_counted

            batch_iterator: Iterable[Sequence[Any]]
            extra_params_left: Sequence[Any]
            extra_params_right: Sequence[Any]

            if num_ins_params == len(batch[0]):
                # every parameter is part of the VALUES expression
                extra_params_left = extra_params_right = ()
                batch_iterator = batch
            else:
                # parameters outside of VALUES are taken once, from the
                # first row of the batch; only the VALUES slice of each
                # row is repeated
                extra_params_left = batch[0][:expand_pos_lower_index]
                extra_params_right = batch[0][expand_pos_upper_index:]
                batch_iterator = (
                    b[expand_pos_lower_index:expand_pos_upper_index]
                    for b in batch
                )

            # the [:-2] slices strip the trailing ", " left by the
            # repeated template
            if imv.embed_values_counter:
                expanded_values_string = (
                    "".join(
                        executemany_values_w_comma.replace(
                            "_IMV_VALUES_COUNTER", str(i)
                        )
                        for i, _ in enumerate(batch)
                    )
                )[:-2]
            else:
                expanded_values_string = (
                    (executemany_values_w_comma * current_batch_size)
                )[:-2]

            if self._numeric_binds and num_ins_params > 0:
                # numeric will always number the parameters inside of
                # VALUES (and thus order self.positiontup) to be higher
                # than non-VALUES parameters, no matter where in the
                # statement those non-VALUES parameters appear (this is
                # ensured in _process_numeric by numbering first all
                # params that are not in _values_bindparam)
                # therefore all extra params are always
                # on the left side and numbered lower than the VALUES
                # parameters
                assert not extra_params_right

                start = expand_pos_lower_index + 1
                end = num_ins_params * (current_batch_size) + start

                # need to format here, since statement may contain
                # unescaped %, while values_string contains just (%s, %s)
                positions = tuple(
                    f"{self._numeric_binds_identifier_char}{i}"
                    for i in range(start, end)
                )
                expanded_values_string = expanded_values_string % positions

            replaced_statement = statement.replace(
                "__EXECMANY_TOKEN__", expanded_values_string
            )

            # flatten the per-row VALUES parameters into one sequence
            replaced_parameters = tuple(
                itertools.chain.from_iterable(batch_iterator)
            )

            replaced_parameters = (
                extra_params_left
                + replaced_parameters
                + extra_params_right
            )

        else:
            replaced_values_clauses = []
            replaced_parameters = base_parameters.copy()

            for i, param in enumerate(batch):
                # the template holds "<key>__EXECMANY_INDEX__"; replacing
                # only "EXECMANY_INDEX__" leaves "<key>__<i>", matching
                # the parameter keys generated just below
                fmv = formatted_values_clause.replace(
                    "EXECMANY_INDEX__", str(i)
                )
                if imv.embed_values_counter:
                    fmv = fmv.replace("_IMV_VALUES_COUNTER", str(i))

                replaced_values_clauses.append(fmv)
                replaced_parameters.update(
                    {f"{key}__{i}": param[key] for key in keys_to_replace}
                )

            replaced_statement = statement.replace(
                "__EXECMANY_TOKEN__",
                ", ".join(replaced_values_clauses),
            )

        yield _InsertManyValuesBatch(
            replaced_statement,
            replaced_parameters,
            processed_setinputsizes,
            batch,
            (
                [_sentinel_from_params(cb) for cb in compiled_batch]
                if _sentinel_from_params
                else []
            ),
            current_batch_size,
            batchnum,
            total_batches,
            sort_by_parameter_order,
            False,
        )
        batchnum += 1
5957
5958 def visit_insert(
5959 self, insert_stmt, visited_bindparam=None, visiting_cte=None, **kw
5960 ):
5961 compile_state = insert_stmt._compile_state_factory(
5962 insert_stmt, self, **kw
5963 )
5964 insert_stmt = compile_state.statement
5965
5966 if visiting_cte is not None:
5967 kw["visiting_cte"] = visiting_cte
5968 toplevel = False
5969 else:
5970 toplevel = not self.stack
5971
5972 if toplevel:
5973 self.isinsert = True
5974 if not self.dml_compile_state:
5975 self.dml_compile_state = compile_state
5976 if not self.compile_state:
5977 self.compile_state = compile_state
5978
5979 self.stack.append(
5980 {
5981 "correlate_froms": set(),
5982 "asfrom_froms": set(),
5983 "selectable": insert_stmt,
5984 }
5985 )
5986
5987 counted_bindparam = 0
5988
5989 # reset any incoming "visited_bindparam" collection
5990 visited_bindparam = None
5991
5992 # for positional, insertmanyvalues needs to know how many
5993 # bound parameters are in the VALUES sequence; there's no simple
5994 # rule because default expressions etc. can have zero or more
5995 # params inside them. After multiple attempts to figure this out,
5996 # this very simplistic "count after" works and is
5997 # likely the least amount of callcounts, though looks clumsy
5998 if self.positional and visiting_cte is None:
5999 # if we are inside a CTE, don't count parameters
6000 # here since they wont be for insertmanyvalues. keep
6001 # visited_bindparam at None so no counting happens.
6002 # see #9173
6003 visited_bindparam = []
6004
6005 crud_params_struct = crud._get_crud_params(
6006 self,
6007 insert_stmt,
6008 compile_state,
6009 toplevel,
6010 visited_bindparam=visited_bindparam,
6011 **kw,
6012 )
6013
6014 if self.positional and visited_bindparam is not None:
6015 counted_bindparam = len(visited_bindparam)
6016 if self._numeric_binds:
6017 if self._values_bindparam is not None:
6018 self._values_bindparam += visited_bindparam
6019 else:
6020 self._values_bindparam = visited_bindparam
6021
6022 crud_params_single = crud_params_struct.single_params
6023
6024 if (
6025 not crud_params_single
6026 and not self.dialect.supports_default_values
6027 and not self.dialect.supports_default_metavalue
6028 and not self.dialect.supports_empty_insert
6029 ):
6030 raise exc.CompileError(
6031 "The '%s' dialect with current database "
6032 "version settings does not support empty "
6033 "inserts." % self.dialect.name
6034 )
6035
6036 if compile_state._has_multi_parameters:
6037 if not self.dialect.supports_multivalues_insert:
6038 raise exc.CompileError(
6039 "The '%s' dialect with current database "
6040 "version settings does not support "
6041 "in-place multirow inserts." % self.dialect.name
6042 )
6043 elif (
6044 self.implicit_returning or insert_stmt._returning
6045 ) and insert_stmt._sort_by_parameter_order:
6046 raise exc.CompileError(
6047 "RETURNING cannot be determinstically sorted when "
6048 "using an INSERT which includes multi-row values()."
6049 )
6050 crud_params_single = crud_params_struct.single_params
6051 else:
6052 crud_params_single = crud_params_struct.single_params
6053
6054 preparer = self.preparer
6055 supports_default_values = self.dialect.supports_default_values
6056
6057 text = "INSERT "
6058
6059 if insert_stmt._prefixes:
6060 text += self._generate_prefixes(
6061 insert_stmt, insert_stmt._prefixes, **kw
6062 )
6063
6064 text += "INTO "
6065 table_text = preparer.format_table(insert_stmt.table)
6066
6067 if insert_stmt._hints:
6068 _, table_text = self._setup_crud_hints(insert_stmt, table_text)
6069
6070 if insert_stmt._independent_ctes:
6071 self._dispatch_independent_ctes(insert_stmt, kw)
6072
6073 text += table_text
6074
6075 if crud_params_single or not supports_default_values:
6076 text += " (%s)" % ", ".join(
6077 [expr for _, expr, _, _ in crud_params_single]
6078 )
6079
6080 # look for insertmanyvalues attributes that would have been configured
6081 # by crud.py as it scanned through the columns to be part of the
6082 # INSERT
6083 use_insertmanyvalues = crud_params_struct.use_insertmanyvalues
6084 named_sentinel_params: Optional[Sequence[str]] = None
6085 add_sentinel_cols = None
6086 implicit_sentinel = False
6087
6088 returning_cols = self.implicit_returning or insert_stmt._returning
6089 if returning_cols:
6090 add_sentinel_cols = crud_params_struct.use_sentinel_columns
6091 if add_sentinel_cols is not None:
6092 assert use_insertmanyvalues
6093
6094 # search for the sentinel column explicitly present
6095 # in the INSERT columns list, and additionally check that
6096 # this column has a bound parameter name set up that's in the
6097 # parameter list. If both of these cases are present, it means
6098 # we will have a client side value for the sentinel in each
6099 # parameter set.
6100
6101 _params_by_col = {
6102 col: param_names
6103 for col, _, _, param_names in crud_params_single
6104 }
6105 named_sentinel_params = []
6106 for _add_sentinel_col in add_sentinel_cols:
6107 if _add_sentinel_col not in _params_by_col:
6108 named_sentinel_params = None
6109 break
6110 param_name = self._within_exec_param_key_getter(
6111 _add_sentinel_col
6112 )
6113 if param_name not in _params_by_col[_add_sentinel_col]:
6114 named_sentinel_params = None
6115 break
6116 named_sentinel_params.append(param_name)
6117
6118 if named_sentinel_params is None:
6119 # if we are not going to have a client side value for
6120 # the sentinel in the parameter set, that means it's
6121 # an autoincrement, an IDENTITY, or a server-side SQL
6122 # expression like nextval('seqname'). So this is
6123 # an "implicit" sentinel; we will look for it in
6124 # RETURNING
6125 # only, and then sort on it. For this case on PG,
6126 # SQL Server we have to use a special INSERT form
6127 # that guarantees the server side function lines up with
6128 # the entries in the VALUES.
6129 if (
6130 self.dialect.insertmanyvalues_implicit_sentinel
6131 & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
6132 ):
6133 implicit_sentinel = True
6134 else:
6135 # here, we are not using a sentinel at all
6136 # and we are likely the SQLite dialect.
6137 # The first add_sentinel_col that we have should not
6138 # be marked as "insert_sentinel=True". if it was,
6139 # an error should have been raised in
6140 # _get_sentinel_column_for_table.
6141 assert not add_sentinel_cols[0]._insert_sentinel, (
6142 "sentinel selection rules should have prevented "
6143 "us from getting here for this dialect"
6144 )
6145
6146 # always put the sentinel columns last. even if they are
6147 # in the returning list already, they will be there twice
6148 # then.
6149 returning_cols = list(returning_cols) + list(add_sentinel_cols)
6150
6151 returning_clause = self.returning_clause(
6152 insert_stmt,
6153 returning_cols,
6154 populate_result_map=toplevel,
6155 )
6156
6157 if self.returning_precedes_values:
6158 text += " " + returning_clause
6159
6160 else:
6161 returning_clause = None
6162
6163 if insert_stmt.select is not None:
6164 # placed here by crud.py
6165 select_text = self.process(
6166 self.stack[-1]["insert_from_select"], insert_into=True, **kw
6167 )
6168
6169 if self.ctes and self.dialect.cte_follows_insert:
6170 nesting_level = len(self.stack) if not toplevel else None
6171 text += " %s%s" % (
6172 self._render_cte_clause(
6173 nesting_level=nesting_level,
6174 include_following_stack=True,
6175 ),
6176 select_text,
6177 )
6178 else:
6179 text += " %s" % select_text
6180 elif not crud_params_single and supports_default_values:
6181 text += " DEFAULT VALUES"
6182 if use_insertmanyvalues:
6183 self._insertmanyvalues = _InsertManyValues(
6184 True,
6185 self.dialect.default_metavalue_token,
6186 cast(
6187 "List[crud._CrudParamElementStr]", crud_params_single
6188 ),
6189 counted_bindparam,
6190 sort_by_parameter_order=(
6191 insert_stmt._sort_by_parameter_order
6192 ),
6193 includes_upsert_behaviors=(
6194 insert_stmt._post_values_clause is not None
6195 ),
6196 sentinel_columns=add_sentinel_cols,
6197 num_sentinel_columns=(
6198 len(add_sentinel_cols) if add_sentinel_cols else 0
6199 ),
6200 implicit_sentinel=implicit_sentinel,
6201 )
6202 elif compile_state._has_multi_parameters:
6203 text += " VALUES %s" % (
6204 ", ".join(
6205 "(%s)"
6206 % (", ".join(value for _, _, value, _ in crud_param_set))
6207 for crud_param_set in crud_params_struct.all_multi_params
6208 ),
6209 )
6210 else:
6211 insert_single_values_expr = ", ".join(
6212 [
6213 value
6214 for _, _, value, _ in cast(
6215 "List[crud._CrudParamElementStr]",
6216 crud_params_single,
6217 )
6218 ]
6219 )
6220
6221 if use_insertmanyvalues:
6222 if (
6223 implicit_sentinel
6224 and (
6225 self.dialect.insertmanyvalues_implicit_sentinel
6226 & InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
6227 )
6228 # this is checking if we have
6229 # INSERT INTO table (id) VALUES (DEFAULT).
6230 and not (crud_params_struct.is_default_metavalue_only)
6231 ):
6232 # if we have a sentinel column that is server generated,
6233 # then for selected backends render the VALUES list as a
6234 # subquery. This is the orderable form supported by
6235 # PostgreSQL and SQL Server.
6236 embed_sentinel_value = True
6237
6238 render_bind_casts = (
6239 self.dialect.insertmanyvalues_implicit_sentinel
6240 & InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
6241 )
6242
6243 colnames = ", ".join(
6244 f"p{i}" for i, _ in enumerate(crud_params_single)
6245 )
6246
6247 if render_bind_casts:
6248 # render casts for the SELECT list. For PG, we are
6249 # already rendering bind casts in the parameter list,
6250 # selectively for the more "tricky" types like ARRAY.
6251 # however, even for the "easy" types, if the parameter
6252 # is NULL for every entry, PG gives up and says
6253 # "it must be TEXT", which fails for other easy types
6254 # like ints. So we cast on this side too.
6255 colnames_w_cast = ", ".join(
6256 self.render_bind_cast(
6257 col.type,
6258 col.type._unwrapped_dialect_impl(self.dialect),
6259 f"p{i}",
6260 )
6261 for i, (col, *_) in enumerate(crud_params_single)
6262 )
6263 else:
6264 colnames_w_cast = colnames
6265
6266 text += (
6267 f" SELECT {colnames_w_cast} FROM "
6268 f"(VALUES ({insert_single_values_expr})) "
6269 f"AS imp_sen({colnames}, sen_counter) "
6270 "ORDER BY sen_counter"
6271 )
6272 else:
6273 # otherwise, if no sentinel or backend doesn't support
6274 # orderable subquery form, use a plain VALUES list
6275 embed_sentinel_value = False
6276 text += f" VALUES ({insert_single_values_expr})"
6277
6278 self._insertmanyvalues = _InsertManyValues(
6279 is_default_expr=False,
6280 single_values_expr=insert_single_values_expr,
6281 insert_crud_params=cast(
6282 "List[crud._CrudParamElementStr]",
6283 crud_params_single,
6284 ),
6285 num_positional_params_counted=counted_bindparam,
6286 sort_by_parameter_order=(
6287 insert_stmt._sort_by_parameter_order
6288 ),
6289 includes_upsert_behaviors=(
6290 insert_stmt._post_values_clause is not None
6291 ),
6292 sentinel_columns=add_sentinel_cols,
6293 num_sentinel_columns=(
6294 len(add_sentinel_cols) if add_sentinel_cols else 0
6295 ),
6296 sentinel_param_keys=named_sentinel_params,
6297 implicit_sentinel=implicit_sentinel,
6298 embed_values_counter=embed_sentinel_value,
6299 )
6300
6301 else:
6302 text += f" VALUES ({insert_single_values_expr})"
6303
6304 if insert_stmt._post_values_clause is not None:
6305 post_values_clause = self.process(
6306 insert_stmt._post_values_clause, **kw
6307 )
6308 if post_values_clause:
6309 text += " " + post_values_clause
6310
6311 if returning_clause and not self.returning_precedes_values:
6312 text += " " + returning_clause
6313
6314 if self.ctes and not self.dialect.cte_follows_insert:
6315 nesting_level = len(self.stack) if not toplevel else None
6316 text = (
6317 self._render_cte_clause(
6318 nesting_level=nesting_level,
6319 include_following_stack=True,
6320 )
6321 + text
6322 )
6323
6324 self.stack.pop(-1)
6325
6326 return text
6327
6328 def update_tables_clause(self, update_stmt, from_table, extra_froms, **kw):
6329 """Provide a hook to override the initial table clause
6330 in an UPDATE statement.
6331
6332 MySQL overrides this.
6333
6334 """
6335 kw["asfrom"] = True
6336 return from_table._compiler_dispatch(self, iscrud=True, **kw)
6337
6338 def update_from_clause(
6339 self, update_stmt, from_table, extra_froms, from_hints, **kw
6340 ):
6341 """Provide a hook to override the generation of an
6342 UPDATE..FROM clause.
6343 MySQL and MSSQL override this.
6344 """
6345 raise NotImplementedError(
6346 "This backend does not support multiple-table "
6347 "criteria within UPDATE"
6348 )
6349
6350 def update_post_criteria_clause(
6351 self, update_stmt: Update, **kw: Any
6352 ) -> Optional[str]:
6353 """provide a hook to override generation after the WHERE criteria
6354 in an UPDATE statement
6355
6356 .. versionadded:: 2.1
6357
6358 """
6359 if update_stmt._post_criteria_clause is not None:
6360 return self.process(
6361 update_stmt._post_criteria_clause,
6362 **kw,
6363 )
6364 else:
6365 return None
6366
6367 def delete_post_criteria_clause(
6368 self, delete_stmt: Delete, **kw: Any
6369 ) -> Optional[str]:
6370 """provide a hook to override generation after the WHERE criteria
6371 in a DELETE statement
6372
6373 .. versionadded:: 2.1
6374
6375 """
6376 if delete_stmt._post_criteria_clause is not None:
6377 return self.process(
6378 delete_stmt._post_criteria_clause,
6379 **kw,
6380 )
6381 else:
6382 return None
6383
    def visit_update(
        self,
        update_stmt: Update,
        visiting_cte: Optional[CTE] = None,
        **kw: Any,
    ) -> str:
        """Compile an UPDATE statement into its SQL string.

        :param update_stmt: the UPDATE construct to render; it is replaced
         by ``compile_state.statement`` once the compile state is built.
        :param visiting_cte: when not ``None``, it is forwarded in ``kw``
         and the statement is never treated as the top-level statement.
        :param kw: keyword arguments passed along to the sub-compilation
         calls.
        :return: the rendered UPDATE text, with any collected CTEs
         prepended.

        """
        compile_state = update_stmt._compile_state_factory(
            update_stmt, self, **kw
        )
        if TYPE_CHECKING:
            assert isinstance(compile_state, UpdateDMLState)
        update_stmt = compile_state.statement  # type: ignore[assignment]

        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            # top level only when nothing is on the compiler stack yet
            toplevel = not self.stack

        if toplevel:
            # record statement-wide state on the compiler; existing
            # compile states are not overwritten
            self.isupdate = True
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        extra_froms = compile_state._extra_froms
        is_multitable = bool(extra_froms)

        if is_multitable:
            # main table might be a JOIN
            main_froms = set(_from_objects(update_stmt.table))
            # only render extra FROMs not already part of the main table
            render_extra_froms = [
                f for f in extra_froms if f not in main_froms
            ]
            correlate_froms = main_froms.union(extra_froms)
        else:
            render_extra_froms = []
            correlate_froms = {update_stmt.table}

        # stack entry for this statement; popped again just before return
        self.stack.append(
            {
                "correlate_froms": correlate_froms,
                "asfrom_froms": correlate_froms,
                "selectable": update_stmt,
            }
        )

        text = "UPDATE "

        if update_stmt._prefixes:
            text += self._generate_prefixes(
                update_stmt, update_stmt._prefixes, **kw
            )

        table_text = self.update_tables_clause(
            update_stmt,
            update_stmt.table,
            render_extra_froms,
            from_linter=from_linter,
            **kw,
        )
        crud_params_struct = crud._get_crud_params(
            self, update_stmt, compile_state, toplevel, **kw
        )
        crud_params = crud_params_struct.single_params

        if update_stmt._hints:
            dialect_hints, table_text = self._setup_crud_hints(
                update_stmt, table_text
            )
        else:
            dialect_hints = None

        if update_stmt._independent_ctes:
            self._dispatch_independent_ctes(update_stmt, kw)

        text += table_text

        # SET clause: one "expr=value" pair per crud parameter
        text += " SET "
        text += ", ".join(
            expr + "=" + value
            for _, expr, value, _ in cast(
                "List[Tuple[Any, str, str, Any]]", crud_params
            )
        )

        # RETURNING rendered here only when the compiler places it
        # before the values; otherwise it is appended further below
        if self.implicit_returning or update_stmt._returning:
            if self.returning_precedes_values:
                text += " " + self.returning_clause(
                    update_stmt,
                    self.implicit_returning or update_stmt._returning,
                    populate_result_map=toplevel,
                )

        if extra_froms:
            extra_from_text = self.update_from_clause(
                update_stmt,
                update_stmt.table,
                render_extra_froms,
                dialect_hints,
                from_linter=from_linter,
                **kw,
            )
            if extra_from_text:
                text += " " + extra_from_text

        if update_stmt._where_criteria:
            t = self._generate_delimited_and_list(
                update_stmt._where_criteria, from_linter=from_linter, **kw
            )
            if t:
                text += " WHERE " + t

        # hook output that follows the WHERE criteria
        ulc = self.update_post_criteria_clause(
            update_stmt, from_linter=from_linter, **kw
        )
        if ulc:
            text += " " + ulc

        if (
            self.implicit_returning or update_stmt._returning
        ) and not self.returning_precedes_values:
            text += " " + self.returning_clause(
                update_stmt,
                self.implicit_returning or update_stmt._returning,
                populate_result_map=toplevel,
            )

        if self.ctes:
            # prepend the CTE clause; a nesting level is passed only for
            # non-top-level statements
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if warn_linting:
            assert from_linter is not None
            from_linter.warn(stmt_type="UPDATE")

        self.stack.pop(-1)

        return text  # type: ignore[no-any-return]
6533
6534 def delete_extra_from_clause(
6535 self, delete_stmt, from_table, extra_froms, from_hints, **kw
6536 ):
6537 """Provide a hook to override the generation of an
6538 DELETE..FROM clause.
6539
6540 This can be used to implement DELETE..USING for example.
6541
6542 MySQL and MSSQL override this.
6543
6544 """
6545 raise NotImplementedError(
6546 "This backend does not support multiple-table "
6547 "criteria within DELETE"
6548 )
6549
6550 def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw):
6551 return from_table._compiler_dispatch(
6552 self, asfrom=True, iscrud=True, **kw
6553 )
6554
    def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
        """Compile a DELETE statement into its SQL string.

        :param delete_stmt: the DELETE construct to render; it is replaced
         by ``compile_state.statement`` once the compile state is built.
        :param visiting_cte: when not ``None``, it is forwarded in ``kw``
         and the statement is never treated as the top-level statement.
        :param kw: keyword arguments passed along to the sub-compilation
         calls.
        :return: the rendered DELETE text, with any collected CTEs
         prepended.

        """
        compile_state = delete_stmt._compile_state_factory(
            delete_stmt, self, **kw
        )
        delete_stmt = compile_state.statement

        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            # top level only when nothing is on the compiler stack yet
            toplevel = not self.stack

        if toplevel:
            # record statement-wide state on the compiler; existing
            # compile states are not overwritten
            self.isdelete = True
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        extra_froms = compile_state._extra_froms

        correlate_froms = {delete_stmt.table}.union(extra_froms)
        # stack entry for this statement; popped again just before return
        self.stack.append(
            {
                "correlate_froms": correlate_froms,
                "asfrom_froms": correlate_froms,
                "selectable": delete_stmt,
            }
        )

        text = "DELETE "

        if delete_stmt._prefixes:
            text += self._generate_prefixes(
                delete_stmt, delete_stmt._prefixes, **kw
            )

        text += "FROM "

        try:
            table_text = self.delete_table_clause(
                delete_stmt,
                delete_stmt.table,
                extra_froms,
                from_linter=from_linter,
            )
        except TypeError:
            # anticipate 3rd party dialects that don't include **kw
            # TODO: remove in 2.1
            table_text = self.delete_table_clause(
                delete_stmt, delete_stmt.table, extra_froms
            )
            # the fallback call could not receive the linter, so the
            # table is processed once more with it; the result is discarded
            if from_linter:
                _ = self.process(delete_stmt.table, from_linter=from_linter)

        # return value unused here; the call is made for its effects
        crud._get_crud_params(self, delete_stmt, compile_state, toplevel, **kw)

        if delete_stmt._hints:
            dialect_hints, table_text = self._setup_crud_hints(
                delete_stmt, table_text
            )
        else:
            dialect_hints = None

        if delete_stmt._independent_ctes:
            self._dispatch_independent_ctes(delete_stmt, kw)

        text += table_text

        # RETURNING rendered here only when the compiler places it
        # before the values; otherwise it is appended further below
        if (
            self.implicit_returning or delete_stmt._returning
        ) and self.returning_precedes_values:
            text += " " + self.returning_clause(
                delete_stmt,
                self.implicit_returning or delete_stmt._returning,
                populate_result_map=toplevel,
            )

        if extra_froms:
            extra_from_text = self.delete_extra_from_clause(
                delete_stmt,
                delete_stmt.table,
                extra_froms,
                dialect_hints,
                from_linter=from_linter,
                **kw,
            )
            if extra_from_text:
                text += " " + extra_from_text

        if delete_stmt._where_criteria:
            t = self._generate_delimited_and_list(
                delete_stmt._where_criteria, from_linter=from_linter, **kw
            )
            if t:
                text += " WHERE " + t

        # hook output that follows the WHERE criteria
        dlc = self.delete_post_criteria_clause(
            delete_stmt, from_linter=from_linter, **kw
        )
        if dlc:
            text += " " + dlc

        if (
            self.implicit_returning or delete_stmt._returning
        ) and not self.returning_precedes_values:
            text += " " + self.returning_clause(
                delete_stmt,
                self.implicit_returning or delete_stmt._returning,
                populate_result_map=toplevel,
            )

        if self.ctes:
            # prepend the CTE clause; a nesting level is passed only for
            # non-top-level statements
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if warn_linting:
            assert from_linter is not None
            from_linter.warn(stmt_type="DELETE")

        self.stack.pop(-1)

        return text
6687
6688 def visit_savepoint(self, savepoint_stmt, **kw):
6689 return "SAVEPOINT %s" % self.preparer.format_savepoint(savepoint_stmt)
6690
6691 def visit_rollback_to_savepoint(self, savepoint_stmt, **kw):
6692 return "ROLLBACK TO SAVEPOINT %s" % self.preparer.format_savepoint(
6693 savepoint_stmt
6694 )
6695
6696 def visit_release_savepoint(self, savepoint_stmt, **kw):
6697 return "RELEASE SAVEPOINT %s" % self.preparer.format_savepoint(
6698 savepoint_stmt
6699 )
6700
6701
class StrSQLCompiler(SQLCompiler):
    """A :class:`.SQLCompiler` subclass that renders a small selection of
    non-standard SQL features into a string value.

    The :class:`.StrSQLCompiler` is invoked whenever a Core expression
    element is directly stringified without calling upon the
    :meth:`_expression.ClauseElement.compile` method.
    It can render a limited set of non-standard SQL constructs to assist
    in basic stringification; for more substantial custom or
    dialect-specific SQL constructs, it is necessary to make use of
    :meth:`_expression.ClauseElement.compile` directly.

    .. seealso::

        :ref:`faq_sql_expression_string`

    """

    def _fallback_column_name(self, column):
        # placeholder used when a column name cannot be determined
        return "<name unknown>"

    @util.preload_module("sqlalchemy.engine.url")
    def visit_unsupported_compilation(self, element, err, **kw):
        """Try the element's own ``stringify_dialect`` before giving up.

        When the element names a non-default stringify dialect and that
        dialect's statement compiler is not itself a
        :class:`.StrSQLCompiler`, the element is rendered with it;
        otherwise the superclass handling applies.

        """
        if element.stringify_dialect == "default":
            return super().visit_unsupported_compilation(element, err)

        engine_url = util.preloaded.engine_url
        dialect_cls = engine_url.URL.create(
            element.stringify_dialect
        ).get_dialect()
        dialect = dialect_cls()

        compiler = dialect.statement_compiler(
            dialect, None, _supporting_against=self
        )
        if isinstance(compiler, StrSQLCompiler):
            return super().visit_unsupported_compilation(element, err)
        return compiler.process(element, **kw)

    def visit_getitem_binary(self, binary, operator, **kw):
        """Render ``left[right]`` for an index operation."""
        left = self.process(binary.left, **kw)
        right = self.process(binary.right, **kw)
        return "%s[%s]" % (left, right)

    def visit_json_getitem_op_binary(self, binary, operator, **kw):
        # rendered the same way as a plain getitem
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
        # rendered the same way as a plain getitem
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_sequence(self, sequence, **kw):
        """Render a descriptive placeholder for a sequence's next value."""
        sequence_name = self.preparer.format_sequence(sequence)
        return f"<next sequence value: {sequence_name}>"

    def returning_clause(
        self,
        stmt: UpdateBase,
        returning_cols: Sequence[_ColumnsClauseElement],
        *,
        populate_result_map: bool,
        **kw: Any,
    ) -> str:
        """Render a generic ``RETURNING`` clause listing each of the
        returning columns."""
        rendered = []
        for col in base._select_iterables(returning_cols):
            rendered.append(
                self._label_select_column(None, col, True, False, {})
            )
        return "RETURNING " + ", ".join(rendered)

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the extra tables of an UPDATE as a ``FROM`` list."""
        kw["asfrom"] = True
        rendered_froms = [
            from_._compiler_dispatch(self, fromhints=from_hints, **kw)
            for from_ in extra_froms
        ]
        return "FROM " + ", ".join(rendered_froms)

    def delete_extra_from_clause(
        self, delete_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the extra tables of a DELETE as a comma-prefixed
        list."""
        kw["asfrom"] = True
        rendered_froms = [
            from_._compiler_dispatch(self, fromhints=from_hints, **kw)
            for from_ in extra_froms
        ]
        return ", " + ", ".join(rendered_froms)

    def visit_empty_set_expr(self, element_types, **kw):
        return "SELECT 1 WHERE 1!=1"

    def get_from_hint_text(self, table, text):
        # hint text is shown wrapped in square brackets
        return "[%s]" % text

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " <regexp> ", **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " <not regexp> ", **kw)

    def visit_regexp_replace_op_binary(self, binary, operator, **kw):
        target = binary.left._compiler_dispatch(self, **kw)
        replacement = binary.right._compiler_dispatch(self, **kw)
        return "<regexp replace>(%s, %s)" % (target, replacement)

    def visit_try_cast(self, cast, **kwargs):
        """Render ``TRY_CAST(<clause> AS <type>)``."""
        clause_text = cast.clause._compiler_dispatch(self, **kwargs)
        type_text = cast.typeclause._compiler_dispatch(self, **kwargs)
        return "TRY_CAST(%s AS %s)" % (clause_text, type_text)
6811
6812
6813class DDLCompiler(Compiled):
6814 is_ddl = True
6815
    if TYPE_CHECKING:
        # Typing-only constructor signature: this stub exists for type
        # checkers and is not defined at runtime.

        def __init__(
            self,
            dialect: Dialect,
            statement: ExecutableDDLElement,
            schema_translate_map: Optional[SchemaTranslateMapType] = ...,
            render_schema_translate: bool = ...,
            compile_kwargs: Mapping[str, Any] = ...,
        ): ...
6826
6827 @util.ro_memoized_property
6828 def sql_compiler(self) -> SQLCompiler:
6829 return self.dialect.statement_compiler(
6830 self.dialect, None, schema_translate_map=self.schema_translate_map
6831 )
6832
6833 @util.memoized_property
6834 def type_compiler(self):
6835 return self.dialect.type_compiler_instance
6836
6837 def construct_params(
6838 self,
6839 params: Optional[_CoreSingleExecuteParams] = None,
6840 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
6841 escape_names: bool = True,
6842 ) -> Optional[_MutableCoreSingleExecuteParams]:
6843 return None
6844
6845 def visit_ddl(self, ddl, **kwargs):
6846 # table events can substitute table and schema name
6847 context = ddl.context
6848 if isinstance(ddl.target, schema.Table):
6849 context = context.copy()
6850
6851 preparer = self.preparer
6852 path = preparer.format_table_seq(ddl.target)
6853 if len(path) == 1:
6854 table, sch = path[0], ""
6855 else:
6856 table, sch = path[-1], path[0]
6857
6858 context.setdefault("table", table)
6859 context.setdefault("schema", sch)
6860 context.setdefault("fullname", preparer.format_table(ddl.target))
6861
6862 return self.sql_compiler.post_process_text(ddl.statement % context)
6863
6864 def visit_create_schema(self, create, **kw):
6865 text = "CREATE SCHEMA "
6866 if create.if_not_exists:
6867 text += "IF NOT EXISTS "
6868 return text + self.preparer.format_schema(create.element)
6869
6870 def visit_drop_schema(self, drop, **kw):
6871 text = "DROP SCHEMA "
6872 if drop.if_exists:
6873 text += "IF EXISTS "
6874 text += self.preparer.format_schema(drop.element)
6875 if drop.cascade:
6876 text += " CASCADE"
6877 return text
6878
    def visit_create_table(self, create, **kw):
        """Render a ``CREATE TABLE`` statement.

        The output consists of the table prefixes, an optional
        ``IF NOT EXISTS``, the table name, the text from
        :meth:`create_table_suffix`, the column specifications, the table
        level constraints and finally the text from
        :meth:`post_create_table`.

        :raises CompileError: re-raised with the table description and
         column name prepended when compiling a column fails.

        """
        table = create.element
        preparer = self.preparer

        text = "\nCREATE "
        if table._prefixes:
            text += " ".join(table._prefixes) + " "

        text += "TABLE "
        if create.if_not_exists:
            text += "IF NOT EXISTS "

        text += preparer.format_table(table) + " "

        create_table_suffix = self.create_table_suffix(table)
        if create_table_suffix:
            text += create_table_suffix + " "

        text += "("

        # the first rendered column is preceded by a newline only; later
        # ones by ", \n"
        separator = "\n"

        # if only one primary key, specify it along with the column
        first_pk = False
        for create_column in create.columns:
            column = create_column.element
            try:
                processed = self.process(
                    create_column, first_pk=column.primary_key and not first_pk
                )
                # columns that compile to None are left out entirely
                if processed is not None:
                    text += separator
                    separator = ", \n"
                    text += "\t" + processed
                if column.primary_key:
                    first_pk = True
            except exc.CompileError as ce:
                raise exc.CompileError(
                    "(in table '%s', column '%s'): %s"
                    % (table.description, column.name, ce.args[0])
                ) from ce

        const = self.create_table_constraints(
            table,
            _include_foreign_key_constraints=create.include_foreign_key_constraints,  # noqa
        )
        if const:
            text += separator + "\t" + const

        text += "\n)%s\n\n" % self.post_create_table(table)
        return text
6930
6931 def visit_create_table_as(self, element: CreateTableAs, **kw: Any) -> str:
6932 prep = self.preparer
6933
6934 inner_kw = dict(kw)
6935 inner_kw["literal_binds"] = True
6936 select_sql = self.sql_compiler.process(element.selectable, **inner_kw)
6937
6938 parts = [
6939 "CREATE",
6940 "TEMPORARY" if element.temporary else None,
6941 "TABLE",
6942 "IF NOT EXISTS" if element.if_not_exists else None,
6943 prep.format_table(element.table),
6944 "AS",
6945 select_sql,
6946 ]
6947 return " ".join(p for p in parts if p)
6948
6949 def visit_create_column(self, create, first_pk=False, **kw):
6950 column = create.element
6951
6952 if column.system:
6953 return None
6954
6955 text = self.get_column_specification(column, first_pk=first_pk)
6956 const = " ".join(
6957 self.process(constraint) for constraint in column.constraints
6958 )
6959 if const:
6960 text += " " + const
6961
6962 return text
6963
6964 def create_table_constraints(
6965 self, table, _include_foreign_key_constraints=None, **kw
6966 ):
6967 # On some DB order is significant: visit PK first, then the
6968 # other constraints (engine.ReflectionTest.testbasic failed on FB2)
6969 constraints = []
6970 if table.primary_key:
6971 constraints.append(table.primary_key)
6972
6973 all_fkcs = table.foreign_key_constraints
6974 if _include_foreign_key_constraints is not None:
6975 omit_fkcs = all_fkcs.difference(_include_foreign_key_constraints)
6976 else:
6977 omit_fkcs = set()
6978
6979 constraints.extend(
6980 [
6981 c
6982 for c in table._sorted_constraints
6983 if c is not table.primary_key and c not in omit_fkcs
6984 ]
6985 )
6986
6987 return ", \n\t".join(
6988 p
6989 for p in (
6990 self.process(constraint)
6991 for constraint in constraints
6992 if (constraint._should_create_for_compiler(self))
6993 and (
6994 not self.dialect.supports_alter
6995 or not getattr(constraint, "use_alter", False)
6996 )
6997 )
6998 if p is not None
6999 )
7000
7001 def visit_drop_table(self, drop, **kw):
7002 text = "\nDROP TABLE "
7003 if drop.if_exists:
7004 text += "IF EXISTS "
7005 return text + self.preparer.format_table(drop.element)
7006
7007 def visit_drop_view(self, drop, **kw):
7008 return "\nDROP VIEW " + self.preparer.format_table(drop.element)
7009
7010 def _verify_index_table(self, index: Index) -> None:
7011 if index.table is None:
7012 raise exc.CompileError(
7013 "Index '%s' is not associated with any table." % index.name
7014 )
7015
7016 def visit_create_index(
7017 self, create, include_schema=False, include_table_schema=True, **kw
7018 ):
7019 index = create.element
7020 self._verify_index_table(index)
7021 preparer = self.preparer
7022 text = "CREATE "
7023 if index.unique:
7024 text += "UNIQUE "
7025 if index.name is None:
7026 raise exc.CompileError(
7027 "CREATE INDEX requires that the index have a name"
7028 )
7029
7030 text += "INDEX "
7031 if create.if_not_exists:
7032 text += "IF NOT EXISTS "
7033
7034 text += "%s ON %s (%s)" % (
7035 self._prepared_index_name(index, include_schema=include_schema),
7036 preparer.format_table(
7037 index.table, use_schema=include_table_schema
7038 ),
7039 ", ".join(
7040 self.sql_compiler.process(
7041 expr, include_table=False, literal_binds=True
7042 )
7043 for expr in index.expressions
7044 ),
7045 )
7046 return text
7047
7048 def visit_drop_index(self, drop, **kw):
7049 index = drop.element
7050
7051 if index.name is None:
7052 raise exc.CompileError(
7053 "DROP INDEX requires that the index have a name"
7054 )
7055 text = "\nDROP INDEX "
7056 if drop.if_exists:
7057 text += "IF EXISTS "
7058
7059 return text + self._prepared_index_name(index, include_schema=True)
7060
7061 def _prepared_index_name(
7062 self, index: Index, include_schema: bool = False
7063 ) -> str:
7064 if index.table is not None:
7065 effective_schema = self.preparer.schema_for_object(index.table)
7066 else:
7067 effective_schema = None
7068 if include_schema and effective_schema:
7069 schema_name = self.preparer.quote_schema(effective_schema)
7070 else:
7071 schema_name = None
7072
7073 index_name: str = self.preparer.format_index(index)
7074
7075 if schema_name:
7076 index_name = schema_name + "." + index_name
7077 return index_name
7078
7079 def visit_add_constraint(self, create, **kw):
7080 return "ALTER TABLE %s ADD %s" % (
7081 self.preparer.format_table(create.element.table),
7082 self.process(create.element),
7083 )
7084
7085 def visit_set_table_comment(self, create, **kw):
7086 return "COMMENT ON TABLE %s IS %s" % (
7087 self.preparer.format_table(create.element),
7088 self.sql_compiler.render_literal_value(
7089 create.element.comment, sqltypes.String()
7090 ),
7091 )
7092
7093 def visit_drop_table_comment(self, drop, **kw):
7094 return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table(
7095 drop.element
7096 )
7097
7098 def visit_set_column_comment(self, create, **kw):
7099 return "COMMENT ON COLUMN %s IS %s" % (
7100 self.preparer.format_column(
7101 create.element, use_table=True, use_schema=True
7102 ),
7103 self.sql_compiler.render_literal_value(
7104 create.element.comment, sqltypes.String()
7105 ),
7106 )
7107
7108 def visit_drop_column_comment(self, drop, **kw):
7109 return "COMMENT ON COLUMN %s IS NULL" % self.preparer.format_column(
7110 drop.element, use_table=True
7111 )
7112
7113 def visit_set_constraint_comment(self, create, **kw):
7114 raise exc.UnsupportedCompilationError(self, type(create))
7115
7116 def visit_drop_constraint_comment(self, drop, **kw):
7117 raise exc.UnsupportedCompilationError(self, type(drop))
7118
7119 def get_identity_options(self, identity_options):
7120 text = []
7121 if identity_options.increment is not None:
7122 text.append("INCREMENT BY %d" % identity_options.increment)
7123 if identity_options.start is not None:
7124 text.append("START WITH %d" % identity_options.start)
7125 if identity_options.minvalue is not None:
7126 text.append("MINVALUE %d" % identity_options.minvalue)
7127 if identity_options.maxvalue is not None:
7128 text.append("MAXVALUE %d" % identity_options.maxvalue)
7129 if identity_options.nominvalue is not None:
7130 text.append("NO MINVALUE")
7131 if identity_options.nomaxvalue is not None:
7132 text.append("NO MAXVALUE")
7133 if identity_options.cache is not None:
7134 text.append("CACHE %d" % identity_options.cache)
7135 if identity_options.cycle is not None:
7136 text.append("CYCLE" if identity_options.cycle else "NO CYCLE")
7137 return " ".join(text)
7138
7139 def visit_create_sequence(self, create, prefix=None, **kw):
7140 text = "CREATE SEQUENCE "
7141 if create.if_not_exists:
7142 text += "IF NOT EXISTS "
7143 text += self.preparer.format_sequence(create.element)
7144
7145 if prefix:
7146 text += prefix
7147 options = self.get_identity_options(create.element)
7148 if options:
7149 text += " " + options
7150 return text
7151
7152 def visit_drop_sequence(self, drop, **kw):
7153 text = "DROP SEQUENCE "
7154 if drop.if_exists:
7155 text += "IF EXISTS "
7156 return text + self.preparer.format_sequence(drop.element)
7157
7158 def visit_drop_constraint(self, drop, **kw):
7159 constraint = drop.element
7160 if constraint.name is not None:
7161 formatted_name = self.preparer.format_constraint(constraint)
7162 else:
7163 formatted_name = None
7164
7165 if formatted_name is None:
7166 raise exc.CompileError(
7167 "Can't emit DROP CONSTRAINT for constraint %r; "
7168 "it has no name" % drop.element
7169 )
7170 return "ALTER TABLE %s DROP CONSTRAINT %s%s%s" % (
7171 self.preparer.format_table(drop.element.table),
7172 "IF EXISTS " if drop.if_exists else "",
7173 formatted_name,
7174 " CASCADE" if drop.cascade else "",
7175 )
7176
7177 def get_column_specification(self, column, **kwargs):
7178 colspec = (
7179 self.preparer.format_column(column)
7180 + " "
7181 + self.dialect.type_compiler_instance.process(
7182 column.type, type_expression=column
7183 )
7184 )
7185 default = self.get_column_default_string(column)
7186 if default is not None:
7187 colspec += " DEFAULT " + default
7188
7189 if column.computed is not None:
7190 colspec += " " + self.process(column.computed)
7191
7192 if (
7193 column.identity is not None
7194 and self.dialect.supports_identity_columns
7195 ):
7196 colspec += " " + self.process(column.identity)
7197
7198 if not column.nullable and (
7199 not column.identity or not self.dialect.supports_identity_columns
7200 ):
7201 colspec += " NOT NULL"
7202 return colspec
7203
7204 def create_table_suffix(self, table):
7205 return ""
7206
7207 def post_create_table(self, table):
7208 return ""
7209
7210 def get_column_default_string(self, column: Column[Any]) -> Optional[str]:
7211 if isinstance(column.server_default, schema.DefaultClause):
7212 return self.render_default_string(column.server_default.arg)
7213 else:
7214 return None
7215
7216 def render_default_string(self, default: Union[Visitable, str]) -> str:
7217 if isinstance(default, str):
7218 return self.sql_compiler.render_literal_value(
7219 default, sqltypes.STRINGTYPE
7220 )
7221 else:
7222 return self.sql_compiler.process(default, literal_binds=True)
7223
7224 def visit_table_or_column_check_constraint(self, constraint, **kw):
7225 if constraint.is_column_level:
7226 return self.visit_column_check_constraint(constraint)
7227 else:
7228 return self.visit_check_constraint(constraint)
7229
7230 def visit_check_constraint(self, constraint, **kw):
7231 text = ""
7232 if constraint.name is not None:
7233 formatted_name = self.preparer.format_constraint(constraint)
7234 if formatted_name is not None:
7235 text += "CONSTRAINT %s " % formatted_name
7236 text += "CHECK (%s)" % self.sql_compiler.process(
7237 constraint.sqltext, include_table=False, literal_binds=True
7238 )
7239 text += self.define_constraint_deferrability(constraint)
7240 return text
7241
7242 def visit_column_check_constraint(self, constraint, **kw):
7243 text = ""
7244 if constraint.name is not None:
7245 formatted_name = self.preparer.format_constraint(constraint)
7246 if formatted_name is not None:
7247 text += "CONSTRAINT %s " % formatted_name
7248 text += "CHECK (%s)" % self.sql_compiler.process(
7249 constraint.sqltext, include_table=False, literal_binds=True
7250 )
7251 text += self.define_constraint_deferrability(constraint)
7252 return text
7253
7254 def visit_primary_key_constraint(
7255 self, constraint: PrimaryKeyConstraint, **kw: Any
7256 ) -> str:
7257 if len(constraint) == 0:
7258 return ""
7259 text = ""
7260 if constraint.name is not None:
7261 formatted_name = self.preparer.format_constraint(constraint)
7262 if formatted_name is not None:
7263 text += "CONSTRAINT %s " % formatted_name
7264 text += "PRIMARY KEY "
7265 text += "(%s)" % ", ".join(
7266 self.preparer.quote(c.name)
7267 for c in (
7268 constraint.columns_autoinc_first
7269 if constraint._implicit_generated
7270 else constraint.columns
7271 )
7272 )
7273 text += self.define_constraint_deferrability(constraint)
7274 return text
7275
7276 def visit_foreign_key_constraint(self, constraint, **kw):
7277 preparer = self.preparer
7278 text = ""
7279 if constraint.name is not None:
7280 formatted_name = self.preparer.format_constraint(constraint)
7281 if formatted_name is not None:
7282 text += "CONSTRAINT %s " % formatted_name
7283 remote_table = list(constraint.elements)[0].column.table
7284 text += "FOREIGN KEY(%s) REFERENCES %s (%s)" % (
7285 ", ".join(
7286 preparer.quote(f.parent.name) for f in constraint.elements
7287 ),
7288 self.define_constraint_remote_table(
7289 constraint, remote_table, preparer
7290 ),
7291 ", ".join(
7292 preparer.quote(f.column.name) for f in constraint.elements
7293 ),
7294 )
7295 text += self.define_constraint_match(constraint)
7296 text += self.define_constraint_cascades(constraint)
7297 text += self.define_constraint_deferrability(constraint)
7298 return text
7299
7300 def define_constraint_remote_table(self, constraint, table, preparer):
7301 """Format the remote table clause of a CREATE CONSTRAINT clause."""
7302
7303 return preparer.format_table(table)
7304
7305 def visit_unique_constraint(
7306 self, constraint: UniqueConstraint, **kw: Any
7307 ) -> str:
7308 if len(constraint) == 0:
7309 return ""
7310 text = ""
7311 if constraint.name is not None:
7312 formatted_name = self.preparer.format_constraint(constraint)
7313 if formatted_name is not None:
7314 text += "CONSTRAINT %s " % formatted_name
7315 text += "UNIQUE %s(%s)" % (
7316 self.define_unique_constraint_distinct(constraint, **kw),
7317 ", ".join(self.preparer.quote(c.name) for c in constraint),
7318 )
7319 text += self.define_constraint_deferrability(constraint)
7320 return text
7321
    def define_unique_constraint_distinct(
        self, constraint: UniqueConstraint, **kw: Any
    ) -> str:
        """Return the text rendered between ``UNIQUE`` and the column list.

        This base implementation always returns an empty string.
        """
        return ""
7326
7327 def define_constraint_cascades(
7328 self, constraint: ForeignKeyConstraint
7329 ) -> str:
7330 text = ""
7331 if constraint.ondelete is not None:
7332 text += self.define_constraint_ondelete_cascade(constraint)
7333
7334 if constraint.onupdate is not None:
7335 text += self.define_constraint_onupdate_cascade(constraint)
7336 return text
7337
7338 def define_constraint_ondelete_cascade(
7339 self, constraint: ForeignKeyConstraint
7340 ) -> str:
7341 return " ON DELETE %s" % self.preparer.validate_sql_phrase(
7342 constraint.ondelete, FK_ON_DELETE
7343 )
7344
7345 def define_constraint_onupdate_cascade(
7346 self, constraint: ForeignKeyConstraint
7347 ) -> str:
7348 return " ON UPDATE %s" % self.preparer.validate_sql_phrase(
7349 constraint.onupdate, FK_ON_UPDATE
7350 )
7351
7352 def define_constraint_deferrability(self, constraint: Constraint) -> str:
7353 text = ""
7354 if constraint.deferrable is not None:
7355 if constraint.deferrable:
7356 text += " DEFERRABLE"
7357 else:
7358 text += " NOT DEFERRABLE"
7359 if constraint.initially is not None:
7360 text += " INITIALLY %s" % self.preparer.validate_sql_phrase(
7361 constraint.initially, FK_INITIALLY
7362 )
7363 return text
7364
7365 def define_constraint_match(self, constraint):
7366 text = ""
7367 if constraint.match is not None:
7368 text += " MATCH %s" % constraint.match
7369 return text
7370
7371 def visit_computed_column(self, generated, **kw):
7372 text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
7373 generated.sqltext, include_table=False, literal_binds=True
7374 )
7375 if generated.persisted is True:
7376 text += " STORED"
7377 elif generated.persisted is False:
7378 text += " VIRTUAL"
7379 return text
7380
7381 def visit_identity_column(self, identity, **kw):
7382 text = "GENERATED %s AS IDENTITY" % (
7383 "ALWAYS" if identity.always else "BY DEFAULT",
7384 )
7385 options = self.get_identity_options(identity)
7386 if options:
7387 text += " (%s)" % options
7388 return text
7389
7390
class GenericTypeCompiler(TypeCompiler):
    """Render type specification strings that are not dialect specific.

    The uppercase ``visit_XYZ`` methods emit the keyword text for one
    specific SQL type.  Most lowercase ``visit_xyz`` methods simply
    forward to one of the uppercase renderers.
    """

    # -- numeric types -------------------------------------------------

    def visit_FLOAT(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return "FLOAT"

    def visit_DOUBLE(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return "DOUBLE"

    def visit_DOUBLE_PRECISION(
        self, type_: sqltypes.DOUBLE_PRECISION[Any], **kw: Any
    ) -> str:
        return "DOUBLE PRECISION"

    def visit_REAL(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return "REAL"

    def visit_NUMERIC(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        """Render ``NUMERIC``, adding precision and scale when set."""
        if type_.precision is None:
            return "NUMERIC"
        if type_.scale is None:
            return "NUMERIC(%s)" % (type_.precision,)
        return "NUMERIC(%s, %s)" % (type_.precision, type_.scale)

    def visit_DECIMAL(self, type_: sqltypes.DECIMAL[Any], **kw: Any) -> str:
        """Render ``DECIMAL``, adding precision and scale when set."""
        if type_.precision is None:
            return "DECIMAL"
        if type_.scale is None:
            return "DECIMAL(%s)" % (type_.precision,)
        return "DECIMAL(%s, %s)" % (type_.precision, type_.scale)

    def visit_INTEGER(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return "INTEGER"

    def visit_SMALLINT(self, type_: sqltypes.SmallInteger, **kw: Any) -> str:
        return "SMALLINT"

    def visit_BIGINT(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return "BIGINT"

    # -- date / time types ---------------------------------------------

    def visit_TIMESTAMP(self, type_: sqltypes.TIMESTAMP, **kw: Any) -> str:
        return "TIMESTAMP"

    def visit_DATETIME(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return "DATETIME"

    def visit_DATE(self, type_: sqltypes.Date, **kw: Any) -> str:
        return "DATE"

    def visit_TIME(self, type_: sqltypes.Time, **kw: Any) -> str:
        return "TIME"

    # -- character types -----------------------------------------------

    def visit_CLOB(self, type_: sqltypes.CLOB, **kw: Any) -> str:
        return "CLOB"

    def visit_NCLOB(self, type_: sqltypes.Text, **kw: Any) -> str:
        return "NCLOB"

    def _render_string_type(
        self, name: str, length: Optional[int], collation: Optional[str]
    ) -> str:
        """Render ``name`` with optional ``(length)`` and ``COLLATE``.

        Falsy ``length`` or ``collation`` values are left out.
        """
        pieces = [name]
        if length:
            pieces.append(f"({length})")
        if collation:
            pieces.append(f' COLLATE "{collation}"')
        return "".join(pieces)

    def visit_CHAR(self, type_: sqltypes.CHAR, **kw: Any) -> str:
        return self._render_string_type("CHAR", type_.length, type_.collation)

    def visit_NCHAR(self, type_: sqltypes.NCHAR, **kw: Any) -> str:
        return self._render_string_type("NCHAR", type_.length, type_.collation)

    def visit_VARCHAR(self, type_: sqltypes.String, **kw: Any) -> str:
        return self._render_string_type(
            "VARCHAR", type_.length, type_.collation
        )

    def visit_NVARCHAR(self, type_: sqltypes.NVARCHAR, **kw: Any) -> str:
        return self._render_string_type(
            "NVARCHAR", type_.length, type_.collation
        )

    def visit_TEXT(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self._render_string_type("TEXT", type_.length, type_.collation)

    # -- other concrete types ------------------------------------------

    def visit_UUID(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        return "UUID"

    def visit_BLOB(self, type_: sqltypes.LargeBinary, **kw: Any) -> str:
        return "BLOB"

    def visit_BINARY(self, type_: sqltypes.BINARY, **kw: Any) -> str:
        """Render ``BINARY``, with ``(length)`` when length is truthy."""
        suffix = "(%d)" % type_.length if type_.length else ""
        return "BINARY" + suffix

    def visit_VARBINARY(self, type_: sqltypes.VARBINARY, **kw: Any) -> str:
        """Render ``VARBINARY``, with ``(length)`` when length is truthy."""
        suffix = "(%d)" % type_.length if type_.length else ""
        return "VARBINARY" + suffix

    def visit_BOOLEAN(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return "BOOLEAN"

    # -- generic ("lowercase") types -----------------------------------

    def visit_uuid(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        """Render ``UUID`` when both the type and the dialect are native,
        otherwise fall back to ``CHAR(32)``."""
        if type_.native_uuid and self.dialect.supports_native_uuid:
            return self.visit_UUID(type_, **kw)
        return self._render_string_type("CHAR", length=32, collation=None)

    def visit_large_binary(
        self, type_: sqltypes.LargeBinary, **kw: Any
    ) -> str:
        return self.visit_BLOB(type_, **kw)

    def visit_boolean(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return self.visit_BOOLEAN(type_, **kw)

    def visit_time(self, type_: sqltypes.Time, **kw: Any) -> str:
        return self.visit_TIME(type_, **kw)

    def visit_datetime(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return self.visit_DATETIME(type_, **kw)

    def visit_date(self, type_: sqltypes.Date, **kw: Any) -> str:
        return self.visit_DATE(type_, **kw)

    def visit_big_integer(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return self.visit_BIGINT(type_, **kw)

    def visit_small_integer(
        self, type_: sqltypes.SmallInteger, **kw: Any
    ) -> str:
        return self.visit_SMALLINT(type_, **kw)

    def visit_integer(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return self.visit_INTEGER(type_, **kw)

    def visit_real(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return self.visit_REAL(type_, **kw)

    def visit_float(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return self.visit_FLOAT(type_, **kw)

    def visit_double(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return self.visit_DOUBLE(type_, **kw)

    def visit_numeric(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        return self.visit_NUMERIC(type_, **kw)

    def visit_string(self, type_: sqltypes.String, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_unicode(self, type_: sqltypes.Unicode, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_text(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_unicode_text(
        self, type_: sqltypes.UnicodeText, **kw: Any
    ) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_enum(self, type_: sqltypes.Enum, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_null(self, type_, **kw):
        """Always raise: the null type cannot be rendered as DDL."""
        raise exc.CompileError(
            "Can't generate DDL for %r; "
            "did you forget to specify a "
            "type on this Column?" % type_
        )

    def visit_type_decorator(
        self, type_: TypeDecorator[Any], **kw: Any
    ) -> str:
        """Render the type the decorator resolves to for this dialect."""
        resolved = type_.type_engine(self.dialect)
        return self.process(resolved, **kw)

    def visit_user_defined(
        self, type_: UserDefinedType[Any], **kw: Any
    ) -> str:
        """Render whatever the type's ``get_col_spec()`` returns."""
        return type_.get_col_spec(**kw)
7578
7579
class StrSQLTypeCompiler(GenericTypeCompiler):
    """Type compiler that renders a string for any object instead of
    failing on types it has no visit method for."""

    def process(self, type_, **kw):
        """Dispatch to the type's compiler hook, or render it as unknown
        when it has no ``_compiler_dispatch`` attribute."""
        try:
            dispatch = type_._compiler_dispatch
        except AttributeError:
            return self._visit_unknown(type_, **kw)
        return dispatch(self, **kw)

    def __getattr__(self, key):
        # any visit_* method that is not defined falls back to the
        # generic renderer; other missing attributes fail as usual
        if not key.startswith("visit_"):
            raise AttributeError(key)
        return self._visit_unknown

    def _visit_unknown(self, type_, **kw):
        """Return the class name when it is all uppercase, otherwise the
        ``repr()`` of the type."""
        class_name = type_.__class__.__name__
        if class_name == class_name.upper():
            return class_name
        return repr(type_)

    def visit_null(self, type_, **kw):
        return "NULL"

    def visit_user_defined(self, type_, **kw):
        """Use the type's ``get_col_spec()`` when present, otherwise its
        ``repr()``."""
        try:
            col_spec = type_.get_col_spec
        except AttributeError:
            return repr(type_)
        return col_spec(**kw)
7611
7612
class _SchemaForObjectCallable(Protocol):
    """Signature of a callable that takes one positional-only object and
    returns a string; used to type ``IdentifierPreparer.schema_for_object``.
    """

    def __call__(self, obj: Any, /) -> str: ...
7615
7616
class _BindNameForColProtocol(Protocol):
    """Signature of a callable that takes a ``ColumnClause`` and returns a
    string.

    NOTE(review): presumably the bind parameter name for the column; the
    users of this protocol are outside this section - confirm.
    """

    def __call__(self, col: ColumnClause[Any]) -> str: ...
7619
7620
class IdentifierPreparer:
    """Handle quoting and case-folding of identifiers based on options."""

    # identifiers are lower-cased and looked up here by _requires_quotes();
    # a hit means the identifier must be quoted
    reserved_words = RESERVED_WORDS

    # pattern an identifier has to match in order to be rendered unquoted
    legal_characters = LEGAL_CHARACTERS

    # an identifier whose first character is in this collection is quoted
    illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS

    # opening delimiter placed before a quoted identifier
    initial_quote: str

    # closing delimiter placed after a quoted identifier
    final_quote: str

    # cache used by quote(): maps an identifier to its rendered form
    _strings: MutableMapping[str, str]

    schema_for_object: _SchemaForObjectCallable = operator.attrgetter("schema")
    """Return the .schema attribute for an object.

    For the default IdentifierPreparer, the schema for an object is always
    the value of the ".schema" attribute.   if the preparer is replaced
    with one that has a non-empty schema_translate_map, the value of the
    ".schema" attribute is rendered a symbol that will be converted to a
    real schema name from the mapping post-compile.

    """

    # set by _with_schema_translate() to whether the translate map had a
    # ``None`` key; checked again by _render_schema_translates()
    _includes_none_schema_translate: bool = False
7648
7649 def __init__(
7650 self,
7651 dialect: Dialect,
7652 initial_quote: str = '"',
7653 final_quote: Optional[str] = None,
7654 escape_quote: str = '"',
7655 quote_case_sensitive_collations: bool = True,
7656 omit_schema: bool = False,
7657 ):
7658 """Construct a new ``IdentifierPreparer`` object.
7659
7660 initial_quote
7661 Character that begins a delimited identifier.
7662
7663 final_quote
7664 Character that ends a delimited identifier. Defaults to
7665 `initial_quote`.
7666
7667 omit_schema
7668 Prevent prepending schema name. Useful for databases that do
7669 not support schemae.
7670 """
7671
7672 self.dialect = dialect
7673 self.initial_quote = initial_quote
7674 self.final_quote = final_quote or self.initial_quote
7675 self.escape_quote = escape_quote
7676 self.escape_to_quote = self.escape_quote * 2
7677 self.omit_schema = omit_schema
7678 self.quote_case_sensitive_collations = quote_case_sensitive_collations
7679 self._strings = {}
7680 self._double_percents = self.dialect.paramstyle in (
7681 "format",
7682 "pyformat",
7683 )
7684
7685 def _with_schema_translate(self, schema_translate_map):
7686 prep = self.__class__.__new__(self.__class__)
7687 prep.__dict__.update(self.__dict__)
7688
7689 includes_none = None in schema_translate_map
7690
7691 def symbol_getter(obj):
7692 name = obj.schema
7693 if obj._use_schema_map and (name is not None or includes_none):
7694 if name is not None and ("[" in name or "]" in name):
7695 raise exc.CompileError(
7696 "Square bracket characters ([]) not supported "
7697 "in schema translate name '%s'" % name
7698 )
7699 return quoted_name(
7700 "__[SCHEMA_%s]" % (name or "_none"), quote=False
7701 )
7702 else:
7703 return obj.schema
7704
7705 prep.schema_for_object = symbol_getter
7706 prep._includes_none_schema_translate = includes_none
7707 return prep
7708
7709 def _render_schema_translates(
7710 self, statement: str, schema_translate_map: SchemaTranslateMapType
7711 ) -> str:
7712 d = schema_translate_map
7713 if None in d:
7714 if not self._includes_none_schema_translate:
7715 raise exc.InvalidRequestError(
7716 "schema translate map which previously did not have "
7717 "`None` present as a key now has `None` present; compiled "
7718 "statement may lack adequate placeholders. Please use "
7719 "consistent keys in successive "
7720 "schema_translate_map dictionaries."
7721 )
7722
7723 d["_none"] = d[None] # type: ignore[index]
7724
7725 def replace(m):
7726 name = m.group(2)
7727 if name in d:
7728 effective_schema = d[name]
7729 else:
7730 if name in (None, "_none"):
7731 raise exc.InvalidRequestError(
7732 "schema translate map which previously had `None` "
7733 "present as a key now no longer has it present; don't "
7734 "know how to apply schema for compiled statement. "
7735 "Please use consistent keys in successive "
7736 "schema_translate_map dictionaries."
7737 )
7738 effective_schema = name
7739
7740 if not effective_schema:
7741 effective_schema = self.dialect.default_schema_name
7742 if not effective_schema:
7743 # TODO: no coverage here
7744 raise exc.CompileError(
7745 "Dialect has no default schema name; can't "
7746 "use None as dynamic schema target."
7747 )
7748 return self.quote_schema(effective_schema)
7749
7750 return re.sub(r"(__\[SCHEMA_([^\]]+)\])", replace, statement)
7751
7752 def _escape_identifier(self, value: str) -> str:
7753 """Escape an identifier.
7754
7755 Subclasses should override this to provide database-dependent
7756 escaping behavior.
7757 """
7758
7759 value = value.replace(self.escape_quote, self.escape_to_quote)
7760 if self._double_percents:
7761 value = value.replace("%", "%%")
7762 return value
7763
7764 def _unescape_identifier(self, value: str) -> str:
7765 """Canonicalize an escaped identifier.
7766
7767 Subclasses should override this to provide database-dependent
7768 unescaping behavior that reverses _escape_identifier.
7769 """
7770
7771 return value.replace(self.escape_to_quote, self.escape_quote)
7772
7773 def validate_sql_phrase(self, element, reg):
7774 """keyword sequence filter.
7775
7776 a filter for elements that are intended to represent keyword sequences,
7777 such as "INITIALLY", "INITIALLY DEFERRED", etc. no special characters
7778 should be present.
7779
7780 """
7781
7782 if element is not None and not reg.match(element):
7783 raise exc.CompileError(
7784 "Unexpected SQL phrase: %r (matching against %r)"
7785 % (element, reg.pattern)
7786 )
7787 return element
7788
7789 def quote_identifier(self, value: str) -> str:
7790 """Quote an identifier.
7791
7792 Subclasses should override this to provide database-dependent
7793 quoting behavior.
7794 """
7795
7796 return (
7797 self.initial_quote
7798 + self._escape_identifier(value)
7799 + self.final_quote
7800 )
7801
7802 def _requires_quotes(self, value: str) -> bool:
7803 """Return True if the given identifier requires quoting."""
7804 lc_value = value.lower()
7805 return (
7806 lc_value in self.reserved_words
7807 or value[0] in self.illegal_initial_characters
7808 or not self.legal_characters.match(str(value))
7809 or (lc_value != value)
7810 )
7811
7812 def _requires_quotes_illegal_chars(self, value):
7813 """Return True if the given identifier requires quoting, but
7814 not taking case convention into account."""
7815 return not self.legal_characters.match(str(value))
7816
    def quote_schema(self, schema: str) -> str:
        """Conditionally quote a schema name.

        The name is quoted if it is a reserved word, contains quote-necessary
        characters, or is an instance of :class:`.quoted_name` which includes
        ``quote`` set to ``True``.

        This base implementation delegates to :meth:`quote`.

        Subclasses can override this to provide database-dependent
        quoting behavior for schema names.

        :param schema: string schema name
        """
        return self.quote(schema)
7831
7832 def quote(self, ident: str) -> str:
7833 """Conditionally quote an identifier.
7834
7835 The identifier is quoted if it is a reserved word, contains
7836 quote-necessary characters, or is an instance of
7837 :class:`.quoted_name` which includes ``quote`` set to ``True``.
7838
7839 Subclasses can override this to provide database-dependent
7840 quoting behavior for identifier names.
7841
7842 :param ident: string identifier
7843 """
7844 force = getattr(ident, "quote", None)
7845
7846 if force is None:
7847 if ident in self._strings:
7848 return self._strings[ident]
7849 else:
7850 if self._requires_quotes(ident):
7851 self._strings[ident] = self.quote_identifier(ident)
7852 else:
7853 self._strings[ident] = ident
7854 return self._strings[ident]
7855 elif force:
7856 return self.quote_identifier(ident)
7857 else:
7858 return ident
7859
7860 def format_collation(self, collation_name):
7861 if self.quote_case_sensitive_collations:
7862 return self.quote(collation_name)
7863 else:
7864 return collation_name
7865
7866 def format_sequence(
7867 self, sequence: schema.Sequence, use_schema: bool = True
7868 ) -> str:
7869 name = self.quote(sequence.name)
7870
7871 effective_schema = self.schema_for_object(sequence)
7872
7873 if (
7874 not self.omit_schema
7875 and use_schema
7876 and effective_schema is not None
7877 ):
7878 name = self.quote_schema(effective_schema) + "." + name
7879 return name
7880
    def format_label(
        self, label: Label[Any], name: Optional[str] = None
    ) -> str:
        """Return the conditionally quoted label name.

        ``name`` is used when it is truthy, otherwise ``label.name``.
        """
        return self.quote(name or label.name)
7885
7886 def format_alias(
7887 self, alias: Optional[AliasedReturnsRows], name: Optional[str] = None
7888 ) -> str:
7889 if name is None:
7890 assert alias is not None
7891 return self.quote(alias.name)
7892 else:
7893 return self.quote(name)
7894
7895 def format_savepoint(self, savepoint, name=None):
7896 # Running the savepoint name through quoting is unnecessary
7897 # for all known dialects. This is here to support potential
7898 # third party use cases
7899 ident = name or savepoint.ident
7900 if self._requires_quotes(ident):
7901 ident = self.quote_identifier(ident)
7902 return ident
7903
7904 @util.preload_module("sqlalchemy.sql.naming")
7905 def format_constraint(
7906 self, constraint: Union[Constraint, Index], _alembic_quote: bool = True
7907 ) -> Optional[str]:
7908 naming = util.preloaded.sql_naming
7909
7910 if constraint.name is _NONE_NAME:
7911 name = naming._constraint_name_for_table(
7912 constraint, constraint.table
7913 )
7914
7915 if name is None:
7916 return None
7917 else:
7918 name = constraint.name
7919
7920 assert name is not None
7921 if constraint.__visit_name__ == "index":
7922 return self.truncate_and_render_index_name(
7923 name, _alembic_quote=_alembic_quote
7924 )
7925 else:
7926 return self.truncate_and_render_constraint_name(
7927 name, _alembic_quote=_alembic_quote
7928 )
7929
7930 def truncate_and_render_index_name(
7931 self, name: str, _alembic_quote: bool = True
7932 ) -> str:
7933 # calculate these at format time so that ad-hoc changes
7934 # to dialect.max_identifier_length etc. can be reflected
7935 # as IdentifierPreparer is long lived
7936 max_ = (
7937 self.dialect.max_index_name_length
7938 or self.dialect.max_identifier_length
7939 )
7940 return self._truncate_and_render_maxlen_name(
7941 name, max_, _alembic_quote
7942 )
7943
7944 def truncate_and_render_constraint_name(
7945 self, name: str, _alembic_quote: bool = True
7946 ) -> str:
7947 # calculate these at format time so that ad-hoc changes
7948 # to dialect.max_identifier_length etc. can be reflected
7949 # as IdentifierPreparer is long lived
7950 max_ = (
7951 self.dialect.max_constraint_name_length
7952 or self.dialect.max_identifier_length
7953 )
7954 return self._truncate_and_render_maxlen_name(
7955 name, max_, _alembic_quote
7956 )
7957
7958 def _truncate_and_render_maxlen_name(
7959 self, name: str, max_: int, _alembic_quote: bool
7960 ) -> str:
7961 if isinstance(name, elements._truncated_label):
7962 if len(name) > max_:
7963 name = name[0 : max_ - 8] + "_" + util.md5_hex(name)[-4:]
7964 else:
7965 self.dialect.validate_identifier(name)
7966
7967 if not _alembic_quote:
7968 return name
7969 else:
7970 return self.quote(name)
7971
    def format_index(self, index: Index) -> str:
        """Return the rendered name of an index.

        Delegates to :meth:`format_constraint` and asserts that a name
        was produced.
        """
        name = self.format_constraint(index)
        assert name is not None
        return name
7976
7977 def format_table(
7978 self,
7979 table: FromClause,
7980 use_schema: bool = True,
7981 name: Optional[str] = None,
7982 ) -> str:
7983 """Prepare a quoted table and schema name."""
7984 if name is None:
7985 if TYPE_CHECKING:
7986 assert isinstance(table, NamedFromClause)
7987 name = table.name
7988
7989 result = self.quote(name)
7990
7991 effective_schema = self.schema_for_object(table)
7992
7993 if not self.omit_schema and use_schema and effective_schema:
7994 result = self.quote_schema(effective_schema) + "." + result
7995 return result
7996
    def format_schema(self, name):
        """Prepare a quoted schema name.

        The name is passed through :meth:`quote`.
        """

        return self.quote(name)
8001
8002 def format_label_name(
8003 self,
8004 name,
8005 anon_map=None,
8006 ):
8007 """Prepare a quoted column name."""
8008
8009 if anon_map is not None and isinstance(
8010 name, elements._truncated_label
8011 ):
8012 name = name.apply_map(anon_map)
8013
8014 return self.quote(name)
8015
8016 def format_column(
8017 self,
8018 column: ColumnElement[Any],
8019 use_table: bool = False,
8020 name: Optional[str] = None,
8021 table_name: Optional[str] = None,
8022 use_schema: bool = False,
8023 anon_map: Optional[Mapping[str, Any]] = None,
8024 ) -> str:
8025 """Prepare a quoted column name."""
8026
8027 if name is None:
8028 name = column.name
8029 assert name is not None
8030
8031 if anon_map is not None and isinstance(
8032 name, elements._truncated_label
8033 ):
8034 name = name.apply_map(anon_map)
8035
8036 if not getattr(column, "is_literal", False):
8037 if use_table:
8038 return (
8039 self.format_table(
8040 column.table, use_schema=use_schema, name=table_name
8041 )
8042 + "."
8043 + self.quote(name)
8044 )
8045 else:
8046 return self.quote(name)
8047 else:
8048 # literal textual elements get stuck into ColumnClause a lot,
8049 # which shouldn't get quoted
8050
8051 if use_table:
8052 return (
8053 self.format_table(
8054 column.table, use_schema=use_schema, name=table_name
8055 )
8056 + "."
8057 + name
8058 )
8059 else:
8060 return name
8061
8062 def format_table_seq(self, table, use_schema=True):
8063 """Format table name and schema as a tuple."""
8064
8065 # Dialects with more levels in their fully qualified references
8066 # ('database', 'owner', etc.) could override this and return
8067 # a longer sequence.
8068
8069 effective_schema = self.schema_for_object(table)
8070
8071 if not self.omit_schema and use_schema and effective_schema:
8072 return (
8073 self.quote_schema(effective_schema),
8074 self.format_table(table, use_schema=False),
8075 )
8076 else:
8077 return (self.format_table(table, use_schema=False),)
8078
8079 @util.memoized_property
8080 def _r_identifiers(self):
8081 initial, final, escaped_final = (
8082 re.escape(s)
8083 for s in (
8084 self.initial_quote,
8085 self.final_quote,
8086 self._escape_identifier(self.final_quote),
8087 )
8088 )
8089 r = re.compile(
8090 r"(?:"
8091 r"(?:%(initial)s((?:%(escaped)s|[^%(final)s])+)%(final)s"
8092 r"|([^\.]+))(?=\.|$))+"
8093 % {"initial": initial, "final": final, "escaped": escaped_final}
8094 )
8095 return r
8096
8097 def unformat_identifiers(self, identifiers: str) -> Sequence[str]:
8098 """Unpack 'schema.table.column'-like strings into components."""
8099
8100 r = self._r_identifiers
8101 return [
8102 self._unescape_identifier(i)
8103 for i in [a or b for a, b in r.findall(identifiers)]
8104 ]