1# sql/compiler.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Base SQL and DDL compiler implementations.
10
11Classes provided include:
12
13:class:`.compiler.SQLCompiler` - renders SQL
14strings
15
16:class:`.compiler.DDLCompiler` - renders DDL
17(data definition language) strings
18
19:class:`.compiler.GenericTypeCompiler` - renders
20type specification strings.
21
22To generate user-defined SQL strings, see
23:doc:`/ext/compiler`.
24
25"""
26from __future__ import annotations
27
28import collections
29import collections.abc as collections_abc
30import contextlib
31from enum import IntEnum
32import functools
33import itertools
34import operator
35import re
36from time import perf_counter
37import typing
38from typing import Any
39from typing import Callable
40from typing import cast
41from typing import ClassVar
42from typing import Dict
43from typing import Final
44from typing import FrozenSet
45from typing import Iterable
46from typing import Iterator
47from typing import List
48from typing import Literal
49from typing import Mapping
50from typing import MutableMapping
51from typing import NamedTuple
52from typing import NoReturn
53from typing import Optional
54from typing import Pattern
55from typing import Protocol
56from typing import Sequence
57from typing import Set
58from typing import Tuple
59from typing import Type
60from typing import TYPE_CHECKING
61from typing import TypedDict
62from typing import Union
63
64from . import base
65from . import coercions
66from . import crud
67from . import elements
68from . import functions
69from . import operators
70from . import roles
71from . import schema
72from . import selectable
73from . import sqltypes
74from . import util as sql_util
75from ._typing import is_column_element
76from ._typing import is_dml
77from .base import _de_clone
78from .base import _from_objects
79from .base import _NONE_NAME
80from .base import _SentinelDefaultCharacterization
81from .base import NO_ARG
82from .elements import quoted_name
83from .sqltypes import TupleType
84from .visitors import prefix_anon_map
85from .. import exc
86from .. import util
87from ..util import FastIntFlag
88from ..util.typing import Self
89from ..util.typing import TupleAny
90from ..util.typing import Unpack
91
92if typing.TYPE_CHECKING:
93 from .annotation import _AnnotationDict
94 from .base import _AmbiguousTableNameMap
95 from .base import CompileState
96 from .base import Executable
97 from .base import ExecutableStatement
98 from .cache_key import CacheKey
99 from .ddl import _TableViaSelect
100 from .ddl import CreateTableAs
101 from .ddl import CreateView
102 from .ddl import ExecutableDDLElement
103 from .dml import Delete
104 from .dml import Insert
105 from .dml import Update
106 from .dml import UpdateBase
107 from .dml import UpdateDMLState
108 from .dml import ValuesBase
109 from .elements import _truncated_label
110 from .elements import BinaryExpression
111 from .elements import BindParameter
112 from .elements import ClauseElement
113 from .elements import ColumnClause
114 from .elements import ColumnElement
115 from .elements import False_
116 from .elements import Label
117 from .elements import Null
118 from .elements import True_
119 from .functions import Function
120 from .schema import CheckConstraint
121 from .schema import Column
122 from .schema import Constraint
123 from .schema import ForeignKeyConstraint
124 from .schema import Index
125 from .schema import PrimaryKeyConstraint
126 from .schema import Table
127 from .schema import UniqueConstraint
128 from .selectable import _ColumnsClauseElement
129 from .selectable import AliasedReturnsRows
130 from .selectable import CompoundSelectState
131 from .selectable import CTE
132 from .selectable import FromClause
133 from .selectable import NamedFromClause
134 from .selectable import ReturnsRows
135 from .selectable import Select
136 from .selectable import SelectState
137 from .type_api import _BindProcessorType
138 from .type_api import TypeDecorator
139 from .type_api import TypeEngine
140 from .type_api import UserDefinedType
141 from .visitors import Visitable
142 from ..engine.cursor import CursorResultMetaData
143 from ..engine.interfaces import _CoreSingleExecuteParams
144 from ..engine.interfaces import _DBAPIAnyExecuteParams
145 from ..engine.interfaces import _DBAPIMultiExecuteParams
146 from ..engine.interfaces import _DBAPISingleExecuteParams
147 from ..engine.interfaces import _ExecuteOptions
148 from ..engine.interfaces import _GenericSetInputSizesType
149 from ..engine.interfaces import _MutableCoreSingleExecuteParams
150 from ..engine.interfaces import Dialect
151 from ..engine.interfaces import SchemaTranslateMapType
152
153
154_FromHintsType = Dict["FromClause", str]
155
156RESERVED_WORDS = {
157 "all",
158 "analyse",
159 "analyze",
160 "and",
161 "any",
162 "array",
163 "as",
164 "asc",
165 "asymmetric",
166 "authorization",
167 "between",
168 "binary",
169 "both",
170 "case",
171 "cast",
172 "check",
173 "collate",
174 "column",
175 "constraint",
176 "create",
177 "cross",
178 "current_date",
179 "current_role",
180 "current_time",
181 "current_timestamp",
182 "current_user",
183 "default",
184 "deferrable",
185 "desc",
186 "distinct",
187 "do",
188 "else",
189 "end",
190 "except",
191 "false",
192 "for",
193 "foreign",
194 "freeze",
195 "from",
196 "full",
197 "grant",
198 "group",
199 "having",
200 "ilike",
201 "in",
202 "initially",
203 "inner",
204 "intersect",
205 "into",
206 "is",
207 "isnull",
208 "join",
209 "leading",
210 "left",
211 "like",
212 "limit",
213 "localtime",
214 "localtimestamp",
215 "natural",
216 "new",
217 "not",
218 "notnull",
219 "null",
220 "off",
221 "offset",
222 "old",
223 "on",
224 "only",
225 "or",
226 "order",
227 "outer",
228 "overlaps",
229 "placing",
230 "primary",
231 "references",
232 "right",
233 "select",
234 "session_user",
235 "set",
236 "similar",
237 "some",
238 "symmetric",
239 "table",
240 "then",
241 "to",
242 "trailing",
243 "true",
244 "union",
245 "unique",
246 "user",
247 "using",
248 "verbose",
249 "when",
250 "where",
251}
252
253LEGAL_CHARACTERS = re.compile(r"^[A-Z0-9_$]+$", re.I)
254LEGAL_CHARACTERS_PLUS_SPACE = re.compile(r"^[A-Z0-9_ $]+$", re.I)
255ILLEGAL_INITIAL_CHARACTERS = {str(x) for x in range(0, 10)}.union(["$"])
256
257FK_ON_DELETE = re.compile(
258 r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
259)
260FK_ON_UPDATE = re.compile(
261 r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
262)
263FK_INITIALLY = re.compile(r"^(?:DEFERRED|IMMEDIATE)$", re.I)
264BIND_PARAMS = re.compile(r"(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])", re.UNICODE)
265BIND_PARAMS_ESC = re.compile(r"\x5c(:[\w\$]*)(?![:\w\$])", re.UNICODE)
266
267_pyformat_template = "%%(%(name)s)s"
268BIND_TEMPLATES = {
269 "pyformat": _pyformat_template,
270 "qmark": "?",
271 "format": "%%s",
272 "numeric": ":[_POSITION]",
273 "numeric_dollar": "$[_POSITION]",
274 "named": ":%(name)s",
275}
276
277
278OPERATORS = {
279 # binary
280 operators.and_: " AND ",
281 operators.or_: " OR ",
282 operators.add: " + ",
283 operators.mul: " * ",
284 operators.sub: " - ",
285 operators.mod: " % ",
286 operators.neg: "-",
287 operators.lt: " < ",
288 operators.le: " <= ",
289 operators.ne: " != ",
290 operators.gt: " > ",
291 operators.ge: " >= ",
292 operators.eq: " = ",
293 operators.is_distinct_from: " IS DISTINCT FROM ",
294 operators.is_not_distinct_from: " IS NOT DISTINCT FROM ",
295 operators.concat_op: " || ",
296 operators.match_op: " MATCH ",
297 operators.not_match_op: " NOT MATCH ",
298 operators.in_op: " IN ",
299 operators.not_in_op: " NOT IN ",
300 operators.comma_op: ", ",
301 operators.from_: " FROM ",
302 operators.as_: " AS ",
303 operators.is_: " IS ",
304 operators.is_not: " IS NOT ",
305 operators.collate: " COLLATE ",
306 # unary
307 operators.exists: "EXISTS ",
308 operators.distinct_op: "DISTINCT ",
309 operators.inv: "NOT ",
310 operators.any_op: "ANY ",
311 operators.all_op: "ALL ",
312 # modifiers
313 operators.desc_op: " DESC",
314 operators.asc_op: " ASC",
315 operators.nulls_first_op: " NULLS FIRST",
316 operators.nulls_last_op: " NULLS LAST",
317 # bitwise
318 operators.bitwise_xor_op: " ^ ",
319 operators.bitwise_or_op: " | ",
320 operators.bitwise_and_op: " & ",
321 operators.bitwise_not_op: "~",
322 operators.bitwise_lshift_op: " << ",
323 operators.bitwise_rshift_op: " >> ",
324}
325
326FUNCTIONS: Dict[Type[Function[Any]], str] = {
327 functions.coalesce: "coalesce",
328 functions.current_date: "CURRENT_DATE",
329 functions.current_time: "CURRENT_TIME",
330 functions.current_timestamp: "CURRENT_TIMESTAMP",
331 functions.current_user: "CURRENT_USER",
332 functions.localtime: "LOCALTIME",
333 functions.localtimestamp: "LOCALTIMESTAMP",
334 functions.random: "random",
335 functions.sysdate: "sysdate",
336 functions.session_user: "SESSION_USER",
337 functions.user: "USER",
338 functions.cube: "CUBE",
339 functions.rollup: "ROLLUP",
340 functions.grouping_sets: "GROUPING SETS",
341}
342
343
344EXTRACT_MAP = {
345 "month": "month",
346 "day": "day",
347 "year": "year",
348 "second": "second",
349 "hour": "hour",
350 "doy": "doy",
351 "minute": "minute",
352 "quarter": "quarter",
353 "dow": "dow",
354 "week": "week",
355 "epoch": "epoch",
356 "milliseconds": "milliseconds",
357 "microseconds": "microseconds",
358 "timezone_hour": "timezone_hour",
359 "timezone_minute": "timezone_minute",
360}
361
362COMPOUND_KEYWORDS = {
363 selectable._CompoundSelectKeyword.UNION: "UNION",
364 selectable._CompoundSelectKeyword.UNION_ALL: "UNION ALL",
365 selectable._CompoundSelectKeyword.EXCEPT: "EXCEPT",
366 selectable._CompoundSelectKeyword.EXCEPT_ALL: "EXCEPT ALL",
367 selectable._CompoundSelectKeyword.INTERSECT: "INTERSECT",
368 selectable._CompoundSelectKeyword.INTERSECT_ALL: "INTERSECT ALL",
369}
370
371
372class ResultColumnsEntry(NamedTuple):
373 """Tracks a column expression that is expected to be represented
374 in the result rows for this statement.
375
376 This normally refers to the columns clause of a SELECT statement
377 but may also refer to a RETURNING clause, as well as for dialect-specific
378 emulations.
379
380 """
381
382 keyname: str
383 """string name that's expected in cursor.description"""
384
385 name: str
386 """column name, may be labeled"""
387
388 objects: Tuple[Any, ...]
389 """sequence of objects that should be able to locate this column
390 in a RowMapping. This is typically string names and aliases
391 as well as Column objects.
392
393 """
394
395 type: TypeEngine[Any]
396 """Datatype to be associated with this column. This is where
397 the "result processing" logic directly links the compiled statement
398 to the rows that come back from the cursor.
399
400 """
401
402
403class _ResultMapAppender(Protocol):
404 def __call__(
405 self,
406 keyname: str,
407 name: str,
408 objects: Sequence[Any],
409 type_: TypeEngine[Any],
410 ) -> None: ...
411
412
413# integer indexes into ResultColumnsEntry used by cursor.py.
414# some profiling showed integer access faster than named tuple
415RM_RENDERED_NAME: Literal[0] = 0
416RM_NAME: Literal[1] = 1
417RM_OBJECTS: Literal[2] = 2
418RM_TYPE: Literal[3] = 3
419
420
421class _BaseCompilerStackEntry(TypedDict):
422 asfrom_froms: Set[FromClause]
423 correlate_froms: Set[FromClause]
424 selectable: ReturnsRows
425
426
427class _CompilerStackEntry(_BaseCompilerStackEntry, total=False):
428 compile_state: CompileState
429 need_result_map_for_nested: bool
430 need_result_map_for_compound: bool
431 select_0: ReturnsRows
432 insert_from_select: Select[Unpack[TupleAny]]
433
434
435class ExpandedState(NamedTuple):
436 """represents state to use when producing "expanded" and
437 "post compile" bound parameters for a statement.
438
439 "expanded" parameters are parameters that are generated at
440 statement execution time to suit a number of parameters passed, the most
441 prominent example being the individual elements inside of an IN expression.
442
443 "post compile" parameters are parameters where the SQL literal value
444 will be rendered into the SQL statement at execution time, rather than
445 being passed as separate parameters to the driver.
446
447 To create an :class:`.ExpandedState` instance, use the
448 :meth:`.SQLCompiler.construct_expanded_state` method on any
449 :class:`.SQLCompiler` instance.
450
451 """
452
453 statement: str
454 """String SQL statement with parameters fully expanded"""
455
456 parameters: _CoreSingleExecuteParams
457 """Parameter dictionary with parameters fully expanded.
458
459 For a statement that uses named parameters, this dictionary will map
460 exactly to the names in the statement. For a statement that uses
461 positional parameters, the :attr:`.ExpandedState.positional_parameters`
462 will yield a tuple with the positional parameter set.
463
464 """
465
466 processors: Mapping[str, _BindProcessorType[Any]]
467 """mapping of bound value processors"""
468
469 positiontup: Optional[Sequence[str]]
470 """Sequence of string names indicating the order of positional
471 parameters"""
472
473 parameter_expansion: Mapping[str, List[str]]
474 """Mapping representing the intermediary link from original parameter
475 name to list of "expanded" parameter names, for those parameters that
476 were expanded."""
477
478 @property
479 def positional_parameters(self) -> Tuple[Any, ...]:
480 """Tuple of positional parameters, for statements that were compiled
481 using a positional paramstyle.
482
483 """
484 if self.positiontup is None:
485 raise exc.InvalidRequestError(
486 "statement does not use a positional paramstyle"
487 )
488 return tuple(self.parameters[key] for key in self.positiontup)
489
490 @property
491 def additional_parameters(self) -> _CoreSingleExecuteParams:
492 """synonym for :attr:`.ExpandedState.parameters`."""
493 return self.parameters
494
495
496class _InsertManyValues(NamedTuple):
497 """represents state to use for executing an "insertmanyvalues" statement.
498
499 The primary consumers of this object are the
500 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
501 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods.
502
503 .. versionadded:: 2.0
504
505 """
506
507 is_default_expr: bool
508 """if True, the statement is of the form
509 ``INSERT INTO TABLE DEFAULT VALUES``, and can't be rewritten as a "batch"
510
511 """
512
513 single_values_expr: str
514 """The rendered "values" clause of the INSERT statement.
515
516 This is typically the parenthesized section e.g. "(?, ?, ?)" or similar.
517 The insertmanyvalues logic uses this string as a search and replace
518 target.
519
520 """
521
522 insert_crud_params: List[crud._CrudParamElementStr]
523 """List of Column / bind names etc. used while rewriting the statement"""
524
525 num_positional_params_counted: int
526 """the number of bound parameters in a single-row statement.
527
528 This count may be larger or smaller than the actual number of columns
529 targeted in the INSERT, as it accommodates for SQL expressions
530 in the values list that may have zero or more parameters embedded
531 within them.
532
533 This count is part of what's used to organize rewritten parameter lists
534 when batching.
535
536 """
537
538 sort_by_parameter_order: bool = False
539 """if the deterministic_returnined_order parameter were used on the
540 insert.
541
542 All of the attributes following this will only be used if this is True.
543
544 """
545
546 includes_upsert_behaviors: bool = False
547 """if True, we have to accommodate for upsert behaviors.
548
549 This will in some cases downgrade "insertmanyvalues" that requests
550 deterministic ordering.
551
552 """
553
554 sentinel_columns: Optional[Sequence[Column[Any]]] = None
555 """List of sentinel columns that were located.
556
557 This list is only here if the INSERT asked for
558 sort_by_parameter_order=True,
559 and dialect-appropriate sentinel columns were located.
560
561 .. versionadded:: 2.0.10
562
563 """
564
565 num_sentinel_columns: int = 0
566 """how many sentinel columns are in the above list, if any.
567
568 This is the same as
569 ``len(sentinel_columns) if sentinel_columns is not None else 0``
570
571 """
572
573 sentinel_param_keys: Optional[Sequence[str]] = None
574 """parameter str keys in each param dictionary / tuple
575 that would link to the client side "sentinel" values for that row, which
576 we can use to match up parameter sets to result rows.
577
578 This is only present if sentinel_columns is present and the INSERT
579 statement actually refers to client side values for these sentinel
580 columns.
581
582 .. versionadded:: 2.0.10
583
584 .. versionchanged:: 2.0.29 - the sequence is now string dictionary keys
585 only, used against the "compiled parameteters" collection before
586 the parameters were converted by bound parameter processors
587
588 """
589
590 implicit_sentinel: bool = False
591 """if True, we have exactly one sentinel column and it uses a server side
592 value, currently has to generate an incrementing integer value.
593
594 The dialect in question would have asserted that it supports receiving
595 these values back and sorting on that value as a means of guaranteeing
596 correlation with the incoming parameter list.
597
598 .. versionadded:: 2.0.10
599
600 """
601
602 embed_values_counter: bool = False
603 """Whether to embed an incrementing integer counter in each parameter
604 set within the VALUES clause as parameters are batched over.
605
606 This is only used for a specific INSERT..SELECT..VALUES..RETURNING syntax
607 where a subquery is used to produce value tuples. Current support
608 includes PostgreSQL, Microsoft SQL Server.
609
610 .. versionadded:: 2.0.10
611
612 """
613
614
615class _InsertManyValuesBatch(NamedTuple):
616 """represents an individual batch SQL statement for insertmanyvalues.
617
618 This is passed through the
619 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
620 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods out
621 to the :class:`.Connection` within the
622 :meth:`.Connection._exec_insertmany_context` method.
623
624 .. versionadded:: 2.0.10
625
626 """
627
628 replaced_statement: str
629 replaced_parameters: _DBAPIAnyExecuteParams
630 processed_setinputsizes: Optional[_GenericSetInputSizesType]
631 batch: Sequence[_DBAPISingleExecuteParams]
632 sentinel_values: Sequence[Tuple[Any, ...]]
633 current_batch_size: int
634 batchnum: int
635 total_batches: int
636 rows_sorted: bool
637 is_downgraded: bool
638
639
640class InsertmanyvaluesSentinelOpts(FastIntFlag):
641 """bitflag enum indicating styles of PK defaults
642 which can work as implicit sentinel columns
643
644 """
645
646 NOT_SUPPORTED = 1
647 AUTOINCREMENT = 2
648 IDENTITY = 4
649 SEQUENCE = 8
650 MONOTONIC_FUNCTION = 16
651
652 ANY_AUTOINCREMENT = (
653 AUTOINCREMENT | IDENTITY | SEQUENCE | MONOTONIC_FUNCTION
654 )
655 _SUPPORTED_OR_NOT = NOT_SUPPORTED | ANY_AUTOINCREMENT
656
657 USE_INSERT_FROM_SELECT = 32
658 RENDER_SELECT_COL_CASTS = 64
659
660
661class AggregateOrderByStyle(IntEnum):
662 """Describes backend database's capabilities with ORDER BY for aggregate
663 functions
664
665 .. versionadded:: 2.1
666
667 """
668
669 NONE = 0
670 """database has no ORDER BY for aggregate functions"""
671
672 INLINE = 1
673 """ORDER BY is rendered inside the function's argument list, typically as
674 the last element"""
675
676 WITHIN_GROUP = 2
677 """the WITHIN GROUP (ORDER BY ...) phrase is used for all aggregate
678 functions (not just the ordered set ones)"""
679
680
681class CompilerState(IntEnum):
682 COMPILING = 0
683 """statement is present, compilation phase in progress"""
684
685 STRING_APPLIED = 1
686 """statement is present, string form of the statement has been applied.
687
688 Additional processors by subclasses may still be pending.
689
690 """
691
692 NO_STATEMENT = 2
693 """compiler does not have a statement to compile, is used
694 for method access"""
695
696
697class Linting(IntEnum):
698 """represent preferences for the 'SQL linting' feature.
699
700 this feature currently includes support for flagging cartesian products
701 in SQL statements.
702
703 """
704
705 NO_LINTING = 0
706 "Disable all linting."
707
708 COLLECT_CARTESIAN_PRODUCTS = 1
709 """Collect data on FROMs and cartesian products and gather into
710 'self.from_linter'"""
711
712 WARN_LINTING = 2
713 "Emit warnings for linters that find problems"
714
715 FROM_LINTING = COLLECT_CARTESIAN_PRODUCTS | WARN_LINTING
716 """Warn for cartesian products; combines COLLECT_CARTESIAN_PRODUCTS
717 and WARN_LINTING"""
718
719
720NO_LINTING, COLLECT_CARTESIAN_PRODUCTS, WARN_LINTING, FROM_LINTING = tuple(
721 Linting
722)
723
724
725class FromLinter(collections.namedtuple("FromLinter", ["froms", "edges"])):
726 """represents current state for the "cartesian product" detection
727 feature."""
728
729 def lint(self, start=None):
730 froms = self.froms
731 if not froms:
732 return None, None
733
734 edges = set(self.edges)
735 the_rest = set(froms)
736
737 if start is not None:
738 start_with = start
739 the_rest.remove(start_with)
740 else:
741 start_with = the_rest.pop()
742
743 stack = collections.deque([start_with])
744
745 while stack and the_rest:
746 node = stack.popleft()
747 the_rest.discard(node)
748
749 # comparison of nodes in edges here is based on hash equality, as
750 # there are "annotated" elements that match the non-annotated ones.
751 # to remove the need for in-python hash() calls, use native
752 # containment routines (e.g. "node in edge", "edge.index(node)")
753 to_remove = {edge for edge in edges if node in edge}
754
755 # appendleft the node in each edge that is not
756 # the one that matched.
757 stack.extendleft(edge[not edge.index(node)] for edge in to_remove)
758 edges.difference_update(to_remove)
759
760 # FROMS left over? boom
761 if the_rest:
762 return the_rest, start_with
763 else:
764 return None, None
765
766 def warn(self, stmt_type="SELECT"):
767 the_rest, start_with = self.lint()
768
769 # FROMS left over? boom
770 if the_rest:
771 froms = the_rest
772 if froms:
773 template = (
774 "{stmt_type} statement has a cartesian product between "
775 "FROM element(s) {froms} and "
776 'FROM element "{start}". Apply join condition(s) '
777 "between each element to resolve."
778 )
779 froms_str = ", ".join(
780 f'"{self.froms[from_]}"' for from_ in froms
781 )
782 message = template.format(
783 stmt_type=stmt_type,
784 froms=froms_str,
785 start=self.froms[start_with],
786 )
787
788 util.warn(message)
789
790
791class Compiled:
792 """Represent a compiled SQL or DDL expression.
793
794 The ``__str__`` method of the ``Compiled`` object should produce
795 the actual text of the statement. ``Compiled`` objects are
796 specific to their underlying database dialect, and also may
797 or may not be specific to the columns referenced within a
798 particular set of bind parameters. In no case should the
799 ``Compiled`` object be dependent on the actual values of those
800 bind parameters, even though it may reference those values as
801 defaults.
802 """
803
804 statement: Optional[ClauseElement] = None
805 "The statement to compile."
806 string: str = ""
807 "The string representation of the ``statement``"
808
809 state: CompilerState
810 """description of the compiler's state"""
811
812 is_sql = False
813 is_ddl = False
814
815 _cached_metadata: Optional[CursorResultMetaData] = None
816
817 _result_columns: Optional[List[ResultColumnsEntry]] = None
818
819 schema_translate_map: Optional[SchemaTranslateMapType] = None
820
821 execution_options: _ExecuteOptions = util.EMPTY_DICT
822 """
823 Execution options propagated from the statement. In some cases,
824 sub-elements of the statement can modify these.
825 """
826
827 preparer: IdentifierPreparer
828
829 _annotations: _AnnotationDict = util.EMPTY_DICT
830
831 compile_state: Optional[CompileState] = None
832 """Optional :class:`.CompileState` object that maintains additional
833 state used by the compiler.
834
835 Major executable objects such as :class:`_expression.Insert`,
836 :class:`_expression.Update`, :class:`_expression.Delete`,
837 :class:`_expression.Select` will generate this
838 state when compiled in order to calculate additional information about the
839 object. For the top level object that is to be executed, the state can be
840 stored here where it can also have applicability towards result set
841 processing.
842
843 .. versionadded:: 1.4
844
845 """
846
847 dml_compile_state: Optional[CompileState] = None
848 """Optional :class:`.CompileState` assigned at the same point that
849 .isinsert, .isupdate, or .isdelete is assigned.
850
851 This will normally be the same object as .compile_state, with the
852 exception of cases like the :class:`.ORMFromStatementCompileState`
853 object.
854
855 .. versionadded:: 1.4.40
856
857 """
858
859 cache_key: Optional[CacheKey] = None
860 """The :class:`.CacheKey` that was generated ahead of creating this
861 :class:`.Compiled` object.
862
863 This is used for routines that need access to the original
864 :class:`.CacheKey` instance generated when the :class:`.Compiled`
865 instance was first cached, typically in order to reconcile
866 the original list of :class:`.BindParameter` objects with a
867 per-statement list that's generated on each call.
868
869 """
870
871 _gen_time: float
872 """Generation time of this :class:`.Compiled`, used for reporting
873 cache stats."""
874
875 def __init__(
876 self,
877 dialect: Dialect,
878 statement: Optional[ClauseElement],
879 schema_translate_map: Optional[SchemaTranslateMapType] = None,
880 render_schema_translate: bool = False,
881 compile_kwargs: Mapping[str, Any] = util.immutabledict(),
882 ):
883 """Construct a new :class:`.Compiled` object.
884
885 :param dialect: :class:`.Dialect` to compile against.
886
887 :param statement: :class:`_expression.ClauseElement` to be compiled.
888
889 :param schema_translate_map: dictionary of schema names to be
890 translated when forming the resultant SQL
891
892 .. seealso::
893
894 :ref:`schema_translating`
895
896 :param compile_kwargs: additional kwargs that will be
897 passed to the initial call to :meth:`.Compiled.process`.
898
899
900 """
901 self.dialect = dialect
902 self.preparer = self.dialect.identifier_preparer
903 if schema_translate_map:
904 self.schema_translate_map = schema_translate_map
905 self.preparer = self.preparer._with_schema_translate(
906 schema_translate_map
907 )
908
909 if statement is not None:
910 self.state = CompilerState.COMPILING
911 self.statement = statement
912 self.can_execute = statement.supports_execution
913 self._annotations = statement._annotations
914 if self.can_execute:
915 if TYPE_CHECKING:
916 assert isinstance(statement, Executable)
917 self.execution_options = statement._execution_options
918 self.string = self.process(self.statement, **compile_kwargs)
919
920 if render_schema_translate:
921 assert schema_translate_map is not None
922 self.string = self.preparer._render_schema_translates(
923 self.string, schema_translate_map
924 )
925
926 self.state = CompilerState.STRING_APPLIED
927 else:
928 self.state = CompilerState.NO_STATEMENT
929
930 self._gen_time = perf_counter()
931
932 def __init_subclass__(cls) -> None:
933 cls._init_compiler_cls()
934 return super().__init_subclass__()
935
936 @classmethod
937 def _init_compiler_cls(cls):
938 pass
939
940 def visit_unsupported_compilation(self, element, err, **kw):
941 raise exc.UnsupportedCompilationError(self, type(element)) from err
942
943 @property
944 def sql_compiler(self) -> SQLCompiler:
945 """Return a Compiled that is capable of processing SQL expressions.
946
947 If this compiler is one, it would likely just return 'self'.
948
949 """
950
951 raise NotImplementedError()
952
953 def process(self, obj: Visitable, **kwargs: Any) -> str:
954 return obj._compiler_dispatch(self, **kwargs)
955
956 def __str__(self) -> str:
957 """Return the string text of the generated SQL or DDL."""
958
959 if self.state is CompilerState.STRING_APPLIED:
960 return self.string
961 else:
962 return ""
963
964 def construct_params(
965 self,
966 params: Optional[_CoreSingleExecuteParams] = None,
967 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
968 escape_names: bool = True,
969 ) -> Optional[_MutableCoreSingleExecuteParams]:
970 """Return the bind params for this compiled object.
971
972 :param params: a dict of string/object pairs whose values will
973 override bind values compiled in to the
974 statement.
975 """
976
977 raise NotImplementedError()
978
979 @property
980 def params(self):
981 """Return the bind params for this compiled object."""
982 return self.construct_params()
983
984
985class TypeCompiler(util.EnsureKWArg):
986 """Produces DDL specification for TypeEngine objects."""
987
988 ensure_kwarg = r"visit_\w+"
989
990 def __init__(self, dialect: Dialect):
991 self.dialect = dialect
992
993 def process(self, type_: TypeEngine[Any], **kw: Any) -> str:
994 if (
995 type_._variant_mapping
996 and self.dialect.name in type_._variant_mapping
997 ):
998 type_ = type_._variant_mapping[self.dialect.name]
999 return type_._compiler_dispatch(self, **kw)
1000
1001 def visit_unsupported_compilation(
1002 self, element: Any, err: Exception, **kw: Any
1003 ) -> NoReturn:
1004 raise exc.UnsupportedCompilationError(self, element) from err
1005
1006
1007# this was a Visitable, but to allow accurate detection of
1008# column elements this is actually a column element
1009class _CompileLabel(
1010 roles.BinaryElementRole[Any], elements.CompilerColumnElement
1011):
1012 """lightweight label object which acts as an expression.Label."""
1013
1014 __visit_name__ = "label"
1015 __slots__ = "element", "name", "_alt_names"
1016
1017 def __init__(self, col, name, alt_names=()):
1018 self.element = col
1019 self.name = name
1020 self._alt_names = (col,) + alt_names
1021
1022 @property
1023 def proxy_set(self):
1024 return self.element.proxy_set
1025
1026 @property
1027 def type(self):
1028 return self.element.type
1029
1030 def self_group(self, **kw):
1031 return self
1032
1033
1034class aggregate_orderby_inline(
1035 roles.BinaryElementRole[Any], elements.CompilerColumnElement
1036):
1037 """produce ORDER BY inside of function argument lists"""
1038
1039 __visit_name__ = "aggregate_orderby_inline"
1040 __slots__ = "element", "aggregate_order_by"
1041
1042 def __init__(self, element, orderby):
1043 self.element = element
1044 self.aggregate_order_by = orderby
1045
1046 def __iter__(self):
1047 return iter(self.element)
1048
1049 @property
1050 def proxy_set(self):
1051 return self.element.proxy_set
1052
1053 @property
1054 def type(self):
1055 return self.element.type
1056
1057 def self_group(self, **kw):
1058 return self
1059
1060 def _with_binary_element_type(self, type_):
1061 return aggregate_orderby_inline(
1062 self.element._with_binary_element_type(type_),
1063 self.aggregate_order_by,
1064 )
1065
1066
1067class ilike_case_insensitive(
1068 roles.BinaryElementRole[Any], elements.CompilerColumnElement
1069):
1070 """produce a wrapping element for a case-insensitive portion of
1071 an ILIKE construct.
1072
1073 The construct usually renders the ``lower()`` function, but on
1074 PostgreSQL will pass silently with the assumption that "ILIKE"
1075 is being used.
1076
1077 .. versionadded:: 2.0
1078
1079 """
1080
1081 __visit_name__ = "ilike_case_insensitive_operand"
1082 __slots__ = "element", "comparator"
1083
1084 def __init__(self, element):
1085 self.element = element
1086 self.comparator = element.comparator
1087
1088 @property
1089 def proxy_set(self):
1090 return self.element.proxy_set
1091
1092 @property
1093 def type(self):
1094 return self.element.type
1095
1096 def self_group(self, **kw):
1097 return self
1098
1099 def _with_binary_element_type(self, type_):
1100 return ilike_case_insensitive(
1101 self.element._with_binary_element_type(type_)
1102 )
1103
1104
1105class SQLCompiler(Compiled):
1106 """Default implementation of :class:`.Compiled`.
1107
1108 Compiles :class:`_expression.ClauseElement` objects into SQL strings.
1109
1110 """
1111
1112 extract_map = EXTRACT_MAP
1113
1114 bindname_escape_characters: ClassVar[Mapping[str, str]] = (
1115 util.immutabledict(
1116 {
1117 "%": "P",
1118 "(": "A",
1119 ")": "Z",
1120 ":": "C",
1121 ".": "_",
1122 "[": "_",
1123 "]": "_",
1124 " ": "_",
1125 }
1126 )
1127 )
1128 """A mapping (e.g. dict or similar) containing a lookup of
1129 characters keyed to replacement characters which will be applied to all
1130 'bind names' used in SQL statements as a form of 'escaping'; the given
1131 characters are replaced entirely with the 'replacement' character when
1132 rendered in the SQL statement, and a similar translation is performed
1133 on the incoming names used in parameter dictionaries passed to methods
1134 like :meth:`_engine.Connection.execute`.
1135
1136 This allows bound parameter names used in :func:`_sql.bindparam` and
1137 other constructs to have any arbitrary characters present without any
1138 concern for characters that aren't allowed at all on the target database.
1139
1140 Third party dialects can establish their own dictionary here to replace the
1141 default mapping, which will ensure that the particular characters in the
1142 mapping will never appear in a bound parameter name.
1143
1144 The dictionary is evaluated at **class creation time**, so cannot be
1145 modified at runtime; it must be present on the class when the class
1146 is first declared.
1147
1148 Note that for dialects that have additional bound parameter rules such
1149 as additional restrictions on leading characters, the
1150 :meth:`_sql.SQLCompiler.bindparam_string` method may need to be augmented.
1151 See the cx_Oracle compiler for an example of this.
1152
1153 .. versionadded:: 2.0.0rc1
1154
1155 """
1156
1157 _bind_translate_re: ClassVar[Pattern[str]]
1158 _bind_translate_chars: ClassVar[Mapping[str, str]]
1159
1160 is_sql = True
1161
1162 compound_keywords = COMPOUND_KEYWORDS
1163
1164 isdelete: bool = False
1165 isinsert: bool = False
1166 isupdate: bool = False
1167 """class-level defaults which can be set at the instance
1168 level to define if this Compiled instance represents
1169 INSERT/UPDATE/DELETE
1170 """
1171
1172 postfetch: Optional[List[Column[Any]]]
1173 """list of columns that can be post-fetched after INSERT or UPDATE to
1174 receive server-updated values"""
1175
1176 insert_prefetch: Sequence[Column[Any]] = ()
1177 """list of columns for which default values should be evaluated before
1178 an INSERT takes place"""
1179
1180 update_prefetch: Sequence[Column[Any]] = ()
1181 """list of columns for which onupdate default values should be evaluated
1182 before an UPDATE takes place"""
1183
1184 implicit_returning: Optional[Sequence[ColumnElement[Any]]] = None
1185 """list of "implicit" returning columns for a toplevel INSERT or UPDATE
1186 statement, used to receive newly generated values of columns.
1187
1188 .. versionadded:: 2.0 ``implicit_returning`` replaces the previous
1189 ``returning`` collection, which was not a generalized RETURNING
1190 collection and instead was in fact specific to the "implicit returning"
1191 feature.
1192
1193 """
1194
1195 isplaintext: bool = False
1196
1197 binds: Dict[str, BindParameter[Any]]
1198 """a dictionary of bind parameter keys to BindParameter instances."""
1199
1200 bind_names: Dict[BindParameter[Any], str]
1201 """a dictionary of BindParameter instances to "compiled" names
1202 that are actually present in the generated SQL"""
1203
1204 stack: List[_CompilerStackEntry]
1205 """major statements such as SELECT, INSERT, UPDATE, DELETE are
1206 tracked in this stack using an entry format."""
1207
1208 returning_precedes_values: bool = False
1209 """set to True classwide to generate RETURNING
1210 clauses before the VALUES or WHERE clause (i.e. MSSQL)
1211 """
1212
1213 render_table_with_column_in_update_from: bool = False
1214 """set to True classwide to indicate the SET clause
1215 in a multi-table UPDATE statement should qualify
1216 columns with the table name (i.e. MySQL only)
1217 """
1218
1219 ansi_bind_rules: bool = False
1220 """SQL 92 doesn't allow bind parameters to be used
1221 in the columns clause of a SELECT, nor does it allow
1222 ambiguous expressions like "? = ?". A compiler
1223 subclass can set this flag to False if the target
1224 driver/DB enforces this
1225 """
1226
1227 bindtemplate: str
1228 """template to render bound parameters based on paramstyle."""
1229
1230 compilation_bindtemplate: str
1231 """template used by compiler to render parameters before positional
1232 paramstyle application"""
1233
1234 _numeric_binds_identifier_char: str
1235 """Character that's used to as the identifier of a numerical bind param.
1236 For example if this char is set to ``$``, numerical binds will be rendered
1237 in the form ``$1, $2, $3``.
1238 """
1239
1240 _result_columns: List[ResultColumnsEntry]
1241 """relates label names in the final SQL to a tuple of local
1242 column/label name, ColumnElement object (if any) and
1243 TypeEngine. CursorResult uses this for type processing and
1244 column targeting"""
1245
1246 _textual_ordered_columns: bool = False
1247 """tell the result object that the column names as rendered are important,
1248 but they are also "ordered" vs. what is in the compiled object here.
1249
1250 As of 1.4.42 this condition is only present when the statement is a
1251 TextualSelect, e.g. text("....").columns(...), where it is required
1252 that the columns are considered positionally and not by name.
1253
1254 """
1255
1256 _ad_hoc_textual: bool = False
1257 """tell the result that we encountered text() or '*' constructs in the
1258 middle of the result columns, but we also have compiled columns, so
1259 if the number of columns in cursor.description does not match how many
1260 expressions we have, that means we can't rely on positional at all and
1261 should match on name.
1262
1263 """
1264
1265 _ordered_columns: bool = True
1266 """
1267 if False, means we can't be sure the list of entries
1268 in _result_columns is actually the rendered order. Usually
1269 True unless using an unordered TextualSelect.
1270 """
1271
1272 _loose_column_name_matching: bool = False
1273 """tell the result object that the SQL statement is textual, wants to match
1274 up to Column objects, and may be using the ._tq_label in the SELECT rather
1275 than the base name.
1276
1277 """
1278
1279 _numeric_binds: bool = False
1280 """
1281 True if paramstyle is "numeric". This paramstyle is trickier than
1282 all the others.
1283
1284 """
1285
1286 _render_postcompile: bool = False
1287 """
1288 whether to render out POSTCOMPILE params during the compile phase.
1289
1290 This attribute is used only for end-user invocation of stmt.compile();
1291 it's never used for actual statement execution, where instead the
1292 dialect internals access and render the internal postcompile structure
1293 directly.
1294
1295 """
1296
1297 _post_compile_expanded_state: Optional[ExpandedState] = None
1298 """When render_postcompile is used, the ``ExpandedState`` used to create
1299 the "expanded" SQL is assigned here, and then used by the ``.params``
1300 accessor and ``.construct_params()`` methods for their return values.
1301
1302 .. versionadded:: 2.0.0rc1
1303
1304 """
1305
1306 _pre_expanded_string: Optional[str] = None
1307 """Stores the original string SQL before 'post_compile' is applied,
1308 for cases where 'post_compile' were used.
1309
1310 """
1311
1312 _pre_expanded_positiontup: Optional[List[str]] = None
1313
1314 _insertmanyvalues: Optional[_InsertManyValues] = None
1315
1316 _insert_crud_params: Optional[crud._CrudParamSequence] = None
1317
1318 literal_execute_params: FrozenSet[BindParameter[Any]] = frozenset()
1319 """bindparameter objects that are rendered as literal values at statement
1320 execution time.
1321
1322 """
1323
1324 post_compile_params: FrozenSet[BindParameter[Any]] = frozenset()
1325 """bindparameter objects that are rendered as bound parameter placeholders
1326 at statement execution time.
1327
1328 """
1329
1330 escaped_bind_names: util.immutabledict[str, str] = util.EMPTY_DICT
1331 """Late escaping of bound parameter names that has to be converted
1332 to the original name when looking in the parameter dictionary.
1333
1334 """
1335
1336 has_out_parameters = False
1337 """if True, there are bindparam() objects that have the isoutparam
1338 flag set."""
1339
1340 postfetch_lastrowid = False
1341 """if True, and this in insert, use cursor.lastrowid to populate
1342 result.inserted_primary_key. """
1343
1344 _cache_key_bind_match: Optional[
1345 Tuple[
1346 Dict[
1347 BindParameter[Any],
1348 List[BindParameter[Any]],
1349 ],
1350 Dict[
1351 str,
1352 BindParameter[Any],
1353 ],
1354 ]
1355 ] = None
1356 """a mapping that will relate the BindParameter object we compile
1357 to those that are part of the extracted collection of parameters
1358 in the cache key, if we were given a cache key.
1359
1360 """
1361
1362 positiontup: Optional[List[str]] = None
1363 """for a compiled construct that uses a positional paramstyle, will be
1364 a sequence of strings, indicating the names of bound parameters in order.
1365
1366 This is used in order to render bound parameters in their correct order,
1367 and is combined with the :attr:`_sql.Compiled.params` dictionary to
1368 render parameters.
1369
1370 This sequence always contains the unescaped name of the parameters.
1371
1372 .. seealso::
1373
1374 :ref:`faq_sql_expression_string` - includes a usage example for
1375 debugging use cases.
1376
1377 """
1378 _values_bindparam: Optional[List[str]] = None
1379
1380 _visited_bindparam: Optional[List[str]] = None
1381
1382 inline: bool = False
1383
1384 ctes: Optional[MutableMapping[CTE, str]]
1385
1386 # Detect same CTE references - Dict[(level, name), cte]
1387 # Level is required for supporting nesting
1388 ctes_by_level_name: Dict[Tuple[int, str], CTE]
1389
1390 # To retrieve key/level in ctes_by_level_name -
1391 # Dict[cte_reference, (level, cte_name, cte_opts)]
1392 level_name_by_cte: Dict[CTE, Tuple[int, str, selectable._CTEOpts]]
1393
1394 ctes_recursive: bool
1395
1396 _post_compile_pattern = re.compile(r"__\[POSTCOMPILE_(\S+?)(~~.+?~~)?\]")
1397 _pyformat_pattern = re.compile(r"%\(([^)]+?)\)s")
1398 _positional_pattern = re.compile(
1399 f"{_pyformat_pattern.pattern}|{_post_compile_pattern.pattern}"
1400 )
1401 _collect_params: Final[bool]
1402 _collected_params: util.immutabledict[str, Any]
1403
1404 @classmethod
1405 def _init_compiler_cls(cls):
1406 cls._init_bind_translate()
1407
1408 @classmethod
1409 def _init_bind_translate(cls):
1410 reg = re.escape("".join(cls.bindname_escape_characters))
1411 cls._bind_translate_re = re.compile(f"[{reg}]")
1412 cls._bind_translate_chars = cls.bindname_escape_characters
1413
1414 def __init__(
1415 self,
1416 dialect: Dialect,
1417 statement: Optional[ClauseElement],
1418 cache_key: Optional[CacheKey] = None,
1419 column_keys: Optional[Sequence[str]] = None,
1420 for_executemany: bool = False,
1421 linting: Linting = NO_LINTING,
1422 _supporting_against: Optional[SQLCompiler] = None,
1423 **kwargs: Any,
1424 ):
1425 """Construct a new :class:`.SQLCompiler` object.
1426
1427 :param dialect: :class:`.Dialect` to be used
1428
1429 :param statement: :class:`_expression.ClauseElement` to be compiled
1430
1431 :param column_keys: a list of column names to be compiled into an
1432 INSERT or UPDATE statement.
1433
1434 :param for_executemany: whether INSERT / UPDATE statements should
1435 expect that they are to be invoked in an "executemany" style,
1436 which may impact how the statement will be expected to return the
1437 values of defaults and autoincrement / sequences and similar.
1438 Depending on the backend and driver in use, support for retrieving
1439 these values may be disabled which means SQL expressions may
1440 be rendered inline, RETURNING may not be rendered, etc.
1441
1442 :param kwargs: additional keyword arguments to be consumed by the
1443 superclass.
1444
1445 """
1446 self.column_keys = column_keys
1447
1448 self.cache_key = cache_key
1449
1450 if cache_key:
1451 cksm = {b.key: b for b in cache_key[1]}
1452 ckbm = {b: [b] for b in cache_key[1]}
1453 self._cache_key_bind_match = (ckbm, cksm)
1454
1455 # compile INSERT/UPDATE defaults/sequences to expect executemany
1456 # style execution, which may mean no pre-execute of defaults,
1457 # or no RETURNING
1458 self.for_executemany = for_executemany
1459
1460 self.linting = linting
1461
1462 # a dictionary of bind parameter keys to BindParameter
1463 # instances.
1464 self.binds = {}
1465
1466 # a dictionary of BindParameter instances to "compiled" names
1467 # that are actually present in the generated SQL
1468 self.bind_names = util.column_dict()
1469
1470 # stack which keeps track of nested SELECT statements
1471 self.stack = []
1472
1473 self._result_columns = []
1474
1475 # true if the paramstyle is positional
1476 self.positional = dialect.positional
1477 if self.positional:
1478 self._numeric_binds = nb = dialect.paramstyle.startswith("numeric")
1479 if nb:
1480 self._numeric_binds_identifier_char = (
1481 "$" if dialect.paramstyle == "numeric_dollar" else ":"
1482 )
1483
1484 self.compilation_bindtemplate = _pyformat_template
1485 else:
1486 self.compilation_bindtemplate = BIND_TEMPLATES[dialect.paramstyle]
1487
1488 self.ctes = None
1489
1490 self.label_length = (
1491 dialect.label_length or dialect.max_identifier_length
1492 )
1493
1494 # a map which tracks "anonymous" identifiers that are created on
1495 # the fly here
1496 self.anon_map = prefix_anon_map()
1497
1498 # a map which tracks "truncated" names based on
1499 # dialect.label_length or dialect.max_identifier_length
1500 self.truncated_names: Dict[Tuple[str, str], str] = {}
1501 self._truncated_counters: Dict[str, int] = {}
1502 if not cache_key:
1503 self._collect_params = True
1504 self._collected_params = util.EMPTY_DICT
1505 else:
1506 self._collect_params = False # type: ignore[misc]
1507
1508 Compiled.__init__(self, dialect, statement, **kwargs)
1509
1510 if self.isinsert or self.isupdate or self.isdelete:
1511 if TYPE_CHECKING:
1512 assert isinstance(statement, UpdateBase)
1513
1514 if self.isinsert or self.isupdate:
1515 if TYPE_CHECKING:
1516 assert isinstance(statement, ValuesBase)
1517 if statement._inline:
1518 self.inline = True
1519 elif self.for_executemany and (
1520 not self.isinsert
1521 or (
1522 self.dialect.insert_executemany_returning
1523 and statement._return_defaults
1524 )
1525 ):
1526 self.inline = True
1527
1528 self.bindtemplate = BIND_TEMPLATES[dialect.paramstyle]
1529
1530 if _supporting_against:
1531 self.__dict__.update(
1532 {
1533 k: v
1534 for k, v in _supporting_against.__dict__.items()
1535 if k
1536 not in {
1537 "state",
1538 "dialect",
1539 "preparer",
1540 "positional",
1541 "_numeric_binds",
1542 "compilation_bindtemplate",
1543 "bindtemplate",
1544 }
1545 }
1546 )
1547
1548 if self.state is CompilerState.STRING_APPLIED:
1549 if self.positional:
1550 if self._numeric_binds:
1551 self._process_numeric()
1552 else:
1553 self._process_positional()
1554
1555 if self._render_postcompile:
1556 parameters = self.construct_params(
1557 escape_names=False,
1558 _no_postcompile=True,
1559 )
1560
1561 self._process_parameters_for_postcompile(
1562 parameters, _populate_self=True
1563 )
1564
1565 @property
1566 def insert_single_values_expr(self) -> Optional[str]:
1567 """When an INSERT is compiled with a single set of parameters inside
1568 a VALUES expression, the string is assigned here, where it can be
1569 used for insert batching schemes to rewrite the VALUES expression.
1570
1571 .. versionchanged:: 2.0 This collection is no longer used by
1572 SQLAlchemy's built-in dialects, in favor of the currently
1573 internal ``_insertmanyvalues`` collection that is used only by
1574 :class:`.SQLCompiler`.
1575
1576 """
1577 if self._insertmanyvalues is None:
1578 return None
1579 else:
1580 return self._insertmanyvalues.single_values_expr
1581
1582 @util.ro_memoized_property
1583 def effective_returning(self) -> Optional[Sequence[ColumnElement[Any]]]:
1584 """The effective "returning" columns for INSERT, UPDATE or DELETE.
1585
1586 This is either the so-called "implicit returning" columns which are
1587 calculated by the compiler on the fly, or those present based on what's
1588 present in ``self.statement._returning`` (expanded into individual
1589 columns using the ``._all_selected_columns`` attribute) i.e. those set
1590 explicitly using the :meth:`.UpdateBase.returning` method.
1591
1592 .. versionadded:: 2.0
1593
1594 """
1595 if self.implicit_returning:
1596 return self.implicit_returning
1597 elif self.statement is not None and is_dml(self.statement):
1598 return [
1599 c
1600 for c in self.statement._all_selected_columns
1601 if is_column_element(c)
1602 ]
1603
1604 else:
1605 return None
1606
1607 @property
1608 def returning(self):
1609 """backwards compatibility; returns the
1610 effective_returning collection.
1611
1612 """
1613 return self.effective_returning
1614
1615 @property
1616 def current_executable(self):
1617 """Return the current 'executable' that is being compiled.
1618
1619 This is currently the :class:`_sql.Select`, :class:`_sql.Insert`,
1620 :class:`_sql.Update`, :class:`_sql.Delete`,
1621 :class:`_sql.CompoundSelect` object that is being compiled.
1622 Specifically it's assigned to the ``self.stack`` list of elements.
1623
1624 When a statement like the above is being compiled, it normally
1625 is also assigned to the ``.statement`` attribute of the
1626 :class:`_sql.Compiler` object. However, all SQL constructs are
1627 ultimately nestable, and this attribute should never be consulted
1628 by a ``visit_`` method, as it is not guaranteed to be assigned
1629 nor guaranteed to correspond to the current statement being compiled.
1630
1631 """
1632 try:
1633 return self.stack[-1]["selectable"]
1634 except IndexError as ie:
1635 raise IndexError("Compiler does not have a stack entry") from ie
1636
1637 @property
1638 def prefetch(self):
1639 return list(self.insert_prefetch) + list(self.update_prefetch)
1640
1641 @util.memoized_property
1642 def _global_attributes(self) -> Dict[Any, Any]:
1643 return {}
1644
1645 def _add_to_params(self, item: ExecutableStatement) -> None:
1646 # assumes that this is called before traversing the statement
1647 # so the call happens outer to inner, meaning that existing params
1648 # take precedence
1649 if item._params:
1650 self._collected_params = item._params | self._collected_params
1651
1652 @util.memoized_instancemethod
1653 def _init_cte_state(self) -> MutableMapping[CTE, str]:
1654 """Initialize collections related to CTEs only if
1655 a CTE is located, to save on the overhead of
1656 these collections otherwise.
1657
1658 """
1659 # collect CTEs to tack on top of a SELECT
1660 # To store the query to print - Dict[cte, text_query]
1661 ctes: MutableMapping[CTE, str] = util.OrderedDict()
1662 self.ctes = ctes
1663
1664 # Detect same CTE references - Dict[(level, name), cte]
1665 # Level is required for supporting nesting
1666 self.ctes_by_level_name = {}
1667
1668 # To retrieve key/level in ctes_by_level_name -
1669 # Dict[cte_reference, (level, cte_name, cte_opts)]
1670 self.level_name_by_cte = {}
1671
1672 self.ctes_recursive = False
1673
1674 return ctes
1675
1676 @contextlib.contextmanager
1677 def _nested_result(self):
1678 """special API to support the use case of 'nested result sets'"""
1679 result_columns, ordered_columns = (
1680 self._result_columns,
1681 self._ordered_columns,
1682 )
1683 self._result_columns, self._ordered_columns = [], False
1684
1685 try:
1686 if self.stack:
1687 entry = self.stack[-1]
1688 entry["need_result_map_for_nested"] = True
1689 else:
1690 entry = None
1691 yield self._result_columns, self._ordered_columns
1692 finally:
1693 if entry:
1694 entry.pop("need_result_map_for_nested")
1695 self._result_columns, self._ordered_columns = (
1696 result_columns,
1697 ordered_columns,
1698 )
1699
1700 def _process_positional(self):
1701 assert not self.positiontup
1702 assert self.state is CompilerState.STRING_APPLIED
1703 assert not self._numeric_binds
1704
1705 if self.dialect.paramstyle == "format":
1706 placeholder = "%s"
1707 else:
1708 assert self.dialect.paramstyle == "qmark"
1709 placeholder = "?"
1710
1711 positions = []
1712
1713 def find_position(m: re.Match[str]) -> str:
1714 normal_bind = m.group(1)
1715 if normal_bind:
1716 positions.append(normal_bind)
1717 return placeholder
1718 else:
1719 # this a post-compile bind
1720 positions.append(m.group(2))
1721 return m.group(0)
1722
1723 self.string = re.sub(
1724 self._positional_pattern, find_position, self.string
1725 )
1726
1727 if self.escaped_bind_names:
1728 reverse_escape = {v: k for k, v in self.escaped_bind_names.items()}
1729 assert len(self.escaped_bind_names) == len(reverse_escape)
1730 self.positiontup = [
1731 reverse_escape.get(name, name) for name in positions
1732 ]
1733 else:
1734 self.positiontup = positions
1735
1736 if self._insertmanyvalues:
1737 positions = []
1738
1739 single_values_expr = re.sub(
1740 self._positional_pattern,
1741 find_position,
1742 self._insertmanyvalues.single_values_expr,
1743 )
1744 insert_crud_params = [
1745 (
1746 v[0],
1747 v[1],
1748 re.sub(self._positional_pattern, find_position, v[2]),
1749 v[3],
1750 )
1751 for v in self._insertmanyvalues.insert_crud_params
1752 ]
1753
1754 self._insertmanyvalues = self._insertmanyvalues._replace(
1755 single_values_expr=single_values_expr,
1756 insert_crud_params=insert_crud_params,
1757 )
1758
1759 def _process_numeric(self):
1760 assert self._numeric_binds
1761 assert self.state is CompilerState.STRING_APPLIED
1762
1763 num = 1
1764 param_pos: Dict[str, str] = {}
1765 order: Iterable[str]
1766 if self._insertmanyvalues and self._values_bindparam is not None:
1767 # bindparams that are not in values are always placed first.
1768 # this avoids the need of changing them when using executemany
1769 # values () ()
1770 order = itertools.chain(
1771 (
1772 name
1773 for name in self.bind_names.values()
1774 if name not in self._values_bindparam
1775 ),
1776 self.bind_names.values(),
1777 )
1778 else:
1779 order = self.bind_names.values()
1780
1781 for bind_name in order:
1782 if bind_name in param_pos:
1783 continue
1784 bind = self.binds[bind_name]
1785 if (
1786 bind in self.post_compile_params
1787 or bind in self.literal_execute_params
1788 ):
1789 # set to None to just mark the in positiontup, it will not
1790 # be replaced below.
1791 param_pos[bind_name] = None # type: ignore
1792 else:
1793 ph = f"{self._numeric_binds_identifier_char}{num}"
1794 num += 1
1795 param_pos[bind_name] = ph
1796
1797 self.next_numeric_pos = num
1798
1799 self.positiontup = list(param_pos)
1800 if self.escaped_bind_names:
1801 len_before = len(param_pos)
1802 param_pos = {
1803 self.escaped_bind_names.get(name, name): pos
1804 for name, pos in param_pos.items()
1805 }
1806 assert len(param_pos) == len_before
1807
1808 # Can't use format here since % chars are not escaped.
1809 self.string = self._pyformat_pattern.sub(
1810 lambda m: param_pos[m.group(1)], self.string
1811 )
1812
1813 if self._insertmanyvalues:
1814 single_values_expr = (
1815 # format is ok here since single_values_expr includes only
1816 # place-holders
1817 self._insertmanyvalues.single_values_expr
1818 % param_pos
1819 )
1820 insert_crud_params = [
1821 (v[0], v[1], "%s", v[3])
1822 for v in self._insertmanyvalues.insert_crud_params
1823 ]
1824
1825 self._insertmanyvalues = self._insertmanyvalues._replace(
1826 # This has the numbers (:1, :2)
1827 single_values_expr=single_values_expr,
1828 # The single binds are instead %s so they can be formatted
1829 insert_crud_params=insert_crud_params,
1830 )
1831
1832 @util.memoized_property
1833 def _bind_processors(
1834 self,
1835 ) -> MutableMapping[
1836 str, Union[_BindProcessorType[Any], Sequence[_BindProcessorType[Any]]]
1837 ]:
1838 # mypy is not able to see the two value types as the above Union,
1839 # it just sees "object". don't know how to resolve
1840 return {
1841 key: value # type: ignore
1842 for key, value in (
1843 (
1844 self.bind_names[bindparam],
1845 (
1846 bindparam.type._cached_bind_processor(self.dialect)
1847 if not bindparam.type._is_tuple_type
1848 else tuple(
1849 elem_type._cached_bind_processor(self.dialect)
1850 for elem_type in cast(
1851 TupleType, bindparam.type
1852 ).types
1853 )
1854 ),
1855 )
1856 for bindparam in self.bind_names
1857 )
1858 if value is not None
1859 }
1860
1861 def is_subquery(self):
1862 return len(self.stack) > 1
1863
1864 @property
1865 def sql_compiler(self) -> Self:
1866 return self
1867
1868 def construct_expanded_state(
1869 self,
1870 params: Optional[_CoreSingleExecuteParams] = None,
1871 escape_names: bool = True,
1872 ) -> ExpandedState:
1873 """Return a new :class:`.ExpandedState` for a given parameter set.
1874
1875 For queries that use "expanding" or other late-rendered parameters,
1876 this method will provide for both the finalized SQL string as well
1877 as the parameters that would be used for a particular parameter set.
1878
1879 .. versionadded:: 2.0.0rc1
1880
1881 """
1882 parameters = self.construct_params(
1883 params,
1884 escape_names=escape_names,
1885 _no_postcompile=True,
1886 )
1887 return self._process_parameters_for_postcompile(
1888 parameters,
1889 )
1890
1891 def construct_params(
1892 self,
1893 params: Optional[_CoreSingleExecuteParams] = None,
1894 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
1895 escape_names: bool = True,
1896 _group_number: Optional[int] = None,
1897 _check: bool = True,
1898 _no_postcompile: bool = False,
1899 _collected_params: _CoreSingleExecuteParams | None = None,
1900 ) -> _MutableCoreSingleExecuteParams:
1901 """return a dictionary of bind parameter keys and values"""
1902 if _collected_params is not None:
1903 assert not self._collect_params
1904 elif self._collect_params:
1905 _collected_params = self._collected_params
1906
1907 if _collected_params:
1908 if not params:
1909 params = _collected_params
1910 else:
1911 params = {**_collected_params, **params}
1912
1913 if self._render_postcompile and not _no_postcompile:
1914 assert self._post_compile_expanded_state is not None
1915 if not params:
1916 return dict(self._post_compile_expanded_state.parameters)
1917 else:
1918 raise exc.InvalidRequestError(
1919 "can't construct new parameters when render_postcompile "
1920 "is used; the statement is hard-linked to the original "
1921 "parameters. Use construct_expanded_state to generate a "
1922 "new statement and parameters."
1923 )
1924
1925 has_escaped_names = escape_names and bool(self.escaped_bind_names)
1926
1927 if extracted_parameters:
1928 # related the bound parameters collected in the original cache key
1929 # to those collected in the incoming cache key. They will not have
1930 # matching names but they will line up positionally in the same
1931 # way. The parameters present in self.bind_names may be clones of
1932 # these original cache key params in the case of DML but the .key
1933 # will be guaranteed to match.
1934 if self.cache_key is None:
1935 raise exc.CompileError(
1936 "This compiled object has no original cache key; "
1937 "can't pass extracted_parameters to construct_params"
1938 )
1939 else:
1940 orig_extracted = self.cache_key[1]
1941
1942 ckbm_tuple = self._cache_key_bind_match
1943 assert ckbm_tuple is not None
1944 ckbm, _ = ckbm_tuple
1945 resolved_extracted = {
1946 bind: extracted
1947 for b, extracted in zip(orig_extracted, extracted_parameters)
1948 for bind in ckbm[b]
1949 }
1950 else:
1951 resolved_extracted = None
1952
1953 if params:
1954 pd = {}
1955 for bindparam, name in self.bind_names.items():
1956 escaped_name = (
1957 self.escaped_bind_names.get(name, name)
1958 if has_escaped_names
1959 else name
1960 )
1961
1962 if bindparam.key in params:
1963 pd[escaped_name] = params[bindparam.key]
1964 elif name in params:
1965 pd[escaped_name] = params[name]
1966
1967 elif _check and bindparam.required:
1968 if _group_number:
1969 raise exc.InvalidRequestError(
1970 "A value is required for bind parameter %r, "
1971 "in parameter group %d"
1972 % (bindparam.key, _group_number),
1973 code="cd3x",
1974 )
1975 else:
1976 raise exc.InvalidRequestError(
1977 "A value is required for bind parameter %r"
1978 % bindparam.key,
1979 code="cd3x",
1980 )
1981 else:
1982 if resolved_extracted:
1983 value_param = resolved_extracted.get(
1984 bindparam, bindparam
1985 )
1986 else:
1987 value_param = bindparam
1988
1989 if bindparam.callable:
1990 pd[escaped_name] = value_param.effective_value
1991 else:
1992 pd[escaped_name] = value_param.value
1993 return pd
1994 else:
1995 pd = {}
1996 for bindparam, name in self.bind_names.items():
1997 escaped_name = (
1998 self.escaped_bind_names.get(name, name)
1999 if has_escaped_names
2000 else name
2001 )
2002
2003 if _check and bindparam.required:
2004 if _group_number:
2005 raise exc.InvalidRequestError(
2006 "A value is required for bind parameter %r, "
2007 "in parameter group %d"
2008 % (bindparam.key, _group_number),
2009 code="cd3x",
2010 )
2011 else:
2012 raise exc.InvalidRequestError(
2013 "A value is required for bind parameter %r"
2014 % bindparam.key,
2015 code="cd3x",
2016 )
2017
2018 if resolved_extracted:
2019 value_param = resolved_extracted.get(bindparam, bindparam)
2020 else:
2021 value_param = bindparam
2022
2023 if bindparam.callable:
2024 pd[escaped_name] = value_param.effective_value
2025 else:
2026 pd[escaped_name] = value_param.value
2027
2028 return pd
2029
2030 @util.memoized_instancemethod
2031 def _get_set_input_sizes_lookup(self):
2032 dialect = self.dialect
2033
2034 include_types = dialect.include_set_input_sizes
2035 exclude_types = dialect.exclude_set_input_sizes
2036
2037 dbapi = dialect.dbapi
2038
2039 def lookup_type(typ):
2040 dbtype = typ._unwrapped_dialect_impl(dialect).get_dbapi_type(dbapi)
2041
2042 if (
2043 dbtype is not None
2044 and (exclude_types is None or dbtype not in exclude_types)
2045 and (include_types is None or dbtype in include_types)
2046 ):
2047 return dbtype
2048 else:
2049 return None
2050
2051 inputsizes = {}
2052
2053 literal_execute_params = self.literal_execute_params
2054
2055 for bindparam in self.bind_names:
2056 if bindparam in literal_execute_params:
2057 continue
2058
2059 if bindparam.type._is_tuple_type:
2060 inputsizes[bindparam] = [
2061 lookup_type(typ)
2062 for typ in cast(TupleType, bindparam.type).types
2063 ]
2064 else:
2065 inputsizes[bindparam] = lookup_type(bindparam.type)
2066
2067 return inputsizes
2068
2069 @property
2070 def params(self):
2071 """Return the bind param dictionary embedded into this
2072 compiled object, for those values that are present.
2073
2074 .. seealso::
2075
2076 :ref:`faq_sql_expression_string` - includes a usage example for
2077 debugging use cases.
2078
2079 """
2080 return self.construct_params(_check=False)
2081
2082 def _process_parameters_for_postcompile(
2083 self,
2084 parameters: _MutableCoreSingleExecuteParams,
2085 _populate_self: bool = False,
2086 ) -> ExpandedState:
2087 """handle special post compile parameters.
2088
2089 These include:
2090
2091 * "expanding" parameters -typically IN tuples that are rendered
2092 on a per-parameter basis for an otherwise fixed SQL statement string.
2093
2094 * literal_binds compiled with the literal_execute flag. Used for
2095 things like SQL Server "TOP N" where the driver does not accommodate
2096 N as a bound parameter.
2097
2098 """
2099
2100 expanded_parameters = {}
2101 new_positiontup: Optional[List[str]]
2102
2103 pre_expanded_string = self._pre_expanded_string
2104 if pre_expanded_string is None:
2105 pre_expanded_string = self.string
2106
2107 if self.positional:
2108 new_positiontup = []
2109
2110 pre_expanded_positiontup = self._pre_expanded_positiontup
2111 if pre_expanded_positiontup is None:
2112 pre_expanded_positiontup = self.positiontup
2113
2114 else:
2115 new_positiontup = pre_expanded_positiontup = None
2116
2117 processors = self._bind_processors
2118 single_processors = cast(
2119 "Mapping[str, _BindProcessorType[Any]]", processors
2120 )
2121 tuple_processors = cast(
2122 "Mapping[str, Sequence[_BindProcessorType[Any]]]", processors
2123 )
2124
2125 new_processors: Dict[str, _BindProcessorType[Any]] = {}
2126
2127 replacement_expressions: Dict[str, Any] = {}
2128 to_update_sets: Dict[str, Any] = {}
2129
2130 # notes:
2131 # *unescaped* parameter names in:
2132 # self.bind_names, self.binds, self._bind_processors, self.positiontup
2133 #
2134 # *escaped* parameter names in:
2135 # construct_params(), replacement_expressions
2136
2137 numeric_positiontup: Optional[List[str]] = None
2138
2139 if self.positional and pre_expanded_positiontup is not None:
2140 names: Iterable[str] = pre_expanded_positiontup
2141 if self._numeric_binds:
2142 numeric_positiontup = []
2143 else:
2144 names = self.bind_names.values()
2145
2146 ebn = self.escaped_bind_names
2147 for name in names:
2148 escaped_name = ebn.get(name, name) if ebn else name
2149 parameter = self.binds[name]
2150
2151 if parameter in self.literal_execute_params:
2152 if escaped_name not in replacement_expressions:
2153 replacement_expressions[escaped_name] = (
2154 self.render_literal_bindparam(
2155 parameter,
2156 render_literal_value=parameters.pop(escaped_name),
2157 )
2158 )
2159 continue
2160
2161 if parameter in self.post_compile_params:
2162 if escaped_name in replacement_expressions:
2163 to_update = to_update_sets[escaped_name]
2164 values = None
2165 else:
2166 # we are removing the parameter from parameters
2167 # because it is a list value, which is not expected by
2168 # TypeEngine objects that would otherwise be asked to
2169 # process it. the single name is being replaced with
2170 # individual numbered parameters for each value in the
2171 # param.
2172 #
2173 # note we are also inserting *escaped* parameter names
2174 # into the given dictionary. default dialect will
2175 # use these param names directly as they will not be
2176 # in the escaped_bind_names dictionary.
2177 values = parameters.pop(name)
2178
2179 leep_res = self._literal_execute_expanding_parameter(
2180 escaped_name, parameter, values
2181 )
2182 (to_update, replacement_expr) = leep_res
2183
2184 to_update_sets[escaped_name] = to_update
2185 replacement_expressions[escaped_name] = replacement_expr
2186
2187 if not parameter.literal_execute:
2188 parameters.update(to_update)
2189 if parameter.type._is_tuple_type:
2190 assert values is not None
2191 new_processors.update(
2192 (
2193 "%s_%s_%s" % (name, i, j),
2194 tuple_processors[name][j - 1],
2195 )
2196 for i, tuple_element in enumerate(values, 1)
2197 for j, _ in enumerate(tuple_element, 1)
2198 if name in tuple_processors
2199 and tuple_processors[name][j - 1] is not None
2200 )
2201 else:
2202 new_processors.update(
2203 (key, single_processors[name])
2204 for key, _ in to_update
2205 if name in single_processors
2206 )
2207 if numeric_positiontup is not None:
2208 numeric_positiontup.extend(
2209 name for name, _ in to_update
2210 )
2211 elif new_positiontup is not None:
2212 # to_update has escaped names, but that's ok since
2213 # these are new names, that aren't in the
2214 # escaped_bind_names dict.
2215 new_positiontup.extend(name for name, _ in to_update)
2216 expanded_parameters[name] = [
2217 expand_key for expand_key, _ in to_update
2218 ]
2219 elif new_positiontup is not None:
2220 new_positiontup.append(name)
2221
2222 def process_expanding(m):
2223 key = m.group(1)
2224 expr = replacement_expressions[key]
2225
2226 # if POSTCOMPILE included a bind_expression, render that
2227 # around each element
2228 if m.group(2):
2229 tok = m.group(2).split("~~")
2230 be_left, be_right = tok[1], tok[3]
2231 expr = ", ".join(
2232 "%s%s%s" % (be_left, exp, be_right)
2233 for exp in expr.split(", ")
2234 )
2235 return expr
2236
2237 statement = re.sub(
2238 self._post_compile_pattern, process_expanding, pre_expanded_string
2239 )
2240
2241 if numeric_positiontup is not None:
2242 assert new_positiontup is not None
2243 param_pos = {
2244 key: f"{self._numeric_binds_identifier_char}{num}"
2245 for num, key in enumerate(
2246 numeric_positiontup, self.next_numeric_pos
2247 )
2248 }
2249 # Can't use format here since % chars are not escaped.
2250 statement = self._pyformat_pattern.sub(
2251 lambda m: param_pos[m.group(1)], statement
2252 )
2253 new_positiontup.extend(numeric_positiontup)
2254
2255 expanded_state = ExpandedState(
2256 statement,
2257 parameters,
2258 new_processors,
2259 new_positiontup,
2260 expanded_parameters,
2261 )
2262
2263 if _populate_self:
2264 # this is for the "render_postcompile" flag, which is not
2265 # otherwise used internally and is for end-user debugging and
2266 # special use cases.
2267 self._pre_expanded_string = pre_expanded_string
2268 self._pre_expanded_positiontup = pre_expanded_positiontup
2269 self.string = expanded_state.statement
2270 self.positiontup = (
2271 list(expanded_state.positiontup or ())
2272 if self.positional
2273 else None
2274 )
2275 self._post_compile_expanded_state = expanded_state
2276
2277 return expanded_state
2278
2279 @util.preload_module("sqlalchemy.engine.cursor")
2280 def _create_result_map(self):
2281 """utility method used for unit tests only."""
2282 cursor = util.preloaded.engine_cursor
2283 return cursor.CursorResultMetaData._create_description_match_map(
2284 self._result_columns
2285 )
2286
2287 # assigned by crud.py for insert/update statements
2288 _get_bind_name_for_col: _BindNameForColProtocol
2289
2290 @util.memoized_property
2291 def _within_exec_param_key_getter(self) -> Callable[[Any], str]:
2292 getter = self._get_bind_name_for_col
2293 return getter
2294
2295 @util.memoized_property
2296 @util.preload_module("sqlalchemy.engine.result")
2297 def _inserted_primary_key_from_lastrowid_getter(self):
2298 result = util.preloaded.engine_result
2299
2300 param_key_getter = self._within_exec_param_key_getter
2301
2302 assert self.compile_state is not None
2303 statement = self.compile_state.statement
2304
2305 if TYPE_CHECKING:
2306 assert isinstance(statement, Insert)
2307
2308 table = statement.table
2309
2310 getters = [
2311 (operator.methodcaller("get", param_key_getter(col), None), col)
2312 for col in table.primary_key
2313 ]
2314
2315 autoinc_getter = None
2316 autoinc_col = table._autoincrement_column
2317 if autoinc_col is not None:
2318 # apply type post processors to the lastrowid
2319 lastrowid_processor = autoinc_col.type._cached_result_processor(
2320 self.dialect, None
2321 )
2322 autoinc_key = param_key_getter(autoinc_col)
2323
2324 # if a bind value is present for the autoincrement column
2325 # in the parameters, we need to do the logic dictated by
2326 # #7998; honor a non-None user-passed parameter over lastrowid.
2327 # previously in the 1.4 series we weren't fetching lastrowid
2328 # at all if the key were present in the parameters
2329 if autoinc_key in self.binds:
2330
2331 def _autoinc_getter(lastrowid, parameters):
2332 param_value = parameters.get(autoinc_key, lastrowid)
2333 if param_value is not None:
2334 # they supplied non-None parameter, use that.
2335 # SQLite at least is observed to return the wrong
2336 # cursor.lastrowid for INSERT..ON CONFLICT so it
2337 # can't be used in all cases
2338 return param_value
2339 else:
2340 # use lastrowid
2341 return lastrowid
2342
2343 # work around mypy https://github.com/python/mypy/issues/14027
2344 autoinc_getter = _autoinc_getter
2345
2346 else:
2347 lastrowid_processor = None
2348
2349 row_fn = result.result_tuple([col.key for col in table.primary_key])
2350
2351 def get(lastrowid, parameters):
2352 """given cursor.lastrowid value and the parameters used for INSERT,
2353 return a "row" that represents the primary key, either by
2354 using the "lastrowid" or by extracting values from the parameters
2355 that were sent along with the INSERT.
2356
2357 """
2358 if lastrowid_processor is not None:
2359 lastrowid = lastrowid_processor(lastrowid)
2360
2361 if lastrowid is None:
2362 return row_fn(getter(parameters) for getter, col in getters)
2363 else:
2364 return row_fn(
2365 (
2366 (
2367 autoinc_getter(lastrowid, parameters)
2368 if autoinc_getter is not None
2369 else lastrowid
2370 )
2371 if col is autoinc_col
2372 else getter(parameters)
2373 )
2374 for getter, col in getters
2375 )
2376
2377 return get
2378
2379 @util.memoized_property
2380 @util.preload_module("sqlalchemy.engine.result")
2381 def _inserted_primary_key_from_returning_getter(self):
2382 result = util.preloaded.engine_result
2383
2384 assert self.compile_state is not None
2385 statement = self.compile_state.statement
2386
2387 if TYPE_CHECKING:
2388 assert isinstance(statement, Insert)
2389
2390 param_key_getter = self._within_exec_param_key_getter
2391 table = statement.table
2392
2393 returning = self.implicit_returning
2394 assert returning is not None
2395 ret = {col: idx for idx, col in enumerate(returning)}
2396
2397 getters = cast(
2398 "List[Tuple[Callable[[Any], Any], bool]]",
2399 [
2400 (
2401 (operator.itemgetter(ret[col]), True)
2402 if col in ret
2403 else (
2404 operator.methodcaller(
2405 "get", param_key_getter(col), None
2406 ),
2407 False,
2408 )
2409 )
2410 for col in table.primary_key
2411 ],
2412 )
2413
2414 row_fn = result.result_tuple([col.key for col in table.primary_key])
2415
2416 def get(row, parameters):
2417 return row_fn(
2418 getter(row) if use_row else getter(parameters)
2419 for getter, use_row in getters
2420 )
2421
2422 return get
2423
2424 def default_from(self) -> str:
2425 """Called when a SELECT statement has no froms, and no FROM clause is
2426 to be appended.
2427
2428 Gives Oracle Database a chance to tack on a ``FROM DUAL`` to the string
2429 output.
2430
2431 """
2432 return ""
2433
2434 def visit_override_binds(self, override_binds, **kw):
2435 """SQL compile the nested element of an _OverrideBinds with
2436 bindparams swapped out.
2437
2438 The _OverrideBinds is not normally expected to be compiled; it
2439 is meant to be used when an already cached statement is to be used,
2440 the compilation was already performed, and only the bound params should
2441 be swapped in at execution time.
2442
2443 However, there are test cases that exericise this object, and
2444 additionally the ORM subquery loader is known to feed in expressions
2445 which include this construct into new queries (discovered in #11173),
2446 so it has to do the right thing at compile time as well.
2447
2448 """
2449
2450 # get SQL text first
2451 sqltext = override_binds.element._compiler_dispatch(self, **kw)
2452
2453 # for a test compile that is not for caching, change binds after the
2454 # fact. note that we don't try to
2455 # swap the bindparam as we compile, because our element may be
2456 # elsewhere in the statement already (e.g. a subquery or perhaps a
2457 # CTE) and was already visited / compiled. See
2458 # test_relationship_criteria.py ->
2459 # test_selectinload_local_criteria_subquery
2460 for k in override_binds.translate:
2461 if k not in self.binds:
2462 continue
2463 bp = self.binds[k]
2464
2465 # so this would work, just change the value of bp in place.
2466 # but we dont want to mutate things outside.
2467 # bp.value = override_binds.translate[bp.key]
2468 # continue
2469
2470 # instead, need to replace bp with new_bp or otherwise accommodate
2471 # in all internal collections
2472 new_bp = bp._with_value(
2473 override_binds.translate[bp.key],
2474 maintain_key=True,
2475 required=False,
2476 )
2477
2478 name = self.bind_names[bp]
2479 self.binds[k] = self.binds[name] = new_bp
2480 self.bind_names[new_bp] = name
2481 self.bind_names.pop(bp, None)
2482
2483 if bp in self.post_compile_params:
2484 self.post_compile_params |= {new_bp}
2485 if bp in self.literal_execute_params:
2486 self.literal_execute_params |= {new_bp}
2487
2488 ckbm_tuple = self._cache_key_bind_match
2489 if ckbm_tuple:
2490 ckbm, cksm = ckbm_tuple
2491 for bp in bp._cloned_set:
2492 if bp.key in cksm:
2493 cb = cksm[bp.key]
2494 ckbm[cb].append(new_bp)
2495
2496 return sqltext
2497
2498 def visit_grouping(self, grouping, asfrom=False, **kwargs):
2499 return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")"
2500
2501 def visit_select_statement_grouping(self, grouping, **kwargs):
2502 return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")"
2503
2504 def visit_label_reference(
2505 self, element, within_columns_clause=False, **kwargs
2506 ):
2507 if self.stack and self.dialect.supports_simple_order_by_label:
2508 try:
2509 compile_state = cast(
2510 "Union[SelectState, CompoundSelectState]",
2511 self.stack[-1]["compile_state"],
2512 )
2513 except KeyError as ke:
2514 raise exc.CompileError(
2515 "Can't resolve label reference for ORDER BY / "
2516 "GROUP BY / DISTINCT etc."
2517 ) from ke
2518
2519 (
2520 with_cols,
2521 only_froms,
2522 only_cols,
2523 ) = compile_state._label_resolve_dict
2524 if within_columns_clause:
2525 resolve_dict = only_froms
2526 else:
2527 resolve_dict = only_cols
2528
2529 # this can be None in the case that a _label_reference()
2530 # were subject to a replacement operation, in which case
2531 # the replacement of the Label element may have changed
2532 # to something else like a ColumnClause expression.
2533 order_by_elem = element.element._order_by_label_element
2534
2535 if (
2536 order_by_elem is not None
2537 and order_by_elem.name in resolve_dict
2538 and order_by_elem.shares_lineage(
2539 resolve_dict[order_by_elem.name]
2540 )
2541 ):
2542 kwargs["render_label_as_label"] = (
2543 element.element._order_by_label_element
2544 )
2545 return self.process(
2546 element.element,
2547 within_columns_clause=within_columns_clause,
2548 **kwargs,
2549 )
2550
2551 def visit_textual_label_reference(
2552 self, element, within_columns_clause=False, **kwargs
2553 ):
2554 if not self.stack:
2555 # compiling the element outside of the context of a SELECT
2556 return self.process(element._text_clause)
2557
2558 try:
2559 compile_state = cast(
2560 "Union[SelectState, CompoundSelectState]",
2561 self.stack[-1]["compile_state"],
2562 )
2563 except KeyError as ke:
2564 coercions._no_text_coercion(
2565 element.element,
2566 extra=(
2567 "Can't resolve label reference for ORDER BY / "
2568 "GROUP BY / DISTINCT etc."
2569 ),
2570 exc_cls=exc.CompileError,
2571 err=ke,
2572 )
2573
2574 with_cols, only_froms, only_cols = compile_state._label_resolve_dict
2575 try:
2576 if within_columns_clause:
2577 col = only_froms[element.element]
2578 else:
2579 col = with_cols[element.element]
2580 except KeyError as err:
2581 coercions._no_text_coercion(
2582 element.element,
2583 extra=(
2584 "Can't resolve label reference for ORDER BY / "
2585 "GROUP BY / DISTINCT etc."
2586 ),
2587 exc_cls=exc.CompileError,
2588 err=err,
2589 )
2590 else:
2591 kwargs["render_label_as_label"] = col
2592 return self.process(
2593 col, within_columns_clause=within_columns_clause, **kwargs
2594 )
2595
2596 def visit_label(
2597 self,
2598 label,
2599 add_to_result_map=None,
2600 within_label_clause=False,
2601 within_columns_clause=False,
2602 render_label_as_label=None,
2603 result_map_targets=(),
2604 within_tstring=False,
2605 **kw,
2606 ):
2607 if within_tstring:
2608 raise exc.CompileError(
2609 "Using label() directly inside tstring is not supported "
2610 "as it is ambiguous how the label expression should be "
2611 "rendered without knowledge of how it's being used in SQL"
2612 )
2613 # only render labels within the columns clause
2614 # or ORDER BY clause of a select. dialect-specific compilers
2615 # can modify this behavior.
2616 render_label_with_as = (
2617 within_columns_clause and not within_label_clause
2618 )
2619 render_label_only = render_label_as_label is label
2620
2621 if render_label_only or render_label_with_as:
2622 if isinstance(label.name, elements._truncated_label):
2623 labelname = self._truncated_identifier("colident", label.name)
2624 else:
2625 labelname = label.name
2626
2627 if render_label_with_as:
2628 if add_to_result_map is not None:
2629 add_to_result_map(
2630 labelname,
2631 label.name,
2632 (label, labelname) + label._alt_names + result_map_targets,
2633 label.type,
2634 )
2635 return (
2636 label.element._compiler_dispatch(
2637 self,
2638 within_columns_clause=True,
2639 within_label_clause=True,
2640 **kw,
2641 )
2642 + OPERATORS[operators.as_]
2643 + self.preparer.format_label(label, labelname)
2644 )
2645 elif render_label_only:
2646 return self.preparer.format_label(label, labelname)
2647 else:
2648 return label.element._compiler_dispatch(
2649 self, within_columns_clause=False, **kw
2650 )
2651
2652 def _fallback_column_name(self, column):
2653 raise exc.CompileError(
2654 "Cannot compile Column object until its 'name' is assigned."
2655 )
2656
2657 def visit_lambda_element(self, element, **kw):
2658 sql_element = element._resolved
2659 return self.process(sql_element, **kw)
2660
2661 def visit_column(
2662 self,
2663 column: ColumnClause[Any],
2664 add_to_result_map: Optional[_ResultMapAppender] = None,
2665 include_table: bool = True,
2666 result_map_targets: Tuple[Any, ...] = (),
2667 ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] = None,
2668 **kwargs: Any,
2669 ) -> str:
2670 name = orig_name = column.name
2671 if name is None:
2672 name = self._fallback_column_name(column)
2673
2674 is_literal = column.is_literal
2675 if not is_literal and isinstance(name, elements._truncated_label):
2676 name = self._truncated_identifier("colident", name)
2677
2678 if add_to_result_map is not None:
2679 targets = (column, name, column.key) + result_map_targets
2680 if column._tq_label:
2681 targets += (column._tq_label,)
2682
2683 add_to_result_map(name, orig_name, targets, column.type)
2684
2685 if is_literal:
2686 # note we are not currently accommodating for
2687 # literal_column(quoted_name('ident', True)) here
2688 name = self.escape_literal_column(name)
2689 else:
2690 name = self.preparer.quote(name)
2691 table = column.table
2692 if table is None or not include_table or not table.named_with_column:
2693 return name
2694 else:
2695 effective_schema = self.preparer.schema_for_object(table)
2696
2697 if effective_schema:
2698 schema_prefix = (
2699 self.preparer.quote_schema(effective_schema) + "."
2700 )
2701 else:
2702 schema_prefix = ""
2703
2704 if TYPE_CHECKING:
2705 assert isinstance(table, NamedFromClause)
2706 tablename = table.name
2707
2708 if (
2709 not effective_schema
2710 and ambiguous_table_name_map
2711 and tablename in ambiguous_table_name_map
2712 ):
2713 tablename = ambiguous_table_name_map[tablename]
2714
2715 if isinstance(tablename, elements._truncated_label):
2716 tablename = self._truncated_identifier("alias", tablename)
2717
2718 return schema_prefix + self.preparer.quote(tablename) + "." + name
2719
2720 def visit_collation(self, element, **kw):
2721 return self.preparer.format_collation(element.collation)
2722
2723 def visit_fromclause(self, fromclause, **kwargs):
2724 return fromclause.name
2725
2726 def visit_index(self, index, **kwargs):
2727 return index.name
2728
2729 def visit_typeclause(self, typeclause, **kw):
2730 kw["type_expression"] = typeclause
2731 kw["identifier_preparer"] = self.preparer
2732 return self.dialect.type_compiler_instance.process(
2733 typeclause.type, **kw
2734 )
2735
2736 def post_process_text(self, text):
2737 if self.preparer._double_percents:
2738 text = text.replace("%", "%%")
2739 return text
2740
2741 def escape_literal_column(self, text):
2742 if self.preparer._double_percents:
2743 text = text.replace("%", "%%")
2744 return text
2745
2746 def visit_textclause(self, textclause, add_to_result_map=None, **kw):
2747 if self._collect_params:
2748 self._add_to_params(textclause)
2749
2750 def do_bindparam(m):
2751 name = m.group(1)
2752 if name in textclause._bindparams:
2753 return self.process(textclause._bindparams[name], **kw)
2754 else:
2755 return self.bindparam_string(name, **kw)
2756
2757 if not self.stack:
2758 self.isplaintext = True
2759
2760 if add_to_result_map:
2761 # text() object is present in the columns clause of a
2762 # select(). Add a no-name entry to the result map so that
2763 # row[text()] produces a result
2764 add_to_result_map(None, None, (textclause,), sqltypes.NULLTYPE)
2765
2766 # un-escape any \:params
2767 return BIND_PARAMS_ESC.sub(
2768 lambda m: m.group(1),
2769 BIND_PARAMS.sub(
2770 do_bindparam, self.post_process_text(textclause.text)
2771 ),
2772 )
2773
2774 def visit_tstring(self, tstring, add_to_result_map=None, **kw):
2775 if self._collect_params:
2776 self._add_to_params(tstring)
2777
2778 if not self.stack:
2779 self.isplaintext = True
2780
2781 if add_to_result_map:
2782 # tstring() object is present in the columns clause of a
2783 # select(). Add a no-name entry to the result map so that
2784 # row[tstring()] produces a result
2785 add_to_result_map(None, None, (tstring,), sqltypes.NULLTYPE)
2786
2787 # Process each part and concatenate
2788 kw["within_tstring"] = True
2789 return "".join(self.process(part, **kw) for part in tstring.parts)
2790
2791 def visit_textual_select(
2792 self, taf, compound_index=None, asfrom=False, **kw
2793 ):
2794 if self._collect_params:
2795 self._add_to_params(taf)
2796 toplevel = not self.stack
2797 entry = self._default_stack_entry if toplevel else self.stack[-1]
2798
2799 new_entry: _CompilerStackEntry = {
2800 "correlate_froms": set(),
2801 "asfrom_froms": set(),
2802 "selectable": taf,
2803 }
2804 self.stack.append(new_entry)
2805
2806 if taf._independent_ctes:
2807 self._dispatch_independent_ctes(taf, kw)
2808
2809 populate_result_map = (
2810 toplevel
2811 or (
2812 compound_index == 0
2813 and entry.get("need_result_map_for_compound", False)
2814 )
2815 or entry.get("need_result_map_for_nested", False)
2816 )
2817
2818 if populate_result_map:
2819 self._ordered_columns = self._textual_ordered_columns = (
2820 taf.positional
2821 )
2822
2823 # enable looser result column matching when the SQL text links to
2824 # Column objects by name only
2825 self._loose_column_name_matching = not taf.positional and bool(
2826 taf.column_args
2827 )
2828
2829 for c in taf.column_args:
2830 self.process(
2831 c,
2832 within_columns_clause=True,
2833 add_to_result_map=self._add_to_result_map,
2834 )
2835
2836 text = self.process(taf.element, **kw)
2837 if self.ctes:
2838 nesting_level = len(self.stack) if not toplevel else None
2839 text = self._render_cte_clause(nesting_level=nesting_level) + text
2840
2841 self.stack.pop(-1)
2842
2843 return text
2844
2845 def visit_null(self, expr: Null, **kw: Any) -> str:
2846 return "NULL"
2847
2848 def visit_true(self, expr: True_, **kw: Any) -> str:
2849 if self.dialect.supports_native_boolean:
2850 return "true"
2851 else:
2852 return "1"
2853
2854 def visit_false(self, expr: False_, **kw: Any) -> str:
2855 if self.dialect.supports_native_boolean:
2856 return "false"
2857 else:
2858 return "0"
2859
2860 def _generate_delimited_list(self, elements, separator, **kw):
2861 return separator.join(
2862 s
2863 for s in (c._compiler_dispatch(self, **kw) for c in elements)
2864 if s
2865 )
2866
2867 def _generate_delimited_and_list(self, clauses, **kw):
2868 lcc, clauses = elements.BooleanClauseList._process_clauses_for_boolean(
2869 operators.and_,
2870 elements.True_._singleton,
2871 elements.False_._singleton,
2872 clauses,
2873 )
2874 if lcc == 1:
2875 return clauses[0]._compiler_dispatch(self, **kw)
2876 else:
2877 separator = OPERATORS[operators.and_]
2878 return separator.join(
2879 s
2880 for s in (c._compiler_dispatch(self, **kw) for c in clauses)
2881 if s
2882 )
2883
2884 def visit_tuple(self, clauselist, **kw):
2885 return "(%s)" % self.visit_clauselist(clauselist, **kw)
2886
2887 def visit_element_list(self, element, **kw):
2888 return self._generate_delimited_list(element.clauses, " ", **kw)
2889
2890 def visit_order_by_list(self, element, **kw):
2891 return self._generate_delimited_list(element.clauses, ", ", **kw)
2892
2893 def visit_clauselist(self, clauselist, **kw):
2894 sep = clauselist.operator
2895 if sep is None:
2896 sep = " "
2897 else:
2898 sep = OPERATORS[clauselist.operator]
2899
2900 return self._generate_delimited_list(clauselist.clauses, sep, **kw)
2901
2902 def visit_expression_clauselist(self, clauselist, **kw):
2903 operator_ = clauselist.operator
2904
2905 disp = self._get_operator_dispatch(
2906 operator_, "expression_clauselist", None
2907 )
2908 if disp:
2909 return disp(clauselist, operator_, **kw)
2910
2911 try:
2912 opstring = OPERATORS[operator_]
2913 except KeyError as err:
2914 raise exc.UnsupportedCompilationError(self, operator_) from err
2915 else:
2916 kw["_in_operator_expression"] = True
2917 return self._generate_delimited_list(
2918 clauselist.clauses, opstring, **kw
2919 )
2920
2921 def visit_case(self, clause, **kwargs):
2922 x = "CASE "
2923 if clause.value is not None:
2924 x += clause.value._compiler_dispatch(self, **kwargs) + " "
2925 for cond, result in clause.whens:
2926 x += (
2927 "WHEN "
2928 + cond._compiler_dispatch(self, **kwargs)
2929 + " THEN "
2930 + result._compiler_dispatch(self, **kwargs)
2931 + " "
2932 )
2933 if clause.else_ is not None:
2934 x += (
2935 "ELSE " + clause.else_._compiler_dispatch(self, **kwargs) + " "
2936 )
2937 x += "END"
2938 return x
2939
2940 def visit_type_coerce(self, type_coerce, **kw):
2941 return type_coerce.typed_expression._compiler_dispatch(self, **kw)
2942
2943 def visit_cast(self, cast, **kwargs):
2944 type_clause = cast.typeclause._compiler_dispatch(self, **kwargs)
2945 match = re.match("(.*)( COLLATE .*)", type_clause)
2946 return "CAST(%s AS %s)%s" % (
2947 cast.clause._compiler_dispatch(self, **kwargs),
2948 match.group(1) if match else type_clause,
2949 match.group(2) if match else "",
2950 )
2951
2952 def visit_frame_clause(self, frameclause, **kw):
2953
2954 if frameclause.lower_type is elements.FrameClauseType.UNBOUNDED:
2955 left = "UNBOUNDED PRECEDING"
2956 elif frameclause.lower_type is elements.FrameClauseType.CURRENT:
2957 left = "CURRENT ROW"
2958 else:
2959 val = self.process(frameclause.lower_bind, **kw)
2960 if frameclause.lower_type is elements.FrameClauseType.PRECEDING:
2961 left = f"{val} PRECEDING"
2962 else:
2963 left = f"{val} FOLLOWING"
2964
2965 if frameclause.upper_type is elements.FrameClauseType.UNBOUNDED:
2966 right = "UNBOUNDED FOLLOWING"
2967 elif frameclause.upper_type is elements.FrameClauseType.CURRENT:
2968 right = "CURRENT ROW"
2969 else:
2970 val = self.process(frameclause.upper_bind, **kw)
2971 if frameclause.upper_type is elements.FrameClauseType.PRECEDING:
2972 right = f"{val} PRECEDING"
2973 else:
2974 right = f"{val} FOLLOWING"
2975
2976 return f"{left} AND {right}"
2977
2978 def visit_over(self, over, **kwargs):
2979 text = over.element._compiler_dispatch(self, **kwargs)
2980 if over.range_ is not None:
2981 range_ = f"RANGE BETWEEN {self.process(over.range_, **kwargs)}"
2982 elif over.rows is not None:
2983 range_ = f"ROWS BETWEEN {self.process(over.rows, **kwargs)}"
2984 elif over.groups is not None:
2985 range_ = f"GROUPS BETWEEN {self.process(over.groups, **kwargs)}"
2986 else:
2987 range_ = None
2988
2989 return "%s OVER (%s)" % (
2990 text,
2991 " ".join(
2992 [
2993 "%s BY %s"
2994 % (word, clause._compiler_dispatch(self, **kwargs))
2995 for word, clause in (
2996 ("PARTITION", over.partition_by),
2997 ("ORDER", over.order_by),
2998 )
2999 if clause is not None and len(clause)
3000 ]
3001 + ([range_] if range_ else [])
3002 ),
3003 )
3004
3005 def visit_withingroup(self, withingroup, **kwargs):
3006 return "%s WITHIN GROUP (ORDER BY %s)" % (
3007 withingroup.element._compiler_dispatch(self, **kwargs),
3008 withingroup.order_by._compiler_dispatch(self, **kwargs),
3009 )
3010
3011 def visit_funcfilter(self, funcfilter, **kwargs):
3012 return "%s FILTER (WHERE %s)" % (
3013 funcfilter.func._compiler_dispatch(self, **kwargs),
3014 funcfilter.criterion._compiler_dispatch(self, **kwargs),
3015 )
3016
3017 def visit_aggregateorderby(self, aggregateorderby, **kwargs):
3018 if self.dialect.aggregate_order_by_style is AggregateOrderByStyle.NONE:
3019 raise exc.CompileError(
3020 "this dialect does not support "
3021 "ORDER BY within an aggregate function"
3022 )
3023 elif (
3024 self.dialect.aggregate_order_by_style
3025 is AggregateOrderByStyle.INLINE
3026 ):
3027 new_fn = aggregateorderby.element._clone()
3028 new_fn.clause_expr = elements.Grouping(
3029 aggregate_orderby_inline(
3030 new_fn.clause_expr.element, aggregateorderby.order_by
3031 )
3032 )
3033
3034 return new_fn._compiler_dispatch(self, **kwargs)
3035 else:
3036 return self.visit_withingroup(aggregateorderby, **kwargs)
3037
3038 def visit_aggregate_orderby_inline(self, element, **kw):
3039 return "%s ORDER BY %s" % (
3040 self.process(element.element, **kw),
3041 self.process(element.aggregate_order_by, **kw),
3042 )
3043
3044 def visit_aggregate_strings_func(self, fn, *, use_function_name, **kw):
3045 # aggreagate_order_by attribute is present if visit_function
3046 # gave us a Function with aggregate_orderby_inline() as the inner
3047 # contents
3048 order_by = getattr(fn.clauses, "aggregate_order_by", None)
3049
3050 literal_exec = dict(kw)
3051 literal_exec["literal_execute"] = True
3052
3053 # break up the function into its components so we can apply
3054 # literal_execute to the second argument (the delimiter)
3055 cl = list(fn.clauses)
3056 expr, delimiter = cl[0:2]
3057 if (
3058 order_by is not None
3059 and self.dialect.aggregate_order_by_style
3060 is AggregateOrderByStyle.INLINE
3061 ):
3062 return (
3063 f"{use_function_name}({expr._compiler_dispatch(self, **kw)}, "
3064 f"{delimiter._compiler_dispatch(self, **literal_exec)} "
3065 f"ORDER BY {order_by._compiler_dispatch(self, **kw)})"
3066 )
3067 else:
3068 return (
3069 f"{use_function_name}({expr._compiler_dispatch(self, **kw)}, "
3070 f"{delimiter._compiler_dispatch(self, **literal_exec)})"
3071 )
3072
3073 def visit_extract(self, extract, **kwargs):
3074 field = self.extract_map.get(extract.field, extract.field)
3075 return "EXTRACT(%s FROM %s)" % (
3076 field,
3077 extract.expr._compiler_dispatch(self, **kwargs),
3078 )
3079
3080 def visit_scalar_function_column(self, element, **kw):
3081 compiled_fn = self.visit_function(element.fn, **kw)
3082 compiled_col = self.visit_column(element, **kw)
3083 return "(%s).%s" % (compiled_fn, compiled_col)
3084
3085 def visit_function(
3086 self,
3087 func: Function[Any],
3088 add_to_result_map: Optional[_ResultMapAppender] = None,
3089 **kwargs: Any,
3090 ) -> str:
3091 if self._collect_params:
3092 self._add_to_params(func)
3093 if add_to_result_map is not None:
3094 add_to_result_map(func.name, func.name, (func.name,), func.type)
3095
3096 disp = getattr(self, "visit_%s_func" % func.name.lower(), None)
3097
3098 text: str
3099
3100 if disp:
3101 text = disp(func, **kwargs)
3102 else:
3103 name = FUNCTIONS.get(func._deannotate().__class__, None)
3104 if name:
3105 if func._has_args:
3106 name += "%(expr)s"
3107 else:
3108 name = func.name
3109 name = (
3110 self.preparer.quote(name)
3111 if self.preparer._requires_quotes_illegal_chars(name)
3112 or isinstance(name, elements.quoted_name)
3113 else name
3114 )
3115 name = name + "%(expr)s"
3116 text = ".".join(
3117 [
3118 (
3119 self.preparer.quote(tok)
3120 if self.preparer._requires_quotes_illegal_chars(tok)
3121 or isinstance(name, elements.quoted_name)
3122 else tok
3123 )
3124 for tok in func.packagenames
3125 ]
3126 + [name]
3127 ) % {"expr": self.function_argspec(func, **kwargs)}
3128
3129 if func._with_ordinality:
3130 text += " WITH ORDINALITY"
3131 return text
3132
3133 def visit_next_value_func(self, next_value, **kw):
3134 return self.visit_sequence(next_value.sequence)
3135
3136 def visit_sequence(self, sequence, **kw):
3137 raise NotImplementedError(
3138 "Dialect '%s' does not support sequence increments."
3139 % self.dialect.name
3140 )
3141
3142 def function_argspec(self, func: Function[Any], **kwargs: Any) -> str:
3143 return func.clause_expr._compiler_dispatch(self, **kwargs)
3144
3145 def visit_compound_select(
3146 self, cs, asfrom=False, compound_index=None, **kwargs
3147 ):
3148 if self._collect_params:
3149 self._add_to_params(cs)
3150 toplevel = not self.stack
3151
3152 compile_state = cs._compile_state_factory(cs, self, **kwargs)
3153
3154 if toplevel and not self.compile_state:
3155 self.compile_state = compile_state
3156
3157 compound_stmt = compile_state.statement
3158
3159 entry = self._default_stack_entry if toplevel else self.stack[-1]
3160 need_result_map = toplevel or (
3161 not compound_index
3162 and entry.get("need_result_map_for_compound", False)
3163 )
3164
3165 # indicates there is already a CompoundSelect in play
3166 if compound_index == 0:
3167 entry["select_0"] = cs
3168
3169 self.stack.append(
3170 {
3171 "correlate_froms": entry["correlate_froms"],
3172 "asfrom_froms": entry["asfrom_froms"],
3173 "selectable": cs,
3174 "compile_state": compile_state,
3175 "need_result_map_for_compound": need_result_map,
3176 }
3177 )
3178
3179 if compound_stmt._independent_ctes:
3180 self._dispatch_independent_ctes(compound_stmt, kwargs)
3181
3182 keyword = self.compound_keywords[cs.keyword]
3183
3184 text = (" " + keyword + " ").join(
3185 (
3186 c._compiler_dispatch(
3187 self, asfrom=asfrom, compound_index=i, **kwargs
3188 )
3189 for i, c in enumerate(cs.selects)
3190 )
3191 )
3192
3193 kwargs["include_table"] = False
3194 text += self.group_by_clause(cs, **dict(asfrom=asfrom, **kwargs))
3195 text += self.order_by_clause(cs, **kwargs)
3196 if cs._has_row_limiting_clause:
3197 text += self._row_limit_clause(cs, **kwargs)
3198
3199 if self.ctes:
3200 nesting_level = len(self.stack) if not toplevel else None
3201 text = (
3202 self._render_cte_clause(
3203 nesting_level=nesting_level,
3204 include_following_stack=True,
3205 )
3206 + text
3207 )
3208
3209 self.stack.pop(-1)
3210 return text
3211
3212 def _row_limit_clause(self, cs, **kwargs):
3213 if cs._fetch_clause is not None:
3214 return self.fetch_clause(cs, **kwargs)
3215 else:
3216 return self.limit_clause(cs, **kwargs)
3217
3218 def _get_operator_dispatch(self, operator_, qualifier1, qualifier2):
3219 attrname = "visit_%s_%s%s" % (
3220 operator_.__name__,
3221 qualifier1,
3222 "_" + qualifier2 if qualifier2 else "",
3223 )
3224 return getattr(self, attrname, None)
3225
3226 def _get_custom_operator_dispatch(self, operator_, qualifier1):
3227 attrname = "visit_%s_op_%s" % (operator_.visit_name, qualifier1)
3228 return getattr(self, attrname, None)
3229
3230 def visit_unary(
3231 self, unary, add_to_result_map=None, result_map_targets=(), **kw
3232 ):
3233 if add_to_result_map is not None:
3234 result_map_targets += (unary,)
3235 kw["add_to_result_map"] = add_to_result_map
3236 kw["result_map_targets"] = result_map_targets
3237
3238 if unary.operator:
3239 if unary.modifier:
3240 raise exc.CompileError(
3241 "Unary expression does not support operator "
3242 "and modifier simultaneously"
3243 )
3244 disp = self._get_operator_dispatch(
3245 unary.operator, "unary", "operator"
3246 )
3247 if disp:
3248 return disp(unary, unary.operator, **kw)
3249 else:
3250 return self._generate_generic_unary_operator(
3251 unary, OPERATORS[unary.operator], **kw
3252 )
3253 elif unary.modifier:
3254 disp = self._get_operator_dispatch(
3255 unary.modifier, "unary", "modifier"
3256 )
3257 if disp:
3258 return disp(unary, unary.modifier, **kw)
3259 else:
3260 return self._generate_generic_unary_modifier(
3261 unary, OPERATORS[unary.modifier], **kw
3262 )
3263 else:
3264 raise exc.CompileError(
3265 "Unary expression has no operator or modifier"
3266 )
3267
3268 def visit_truediv_binary(self, binary, operator, **kw):
3269 if self.dialect.div_is_floordiv:
3270 return (
3271 self.process(binary.left, **kw)
3272 + " / "
3273 # TODO: would need a fast cast again here,
3274 # unless we want to use an implicit cast like "+ 0.0"
3275 + self.process(
3276 elements.Cast(
3277 binary.right,
3278 (
3279 binary.right.type
3280 if binary.right.type._type_affinity
3281 in (sqltypes.Numeric, sqltypes.Float)
3282 else sqltypes.Numeric()
3283 ),
3284 ),
3285 **kw,
3286 )
3287 )
3288 else:
3289 return (
3290 self.process(binary.left, **kw)
3291 + " / "
3292 + self.process(binary.right, **kw)
3293 )
3294
3295 def visit_floordiv_binary(self, binary, operator, **kw):
3296 if (
3297 self.dialect.div_is_floordiv
3298 and binary.right.type._type_affinity is sqltypes.Integer
3299 ):
3300 return (
3301 self.process(binary.left, **kw)
3302 + " / "
3303 + self.process(binary.right, **kw)
3304 )
3305 else:
3306 return "FLOOR(%s)" % (
3307 self.process(binary.left, **kw)
3308 + " / "
3309 + self.process(binary.right, **kw)
3310 )
3311
3312 def visit_is_true_unary_operator(self, element, operator, **kw):
3313 if (
3314 element._is_implicitly_boolean
3315 or self.dialect.supports_native_boolean
3316 ):
3317 return self.process(element.element, **kw)
3318 else:
3319 return "%s = 1" % self.process(element.element, **kw)
3320
3321 def visit_is_false_unary_operator(self, element, operator, **kw):
3322 if (
3323 element._is_implicitly_boolean
3324 or self.dialect.supports_native_boolean
3325 ):
3326 return "NOT %s" % self.process(element.element, **kw)
3327 else:
3328 return "%s = 0" % self.process(element.element, **kw)
3329
3330 def visit_not_match_op_binary(self, binary, operator, **kw):
3331 return "NOT %s" % self.visit_binary(
3332 binary, override_operator=operators.match_op
3333 )
3334
3335 def visit_not_in_op_binary(self, binary, operator, **kw):
3336 # The brackets are required in the NOT IN operation because the empty
3337 # case is handled using the form "(col NOT IN (null) OR 1 = 1)".
3338 # The presence of the OR makes the brackets required.
3339 return "(%s)" % self._generate_generic_binary(
3340 binary, OPERATORS[operator], **kw
3341 )
3342
3343 def visit_empty_set_op_expr(self, type_, expand_op, **kw):
3344 if expand_op is operators.not_in_op:
3345 if len(type_) > 1:
3346 return "(%s)) OR (1 = 1" % (
3347 ", ".join("NULL" for element in type_)
3348 )
3349 else:
3350 return "NULL) OR (1 = 1"
3351 elif expand_op is operators.in_op:
3352 if len(type_) > 1:
3353 return "(%s)) AND (1 != 1" % (
3354 ", ".join("NULL" for element in type_)
3355 )
3356 else:
3357 return "NULL) AND (1 != 1"
3358 else:
3359 return self.visit_empty_set_expr(type_)
3360
3361 def visit_empty_set_expr(self, element_types, **kw):
3362 raise NotImplementedError(
3363 "Dialect '%s' does not support empty set expression."
3364 % self.dialect.name
3365 )
3366
3367 def _literal_execute_expanding_parameter_literal_binds(
3368 self, parameter, values, bind_expression_template=None
3369 ):
3370 typ_dialect_impl = parameter.type._unwrapped_dialect_impl(self.dialect)
3371
3372 if not values:
3373 # empty IN expression. note we don't need to use
3374 # bind_expression_template here because there are no
3375 # expressions to render.
3376
3377 if typ_dialect_impl._is_tuple_type:
3378 replacement_expression = (
3379 "VALUES " if self.dialect.tuple_in_values else ""
3380 ) + self.visit_empty_set_op_expr(
3381 parameter.type.types, parameter.expand_op
3382 )
3383
3384 else:
3385 replacement_expression = self.visit_empty_set_op_expr(
3386 [parameter.type], parameter.expand_op
3387 )
3388
3389 elif typ_dialect_impl._is_tuple_type or (
3390 typ_dialect_impl._isnull
3391 and isinstance(values[0], collections_abc.Sequence)
3392 and not isinstance(values[0], (str, bytes))
3393 ):
3394 if typ_dialect_impl._has_bind_expression:
3395 raise NotImplementedError(
3396 "bind_expression() on TupleType not supported with "
3397 "literal_binds"
3398 )
3399
3400 replacement_expression = (
3401 "VALUES " if self.dialect.tuple_in_values else ""
3402 ) + ", ".join(
3403 "(%s)"
3404 % (
3405 ", ".join(
3406 self.render_literal_value(value, param_type)
3407 for value, param_type in zip(
3408 tuple_element, parameter.type.types
3409 )
3410 )
3411 )
3412 for i, tuple_element in enumerate(values)
3413 )
3414 else:
3415 if bind_expression_template:
3416 post_compile_pattern = self._post_compile_pattern
3417 m = post_compile_pattern.search(bind_expression_template)
3418 assert m and m.group(
3419 2
3420 ), "unexpected format for expanding parameter"
3421
3422 tok = m.group(2).split("~~")
3423 be_left, be_right = tok[1], tok[3]
3424 replacement_expression = ", ".join(
3425 "%s%s%s"
3426 % (
3427 be_left,
3428 self.render_literal_value(value, parameter.type),
3429 be_right,
3430 )
3431 for value in values
3432 )
3433 else:
3434 replacement_expression = ", ".join(
3435 self.render_literal_value(value, parameter.type)
3436 for value in values
3437 )
3438
3439 return (), replacement_expression
3440
3441 def _literal_execute_expanding_parameter(self, name, parameter, values):
3442 if parameter.literal_execute:
3443 return self._literal_execute_expanding_parameter_literal_binds(
3444 parameter, values
3445 )
3446
3447 dialect = self.dialect
3448 typ_dialect_impl = parameter.type._unwrapped_dialect_impl(dialect)
3449
3450 if self._numeric_binds:
3451 bind_template = self.compilation_bindtemplate
3452 else:
3453 bind_template = self.bindtemplate
3454
3455 if (
3456 self.dialect._bind_typing_render_casts
3457 and typ_dialect_impl.render_bind_cast
3458 ):
3459
3460 def _render_bindtemplate(name):
3461 return self.render_bind_cast(
3462 parameter.type,
3463 typ_dialect_impl,
3464 bind_template % {"name": name},
3465 )
3466
3467 else:
3468
3469 def _render_bindtemplate(name):
3470 return bind_template % {"name": name}
3471
3472 if not values:
3473 to_update = []
3474 if typ_dialect_impl._is_tuple_type:
3475 replacement_expression = self.visit_empty_set_op_expr(
3476 parameter.type.types, parameter.expand_op
3477 )
3478 else:
3479 replacement_expression = self.visit_empty_set_op_expr(
3480 [parameter.type], parameter.expand_op
3481 )
3482
3483 elif typ_dialect_impl._is_tuple_type or (
3484 typ_dialect_impl._isnull
3485 and isinstance(values[0], collections_abc.Sequence)
3486 and not isinstance(values[0], (str, bytes))
3487 ):
3488 assert not typ_dialect_impl._is_array
3489 to_update = [
3490 ("%s_%s_%s" % (name, i, j), value)
3491 for i, tuple_element in enumerate(values, 1)
3492 for j, value in enumerate(tuple_element, 1)
3493 ]
3494
3495 replacement_expression = (
3496 "VALUES " if dialect.tuple_in_values else ""
3497 ) + ", ".join(
3498 "(%s)"
3499 % (
3500 ", ".join(
3501 _render_bindtemplate(
3502 to_update[i * len(tuple_element) + j][0]
3503 )
3504 for j, value in enumerate(tuple_element)
3505 )
3506 )
3507 for i, tuple_element in enumerate(values)
3508 )
3509 else:
3510 to_update = [
3511 ("%s_%s" % (name, i), value)
3512 for i, value in enumerate(values, 1)
3513 ]
3514 replacement_expression = ", ".join(
3515 _render_bindtemplate(key) for key, value in to_update
3516 )
3517
3518 return to_update, replacement_expression
3519
3520 def visit_binary(
3521 self,
3522 binary,
3523 override_operator=None,
3524 eager_grouping=False,
3525 from_linter=None,
3526 lateral_from_linter=None,
3527 **kw,
3528 ):
3529 if from_linter and operators.is_comparison(binary.operator):
3530 if lateral_from_linter is not None:
3531 enclosing_lateral = kw["enclosing_lateral"]
3532 lateral_from_linter.edges.update(
3533 itertools.product(
3534 _de_clone(
3535 binary.left._from_objects + [enclosing_lateral]
3536 ),
3537 _de_clone(
3538 binary.right._from_objects + [enclosing_lateral]
3539 ),
3540 )
3541 )
3542 else:
3543 from_linter.edges.update(
3544 itertools.product(
3545 _de_clone(binary.left._from_objects),
3546 _de_clone(binary.right._from_objects),
3547 )
3548 )
3549
3550 # don't allow "? = ?" to render
3551 if (
3552 self.ansi_bind_rules
3553 and isinstance(binary.left, elements.BindParameter)
3554 and isinstance(binary.right, elements.BindParameter)
3555 ):
3556 kw["literal_execute"] = True
3557
3558 operator_ = override_operator or binary.operator
3559 disp = self._get_operator_dispatch(operator_, "binary", None)
3560 if disp:
3561 return disp(binary, operator_, **kw)
3562 else:
3563 try:
3564 opstring = OPERATORS[operator_]
3565 except KeyError as err:
3566 raise exc.UnsupportedCompilationError(self, operator_) from err
3567 else:
3568 return self._generate_generic_binary(
3569 binary,
3570 opstring,
3571 from_linter=from_linter,
3572 lateral_from_linter=lateral_from_linter,
3573 **kw,
3574 )
3575
3576 def visit_function_as_comparison_op_binary(self, element, operator, **kw):
3577 return self.process(element.sql_function, **kw)
3578
3579 def visit_mod_binary(self, binary, operator, **kw):
3580 if self.preparer._double_percents:
3581 return (
3582 self.process(binary.left, **kw)
3583 + " %% "
3584 + self.process(binary.right, **kw)
3585 )
3586 else:
3587 return (
3588 self.process(binary.left, **kw)
3589 + " % "
3590 + self.process(binary.right, **kw)
3591 )
3592
3593 def visit_custom_op_binary(self, element, operator, **kw):
3594 if operator.visit_name:
3595 disp = self._get_custom_operator_dispatch(operator, "binary")
3596 if disp:
3597 return disp(element, operator, **kw)
3598
3599 kw["eager_grouping"] = operator.eager_grouping
3600 return self._generate_generic_binary(
3601 element,
3602 " " + self.escape_literal_column(operator.opstring) + " ",
3603 **kw,
3604 )
3605
3606 def visit_custom_op_unary_operator(self, element, operator, **kw):
3607 if operator.visit_name:
3608 disp = self._get_custom_operator_dispatch(operator, "unary")
3609 if disp:
3610 return disp(element, operator, **kw)
3611
3612 return self._generate_generic_unary_operator(
3613 element, self.escape_literal_column(operator.opstring) + " ", **kw
3614 )
3615
3616 def visit_custom_op_unary_modifier(self, element, operator, **kw):
3617 if operator.visit_name:
3618 disp = self._get_custom_operator_dispatch(operator, "unary")
3619 if disp:
3620 return disp(element, operator, **kw)
3621
3622 return self._generate_generic_unary_modifier(
3623 element, " " + self.escape_literal_column(operator.opstring), **kw
3624 )
3625
3626 def _generate_generic_binary(
3627 self,
3628 binary: BinaryExpression[Any],
3629 opstring: str,
3630 eager_grouping: bool = False,
3631 **kw: Any,
3632 ) -> str:
3633 _in_operator_expression = kw.get("_in_operator_expression", False)
3634
3635 kw["_in_operator_expression"] = True
3636 kw["_binary_op"] = binary.operator
3637 text = (
3638 binary.left._compiler_dispatch(
3639 self, eager_grouping=eager_grouping, **kw
3640 )
3641 + opstring
3642 + binary.right._compiler_dispatch(
3643 self, eager_grouping=eager_grouping, **kw
3644 )
3645 )
3646
3647 if _in_operator_expression and eager_grouping:
3648 text = "(%s)" % text
3649 return text
3650
3651 def _generate_generic_unary_operator(self, unary, opstring, **kw):
3652 return opstring + unary.element._compiler_dispatch(self, **kw)
3653
3654 def _generate_generic_unary_modifier(self, unary, opstring, **kw):
3655 return unary.element._compiler_dispatch(self, **kw) + opstring
3656
3657 @util.memoized_property
3658 def _like_percent_literal(self):
3659 return elements.literal_column("'%'", type_=sqltypes.STRINGTYPE)
3660
3661 def visit_ilike_case_insensitive_operand(self, element, **kw):
3662 return f"lower({element.element._compiler_dispatch(self, **kw)})"
3663
3664 def visit_contains_op_binary(self, binary, operator, **kw):
3665 binary = binary._clone()
3666 percent = self._like_percent_literal
3667 binary.right = percent.concat(binary.right).concat(percent)
3668 return self.visit_like_op_binary(binary, operator, **kw)
3669
3670 def visit_not_contains_op_binary(self, binary, operator, **kw):
3671 binary = binary._clone()
3672 percent = self._like_percent_literal
3673 binary.right = percent.concat(binary.right).concat(percent)
3674 return self.visit_not_like_op_binary(binary, operator, **kw)
3675
3676 def visit_icontains_op_binary(self, binary, operator, **kw):
3677 binary = binary._clone()
3678 percent = self._like_percent_literal
3679 binary.left = ilike_case_insensitive(binary.left)
3680 binary.right = percent.concat(
3681 ilike_case_insensitive(binary.right)
3682 ).concat(percent)
3683 return self.visit_ilike_op_binary(binary, operator, **kw)
3684
3685 def visit_not_icontains_op_binary(self, binary, operator, **kw):
3686 binary = binary._clone()
3687 percent = self._like_percent_literal
3688 binary.left = ilike_case_insensitive(binary.left)
3689 binary.right = percent.concat(
3690 ilike_case_insensitive(binary.right)
3691 ).concat(percent)
3692 return self.visit_not_ilike_op_binary(binary, operator, **kw)
3693
3694 def visit_startswith_op_binary(self, binary, operator, **kw):
3695 binary = binary._clone()
3696 percent = self._like_percent_literal
3697 binary.right = percent._rconcat(binary.right)
3698 return self.visit_like_op_binary(binary, operator, **kw)
3699
3700 def visit_not_startswith_op_binary(self, binary, operator, **kw):
3701 binary = binary._clone()
3702 percent = self._like_percent_literal
3703 binary.right = percent._rconcat(binary.right)
3704 return self.visit_not_like_op_binary(binary, operator, **kw)
3705
3706 def visit_istartswith_op_binary(self, binary, operator, **kw):
3707 binary = binary._clone()
3708 percent = self._like_percent_literal
3709 binary.left = ilike_case_insensitive(binary.left)
3710 binary.right = percent._rconcat(ilike_case_insensitive(binary.right))
3711 return self.visit_ilike_op_binary(binary, operator, **kw)
3712
3713 def visit_not_istartswith_op_binary(self, binary, operator, **kw):
3714 binary = binary._clone()
3715 percent = self._like_percent_literal
3716 binary.left = ilike_case_insensitive(binary.left)
3717 binary.right = percent._rconcat(ilike_case_insensitive(binary.right))
3718 return self.visit_not_ilike_op_binary(binary, operator, **kw)
3719
3720 def visit_endswith_op_binary(self, binary, operator, **kw):
3721 binary = binary._clone()
3722 percent = self._like_percent_literal
3723 binary.right = percent.concat(binary.right)
3724 return self.visit_like_op_binary(binary, operator, **kw)
3725
3726 def visit_not_endswith_op_binary(self, binary, operator, **kw):
3727 binary = binary._clone()
3728 percent = self._like_percent_literal
3729 binary.right = percent.concat(binary.right)
3730 return self.visit_not_like_op_binary(binary, operator, **kw)
3731
3732 def visit_iendswith_op_binary(self, binary, operator, **kw):
3733 binary = binary._clone()
3734 percent = self._like_percent_literal
3735 binary.left = ilike_case_insensitive(binary.left)
3736 binary.right = percent.concat(ilike_case_insensitive(binary.right))
3737 return self.visit_ilike_op_binary(binary, operator, **kw)
3738
3739 def visit_not_iendswith_op_binary(self, binary, operator, **kw):
3740 binary = binary._clone()
3741 percent = self._like_percent_literal
3742 binary.left = ilike_case_insensitive(binary.left)
3743 binary.right = percent.concat(ilike_case_insensitive(binary.right))
3744 return self.visit_not_ilike_op_binary(binary, operator, **kw)
3745
3746 def visit_like_op_binary(self, binary, operator, **kw):
3747 escape = binary.modifiers.get("escape", None)
3748
3749 return "%s LIKE %s" % (
3750 binary.left._compiler_dispatch(self, **kw),
3751 binary.right._compiler_dispatch(self, **kw),
3752 ) + (
3753 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
3754 if escape is not None
3755 else ""
3756 )
3757
3758 def visit_not_like_op_binary(self, binary, operator, **kw):
3759 escape = binary.modifiers.get("escape", None)
3760 return "%s NOT LIKE %s" % (
3761 binary.left._compiler_dispatch(self, **kw),
3762 binary.right._compiler_dispatch(self, **kw),
3763 ) + (
3764 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
3765 if escape is not None
3766 else ""
3767 )
3768
3769 def visit_ilike_op_binary(self, binary, operator, **kw):
3770 if operator is operators.ilike_op:
3771 binary = binary._clone()
3772 binary.left = ilike_case_insensitive(binary.left)
3773 binary.right = ilike_case_insensitive(binary.right)
3774 # else we assume ilower() has been applied
3775
3776 return self.visit_like_op_binary(binary, operator, **kw)
3777
3778 def visit_not_ilike_op_binary(self, binary, operator, **kw):
3779 if operator is operators.not_ilike_op:
3780 binary = binary._clone()
3781 binary.left = ilike_case_insensitive(binary.left)
3782 binary.right = ilike_case_insensitive(binary.right)
3783 # else we assume ilower() has been applied
3784
3785 return self.visit_not_like_op_binary(binary, operator, **kw)
3786
3787 def visit_between_op_binary(self, binary, operator, **kw):
3788 symmetric = binary.modifiers.get("symmetric", False)
3789 return self._generate_generic_binary(
3790 binary, " BETWEEN SYMMETRIC " if symmetric else " BETWEEN ", **kw
3791 )
3792
3793 def visit_not_between_op_binary(self, binary, operator, **kw):
3794 symmetric = binary.modifiers.get("symmetric", False)
3795 return self._generate_generic_binary(
3796 binary,
3797 " NOT BETWEEN SYMMETRIC " if symmetric else " NOT BETWEEN ",
3798 **kw,
3799 )
3800
3801 def visit_regexp_match_op_binary(
3802 self, binary: BinaryExpression[Any], operator: Any, **kw: Any
3803 ) -> str:
3804 raise exc.CompileError(
3805 "%s dialect does not support regular expressions"
3806 % self.dialect.name
3807 )
3808
3809 def visit_not_regexp_match_op_binary(
3810 self, binary: BinaryExpression[Any], operator: Any, **kw: Any
3811 ) -> str:
3812 raise exc.CompileError(
3813 "%s dialect does not support regular expressions"
3814 % self.dialect.name
3815 )
3816
3817 def visit_regexp_replace_op_binary(
3818 self, binary: BinaryExpression[Any], operator: Any, **kw: Any
3819 ) -> str:
3820 raise exc.CompileError(
3821 "%s dialect does not support regular expression replacements"
3822 % self.dialect.name
3823 )
3824
3825 def visit_dmltargetcopy(self, element, *, bindmarkers=None, **kw):
3826 if bindmarkers is None:
3827 raise exc.CompileError(
3828 "DML target objects may only be used with "
3829 "compiled INSERT or UPDATE statements"
3830 )
3831
3832 bindmarkers[element.column.key] = element
3833 return f"__BINDMARKER_~~{element.column.key}~~"
3834
3835 def visit_bindparam(
3836 self,
3837 bindparam,
3838 within_columns_clause=False,
3839 literal_binds=False,
3840 skip_bind_expression=False,
3841 literal_execute=False,
3842 render_postcompile=False,
3843 **kwargs,
3844 ):
3845
3846 if not skip_bind_expression:
3847 impl = bindparam.type.dialect_impl(self.dialect)
3848 if impl._has_bind_expression:
3849 bind_expression = impl.bind_expression(bindparam)
3850 wrapped = self.process(
3851 bind_expression,
3852 skip_bind_expression=True,
3853 within_columns_clause=within_columns_clause,
3854 literal_binds=literal_binds and not bindparam.expanding,
3855 literal_execute=literal_execute,
3856 render_postcompile=render_postcompile,
3857 **kwargs,
3858 )
3859 if bindparam.expanding:
3860 # for postcompile w/ expanding, move the "wrapped" part
3861 # of this into the inside
3862
3863 m = re.match(
3864 r"^(.*)\(__\[POSTCOMPILE_(\S+?)\]\)(.*)$", wrapped
3865 )
3866 assert m, "unexpected format for expanding parameter"
3867 wrapped = "(__[POSTCOMPILE_%s~~%s~~REPL~~%s~~])" % (
3868 m.group(2),
3869 m.group(1),
3870 m.group(3),
3871 )
3872
3873 if literal_binds:
3874 ret = self.render_literal_bindparam(
3875 bindparam,
3876 within_columns_clause=True,
3877 bind_expression_template=wrapped,
3878 **kwargs,
3879 )
3880 return f"({ret})"
3881
3882 return wrapped
3883
3884 if not literal_binds:
3885 literal_execute = (
3886 literal_execute
3887 or bindparam.literal_execute
3888 or (within_columns_clause and self.ansi_bind_rules)
3889 )
3890 post_compile = literal_execute or bindparam.expanding
3891 else:
3892 post_compile = False
3893
3894 if literal_binds:
3895 ret = self.render_literal_bindparam(
3896 bindparam, within_columns_clause=True, **kwargs
3897 )
3898 if bindparam.expanding:
3899 ret = f"({ret})"
3900 return ret
3901
3902 name = self._truncate_bindparam(bindparam)
3903
3904 if name in self.binds:
3905 existing = self.binds[name]
3906 if existing is not bindparam:
3907 if (
3908 (existing.unique or bindparam.unique)
3909 and not existing.proxy_set.intersection(
3910 bindparam.proxy_set
3911 )
3912 and not existing._cloned_set.intersection(
3913 bindparam._cloned_set
3914 )
3915 ):
3916 raise exc.CompileError(
3917 "Bind parameter '%s' conflicts with "
3918 "unique bind parameter of the same name" % name
3919 )
3920 elif existing.expanding != bindparam.expanding:
3921 raise exc.CompileError(
3922 "Can't reuse bound parameter name '%s' in both "
3923 "'expanding' (e.g. within an IN expression) and "
3924 "non-expanding contexts. If this parameter is to "
3925 "receive a list/array value, set 'expanding=True' on "
3926 "it for expressions that aren't IN, otherwise use "
3927 "a different parameter name." % (name,)
3928 )
3929 elif existing._is_crud or bindparam._is_crud:
3930 if existing._is_crud and bindparam._is_crud:
3931 # TODO: this condition is not well understood.
3932 # see tests in test/sql/test_update.py
3933 raise exc.CompileError(
3934 "Encountered unsupported case when compiling an "
3935 "INSERT or UPDATE statement. If this is a "
3936 "multi-table "
3937 "UPDATE statement, please provide string-named "
3938 "arguments to the "
3939 "values() method with distinct names; support for "
3940 "multi-table UPDATE statements that "
3941 "target multiple tables for UPDATE is very "
3942 "limited",
3943 )
3944 else:
3945 raise exc.CompileError(
3946 f"bindparam() name '{bindparam.key}' is reserved "
3947 "for automatic usage in the VALUES or SET "
3948 "clause of this "
3949 "insert/update statement. Please use a "
3950 "name other than column name when using "
3951 "bindparam() "
3952 "with insert() or update() (for example, "
3953 f"'b_{bindparam.key}')."
3954 )
3955
3956 self.binds[bindparam.key] = self.binds[name] = bindparam
3957
3958 # if we are given a cache key that we're going to match against,
3959 # relate the bindparam here to one that is most likely present
3960 # in the "extracted params" portion of the cache key. this is used
3961 # to set up a positional mapping that is used to determine the
3962 # correct parameters for a subsequent use of this compiled with
3963 # a different set of parameter values. here, we accommodate for
3964 # parameters that may have been cloned both before and after the cache
3965 # key was been generated.
3966 ckbm_tuple = self._cache_key_bind_match
3967
3968 if ckbm_tuple:
3969 ckbm, cksm = ckbm_tuple
3970 for bp in bindparam._cloned_set:
3971 if bp.key in cksm:
3972 cb = cksm[bp.key]
3973 ckbm[cb].append(bindparam)
3974
3975 if bindparam.isoutparam:
3976 self.has_out_parameters = True
3977
3978 if post_compile:
3979 if render_postcompile:
3980 self._render_postcompile = True
3981
3982 if literal_execute:
3983 self.literal_execute_params |= {bindparam}
3984 else:
3985 self.post_compile_params |= {bindparam}
3986
3987 ret = self.bindparam_string(
3988 name,
3989 post_compile=post_compile,
3990 expanding=bindparam.expanding,
3991 bindparam_type=bindparam.type,
3992 **kwargs,
3993 )
3994
3995 if bindparam.expanding:
3996 ret = f"({ret})"
3997
3998 return ret
3999
4000 def render_bind_cast(self, type_, dbapi_type, sqltext):
4001 raise NotImplementedError()
4002
4003 def render_literal_bindparam(
4004 self,
4005 bindparam,
4006 render_literal_value=NO_ARG,
4007 bind_expression_template=None,
4008 **kw,
4009 ):
4010 if render_literal_value is not NO_ARG:
4011 value = render_literal_value
4012 else:
4013 if bindparam.value is None and bindparam.callable is None:
4014 op = kw.get("_binary_op", None)
4015 if op and op not in (operators.is_, operators.is_not):
4016 util.warn_limited(
4017 "Bound parameter '%s' rendering literal NULL in a SQL "
4018 "expression; comparisons to NULL should not use "
4019 "operators outside of 'is' or 'is not'",
4020 (bindparam.key,),
4021 )
4022 return self.process(sqltypes.NULLTYPE, **kw)
4023 value = bindparam.effective_value
4024
4025 if bindparam.expanding:
4026 leep = self._literal_execute_expanding_parameter_literal_binds
4027 to_update, replacement_expr = leep(
4028 bindparam,
4029 value,
4030 bind_expression_template=bind_expression_template,
4031 )
4032 return replacement_expr
4033 else:
4034 return self.render_literal_value(value, bindparam.type)
4035
4036 def render_literal_value(
4037 self, value: Any, type_: sqltypes.TypeEngine[Any]
4038 ) -> str:
4039 """Render the value of a bind parameter as a quoted literal.
4040
4041 This is used for statement sections that do not accept bind parameters
4042 on the target driver/database.
4043
4044 This should be implemented by subclasses using the quoting services
4045 of the DBAPI.
4046
4047 """
4048
4049 if value is None and not type_.should_evaluate_none:
4050 # issue #10535 - handle NULL in the compiler without placing
4051 # this onto each type, except for "evaluate None" types
4052 # (e.g. JSON)
4053 return self.process(elements.Null._instance())
4054
4055 processor = type_._cached_literal_processor(self.dialect)
4056 if processor:
4057 try:
4058 return processor(value)
4059 except Exception as e:
4060 raise exc.CompileError(
4061 f"Could not render literal value "
4062 f'"{sql_util._repr_single_value(value)}" '
4063 f"with datatype "
4064 f"{type_}; see parent stack trace for "
4065 "more detail."
4066 ) from e
4067
4068 else:
4069 raise exc.CompileError(
4070 f"No literal value renderer is available for literal value "
4071 f'"{sql_util._repr_single_value(value)}" '
4072 f"with datatype {type_}"
4073 )
4074
4075 def _truncate_bindparam(self, bindparam):
4076 if bindparam in self.bind_names:
4077 return self.bind_names[bindparam]
4078
4079 bind_name = bindparam.key
4080 if isinstance(bind_name, elements._truncated_label):
4081 bind_name = self._truncated_identifier("bindparam", bind_name)
4082
4083 # add to bind_names for translation
4084 self.bind_names[bindparam] = bind_name
4085
4086 return bind_name
4087
4088 def _truncated_identifier(
4089 self, ident_class: str, name: _truncated_label
4090 ) -> str:
4091 if (ident_class, name) in self.truncated_names:
4092 return self.truncated_names[(ident_class, name)]
4093
4094 anonname = name.apply_map(self.anon_map)
4095
4096 if len(anonname) > self.label_length - 6:
4097 counter = self._truncated_counters.get(ident_class, 1)
4098 truncname = (
4099 anonname[0 : max(self.label_length - 6, 0)]
4100 + "_"
4101 + hex(counter)[2:]
4102 )
4103 self._truncated_counters[ident_class] = counter + 1
4104 else:
4105 truncname = anonname
4106 self.truncated_names[(ident_class, name)] = truncname
4107 return truncname
4108
4109 def _anonymize(self, name: str) -> str:
4110 return name % self.anon_map
4111
4112 def bindparam_string(
4113 self,
4114 name: str,
4115 post_compile: bool = False,
4116 expanding: bool = False,
4117 escaped_from: Optional[str] = None,
4118 bindparam_type: Optional[TypeEngine[Any]] = None,
4119 accumulate_bind_names: Optional[Set[str]] = None,
4120 visited_bindparam: Optional[List[str]] = None,
4121 **kw: Any,
4122 ) -> str:
4123 # TODO: accumulate_bind_names is passed by crud.py to gather
4124 # names on a per-value basis, visited_bindparam is passed by
4125 # visit_insert() to collect all parameters in the statement.
4126 # see if this gathering can be simplified somehow
4127 if accumulate_bind_names is not None:
4128 accumulate_bind_names.add(name)
4129 if visited_bindparam is not None:
4130 visited_bindparam.append(name)
4131
4132 if not escaped_from:
4133 if self._bind_translate_re.search(name):
4134 # not quite the translate use case as we want to
4135 # also get a quick boolean if we even found
4136 # unusual characters in the name
4137 new_name = self._bind_translate_re.sub(
4138 lambda m: self._bind_translate_chars[m.group(0)],
4139 name,
4140 )
4141 escaped_from = name
4142 name = new_name
4143
4144 if escaped_from:
4145 self.escaped_bind_names = self.escaped_bind_names.union(
4146 {escaped_from: name}
4147 )
4148 if post_compile:
4149 ret = "__[POSTCOMPILE_%s]" % name
4150 if expanding:
4151 # for expanding, bound parameters or literal values will be
4152 # rendered per item
4153 return ret
4154
4155 # otherwise, for non-expanding "literal execute", apply
4156 # bind casts as determined by the datatype
4157 if bindparam_type is not None:
4158 type_impl = bindparam_type._unwrapped_dialect_impl(
4159 self.dialect
4160 )
4161 if type_impl.render_literal_cast:
4162 ret = self.render_bind_cast(bindparam_type, type_impl, ret)
4163 return ret
4164 elif self.state is CompilerState.COMPILING:
4165 ret = self.compilation_bindtemplate % {"name": name}
4166 else:
4167 ret = self.bindtemplate % {"name": name}
4168
4169 if (
4170 bindparam_type is not None
4171 and self.dialect._bind_typing_render_casts
4172 ):
4173 type_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
4174 if type_impl.render_bind_cast:
4175 ret = self.render_bind_cast(bindparam_type, type_impl, ret)
4176
4177 return ret
4178
4179 def _dispatch_independent_ctes(self, stmt, kw):
4180 local_kw = kw.copy()
4181 local_kw.pop("cte_opts", None)
4182 for cte, opt in zip(
4183 stmt._independent_ctes, stmt._independent_ctes_opts
4184 ):
4185 cte._compiler_dispatch(self, cte_opts=opt, **local_kw)
4186
4187 def visit_cte(
4188 self,
4189 cte: CTE,
4190 asfrom: bool = False,
4191 ashint: bool = False,
4192 fromhints: Optional[_FromHintsType] = None,
4193 visiting_cte: Optional[CTE] = None,
4194 from_linter: Optional[FromLinter] = None,
4195 cte_opts: selectable._CTEOpts = selectable._CTEOpts(False),
4196 **kwargs: Any,
4197 ) -> Optional[str]:
4198 self_ctes = self._init_cte_state()
4199 assert self_ctes is self.ctes
4200
4201 kwargs["visiting_cte"] = cte
4202
4203 cte_name = cte.name
4204
4205 if isinstance(cte_name, elements._truncated_label):
4206 cte_name = self._truncated_identifier("alias", cte_name)
4207
4208 is_new_cte = True
4209 embedded_in_current_named_cte = False
4210
4211 _reference_cte = cte._get_reference_cte()
4212
4213 nesting = cte.nesting or cte_opts.nesting
4214
4215 # check for CTE already encountered
4216 if _reference_cte in self.level_name_by_cte:
4217 cte_level, _, existing_cte_opts = self.level_name_by_cte[
4218 _reference_cte
4219 ]
4220 assert _ == cte_name
4221
4222 cte_level_name = (cte_level, cte_name)
4223 existing_cte = self.ctes_by_level_name[cte_level_name]
4224
4225 # check if we are receiving it here with a specific
4226 # "nest_here" location; if so, move it to this location
4227
4228 if cte_opts.nesting:
4229 if existing_cte_opts.nesting:
4230 raise exc.CompileError(
4231 "CTE is stated as 'nest_here' in "
4232 "more than one location"
4233 )
4234
4235 old_level_name = (cte_level, cte_name)
4236 cte_level = len(self.stack) if nesting else 1
4237 cte_level_name = new_level_name = (cte_level, cte_name)
4238
4239 del self.ctes_by_level_name[old_level_name]
4240 self.ctes_by_level_name[new_level_name] = existing_cte
4241 self.level_name_by_cte[_reference_cte] = new_level_name + (
4242 cte_opts,
4243 )
4244
4245 else:
4246 cte_level = len(self.stack) if nesting else 1
4247 cte_level_name = (cte_level, cte_name)
4248
4249 if cte_level_name in self.ctes_by_level_name:
4250 existing_cte = self.ctes_by_level_name[cte_level_name]
4251 else:
4252 existing_cte = None
4253
4254 if existing_cte is not None:
4255 embedded_in_current_named_cte = visiting_cte is existing_cte
4256
4257 # we've generated a same-named CTE that we are enclosed in,
4258 # or this is the same CTE. just return the name.
4259 if cte is existing_cte._restates or cte is existing_cte:
4260 is_new_cte = False
4261 elif existing_cte is cte._restates:
4262 # we've generated a same-named CTE that is
4263 # enclosed in us - we take precedence, so
4264 # discard the text for the "inner".
4265 del self_ctes[existing_cte]
4266
4267 existing_cte_reference_cte = existing_cte._get_reference_cte()
4268
4269 assert existing_cte_reference_cte is _reference_cte
4270 assert existing_cte_reference_cte is existing_cte
4271
4272 del self.level_name_by_cte[existing_cte_reference_cte]
4273 else:
4274 if (
4275 # if the two CTEs have the same hash, which we expect
4276 # here means that one/both is an annotated of the other
4277 (hash(cte) == hash(existing_cte))
4278 # or...
4279 or (
4280 (
4281 # if they are clones, i.e. they came from the ORM
4282 # or some other visit method
4283 cte._is_clone_of is not None
4284 or existing_cte._is_clone_of is not None
4285 )
4286 # and are deep-copy identical
4287 and cte.compare(existing_cte)
4288 )
4289 ):
4290 # then consider these two CTEs the same
4291 is_new_cte = False
4292 else:
4293 # otherwise these are two CTEs that either will render
4294 # differently, or were indicated separately by the user,
4295 # with the same name
4296 raise exc.CompileError(
4297 "Multiple, unrelated CTEs found with "
4298 "the same name: %r" % cte_name
4299 )
4300
4301 if not asfrom and not is_new_cte:
4302 return None
4303
4304 if cte._cte_alias is not None:
4305 pre_alias_cte = cte._cte_alias
4306 cte_pre_alias_name = cte._cte_alias.name
4307 if isinstance(cte_pre_alias_name, elements._truncated_label):
4308 cte_pre_alias_name = self._truncated_identifier(
4309 "alias", cte_pre_alias_name
4310 )
4311 else:
4312 pre_alias_cte = cte
4313 cte_pre_alias_name = None
4314
4315 if is_new_cte:
4316 self.ctes_by_level_name[cte_level_name] = cte
4317 self.level_name_by_cte[_reference_cte] = cte_level_name + (
4318 cte_opts,
4319 )
4320
4321 if pre_alias_cte not in self.ctes:
4322 self.visit_cte(pre_alias_cte, **kwargs)
4323
4324 if not cte_pre_alias_name and cte not in self_ctes:
4325 if cte.recursive:
4326 self.ctes_recursive = True
4327 text = self.preparer.format_alias(cte, cte_name)
4328 if cte.recursive or cte.element.name_cte_columns:
4329 col_source = cte.element
4330
4331 # TODO: can we get at the .columns_plus_names collection
4332 # that is already (or will be?) generated for the SELECT
4333 # rather than calling twice?
4334 recur_cols = [
4335 # TODO: proxy_name is not technically safe,
4336 # see test_cte->
4337 # test_with_recursive_no_name_currently_buggy. not
4338 # clear what should be done with such a case
4339 fallback_label_name or proxy_name
4340 for (
4341 _,
4342 proxy_name,
4343 fallback_label_name,
4344 c,
4345 repeated,
4346 ) in (col_source._generate_columns_plus_names(True))
4347 if not repeated
4348 ]
4349
4350 text += "(%s)" % (
4351 ", ".join(
4352 self.preparer.format_label_name(
4353 ident, anon_map=self.anon_map
4354 )
4355 for ident in recur_cols
4356 )
4357 )
4358
4359 assert kwargs.get("subquery", False) is False
4360
4361 if not self.stack:
4362 # toplevel, this is a stringify of the
4363 # cte directly. just compile the inner
4364 # the way alias() does.
4365 return cte.element._compiler_dispatch(
4366 self, asfrom=asfrom, **kwargs
4367 )
4368 else:
4369 prefixes = self._generate_prefixes(
4370 cte, cte._prefixes, **kwargs
4371 )
4372 inner = cte.element._compiler_dispatch(
4373 self, asfrom=True, **kwargs
4374 )
4375
4376 text += " AS %s\n(%s)" % (prefixes, inner)
4377
4378 if cte._suffixes:
4379 text += " " + self._generate_prefixes(
4380 cte, cte._suffixes, **kwargs
4381 )
4382
4383 self_ctes[cte] = text
4384
4385 if asfrom:
4386 if from_linter:
4387 from_linter.froms[cte._de_clone()] = cte_name
4388
4389 if not is_new_cte and embedded_in_current_named_cte:
4390 return self.preparer.format_alias(cte, cte_name)
4391
4392 if cte_pre_alias_name:
4393 text = self.preparer.format_alias(cte, cte_pre_alias_name)
4394 if self.preparer._requires_quotes(cte_name):
4395 cte_name = self.preparer.quote(cte_name)
4396 text += self.get_render_as_alias_suffix(cte_name)
4397 return text # type: ignore[no-any-return]
4398 else:
4399 return self.preparer.format_alias(cte, cte_name)
4400
4401 return None
4402
4403 def visit_table_valued_alias(self, element, **kw):
4404 if element.joins_implicitly:
4405 kw["from_linter"] = None
4406 if element._is_lateral:
4407 return self.visit_lateral(element, **kw)
4408 else:
4409 return self.visit_alias(element, **kw)
4410
4411 def visit_table_valued_column(self, element, **kw):
4412 return self.visit_column(element, **kw)
4413
4414 def visit_alias(
4415 self,
4416 alias,
4417 asfrom=False,
4418 ashint=False,
4419 iscrud=False,
4420 fromhints=None,
4421 subquery=False,
4422 lateral=False,
4423 enclosing_alias=None,
4424 from_linter=None,
4425 within_tstring=False,
4426 **kwargs,
4427 ):
4428 if lateral:
4429 if "enclosing_lateral" not in kwargs:
4430 # if lateral is set and enclosing_lateral is not
4431 # present, we assume we are being called directly
4432 # from visit_lateral() and we need to set enclosing_lateral.
4433 assert alias._is_lateral
4434 kwargs["enclosing_lateral"] = alias
4435
4436 # for lateral objects, we track a second from_linter that is...
4437 # lateral! to the level above us.
4438 if (
4439 from_linter
4440 and "lateral_from_linter" not in kwargs
4441 and "enclosing_lateral" in kwargs
4442 ):
4443 kwargs["lateral_from_linter"] = from_linter
4444
4445 if enclosing_alias is not None and enclosing_alias.element is alias:
4446 inner = alias.element._compiler_dispatch(
4447 self,
4448 asfrom=asfrom,
4449 ashint=ashint,
4450 iscrud=iscrud,
4451 fromhints=fromhints,
4452 lateral=lateral,
4453 enclosing_alias=alias,
4454 **kwargs,
4455 )
4456 if subquery and (asfrom or lateral):
4457 inner = "(%s)" % (inner,)
4458 return inner
4459 else:
4460 kwargs["enclosing_alias"] = alias
4461
4462 if asfrom or ashint or within_tstring:
4463 if isinstance(alias.name, elements._truncated_label):
4464 alias_name = self._truncated_identifier("alias", alias.name)
4465 else:
4466 alias_name = alias.name
4467
4468 if ashint:
4469 return self.preparer.format_alias(alias, alias_name)
4470 elif asfrom or within_tstring:
4471 if from_linter:
4472 from_linter.froms[alias._de_clone()] = alias_name
4473
4474 inner = alias.element._compiler_dispatch(
4475 self, asfrom=True, lateral=lateral, **kwargs
4476 )
4477 if subquery:
4478 inner = "(%s)" % (inner,)
4479
4480 ret = inner + self.get_render_as_alias_suffix(
4481 self.preparer.format_alias(alias, alias_name)
4482 )
4483
4484 if alias._supports_derived_columns and alias._render_derived:
4485 ret += "(%s)" % (
4486 ", ".join(
4487 "%s%s"
4488 % (
4489 self.preparer.quote(col.name),
4490 (
4491 " %s"
4492 % self.dialect.type_compiler_instance.process(
4493 col.type, **kwargs
4494 )
4495 if alias._render_derived_w_types
4496 else ""
4497 ),
4498 )
4499 for col in alias.c
4500 )
4501 )
4502
4503 if fromhints and alias in fromhints:
4504 ret = self.format_from_hint_text(
4505 ret, alias, fromhints[alias], iscrud
4506 )
4507
4508 return ret
4509 else:
4510 # note we cancel the "subquery" flag here as well
4511 return alias.element._compiler_dispatch(
4512 self, lateral=lateral, **kwargs
4513 )
4514
4515 def visit_subquery(self, subquery, **kw):
4516 kw["subquery"] = True
4517 return self.visit_alias(subquery, **kw)
4518
4519 def visit_lateral(self, lateral_, **kw):
4520 kw["lateral"] = True
4521 return "LATERAL %s" % self.visit_alias(lateral_, **kw)
4522
4523 def visit_tablesample(self, tablesample, asfrom=False, **kw):
4524 text = "%s TABLESAMPLE %s" % (
4525 self.visit_alias(tablesample, asfrom=True, **kw),
4526 tablesample._get_method()._compiler_dispatch(self, **kw),
4527 )
4528
4529 if tablesample.seed is not None:
4530 text += " REPEATABLE (%s)" % (
4531 tablesample.seed._compiler_dispatch(self, **kw)
4532 )
4533
4534 return text
4535
4536 def _render_values(self, element, **kw):
4537 kw.setdefault("literal_binds", element.literal_binds)
4538 tuples = ", ".join(
4539 self.process(
4540 elements.Tuple(
4541 types=element._column_types, *elem
4542 ).self_group(),
4543 **kw,
4544 )
4545 for chunk in element._data
4546 for elem in chunk
4547 )
4548 return f"VALUES {tuples}"
4549
4550 def visit_values(
4551 self, element, asfrom=False, from_linter=None, visiting_cte=None, **kw
4552 ):
4553
4554 if element._independent_ctes:
4555 self._dispatch_independent_ctes(element, kw)
4556
4557 v = self._render_values(element, **kw)
4558
4559 if element._unnamed:
4560 name = None
4561 elif isinstance(element.name, elements._truncated_label):
4562 name = self._truncated_identifier("values", element.name)
4563 else:
4564 name = element.name
4565
4566 if element._is_lateral:
4567 lateral = "LATERAL "
4568 else:
4569 lateral = ""
4570
4571 if asfrom:
4572 if from_linter:
4573 from_linter.froms[element._de_clone()] = (
4574 name if name is not None else "(unnamed VALUES element)"
4575 )
4576
4577 if visiting_cte is not None and visiting_cte.element is element:
4578 if element._is_lateral:
4579 raise exc.CompileError(
4580 "Can't use a LATERAL VALUES expression inside of a CTE"
4581 )
4582 elif name:
4583 kw["include_table"] = False
4584 v = "%s(%s)%s (%s)" % (
4585 lateral,
4586 v,
4587 self.get_render_as_alias_suffix(self.preparer.quote(name)),
4588 (
4589 ", ".join(
4590 c._compiler_dispatch(self, **kw)
4591 for c in element.columns
4592 )
4593 ),
4594 )
4595 else:
4596 v = "%s(%s)" % (lateral, v)
4597 return v
4598
4599 def visit_scalar_values(self, element, **kw):
4600 return f"({self._render_values(element, **kw)})"
4601
4602 def get_render_as_alias_suffix(self, alias_name_text):
4603 return " AS " + alias_name_text
4604
4605 def _add_to_result_map(
4606 self,
4607 keyname: str,
4608 name: str,
4609 objects: Tuple[Any, ...],
4610 type_: TypeEngine[Any],
4611 ) -> None:
4612
4613 # note objects must be non-empty for cursor.py to handle the
4614 # collection properly
4615 assert objects
4616
4617 if keyname is None or keyname == "*":
4618 self._ordered_columns = False
4619 self._ad_hoc_textual = True
4620 if type_._is_tuple_type:
4621 raise exc.CompileError(
4622 "Most backends don't support SELECTing "
4623 "from a tuple() object. If this is an ORM query, "
4624 "consider using the Bundle object."
4625 )
4626 self._result_columns.append(
4627 ResultColumnsEntry(keyname, name, objects, type_)
4628 )
4629
4630 def _label_returning_column(
4631 self, stmt, column, populate_result_map, column_clause_args=None, **kw
4632 ):
4633 """Render a column with necessary labels inside of a RETURNING clause.
4634
4635 This method is provided for individual dialects in place of calling
4636 the _label_select_column method directly, so that the two use cases
4637 of RETURNING vs. SELECT can be disambiguated going forward.
4638
4639 .. versionadded:: 1.4.21
4640
4641 """
4642 return self._label_select_column(
4643 None,
4644 column,
4645 populate_result_map,
4646 False,
4647 {} if column_clause_args is None else column_clause_args,
4648 **kw,
4649 )
4650
4651 def _label_select_column(
4652 self,
4653 select,
4654 column,
4655 populate_result_map,
4656 asfrom,
4657 column_clause_args,
4658 name=None,
4659 proxy_name=None,
4660 fallback_label_name=None,
4661 within_columns_clause=True,
4662 column_is_repeated=False,
4663 need_column_expressions=False,
4664 include_table=True,
4665 ):
4666 """produce labeled columns present in a select()."""
4667 impl = column.type.dialect_impl(self.dialect)
4668
4669 if impl._has_column_expression and (
4670 need_column_expressions or populate_result_map
4671 ):
4672 col_expr = impl.column_expression(column)
4673 else:
4674 col_expr = column
4675
4676 if populate_result_map:
4677 # pass an "add_to_result_map" callable into the compilation
4678 # of embedded columns. this collects information about the
4679 # column as it will be fetched in the result and is coordinated
4680 # with cursor.description when the query is executed.
4681 add_to_result_map = self._add_to_result_map
4682
4683 # if the SELECT statement told us this column is a repeat,
4684 # wrap the callable with one that prevents the addition of the
4685 # targets
4686 if column_is_repeated:
4687 _add_to_result_map = add_to_result_map
4688
4689 def add_to_result_map(keyname, name, objects, type_):
4690 _add_to_result_map(keyname, name, (keyname,), type_)
4691
4692 # if we redefined col_expr for type expressions, wrap the
4693 # callable with one that adds the original column to the targets
4694 elif col_expr is not column:
4695 _add_to_result_map = add_to_result_map
4696
4697 def add_to_result_map(keyname, name, objects, type_):
4698 _add_to_result_map(
4699 keyname, name, (column,) + objects, type_
4700 )
4701
4702 else:
4703 add_to_result_map = None
4704
4705 # this method is used by some of the dialects for RETURNING,
4706 # which has different inputs. _label_returning_column was added
4707 # as the better target for this now however for 1.4 we will keep
4708 # _label_select_column directly compatible with this use case.
4709 # these assertions right now set up the current expected inputs
4710 assert within_columns_clause, (
4711 "_label_select_column is only relevant within "
4712 "the columns clause of a SELECT or RETURNING"
4713 )
4714 if isinstance(column, elements.Label):
4715 if col_expr is not column:
4716 result_expr = _CompileLabel(
4717 col_expr, column.name, alt_names=(column.element,)
4718 )
4719 else:
4720 result_expr = col_expr
4721
4722 elif name:
4723 # here, _columns_plus_names has determined there's an explicit
4724 # label name we need to use. this is the default for
4725 # tablenames_plus_columnnames as well as when columns are being
4726 # deduplicated on name
4727
4728 assert (
4729 proxy_name is not None
4730 ), "proxy_name is required if 'name' is passed"
4731
4732 result_expr = _CompileLabel(
4733 col_expr,
4734 name,
4735 alt_names=(
4736 proxy_name,
4737 # this is a hack to allow legacy result column lookups
4738 # to work as they did before; this goes away in 2.0.
4739 # TODO: this only seems to be tested indirectly
4740 # via test/orm/test_deprecations.py. should be a
4741 # resultset test for this
4742 column._tq_label,
4743 ),
4744 )
4745 else:
4746 # determine here whether this column should be rendered in
4747 # a labelled context or not, as we were given no required label
4748 # name from the caller. Here we apply heuristics based on the kind
4749 # of SQL expression involved.
4750
4751 if col_expr is not column:
4752 # type-specific expression wrapping the given column,
4753 # so we render a label
4754 render_with_label = True
4755 elif isinstance(column, elements.ColumnClause):
4756 # table-bound column, we render its name as a label if we are
4757 # inside of a subquery only
4758 render_with_label = (
4759 asfrom
4760 and not column.is_literal
4761 and column.table is not None
4762 )
4763 elif isinstance(column, elements.TextClause):
4764 render_with_label = False
4765 elif isinstance(column, elements.UnaryExpression):
4766 # unary expression. notes added as of #12681
4767 #
4768 # By convention, the visit_unary() method
4769 # itself does not add an entry to the result map, and relies
4770 # upon either the inner expression creating a result map
4771 # entry, or if not, by creating a label here that produces
4772 # the result map entry. Where that happens is based on whether
4773 # or not the element immediately inside the unary is a
4774 # NamedColumn subclass or not.
4775 #
4776 # Now, this also impacts how the SELECT is written; if
4777 # we decide to generate a label here, we get the usual
4778 # "~(x+y) AS anon_1" thing in the columns clause. If we
4779 # don't, we don't get an AS at all, we get like
4780 # "~table.column".
4781 #
4782 # But here is the important thing as of modernish (like 1.4)
4783 # versions of SQLAlchemy - **whether or not the AS <label>
4784 # is present in the statement is not actually important**.
4785 # We target result columns **positionally** for a fully
4786 # compiled ``Select()`` object; before 1.4 we needed those
4787 # labels to match in cursor.description etc etc but now it
4788 # really doesn't matter.
4789 # So really, we could set render_with_label True in all cases.
4790 # Or we could just have visit_unary() populate the result map
4791 # in all cases.
4792 #
4793 # What we're doing here is strictly trying to not rock the
4794 # boat too much with when we do/don't render "AS label";
4795 # labels being present helps in the edge cases that we
4796 # "fall back" to named cursor.description matching, labels
4797 # not being present for columns keeps us from having awkward
4798 # phrases like "SELECT DISTINCT table.x AS x".
4799 render_with_label = (
4800 (
4801 # exception case to detect if we render "not boolean"
4802 # as "not <col>" for native boolean or "<col> = 1"
4803 # for non-native boolean. this is controlled by
4804 # visit_is_<true|false>_unary_operator
4805 column.operator
4806 in (operators.is_false, operators.is_true)
4807 and not self.dialect.supports_native_boolean
4808 )
4809 or column._wraps_unnamed_column()
4810 or asfrom
4811 )
4812 elif (
4813 # general class of expressions that don't have a SQL-column
4814 # addressable name. includes scalar selects, bind parameters,
4815 # SQL functions, others
4816 not isinstance(column, elements.NamedColumn)
4817 # deeper check that indicates there's no natural "name" to
4818 # this element, which accommodates for custom SQL constructs
4819 # that might have a ".name" attribute (but aren't SQL
4820 # functions) but are not implementing this more recently added
4821 # base class. in theory the "NamedColumn" check should be
4822 # enough, however here we seek to maintain legacy behaviors
4823 # as well.
4824 and column._non_anon_label is None
4825 ):
4826 render_with_label = True
4827 else:
4828 render_with_label = False
4829
4830 if render_with_label:
4831 if not fallback_label_name:
4832 # used by the RETURNING case right now. we generate it
4833 # here as 3rd party dialects may be referring to
4834 # _label_select_column method directly instead of the
4835 # just-added _label_returning_column method
4836 assert not column_is_repeated
4837 fallback_label_name = column._anon_name_label
4838
4839 fallback_label_name = (
4840 elements._truncated_label(fallback_label_name)
4841 if not isinstance(
4842 fallback_label_name, elements._truncated_label
4843 )
4844 else fallback_label_name
4845 )
4846
4847 result_expr = _CompileLabel(
4848 col_expr, fallback_label_name, alt_names=(proxy_name,)
4849 )
4850 else:
4851 result_expr = col_expr
4852
4853 column_clause_args.update(
4854 within_columns_clause=within_columns_clause,
4855 add_to_result_map=add_to_result_map,
4856 include_table=include_table,
4857 within_tstring=False,
4858 )
4859 return result_expr._compiler_dispatch(self, **column_clause_args)
4860
4861 def format_from_hint_text(self, sqltext, table, hint, iscrud):
4862 hinttext = self.get_from_hint_text(table, hint)
4863 if hinttext:
4864 sqltext += " " + hinttext
4865 return sqltext
4866
4867 def get_select_hint_text(self, byfroms):
4868 return None
4869
4870 def get_from_hint_text(
4871 self, table: FromClause, text: Optional[str]
4872 ) -> Optional[str]:
4873 return None
4874
4875 def get_crud_hint_text(self, table, text):
4876 return None
4877
4878 def get_statement_hint_text(self, hint_texts):
4879 return " ".join(hint_texts)
4880
4881 _default_stack_entry: _CompilerStackEntry
4882
4883 if not typing.TYPE_CHECKING:
4884 _default_stack_entry = util.immutabledict(
4885 [("correlate_froms", frozenset()), ("asfrom_froms", frozenset())]
4886 )
4887
4888 def _display_froms_for_select(
4889 self, select_stmt, asfrom, lateral=False, **kw
4890 ):
4891 # utility method to help external dialects
4892 # get the correct from list for a select.
4893 # specifically the oracle dialect needs this feature
4894 # right now.
4895 toplevel = not self.stack
4896 entry = self._default_stack_entry if toplevel else self.stack[-1]
4897
4898 compile_state = select_stmt._compile_state_factory(select_stmt, self)
4899
4900 correlate_froms = entry["correlate_froms"]
4901 asfrom_froms = entry["asfrom_froms"]
4902
4903 if asfrom and not lateral:
4904 froms = compile_state._get_display_froms(
4905 explicit_correlate_froms=correlate_froms.difference(
4906 asfrom_froms
4907 ),
4908 implicit_correlate_froms=(),
4909 )
4910 else:
4911 froms = compile_state._get_display_froms(
4912 explicit_correlate_froms=correlate_froms,
4913 implicit_correlate_froms=asfrom_froms,
4914 )
4915 return froms
4916
4917 translate_select_structure: Any = None
4918 """if not ``None``, should be a callable which accepts ``(select_stmt,
4919 **kw)`` and returns a select object. this is used for structural changes
4920 mostly to accommodate for LIMIT/OFFSET schemes
4921
4922 """
4923
4924 def visit_select(
4925 self,
4926 select_stmt,
4927 asfrom=False,
4928 insert_into=False,
4929 fromhints=None,
4930 compound_index=None,
4931 select_wraps_for=None,
4932 lateral=False,
4933 from_linter=None,
4934 **kwargs,
4935 ):
4936 assert select_wraps_for is None, (
4937 "SQLAlchemy 1.4 requires use of "
4938 "the translate_select_structure hook for structural "
4939 "translations of SELECT objects"
4940 )
4941 if self._collect_params:
4942 self._add_to_params(select_stmt)
4943
4944 # initial setup of SELECT. the compile_state_factory may now
4945 # be creating a totally different SELECT from the one that was
4946 # passed in. for ORM use this will convert from an ORM-state
4947 # SELECT to a regular "Core" SELECT. other composed operations
4948 # such as computation of joins will be performed.
4949
4950 kwargs["within_columns_clause"] = False
4951
4952 compile_state = select_stmt._compile_state_factory(
4953 select_stmt, self, **kwargs
4954 )
4955 kwargs["ambiguous_table_name_map"] = (
4956 compile_state._ambiguous_table_name_map
4957 )
4958
4959 select_stmt = compile_state.statement
4960
4961 toplevel = not self.stack
4962
4963 if toplevel and not self.compile_state:
4964 self.compile_state = compile_state
4965
4966 is_embedded_select = compound_index is not None or insert_into
4967
4968 # translate step for Oracle, SQL Server which often need to
4969 # restructure the SELECT to allow for LIMIT/OFFSET and possibly
4970 # other conditions
4971 if self.translate_select_structure:
4972 new_select_stmt = self.translate_select_structure(
4973 select_stmt, asfrom=asfrom, **kwargs
4974 )
4975
4976 # if SELECT was restructured, maintain a link to the originals
4977 # and assemble a new compile state
4978 if new_select_stmt is not select_stmt:
4979 compile_state_wraps_for = compile_state
4980 select_wraps_for = select_stmt
4981 select_stmt = new_select_stmt
4982
4983 compile_state = select_stmt._compile_state_factory(
4984 select_stmt, self, **kwargs
4985 )
4986 select_stmt = compile_state.statement
4987
4988 entry = self._default_stack_entry if toplevel else self.stack[-1]
4989
4990 populate_result_map = need_column_expressions = (
4991 toplevel
4992 or entry.get("need_result_map_for_compound", False)
4993 or entry.get("need_result_map_for_nested", False)
4994 )
4995
4996 # indicates there is a CompoundSelect in play and we are not the
4997 # first select
4998 if compound_index:
4999 populate_result_map = False
5000
5001 # this was first proposed as part of #3372; however, it is not
5002 # reached in current tests and could possibly be an assertion
5003 # instead.
5004 if not populate_result_map and "add_to_result_map" in kwargs:
5005 del kwargs["add_to_result_map"]
5006
5007 froms = self._setup_select_stack(
5008 select_stmt, compile_state, entry, asfrom, lateral, compound_index
5009 )
5010
5011 column_clause_args = kwargs.copy()
5012 column_clause_args.update(
5013 {"within_label_clause": False, "within_columns_clause": False}
5014 )
5015
5016 text = "SELECT " # we're off to a good start !
5017
5018 if select_stmt._post_select_clause is not None:
5019 psc = self.process(select_stmt._post_select_clause, **kwargs)
5020 if psc is not None:
5021 text += psc + " "
5022
5023 if select_stmt._hints:
5024 hint_text, byfrom = self._setup_select_hints(select_stmt)
5025 if hint_text:
5026 text += hint_text + " "
5027 else:
5028 byfrom = None
5029
5030 if select_stmt._independent_ctes:
5031 self._dispatch_independent_ctes(select_stmt, kwargs)
5032
5033 if select_stmt._prefixes:
5034 text += self._generate_prefixes(
5035 select_stmt, select_stmt._prefixes, **kwargs
5036 )
5037
5038 text += self.get_select_precolumns(select_stmt, **kwargs)
5039
5040 if select_stmt._pre_columns_clause is not None:
5041 pcc = self.process(select_stmt._pre_columns_clause, **kwargs)
5042 if pcc is not None:
5043 text += pcc + " "
5044
5045 # the actual list of columns to print in the SELECT column list.
5046 inner_columns = [
5047 c
5048 for c in [
5049 self._label_select_column(
5050 select_stmt,
5051 column,
5052 populate_result_map,
5053 asfrom,
5054 column_clause_args,
5055 name=name,
5056 proxy_name=proxy_name,
5057 fallback_label_name=fallback_label_name,
5058 column_is_repeated=repeated,
5059 need_column_expressions=need_column_expressions,
5060 )
5061 for (
5062 name,
5063 proxy_name,
5064 fallback_label_name,
5065 column,
5066 repeated,
5067 ) in compile_state.columns_plus_names
5068 ]
5069 if c is not None
5070 ]
5071
5072 if populate_result_map and select_wraps_for is not None:
5073 # if this select was generated from translate_select,
5074 # rewrite the targeted columns in the result map
5075
5076 translate = dict(
5077 zip(
5078 [
5079 name
5080 for (
5081 key,
5082 proxy_name,
5083 fallback_label_name,
5084 name,
5085 repeated,
5086 ) in compile_state.columns_plus_names
5087 ],
5088 [
5089 name
5090 for (
5091 key,
5092 proxy_name,
5093 fallback_label_name,
5094 name,
5095 repeated,
5096 ) in compile_state_wraps_for.columns_plus_names
5097 ],
5098 )
5099 )
5100
5101 self._result_columns = [
5102 ResultColumnsEntry(
5103 key, name, tuple(translate.get(o, o) for o in obj), type_
5104 )
5105 for key, name, obj, type_ in self._result_columns
5106 ]
5107
5108 text = self._compose_select_body(
5109 text,
5110 select_stmt,
5111 compile_state,
5112 inner_columns,
5113 froms,
5114 byfrom,
5115 toplevel,
5116 kwargs,
5117 )
5118
5119 if select_stmt._post_body_clause is not None:
5120 pbc = self.process(select_stmt._post_body_clause, **kwargs)
5121 if pbc:
5122 text += " " + pbc
5123
5124 if select_stmt._statement_hints:
5125 per_dialect = [
5126 ht
5127 for (dialect_name, ht) in select_stmt._statement_hints
5128 if dialect_name in ("*", self.dialect.name)
5129 ]
5130 if per_dialect:
5131 text += " " + self.get_statement_hint_text(per_dialect)
5132
5133 # In compound query, CTEs are shared at the compound level
5134 if self.ctes and (not is_embedded_select or toplevel):
5135 nesting_level = len(self.stack) if not toplevel else None
5136 text = self._render_cte_clause(nesting_level=nesting_level) + text
5137
5138 if select_stmt._suffixes:
5139 text += " " + self._generate_prefixes(
5140 select_stmt, select_stmt._suffixes, **kwargs
5141 )
5142
5143 self.stack.pop(-1)
5144
5145 return text
5146
5147 def _setup_select_hints(
5148 self, select: Select[Unpack[TupleAny]]
5149 ) -> Tuple[str, _FromHintsType]:
5150 byfrom = {
5151 from_: hinttext
5152 % {"name": from_._compiler_dispatch(self, ashint=True)}
5153 for (from_, dialect), hinttext in select._hints.items()
5154 if dialect in ("*", self.dialect.name)
5155 }
5156 hint_text = self.get_select_hint_text(byfrom)
5157 return hint_text, byfrom
5158
5159 def _setup_select_stack(
5160 self, select, compile_state, entry, asfrom, lateral, compound_index
5161 ):
5162 correlate_froms = entry["correlate_froms"]
5163 asfrom_froms = entry["asfrom_froms"]
5164
5165 if compound_index == 0:
5166 entry["select_0"] = select
5167 elif compound_index:
5168 select_0 = entry["select_0"]
5169 numcols = len(select_0._all_selected_columns)
5170
5171 if len(compile_state.columns_plus_names) != numcols:
5172 raise exc.CompileError(
5173 "All selectables passed to "
5174 "CompoundSelect must have identical numbers of "
5175 "columns; select #%d has %d columns, select "
5176 "#%d has %d"
5177 % (
5178 1,
5179 numcols,
5180 compound_index + 1,
5181 len(select._all_selected_columns),
5182 )
5183 )
5184
5185 if asfrom and not lateral:
5186 froms = compile_state._get_display_froms(
5187 explicit_correlate_froms=correlate_froms.difference(
5188 asfrom_froms
5189 ),
5190 implicit_correlate_froms=(),
5191 )
5192 else:
5193 froms = compile_state._get_display_froms(
5194 explicit_correlate_froms=correlate_froms,
5195 implicit_correlate_froms=asfrom_froms,
5196 )
5197
5198 new_correlate_froms = set(_from_objects(*froms))
5199 all_correlate_froms = new_correlate_froms.union(correlate_froms)
5200
5201 new_entry: _CompilerStackEntry = {
5202 "asfrom_froms": new_correlate_froms,
5203 "correlate_froms": all_correlate_froms,
5204 "selectable": select,
5205 "compile_state": compile_state,
5206 }
5207 self.stack.append(new_entry)
5208
5209 return froms
5210
5211 def _compose_select_body(
5212 self,
5213 text,
5214 select,
5215 compile_state,
5216 inner_columns,
5217 froms,
5218 byfrom,
5219 toplevel,
5220 kwargs,
5221 ):
5222 text += ", ".join(inner_columns)
5223
5224 if self.linting & COLLECT_CARTESIAN_PRODUCTS:
5225 from_linter = FromLinter({}, set())
5226 warn_linting = self.linting & WARN_LINTING
5227 if toplevel:
5228 self.from_linter = from_linter
5229 else:
5230 from_linter = None
5231 warn_linting = False
5232
5233 # adjust the whitespace for no inner columns, part of #9440,
5234 # so that a no-col SELECT comes out as "SELECT WHERE..." or
5235 # "SELECT FROM ...".
5236 # while it would be better to have built the SELECT starting string
5237 # without trailing whitespace first, then add whitespace only if inner
5238 # cols were present, this breaks compatibility with various custom
5239 # compilation schemes that are currently being tested.
5240 if not inner_columns:
5241 text = text.rstrip()
5242
5243 if froms:
5244 text += " \nFROM "
5245
5246 if select._hints:
5247 text += ", ".join(
5248 [
5249 f._compiler_dispatch(
5250 self,
5251 asfrom=True,
5252 fromhints=byfrom,
5253 from_linter=from_linter,
5254 **kwargs,
5255 )
5256 for f in froms
5257 ]
5258 )
5259 else:
5260 text += ", ".join(
5261 [
5262 f._compiler_dispatch(
5263 self,
5264 asfrom=True,
5265 from_linter=from_linter,
5266 **kwargs,
5267 )
5268 for f in froms
5269 ]
5270 )
5271 else:
5272 text += self.default_from()
5273
5274 if select._where_criteria:
5275 t = self._generate_delimited_and_list(
5276 select._where_criteria, from_linter=from_linter, **kwargs
5277 )
5278 if t:
5279 text += " \nWHERE " + t
5280
5281 if warn_linting:
5282 assert from_linter is not None
5283 from_linter.warn()
5284
5285 if select._group_by_clauses:
5286 text += self.group_by_clause(select, **kwargs)
5287
5288 if select._having_criteria:
5289 t = self._generate_delimited_and_list(
5290 select._having_criteria, **kwargs
5291 )
5292 if t:
5293 text += " \nHAVING " + t
5294
5295 if select._post_criteria_clause is not None:
5296 pcc = self.process(select._post_criteria_clause, **kwargs)
5297 if pcc is not None:
5298 text += " \n" + pcc
5299
5300 if select._order_by_clauses:
5301 text += self.order_by_clause(select, **kwargs)
5302
5303 if select._has_row_limiting_clause:
5304 text += self._row_limit_clause(select, **kwargs)
5305
5306 if select._for_update_arg is not None:
5307 text += self.for_update_clause(select, **kwargs)
5308
5309 return text
5310
5311 def _generate_prefixes(self, stmt, prefixes, **kw):
5312 clause = " ".join(
5313 prefix._compiler_dispatch(self, **kw)
5314 for prefix, dialect_name in prefixes
5315 if dialect_name in (None, "*") or dialect_name == self.dialect.name
5316 )
5317 if clause:
5318 clause += " "
5319 return clause
5320
5321 def _render_cte_clause(
5322 self,
5323 nesting_level=None,
5324 include_following_stack=False,
5325 ):
5326 """
5327 include_following_stack
5328 Also render the nesting CTEs on the next stack. Useful for
5329 SQL structures like UNION or INSERT that can wrap SELECT
5330 statements containing nesting CTEs.
5331 """
5332 if not self.ctes:
5333 return ""
5334
5335 ctes: MutableMapping[CTE, str]
5336
5337 if nesting_level and nesting_level > 1:
5338 ctes = util.OrderedDict()
5339 for cte in list(self.ctes.keys()):
5340 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5341 cte._get_reference_cte()
5342 ]
5343 nesting = cte.nesting or cte_opts.nesting
5344 is_rendered_level = cte_level == nesting_level or (
5345 include_following_stack and cte_level == nesting_level + 1
5346 )
5347 if not (nesting and is_rendered_level):
5348 continue
5349
5350 ctes[cte] = self.ctes[cte]
5351
5352 else:
5353 ctes = self.ctes
5354
5355 if not ctes:
5356 return ""
5357 ctes_recursive = any([cte.recursive for cte in ctes])
5358
5359 cte_text = self.get_cte_preamble(ctes_recursive) + " "
5360 cte_text += ", \n".join([txt for txt in ctes.values()])
5361 cte_text += "\n "
5362
5363 if nesting_level and nesting_level > 1:
5364 for cte in list(ctes.keys()):
5365 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5366 cte._get_reference_cte()
5367 ]
5368 del self.ctes[cte]
5369 del self.ctes_by_level_name[(cte_level, cte_name)]
5370 del self.level_name_by_cte[cte._get_reference_cte()]
5371
5372 return cte_text
5373
5374 def get_cte_preamble(self, recursive):
5375 if recursive:
5376 return "WITH RECURSIVE"
5377 else:
5378 return "WITH"
5379
5380 def get_select_precolumns(self, select: Select[Any], **kw: Any) -> str:
5381 """Called when building a ``SELECT`` statement, position is just
5382 before column list.
5383
5384 """
5385 if select._distinct_on:
5386 util.warn_deprecated(
5387 "DISTINCT ON is currently supported only by the PostgreSQL "
5388 "dialect. Use of DISTINCT ON for other backends is currently "
5389 "silently ignored, however this usage is deprecated, and will "
5390 "raise CompileError in a future release for all backends "
5391 "that do not support this syntax.",
5392 version="1.4",
5393 )
5394 return "DISTINCT " if select._distinct else ""
5395
5396 def group_by_clause(self, select, **kw):
5397 """allow dialects to customize how GROUP BY is rendered."""
5398
5399 group_by = self._generate_delimited_list(
5400 select._group_by_clauses, OPERATORS[operators.comma_op], **kw
5401 )
5402 if group_by:
5403 return " GROUP BY " + group_by
5404 else:
5405 return ""
5406
5407 def order_by_clause(self, select, **kw):
5408 """allow dialects to customize how ORDER BY is rendered."""
5409
5410 order_by = self._generate_delimited_list(
5411 select._order_by_clauses, OPERATORS[operators.comma_op], **kw
5412 )
5413
5414 if order_by:
5415 return " ORDER BY " + order_by
5416 else:
5417 return ""
5418
5419 def for_update_clause(self, select, **kw):
5420 return " FOR UPDATE"
5421
5422 def returning_clause(
5423 self,
5424 stmt: UpdateBase,
5425 returning_cols: Sequence[_ColumnsClauseElement],
5426 *,
5427 populate_result_map: bool,
5428 **kw: Any,
5429 ) -> str:
5430 columns = [
5431 self._label_returning_column(
5432 stmt,
5433 column,
5434 populate_result_map,
5435 fallback_label_name=fallback_label_name,
5436 column_is_repeated=repeated,
5437 name=name,
5438 proxy_name=proxy_name,
5439 **kw,
5440 )
5441 for (
5442 name,
5443 proxy_name,
5444 fallback_label_name,
5445 column,
5446 repeated,
5447 ) in stmt._generate_columns_plus_names(
5448 True, cols=base._select_iterables(returning_cols)
5449 )
5450 ]
5451
5452 return "RETURNING " + ", ".join(columns)
5453
5454 def limit_clause(self, select, **kw):
5455 text = ""
5456 if select._limit_clause is not None:
5457 text += "\n LIMIT " + self.process(select._limit_clause, **kw)
5458 if select._offset_clause is not None:
5459 if select._limit_clause is None:
5460 text += "\n LIMIT -1"
5461 text += " OFFSET " + self.process(select._offset_clause, **kw)
5462 return text
5463
5464 def fetch_clause(
5465 self,
5466 select,
5467 fetch_clause=None,
5468 require_offset=False,
5469 use_literal_execute_for_simple_int=False,
5470 **kw,
5471 ):
5472 if fetch_clause is None:
5473 fetch_clause = select._fetch_clause
5474 fetch_clause_options = select._fetch_clause_options
5475 else:
5476 fetch_clause_options = {"percent": False, "with_ties": False}
5477
5478 text = ""
5479
5480 if select._offset_clause is not None:
5481 offset_clause = select._offset_clause
5482 if (
5483 use_literal_execute_for_simple_int
5484 and select._simple_int_clause(offset_clause)
5485 ):
5486 offset_clause = offset_clause.render_literal_execute()
5487 offset_str = self.process(offset_clause, **kw)
5488 text += "\n OFFSET %s ROWS" % offset_str
5489 elif require_offset:
5490 text += "\n OFFSET 0 ROWS"
5491
5492 if fetch_clause is not None:
5493 if (
5494 use_literal_execute_for_simple_int
5495 and select._simple_int_clause(fetch_clause)
5496 ):
5497 fetch_clause = fetch_clause.render_literal_execute()
5498 text += "\n FETCH FIRST %s%s ROWS %s" % (
5499 self.process(fetch_clause, **kw),
5500 " PERCENT" if fetch_clause_options["percent"] else "",
5501 "WITH TIES" if fetch_clause_options["with_ties"] else "ONLY",
5502 )
5503 return text
5504
5505 def visit_table(
5506 self,
5507 table,
5508 asfrom=False,
5509 iscrud=False,
5510 ashint=False,
5511 fromhints=None,
5512 use_schema=True,
5513 from_linter=None,
5514 ambiguous_table_name_map=None,
5515 enclosing_alias=None,
5516 within_tstring=False,
5517 **kwargs,
5518 ):
5519 if from_linter:
5520 from_linter.froms[table] = table.fullname
5521
5522 if asfrom or ashint or within_tstring:
5523 effective_schema = self.preparer.schema_for_object(table)
5524
5525 if use_schema and effective_schema:
5526 ret = (
5527 self.preparer.quote_schema(effective_schema)
5528 + "."
5529 + self.preparer.quote(table.name)
5530 )
5531 else:
5532 ret = self.preparer.quote(table.name)
5533
5534 if (
5535 (
5536 enclosing_alias is None
5537 or enclosing_alias.element is not table
5538 )
5539 and not effective_schema
5540 and ambiguous_table_name_map
5541 and table.name in ambiguous_table_name_map
5542 ):
5543 anon_name = self._truncated_identifier(
5544 "alias", ambiguous_table_name_map[table.name]
5545 )
5546
5547 ret = ret + self.get_render_as_alias_suffix(
5548 self.preparer.format_alias(None, anon_name)
5549 )
5550
5551 if fromhints and table in fromhints:
5552 ret = self.format_from_hint_text(
5553 ret, table, fromhints[table], iscrud
5554 )
5555 return ret
5556 else:
5557 return ""
5558
5559 def visit_join(self, join, asfrom=False, from_linter=None, **kwargs):
5560 if from_linter:
5561 from_linter.edges.update(
5562 itertools.product(
5563 _de_clone(join.left._from_objects),
5564 _de_clone(join.right._from_objects),
5565 )
5566 )
5567
5568 if join.full:
5569 join_type = " FULL OUTER JOIN "
5570 elif join.isouter:
5571 join_type = " LEFT OUTER JOIN "
5572 else:
5573 join_type = " JOIN "
5574 return (
5575 join.left._compiler_dispatch(
5576 self, asfrom=True, from_linter=from_linter, **kwargs
5577 )
5578 + join_type
5579 + join.right._compiler_dispatch(
5580 self, asfrom=True, from_linter=from_linter, **kwargs
5581 )
5582 + " ON "
5583 # TODO: likely need asfrom=True here?
5584 + join.onclause._compiler_dispatch(
5585 self, from_linter=from_linter, **kwargs
5586 )
5587 )
5588
5589 def _setup_crud_hints(self, stmt, table_text):
5590 dialect_hints = {
5591 table: hint_text
5592 for (table, dialect), hint_text in stmt._hints.items()
5593 if dialect in ("*", self.dialect.name)
5594 }
5595 if stmt.table in dialect_hints:
5596 table_text = self.format_from_hint_text(
5597 table_text, stmt.table, dialect_hints[stmt.table], True
5598 )
5599 return dialect_hints, table_text
5600
5601 # within the realm of "insertmanyvalues sentinel columns",
5602 # these lookups match different kinds of Column() configurations
5603 # to specific backend capabilities. they are broken into two
5604 # lookups, one for autoincrement columns and the other for non
5605 # autoincrement columns
5606 _sentinel_col_non_autoinc_lookup = util.immutabledict(
5607 {
5608 _SentinelDefaultCharacterization.CLIENTSIDE: (
5609 InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
5610 ),
5611 _SentinelDefaultCharacterization.SENTINEL_DEFAULT: (
5612 InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
5613 ),
5614 _SentinelDefaultCharacterization.NONE: (
5615 InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
5616 ),
5617 _SentinelDefaultCharacterization.IDENTITY: (
5618 InsertmanyvaluesSentinelOpts.IDENTITY
5619 ),
5620 _SentinelDefaultCharacterization.SEQUENCE: (
5621 InsertmanyvaluesSentinelOpts.SEQUENCE
5622 ),
5623 _SentinelDefaultCharacterization.MONOTONIC_FUNCTION: (
5624 InsertmanyvaluesSentinelOpts.MONOTONIC_FUNCTION
5625 ),
5626 }
5627 )
5628 _sentinel_col_autoinc_lookup = _sentinel_col_non_autoinc_lookup.union(
5629 {
5630 _SentinelDefaultCharacterization.NONE: (
5631 InsertmanyvaluesSentinelOpts.AUTOINCREMENT
5632 ),
5633 }
5634 )
5635
5636 def _get_sentinel_column_for_table(
5637 self, table: Table
5638 ) -> Optional[Sequence[Column[Any]]]:
5639 """given a :class:`.Table`, return a usable sentinel column or
5640 columns for this dialect if any.
5641
5642 Return None if no sentinel columns could be identified, or raise an
5643 error if a column was marked as a sentinel explicitly but isn't
5644 compatible with this dialect.
5645
5646 """
5647
5648 sentinel_opts = self.dialect.insertmanyvalues_implicit_sentinel
5649 sentinel_characteristics = table._sentinel_column_characteristics
5650
5651 sent_cols = sentinel_characteristics.columns
5652
5653 if sent_cols is None:
5654 return None
5655
5656 if sentinel_characteristics.is_autoinc:
5657 bitmask = self._sentinel_col_autoinc_lookup.get(
5658 sentinel_characteristics.default_characterization, 0
5659 )
5660 else:
5661 bitmask = self._sentinel_col_non_autoinc_lookup.get(
5662 sentinel_characteristics.default_characterization, 0
5663 )
5664
5665 if sentinel_opts & bitmask:
5666 return sent_cols
5667
5668 if sentinel_characteristics.is_explicit:
5669 # a column was explicitly marked as insert_sentinel=True,
5670 # however it is not compatible with this dialect. they should
5671 # not indicate this column as a sentinel if they need to include
5672 # this dialect.
5673
5674 # TODO: do we want non-primary key explicit sentinel cols
5675 # that can gracefully degrade for some backends?
5676 # insert_sentinel="degrade" perhaps. not for the initial release.
5677 # I am hoping people are generally not dealing with this sentinel
5678 # business at all.
5679
5680 # if is_explicit is True, there will be only one sentinel column.
5681
5682 raise exc.InvalidRequestError(
5683 f"Column {sent_cols[0]} can't be explicitly "
5684 "marked as a sentinel column when using the "
5685 f"{self.dialect.name} dialect, as the "
5686 "particular type of default generation on this column is "
5687 "not currently compatible with this dialect's specific "
5688 f"INSERT..RETURNING syntax which can receive the "
5689 "server-generated value in "
5690 "a deterministic way. To remove this error, remove "
5691 "insert_sentinel=True from primary key autoincrement "
5692 "columns; these columns are automatically used as "
5693 "sentinels for supported dialects in any case."
5694 )
5695
5696 return None
5697
5698 def _deliver_insertmanyvalues_batches(
5699 self,
5700 statement: str,
5701 parameters: _DBAPIMultiExecuteParams,
5702 compiled_parameters: List[_MutableCoreSingleExecuteParams],
5703 generic_setinputsizes: Optional[_GenericSetInputSizesType],
5704 batch_size: int,
5705 sort_by_parameter_order: bool,
5706 schema_translate_map: Optional[SchemaTranslateMapType],
5707 ) -> Iterator[_InsertManyValuesBatch]:
5708 imv = self._insertmanyvalues
5709 assert imv is not None
5710
5711 if not imv.sentinel_param_keys:
5712 _sentinel_from_params = None
5713 else:
5714 _sentinel_from_params = operator.itemgetter(
5715 *imv.sentinel_param_keys
5716 )
5717
5718 lenparams = len(parameters)
5719 if imv.is_default_expr and not self.dialect.supports_default_metavalue:
5720 # backend doesn't support
5721 # INSERT INTO table (pk_col) VALUES (DEFAULT), (DEFAULT), ...
5722 # at the moment this is basically SQL Server due to
5723 # not being able to use DEFAULT for identity column
5724 # just yield out that many single statements! still
5725 # faster than a whole connection.execute() call ;)
5726 #
5727 # note we still are taking advantage of the fact that we know
5728 # we are using RETURNING. The generalized approach of fetching
5729 # cursor.lastrowid etc. still goes through the more heavyweight
5730 # "ExecutionContext per statement" system as it isn't usable
5731 # as a generic "RETURNING" approach
5732 use_row_at_a_time = True
5733 downgraded = False
5734 elif not self.dialect.supports_multivalues_insert or (
5735 sort_by_parameter_order
5736 and self._result_columns
5737 and (imv.sentinel_columns is None or imv.includes_upsert_behaviors)
5738 ):
5739 # deterministic order was requested and the compiler could
5740 # not organize sentinel columns for this dialect/statement.
5741 # use row at a time
5742 use_row_at_a_time = True
5743 downgraded = True
5744 else:
5745 use_row_at_a_time = False
5746 downgraded = False
5747
5748 if use_row_at_a_time:
5749 for batchnum, (param, compiled_param) in enumerate(
5750 cast(
5751 "Sequence[Tuple[_DBAPISingleExecuteParams, _MutableCoreSingleExecuteParams]]", # noqa: E501
5752 zip(parameters, compiled_parameters),
5753 ),
5754 1,
5755 ):
5756 yield _InsertManyValuesBatch(
5757 statement,
5758 param,
5759 generic_setinputsizes,
5760 [param],
5761 (
5762 [_sentinel_from_params(compiled_param)]
5763 if _sentinel_from_params
5764 else []
5765 ),
5766 1,
5767 batchnum,
5768 lenparams,
5769 sort_by_parameter_order,
5770 downgraded,
5771 )
5772 return
5773
5774 if schema_translate_map:
5775 rst = functools.partial(
5776 self.preparer._render_schema_translates,
5777 schema_translate_map=schema_translate_map,
5778 )
5779 else:
5780 rst = None
5781
5782 imv_single_values_expr = imv.single_values_expr
5783 if rst:
5784 imv_single_values_expr = rst(imv_single_values_expr)
5785
5786 executemany_values = f"({imv_single_values_expr})"
5787 statement = statement.replace(executemany_values, "__EXECMANY_TOKEN__")
5788
5789 # Use optional insertmanyvalues_max_parameters
5790 # to further shrink the batch size so that there are no more than
5791 # insertmanyvalues_max_parameters params.
5792 # Currently used by SQL Server, which limits statements to 2100 bound
5793 # parameters (actually 2099).
5794 max_params = self.dialect.insertmanyvalues_max_parameters
5795 if max_params:
5796 total_num_of_params = len(self.bind_names)
5797 num_params_per_batch = len(imv.insert_crud_params)
5798 num_params_outside_of_batch = (
5799 total_num_of_params - num_params_per_batch
5800 )
5801 batch_size = min(
5802 batch_size,
5803 (
5804 (max_params - num_params_outside_of_batch)
5805 // num_params_per_batch
5806 ),
5807 )
5808
5809 batches = cast("List[Sequence[Any]]", list(parameters))
5810 compiled_batches = cast(
5811 "List[Sequence[Any]]", list(compiled_parameters)
5812 )
5813
5814 processed_setinputsizes: Optional[_GenericSetInputSizesType] = None
5815 batchnum = 1
5816 total_batches = lenparams // batch_size + (
5817 1 if lenparams % batch_size else 0
5818 )
5819
5820 insert_crud_params = imv.insert_crud_params
5821 assert insert_crud_params is not None
5822
5823 if rst:
5824 insert_crud_params = [
5825 (col, key, rst(expr), st)
5826 for col, key, expr, st in insert_crud_params
5827 ]
5828
5829 escaped_bind_names: Mapping[str, str]
5830 expand_pos_lower_index = expand_pos_upper_index = 0
5831
5832 if not self.positional:
5833 if self.escaped_bind_names:
5834 escaped_bind_names = self.escaped_bind_names
5835 else:
5836 escaped_bind_names = {}
5837
5838 all_keys = set(parameters[0])
5839
5840 def apply_placeholders(keys, formatted):
5841 for key in keys:
5842 key = escaped_bind_names.get(key, key)
5843 formatted = formatted.replace(
5844 self.bindtemplate % {"name": key},
5845 self.bindtemplate
5846 % {"name": f"{key}__EXECMANY_INDEX__"},
5847 )
5848 return formatted
5849
5850 if imv.embed_values_counter:
5851 imv_values_counter = ", _IMV_VALUES_COUNTER"
5852 else:
5853 imv_values_counter = ""
5854 formatted_values_clause = f"""({', '.join(
5855 apply_placeholders(bind_keys, formatted)
5856 for _, _, formatted, bind_keys in insert_crud_params
5857 )}{imv_values_counter})"""
5858
5859 keys_to_replace = all_keys.intersection(
5860 escaped_bind_names.get(key, key)
5861 for _, _, _, bind_keys in insert_crud_params
5862 for key in bind_keys
5863 )
5864 base_parameters = {
5865 key: parameters[0][key]
5866 for key in all_keys.difference(keys_to_replace)
5867 }
5868 executemany_values_w_comma = ""
5869 else:
5870 formatted_values_clause = ""
5871 keys_to_replace = set()
5872 base_parameters = {}
5873
5874 if imv.embed_values_counter:
5875 executemany_values_w_comma = (
5876 f"({imv_single_values_expr}, _IMV_VALUES_COUNTER), "
5877 )
5878 else:
5879 executemany_values_w_comma = f"({imv_single_values_expr}), "
5880
5881 all_names_we_will_expand: Set[str] = set()
5882 for elem in imv.insert_crud_params:
5883 all_names_we_will_expand.update(elem[3])
5884
5885 # get the start and end position in a particular list
5886 # of parameters where we will be doing the "expanding".
5887 # statements can have params on either side or both sides,
5888 # given RETURNING and CTEs
5889 if all_names_we_will_expand:
5890 positiontup = self.positiontup
5891 assert positiontup is not None
5892
5893 all_expand_positions = {
5894 idx
5895 for idx, name in enumerate(positiontup)
5896 if name in all_names_we_will_expand
5897 }
5898 expand_pos_lower_index = min(all_expand_positions)
5899 expand_pos_upper_index = max(all_expand_positions) + 1
5900 assert (
5901 len(all_expand_positions)
5902 == expand_pos_upper_index - expand_pos_lower_index
5903 )
5904
5905 if self._numeric_binds:
5906 escaped = re.escape(self._numeric_binds_identifier_char)
5907 executemany_values_w_comma = re.sub(
5908 rf"{escaped}\d+", "%s", executemany_values_w_comma
5909 )
5910
5911 while batches:
5912 batch = batches[0:batch_size]
5913 compiled_batch = compiled_batches[0:batch_size]
5914
5915 batches[0:batch_size] = []
5916 compiled_batches[0:batch_size] = []
5917
5918 if batches:
5919 current_batch_size = batch_size
5920 else:
5921 current_batch_size = len(batch)
5922
5923 if generic_setinputsizes:
5924 # if setinputsizes is present, expand this collection to
5925 # suit the batch length as well
5926 # currently this will be mssql+pyodbc for internal dialects
5927 processed_setinputsizes = [
5928 (new_key, len_, typ)
5929 for new_key, len_, typ in (
5930 (f"{key}_{index}", len_, typ)
5931 for index in range(current_batch_size)
5932 for key, len_, typ in generic_setinputsizes
5933 )
5934 ]
5935
5936 replaced_parameters: Any
5937 if self.positional:
5938 num_ins_params = imv.num_positional_params_counted
5939
5940 batch_iterator: Iterable[Sequence[Any]]
5941 extra_params_left: Sequence[Any]
5942 extra_params_right: Sequence[Any]
5943
5944 if num_ins_params == len(batch[0]):
5945 extra_params_left = extra_params_right = ()
5946 batch_iterator = batch
5947 else:
5948 extra_params_left = batch[0][:expand_pos_lower_index]
5949 extra_params_right = batch[0][expand_pos_upper_index:]
5950 batch_iterator = (
5951 b[expand_pos_lower_index:expand_pos_upper_index]
5952 for b in batch
5953 )
5954
5955 if imv.embed_values_counter:
5956 expanded_values_string = (
5957 "".join(
5958 executemany_values_w_comma.replace(
5959 "_IMV_VALUES_COUNTER", str(i)
5960 )
5961 for i, _ in enumerate(batch)
5962 )
5963 )[:-2]
5964 else:
5965 expanded_values_string = (
5966 (executemany_values_w_comma * current_batch_size)
5967 )[:-2]
5968
5969 if self._numeric_binds and num_ins_params > 0:
5970 # numeric will always number the parameters inside of
5971 # VALUES (and thus order self.positiontup) to be higher
5972 # than non-VALUES parameters, no matter where in the
5973 # statement those non-VALUES parameters appear (this is
5974 # ensured in _process_numeric by numbering first all
5975 # params that are not in _values_bindparam)
5976 # therefore all extra params are always
5977 # on the left side and numbered lower than the VALUES
5978 # parameters
5979 assert not extra_params_right
5980
5981 start = expand_pos_lower_index + 1
5982 end = num_ins_params * (current_batch_size) + start
5983
5984 # need to format here, since statement may contain
5985 # unescaped %, while values_string contains just (%s, %s)
5986 positions = tuple(
5987 f"{self._numeric_binds_identifier_char}{i}"
5988 for i in range(start, end)
5989 )
5990 expanded_values_string = expanded_values_string % positions
5991
5992 replaced_statement = statement.replace(
5993 "__EXECMANY_TOKEN__", expanded_values_string
5994 )
5995
5996 replaced_parameters = tuple(
5997 itertools.chain.from_iterable(batch_iterator)
5998 )
5999
6000 replaced_parameters = (
6001 extra_params_left
6002 + replaced_parameters
6003 + extra_params_right
6004 )
6005
6006 else:
6007 replaced_values_clauses = []
6008 replaced_parameters = base_parameters.copy()
6009
6010 for i, param in enumerate(batch):
6011 fmv = formatted_values_clause.replace(
6012 "EXECMANY_INDEX__", str(i)
6013 )
6014 if imv.embed_values_counter:
6015 fmv = fmv.replace("_IMV_VALUES_COUNTER", str(i))
6016
6017 replaced_values_clauses.append(fmv)
6018 replaced_parameters.update(
6019 {f"{key}__{i}": param[key] for key in keys_to_replace}
6020 )
6021
6022 replaced_statement = statement.replace(
6023 "__EXECMANY_TOKEN__",
6024 ", ".join(replaced_values_clauses),
6025 )
6026
6027 yield _InsertManyValuesBatch(
6028 replaced_statement,
6029 replaced_parameters,
6030 processed_setinputsizes,
6031 batch,
6032 (
6033 [_sentinel_from_params(cb) for cb in compiled_batch]
6034 if _sentinel_from_params
6035 else []
6036 ),
6037 current_batch_size,
6038 batchnum,
6039 total_batches,
6040 sort_by_parameter_order,
6041 False,
6042 )
6043 batchnum += 1
6044
6045 def visit_insert(
6046 self, insert_stmt, visited_bindparam=None, visiting_cte=None, **kw
6047 ):
6048 compile_state = insert_stmt._compile_state_factory(
6049 insert_stmt, self, **kw
6050 )
6051 insert_stmt = compile_state.statement
6052
6053 if visiting_cte is not None:
6054 kw["visiting_cte"] = visiting_cte
6055 toplevel = False
6056 else:
6057 toplevel = not self.stack
6058
6059 if toplevel:
6060 self.isinsert = True
6061 if not self.dml_compile_state:
6062 self.dml_compile_state = compile_state
6063 if not self.compile_state:
6064 self.compile_state = compile_state
6065
6066 self.stack.append(
6067 {
6068 "correlate_froms": set(),
6069 "asfrom_froms": set(),
6070 "selectable": insert_stmt,
6071 }
6072 )
6073
6074 counted_bindparam = 0
6075
6076 # reset any incoming "visited_bindparam" collection
6077 visited_bindparam = None
6078
6079 # for positional, insertmanyvalues needs to know how many
6080 # bound parameters are in the VALUES sequence; there's no simple
6081 # rule because default expressions etc. can have zero or more
6082 # params inside them. After multiple attempts to figure this out,
6083 # this very simplistic "count after" works and is
6084 # likely the least amount of callcounts, though looks clumsy
6085 if self.positional and visiting_cte is None:
6086 # if we are inside a CTE, don't count parameters
6087 # here since they won't be for insertmanyvalues. keep
6088 # visited_bindparam at None so no counting happens.
6089 # see #9173
6090 visited_bindparam = []
6091
6092 crud_params_struct = crud._get_crud_params(
6093 self,
6094 insert_stmt,
6095 compile_state,
6096 toplevel,
6097 visited_bindparam=visited_bindparam,
6098 **kw,
6099 )
6100
6101 if self.positional and visited_bindparam is not None:
6102 counted_bindparam = len(visited_bindparam)
6103 if self._numeric_binds:
6104 if self._values_bindparam is not None:
6105 self._values_bindparam += visited_bindparam
6106 else:
6107 self._values_bindparam = visited_bindparam
6108
6109 crud_params_single = crud_params_struct.single_params
6110
6111 if (
6112 not crud_params_single
6113 and not self.dialect.supports_default_values
6114 and not self.dialect.supports_default_metavalue
6115 and not self.dialect.supports_empty_insert
6116 ):
6117 raise exc.CompileError(
6118 "The '%s' dialect with current database "
6119 "version settings does not support empty "
6120 "inserts." % self.dialect.name
6121 )
6122
6123 if compile_state._has_multi_parameters:
6124 if not self.dialect.supports_multivalues_insert:
6125 raise exc.CompileError(
6126 "The '%s' dialect with current database "
6127 "version settings does not support "
6128 "in-place multirow inserts." % self.dialect.name
6129 )
6130 elif (
6131 self.implicit_returning or insert_stmt._returning
6132 ) and insert_stmt._sort_by_parameter_order:
6133 raise exc.CompileError(
6134 "RETURNING cannot be deterministically sorted when "
6135 "using an INSERT which includes multi-row values()."
6136 )
6137 crud_params_single = crud_params_struct.single_params
6138 else:
6139 crud_params_single = crud_params_struct.single_params
6140
6141 preparer = self.preparer
6142 supports_default_values = self.dialect.supports_default_values
6143
6144 text = "INSERT "
6145
6146 if insert_stmt._prefixes:
6147 text += self._generate_prefixes(
6148 insert_stmt, insert_stmt._prefixes, **kw
6149 )
6150
6151 text += "INTO "
6152 table_text = preparer.format_table(insert_stmt.table)
6153
6154 if insert_stmt._hints:
6155 _, table_text = self._setup_crud_hints(insert_stmt, table_text)
6156
6157 if insert_stmt._independent_ctes:
6158 self._dispatch_independent_ctes(insert_stmt, kw)
6159
6160 text += table_text
6161
6162 if crud_params_single or not supports_default_values:
6163 text += " (%s)" % ", ".join(
6164 [expr for _, expr, _, _ in crud_params_single]
6165 )
6166
6167 # look for insertmanyvalues attributes that would have been configured
6168 # by crud.py as it scanned through the columns to be part of the
6169 # INSERT
6170 use_insertmanyvalues = crud_params_struct.use_insertmanyvalues
6171 named_sentinel_params: Optional[Sequence[str]] = None
6172 add_sentinel_cols = None
6173 implicit_sentinel = False
6174
6175 returning_cols = self.implicit_returning or insert_stmt._returning
6176 if returning_cols:
6177 add_sentinel_cols = crud_params_struct.use_sentinel_columns
6178 if add_sentinel_cols is not None:
6179 assert use_insertmanyvalues
6180
6181 # search for the sentinel column explicitly present
6182 # in the INSERT columns list, and additionally check that
6183 # this column has a bound parameter name set up that's in the
6184 # parameter list. If both of these cases are present, it means
6185 # we will have a client side value for the sentinel in each
6186 # parameter set.
6187
6188 _params_by_col = {
6189 col: param_names
6190 for col, _, _, param_names in crud_params_single
6191 }
6192 named_sentinel_params = []
6193 for _add_sentinel_col in add_sentinel_cols:
6194 if _add_sentinel_col not in _params_by_col:
6195 named_sentinel_params = None
6196 break
6197 param_name = self._within_exec_param_key_getter(
6198 _add_sentinel_col
6199 )
6200 if param_name not in _params_by_col[_add_sentinel_col]:
6201 named_sentinel_params = None
6202 break
6203 named_sentinel_params.append(param_name)
6204
6205 if named_sentinel_params is None:
6206 # if we are not going to have a client side value for
6207 # the sentinel in the parameter set, that means it's
6208 # an autoincrement, an IDENTITY, or a server-side SQL
6209 # expression like nextval('seqname'). So this is
6210 # an "implicit" sentinel; we will look for it in
6211 # RETURNING
6212 # only, and then sort on it. For this case on PG,
6213 # SQL Server we have to use a special INSERT form
6214 # that guarantees the server side function lines up with
6215 # the entries in the VALUES.
6216 if (
6217 self.dialect.insertmanyvalues_implicit_sentinel
6218 & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
6219 ):
6220 implicit_sentinel = True
6221 else:
6222 # here, we are not using a sentinel at all
6223 # and we are likely the SQLite dialect.
6224 # The first add_sentinel_col that we have should not
6225 # be marked as "insert_sentinel=True". if it was,
6226 # an error should have been raised in
6227 # _get_sentinel_column_for_table.
6228 assert not add_sentinel_cols[0]._insert_sentinel, (
6229 "sentinel selection rules should have prevented "
6230 "us from getting here for this dialect"
6231 )
6232
6233 # always put the sentinel columns last. even if they are
6234 # in the returning list already, they will be there twice
6235 # then.
6236 returning_cols = list(returning_cols) + list(add_sentinel_cols)
6237
6238 returning_clause = self.returning_clause(
6239 insert_stmt,
6240 returning_cols,
6241 populate_result_map=toplevel,
6242 )
6243
6244 if self.returning_precedes_values:
6245 text += " " + returning_clause
6246
6247 else:
6248 returning_clause = None
6249
6250 if insert_stmt.select is not None:
6251 # placed here by crud.py
6252 select_text = self.process(
6253 self.stack[-1]["insert_from_select"], insert_into=True, **kw
6254 )
6255
6256 if self.ctes and self.dialect.cte_follows_insert:
6257 nesting_level = len(self.stack) if not toplevel else None
6258 text += " %s%s" % (
6259 self._render_cte_clause(
6260 nesting_level=nesting_level,
6261 include_following_stack=True,
6262 ),
6263 select_text,
6264 )
6265 else:
6266 text += " %s" % select_text
6267 elif not crud_params_single and supports_default_values:
6268 text += " DEFAULT VALUES"
6269 if use_insertmanyvalues:
6270 self._insertmanyvalues = _InsertManyValues(
6271 True,
6272 self.dialect.default_metavalue_token,
6273 crud_params_single,
6274 counted_bindparam,
6275 sort_by_parameter_order=(
6276 insert_stmt._sort_by_parameter_order
6277 ),
6278 includes_upsert_behaviors=(
6279 insert_stmt._post_values_clause is not None
6280 ),
6281 sentinel_columns=add_sentinel_cols,
6282 num_sentinel_columns=(
6283 len(add_sentinel_cols) if add_sentinel_cols else 0
6284 ),
6285 implicit_sentinel=implicit_sentinel,
6286 )
6287 elif compile_state._has_multi_parameters:
6288 text += " VALUES %s" % (
6289 ", ".join(
6290 "(%s)"
6291 % (", ".join(value for _, _, value, _ in crud_param_set))
6292 for crud_param_set in crud_params_struct.all_multi_params
6293 ),
6294 )
6295 elif use_insertmanyvalues:
6296 if (
6297 implicit_sentinel
6298 and (
6299 self.dialect.insertmanyvalues_implicit_sentinel
6300 & InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
6301 )
6302 # this is checking if we have
6303 # INSERT INTO table (id) VALUES (DEFAULT).
6304 and not (crud_params_struct.is_default_metavalue_only)
6305 ):
6306 # if we have a sentinel column that is server generated,
6307 # then for selected backends render the VALUES list as a
6308 # subquery. This is the orderable form supported by
6309 # PostgreSQL and in fewer cases SQL Server
6310 embed_sentinel_value = True
6311
6312 render_bind_casts = (
6313 self.dialect.insertmanyvalues_implicit_sentinel
6314 & InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
6315 )
6316
6317 add_sentinel_set = add_sentinel_cols or ()
6318
6319 insert_single_values_expr = ", ".join(
6320 [
6321 value
6322 for col, _, value, _ in crud_params_single
6323 if col not in add_sentinel_set
6324 ]
6325 )
6326
6327 colnames = ", ".join(
6328 f"p{i}"
6329 for i, cp in enumerate(crud_params_single)
6330 if cp[0] not in add_sentinel_set
6331 )
6332
6333 if render_bind_casts:
6334 # render casts for the SELECT list. For PG, we are
6335 # already rendering bind casts in the parameter list,
6336 # selectively for the more "tricky" types like ARRAY.
6337 # however, even for the "easy" types, if the parameter
6338 # is NULL for every entry, PG gives up and says
6339 # "it must be TEXT", which fails for other easy types
6340 # like ints. So we cast on this side too.
6341 colnames_w_cast = ", ".join(
6342 (
6343 self.render_bind_cast(
6344 col.type,
6345 col.type._unwrapped_dialect_impl(self.dialect),
6346 f"p{i}",
6347 )
6348 if col not in add_sentinel_set
6349 else expr
6350 )
6351 for i, (col, _, expr, _) in enumerate(
6352 crud_params_single
6353 )
6354 )
6355 else:
6356 colnames_w_cast = ", ".join(
6357 (f"p{i}" if col not in add_sentinel_set else expr)
6358 for i, (col, _, expr, _) in enumerate(
6359 crud_params_single
6360 )
6361 )
6362
6363 insert_crud_params = [
6364 elem
6365 for elem in crud_params_single
6366 if elem[0] not in add_sentinel_set
6367 ]
6368
6369 text += (
6370 f" SELECT {colnames_w_cast} FROM "
6371 f"(VALUES ({insert_single_values_expr})) "
6372 f"AS imp_sen({colnames}, sen_counter) "
6373 "ORDER BY sen_counter"
6374 )
6375
6376 else:
6377 # otherwise, if no sentinel or backend doesn't support
6378 # orderable subquery form, use a plain VALUES list
6379 embed_sentinel_value = False
6380 insert_crud_params = crud_params_single
6381 insert_single_values_expr = ", ".join(
6382 [value for _, _, value, _ in crud_params_single]
6383 )
6384
6385 text += f" VALUES ({insert_single_values_expr})"
6386
6387 self._insertmanyvalues = _InsertManyValues(
6388 is_default_expr=False,
6389 single_values_expr=insert_single_values_expr,
6390 insert_crud_params=insert_crud_params,
6391 num_positional_params_counted=counted_bindparam,
6392 sort_by_parameter_order=(insert_stmt._sort_by_parameter_order),
6393 includes_upsert_behaviors=(
6394 insert_stmt._post_values_clause is not None
6395 ),
6396 sentinel_columns=add_sentinel_cols,
6397 num_sentinel_columns=(
6398 len(add_sentinel_cols) if add_sentinel_cols else 0
6399 ),
6400 sentinel_param_keys=named_sentinel_params,
6401 implicit_sentinel=implicit_sentinel,
6402 embed_values_counter=embed_sentinel_value,
6403 )
6404
6405 else:
6406 insert_single_values_expr = ", ".join(
6407 [value for _, _, value, _ in crud_params_single]
6408 )
6409
6410 text += f" VALUES ({insert_single_values_expr})"
6411
6412 if insert_stmt._post_values_clause is not None:
6413 post_values_clause = self.process(
6414 insert_stmt._post_values_clause, **kw
6415 )
6416 if post_values_clause:
6417 text += " " + post_values_clause
6418
6419 if returning_clause and not self.returning_precedes_values:
6420 text += " " + returning_clause
6421
6422 if self.ctes and not self.dialect.cte_follows_insert:
6423 nesting_level = len(self.stack) if not toplevel else None
6424 text = (
6425 self._render_cte_clause(
6426 nesting_level=nesting_level,
6427 include_following_stack=True,
6428 )
6429 + text
6430 )
6431
6432 self.stack.pop(-1)
6433
6434 return text
6435
6436 def update_tables_clause(self, update_stmt, from_table, extra_froms, **kw):
6437 """Provide a hook to override the initial table clause
6438 in an UPDATE statement.
6439
6440 MySQL overrides this.
6441
6442 """
6443 kw["asfrom"] = True
6444 return from_table._compiler_dispatch(self, iscrud=True, **kw)
6445
6446 def update_from_clause(
6447 self, update_stmt, from_table, extra_froms, from_hints, **kw
6448 ):
6449 """Provide a hook to override the generation of an
6450 UPDATE..FROM clause.
6451 MySQL and MSSQL override this.
6452 """
6453 raise NotImplementedError(
6454 "This backend does not support multiple-table "
6455 "criteria within UPDATE"
6456 )
6457
6458 def update_post_criteria_clause(
6459 self, update_stmt: Update, **kw: Any
6460 ) -> Optional[str]:
6461 """provide a hook to override generation after the WHERE criteria
6462 in an UPDATE statement
6463
6464 .. versionadded:: 2.1
6465
6466 """
6467 if update_stmt._post_criteria_clause is not None:
6468 return self.process(
6469 update_stmt._post_criteria_clause,
6470 **kw,
6471 )
6472 else:
6473 return None
6474
6475 def delete_post_criteria_clause(
6476 self, delete_stmt: Delete, **kw: Any
6477 ) -> Optional[str]:
6478 """provide a hook to override generation after the WHERE criteria
6479 in a DELETE statement
6480
6481 .. versionadded:: 2.1
6482
6483 """
6484 if delete_stmt._post_criteria_clause is not None:
6485 return self.process(
6486 delete_stmt._post_criteria_clause,
6487 **kw,
6488 )
6489 else:
6490 return None
6491
6492 def visit_update(
6493 self,
6494 update_stmt: Update,
6495 visiting_cte: Optional[CTE] = None,
6496 **kw: Any,
6497 ) -> str:
6498 compile_state = update_stmt._compile_state_factory(
6499 update_stmt, self, **kw
6500 )
6501 if TYPE_CHECKING:
6502 assert isinstance(compile_state, UpdateDMLState)
6503 update_stmt = compile_state.statement # type: ignore[assignment]
6504
6505 if visiting_cte is not None:
6506 kw["visiting_cte"] = visiting_cte
6507 toplevel = False
6508 else:
6509 toplevel = not self.stack
6510
6511 if toplevel:
6512 self.isupdate = True
6513 if not self.dml_compile_state:
6514 self.dml_compile_state = compile_state
6515 if not self.compile_state:
6516 self.compile_state = compile_state
6517
6518 if self.linting & COLLECT_CARTESIAN_PRODUCTS:
6519 from_linter = FromLinter({}, set())
6520 warn_linting = self.linting & WARN_LINTING
6521 if toplevel:
6522 self.from_linter = from_linter
6523 else:
6524 from_linter = None
6525 warn_linting = False
6526
6527 extra_froms = compile_state._extra_froms
6528 is_multitable = bool(extra_froms)
6529
6530 if is_multitable:
6531 # main table might be a JOIN
6532 main_froms = set(_from_objects(update_stmt.table))
6533 render_extra_froms = [
6534 f for f in extra_froms if f not in main_froms
6535 ]
6536 correlate_froms = main_froms.union(extra_froms)
6537 else:
6538 render_extra_froms = []
6539 correlate_froms = {update_stmt.table}
6540
6541 self.stack.append(
6542 {
6543 "correlate_froms": correlate_froms,
6544 "asfrom_froms": correlate_froms,
6545 "selectable": update_stmt,
6546 }
6547 )
6548
6549 text = "UPDATE "
6550
6551 if update_stmt._prefixes:
6552 text += self._generate_prefixes(
6553 update_stmt, update_stmt._prefixes, **kw
6554 )
6555
6556 table_text = self.update_tables_clause(
6557 update_stmt,
6558 update_stmt.table,
6559 render_extra_froms,
6560 from_linter=from_linter,
6561 **kw,
6562 )
6563 crud_params_struct = crud._get_crud_params(
6564 self, update_stmt, compile_state, toplevel, **kw
6565 )
6566 crud_params = crud_params_struct.single_params
6567
6568 if update_stmt._hints:
6569 dialect_hints, table_text = self._setup_crud_hints(
6570 update_stmt, table_text
6571 )
6572 else:
6573 dialect_hints = None
6574
6575 if update_stmt._independent_ctes:
6576 self._dispatch_independent_ctes(update_stmt, kw)
6577
6578 text += table_text
6579
6580 text += " SET "
6581 text += ", ".join(
6582 expr + "=" + value
6583 for _, expr, value, _ in cast(
6584 "List[Tuple[Any, str, str, Any]]", crud_params
6585 )
6586 )
6587
6588 if self.implicit_returning or update_stmt._returning:
6589 if self.returning_precedes_values:
6590 text += " " + self.returning_clause(
6591 update_stmt,
6592 self.implicit_returning or update_stmt._returning,
6593 populate_result_map=toplevel,
6594 )
6595
6596 if extra_froms:
6597 extra_from_text = self.update_from_clause(
6598 update_stmt,
6599 update_stmt.table,
6600 render_extra_froms,
6601 dialect_hints,
6602 from_linter=from_linter,
6603 **kw,
6604 )
6605 if extra_from_text:
6606 text += " " + extra_from_text
6607
6608 if update_stmt._where_criteria:
6609 t = self._generate_delimited_and_list(
6610 update_stmt._where_criteria, from_linter=from_linter, **kw
6611 )
6612 if t:
6613 text += " WHERE " + t
6614
6615 ulc = self.update_post_criteria_clause(
6616 update_stmt, from_linter=from_linter, **kw
6617 )
6618 if ulc:
6619 text += " " + ulc
6620
6621 if (
6622 self.implicit_returning or update_stmt._returning
6623 ) and not self.returning_precedes_values:
6624 text += " " + self.returning_clause(
6625 update_stmt,
6626 self.implicit_returning or update_stmt._returning,
6627 populate_result_map=toplevel,
6628 )
6629
6630 if self.ctes:
6631 nesting_level = len(self.stack) if not toplevel else None
6632 text = self._render_cte_clause(nesting_level=nesting_level) + text
6633
6634 if warn_linting:
6635 assert from_linter is not None
6636 from_linter.warn(stmt_type="UPDATE")
6637
6638 self.stack.pop(-1)
6639
6640 return text # type: ignore[no-any-return]
6641
6642 def delete_extra_from_clause(
6643 self, delete_stmt, from_table, extra_froms, from_hints, **kw
6644 ):
6645 """Provide a hook to override the generation of an
6646 DELETE..FROM clause.
6647
6648 This can be used to implement DELETE..USING for example.
6649
6650 MySQL and MSSQL override this.
6651
6652 """
6653 raise NotImplementedError(
6654 "This backend does not support multiple-table "
6655 "criteria within DELETE"
6656 )
6657
6658 def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw):
6659 return from_table._compiler_dispatch(
6660 self, asfrom=True, iscrud=True, **kw
6661 )
6662
6663 def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
6664 compile_state = delete_stmt._compile_state_factory(
6665 delete_stmt, self, **kw
6666 )
6667 delete_stmt = compile_state.statement
6668
6669 if visiting_cte is not None:
6670 kw["visiting_cte"] = visiting_cte
6671 toplevel = False
6672 else:
6673 toplevel = not self.stack
6674
6675 if toplevel:
6676 self.isdelete = True
6677 if not self.dml_compile_state:
6678 self.dml_compile_state = compile_state
6679 if not self.compile_state:
6680 self.compile_state = compile_state
6681
6682 if self.linting & COLLECT_CARTESIAN_PRODUCTS:
6683 from_linter = FromLinter({}, set())
6684 warn_linting = self.linting & WARN_LINTING
6685 if toplevel:
6686 self.from_linter = from_linter
6687 else:
6688 from_linter = None
6689 warn_linting = False
6690
6691 extra_froms = compile_state._extra_froms
6692
6693 correlate_froms = {delete_stmt.table}.union(extra_froms)
6694 self.stack.append(
6695 {
6696 "correlate_froms": correlate_froms,
6697 "asfrom_froms": correlate_froms,
6698 "selectable": delete_stmt,
6699 }
6700 )
6701
6702 text = "DELETE "
6703
6704 if delete_stmt._prefixes:
6705 text += self._generate_prefixes(
6706 delete_stmt, delete_stmt._prefixes, **kw
6707 )
6708
6709 text += "FROM "
6710
6711 try:
6712 table_text = self.delete_table_clause(
6713 delete_stmt,
6714 delete_stmt.table,
6715 extra_froms,
6716 from_linter=from_linter,
6717 )
6718 except TypeError:
6719 # anticipate 3rd party dialects that don't include **kw
6720 # TODO: remove in 2.1
6721 table_text = self.delete_table_clause(
6722 delete_stmt, delete_stmt.table, extra_froms
6723 )
6724 if from_linter:
6725 _ = self.process(delete_stmt.table, from_linter=from_linter)
6726
6727 crud._get_crud_params(self, delete_stmt, compile_state, toplevel, **kw)
6728
6729 if delete_stmt._hints:
6730 dialect_hints, table_text = self._setup_crud_hints(
6731 delete_stmt, table_text
6732 )
6733 else:
6734 dialect_hints = None
6735
6736 if delete_stmt._independent_ctes:
6737 self._dispatch_independent_ctes(delete_stmt, kw)
6738
6739 text += table_text
6740
6741 if (
6742 self.implicit_returning or delete_stmt._returning
6743 ) and self.returning_precedes_values:
6744 text += " " + self.returning_clause(
6745 delete_stmt,
6746 self.implicit_returning or delete_stmt._returning,
6747 populate_result_map=toplevel,
6748 )
6749
6750 if extra_froms:
6751 extra_from_text = self.delete_extra_from_clause(
6752 delete_stmt,
6753 delete_stmt.table,
6754 extra_froms,
6755 dialect_hints,
6756 from_linter=from_linter,
6757 **kw,
6758 )
6759 if extra_from_text:
6760 text += " " + extra_from_text
6761
6762 if delete_stmt._where_criteria:
6763 t = self._generate_delimited_and_list(
6764 delete_stmt._where_criteria, from_linter=from_linter, **kw
6765 )
6766 if t:
6767 text += " WHERE " + t
6768
6769 dlc = self.delete_post_criteria_clause(
6770 delete_stmt, from_linter=from_linter, **kw
6771 )
6772 if dlc:
6773 text += " " + dlc
6774
6775 if (
6776 self.implicit_returning or delete_stmt._returning
6777 ) and not self.returning_precedes_values:
6778 text += " " + self.returning_clause(
6779 delete_stmt,
6780 self.implicit_returning or delete_stmt._returning,
6781 populate_result_map=toplevel,
6782 )
6783
6784 if self.ctes:
6785 nesting_level = len(self.stack) if not toplevel else None
6786 text = self._render_cte_clause(nesting_level=nesting_level) + text
6787
6788 if warn_linting:
6789 assert from_linter is not None
6790 from_linter.warn(stmt_type="DELETE")
6791
6792 self.stack.pop(-1)
6793
6794 return text
6795
6796 def visit_savepoint(self, savepoint_stmt, **kw):
6797 return "SAVEPOINT %s" % self.preparer.format_savepoint(savepoint_stmt)
6798
6799 def visit_rollback_to_savepoint(self, savepoint_stmt, **kw):
6800 return "ROLLBACK TO SAVEPOINT %s" % self.preparer.format_savepoint(
6801 savepoint_stmt
6802 )
6803
6804 def visit_release_savepoint(self, savepoint_stmt, **kw):
6805 return "RELEASE SAVEPOINT %s" % self.preparer.format_savepoint(
6806 savepoint_stmt
6807 )
6808
6809
6810class StrSQLCompiler(SQLCompiler):
6811 """A :class:`.SQLCompiler` subclass which allows a small selection
6812 of non-standard SQL features to render into a string value.
6813
6814 The :class:`.StrSQLCompiler` is invoked whenever a Core expression
6815 element is directly stringified without calling upon the
6816 :meth:`_expression.ClauseElement.compile` method.
6817 It can render a limited set
6818 of non-standard SQL constructs to assist in basic stringification,
6819 however for more substantial custom or dialect-specific SQL constructs,
6820 it will be necessary to make use of
6821 :meth:`_expression.ClauseElement.compile`
6822 directly.
6823
6824 .. seealso::
6825
6826 :ref:`faq_sql_expression_string`
6827
6828 """
6829
6830 def _fallback_column_name(self, column):
6831 return "<name unknown>"
6832
6833 @util.preload_module("sqlalchemy.engine.url")
6834 def visit_unsupported_compilation(self, element, err, **kw):
6835 if element.stringify_dialect != "default":
6836 url = util.preloaded.engine_url
6837 dialect = url.URL.create(element.stringify_dialect).get_dialect()()
6838
6839 compiler = dialect.statement_compiler(
6840 dialect, None, _supporting_against=self
6841 )
6842 if not isinstance(compiler, StrSQLCompiler):
6843 return compiler.process(element, **kw)
6844
6845 return super().visit_unsupported_compilation(element, err)
6846
6847 def visit_getitem_binary(self, binary, operator, **kw):
6848 return "%s[%s]" % (
6849 self.process(binary.left, **kw),
6850 self.process(binary.right, **kw),
6851 )
6852
6853 def visit_json_getitem_op_binary(self, binary, operator, **kw):
6854 return self.visit_getitem_binary(binary, operator, **kw)
6855
6856 def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
6857 return self.visit_getitem_binary(binary, operator, **kw)
6858
6859 def visit_sequence(self, sequence, **kw):
6860 return (
6861 f"<next sequence value: {self.preparer.format_sequence(sequence)}>"
6862 )
6863
6864 def returning_clause(
6865 self,
6866 stmt: UpdateBase,
6867 returning_cols: Sequence[_ColumnsClauseElement],
6868 *,
6869 populate_result_map: bool,
6870 **kw: Any,
6871 ) -> str:
6872 columns = [
6873 self._label_select_column(None, c, True, False, {})
6874 for c in base._select_iterables(returning_cols)
6875 ]
6876 return "RETURNING " + ", ".join(columns)
6877
6878 def update_from_clause(
6879 self, update_stmt, from_table, extra_froms, from_hints, **kw
6880 ):
6881 kw["asfrom"] = True
6882 return "FROM " + ", ".join(
6883 t._compiler_dispatch(self, fromhints=from_hints, **kw)
6884 for t in extra_froms
6885 )
6886
6887 def delete_extra_from_clause(
6888 self, delete_stmt, from_table, extra_froms, from_hints, **kw
6889 ):
6890 kw["asfrom"] = True
6891 return ", " + ", ".join(
6892 t._compiler_dispatch(self, fromhints=from_hints, **kw)
6893 for t in extra_froms
6894 )
6895
6896 def visit_empty_set_expr(self, element_types, **kw):
6897 return "SELECT 1 WHERE 1!=1"
6898
6899 def get_from_hint_text(self, table, text):
6900 return "[%s]" % text
6901
6902 def visit_regexp_match_op_binary(self, binary, operator, **kw):
6903 return self._generate_generic_binary(binary, " <regexp> ", **kw)
6904
6905 def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
6906 return self._generate_generic_binary(binary, " <not regexp> ", **kw)
6907
6908 def visit_regexp_replace_op_binary(self, binary, operator, **kw):
6909 return "<regexp replace>(%s, %s)" % (
6910 binary.left._compiler_dispatch(self, **kw),
6911 binary.right._compiler_dispatch(self, **kw),
6912 )
6913
6914 def visit_try_cast(self, cast, **kwargs):
6915 return "TRY_CAST(%s AS %s)" % (
6916 cast.clause._compiler_dispatch(self, **kwargs),
6917 cast.typeclause._compiler_dispatch(self, **kwargs),
6918 )
6919
6920
6921class DDLCompiler(Compiled):
6922 is_ddl = True
6923
6924 if TYPE_CHECKING:
6925
6926 def __init__(
6927 self,
6928 dialect: Dialect,
6929 statement: ExecutableDDLElement,
6930 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
6931 render_schema_translate: bool = ...,
6932 compile_kwargs: Mapping[str, Any] = ...,
6933 ): ...
6934
6935 @util.ro_memoized_property
6936 def sql_compiler(self) -> SQLCompiler:
6937 return self.dialect.statement_compiler(
6938 self.dialect, None, schema_translate_map=self.schema_translate_map
6939 )
6940
6941 @util.memoized_property
6942 def type_compiler(self):
6943 return self.dialect.type_compiler_instance
6944
6945 def construct_params(
6946 self,
6947 params: Optional[_CoreSingleExecuteParams] = None,
6948 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
6949 escape_names: bool = True,
6950 ) -> Optional[_MutableCoreSingleExecuteParams]:
6951 return None
6952
6953 def visit_ddl(self, ddl, **kwargs):
6954 # table events can substitute table and schema name
6955 context = ddl.context
6956 if isinstance(ddl.target, schema.Table):
6957 context = context.copy()
6958
6959 preparer = self.preparer
6960 path = preparer.format_table_seq(ddl.target)
6961 if len(path) == 1:
6962 table, sch = path[0], ""
6963 else:
6964 table, sch = path[-1], path[0]
6965
6966 context.setdefault("table", table)
6967 context.setdefault("schema", sch)
6968 context.setdefault("fullname", preparer.format_table(ddl.target))
6969
6970 return self.sql_compiler.post_process_text(ddl.statement % context)
6971
6972 def visit_create_schema(self, create, **kw):
6973 text = "CREATE SCHEMA "
6974 if create.if_not_exists:
6975 text += "IF NOT EXISTS "
6976 return text + self.preparer.format_schema(create.element)
6977
6978 def visit_drop_schema(self, drop, **kw):
6979 text = "DROP SCHEMA "
6980 if drop.if_exists:
6981 text += "IF EXISTS "
6982 text += self.preparer.format_schema(drop.element)
6983 if drop.cascade:
6984 text += " CASCADE"
6985 return text
6986
6987 def visit_create_table(self, create, **kw):
6988 table = create.element
6989 preparer = self.preparer
6990
6991 text = "\nCREATE "
6992 if table._prefixes:
6993 text += " ".join(table._prefixes) + " "
6994
6995 text += "TABLE "
6996 if create.if_not_exists:
6997 text += "IF NOT EXISTS "
6998
6999 text += preparer.format_table(table) + " "
7000
7001 create_table_suffix = self.create_table_suffix(table)
7002 if create_table_suffix:
7003 text += create_table_suffix + " "
7004
7005 text += "("
7006
7007 separator = "\n"
7008
7009 # if only one primary key, specify it along with the column
7010 first_pk = False
7011 for create_column in create.columns:
7012 column = create_column.element
7013 try:
7014 processed = self.process(
7015 create_column, first_pk=column.primary_key and not first_pk
7016 )
7017 if processed is not None:
7018 text += separator
7019 separator = ", \n"
7020 text += "\t" + processed
7021 if column.primary_key:
7022 first_pk = True
7023 except exc.CompileError as ce:
7024 raise exc.CompileError(
7025 "(in table '%s', column '%s'): %s"
7026 % (table.description, column.name, ce.args[0])
7027 ) from ce
7028
7029 const = self.create_table_constraints(
7030 table,
7031 _include_foreign_key_constraints=create.include_foreign_key_constraints, # noqa
7032 )
7033 if const:
7034 text += separator + "\t" + const
7035
7036 text += "\n)%s\n\n" % self.post_create_table(table)
7037 return text
7038
7039 def visit_create_view(self, element: CreateView, **kw: Any) -> str:
7040 return self._generate_table_select(element, "view", **kw)
7041
7042 def visit_create_table_as(self, element: CreateTableAs, **kw: Any) -> str:
7043 return self._generate_table_select(element, "create_table_as", **kw)
7044
7045 def _generate_table_select(
7046 self,
7047 element: _TableViaSelect,
7048 type_: str,
7049 if_not_exists: Optional[bool] = None,
7050 **kw: Any,
7051 ) -> str:
7052 prep = self.preparer
7053
7054 inner_kw = dict(kw)
7055 inner_kw["literal_binds"] = True
7056 select_sql = self.sql_compiler.process(element.selectable, **inner_kw)
7057
7058 # Use if_not_exists parameter if provided, otherwise use element's
7059 use_if_not_exists = (
7060 if_not_exists
7061 if if_not_exists is not None
7062 else element.if_not_exists
7063 )
7064
7065 parts = [
7066 "CREATE",
7067 "OR REPLACE" if getattr(element, "or_replace", False) else None,
7068 "TEMPORARY" if element.temporary else None,
7069 (
7070 "MATERIALIZED VIEW"
7071 if type_ == "view" and getattr(element, "materialized", False)
7072 else "TABLE" if type_ == "create_table_as" else "VIEW"
7073 ),
7074 "IF NOT EXISTS" if use_if_not_exists else None,
7075 prep.format_table(element.table),
7076 "AS",
7077 select_sql,
7078 ]
7079 return " ".join(p for p in parts if p)
7080
7081 def visit_create_column(self, create, first_pk=False, **kw):
7082 column = create.element
7083
7084 if column.system:
7085 return None
7086
7087 text = self.get_column_specification(column, first_pk=first_pk)
7088 const = " ".join(
7089 self.process(constraint) for constraint in column.constraints
7090 )
7091 if const:
7092 text += " " + const
7093
7094 return text
7095
7096 def create_table_constraints(
7097 self, table, _include_foreign_key_constraints=None, **kw
7098 ):
7099 # On some DB order is significant: visit PK first, then the
7100 # other constraints (engine.ReflectionTest.testbasic failed on FB2)
7101 constraints = []
7102 if table.primary_key:
7103 constraints.append(table.primary_key)
7104
7105 all_fkcs = table.foreign_key_constraints
7106 if _include_foreign_key_constraints is not None:
7107 omit_fkcs = all_fkcs.difference(_include_foreign_key_constraints)
7108 else:
7109 omit_fkcs = set()
7110
7111 constraints.extend(
7112 [
7113 c
7114 for c in table._sorted_constraints
7115 if c is not table.primary_key and c not in omit_fkcs
7116 ]
7117 )
7118
7119 return ", \n\t".join(
7120 p
7121 for p in (
7122 self.process(constraint)
7123 for constraint in constraints
7124 if (constraint._should_create_for_compiler(self))
7125 and (
7126 not self.dialect.supports_alter
7127 or not getattr(constraint, "use_alter", False)
7128 )
7129 )
7130 if p is not None
7131 )
7132
7133 def visit_drop_table(self, drop, **kw):
7134 text = "\nDROP TABLE "
7135 if drop.if_exists:
7136 text += "IF EXISTS "
7137 return text + self.preparer.format_table(drop.element)
7138
7139 def visit_drop_view(self, drop, **kw):
7140 text = "\nDROP "
7141 if drop.materialized:
7142 text += "MATERIALIZED VIEW "
7143 else:
7144 text += "VIEW "
7145 if drop.if_exists:
7146 text += "IF EXISTS "
7147 return text + self.preparer.format_table(drop.element)
7148
7149 def _verify_index_table(self, index: Index) -> None:
7150 if index.table is None:
7151 raise exc.CompileError(
7152 "Index '%s' is not associated with any table." % index.name
7153 )
7154
7155 def visit_create_index(
7156 self, create, include_schema=False, include_table_schema=True, **kw
7157 ):
7158 index = create.element
7159 self._verify_index_table(index)
7160 preparer = self.preparer
7161 text = "CREATE "
7162 if index.unique:
7163 text += "UNIQUE "
7164 if index.name is None:
7165 raise exc.CompileError(
7166 "CREATE INDEX requires that the index have a name"
7167 )
7168
7169 text += "INDEX "
7170 if create.if_not_exists:
7171 text += "IF NOT EXISTS "
7172
7173 text += "%s ON %s (%s)" % (
7174 self._prepared_index_name(index, include_schema=include_schema),
7175 preparer.format_table(
7176 index.table, use_schema=include_table_schema
7177 ),
7178 ", ".join(
7179 self.sql_compiler.process(
7180 expr, include_table=False, literal_binds=True
7181 )
7182 for expr in index.expressions
7183 ),
7184 )
7185 return text
7186
7187 def visit_drop_index(self, drop, **kw):
7188 index = drop.element
7189
7190 if index.name is None:
7191 raise exc.CompileError(
7192 "DROP INDEX requires that the index have a name"
7193 )
7194 text = "\nDROP INDEX "
7195 if drop.if_exists:
7196 text += "IF EXISTS "
7197
7198 return text + self._prepared_index_name(index, include_schema=True)
7199
7200 def _prepared_index_name(
7201 self, index: Index, include_schema: bool = False
7202 ) -> str:
7203 if index.table is not None:
7204 effective_schema = self.preparer.schema_for_object(index.table)
7205 else:
7206 effective_schema = None
7207 if include_schema and effective_schema:
7208 schema_name = self.preparer.quote_schema(effective_schema)
7209 else:
7210 schema_name = None
7211
7212 index_name: str = self.preparer.format_index(index)
7213
7214 if schema_name:
7215 index_name = schema_name + "." + index_name
7216 return index_name
7217
7218 def visit_add_constraint(self, create, **kw):
7219 return "ALTER TABLE %s ADD %s" % (
7220 self.preparer.format_table(create.element.table),
7221 self.process(create.element),
7222 )
7223
7224 def visit_set_table_comment(self, create, **kw):
7225 return "COMMENT ON TABLE %s IS %s" % (
7226 self.preparer.format_table(create.element),
7227 self.sql_compiler.render_literal_value(
7228 create.element.comment, sqltypes.String()
7229 ),
7230 )
7231
7232 def visit_drop_table_comment(self, drop, **kw):
7233 return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table(
7234 drop.element
7235 )
7236
7237 def visit_set_column_comment(self, create, **kw):
7238 return "COMMENT ON COLUMN %s IS %s" % (
7239 self.preparer.format_column(
7240 create.element, use_table=True, use_schema=True
7241 ),
7242 self.sql_compiler.render_literal_value(
7243 create.element.comment, sqltypes.String()
7244 ),
7245 )
7246
7247 def visit_drop_column_comment(self, drop, **kw):
7248 return "COMMENT ON COLUMN %s IS NULL" % self.preparer.format_column(
7249 drop.element, use_table=True
7250 )
7251
7252 def visit_set_constraint_comment(self, create, **kw):
7253 raise exc.UnsupportedCompilationError(self, type(create))
7254
7255 def visit_drop_constraint_comment(self, drop, **kw):
7256 raise exc.UnsupportedCompilationError(self, type(drop))
7257
7258 def get_identity_options(self, identity_options):
7259 text = []
7260 if identity_options.increment is not None:
7261 text.append("INCREMENT BY %d" % identity_options.increment)
7262 if identity_options.start is not None:
7263 text.append("START WITH %d" % identity_options.start)
7264 if identity_options.minvalue is not None:
7265 text.append("MINVALUE %d" % identity_options.minvalue)
7266 if identity_options.maxvalue is not None:
7267 text.append("MAXVALUE %d" % identity_options.maxvalue)
7268 if identity_options.nominvalue is not None:
7269 text.append("NO MINVALUE")
7270 if identity_options.nomaxvalue is not None:
7271 text.append("NO MAXVALUE")
7272 if identity_options.cache is not None:
7273 text.append("CACHE %d" % identity_options.cache)
7274 if identity_options.cycle is not None:
7275 text.append("CYCLE" if identity_options.cycle else "NO CYCLE")
7276 return " ".join(text)
7277
7278 def visit_create_sequence(self, create, prefix=None, **kw):
7279 text = "CREATE SEQUENCE "
7280 if create.if_not_exists:
7281 text += "IF NOT EXISTS "
7282 text += self.preparer.format_sequence(create.element)
7283
7284 if prefix:
7285 text += prefix
7286 options = self.get_identity_options(create.element)
7287 if options:
7288 text += " " + options
7289 return text
7290
7291 def visit_drop_sequence(self, drop, **kw):
7292 text = "DROP SEQUENCE "
7293 if drop.if_exists:
7294 text += "IF EXISTS "
7295 return text + self.preparer.format_sequence(drop.element)
7296
7297 def visit_drop_constraint(self, drop, **kw):
7298 constraint = drop.element
7299 if constraint.name is not None:
7300 formatted_name = self.preparer.format_constraint(constraint)
7301 else:
7302 formatted_name = None
7303
7304 if formatted_name is None:
7305 raise exc.CompileError(
7306 "Can't emit DROP CONSTRAINT for constraint %r; "
7307 "it has no name" % drop.element
7308 )
7309 return "ALTER TABLE %s DROP CONSTRAINT %s%s%s" % (
7310 self.preparer.format_table(drop.element.table),
7311 "IF EXISTS " if drop.if_exists else "",
7312 formatted_name,
7313 " CASCADE" if drop.cascade else "",
7314 )
7315
7316 def get_column_specification(self, column, **kwargs):
7317 colspec = (
7318 self.preparer.format_column(column)
7319 + " "
7320 + self.dialect.type_compiler_instance.process(
7321 column.type, type_expression=column
7322 )
7323 )
7324 default = self.get_column_default_string(column)
7325 if default is not None:
7326 colspec += " DEFAULT " + default
7327
7328 if column.computed is not None:
7329 colspec += " " + self.process(column.computed)
7330
7331 if (
7332 column.identity is not None
7333 and self.dialect.supports_identity_columns
7334 ):
7335 colspec += " " + self.process(column.identity)
7336
7337 if not column.nullable and (
7338 not column.identity or not self.dialect.supports_identity_columns
7339 ):
7340 colspec += " NOT NULL"
7341 return colspec
7342
7343 def create_table_suffix(self, table):
7344 return ""
7345
7346 def post_create_table(self, table):
7347 return ""
7348
7349 def get_column_default_string(self, column: Column[Any]) -> Optional[str]:
7350 if isinstance(column.server_default, schema.DefaultClause):
7351 return self.render_default_string(column.server_default.arg)
7352 else:
7353 return None
7354
7355 def render_default_string(self, default: Union[Visitable, str]) -> str:
7356 if isinstance(default, str):
7357 return self.sql_compiler.render_literal_value(
7358 default, sqltypes.STRINGTYPE
7359 )
7360 else:
7361 return self.sql_compiler.process(default, literal_binds=True)
7362
7363 def visit_table_or_column_check_constraint(self, constraint, **kw):
7364 if constraint.is_column_level:
7365 return self.visit_column_check_constraint(constraint)
7366 else:
7367 return self.visit_check_constraint(constraint)
7368
7369 def visit_check_constraint(self, constraint, **kw):
7370 text = self.define_constraint_preamble(constraint, **kw)
7371 text += self.define_check_body(constraint, **kw)
7372 text += self.define_constraint_deferrability(constraint)
7373 return text
7374
7375 def visit_column_check_constraint(self, constraint, **kw):
7376 text = self.define_constraint_preamble(constraint, **kw)
7377 text += self.define_check_body(constraint, **kw)
7378 text += self.define_constraint_deferrability(constraint)
7379 return text
7380
7381 def visit_primary_key_constraint(
7382 self, constraint: PrimaryKeyConstraint, **kw: Any
7383 ) -> str:
7384 if len(constraint) == 0:
7385 return ""
7386 text = self.define_constraint_preamble(constraint, **kw)
7387 text += self.define_primary_key_body(constraint, **kw)
7388 text += self.define_constraint_deferrability(constraint)
7389 return text
7390
7391 def visit_foreign_key_constraint(
7392 self, constraint: ForeignKeyConstraint, **kw: Any
7393 ) -> str:
7394 text = self.define_constraint_preamble(constraint, **kw)
7395 text += self.define_foreign_key_body(constraint, **kw)
7396 text += self.define_constraint_match(constraint)
7397 text += self.define_constraint_cascades(constraint)
7398 text += self.define_constraint_deferrability(constraint)
7399 return text
7400
7401 def define_constraint_remote_table(self, constraint, table, preparer):
7402 """Format the remote table clause of a CREATE CONSTRAINT clause."""
7403
7404 return preparer.format_table(table)
7405
7406 def visit_unique_constraint(
7407 self, constraint: UniqueConstraint, **kw: Any
7408 ) -> str:
7409 if len(constraint) == 0:
7410 return ""
7411 text = self.define_constraint_preamble(constraint, **kw)
7412 text += self.define_unique_body(constraint, **kw)
7413 text += self.define_constraint_deferrability(constraint)
7414 return text
7415
7416 def define_constraint_preamble(
7417 self, constraint: Constraint, **kw: Any
7418 ) -> str:
7419 text = ""
7420 if constraint.name is not None:
7421 formatted_name = self.preparer.format_constraint(constraint)
7422 if formatted_name is not None:
7423 text += "CONSTRAINT %s " % formatted_name
7424 return text
7425
7426 def define_primary_key_body(
7427 self, constraint: PrimaryKeyConstraint, **kw: Any
7428 ) -> str:
7429 text = ""
7430 text += "PRIMARY KEY "
7431 text += "(%s)" % ", ".join(
7432 self.preparer.quote(c.name)
7433 for c in (
7434 constraint.columns_autoinc_first
7435 if constraint._implicit_generated
7436 else constraint.columns
7437 )
7438 )
7439 return text
7440
7441 def define_foreign_key_body(
7442 self, constraint: ForeignKeyConstraint, **kw: Any
7443 ) -> str:
7444 preparer = self.preparer
7445 remote_table = list(constraint.elements)[0].column.table
7446 text = "FOREIGN KEY(%s) REFERENCES %s (%s)" % (
7447 ", ".join(
7448 preparer.quote(f.parent.name) for f in constraint.elements
7449 ),
7450 self.define_constraint_remote_table(
7451 constraint, remote_table, preparer
7452 ),
7453 ", ".join(
7454 preparer.quote(f.column.name) for f in constraint.elements
7455 ),
7456 )
7457 return text
7458
7459 def define_unique_body(
7460 self, constraint: UniqueConstraint, **kw: Any
7461 ) -> str:
7462 text = "UNIQUE %s(%s)" % (
7463 self.define_unique_constraint_distinct(constraint, **kw),
7464 ", ".join(self.preparer.quote(c.name) for c in constraint),
7465 )
7466 return text
7467
7468 def define_check_body(self, constraint: CheckConstraint, **kw: Any) -> str:
7469 text = "CHECK (%s)" % self.sql_compiler.process(
7470 constraint.sqltext, include_table=False, literal_binds=True
7471 )
7472 return text
7473
7474 def define_unique_constraint_distinct(
7475 self, constraint: UniqueConstraint, **kw: Any
7476 ) -> str:
7477 return ""
7478
7479 def define_constraint_cascades(
7480 self, constraint: ForeignKeyConstraint
7481 ) -> str:
7482 text = ""
7483 if constraint.ondelete is not None:
7484 text += self.define_constraint_ondelete_cascade(constraint)
7485
7486 if constraint.onupdate is not None:
7487 text += self.define_constraint_onupdate_cascade(constraint)
7488 return text
7489
7490 def define_constraint_ondelete_cascade(
7491 self, constraint: ForeignKeyConstraint
7492 ) -> str:
7493 return " ON DELETE %s" % self.preparer.validate_sql_phrase(
7494 constraint.ondelete, FK_ON_DELETE
7495 )
7496
7497 def define_constraint_onupdate_cascade(
7498 self, constraint: ForeignKeyConstraint
7499 ) -> str:
7500 return " ON UPDATE %s" % self.preparer.validate_sql_phrase(
7501 constraint.onupdate, FK_ON_UPDATE
7502 )
7503
7504 def define_constraint_deferrability(self, constraint: Constraint) -> str:
7505 text = ""
7506 if constraint.deferrable is not None:
7507 if constraint.deferrable:
7508 text += " DEFERRABLE"
7509 else:
7510 text += " NOT DEFERRABLE"
7511 if constraint.initially is not None:
7512 text += " INITIALLY %s" % self.preparer.validate_sql_phrase(
7513 constraint.initially, FK_INITIALLY
7514 )
7515 return text
7516
7517 def define_constraint_match(self, constraint: ForeignKeyConstraint) -> str:
7518 text = ""
7519 if constraint.match is not None:
7520 text += " MATCH %s" % constraint.match
7521 return text
7522
7523 def visit_computed_column(self, generated, **kw):
7524 text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
7525 generated.sqltext, include_table=False, literal_binds=True
7526 )
7527 if generated.persisted is True:
7528 text += " STORED"
7529 elif generated.persisted is False:
7530 text += " VIRTUAL"
7531 return text
7532
7533 def visit_identity_column(self, identity, **kw):
7534 text = "GENERATED %s AS IDENTITY" % (
7535 "ALWAYS" if identity.always else "BY DEFAULT",
7536 )
7537 options = self.get_identity_options(identity)
7538 if options:
7539 text += " (%s)" % options
7540 return text
7541
7542
7543class GenericTypeCompiler(TypeCompiler):
7544 def visit_FLOAT(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
7545 return "FLOAT"
7546
7547 def visit_DOUBLE(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
7548 return "DOUBLE"
7549
7550 def visit_DOUBLE_PRECISION(
7551 self, type_: sqltypes.DOUBLE_PRECISION[Any], **kw: Any
7552 ) -> str:
7553 return "DOUBLE PRECISION"
7554
7555 def visit_REAL(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
7556 return "REAL"
7557
7558 def visit_NUMERIC(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
7559 if type_.precision is None:
7560 return "NUMERIC"
7561 elif type_.scale is None:
7562 return "NUMERIC(%(precision)s)" % {"precision": type_.precision}
7563 else:
7564 return "NUMERIC(%(precision)s, %(scale)s)" % {
7565 "precision": type_.precision,
7566 "scale": type_.scale,
7567 }
7568
7569 def visit_DECIMAL(self, type_: sqltypes.DECIMAL[Any], **kw: Any) -> str:
7570 if type_.precision is None:
7571 return "DECIMAL"
7572 elif type_.scale is None:
7573 return "DECIMAL(%(precision)s)" % {"precision": type_.precision}
7574 else:
7575 return "DECIMAL(%(precision)s, %(scale)s)" % {
7576 "precision": type_.precision,
7577 "scale": type_.scale,
7578 }
7579
7580 def visit_INTEGER(self, type_: sqltypes.Integer, **kw: Any) -> str:
7581 return "INTEGER"
7582
7583 def visit_SMALLINT(self, type_: sqltypes.SmallInteger, **kw: Any) -> str:
7584 return "SMALLINT"
7585
7586 def visit_BIGINT(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
7587 return "BIGINT"
7588
7589 def visit_TIMESTAMP(self, type_: sqltypes.TIMESTAMP, **kw: Any) -> str:
7590 return "TIMESTAMP"
7591
7592 def visit_DATETIME(self, type_: sqltypes.DateTime, **kw: Any) -> str:
7593 return "DATETIME"
7594
7595 def visit_DATE(self, type_: sqltypes.Date, **kw: Any) -> str:
7596 return "DATE"
7597
7598 def visit_TIME(self, type_: sqltypes.Time, **kw: Any) -> str:
7599 return "TIME"
7600
7601 def visit_CLOB(self, type_: sqltypes.CLOB, **kw: Any) -> str:
7602 return "CLOB"
7603
7604 def visit_NCLOB(self, type_: sqltypes.Text, **kw: Any) -> str:
7605 return "NCLOB"
7606
7607 def _render_string_type(
7608 self, name: str, length: Optional[int], collation: Optional[str]
7609 ) -> str:
7610 text = name
7611 if length:
7612 text += f"({length})"
7613 if collation:
7614 text += f' COLLATE "{collation}"'
7615 return text
7616
7617 def visit_CHAR(self, type_: sqltypes.CHAR, **kw: Any) -> str:
7618 return self._render_string_type("CHAR", type_.length, type_.collation)
7619
7620 def visit_NCHAR(self, type_: sqltypes.NCHAR, **kw: Any) -> str:
7621 return self._render_string_type("NCHAR", type_.length, type_.collation)
7622
7623 def visit_VARCHAR(self, type_: sqltypes.String, **kw: Any) -> str:
7624 return self._render_string_type(
7625 "VARCHAR", type_.length, type_.collation
7626 )
7627
7628 def visit_NVARCHAR(self, type_: sqltypes.NVARCHAR, **kw: Any) -> str:
7629 return self._render_string_type(
7630 "NVARCHAR", type_.length, type_.collation
7631 )
7632
7633 def visit_TEXT(self, type_: sqltypes.Text, **kw: Any) -> str:
7634 return self._render_string_type("TEXT", type_.length, type_.collation)
7635
7636 def visit_UUID(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
7637 return "UUID"
7638
7639 def visit_BLOB(self, type_: sqltypes.LargeBinary, **kw: Any) -> str:
7640 return "BLOB"
7641
7642 def visit_BINARY(self, type_: sqltypes.BINARY, **kw: Any) -> str:
7643 return "BINARY" + (type_.length and "(%d)" % type_.length or "")
7644
7645 def visit_VARBINARY(self, type_: sqltypes.VARBINARY, **kw: Any) -> str:
7646 return "VARBINARY" + (type_.length and "(%d)" % type_.length or "")
7647
7648 def visit_BOOLEAN(self, type_: sqltypes.Boolean, **kw: Any) -> str:
7649 return "BOOLEAN"
7650
7651 def visit_uuid(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
7652 if not type_.native_uuid or not self.dialect.supports_native_uuid:
7653 return self._render_string_type("CHAR", length=32, collation=None)
7654 else:
7655 return self.visit_UUID(type_, **kw)
7656
7657 def visit_large_binary(
7658 self, type_: sqltypes.LargeBinary, **kw: Any
7659 ) -> str:
7660 return self.visit_BLOB(type_, **kw)
7661
7662 def visit_boolean(self, type_: sqltypes.Boolean, **kw: Any) -> str:
7663 return self.visit_BOOLEAN(type_, **kw)
7664
7665 def visit_time(self, type_: sqltypes.Time, **kw: Any) -> str:
7666 return self.visit_TIME(type_, **kw)
7667
7668 def visit_datetime(self, type_: sqltypes.DateTime, **kw: Any) -> str:
7669 return self.visit_DATETIME(type_, **kw)
7670
7671 def visit_date(self, type_: sqltypes.Date, **kw: Any) -> str:
7672 return self.visit_DATE(type_, **kw)
7673
7674 def visit_big_integer(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
7675 return self.visit_BIGINT(type_, **kw)
7676
7677 def visit_small_integer(
7678 self, type_: sqltypes.SmallInteger, **kw: Any
7679 ) -> str:
7680 return self.visit_SMALLINT(type_, **kw)
7681
7682 def visit_integer(self, type_: sqltypes.Integer, **kw: Any) -> str:
7683 return self.visit_INTEGER(type_, **kw)
7684
7685 def visit_real(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
7686 return self.visit_REAL(type_, **kw)
7687
7688 def visit_float(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
7689 return self.visit_FLOAT(type_, **kw)
7690
7691 def visit_double(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
7692 return self.visit_DOUBLE(type_, **kw)
7693
7694 def visit_numeric(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
7695 return self.visit_NUMERIC(type_, **kw)
7696
7697 def visit_string(self, type_: sqltypes.String, **kw: Any) -> str:
7698 return self.visit_VARCHAR(type_, **kw)
7699
7700 def visit_unicode(self, type_: sqltypes.Unicode, **kw: Any) -> str:
7701 return self.visit_VARCHAR(type_, **kw)
7702
7703 def visit_text(self, type_: sqltypes.Text, **kw: Any) -> str:
7704 return self.visit_TEXT(type_, **kw)
7705
7706 def visit_unicode_text(
7707 self, type_: sqltypes.UnicodeText, **kw: Any
7708 ) -> str:
7709 return self.visit_TEXT(type_, **kw)
7710
7711 def visit_enum(self, type_: sqltypes.Enum, **kw: Any) -> str:
7712 return self.visit_VARCHAR(type_, **kw)
7713
7714 def visit_null(self, type_, **kw):
7715 raise exc.CompileError(
7716 "Can't generate DDL for %r; "
7717 "did you forget to specify a "
7718 "type on this Column?" % type_
7719 )
7720
7721 def visit_type_decorator(
7722 self, type_: TypeDecorator[Any], **kw: Any
7723 ) -> str:
7724 return self.process(type_.type_engine(self.dialect), **kw)
7725
7726 def visit_user_defined(
7727 self, type_: UserDefinedType[Any], **kw: Any
7728 ) -> str:
7729 return type_.get_col_spec(**kw)
7730
7731
7732class StrSQLTypeCompiler(GenericTypeCompiler):
7733 def process(self, type_, **kw):
7734 try:
7735 _compiler_dispatch = type_._compiler_dispatch
7736 except AttributeError:
7737 return self._visit_unknown(type_, **kw)
7738 else:
7739 return _compiler_dispatch(self, **kw)
7740
7741 def __getattr__(self, key):
7742 if key.startswith("visit_"):
7743 return self._visit_unknown
7744 else:
7745 raise AttributeError(key)
7746
7747 def _visit_unknown(self, type_, **kw):
7748 if type_.__class__.__name__ == type_.__class__.__name__.upper():
7749 return type_.__class__.__name__
7750 else:
7751 return repr(type_)
7752
7753 def visit_null(self, type_, **kw):
7754 return "NULL"
7755
7756 def visit_user_defined(self, type_, **kw):
7757 try:
7758 get_col_spec = type_.get_col_spec
7759 except AttributeError:
7760 return repr(type_)
7761 else:
7762 return get_col_spec(**kw)
7763
7764
7765class _SchemaForObjectCallable(Protocol):
7766 def __call__(self, obj: Any, /) -> str: ...
7767
7768
7769class _BindNameForColProtocol(Protocol):
7770 def __call__(self, col: ColumnClause[Any]) -> str: ...
7771
7772
7773class IdentifierPreparer:
7774 """Handle quoting and case-folding of identifiers based on options."""
7775
7776 reserved_words = RESERVED_WORDS
7777
7778 legal_characters = LEGAL_CHARACTERS
7779
7780 illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS
7781
7782 initial_quote: str
7783
7784 final_quote: str
7785
7786 _strings: MutableMapping[str, str]
7787
7788 schema_for_object: _SchemaForObjectCallable = operator.attrgetter("schema")
7789 """Return the .schema attribute for an object.
7790
7791 For the default IdentifierPreparer, the schema for an object is always
7792 the value of the ".schema" attribute. if the preparer is replaced
7793 with one that has a non-empty schema_translate_map, the value of the
7794 ".schema" attribute is rendered a symbol that will be converted to a
7795 real schema name from the mapping post-compile.
7796
7797 """
7798
7799 _includes_none_schema_translate: bool = False
7800
7801 def __init__(
7802 self,
7803 dialect: Dialect,
7804 initial_quote: str = '"',
7805 final_quote: Optional[str] = None,
7806 escape_quote: str = '"',
7807 quote_case_sensitive_collations: bool = True,
7808 omit_schema: bool = False,
7809 ):
7810 """Construct a new ``IdentifierPreparer`` object.
7811
7812 initial_quote
7813 Character that begins a delimited identifier.
7814
7815 final_quote
7816 Character that ends a delimited identifier. Defaults to
7817 `initial_quote`.
7818
7819 omit_schema
7820 Prevent prepending schema name. Useful for databases that do
7821 not support schemae.
7822 """
7823
7824 self.dialect = dialect
7825 self.initial_quote = initial_quote
7826 self.final_quote = final_quote or self.initial_quote
7827 self.escape_quote = escape_quote
7828 self.escape_to_quote = self.escape_quote * 2
7829 self.omit_schema = omit_schema
7830 self.quote_case_sensitive_collations = quote_case_sensitive_collations
7831 self._strings = {}
7832 self._double_percents = self.dialect.paramstyle in (
7833 "format",
7834 "pyformat",
7835 )
7836
7837 def _with_schema_translate(self, schema_translate_map):
7838 prep = self.__class__.__new__(self.__class__)
7839 prep.__dict__.update(self.__dict__)
7840
7841 includes_none = None in schema_translate_map
7842
7843 def symbol_getter(obj):
7844 name = obj.schema
7845 if obj._use_schema_map and (name is not None or includes_none):
7846 if name is not None and ("[" in name or "]" in name):
7847 raise exc.CompileError(
7848 "Square bracket characters ([]) not supported "
7849 "in schema translate name '%s'" % name
7850 )
7851 return quoted_name(
7852 "__[SCHEMA_%s]" % (name or "_none"), quote=False
7853 )
7854 else:
7855 return obj.schema
7856
7857 prep.schema_for_object = symbol_getter
7858 prep._includes_none_schema_translate = includes_none
7859 return prep
7860
7861 def _render_schema_translates(
7862 self, statement: str, schema_translate_map: SchemaTranslateMapType
7863 ) -> str:
7864 d = schema_translate_map
7865 if None in d:
7866 if not self._includes_none_schema_translate:
7867 raise exc.InvalidRequestError(
7868 "schema translate map which previously did not have "
7869 "`None` present as a key now has `None` present; compiled "
7870 "statement may lack adequate placeholders. Please use "
7871 "consistent keys in successive "
7872 "schema_translate_map dictionaries."
7873 )
7874
7875 d["_none"] = d[None] # type: ignore[index]
7876
7877 def replace(m):
7878 name = m.group(2)
7879 if name in d:
7880 effective_schema = d[name]
7881 else:
7882 if name in (None, "_none"):
7883 raise exc.InvalidRequestError(
7884 "schema translate map which previously had `None` "
7885 "present as a key now no longer has it present; don't "
7886 "know how to apply schema for compiled statement. "
7887 "Please use consistent keys in successive "
7888 "schema_translate_map dictionaries."
7889 )
7890 effective_schema = name
7891
7892 if not effective_schema:
7893 effective_schema = self.dialect.default_schema_name
7894 if not effective_schema:
7895 # TODO: no coverage here
7896 raise exc.CompileError(
7897 "Dialect has no default schema name; can't "
7898 "use None as dynamic schema target."
7899 )
7900 return self.quote_schema(effective_schema)
7901
7902 return re.sub(r"(__\[SCHEMA_([^\]]+)\])", replace, statement)
7903
7904 def _escape_identifier(self, value: str) -> str:
7905 """Escape an identifier.
7906
7907 Subclasses should override this to provide database-dependent
7908 escaping behavior.
7909 """
7910
7911 value = value.replace(self.escape_quote, self.escape_to_quote)
7912 if self._double_percents:
7913 value = value.replace("%", "%%")
7914 return value
7915
7916 def _unescape_identifier(self, value: str) -> str:
7917 """Canonicalize an escaped identifier.
7918
7919 Subclasses should override this to provide database-dependent
7920 unescaping behavior that reverses _escape_identifier.
7921 """
7922
7923 return value.replace(self.escape_to_quote, self.escape_quote)
7924
7925 def validate_sql_phrase(self, element, reg):
7926 """keyword sequence filter.
7927
7928 a filter for elements that are intended to represent keyword sequences,
7929 such as "INITIALLY", "INITIALLY DEFERRED", etc. no special characters
7930 should be present.
7931
7932 """
7933
7934 if element is not None and not reg.match(element):
7935 raise exc.CompileError(
7936 "Unexpected SQL phrase: %r (matching against %r)"
7937 % (element, reg.pattern)
7938 )
7939 return element
7940
7941 def quote_identifier(self, value: str) -> str:
7942 """Quote an identifier.
7943
7944 Subclasses should override this to provide database-dependent
7945 quoting behavior.
7946 """
7947
7948 return (
7949 self.initial_quote
7950 + self._escape_identifier(value)
7951 + self.final_quote
7952 )
7953
7954 def _requires_quotes(self, value: str) -> bool:
7955 """Return True if the given identifier requires quoting."""
7956 lc_value = value.lower()
7957 return (
7958 lc_value in self.reserved_words
7959 or value[0] in self.illegal_initial_characters
7960 or not self.legal_characters.match(str(value))
7961 or (lc_value != value)
7962 )
7963
7964 def _requires_quotes_illegal_chars(self, value):
7965 """Return True if the given identifier requires quoting, but
7966 not taking case convention into account."""
7967 return not self.legal_characters.match(str(value))
7968
7969 def quote_schema(self, schema: str) -> str:
7970 """Conditionally quote a schema name.
7971
7972
7973 The name is quoted if it is a reserved word, contains quote-necessary
7974 characters, or is an instance of :class:`.quoted_name` which includes
7975 ``quote`` set to ``True``.
7976
7977 Subclasses can override this to provide database-dependent
7978 quoting behavior for schema names.
7979
7980 :param schema: string schema name
7981 """
7982 return self.quote(schema)
7983
7984 def quote(self, ident: str) -> str:
7985 """Conditionally quote an identifier.
7986
7987 The identifier is quoted if it is a reserved word, contains
7988 quote-necessary characters, or is an instance of
7989 :class:`.quoted_name` which includes ``quote`` set to ``True``.
7990
7991 Subclasses can override this to provide database-dependent
7992 quoting behavior for identifier names.
7993
7994 :param ident: string identifier
7995 """
7996 force = getattr(ident, "quote", None)
7997
7998 if force is None:
7999 if ident in self._strings:
8000 return self._strings[ident]
8001 else:
8002 if self._requires_quotes(ident):
8003 self._strings[ident] = self.quote_identifier(ident)
8004 else:
8005 self._strings[ident] = ident
8006 return self._strings[ident]
8007 elif force:
8008 return self.quote_identifier(ident)
8009 else:
8010 return ident
8011
8012 def format_collation(self, collation_name):
8013 if self.quote_case_sensitive_collations:
8014 return self.quote(collation_name)
8015 else:
8016 return collation_name
8017
8018 def format_sequence(
8019 self, sequence: schema.Sequence, use_schema: bool = True
8020 ) -> str:
8021 name = self.quote(sequence.name)
8022
8023 effective_schema = self.schema_for_object(sequence)
8024
8025 if (
8026 not self.omit_schema
8027 and use_schema
8028 and effective_schema is not None
8029 ):
8030 name = self.quote_schema(effective_schema) + "." + name
8031 return name
8032
8033 def format_label(
8034 self, label: Label[Any], name: Optional[str] = None
8035 ) -> str:
8036 return self.quote(name or label.name)
8037
8038 def format_alias(
8039 self, alias: Optional[AliasedReturnsRows], name: Optional[str] = None
8040 ) -> str:
8041 if name is None:
8042 assert alias is not None
8043 return self.quote(alias.name)
8044 else:
8045 return self.quote(name)
8046
8047 def format_savepoint(self, savepoint, name=None):
8048 # Running the savepoint name through quoting is unnecessary
8049 # for all known dialects. This is here to support potential
8050 # third party use cases
8051 ident = name or savepoint.ident
8052 if self._requires_quotes(ident):
8053 ident = self.quote_identifier(ident)
8054 return ident
8055
8056 @util.preload_module("sqlalchemy.sql.naming")
8057 def format_constraint(
8058 self, constraint: Union[Constraint, Index], _alembic_quote: bool = True
8059 ) -> Optional[str]:
8060 naming = util.preloaded.sql_naming
8061
8062 if constraint.name is _NONE_NAME:
8063 name = naming._constraint_name_for_table(
8064 constraint, constraint.table
8065 )
8066
8067 if name is None:
8068 return None
8069 else:
8070 name = constraint.name
8071
8072 assert name is not None
8073 if constraint.__visit_name__ == "index":
8074 return self.truncate_and_render_index_name(
8075 name, _alembic_quote=_alembic_quote
8076 )
8077 else:
8078 return self.truncate_and_render_constraint_name(
8079 name, _alembic_quote=_alembic_quote
8080 )
8081
8082 def truncate_and_render_index_name(
8083 self, name: str, _alembic_quote: bool = True
8084 ) -> str:
8085 # calculate these at format time so that ad-hoc changes
8086 # to dialect.max_identifier_length etc. can be reflected
8087 # as IdentifierPreparer is long lived
8088 max_ = (
8089 self.dialect.max_index_name_length
8090 or self.dialect.max_identifier_length
8091 )
8092 return self._truncate_and_render_maxlen_name(
8093 name, max_, _alembic_quote
8094 )
8095
8096 def truncate_and_render_constraint_name(
8097 self, name: str, _alembic_quote: bool = True
8098 ) -> str:
8099 # calculate these at format time so that ad-hoc changes
8100 # to dialect.max_identifier_length etc. can be reflected
8101 # as IdentifierPreparer is long lived
8102 max_ = (
8103 self.dialect.max_constraint_name_length
8104 or self.dialect.max_identifier_length
8105 )
8106 return self._truncate_and_render_maxlen_name(
8107 name, max_, _alembic_quote
8108 )
8109
8110 def _truncate_and_render_maxlen_name(
8111 self, name: str, max_: int, _alembic_quote: bool
8112 ) -> str:
8113 if isinstance(name, elements._truncated_label):
8114 if len(name) > max_:
8115 name = name[0 : max_ - 8] + "_" + util.md5_hex(name)[-4:]
8116 else:
8117 self.dialect.validate_identifier(name)
8118
8119 if not _alembic_quote:
8120 return name
8121 else:
8122 return self.quote(name)
8123
8124 def format_index(self, index: Index) -> str:
8125 name = self.format_constraint(index)
8126 assert name is not None
8127 return name
8128
8129 def format_table(
8130 self,
8131 table: FromClause,
8132 use_schema: bool = True,
8133 name: Optional[str] = None,
8134 ) -> str:
8135 """Prepare a quoted table and schema name."""
8136 if name is None:
8137 if TYPE_CHECKING:
8138 assert isinstance(table, NamedFromClause)
8139 name = table.name
8140
8141 result = self.quote(name)
8142
8143 effective_schema = self.schema_for_object(table)
8144
8145 if not self.omit_schema and use_schema and effective_schema:
8146 result = self.quote_schema(effective_schema) + "." + result
8147 return result
8148
8149 def format_schema(self, name):
8150 """Prepare a quoted schema name."""
8151
8152 return self.quote(name)
8153
8154 def format_label_name(
8155 self,
8156 name,
8157 anon_map=None,
8158 ):
8159 """Prepare a quoted column name."""
8160
8161 if anon_map is not None and isinstance(
8162 name, elements._truncated_label
8163 ):
8164 name = name.apply_map(anon_map)
8165
8166 return self.quote(name)
8167
8168 def format_column(
8169 self,
8170 column: ColumnElement[Any],
8171 use_table: bool = False,
8172 name: Optional[str] = None,
8173 table_name: Optional[str] = None,
8174 use_schema: bool = False,
8175 anon_map: Optional[Mapping[str, Any]] = None,
8176 ) -> str:
8177 """Prepare a quoted column name."""
8178
8179 if name is None:
8180 name = column.name
8181 assert name is not None
8182
8183 if anon_map is not None and isinstance(
8184 name, elements._truncated_label
8185 ):
8186 name = name.apply_map(anon_map)
8187
8188 if not getattr(column, "is_literal", False):
8189 if use_table:
8190 return (
8191 self.format_table(
8192 column.table, use_schema=use_schema, name=table_name
8193 )
8194 + "."
8195 + self.quote(name)
8196 )
8197 else:
8198 return self.quote(name)
8199 else:
8200 # literal textual elements get stuck into ColumnClause a lot,
8201 # which shouldn't get quoted
8202
8203 if use_table:
8204 return (
8205 self.format_table(
8206 column.table, use_schema=use_schema, name=table_name
8207 )
8208 + "."
8209 + name
8210 )
8211 else:
8212 return name
8213
8214 def format_table_seq(self, table, use_schema=True):
8215 """Format table name and schema as a tuple."""
8216
8217 # Dialects with more levels in their fully qualified references
8218 # ('database', 'owner', etc.) could override this and return
8219 # a longer sequence.
8220
8221 effective_schema = self.schema_for_object(table)
8222
8223 if not self.omit_schema and use_schema and effective_schema:
8224 return (
8225 self.quote_schema(effective_schema),
8226 self.format_table(table, use_schema=False),
8227 )
8228 else:
8229 return (self.format_table(table, use_schema=False),)
8230
8231 @util.memoized_property
8232 def _r_identifiers(self):
8233 initial, final, escaped_final = (
8234 re.escape(s)
8235 for s in (
8236 self.initial_quote,
8237 self.final_quote,
8238 self._escape_identifier(self.final_quote),
8239 )
8240 )
8241 r = re.compile(
8242 r"(?:"
8243 r"(?:%(initial)s((?:%(escaped)s|[^%(final)s])+)%(final)s"
8244 r"|([^\.]+))(?=\.|$))+"
8245 % {"initial": initial, "final": final, "escaped": escaped_final}
8246 )
8247 return r
8248
8249 def unformat_identifiers(self, identifiers: str) -> Sequence[str]:
8250 """Unpack 'schema.table.column'-like strings into components."""
8251
8252 r = self._r_identifiers
8253 return [
8254 self._unescape_identifier(i)
8255 for i in [a or b for a, b in r.findall(identifiers)]
8256 ]