1# sql/compiler.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Base SQL and DDL compiler implementations.
10
11Classes provided include:
12
13:class:`.compiler.SQLCompiler` - renders SQL
14strings
15
16:class:`.compiler.DDLCompiler` - renders DDL
17(data definition language) strings
18
19:class:`.compiler.GenericTypeCompiler` - renders
20type specification strings.
21
22To generate user-defined SQL strings, see
23:doc:`/ext/compiler`.
24
25"""
26
27from __future__ import annotations
28
29import collections
30import collections.abc as collections_abc
31import contextlib
32from enum import IntEnum
33import functools
34import itertools
35import operator
36import re
37from time import perf_counter
38import typing
39from typing import Any
40from typing import Callable
41from typing import cast
42from typing import ClassVar
43from typing import Dict
44from typing import Final
45from typing import FrozenSet
46from typing import Iterable
47from typing import Iterator
48from typing import List
49from typing import Literal
50from typing import Mapping
51from typing import MutableMapping
52from typing import NamedTuple
53from typing import NoReturn
54from typing import Optional
55from typing import Pattern
56from typing import Protocol
57from typing import Sequence
58from typing import Set
59from typing import Tuple
60from typing import Type
61from typing import TYPE_CHECKING
62from typing import TypedDict
63from typing import Union
64
65from . import base
66from . import coercions
67from . import crud
68from . import elements
69from . import functions
70from . import operators
71from . import roles
72from . import schema
73from . import selectable
74from . import sqltypes
75from . import util as sql_util
76from ._typing import is_column_element
77from ._typing import is_dml
78from .base import _de_clone
79from .base import _from_objects
80from .base import _NONE_NAME
81from .base import _SentinelDefaultCharacterization
82from .base import NO_ARG
83from .elements import quoted_name
84from .sqltypes import TupleType
85from .visitors import prefix_anon_map
86from .. import exc
87from .. import util
88from ..util import FastIntFlag
89from ..util.typing import Self
90from ..util.typing import TupleAny
91from ..util.typing import Unpack
92
93if typing.TYPE_CHECKING:
94 from .annotation import _AnnotationDict
95 from .base import _AmbiguousTableNameMap
96 from .base import CompileState
97 from .base import Executable
98 from .base import ExecutableStatement
99 from .cache_key import CacheKey
100 from .ddl import _TableViaSelect
101 from .ddl import CreateTableAs
102 from .ddl import CreateView
103 from .ddl import ExecutableDDLElement
104 from .dml import Delete
105 from .dml import Insert
106 from .dml import Update
107 from .dml import UpdateBase
108 from .dml import UpdateDMLState
109 from .dml import ValuesBase
110 from .elements import _truncated_label
111 from .elements import BinaryExpression
112 from .elements import BindParameter
113 from .elements import ClauseElement
114 from .elements import ColumnClause
115 from .elements import ColumnElement
116 from .elements import False_
117 from .elements import Label
118 from .elements import Null
119 from .elements import True_
120 from .functions import Function
121 from .schema import CheckConstraint
122 from .schema import Column
123 from .schema import Constraint
124 from .schema import ForeignKeyConstraint
125 from .schema import IdentityOptions
126 from .schema import Index
127 from .schema import PrimaryKeyConstraint
128 from .schema import Table
129 from .schema import UniqueConstraint
130 from .selectable import _ColumnsClauseElement
131 from .selectable import AliasedReturnsRows
132 from .selectable import CompoundSelectState
133 from .selectable import CTE
134 from .selectable import FromClause
135 from .selectable import NamedFromClause
136 from .selectable import ReturnsRows
137 from .selectable import Select
138 from .selectable import SelectState
139 from .type_api import _BindProcessorType
140 from .type_api import TypeDecorator
141 from .type_api import TypeEngine
142 from .type_api import UserDefinedType
143 from .visitors import Visitable
144 from ..engine.cursor import CursorResultMetaData
145 from ..engine.interfaces import _CoreSingleExecuteParams
146 from ..engine.interfaces import _DBAPIAnyExecuteParams
147 from ..engine.interfaces import _DBAPIMultiExecuteParams
148 from ..engine.interfaces import _DBAPISingleExecuteParams
149 from ..engine.interfaces import _ExecuteOptions
150 from ..engine.interfaces import _GenericSetInputSizesType
151 from ..engine.interfaces import _MutableCoreSingleExecuteParams
152 from ..engine.interfaces import Dialect
153 from ..engine.interfaces import SchemaTranslateMapType
154
155
156_FromHintsType = Dict["FromClause", str]
157
158RESERVED_WORDS = {
159 "all",
160 "analyse",
161 "analyze",
162 "and",
163 "any",
164 "array",
165 "as",
166 "asc",
167 "asymmetric",
168 "authorization",
169 "between",
170 "binary",
171 "both",
172 "case",
173 "cast",
174 "check",
175 "collate",
176 "column",
177 "constraint",
178 "create",
179 "cross",
180 "current_date",
181 "current_role",
182 "current_time",
183 "current_timestamp",
184 "current_user",
185 "default",
186 "deferrable",
187 "desc",
188 "distinct",
189 "do",
190 "else",
191 "end",
192 "except",
193 "false",
194 "for",
195 "foreign",
196 "freeze",
197 "from",
198 "full",
199 "grant",
200 "group",
201 "having",
202 "ilike",
203 "in",
204 "initially",
205 "inner",
206 "intersect",
207 "into",
208 "is",
209 "isnull",
210 "join",
211 "leading",
212 "left",
213 "like",
214 "limit",
215 "localtime",
216 "localtimestamp",
217 "natural",
218 "new",
219 "not",
220 "notnull",
221 "null",
222 "off",
223 "offset",
224 "old",
225 "on",
226 "only",
227 "or",
228 "order",
229 "outer",
230 "overlaps",
231 "placing",
232 "primary",
233 "references",
234 "right",
235 "select",
236 "session_user",
237 "set",
238 "similar",
239 "some",
240 "symmetric",
241 "table",
242 "then",
243 "to",
244 "trailing",
245 "true",
246 "union",
247 "unique",
248 "user",
249 "using",
250 "verbose",
251 "when",
252 "where",
253}
254
255LEGAL_CHARACTERS = re.compile(r"^[A-Z0-9_$]+$", re.I)
256LEGAL_CHARACTERS_PLUS_SPACE = re.compile(r"^[A-Z0-9_ $]+$", re.I)
257ILLEGAL_INITIAL_CHARACTERS = {str(x) for x in range(0, 10)}.union(["$"])
258
259FK_ON_DELETE = re.compile(
260 r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
261)
262FK_ON_UPDATE = re.compile(
263 r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
264)
265FK_INITIALLY = re.compile(r"^(?:DEFERRED|IMMEDIATE)$", re.I)
266_WINDOW_EXCLUDE_RE = re.compile(
267 r"^(?:CURRENT ROW|GROUP|TIES|NO OTHERS)$", re.I
268)
269BIND_PARAMS = re.compile(r"(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])", re.UNICODE)
270BIND_PARAMS_ESC = re.compile(r"\x5c(:[\w\$]*)(?![:\w\$])", re.UNICODE)
271
272_pyformat_template = "%%(%(name)s)s"
273BIND_TEMPLATES = {
274 "pyformat": _pyformat_template,
275 "qmark": "?",
276 "format": "%%s",
277 "numeric": ":[_POSITION]",
278 "numeric_dollar": "$[_POSITION]",
279 "named": ":%(name)s",
280}
281
282
283OPERATORS = {
284 # binary
285 operators.and_: " AND ",
286 operators.or_: " OR ",
287 operators.add: " + ",
288 operators.mul: " * ",
289 operators.sub: " - ",
290 operators.mod: " % ",
291 operators.neg: "-",
292 operators.lt: " < ",
293 operators.le: " <= ",
294 operators.ne: " != ",
295 operators.gt: " > ",
296 operators.ge: " >= ",
297 operators.eq: " = ",
298 operators.is_distinct_from: " IS DISTINCT FROM ",
299 operators.is_not_distinct_from: " IS NOT DISTINCT FROM ",
300 operators.concat_op: " || ",
301 operators.match_op: " MATCH ",
302 operators.not_match_op: " NOT MATCH ",
303 operators.in_op: " IN ",
304 operators.not_in_op: " NOT IN ",
305 operators.comma_op: ", ",
306 operators.from_: " FROM ",
307 operators.as_: " AS ",
308 operators.is_: " IS ",
309 operators.is_not: " IS NOT ",
310 operators.collate: " COLLATE ",
311 # unary
312 operators.exists: "EXISTS ",
313 operators.distinct_op: "DISTINCT ",
314 operators.inv: "NOT ",
315 operators.any_op: "ANY ",
316 operators.all_op: "ALL ",
317 # modifiers
318 operators.desc_op: " DESC",
319 operators.asc_op: " ASC",
320 operators.nulls_first_op: " NULLS FIRST",
321 operators.nulls_last_op: " NULLS LAST",
322 # bitwise
323 operators.bitwise_xor_op: " ^ ",
324 operators.bitwise_or_op: " | ",
325 operators.bitwise_and_op: " & ",
326 operators.bitwise_not_op: "~",
327 operators.bitwise_lshift_op: " << ",
328 operators.bitwise_rshift_op: " >> ",
329}
330
331FUNCTIONS: Dict[Type[Function[Any]], str] = {
332 functions.coalesce: "coalesce",
333 functions.current_date: "CURRENT_DATE",
334 functions.current_time: "CURRENT_TIME",
335 functions.current_timestamp: "CURRENT_TIMESTAMP",
336 functions.current_user: "CURRENT_USER",
337 functions.localtime: "LOCALTIME",
338 functions.localtimestamp: "LOCALTIMESTAMP",
339 functions.random: "random",
340 functions.sysdate: "sysdate",
341 functions.session_user: "SESSION_USER",
342 functions.user: "USER",
343 functions.cube: "CUBE",
344 functions.rollup: "ROLLUP",
345 functions.grouping_sets: "GROUPING SETS",
346}
347
348
349EXTRACT_MAP = {
350 "month": "month",
351 "day": "day",
352 "year": "year",
353 "second": "second",
354 "hour": "hour",
355 "doy": "doy",
356 "minute": "minute",
357 "quarter": "quarter",
358 "dow": "dow",
359 "week": "week",
360 "epoch": "epoch",
361 "milliseconds": "milliseconds",
362 "microseconds": "microseconds",
363 "timezone_hour": "timezone_hour",
364 "timezone_minute": "timezone_minute",
365}
366
367COMPOUND_KEYWORDS = {
368 selectable._CompoundSelectKeyword.UNION: "UNION",
369 selectable._CompoundSelectKeyword.UNION_ALL: "UNION ALL",
370 selectable._CompoundSelectKeyword.EXCEPT: "EXCEPT",
371 selectable._CompoundSelectKeyword.EXCEPT_ALL: "EXCEPT ALL",
372 selectable._CompoundSelectKeyword.INTERSECT: "INTERSECT",
373 selectable._CompoundSelectKeyword.INTERSECT_ALL: "INTERSECT ALL",
374}
375
376
377class ResultColumnsEntry(NamedTuple):
378 """Tracks a column expression that is expected to be represented
379 in the result rows for this statement.
380
381 This normally refers to the columns clause of a SELECT statement
382 but may also refer to a RETURNING clause, as well as for dialect-specific
383 emulations.
384
385 """
386
387 keyname: str
388 """string name that's expected in cursor.description"""
389
390 name: str
391 """column name, may be labeled"""
392
393 objects: Tuple[Any, ...]
394 """sequence of objects that should be able to locate this column
395 in a RowMapping. This is typically string names and aliases
396 as well as Column objects.
397
398 """
399
400 type: TypeEngine[Any]
401 """Datatype to be associated with this column. This is where
402 the "result processing" logic directly links the compiled statement
403 to the rows that come back from the cursor.
404
405 """
406
407
408class _ResultMapAppender(Protocol):
409 def __call__(
410 self,
411 keyname: str,
412 name: str,
413 objects: Sequence[Any],
414 type_: TypeEngine[Any],
415 ) -> None: ...
416
417
418# integer indexes into ResultColumnsEntry used by cursor.py.
419# some profiling showed integer access faster than named tuple
420RM_RENDERED_NAME: Literal[0] = 0
421RM_NAME: Literal[1] = 1
422RM_OBJECTS: Literal[2] = 2
423RM_TYPE: Literal[3] = 3
424
425
426class _BaseCompilerStackEntry(TypedDict):
427 asfrom_froms: Set[FromClause]
428 correlate_froms: Set[FromClause]
429 selectable: ReturnsRows
430
431
432class _CompilerStackEntry(_BaseCompilerStackEntry, total=False):
433 compile_state: CompileState
434 need_result_map_for_nested: bool
435 need_result_map_for_compound: bool
436 select_0: ReturnsRows
437 insert_from_select: Select[Unpack[TupleAny]]
438
439
440class ExpandedState(NamedTuple):
441 """represents state to use when producing "expanded" and
442 "post compile" bound parameters for a statement.
443
444 "expanded" parameters are parameters that are generated at
445 statement execution time to suit a number of parameters passed, the most
446 prominent example being the individual elements inside of an IN expression.
447
448 "post compile" parameters are parameters where the SQL literal value
449 will be rendered into the SQL statement at execution time, rather than
450 being passed as separate parameters to the driver.
451
452 To create an :class:`.ExpandedState` instance, use the
453 :meth:`.SQLCompiler.construct_expanded_state` method on any
454 :class:`.SQLCompiler` instance.
455
456 """
457
458 statement: str
459 """String SQL statement with parameters fully expanded"""
460
461 parameters: _CoreSingleExecuteParams
462 """Parameter dictionary with parameters fully expanded.
463
464 For a statement that uses named parameters, this dictionary will map
465 exactly to the names in the statement. For a statement that uses
466 positional parameters, the :attr:`.ExpandedState.positional_parameters`
467 will yield a tuple with the positional parameter set.
468
469 """
470
471 processors: Mapping[str, _BindProcessorType[Any]]
472 """mapping of bound value processors"""
473
474 positiontup: Optional[Sequence[str]]
475 """Sequence of string names indicating the order of positional
476 parameters"""
477
478 parameter_expansion: Mapping[str, List[str]]
479 """Mapping representing the intermediary link from original parameter
480 name to list of "expanded" parameter names, for those parameters that
481 were expanded."""
482
483 @property
484 def positional_parameters(self) -> Tuple[Any, ...]:
485 """Tuple of positional parameters, for statements that were compiled
486 using a positional paramstyle.
487
488 """
489 if self.positiontup is None:
490 raise exc.InvalidRequestError(
491 "statement does not use a positional paramstyle"
492 )
493 return tuple(self.parameters[key] for key in self.positiontup)
494
495 @property
496 def additional_parameters(self) -> _CoreSingleExecuteParams:
497 """synonym for :attr:`.ExpandedState.parameters`."""
498 return self.parameters
499
500
501class _InsertManyValues(NamedTuple):
502 """represents state to use for executing an "insertmanyvalues" statement.
503
504 The primary consumers of this object are the
505 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
506 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods.
507
508 .. versionadded:: 2.0
509
510 """
511
512 is_default_expr: bool
513 """if True, the statement is of the form
514 ``INSERT INTO TABLE DEFAULT VALUES``, and can't be rewritten as a "batch"
515
516 """
517
518 single_values_expr: str
519 """The rendered "values" clause of the INSERT statement.
520
521 This is typically the parenthesized section e.g. "(?, ?, ?)" or similar.
522 The insertmanyvalues logic uses this string as a search and replace
523 target.
524
525 """
526
527 insert_crud_params: List[crud._CrudParamElementStr]
528 """List of Column / bind names etc. used while rewriting the statement"""
529
530 num_positional_params_counted: int
531 """the number of bound parameters in a single-row statement.
532
533 This count may be larger or smaller than the actual number of columns
534 targeted in the INSERT, as it accommodates for SQL expressions
535 in the values list that may have zero or more parameters embedded
536 within them.
537
538 This count is part of what's used to organize rewritten parameter lists
539 when batching.
540
541 """
542
543 sort_by_parameter_order: bool = False
544 """if the deterministic_returnined_order parameter were used on the
545 insert.
546
547 All of the attributes following this will only be used if this is True.
548
549 """
550
551 includes_upsert_behaviors: bool = False
552 """if True, we have to accommodate for upsert behaviors.
553
554 This will in some cases downgrade "insertmanyvalues" that requests
555 deterministic ordering.
556
557 """
558
559 sentinel_columns: Optional[Sequence[Column[Any]]] = None
560 """List of sentinel columns that were located.
561
562 This list is only here if the INSERT asked for
563 sort_by_parameter_order=True,
564 and dialect-appropriate sentinel columns were located.
565
566 .. versionadded:: 2.0.10
567
568 """
569
570 num_sentinel_columns: int = 0
571 """how many sentinel columns are in the above list, if any.
572
573 This is the same as
574 ``len(sentinel_columns) if sentinel_columns is not None else 0``
575
576 """
577
578 sentinel_param_keys: Optional[Sequence[str]] = None
579 """parameter str keys in each param dictionary / tuple
580 that would link to the client side "sentinel" values for that row, which
581 we can use to match up parameter sets to result rows.
582
583 This is only present if sentinel_columns is present and the INSERT
584 statement actually refers to client side values for these sentinel
585 columns.
586
587 .. versionadded:: 2.0.10
588
589 .. versionchanged:: 2.0.29 - the sequence is now string dictionary keys
590 only, used against the "compiled parameteters" collection before
591 the parameters were converted by bound parameter processors
592
593 """
594
595 implicit_sentinel: bool = False
596 """if True, we have exactly one sentinel column and it uses a server side
597 value, currently has to generate an incrementing integer value.
598
599 The dialect in question would have asserted that it supports receiving
600 these values back and sorting on that value as a means of guaranteeing
601 correlation with the incoming parameter list.
602
603 .. versionadded:: 2.0.10
604
605 """
606
607 has_upsert_bound_parameters: bool = False
608 """if True, the upsert SET clause contains bound parameters that will
609 receive their values from the parameters dict (i.e., parametrized
610 bindparams where value is None and callable is None).
611
612 This means we can't batch multiple rows in a single statement, since
613 each row would need different values in the SET clause but there's only
614 one SET clause per statement. See issue #13130.
615
616 .. versionadded:: 2.0.37
617
618 """
619
620 embed_values_counter: bool = False
621 """Whether to embed an incrementing integer counter in each parameter
622 set within the VALUES clause as parameters are batched over.
623
624 This is only used for a specific INSERT..SELECT..VALUES..RETURNING syntax
625 where a subquery is used to produce value tuples. Current support
626 includes PostgreSQL, Microsoft SQL Server.
627
628 .. versionadded:: 2.0.10
629
630 """
631
632
633class _InsertManyValuesBatch(NamedTuple):
634 """represents an individual batch SQL statement for insertmanyvalues.
635
636 This is passed through the
637 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
638 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods out
639 to the :class:`.Connection` within the
640 :meth:`.Connection._exec_insertmany_context` method.
641
642 .. versionadded:: 2.0.10
643
644 """
645
646 replaced_statement: str
647 replaced_parameters: _DBAPIAnyExecuteParams
648 processed_setinputsizes: Optional[_GenericSetInputSizesType]
649 batch: Sequence[_DBAPISingleExecuteParams]
650 sentinel_values: Sequence[Tuple[Any, ...]]
651 current_batch_size: int
652 batchnum: int
653 total_batches: int
654 rows_sorted: bool
655 is_downgraded: bool
656
657
658class InsertmanyvaluesSentinelOpts(FastIntFlag):
659 """bitflag enum indicating styles of PK defaults
660 which can work as implicit sentinel columns
661
662 """
663
664 NOT_SUPPORTED = 1
665 AUTOINCREMENT = 2
666 IDENTITY = 4
667 SEQUENCE = 8
668 MONOTONIC_FUNCTION = 16
669
670 ANY_AUTOINCREMENT = (
671 AUTOINCREMENT | IDENTITY | SEQUENCE | MONOTONIC_FUNCTION
672 )
673 _SUPPORTED_OR_NOT = NOT_SUPPORTED | ANY_AUTOINCREMENT
674
675 USE_INSERT_FROM_SELECT = 32
676 RENDER_SELECT_COL_CASTS = 64
677
678
679class AggregateOrderByStyle(IntEnum):
680 """Describes backend database's capabilities with ORDER BY for aggregate
681 functions
682
683 .. versionadded:: 2.1
684
685 """
686
687 NONE = 0
688 """database has no ORDER BY for aggregate functions"""
689
690 INLINE = 1
691 """ORDER BY is rendered inside the function's argument list, typically as
692 the last element"""
693
694 WITHIN_GROUP = 2
695 """the WITHIN GROUP (ORDER BY ...) phrase is used for all aggregate
696 functions (not just the ordered set ones)"""
697
698
699class CompilerState(IntEnum):
700 COMPILING = 0
701 """statement is present, compilation phase in progress"""
702
703 STRING_APPLIED = 1
704 """statement is present, string form of the statement has been applied.
705
706 Additional processors by subclasses may still be pending.
707
708 """
709
710 NO_STATEMENT = 2
711 """compiler does not have a statement to compile, is used
712 for method access"""
713
714
715class Linting(IntEnum):
716 """represent preferences for the 'SQL linting' feature.
717
718 this feature currently includes support for flagging cartesian products
719 in SQL statements.
720
721 """
722
723 NO_LINTING = 0
724 "Disable all linting."
725
726 COLLECT_CARTESIAN_PRODUCTS = 1
727 """Collect data on FROMs and cartesian products and gather into
728 'self.from_linter'"""
729
730 WARN_LINTING = 2
731 "Emit warnings for linters that find problems"
732
733 FROM_LINTING = COLLECT_CARTESIAN_PRODUCTS | WARN_LINTING
734 """Warn for cartesian products; combines COLLECT_CARTESIAN_PRODUCTS
735 and WARN_LINTING"""
736
737
738NO_LINTING, COLLECT_CARTESIAN_PRODUCTS, WARN_LINTING, FROM_LINTING = tuple(
739 Linting
740)
741
742
743class FromLinter(collections.namedtuple("FromLinter", ["froms", "edges"])):
744 """represents current state for the "cartesian product" detection
745 feature."""
746
747 def lint(self, start=None):
748 froms = self.froms
749 if not froms:
750 return None, None
751
752 edges = set(self.edges)
753 the_rest = set(froms)
754
755 if start is not None:
756 start_with = start
757 the_rest.remove(start_with)
758 else:
759 start_with = the_rest.pop()
760
761 stack = collections.deque([start_with])
762
763 while stack and the_rest:
764 node = stack.popleft()
765 the_rest.discard(node)
766
767 # comparison of nodes in edges here is based on hash equality, as
768 # there are "annotated" elements that match the non-annotated ones.
769 # to remove the need for in-python hash() calls, use native
770 # containment routines (e.g. "node in edge", "edge.index(node)")
771 to_remove = {edge for edge in edges if node in edge}
772
773 # appendleft the node in each edge that is not
774 # the one that matched.
775 stack.extendleft(edge[not edge.index(node)] for edge in to_remove)
776 edges.difference_update(to_remove)
777
778 # FROMS left over? boom
779 if the_rest:
780 return the_rest, start_with
781 else:
782 return None, None
783
784 def warn(self, stmt_type="SELECT"):
785 the_rest, start_with = self.lint()
786
787 # FROMS left over? boom
788 if the_rest:
789 froms = the_rest
790 if froms:
791 template = (
792 "{stmt_type} statement has a cartesian product between "
793 "FROM element(s) {froms} and "
794 'FROM element "{start}". Apply join condition(s) '
795 "between each element to resolve."
796 )
797 froms_str = ", ".join(
798 f'"{self.froms[from_]}"' for from_ in froms
799 )
800 message = template.format(
801 stmt_type=stmt_type,
802 froms=froms_str,
803 start=self.froms[start_with],
804 )
805
806 util.warn(message)
807
808
809class Compiled:
810 """Represent a compiled SQL or DDL expression.
811
812 The ``__str__`` method of the ``Compiled`` object should produce
813 the actual text of the statement. ``Compiled`` objects are
814 specific to their underlying database dialect, and also may
815 or may not be specific to the columns referenced within a
816 particular set of bind parameters. In no case should the
817 ``Compiled`` object be dependent on the actual values of those
818 bind parameters, even though it may reference those values as
819 defaults.
820 """
821
822 statement: Optional[ClauseElement] = None
823 "The statement to compile."
824 string: str = ""
825 "The string representation of the ``statement``"
826
827 state: CompilerState
828 """description of the compiler's state"""
829
830 is_sql = False
831 is_ddl = False
832
833 _cached_metadata: Optional[CursorResultMetaData] = None
834
835 _result_columns: Optional[List[ResultColumnsEntry]] = None
836
837 schema_translate_map: Optional[SchemaTranslateMapType] = None
838
839 execution_options: _ExecuteOptions = util.EMPTY_DICT
840 """
841 Execution options propagated from the statement. In some cases,
842 sub-elements of the statement can modify these.
843 """
844
845 preparer: IdentifierPreparer
846
847 _annotations: _AnnotationDict = util.EMPTY_DICT
848
849 compile_state: Optional[CompileState] = None
850 """Optional :class:`.CompileState` object that maintains additional
851 state used by the compiler.
852
853 Major executable objects such as :class:`_expression.Insert`,
854 :class:`_expression.Update`, :class:`_expression.Delete`,
855 :class:`_expression.Select` will generate this
856 state when compiled in order to calculate additional information about the
857 object. For the top level object that is to be executed, the state can be
858 stored here where it can also have applicability towards result set
859 processing.
860
861 .. versionadded:: 1.4
862
863 """
864
865 dml_compile_state: Optional[CompileState] = None
866 """Optional :class:`.CompileState` assigned at the same point that
867 .isinsert, .isupdate, or .isdelete is assigned.
868
869 This will normally be the same object as .compile_state, with the
870 exception of cases like the :class:`.ORMFromStatementCompileState`
871 object.
872
873 .. versionadded:: 1.4.40
874
875 """
876
877 cache_key: Optional[CacheKey] = None
878 """The :class:`.CacheKey` that was generated ahead of creating this
879 :class:`.Compiled` object.
880
881 This is used for routines that need access to the original
882 :class:`.CacheKey` instance generated when the :class:`.Compiled`
883 instance was first cached, typically in order to reconcile
884 the original list of :class:`.BindParameter` objects with a
885 per-statement list that's generated on each call.
886
887 """
888
889 _gen_time: float
890 """Generation time of this :class:`.Compiled`, used for reporting
891 cache stats."""
892
893 def __init__(
894 self,
895 dialect: Dialect,
896 statement: Optional[ClauseElement],
897 schema_translate_map: Optional[SchemaTranslateMapType] = None,
898 render_schema_translate: bool = False,
899 compile_kwargs: Mapping[str, Any] = util.immutabledict(),
900 ):
901 """Construct a new :class:`.Compiled` object.
902
903 :param dialect: :class:`.Dialect` to compile against.
904
905 :param statement: :class:`_expression.ClauseElement` to be compiled.
906
907 :param schema_translate_map: dictionary of schema names to be
908 translated when forming the resultant SQL
909
910 .. seealso::
911
912 :ref:`schema_translating`
913
914 :param compile_kwargs: additional kwargs that will be
915 passed to the initial call to :meth:`.Compiled.process`.
916
917
918 """
919 self.dialect = dialect
920 self.preparer = self.dialect.identifier_preparer
921 if schema_translate_map:
922 self.schema_translate_map = schema_translate_map
923 self.preparer = self.preparer._with_schema_translate(
924 schema_translate_map
925 )
926
927 if statement is not None:
928 self.state = CompilerState.COMPILING
929 self.statement = statement
930 self.can_execute = statement.supports_execution
931 self._annotations = statement._annotations
932 if self.can_execute:
933 if TYPE_CHECKING:
934 assert isinstance(statement, Executable)
935 self.execution_options = statement._execution_options
936 self.string = self.process(self.statement, **compile_kwargs)
937
938 if render_schema_translate:
939 assert schema_translate_map is not None
940 self.string = self.preparer._render_schema_translates(
941 self.string, schema_translate_map
942 )
943
944 self.state = CompilerState.STRING_APPLIED
945 else:
946 self.state = CompilerState.NO_STATEMENT
947
948 self._gen_time = perf_counter()
949
950 def __init_subclass__(cls) -> None:
951 cls._init_compiler_cls()
952 return super().__init_subclass__()
953
954 @classmethod
955 def _init_compiler_cls(cls):
956 pass
957
958 def visit_unsupported_compilation(self, element, err, **kw):
959 raise exc.UnsupportedCompilationError(self, type(element)) from err
960
961 @property
962 def sql_compiler(self) -> SQLCompiler:
963 """Return a Compiled that is capable of processing SQL expressions.
964
965 If this compiler is one, it would likely just return 'self'.
966
967 """
968
969 raise NotImplementedError()
970
971 def process(self, obj: Visitable, **kwargs: Any) -> str:
972 return obj._compiler_dispatch(self, **kwargs)
973
974 def __str__(self) -> str:
975 """Return the string text of the generated SQL or DDL."""
976
977 if self.state is CompilerState.STRING_APPLIED:
978 return self.string
979 else:
980 return ""
981
982 def construct_params(
983 self,
984 params: Optional[_CoreSingleExecuteParams] = None,
985 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
986 escape_names: bool = True,
987 ) -> Optional[_MutableCoreSingleExecuteParams]:
988 """Return the bind params for this compiled object.
989
990 :param params: a dict of string/object pairs whose values will
991 override bind values compiled in to the
992 statement.
993 """
994
995 raise NotImplementedError()
996
997 @property
998 def params(self):
999 """Return the bind params for this compiled object."""
1000 return self.construct_params()
1001
1002
1003class TypeCompiler(util.EnsureKWArg):
1004 """Produces DDL specification for TypeEngine objects."""
1005
1006 ensure_kwarg = r"visit_\w+"
1007
1008 def __init__(self, dialect: Dialect):
1009 self.dialect = dialect
1010
1011 def process(self, type_: TypeEngine[Any], **kw: Any) -> str:
1012 if (
1013 type_._variant_mapping
1014 and self.dialect.name in type_._variant_mapping
1015 ):
1016 type_ = type_._variant_mapping[self.dialect.name]
1017 return type_._compiler_dispatch(self, **kw)
1018
1019 def visit_unsupported_compilation(
1020 self, element: Any, err: Exception, **kw: Any
1021 ) -> NoReturn:
1022 raise exc.UnsupportedCompilationError(self, element) from err
1023
1024
1025# this was a Visitable, but to allow accurate detection of
1026# column elements this is actually a column element
1027class _CompileLabel(
1028 roles.BinaryElementRole[Any], elements.CompilerColumnElement
1029):
1030 """lightweight label object which acts as an expression.Label."""
1031
1032 __visit_name__ = "label"
1033 __slots__ = "element", "name", "_alt_names"
1034
1035 def __init__(self, col, name, alt_names=()):
1036 self.element = col
1037 self.name = name
1038 self._alt_names = (col,) + alt_names
1039
1040 @property
1041 def proxy_set(self):
1042 return self.element.proxy_set
1043
1044 @property
1045 def type(self):
1046 return self.element.type
1047
1048 def self_group(self, **kw):
1049 return self
1050
1051
1052class aggregate_orderby_inline(
1053 roles.BinaryElementRole[Any], elements.CompilerColumnElement
1054):
1055 """produce ORDER BY inside of function argument lists"""
1056
1057 __visit_name__ = "aggregate_orderby_inline"
1058 __slots__ = "element", "aggregate_order_by"
1059
1060 def __init__(self, element, orderby):
1061 self.element = element
1062 self.aggregate_order_by = orderby
1063
1064 def __iter__(self):
1065 return iter(self.element)
1066
1067 @property
1068 def proxy_set(self):
1069 return self.element.proxy_set
1070
1071 @property
1072 def type(self):
1073 return self.element.type
1074
1075 def self_group(self, **kw):
1076 return self
1077
1078 def _with_binary_element_type(self, type_):
1079 return aggregate_orderby_inline(
1080 self.element._with_binary_element_type(type_),
1081 self.aggregate_order_by,
1082 )
1083
1084
1085class ilike_case_insensitive(
1086 roles.BinaryElementRole[Any], elements.CompilerColumnElement
1087):
1088 """produce a wrapping element for a case-insensitive portion of
1089 an ILIKE construct.
1090
1091 The construct usually renders the ``lower()`` function, but on
1092 PostgreSQL will pass silently with the assumption that "ILIKE"
1093 is being used.
1094
1095 .. versionadded:: 2.0
1096
1097 """
1098
1099 __visit_name__ = "ilike_case_insensitive_operand"
1100 __slots__ = "element", "comparator"
1101
1102 def __init__(self, element):
1103 self.element = element
1104 self.comparator = element.comparator
1105
1106 @property
1107 def proxy_set(self):
1108 return self.element.proxy_set
1109
1110 @property
1111 def type(self):
1112 return self.element.type
1113
1114 def self_group(self, **kw):
1115 return self
1116
1117 def _with_binary_element_type(self, type_):
1118 return ilike_case_insensitive(
1119 self.element._with_binary_element_type(type_)
1120 )
1121
1122
1123class SQLCompiler(Compiled):
1124 """Default implementation of :class:`.Compiled`.
1125
1126 Compiles :class:`_expression.ClauseElement` objects into SQL strings.
1127
1128 """
1129
1130 extract_map = EXTRACT_MAP
1131
1132 bindname_escape_characters: ClassVar[Mapping[str, str]] = (
1133 util.immutabledict(
1134 {
1135 "%": "P",
1136 "(": "A",
1137 ")": "Z",
1138 ":": "C",
1139 ".": "_",
1140 "[": "_",
1141 "]": "_",
1142 " ": "_",
1143 }
1144 )
1145 )
1146 """A mapping (e.g. dict or similar) containing a lookup of
1147 characters keyed to replacement characters which will be applied to all
1148 'bind names' used in SQL statements as a form of 'escaping'; the given
1149 characters are replaced entirely with the 'replacement' character when
1150 rendered in the SQL statement, and a similar translation is performed
1151 on the incoming names used in parameter dictionaries passed to methods
1152 like :meth:`_engine.Connection.execute`.
1153
1154 This allows bound parameter names used in :func:`_sql.bindparam` and
1155 other constructs to have any arbitrary characters present without any
1156 concern for characters that aren't allowed at all on the target database.
1157
1158 Third party dialects can establish their own dictionary here to replace the
1159 default mapping, which will ensure that the particular characters in the
1160 mapping will never appear in a bound parameter name.
1161
1162 The dictionary is evaluated at **class creation time**, so cannot be
1163 modified at runtime; it must be present on the class when the class
1164 is first declared.
1165
1166 Note that for dialects that have additional bound parameter rules such
1167 as additional restrictions on leading characters, the
1168 :meth:`_sql.SQLCompiler.bindparam_string` method may need to be augmented.
1169 See the cx_Oracle compiler for an example of this.
1170
1171 .. versionadded:: 2.0.0rc1
1172
1173 """
1174
1175 _bind_translate_re: ClassVar[Pattern[str]]
1176 _bind_translate_chars: ClassVar[Mapping[str, str]]
1177
1178 is_sql = True
1179
1180 compound_keywords = COMPOUND_KEYWORDS
1181
1182 isdelete: bool = False
1183 isinsert: bool = False
1184 isupdate: bool = False
1185 """class-level defaults which can be set at the instance
1186 level to define if this Compiled instance represents
1187 INSERT/UPDATE/DELETE
1188 """
1189
1190 postfetch: Optional[List[Column[Any]]]
1191 """list of columns that can be post-fetched after INSERT or UPDATE to
1192 receive server-updated values"""
1193
1194 insert_prefetch: Sequence[Column[Any]] = ()
1195 """list of columns for which default values should be evaluated before
1196 an INSERT takes place"""
1197
1198 update_prefetch: Sequence[Column[Any]] = ()
1199 """list of columns for which onupdate default values should be evaluated
1200 before an UPDATE takes place"""
1201
1202 implicit_returning: Optional[Sequence[ColumnElement[Any]]] = None
1203 """list of "implicit" returning columns for a toplevel INSERT or UPDATE
1204 statement, used to receive newly generated values of columns.
1205
1206 .. versionadded:: 2.0 ``implicit_returning`` replaces the previous
1207 ``returning`` collection, which was not a generalized RETURNING
1208 collection and instead was in fact specific to the "implicit returning"
1209 feature.
1210
1211 """
1212
1213 isplaintext: bool = False
1214
1215 binds: Dict[str, BindParameter[Any]]
1216 """a dictionary of bind parameter keys to BindParameter instances."""
1217
1218 bind_names: Dict[BindParameter[Any], str]
1219 """a dictionary of BindParameter instances to "compiled" names
1220 that are actually present in the generated SQL"""
1221
1222 stack: List[_CompilerStackEntry]
1223 """major statements such as SELECT, INSERT, UPDATE, DELETE are
1224 tracked in this stack using an entry format."""
1225
1226 returning_precedes_values: bool = False
1227 """set to True classwide to generate RETURNING
1228 clauses before the VALUES or WHERE clause (i.e. MSSQL)
1229 """
1230
1231 render_table_with_column_in_update_from: bool = False
1232 """set to True classwide to indicate the SET clause
1233 in a multi-table UPDATE statement should qualify
1234 columns with the table name (i.e. MySQL only)
1235 """
1236
1237 ansi_bind_rules: bool = False
1238 """SQL 92 doesn't allow bind parameters to be used
1239 in the columns clause of a SELECT, nor does it allow
1240 ambiguous expressions like "? = ?". A compiler
1241 subclass can set this flag to False if the target
1242 driver/DB enforces this
1243 """
1244
1245 bindtemplate: str
1246 """template to render bound parameters based on paramstyle."""
1247
1248 compilation_bindtemplate: str
1249 """template used by compiler to render parameters before positional
1250 paramstyle application"""
1251
1252 _numeric_binds_identifier_char: str
1253 """Character that's used to as the identifier of a numerical bind param.
1254 For example if this char is set to ``$``, numerical binds will be rendered
1255 in the form ``$1, $2, $3``.
1256 """
1257
1258 _result_columns: List[ResultColumnsEntry]
1259 """relates label names in the final SQL to a tuple of local
1260 column/label name, ColumnElement object (if any) and
1261 TypeEngine. CursorResult uses this for type processing and
1262 column targeting"""
1263
1264 _textual_ordered_columns: bool = False
1265 """tell the result object that the column names as rendered are important,
1266 but they are also "ordered" vs. what is in the compiled object here.
1267
1268 As of 1.4.42 this condition is only present when the statement is a
1269 TextualSelect, e.g. text("....").columns(...), where it is required
1270 that the columns are considered positionally and not by name.
1271
1272 """
1273
1274 _ad_hoc_textual: bool = False
1275 """tell the result that we encountered text() or '*' constructs in the
1276 middle of the result columns, but we also have compiled columns, so
1277 if the number of columns in cursor.description does not match how many
1278 expressions we have, that means we can't rely on positional at all and
1279 should match on name.
1280
1281 """
1282
1283 _ordered_columns: bool = True
1284 """
1285 if False, means we can't be sure the list of entries
1286 in _result_columns is actually the rendered order. Usually
1287 True unless using an unordered TextualSelect.
1288 """
1289
1290 _loose_column_name_matching: bool = False
1291 """tell the result object that the SQL statement is textual, wants to match
1292 up to Column objects, and may be using the ._tq_label in the SELECT rather
1293 than the base name.
1294
1295 """
1296
1297 _numeric_binds: bool = False
1298 """
1299 True if paramstyle is "numeric". This paramstyle is trickier than
1300 all the others.
1301
1302 """
1303
1304 _render_postcompile: bool = False
1305 """
1306 whether to render out POSTCOMPILE params during the compile phase.
1307
1308 This attribute is used only for end-user invocation of stmt.compile();
1309 it's never used for actual statement execution, where instead the
1310 dialect internals access and render the internal postcompile structure
1311 directly.
1312
1313 """
1314
1315 _post_compile_expanded_state: Optional[ExpandedState] = None
1316 """When render_postcompile is used, the ``ExpandedState`` used to create
1317 the "expanded" SQL is assigned here, and then used by the ``.params``
1318 accessor and ``.construct_params()`` methods for their return values.
1319
1320 .. versionadded:: 2.0.0rc1
1321
1322 """
1323
1324 _pre_expanded_string: Optional[str] = None
1325 """Stores the original string SQL before 'post_compile' is applied,
1326 for cases where 'post_compile' were used.
1327
1328 """
1329
1330 _pre_expanded_positiontup: Optional[List[str]] = None
1331
1332 _insertmanyvalues: Optional[_InsertManyValues] = None
1333
1334 _insert_crud_params: Optional[crud._CrudParamSequence] = None
1335
1336 literal_execute_params: FrozenSet[BindParameter[Any]] = frozenset()
1337 """bindparameter objects that are rendered as literal values at statement
1338 execution time.
1339
1340 """
1341
1342 post_compile_params: FrozenSet[BindParameter[Any]] = frozenset()
1343 """bindparameter objects that are rendered as bound parameter placeholders
1344 at statement execution time.
1345
1346 """
1347
1348 escaped_bind_names: util.immutabledict[str, str] = util.EMPTY_DICT
1349 """Late escaping of bound parameter names that has to be converted
1350 to the original name when looking in the parameter dictionary.
1351
1352 """
1353
1354 has_out_parameters = False
1355 """if True, there are bindparam() objects that have the isoutparam
1356 flag set."""
1357
1358 postfetch_lastrowid = False
1359 """if True, and this in insert, use cursor.lastrowid to populate
1360 result.inserted_primary_key. """
1361
1362 _cache_key_bind_match: Optional[
1363 Tuple[
1364 Dict[
1365 BindParameter[Any],
1366 List[BindParameter[Any]],
1367 ],
1368 Dict[
1369 str,
1370 BindParameter[Any],
1371 ],
1372 ]
1373 ] = None
1374 """a mapping that will relate the BindParameter object we compile
1375 to those that are part of the extracted collection of parameters
1376 in the cache key, if we were given a cache key.
1377
1378 """
1379
1380 positiontup: Optional[List[str]] = None
1381 """for a compiled construct that uses a positional paramstyle, will be
1382 a sequence of strings, indicating the names of bound parameters in order.
1383
1384 This is used in order to render bound parameters in their correct order,
1385 and is combined with the :attr:`_sql.Compiled.params` dictionary to
1386 render parameters.
1387
1388 This sequence always contains the unescaped name of the parameters.
1389
1390 .. seealso::
1391
1392 :ref:`faq_sql_expression_string` - includes a usage example for
1393 debugging use cases.
1394
1395 """
1396 _values_bindparam: Optional[List[str]] = None
1397
1398 _visited_bindparam: Optional[List[str]] = None
1399
1400 inline: bool = False
1401
1402 ctes: Optional[MutableMapping[CTE, str]]
1403
1404 # Detect same CTE references - Dict[(level, name), cte]
1405 # Level is required for supporting nesting
1406 ctes_by_level_name: Dict[Tuple[int, str], CTE]
1407
1408 # To retrieve key/level in ctes_by_level_name -
1409 # Dict[cte_reference, (level, cte_name, cte_opts)]
1410 level_name_by_cte: Dict[CTE, Tuple[int, str, selectable._CTEOpts]]
1411
1412 ctes_recursive: bool
1413
1414 _post_compile_pattern = re.compile(r"__\[POSTCOMPILE_(\S+?)(~~.+?~~)?\]")
1415 _pyformat_pattern = re.compile(r"%\(([^)]+?)\)s")
1416 _positional_pattern = re.compile(
1417 f"{_pyformat_pattern.pattern}|{_post_compile_pattern.pattern}"
1418 )
1419 _collect_params: Final[bool]
1420 _collected_params: util.immutabledict[str, Any]
1421
1422 @classmethod
1423 def _init_compiler_cls(cls):
1424 cls._init_bind_translate()
1425
1426 @classmethod
1427 def _init_bind_translate(cls):
1428 reg = re.escape("".join(cls.bindname_escape_characters))
1429 cls._bind_translate_re = re.compile(f"[{reg}]")
1430 cls._bind_translate_chars = cls.bindname_escape_characters
1431
1432 def __init__(
1433 self,
1434 dialect: Dialect,
1435 statement: Optional[ClauseElement],
1436 cache_key: Optional[CacheKey] = None,
1437 column_keys: Optional[Sequence[str]] = None,
1438 for_executemany: bool = False,
1439 linting: Linting = NO_LINTING,
1440 _supporting_against: Optional[SQLCompiler] = None,
1441 **kwargs: Any,
1442 ):
1443 """Construct a new :class:`.SQLCompiler` object.
1444
1445 :param dialect: :class:`.Dialect` to be used
1446
1447 :param statement: :class:`_expression.ClauseElement` to be compiled
1448
1449 :param column_keys: a list of column names to be compiled into an
1450 INSERT or UPDATE statement.
1451
1452 :param for_executemany: whether INSERT / UPDATE statements should
1453 expect that they are to be invoked in an "executemany" style,
1454 which may impact how the statement will be expected to return the
1455 values of defaults and autoincrement / sequences and similar.
1456 Depending on the backend and driver in use, support for retrieving
1457 these values may be disabled which means SQL expressions may
1458 be rendered inline, RETURNING may not be rendered, etc.
1459
1460 :param kwargs: additional keyword arguments to be consumed by the
1461 superclass.
1462
1463 """
1464 self.column_keys = column_keys
1465
1466 self.cache_key = cache_key
1467
1468 if cache_key:
1469 cksm = {b.key: b for b in cache_key[1]}
1470 ckbm = {b: [b] for b in cache_key[1]}
1471 self._cache_key_bind_match = (ckbm, cksm)
1472
1473 # compile INSERT/UPDATE defaults/sequences to expect executemany
1474 # style execution, which may mean no pre-execute of defaults,
1475 # or no RETURNING
1476 self.for_executemany = for_executemany
1477
1478 self.linting = linting
1479
1480 # a dictionary of bind parameter keys to BindParameter
1481 # instances.
1482 self.binds = {}
1483
1484 # a dictionary of BindParameter instances to "compiled" names
1485 # that are actually present in the generated SQL
1486 self.bind_names = util.column_dict()
1487
1488 # stack which keeps track of nested SELECT statements
1489 self.stack = []
1490
1491 self._result_columns = []
1492
1493 # true if the paramstyle is positional
1494 self.positional = dialect.positional
1495 if self.positional:
1496 self._numeric_binds = nb = dialect.paramstyle.startswith("numeric")
1497 if nb:
1498 self._numeric_binds_identifier_char = (
1499 "$" if dialect.paramstyle == "numeric_dollar" else ":"
1500 )
1501
1502 self.compilation_bindtemplate = _pyformat_template
1503 else:
1504 self.compilation_bindtemplate = BIND_TEMPLATES[dialect.paramstyle]
1505
1506 self.ctes = None
1507
1508 self.label_length = (
1509 dialect.label_length or dialect.max_identifier_length
1510 )
1511
1512 # a map which tracks "anonymous" identifiers that are created on
1513 # the fly here
1514 self.anon_map = prefix_anon_map()
1515
1516 # a map which tracks "truncated" names based on
1517 # dialect.label_length or dialect.max_identifier_length
1518 self.truncated_names: Dict[Tuple[str, str], str] = {}
1519 self._truncated_counters: Dict[str, int] = {}
1520 if not cache_key:
1521 self._collect_params = True
1522 self._collected_params = util.EMPTY_DICT
1523 else:
1524 self._collect_params = False # type: ignore[misc]
1525
1526 Compiled.__init__(self, dialect, statement, **kwargs)
1527
1528 if self.isinsert or self.isupdate or self.isdelete:
1529 if TYPE_CHECKING:
1530 assert isinstance(statement, UpdateBase)
1531
1532 if self.isinsert or self.isupdate:
1533 if TYPE_CHECKING:
1534 assert isinstance(statement, ValuesBase)
1535 if statement._inline:
1536 self.inline = True
1537 elif self.for_executemany and (
1538 not self.isinsert
1539 or (
1540 self.dialect.insert_executemany_returning
1541 and statement._return_defaults
1542 )
1543 ):
1544 self.inline = True
1545
1546 self.bindtemplate = BIND_TEMPLATES[dialect.paramstyle]
1547
1548 if _supporting_against:
1549 self.__dict__.update(
1550 {
1551 k: v
1552 for k, v in _supporting_against.__dict__.items()
1553 if k
1554 not in {
1555 "state",
1556 "dialect",
1557 "preparer",
1558 "positional",
1559 "_numeric_binds",
1560 "compilation_bindtemplate",
1561 "bindtemplate",
1562 }
1563 }
1564 )
1565
1566 if self.state is CompilerState.STRING_APPLIED:
1567 if self.positional:
1568 if self._numeric_binds:
1569 self._process_numeric()
1570 else:
1571 self._process_positional()
1572
1573 if self._render_postcompile:
1574 parameters = self.construct_params(
1575 escape_names=False,
1576 _no_postcompile=True,
1577 )
1578
1579 self._process_parameters_for_postcompile(
1580 parameters, _populate_self=True
1581 )
1582
1583 @property
1584 def insert_single_values_expr(self) -> Optional[str]:
1585 """When an INSERT is compiled with a single set of parameters inside
1586 a VALUES expression, the string is assigned here, where it can be
1587 used for insert batching schemes to rewrite the VALUES expression.
1588
1589 .. versionchanged:: 2.0 This collection is no longer used by
1590 SQLAlchemy's built-in dialects, in favor of the currently
1591 internal ``_insertmanyvalues`` collection that is used only by
1592 :class:`.SQLCompiler`.
1593
1594 """
1595 if self._insertmanyvalues is None:
1596 return None
1597 else:
1598 return self._insertmanyvalues.single_values_expr
1599
1600 @util.ro_memoized_property
1601 def effective_returning(self) -> Optional[Sequence[ColumnElement[Any]]]:
1602 """The effective "returning" columns for INSERT, UPDATE or DELETE.
1603
1604 This is either the so-called "implicit returning" columns which are
1605 calculated by the compiler on the fly, or those present based on what's
1606 present in ``self.statement._returning`` (expanded into individual
1607 columns using the ``._all_selected_columns`` attribute) i.e. those set
1608 explicitly using the :meth:`.UpdateBase.returning` method.
1609
1610 .. versionadded:: 2.0
1611
1612 """
1613 if self.implicit_returning:
1614 return self.implicit_returning
1615 elif self.statement is not None and is_dml(self.statement):
1616 return [
1617 c
1618 for c in self.statement._all_selected_columns
1619 if is_column_element(c)
1620 ]
1621
1622 else:
1623 return None
1624
1625 @property
1626 def returning(self):
1627 """backwards compatibility; returns the
1628 effective_returning collection.
1629
1630 """
1631 return self.effective_returning
1632
1633 @property
1634 def current_executable(self):
1635 """Return the current 'executable' that is being compiled.
1636
1637 This is currently the :class:`_sql.Select`, :class:`_sql.Insert`,
1638 :class:`_sql.Update`, :class:`_sql.Delete`,
1639 :class:`_sql.CompoundSelect` object that is being compiled.
1640 Specifically it's assigned to the ``self.stack`` list of elements.
1641
1642 When a statement like the above is being compiled, it normally
1643 is also assigned to the ``.statement`` attribute of the
1644 :class:`_sql.Compiler` object. However, all SQL constructs are
1645 ultimately nestable, and this attribute should never be consulted
1646 by a ``visit_`` method, as it is not guaranteed to be assigned
1647 nor guaranteed to correspond to the current statement being compiled.
1648
1649 """
1650 try:
1651 return self.stack[-1]["selectable"]
1652 except IndexError as ie:
1653 raise IndexError("Compiler does not have a stack entry") from ie
1654
1655 @property
1656 def prefetch(self):
1657 return list(self.insert_prefetch) + list(self.update_prefetch)
1658
1659 @util.memoized_property
1660 def _global_attributes(self) -> Dict[Any, Any]:
1661 return {}
1662
1663 def _add_to_params(self, item: ExecutableStatement) -> None:
1664 # assumes that this is called before traversing the statement
1665 # so the call happens outer to inner, meaning that existing params
1666 # take precedence
1667 if item._params:
1668 self._collected_params = item._params | self._collected_params
1669
1670 @util.memoized_instancemethod
1671 def _init_cte_state(self) -> MutableMapping[CTE, str]:
1672 """Initialize collections related to CTEs only if
1673 a CTE is located, to save on the overhead of
1674 these collections otherwise.
1675
1676 """
1677 # collect CTEs to tack on top of a SELECT
1678 # To store the query to print - Dict[cte, text_query]
1679 ctes: MutableMapping[CTE, str] = util.OrderedDict()
1680 self.ctes = ctes
1681
1682 # Detect same CTE references - Dict[(level, name), cte]
1683 # Level is required for supporting nesting
1684 self.ctes_by_level_name = {}
1685
1686 # To retrieve key/level in ctes_by_level_name -
1687 # Dict[cte_reference, (level, cte_name, cte_opts)]
1688 self.level_name_by_cte = {}
1689
1690 self.ctes_recursive = False
1691
1692 return ctes
1693
1694 @contextlib.contextmanager
1695 def _nested_result(self):
1696 """special API to support the use case of 'nested result sets'"""
1697 result_columns, ordered_columns = (
1698 self._result_columns,
1699 self._ordered_columns,
1700 )
1701 self._result_columns, self._ordered_columns = [], False
1702
1703 try:
1704 if self.stack:
1705 entry = self.stack[-1]
1706 entry["need_result_map_for_nested"] = True
1707 else:
1708 entry = None
1709 yield self._result_columns, self._ordered_columns
1710 finally:
1711 if entry:
1712 entry.pop("need_result_map_for_nested")
1713 self._result_columns, self._ordered_columns = (
1714 result_columns,
1715 ordered_columns,
1716 )
1717
1718 def _process_positional(self):
1719 assert not self.positiontup
1720 assert self.state is CompilerState.STRING_APPLIED
1721 assert not self._numeric_binds
1722
1723 if self.dialect.paramstyle == "format":
1724 placeholder = "%s"
1725 else:
1726 assert self.dialect.paramstyle == "qmark"
1727 placeholder = "?"
1728
1729 positions = []
1730
1731 def find_position(m: re.Match[str]) -> str:
1732 normal_bind = m.group(1)
1733 if normal_bind:
1734 positions.append(normal_bind)
1735 return placeholder
1736 else:
1737 # this a post-compile bind
1738 positions.append(m.group(2))
1739 return m.group(0)
1740
1741 self.string = re.sub(
1742 self._positional_pattern, find_position, self.string
1743 )
1744
1745 if self.escaped_bind_names:
1746 reverse_escape = {v: k for k, v in self.escaped_bind_names.items()}
1747 assert len(self.escaped_bind_names) == len(reverse_escape)
1748 self.positiontup = [
1749 reverse_escape.get(name, name) for name in positions
1750 ]
1751 else:
1752 self.positiontup = positions
1753
1754 if self._insertmanyvalues:
1755 positions = []
1756
1757 single_values_expr = re.sub(
1758 self._positional_pattern,
1759 find_position,
1760 self._insertmanyvalues.single_values_expr,
1761 )
1762 insert_crud_params = [
1763 (
1764 v[0],
1765 v[1],
1766 re.sub(self._positional_pattern, find_position, v[2]),
1767 v[3],
1768 )
1769 for v in self._insertmanyvalues.insert_crud_params
1770 ]
1771
1772 self._insertmanyvalues = self._insertmanyvalues._replace(
1773 single_values_expr=single_values_expr,
1774 insert_crud_params=insert_crud_params,
1775 )
1776
1777 def _process_numeric(self):
1778 assert self._numeric_binds
1779 assert self.state is CompilerState.STRING_APPLIED
1780
1781 num = 1
1782 param_pos: Dict[str, str] = {}
1783 order: Iterable[str]
1784 if self._insertmanyvalues and self._values_bindparam is not None:
1785 # bindparams that are not in values are always placed first.
1786 # this avoids the need of changing them when using executemany
1787 # values () ()
1788 order = itertools.chain(
1789 (
1790 name
1791 for name in self.bind_names.values()
1792 if name not in self._values_bindparam
1793 ),
1794 self.bind_names.values(),
1795 )
1796 else:
1797 order = self.bind_names.values()
1798
1799 for bind_name in order:
1800 if bind_name in param_pos:
1801 continue
1802 bind = self.binds[bind_name]
1803 if (
1804 bind in self.post_compile_params
1805 or bind in self.literal_execute_params
1806 ):
1807 # set to None to just mark the in positiontup, it will not
1808 # be replaced below.
1809 param_pos[bind_name] = None # type: ignore
1810 else:
1811 ph = f"{self._numeric_binds_identifier_char}{num}"
1812 num += 1
1813 param_pos[bind_name] = ph
1814
1815 self.next_numeric_pos = num
1816
1817 self.positiontup = list(param_pos)
1818 if self.escaped_bind_names:
1819 len_before = len(param_pos)
1820 param_pos = {
1821 self.escaped_bind_names.get(name, name): pos
1822 for name, pos in param_pos.items()
1823 }
1824 assert len(param_pos) == len_before
1825
1826 # Can't use format here since % chars are not escaped.
1827 self.string = self._pyformat_pattern.sub(
1828 lambda m: param_pos[m.group(1)], self.string
1829 )
1830
1831 if self._insertmanyvalues:
1832 single_values_expr = (
1833 # format is ok here since single_values_expr includes only
1834 # place-holders
1835 self._insertmanyvalues.single_values_expr
1836 % param_pos
1837 )
1838 insert_crud_params = [
1839 (v[0], v[1], "%s", v[3])
1840 for v in self._insertmanyvalues.insert_crud_params
1841 ]
1842
1843 self._insertmanyvalues = self._insertmanyvalues._replace(
1844 # This has the numbers (:1, :2)
1845 single_values_expr=single_values_expr,
1846 # The single binds are instead %s so they can be formatted
1847 insert_crud_params=insert_crud_params,
1848 )
1849
1850 @util.memoized_property
1851 def _bind_processors(
1852 self,
1853 ) -> MutableMapping[
1854 str, Union[_BindProcessorType[Any], Sequence[_BindProcessorType[Any]]]
1855 ]:
1856 # mypy is not able to see the two value types as the above Union,
1857 # it just sees "object". don't know how to resolve
1858 return {
1859 key: value # type: ignore
1860 for key, value in (
1861 (
1862 self.bind_names[bindparam],
1863 (
1864 bindparam.type._cached_bind_processor(self.dialect)
1865 if not bindparam.type._is_tuple_type
1866 else tuple(
1867 elem_type._cached_bind_processor(self.dialect)
1868 for elem_type in cast(
1869 TupleType, bindparam.type
1870 ).types
1871 )
1872 ),
1873 )
1874 for bindparam in self.bind_names
1875 )
1876 if value is not None
1877 }
1878
1879 def is_subquery(self):
1880 return len(self.stack) > 1
1881
1882 @property
1883 def sql_compiler(self) -> Self:
1884 return self
1885
1886 def construct_expanded_state(
1887 self,
1888 params: Optional[_CoreSingleExecuteParams] = None,
1889 escape_names: bool = True,
1890 ) -> ExpandedState:
1891 """Return a new :class:`.ExpandedState` for a given parameter set.
1892
1893 For queries that use "expanding" or other late-rendered parameters,
1894 this method will provide for both the finalized SQL string as well
1895 as the parameters that would be used for a particular parameter set.
1896
1897 .. versionadded:: 2.0.0rc1
1898
1899 """
1900 parameters = self.construct_params(
1901 params,
1902 escape_names=escape_names,
1903 _no_postcompile=True,
1904 )
1905 return self._process_parameters_for_postcompile(
1906 parameters,
1907 )
1908
1909 def construct_params(
1910 self,
1911 params: Optional[_CoreSingleExecuteParams] = None,
1912 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
1913 escape_names: bool = True,
1914 _group_number: Optional[int] = None,
1915 _check: bool = True,
1916 _no_postcompile: bool = False,
1917 _collected_params: _CoreSingleExecuteParams | None = None,
1918 ) -> _MutableCoreSingleExecuteParams:
1919 """return a dictionary of bind parameter keys and values"""
1920 if _collected_params is not None:
1921 assert not self._collect_params
1922 elif self._collect_params:
1923 _collected_params = self._collected_params
1924
1925 if _collected_params:
1926 if not params:
1927 params = _collected_params
1928 else:
1929 params = {**_collected_params, **params}
1930
1931 if self._render_postcompile and not _no_postcompile:
1932 assert self._post_compile_expanded_state is not None
1933 if not params:
1934 return dict(self._post_compile_expanded_state.parameters)
1935 else:
1936 raise exc.InvalidRequestError(
1937 "can't construct new parameters when render_postcompile "
1938 "is used; the statement is hard-linked to the original "
1939 "parameters. Use construct_expanded_state to generate a "
1940 "new statement and parameters."
1941 )
1942
1943 has_escaped_names = escape_names and bool(self.escaped_bind_names)
1944
1945 if extracted_parameters:
1946 # related the bound parameters collected in the original cache key
1947 # to those collected in the incoming cache key. They will not have
1948 # matching names but they will line up positionally in the same
1949 # way. The parameters present in self.bind_names may be clones of
1950 # these original cache key params in the case of DML but the .key
1951 # will be guaranteed to match.
1952 if self.cache_key is None:
1953 raise exc.CompileError(
1954 "This compiled object has no original cache key; "
1955 "can't pass extracted_parameters to construct_params"
1956 )
1957 else:
1958 orig_extracted = self.cache_key[1]
1959
1960 ckbm_tuple = self._cache_key_bind_match
1961 assert ckbm_tuple is not None
1962 ckbm, _ = ckbm_tuple
1963 resolved_extracted = {
1964 bind: extracted
1965 for b, extracted in zip(orig_extracted, extracted_parameters)
1966 for bind in ckbm[b]
1967 }
1968 else:
1969 resolved_extracted = None
1970
1971 if params:
1972 pd = {}
1973 for bindparam, name in self.bind_names.items():
1974 escaped_name = (
1975 self.escaped_bind_names.get(name, name)
1976 if has_escaped_names
1977 else name
1978 )
1979
1980 if bindparam.key in params:
1981 pd[escaped_name] = params[bindparam.key]
1982 elif name in params:
1983 pd[escaped_name] = params[name]
1984
1985 elif _check and bindparam.required:
1986 if _group_number:
1987 raise exc.InvalidRequestError(
1988 "A value is required for bind parameter %r, "
1989 "in parameter group %d"
1990 % (bindparam.key, _group_number),
1991 code="cd3x",
1992 )
1993 else:
1994 raise exc.InvalidRequestError(
1995 "A value is required for bind parameter %r"
1996 % bindparam.key,
1997 code="cd3x",
1998 )
1999 else:
2000 if resolved_extracted:
2001 value_param = resolved_extracted.get(
2002 bindparam, bindparam
2003 )
2004 else:
2005 value_param = bindparam
2006
2007 if bindparam.callable:
2008 pd[escaped_name] = value_param.effective_value
2009 else:
2010 pd[escaped_name] = value_param.value
2011 return pd
2012 else:
2013 pd = {}
2014 for bindparam, name in self.bind_names.items():
2015 escaped_name = (
2016 self.escaped_bind_names.get(name, name)
2017 if has_escaped_names
2018 else name
2019 )
2020
2021 if _check and bindparam.required:
2022 if _group_number:
2023 raise exc.InvalidRequestError(
2024 "A value is required for bind parameter %r, "
2025 "in parameter group %d"
2026 % (bindparam.key, _group_number),
2027 code="cd3x",
2028 )
2029 else:
2030 raise exc.InvalidRequestError(
2031 "A value is required for bind parameter %r"
2032 % bindparam.key,
2033 code="cd3x",
2034 )
2035
2036 if resolved_extracted:
2037 value_param = resolved_extracted.get(bindparam, bindparam)
2038 else:
2039 value_param = bindparam
2040
2041 if bindparam.callable:
2042 pd[escaped_name] = value_param.effective_value
2043 else:
2044 pd[escaped_name] = value_param.value
2045
2046 return pd
2047
2048 @util.memoized_instancemethod
2049 def _get_set_input_sizes_lookup(self):
2050 dialect = self.dialect
2051
2052 include_types = dialect.include_set_input_sizes
2053 exclude_types = dialect.exclude_set_input_sizes
2054
2055 dbapi = dialect.dbapi
2056
2057 def lookup_type(typ):
2058 dbtype = typ._unwrapped_dialect_impl(dialect).get_dbapi_type(dbapi)
2059
2060 if (
2061 dbtype is not None
2062 and (exclude_types is None or dbtype not in exclude_types)
2063 and (include_types is None or dbtype in include_types)
2064 ):
2065 return dbtype
2066 else:
2067 return None
2068
2069 inputsizes = {}
2070
2071 literal_execute_params = self.literal_execute_params
2072
2073 for bindparam in self.bind_names:
2074 if bindparam in literal_execute_params:
2075 continue
2076
2077 if bindparam.type._is_tuple_type:
2078 inputsizes[bindparam] = [
2079 lookup_type(typ)
2080 for typ in cast(TupleType, bindparam.type).types
2081 ]
2082 else:
2083 inputsizes[bindparam] = lookup_type(bindparam.type)
2084
2085 return inputsizes
2086
2087 @property
2088 def params(self):
2089 """Return the bind param dictionary embedded into this
2090 compiled object, for those values that are present.
2091
2092 .. seealso::
2093
2094 :ref:`faq_sql_expression_string` - includes a usage example for
2095 debugging use cases.
2096
2097 """
2098 return self.construct_params(_check=False)
2099
2100 def _process_parameters_for_postcompile(
2101 self,
2102 parameters: _MutableCoreSingleExecuteParams,
2103 _populate_self: bool = False,
2104 ) -> ExpandedState:
2105 """handle special post compile parameters.
2106
2107 These include:
2108
2109 * "expanding" parameters -typically IN tuples that are rendered
2110 on a per-parameter basis for an otherwise fixed SQL statement string.
2111
2112 * literal_binds compiled with the literal_execute flag. Used for
2113 things like SQL Server "TOP N" where the driver does not accommodate
2114 N as a bound parameter.
2115
2116 """
2117
2118 expanded_parameters = {}
2119 new_positiontup: Optional[List[str]]
2120
2121 pre_expanded_string = self._pre_expanded_string
2122 if pre_expanded_string is None:
2123 pre_expanded_string = self.string
2124
2125 if self.positional:
2126 new_positiontup = []
2127
2128 pre_expanded_positiontup = self._pre_expanded_positiontup
2129 if pre_expanded_positiontup is None:
2130 pre_expanded_positiontup = self.positiontup
2131
2132 else:
2133 new_positiontup = pre_expanded_positiontup = None
2134
2135 processors = self._bind_processors
2136 single_processors = cast(
2137 "Mapping[str, _BindProcessorType[Any]]", processors
2138 )
2139 tuple_processors = cast(
2140 "Mapping[str, Sequence[_BindProcessorType[Any]]]", processors
2141 )
2142
2143 new_processors: Dict[str, _BindProcessorType[Any]] = {}
2144
2145 replacement_expressions: Dict[str, Any] = {}
2146 to_update_sets: Dict[str, Any] = {}
2147
2148 # notes:
2149 # *unescaped* parameter names in:
2150 # self.bind_names, self.binds, self._bind_processors, self.positiontup
2151 #
2152 # *escaped* parameter names in:
2153 # construct_params(), replacement_expressions
2154
2155 numeric_positiontup: Optional[List[str]] = None
2156
2157 if self.positional and pre_expanded_positiontup is not None:
2158 names: Iterable[str] = pre_expanded_positiontup
2159 if self._numeric_binds:
2160 numeric_positiontup = []
2161 else:
2162 names = self.bind_names.values()
2163
2164 ebn = self.escaped_bind_names
2165 for name in names:
2166 escaped_name = ebn.get(name, name) if ebn else name
2167 parameter = self.binds[name]
2168
2169 if parameter in self.literal_execute_params:
2170 if escaped_name not in replacement_expressions:
2171 replacement_expressions[escaped_name] = (
2172 self.render_literal_bindparam(
2173 parameter,
2174 render_literal_value=parameters.pop(escaped_name),
2175 )
2176 )
2177 continue
2178
2179 if parameter in self.post_compile_params:
2180 if escaped_name in replacement_expressions:
2181 to_update = to_update_sets[escaped_name]
2182 values = None
2183 else:
2184 # we are removing the parameter from parameters
2185 # because it is a list value, which is not expected by
2186 # TypeEngine objects that would otherwise be asked to
2187 # process it. the single name is being replaced with
2188 # individual numbered parameters for each value in the
2189 # param.
2190 #
2191 # note we are also inserting *escaped* parameter names
2192 # into the given dictionary. default dialect will
2193 # use these param names directly as they will not be
2194 # in the escaped_bind_names dictionary.
2195 values = parameters.pop(name)
2196
2197 leep_res = self._literal_execute_expanding_parameter(
2198 escaped_name, parameter, values
2199 )
2200 to_update, replacement_expr = leep_res
2201
2202 to_update_sets[escaped_name] = to_update
2203 replacement_expressions[escaped_name] = replacement_expr
2204
2205 if not parameter.literal_execute:
2206 parameters.update(to_update)
2207 if parameter.type._is_tuple_type:
2208 assert values is not None
2209 new_processors.update(
2210 (
2211 "%s_%s_%s" % (name, i, j),
2212 tuple_processors[name][j - 1],
2213 )
2214 for i, tuple_element in enumerate(values, 1)
2215 for j, _ in enumerate(tuple_element, 1)
2216 if name in tuple_processors
2217 and tuple_processors[name][j - 1] is not None
2218 )
2219 else:
2220 new_processors.update(
2221 (key, single_processors[name])
2222 for key, _ in to_update
2223 if name in single_processors
2224 )
2225 if numeric_positiontup is not None:
2226 numeric_positiontup.extend(
2227 name for name, _ in to_update
2228 )
2229 elif new_positiontup is not None:
2230 # to_update has escaped names, but that's ok since
2231 # these are new names, that aren't in the
2232 # escaped_bind_names dict.
2233 new_positiontup.extend(name for name, _ in to_update)
2234 expanded_parameters[name] = [
2235 expand_key for expand_key, _ in to_update
2236 ]
2237 elif new_positiontup is not None:
2238 new_positiontup.append(name)
2239
2240 def process_expanding(m):
2241 key = m.group(1)
2242 expr = replacement_expressions[key]
2243
2244 # if POSTCOMPILE included a bind_expression, render that
2245 # around each element
2246 if m.group(2):
2247 tok = m.group(2).split("~~")
2248 be_left, be_right = tok[1], tok[3]
2249 expr = ", ".join(
2250 "%s%s%s" % (be_left, exp, be_right)
2251 for exp in expr.split(", ")
2252 )
2253 return expr
2254
2255 statement = re.sub(
2256 self._post_compile_pattern, process_expanding, pre_expanded_string
2257 )
2258
2259 if numeric_positiontup is not None:
2260 assert new_positiontup is not None
2261 param_pos = {
2262 key: f"{self._numeric_binds_identifier_char}{num}"
2263 for num, key in enumerate(
2264 numeric_positiontup, self.next_numeric_pos
2265 )
2266 }
2267 # Can't use format here since % chars are not escaped.
2268 statement = self._pyformat_pattern.sub(
2269 lambda m: param_pos[m.group(1)], statement
2270 )
2271 new_positiontup.extend(numeric_positiontup)
2272
2273 expanded_state = ExpandedState(
2274 statement,
2275 parameters,
2276 new_processors,
2277 new_positiontup,
2278 expanded_parameters,
2279 )
2280
2281 if _populate_self:
2282 # this is for the "render_postcompile" flag, which is not
2283 # otherwise used internally and is for end-user debugging and
2284 # special use cases.
2285 self._pre_expanded_string = pre_expanded_string
2286 self._pre_expanded_positiontup = pre_expanded_positiontup
2287 self.string = expanded_state.statement
2288 self.positiontup = (
2289 list(expanded_state.positiontup or ())
2290 if self.positional
2291 else None
2292 )
2293 self._post_compile_expanded_state = expanded_state
2294
2295 return expanded_state
2296
2297 @util.preload_module("sqlalchemy.engine.cursor")
2298 def _create_result_map(self):
2299 """utility method used for unit tests only."""
2300 cursor = util.preloaded.engine_cursor
2301 return cursor.CursorResultMetaData._create_description_match_map(
2302 self._result_columns
2303 )
2304
2305 # assigned by crud.py for insert/update statements
2306 _get_bind_name_for_col: _BindNameForColProtocol
2307
2308 @util.memoized_property
2309 def _within_exec_param_key_getter(self) -> Callable[[Any], str]:
2310 getter = self._get_bind_name_for_col
2311 return getter
2312
2313 @util.memoized_property
2314 @util.preload_module("sqlalchemy.engine.result")
2315 def _inserted_primary_key_from_lastrowid_getter(self):
2316 result = util.preloaded.engine_result
2317
2318 param_key_getter = self._within_exec_param_key_getter
2319
2320 assert self.compile_state is not None
2321 statement = self.compile_state.statement
2322
2323 if TYPE_CHECKING:
2324 assert isinstance(statement, Insert)
2325
2326 table = statement.table
2327
2328 getters = [
2329 (operator.methodcaller("get", param_key_getter(col), None), col)
2330 for col in table.primary_key
2331 ]
2332
2333 autoinc_getter = None
2334 autoinc_col = table._autoincrement_column
2335 if autoinc_col is not None:
2336 # apply type post processors to the lastrowid
2337 lastrowid_processor = autoinc_col.type._cached_result_processor(
2338 self.dialect, None
2339 )
2340 autoinc_key = param_key_getter(autoinc_col)
2341
2342 # if a bind value is present for the autoincrement column
2343 # in the parameters, we need to do the logic dictated by
2344 # #7998; honor a non-None user-passed parameter over lastrowid.
2345 # previously in the 1.4 series we weren't fetching lastrowid
2346 # at all if the key were present in the parameters
2347 if autoinc_key in self.binds:
2348
2349 def _autoinc_getter(lastrowid, parameters):
2350 param_value = parameters.get(autoinc_key, lastrowid)
2351 if param_value is not None:
2352 # they supplied non-None parameter, use that.
2353 # SQLite at least is observed to return the wrong
2354 # cursor.lastrowid for INSERT..ON CONFLICT so it
2355 # can't be used in all cases
2356 return param_value
2357 else:
2358 # use lastrowid
2359 return lastrowid
2360
2361 # work around mypy https://github.com/python/mypy/issues/14027
2362 autoinc_getter = _autoinc_getter
2363
2364 else:
2365 lastrowid_processor = None
2366
2367 row_fn = result.result_tuple([col.key for col in table.primary_key])
2368
2369 def get(lastrowid, parameters):
2370 """given cursor.lastrowid value and the parameters used for INSERT,
2371 return a "row" that represents the primary key, either by
2372 using the "lastrowid" or by extracting values from the parameters
2373 that were sent along with the INSERT.
2374
2375 """
2376 if lastrowid_processor is not None:
2377 lastrowid = lastrowid_processor(lastrowid)
2378
2379 if lastrowid is None:
2380 return row_fn(getter(parameters) for getter, col in getters)
2381 else:
2382 return row_fn(
2383 (
2384 (
2385 autoinc_getter(lastrowid, parameters)
2386 if autoinc_getter is not None
2387 else lastrowid
2388 )
2389 if col is autoinc_col
2390 else getter(parameters)
2391 )
2392 for getter, col in getters
2393 )
2394
2395 return get
2396
2397 @util.memoized_property
2398 @util.preload_module("sqlalchemy.engine.result")
2399 def _inserted_primary_key_from_returning_getter(self):
2400 result = util.preloaded.engine_result
2401
2402 assert self.compile_state is not None
2403 statement = self.compile_state.statement
2404
2405 if TYPE_CHECKING:
2406 assert isinstance(statement, Insert)
2407
2408 param_key_getter = self._within_exec_param_key_getter
2409 table = statement.table
2410
2411 returning = self.implicit_returning
2412 assert returning is not None
2413 ret = {col: idx for idx, col in enumerate(returning)}
2414
2415 getters = cast(
2416 "List[Tuple[Callable[[Any], Any], bool]]",
2417 [
2418 (
2419 (operator.itemgetter(ret[col]), True)
2420 if col in ret
2421 else (
2422 operator.methodcaller(
2423 "get", param_key_getter(col), None
2424 ),
2425 False,
2426 )
2427 )
2428 for col in table.primary_key
2429 ],
2430 )
2431
2432 row_fn = result.result_tuple([col.key for col in table.primary_key])
2433
2434 def get(row, parameters):
2435 return row_fn(
2436 getter(row) if use_row else getter(parameters)
2437 for getter, use_row in getters
2438 )
2439
2440 return get
2441
2442 def default_from(self) -> str:
2443 """Called when a SELECT statement has no froms, and no FROM clause is
2444 to be appended.
2445
2446 Gives Oracle Database a chance to tack on a ``FROM DUAL`` to the string
2447 output.
2448
2449 """
2450 return ""
2451
2452 def visit_override_binds(self, override_binds, **kw):
2453 """SQL compile the nested element of an _OverrideBinds with
2454 bindparams swapped out.
2455
2456 The _OverrideBinds is not normally expected to be compiled; it
2457 is meant to be used when an already cached statement is to be used,
2458 the compilation was already performed, and only the bound params should
2459 be swapped in at execution time.
2460
2461 However, there are test cases that exericise this object, and
2462 additionally the ORM subquery loader is known to feed in expressions
2463 which include this construct into new queries (discovered in #11173),
2464 so it has to do the right thing at compile time as well.
2465
2466 """
2467
2468 # get SQL text first
2469 sqltext = override_binds.element._compiler_dispatch(self, **kw)
2470
2471 # for a test compile that is not for caching, change binds after the
2472 # fact. note that we don't try to
2473 # swap the bindparam as we compile, because our element may be
2474 # elsewhere in the statement already (e.g. a subquery or perhaps a
2475 # CTE) and was already visited / compiled. See
2476 # test_relationship_criteria.py ->
2477 # test_selectinload_local_criteria_subquery
2478 for k in override_binds.translate:
2479 if k not in self.binds:
2480 continue
2481 bp = self.binds[k]
2482
2483 # so this would work, just change the value of bp in place.
2484 # but we dont want to mutate things outside.
2485 # bp.value = override_binds.translate[bp.key]
2486 # continue
2487
2488 # instead, need to replace bp with new_bp or otherwise accommodate
2489 # in all internal collections
2490 new_bp = bp._with_value(
2491 override_binds.translate[bp.key],
2492 maintain_key=True,
2493 required=False,
2494 )
2495
2496 name = self.bind_names[bp]
2497 self.binds[k] = self.binds[name] = new_bp
2498 self.bind_names[new_bp] = name
2499 self.bind_names.pop(bp, None)
2500
2501 if bp in self.post_compile_params:
2502 self.post_compile_params |= {new_bp}
2503 if bp in self.literal_execute_params:
2504 self.literal_execute_params |= {new_bp}
2505
2506 ckbm_tuple = self._cache_key_bind_match
2507 if ckbm_tuple:
2508 ckbm, cksm = ckbm_tuple
2509 for bp in bp._cloned_set:
2510 if bp.key in cksm:
2511 cb = cksm[bp.key]
2512 ckbm[cb].append(new_bp)
2513
2514 return sqltext
2515
2516 def visit_grouping(self, grouping, asfrom=False, **kwargs):
2517 return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")"
2518
2519 def visit_select_statement_grouping(self, grouping, **kwargs):
2520 return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")"
2521
2522 def visit_label_reference(
2523 self, element, within_columns_clause=False, **kwargs
2524 ):
2525 if self.stack and self.dialect.supports_simple_order_by_label:
2526 try:
2527 compile_state = cast(
2528 "Union[SelectState, CompoundSelectState]",
2529 self.stack[-1]["compile_state"],
2530 )
2531 except KeyError as ke:
2532 raise exc.CompileError(
2533 "Can't resolve label reference for ORDER BY / "
2534 "GROUP BY / DISTINCT etc."
2535 ) from ke
2536
2537 (
2538 with_cols,
2539 only_froms,
2540 only_cols,
2541 ) = compile_state._label_resolve_dict
2542 if within_columns_clause:
2543 resolve_dict = only_froms
2544 else:
2545 resolve_dict = only_cols
2546
2547 # this can be None in the case that a _label_reference()
2548 # were subject to a replacement operation, in which case
2549 # the replacement of the Label element may have changed
2550 # to something else like a ColumnClause expression.
2551 order_by_elem = element.element._order_by_label_element
2552
2553 if (
2554 order_by_elem is not None
2555 and order_by_elem.name in resolve_dict
2556 and order_by_elem.shares_lineage(
2557 resolve_dict[order_by_elem.name]
2558 )
2559 ):
2560 kwargs["render_label_as_label"] = (
2561 element.element._order_by_label_element
2562 )
2563 return self.process(
2564 element.element,
2565 within_columns_clause=within_columns_clause,
2566 **kwargs,
2567 )
2568
2569 def visit_textual_label_reference(
2570 self, element, within_columns_clause=False, **kwargs
2571 ):
2572 if not self.stack:
2573 # compiling the element outside of the context of a SELECT
2574 return self.process(element._text_clause)
2575
2576 try:
2577 compile_state = cast(
2578 "Union[SelectState, CompoundSelectState]",
2579 self.stack[-1]["compile_state"],
2580 )
2581 except KeyError as ke:
2582 coercions._no_text_coercion(
2583 element.element,
2584 extra=(
2585 "Can't resolve label reference for ORDER BY / "
2586 "GROUP BY / DISTINCT etc."
2587 ),
2588 exc_cls=exc.CompileError,
2589 err=ke,
2590 )
2591
2592 with_cols, only_froms, only_cols = compile_state._label_resolve_dict
2593 try:
2594 if within_columns_clause:
2595 col = only_froms[element.element]
2596 else:
2597 col = with_cols[element.element]
2598 except KeyError as err:
2599 coercions._no_text_coercion(
2600 element.element,
2601 extra=(
2602 "Can't resolve label reference for ORDER BY / "
2603 "GROUP BY / DISTINCT etc."
2604 ),
2605 exc_cls=exc.CompileError,
2606 err=err,
2607 )
2608 else:
2609 kwargs["render_label_as_label"] = col
2610 return self.process(
2611 col, within_columns_clause=within_columns_clause, **kwargs
2612 )
2613
2614 def visit_label(
2615 self,
2616 label,
2617 add_to_result_map=None,
2618 within_label_clause=False,
2619 within_columns_clause=False,
2620 render_label_as_label=None,
2621 result_map_targets=(),
2622 within_tstring=False,
2623 **kw,
2624 ):
2625 if within_tstring:
2626 raise exc.CompileError(
2627 "Using label() directly inside tstring is not supported "
2628 "as it is ambiguous how the label expression should be "
2629 "rendered without knowledge of how it's being used in SQL"
2630 )
2631 # only render labels within the columns clause
2632 # or ORDER BY clause of a select. dialect-specific compilers
2633 # can modify this behavior.
2634 render_label_with_as = (
2635 within_columns_clause and not within_label_clause
2636 )
2637 render_label_only = render_label_as_label is label
2638
2639 if render_label_only or render_label_with_as:
2640 if isinstance(label.name, elements._truncated_label):
2641 labelname = self._truncated_identifier("colident", label.name)
2642 else:
2643 labelname = label.name
2644
2645 if render_label_with_as:
2646 if add_to_result_map is not None:
2647 add_to_result_map(
2648 labelname,
2649 label.name,
2650 (label, labelname) + label._alt_names + result_map_targets,
2651 label.type,
2652 )
2653 return (
2654 label.element._compiler_dispatch(
2655 self,
2656 within_columns_clause=True,
2657 within_label_clause=True,
2658 **kw,
2659 )
2660 + OPERATORS[operators.as_]
2661 + self.preparer.format_label(label, labelname)
2662 )
2663 elif render_label_only:
2664 return self.preparer.format_label(label, labelname)
2665 else:
2666 return label.element._compiler_dispatch(
2667 self, within_columns_clause=False, **kw
2668 )
2669
2670 def _fallback_column_name(self, column):
2671 raise exc.CompileError(
2672 "Cannot compile Column object until its 'name' is assigned."
2673 )
2674
2675 def visit_lambda_element(self, element, **kw):
2676 sql_element = element._resolved
2677 return self.process(sql_element, **kw)
2678
2679 def visit_column(
2680 self,
2681 column: ColumnClause[Any],
2682 add_to_result_map: Optional[_ResultMapAppender] = None,
2683 include_table: bool = True,
2684 result_map_targets: Tuple[Any, ...] = (),
2685 ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] = None,
2686 **kwargs: Any,
2687 ) -> str:
2688 name = orig_name = column.name
2689 if name is None:
2690 name = self._fallback_column_name(column)
2691
2692 is_literal = column.is_literal
2693 if not is_literal and isinstance(name, elements._truncated_label):
2694 name = self._truncated_identifier("colident", name)
2695
2696 if add_to_result_map is not None:
2697 targets = (column, name, column.key) + result_map_targets
2698 if column._tq_label:
2699 targets += (column._tq_label,)
2700
2701 add_to_result_map(name, orig_name, targets, column.type)
2702
2703 if is_literal:
2704 # note we are not currently accommodating for
2705 # literal_column(quoted_name('ident', True)) here
2706 name = self.escape_literal_column(name)
2707 else:
2708 name = self.preparer.quote(name)
2709 table = column.table
2710 if table is None or not include_table or not table.named_with_column:
2711 return name
2712 else:
2713 effective_schema = self.preparer.schema_for_object(table)
2714
2715 if effective_schema:
2716 schema_prefix = (
2717 self.preparer.quote_schema(effective_schema) + "."
2718 )
2719 else:
2720 schema_prefix = ""
2721
2722 if TYPE_CHECKING:
2723 assert isinstance(table, NamedFromClause)
2724 tablename = table.name
2725
2726 if (
2727 not effective_schema
2728 and ambiguous_table_name_map
2729 and tablename in ambiguous_table_name_map
2730 ):
2731 tablename = ambiguous_table_name_map[tablename]
2732
2733 if isinstance(tablename, elements._truncated_label):
2734 tablename = self._truncated_identifier("alias", tablename)
2735
2736 return schema_prefix + self.preparer.quote(tablename) + "." + name
2737
2738 def visit_collation(self, element, **kw):
2739 return self.preparer.format_collation(element.collation)
2740
2741 def visit_fromclause(self, fromclause, **kwargs):
2742 return fromclause.name
2743
2744 def visit_index(self, index, **kwargs):
2745 return index.name
2746
2747 def visit_typeclause(self, typeclause, **kw):
2748 kw["type_expression"] = typeclause
2749 kw["identifier_preparer"] = self.preparer
2750 return self.dialect.type_compiler_instance.process(
2751 typeclause.type, **kw
2752 )
2753
2754 def post_process_text(self, text):
2755 if self.preparer._double_percents:
2756 text = text.replace("%", "%%")
2757 return text
2758
2759 def escape_literal_column(self, text):
2760 if self.preparer._double_percents:
2761 text = text.replace("%", "%%")
2762 return text
2763
2764 def visit_textclause(self, textclause, add_to_result_map=None, **kw):
2765 if self._collect_params:
2766 self._add_to_params(textclause)
2767
2768 def do_bindparam(m):
2769 name = m.group(1)
2770 if name in textclause._bindparams:
2771 return self.process(textclause._bindparams[name], **kw)
2772 else:
2773 return self.bindparam_string(name, **kw)
2774
2775 if not self.stack:
2776 self.isplaintext = True
2777
2778 if add_to_result_map:
2779 # text() object is present in the columns clause of a
2780 # select(). Add a no-name entry to the result map so that
2781 # row[text()] produces a result
2782 add_to_result_map(None, None, (textclause,), sqltypes.NULLTYPE)
2783
2784 # un-escape any \:params
2785 return BIND_PARAMS_ESC.sub(
2786 lambda m: m.group(1),
2787 BIND_PARAMS.sub(
2788 do_bindparam, self.post_process_text(textclause.text)
2789 ),
2790 )
2791
2792 def visit_tstring(self, tstring, add_to_result_map=None, **kw):
2793 if self._collect_params:
2794 self._add_to_params(tstring)
2795
2796 if not self.stack:
2797 self.isplaintext = True
2798
2799 if add_to_result_map:
2800 # tstring() object is present in the columns clause of a
2801 # select(). Add a no-name entry to the result map so that
2802 # row[tstring()] produces a result
2803 add_to_result_map(None, None, (tstring,), sqltypes.NULLTYPE)
2804
2805 # Process each part and concatenate
2806 kw["within_tstring"] = True
2807 return "".join(self.process(part, **kw) for part in tstring.parts)
2808
2809 def visit_textual_select(
2810 self, taf, compound_index=None, asfrom=False, **kw
2811 ):
2812 if self._collect_params:
2813 self._add_to_params(taf)
2814 toplevel = not self.stack
2815 entry = self._default_stack_entry if toplevel else self.stack[-1]
2816
2817 new_entry: _CompilerStackEntry = {
2818 "correlate_froms": set(),
2819 "asfrom_froms": set(),
2820 "selectable": taf,
2821 }
2822 self.stack.append(new_entry)
2823
2824 if taf._independent_ctes:
2825 self._dispatch_independent_ctes(taf, kw)
2826
2827 populate_result_map = (
2828 toplevel
2829 or (
2830 compound_index == 0
2831 and entry.get("need_result_map_for_compound", False)
2832 )
2833 or entry.get("need_result_map_for_nested", False)
2834 )
2835
2836 if populate_result_map:
2837 self._ordered_columns = self._textual_ordered_columns = (
2838 taf.positional
2839 )
2840
2841 # enable looser result column matching when the SQL text links to
2842 # Column objects by name only
2843 self._loose_column_name_matching = not taf.positional and bool(
2844 taf.column_args
2845 )
2846
2847 for c in taf.column_args:
2848 self.process(
2849 c,
2850 within_columns_clause=True,
2851 add_to_result_map=self._add_to_result_map,
2852 )
2853
2854 text = self.process(taf.element, **kw)
2855 if self.ctes:
2856 nesting_level = len(self.stack) if not toplevel else None
2857 text = self._render_cte_clause(nesting_level=nesting_level) + text
2858
2859 self.stack.pop(-1)
2860
2861 return text
2862
2863 def visit_null(self, expr: Null, **kw: Any) -> str:
2864 return "NULL"
2865
2866 def visit_true(self, expr: True_, **kw: Any) -> str:
2867 if self.dialect.supports_native_boolean:
2868 return "true"
2869 else:
2870 return "1"
2871
2872 def visit_false(self, expr: False_, **kw: Any) -> str:
2873 if self.dialect.supports_native_boolean:
2874 return "false"
2875 else:
2876 return "0"
2877
2878 def _generate_delimited_list(self, elements, separator, **kw):
2879 return separator.join(
2880 s
2881 for s in (c._compiler_dispatch(self, **kw) for c in elements)
2882 if s
2883 )
2884
2885 def _generate_delimited_and_list(self, clauses, **kw):
2886 lcc, clauses = elements.BooleanClauseList._process_clauses_for_boolean(
2887 operators.and_,
2888 elements.True_._singleton,
2889 elements.False_._singleton,
2890 clauses,
2891 )
2892 if lcc == 1:
2893 return clauses[0]._compiler_dispatch(self, **kw)
2894 else:
2895 separator = OPERATORS[operators.and_]
2896 return separator.join(
2897 s
2898 for s in (c._compiler_dispatch(self, **kw) for c in clauses)
2899 if s
2900 )
2901
2902 def visit_tuple(self, clauselist, **kw):
2903 return "(%s)" % self.visit_clauselist(clauselist, **kw)
2904
2905 def visit_element_list(self, element, **kw):
2906 return self._generate_delimited_list(element.clauses, " ", **kw)
2907
2908 def visit_order_by_list(self, element, **kw):
2909 return self._generate_delimited_list(element.clauses, ", ", **kw)
2910
2911 def visit_clauselist(self, clauselist, **kw):
2912 sep = clauselist.operator
2913 if sep is None:
2914 sep = " "
2915 else:
2916 sep = OPERATORS[clauselist.operator]
2917
2918 return self._generate_delimited_list(clauselist.clauses, sep, **kw)
2919
2920 def visit_expression_clauselist(self, clauselist, **kw):
2921 operator_ = clauselist.operator
2922
2923 disp = self._get_operator_dispatch(
2924 operator_, "expression_clauselist", None
2925 )
2926 if disp:
2927 return disp(clauselist, operator_, **kw)
2928
2929 try:
2930 opstring = OPERATORS[operator_]
2931 except KeyError as err:
2932 raise exc.UnsupportedCompilationError(self, operator_) from err
2933 else:
2934 kw["_in_operator_expression"] = True
2935 return self._generate_delimited_list(
2936 clauselist.clauses, opstring, **kw
2937 )
2938
2939 def visit_case(self, clause, **kwargs):
2940 x = "CASE "
2941 if clause.value is not None:
2942 x += clause.value._compiler_dispatch(self, **kwargs) + " "
2943 for cond, result in clause.whens:
2944 x += (
2945 "WHEN "
2946 + cond._compiler_dispatch(self, **kwargs)
2947 + " THEN "
2948 + result._compiler_dispatch(self, **kwargs)
2949 + " "
2950 )
2951 if clause.else_ is not None:
2952 x += (
2953 "ELSE " + clause.else_._compiler_dispatch(self, **kwargs) + " "
2954 )
2955 x += "END"
2956 return x
2957
2958 def visit_type_coerce(self, type_coerce, **kw):
2959 return type_coerce.typed_expression._compiler_dispatch(self, **kw)
2960
2961 def visit_cast(self, cast, **kwargs):
2962 type_clause = cast.typeclause._compiler_dispatch(self, **kwargs)
2963 match = re.match("(.*)( COLLATE .*)", type_clause)
2964 return "CAST(%s AS %s)%s" % (
2965 cast.clause._compiler_dispatch(self, **kwargs),
2966 match.group(1) if match else type_clause,
2967 match.group(2) if match else "",
2968 )
2969
2970 def visit_frame_clause(self, frameclause, **kw):
2971
2972 if frameclause.lower_type is elements.FrameClauseType.UNBOUNDED:
2973 left = "UNBOUNDED PRECEDING"
2974 elif frameclause.lower_type is elements.FrameClauseType.CURRENT:
2975 left = "CURRENT ROW"
2976 else:
2977 val = self.process(frameclause.lower_bind, **kw)
2978 if frameclause.lower_type is elements.FrameClauseType.PRECEDING:
2979 left = f"{val} PRECEDING"
2980 else:
2981 left = f"{val} FOLLOWING"
2982
2983 if frameclause.upper_type is elements.FrameClauseType.UNBOUNDED:
2984 right = "UNBOUNDED FOLLOWING"
2985 elif frameclause.upper_type is elements.FrameClauseType.CURRENT:
2986 right = "CURRENT ROW"
2987 else:
2988 val = self.process(frameclause.upper_bind, **kw)
2989 if frameclause.upper_type is elements.FrameClauseType.PRECEDING:
2990 right = f"{val} PRECEDING"
2991 else:
2992 right = f"{val} FOLLOWING"
2993
2994 return f"{left} AND {right}"
2995
2996 def visit_over(self, over, **kwargs):
2997 text = over.element._compiler_dispatch(self, **kwargs)
2998 if over.range_ is not None:
2999 range_ = f"RANGE BETWEEN {self.process(over.range_, **kwargs)}"
3000 elif over.rows is not None:
3001 range_ = f"ROWS BETWEEN {self.process(over.rows, **kwargs)}"
3002 elif over.groups is not None:
3003 range_ = f"GROUPS BETWEEN {self.process(over.groups, **kwargs)}"
3004 else:
3005 range_ = None
3006
3007 if range_ is not None and over.exclude is not None:
3008 range_ += " EXCLUDE " + self.preparer.validate_sql_phrase(
3009 over.exclude, _WINDOW_EXCLUDE_RE
3010 )
3011
3012 return "%s OVER (%s)" % (
3013 text,
3014 " ".join(
3015 [
3016 "%s BY %s"
3017 % (word, clause._compiler_dispatch(self, **kwargs))
3018 for word, clause in (
3019 ("PARTITION", over.partition_by),
3020 ("ORDER", over.order_by),
3021 )
3022 if clause is not None and len(clause)
3023 ]
3024 + ([range_] if range_ else [])
3025 ),
3026 )
3027
3028 def visit_withingroup(self, withingroup, **kwargs):
3029 return "%s WITHIN GROUP (ORDER BY %s)" % (
3030 withingroup.element._compiler_dispatch(self, **kwargs),
3031 withingroup.order_by._compiler_dispatch(self, **kwargs),
3032 )
3033
3034 def visit_funcfilter(self, funcfilter, **kwargs):
3035 return "%s FILTER (WHERE %s)" % (
3036 funcfilter.func._compiler_dispatch(self, **kwargs),
3037 funcfilter.criterion._compiler_dispatch(self, **kwargs),
3038 )
3039
3040 def visit_aggregateorderby(self, aggregateorderby, **kwargs):
3041 if self.dialect.aggregate_order_by_style is AggregateOrderByStyle.NONE:
3042 raise exc.CompileError(
3043 "this dialect does not support "
3044 "ORDER BY within an aggregate function"
3045 )
3046 elif (
3047 self.dialect.aggregate_order_by_style
3048 is AggregateOrderByStyle.INLINE
3049 ):
3050 new_fn = aggregateorderby.element._clone()
3051 new_fn.clause_expr = elements.Grouping(
3052 aggregate_orderby_inline(
3053 new_fn.clause_expr.element, aggregateorderby.order_by
3054 )
3055 )
3056
3057 return new_fn._compiler_dispatch(self, **kwargs)
3058 else:
3059 return self.visit_withingroup(aggregateorderby, **kwargs)
3060
3061 def visit_aggregate_orderby_inline(self, element, **kw):
3062 return "%s ORDER BY %s" % (
3063 self.process(element.element, **kw),
3064 self.process(element.aggregate_order_by, **kw),
3065 )
3066
3067 def visit_aggregate_strings_func(self, fn, *, use_function_name, **kw):
3068 # aggreagate_order_by attribute is present if visit_function
3069 # gave us a Function with aggregate_orderby_inline() as the inner
3070 # contents
3071 order_by = getattr(fn.clauses, "aggregate_order_by", None)
3072
3073 literal_exec = dict(kw)
3074 literal_exec["literal_execute"] = True
3075
3076 # break up the function into its components so we can apply
3077 # literal_execute to the second argument (the delimiter)
3078 cl = list(fn.clauses)
3079 expr, delimiter = cl[0:2]
3080 if (
3081 order_by is not None
3082 and self.dialect.aggregate_order_by_style
3083 is AggregateOrderByStyle.INLINE
3084 ):
3085 return (
3086 f"{use_function_name}({expr._compiler_dispatch(self, **kw)}, "
3087 f"{delimiter._compiler_dispatch(self, **literal_exec)} "
3088 f"ORDER BY {order_by._compiler_dispatch(self, **kw)})"
3089 )
3090 else:
3091 return (
3092 f"{use_function_name}({expr._compiler_dispatch(self, **kw)}, "
3093 f"{delimiter._compiler_dispatch(self, **literal_exec)})"
3094 )
3095
3096 def visit_extract(self, extract, **kwargs):
3097 field = self.extract_map.get(extract.field, extract.field)
3098 return "EXTRACT(%s FROM %s)" % (
3099 field,
3100 extract.expr._compiler_dispatch(self, **kwargs),
3101 )
3102
3103 def visit_scalar_function_column(self, element, **kw):
3104 compiled_fn = self.visit_function(element.fn, **kw)
3105 compiled_col = self.visit_column(element, **kw)
3106 return "(%s).%s" % (compiled_fn, compiled_col)
3107
3108 def visit_function(
3109 self,
3110 func: Function[Any],
3111 add_to_result_map: Optional[_ResultMapAppender] = None,
3112 **kwargs: Any,
3113 ) -> str:
3114 if self._collect_params:
3115 self._add_to_params(func)
3116 if add_to_result_map is not None:
3117 add_to_result_map(func.name, func.name, (func.name,), func.type)
3118
3119 disp = getattr(self, "visit_%s_func" % func.name.lower(), None)
3120
3121 text: str
3122
3123 kwargs["within_aggregate_function"] = True
3124
3125 if disp:
3126 text = disp(func, **kwargs)
3127 else:
3128 name = FUNCTIONS.get(func._deannotate().__class__, None)
3129 if name:
3130 if func._has_args:
3131 name += "%(expr)s"
3132 else:
3133 name = func.name
3134 name = (
3135 self.preparer.quote(name)
3136 if self.preparer._requires_quotes_illegal_chars(name)
3137 or isinstance(name, elements.quoted_name)
3138 else name
3139 )
3140 name = name + "%(expr)s"
3141 text = ".".join(
3142 [
3143 (
3144 self.preparer.quote(tok)
3145 if self.preparer._requires_quotes_illegal_chars(tok)
3146 or isinstance(name, elements.quoted_name)
3147 else tok
3148 )
3149 for tok in func.packagenames
3150 ]
3151 + [name]
3152 ) % {"expr": self.function_argspec(func, **kwargs)}
3153
3154 if func._with_ordinality:
3155 text += " WITH ORDINALITY"
3156 return text
3157
3158 def visit_next_value_func(self, next_value, **kw):
3159 return self.visit_sequence(next_value.sequence)
3160
3161 def visit_sequence(self, sequence, **kw):
3162 raise NotImplementedError(
3163 "Dialect '%s' does not support sequence increments."
3164 % self.dialect.name
3165 )
3166
3167 def function_argspec(self, func: Function[Any], **kwargs: Any) -> str:
3168 return func.clause_expr._compiler_dispatch(self, **kwargs)
3169
3170 def visit_compound_select(
3171 self, cs, asfrom=False, compound_index=None, **kwargs
3172 ):
3173 if self._collect_params:
3174 self._add_to_params(cs)
3175 toplevel = not self.stack
3176
3177 compile_state = cs._compile_state_factory(cs, self, **kwargs)
3178
3179 if toplevel and not self.compile_state:
3180 self.compile_state = compile_state
3181
3182 compound_stmt = compile_state.statement
3183
3184 entry = self._default_stack_entry if toplevel else self.stack[-1]
3185 need_result_map = toplevel or (
3186 not compound_index
3187 and entry.get("need_result_map_for_compound", False)
3188 )
3189
3190 # indicates there is already a CompoundSelect in play
3191 if compound_index == 0:
3192 entry["select_0"] = cs
3193
3194 self.stack.append(
3195 {
3196 "correlate_froms": entry["correlate_froms"],
3197 "asfrom_froms": entry["asfrom_froms"],
3198 "selectable": cs,
3199 "compile_state": compile_state,
3200 "need_result_map_for_compound": need_result_map,
3201 }
3202 )
3203
3204 if compound_stmt._independent_ctes:
3205 self._dispatch_independent_ctes(compound_stmt, kwargs)
3206
3207 keyword = self.compound_keywords[cs.keyword]
3208
3209 text = (" " + keyword + " ").join(
3210 (
3211 c._compiler_dispatch(
3212 self, asfrom=asfrom, compound_index=i, **kwargs
3213 )
3214 for i, c in enumerate(cs.selects)
3215 )
3216 )
3217
3218 kwargs["include_table"] = False
3219 text += self.group_by_clause(cs, **dict(asfrom=asfrom, **kwargs))
3220 text += self.order_by_clause(cs, **kwargs)
3221 if cs._has_row_limiting_clause:
3222 text += self._row_limit_clause(cs, **kwargs)
3223
3224 if self.ctes:
3225 nesting_level = len(self.stack) if not toplevel else None
3226 text = (
3227 self._render_cte_clause(
3228 nesting_level=nesting_level,
3229 include_following_stack=True,
3230 )
3231 + text
3232 )
3233
3234 self.stack.pop(-1)
3235 return text
3236
3237 def _row_limit_clause(self, cs, **kwargs):
3238 if cs._fetch_clause is not None:
3239 return self.fetch_clause(cs, **kwargs)
3240 else:
3241 return self.limit_clause(cs, **kwargs)
3242
3243 def _get_operator_dispatch(self, operator_, qualifier1, qualifier2):
3244 attrname = "visit_%s_%s%s" % (
3245 operator_.__name__,
3246 qualifier1,
3247 "_" + qualifier2 if qualifier2 else "",
3248 )
3249 return getattr(self, attrname, None)
3250
3251 def _get_custom_operator_dispatch(self, operator_, qualifier1):
3252 attrname = "visit_%s_op_%s" % (operator_.visit_name, qualifier1)
3253 return getattr(self, attrname, None)
3254
3255 def visit_unary(
3256 self, unary, add_to_result_map=None, result_map_targets=(), **kw
3257 ):
3258 if add_to_result_map is not None:
3259 result_map_targets += (unary,)
3260 kw["add_to_result_map"] = add_to_result_map
3261 kw["result_map_targets"] = result_map_targets
3262
3263 if unary.operator is operators.distinct_op and not kw.get(
3264 "within_aggregate_function", False
3265 ):
3266 util.warn(
3267 "Column-expression-level unary distinct() "
3268 "should not be used outside of an aggregate "
3269 "function. For general 'SELECT DISTINCT' support"
3270 "use select().distinct()."
3271 )
3272
3273 if unary.operator:
3274 if unary.modifier:
3275 raise exc.CompileError(
3276 "Unary expression does not support operator "
3277 "and modifier simultaneously"
3278 )
3279 disp = self._get_operator_dispatch(
3280 unary.operator, "unary", "operator"
3281 )
3282 if disp:
3283 return disp(unary, unary.operator, **kw)
3284 else:
3285 return self._generate_generic_unary_operator(
3286 unary, OPERATORS[unary.operator], **kw
3287 )
3288 elif unary.modifier:
3289 disp = self._get_operator_dispatch(
3290 unary.modifier, "unary", "modifier"
3291 )
3292 if disp:
3293 return disp(unary, unary.modifier, **kw)
3294 else:
3295 return self._generate_generic_unary_modifier(
3296 unary, OPERATORS[unary.modifier], **kw
3297 )
3298 else:
3299 raise exc.CompileError(
3300 "Unary expression has no operator or modifier"
3301 )
3302
3303 def visit_truediv_binary(self, binary, operator, **kw):
3304 if self.dialect.div_is_floordiv:
3305 return (
3306 self.process(binary.left, **kw)
3307 + " / "
3308 # TODO: would need a fast cast again here,
3309 # unless we want to use an implicit cast like "+ 0.0"
3310 + self.process(
3311 elements.Cast(
3312 binary.right,
3313 (
3314 binary.right.type
3315 if binary.right.type._type_affinity
3316 in (sqltypes.Numeric, sqltypes.Float)
3317 else sqltypes.Numeric()
3318 ),
3319 ),
3320 **kw,
3321 )
3322 )
3323 else:
3324 return (
3325 self.process(binary.left, **kw)
3326 + " / "
3327 + self.process(binary.right, **kw)
3328 )
3329
3330 def visit_floordiv_binary(self, binary, operator, **kw):
3331 if (
3332 self.dialect.div_is_floordiv
3333 and binary.right.type._type_affinity is sqltypes.Integer
3334 ):
3335 return (
3336 self.process(binary.left, **kw)
3337 + " / "
3338 + self.process(binary.right, **kw)
3339 )
3340 else:
3341 return "FLOOR(%s)" % (
3342 self.process(binary.left, **kw)
3343 + " / "
3344 + self.process(binary.right, **kw)
3345 )
3346
3347 def visit_is_true_unary_operator(self, element, operator, **kw):
3348 if (
3349 element._is_implicitly_boolean
3350 or self.dialect.supports_native_boolean
3351 ):
3352 return self.process(element.element, **kw)
3353 else:
3354 return "%s = 1" % self.process(element.element, **kw)
3355
3356 def visit_is_false_unary_operator(self, element, operator, **kw):
3357 if (
3358 element._is_implicitly_boolean
3359 or self.dialect.supports_native_boolean
3360 ):
3361 return "NOT %s" % self.process(element.element, **kw)
3362 else:
3363 return "%s = 0" % self.process(element.element, **kw)
3364
3365 def visit_not_match_op_binary(self, binary, operator, **kw):
3366 return "NOT %s" % self.visit_binary(
3367 binary, override_operator=operators.match_op
3368 )
3369
3370 def visit_not_in_op_binary(self, binary, operator, **kw):
3371 # The brackets are required in the NOT IN operation because the empty
3372 # case is handled using the form "(col NOT IN (null) OR 1 = 1)".
3373 # The presence of the OR makes the brackets required.
3374 return "(%s)" % self._generate_generic_binary(
3375 binary, OPERATORS[operator], **kw
3376 )
3377
3378 def visit_empty_set_op_expr(self, type_, expand_op, **kw):
3379 if expand_op is operators.not_in_op:
3380 if len(type_) > 1:
3381 return "(%s)) OR (1 = 1" % (
3382 ", ".join("NULL" for element in type_)
3383 )
3384 else:
3385 return "NULL) OR (1 = 1"
3386 elif expand_op is operators.in_op:
3387 if len(type_) > 1:
3388 return "(%s)) AND (1 != 1" % (
3389 ", ".join("NULL" for element in type_)
3390 )
3391 else:
3392 return "NULL) AND (1 != 1"
3393 else:
3394 return self.visit_empty_set_expr(type_)
3395
3396 def visit_empty_set_expr(self, element_types, **kw):
3397 raise NotImplementedError(
3398 "Dialect '%s' does not support empty set expression."
3399 % self.dialect.name
3400 )
3401
3402 def _literal_execute_expanding_parameter_literal_binds(
3403 self, parameter, values, bind_expression_template=None
3404 ):
3405 typ_dialect_impl = parameter.type._unwrapped_dialect_impl(self.dialect)
3406
3407 if not values:
3408 # empty IN expression. note we don't need to use
3409 # bind_expression_template here because there are no
3410 # expressions to render.
3411
3412 if typ_dialect_impl._is_tuple_type:
3413 replacement_expression = (
3414 "VALUES " if self.dialect.tuple_in_values else ""
3415 ) + self.visit_empty_set_op_expr(
3416 parameter.type.types, parameter.expand_op
3417 )
3418
3419 else:
3420 replacement_expression = self.visit_empty_set_op_expr(
3421 [parameter.type], parameter.expand_op
3422 )
3423
3424 elif typ_dialect_impl._is_tuple_type or (
3425 typ_dialect_impl._isnull
3426 and isinstance(values[0], collections_abc.Sequence)
3427 and not isinstance(values[0], (str, bytes))
3428 ):
3429 if typ_dialect_impl._has_bind_expression:
3430 raise NotImplementedError(
3431 "bind_expression() on TupleType not supported with "
3432 "literal_binds"
3433 )
3434
3435 replacement_expression = (
3436 "VALUES " if self.dialect.tuple_in_values else ""
3437 ) + ", ".join(
3438 "(%s)"
3439 % (
3440 ", ".join(
3441 self.render_literal_value(value, param_type)
3442 for value, param_type in zip(
3443 tuple_element, parameter.type.types
3444 )
3445 )
3446 )
3447 for i, tuple_element in enumerate(values)
3448 )
3449 else:
3450 if bind_expression_template:
3451 post_compile_pattern = self._post_compile_pattern
3452 m = post_compile_pattern.search(bind_expression_template)
3453 assert m and m.group(
3454 2
3455 ), "unexpected format for expanding parameter"
3456
3457 tok = m.group(2).split("~~")
3458 be_left, be_right = tok[1], tok[3]
3459 replacement_expression = ", ".join(
3460 "%s%s%s"
3461 % (
3462 be_left,
3463 self.render_literal_value(value, parameter.type),
3464 be_right,
3465 )
3466 for value in values
3467 )
3468 else:
3469 replacement_expression = ", ".join(
3470 self.render_literal_value(value, parameter.type)
3471 for value in values
3472 )
3473
3474 return (), replacement_expression
3475
3476 def _literal_execute_expanding_parameter(self, name, parameter, values):
3477 if parameter.literal_execute:
3478 return self._literal_execute_expanding_parameter_literal_binds(
3479 parameter, values
3480 )
3481
3482 dialect = self.dialect
3483 typ_dialect_impl = parameter.type._unwrapped_dialect_impl(dialect)
3484
3485 if self._numeric_binds:
3486 bind_template = self.compilation_bindtemplate
3487 else:
3488 bind_template = self.bindtemplate
3489
3490 if (
3491 self.dialect._bind_typing_render_casts
3492 and typ_dialect_impl.render_bind_cast
3493 ):
3494
3495 def _render_bindtemplate(name):
3496 return self.render_bind_cast(
3497 parameter.type,
3498 typ_dialect_impl,
3499 bind_template % {"name": name},
3500 )
3501
3502 else:
3503
3504 def _render_bindtemplate(name):
3505 return bind_template % {"name": name}
3506
3507 if not values:
3508 to_update = []
3509 if typ_dialect_impl._is_tuple_type:
3510 replacement_expression = self.visit_empty_set_op_expr(
3511 parameter.type.types, parameter.expand_op
3512 )
3513 else:
3514 replacement_expression = self.visit_empty_set_op_expr(
3515 [parameter.type], parameter.expand_op
3516 )
3517
3518 elif typ_dialect_impl._is_tuple_type or (
3519 typ_dialect_impl._isnull
3520 and isinstance(values[0], collections_abc.Sequence)
3521 and not isinstance(values[0], (str, bytes))
3522 ):
3523 assert not typ_dialect_impl._is_array
3524 to_update = [
3525 ("%s_%s_%s" % (name, i, j), value)
3526 for i, tuple_element in enumerate(values, 1)
3527 for j, value in enumerate(tuple_element, 1)
3528 ]
3529
3530 replacement_expression = (
3531 "VALUES " if dialect.tuple_in_values else ""
3532 ) + ", ".join(
3533 "(%s)"
3534 % (
3535 ", ".join(
3536 _render_bindtemplate(
3537 to_update[i * len(tuple_element) + j][0]
3538 )
3539 for j, value in enumerate(tuple_element)
3540 )
3541 )
3542 for i, tuple_element in enumerate(values)
3543 )
3544 else:
3545 to_update = [
3546 ("%s_%s" % (name, i), value)
3547 for i, value in enumerate(values, 1)
3548 ]
3549 replacement_expression = ", ".join(
3550 _render_bindtemplate(key) for key, value in to_update
3551 )
3552
3553 return to_update, replacement_expression
3554
3555 def visit_binary(
3556 self,
3557 binary,
3558 override_operator=None,
3559 eager_grouping=False,
3560 from_linter=None,
3561 lateral_from_linter=None,
3562 **kw,
3563 ):
3564 if from_linter and operators.is_comparison(binary.operator):
3565 if lateral_from_linter is not None:
3566 enclosing_lateral = kw["enclosing_lateral"]
3567 lateral_from_linter.edges.update(
3568 itertools.product(
3569 _de_clone(
3570 binary.left._from_objects + [enclosing_lateral]
3571 ),
3572 _de_clone(
3573 binary.right._from_objects + [enclosing_lateral]
3574 ),
3575 )
3576 )
3577 else:
3578 from_linter.edges.update(
3579 itertools.product(
3580 _de_clone(binary.left._from_objects),
3581 _de_clone(binary.right._from_objects),
3582 )
3583 )
3584
3585 # don't allow "? = ?" to render
3586 if (
3587 self.ansi_bind_rules
3588 and isinstance(binary.left, elements.BindParameter)
3589 and isinstance(binary.right, elements.BindParameter)
3590 ):
3591 kw["literal_execute"] = True
3592
3593 operator_ = override_operator or binary.operator
3594 disp = self._get_operator_dispatch(operator_, "binary", None)
3595 if disp:
3596 return disp(binary, operator_, **kw)
3597 else:
3598 try:
3599 opstring = OPERATORS[operator_]
3600 except KeyError as err:
3601 raise exc.UnsupportedCompilationError(self, operator_) from err
3602 else:
3603 return self._generate_generic_binary(
3604 binary,
3605 opstring,
3606 from_linter=from_linter,
3607 lateral_from_linter=lateral_from_linter,
3608 **kw,
3609 )
3610
3611 def visit_function_as_comparison_op_binary(self, element, operator, **kw):
3612 return self.process(element.sql_function, **kw)
3613
3614 def visit_mod_binary(self, binary, operator, **kw):
3615 if self.preparer._double_percents:
3616 return (
3617 self.process(binary.left, **kw)
3618 + " %% "
3619 + self.process(binary.right, **kw)
3620 )
3621 else:
3622 return (
3623 self.process(binary.left, **kw)
3624 + " % "
3625 + self.process(binary.right, **kw)
3626 )
3627
3628 def visit_custom_op_binary(self, element, operator, **kw):
3629 if operator.visit_name:
3630 disp = self._get_custom_operator_dispatch(operator, "binary")
3631 if disp:
3632 return disp(element, operator, **kw)
3633
3634 kw["eager_grouping"] = operator.eager_grouping
3635 return self._generate_generic_binary(
3636 element,
3637 " " + self.escape_literal_column(operator.opstring) + " ",
3638 **kw,
3639 )
3640
3641 def visit_custom_op_unary_operator(self, element, operator, **kw):
3642 if operator.visit_name:
3643 disp = self._get_custom_operator_dispatch(operator, "unary")
3644 if disp:
3645 return disp(element, operator, **kw)
3646
3647 return self._generate_generic_unary_operator(
3648 element, self.escape_literal_column(operator.opstring) + " ", **kw
3649 )
3650
3651 def visit_custom_op_unary_modifier(self, element, operator, **kw):
3652 if operator.visit_name:
3653 disp = self._get_custom_operator_dispatch(operator, "unary")
3654 if disp:
3655 return disp(element, operator, **kw)
3656
3657 return self._generate_generic_unary_modifier(
3658 element, " " + self.escape_literal_column(operator.opstring), **kw
3659 )
3660
3661 def _generate_generic_binary(
3662 self,
3663 binary: BinaryExpression[Any],
3664 opstring: str,
3665 eager_grouping: bool = False,
3666 **kw: Any,
3667 ) -> str:
3668 _in_operator_expression = kw.get("_in_operator_expression", False)
3669
3670 kw["_in_operator_expression"] = True
3671 kw["_binary_op"] = binary.operator
3672 text = (
3673 binary.left._compiler_dispatch(
3674 self, eager_grouping=eager_grouping, **kw
3675 )
3676 + opstring
3677 + binary.right._compiler_dispatch(
3678 self, eager_grouping=eager_grouping, **kw
3679 )
3680 )
3681
3682 if _in_operator_expression and eager_grouping:
3683 text = "(%s)" % text
3684 return text
3685
3686 def _generate_generic_unary_operator(self, unary, opstring, **kw):
3687 return opstring + unary.element._compiler_dispatch(self, **kw)
3688
3689 def _generate_generic_unary_modifier(self, unary, opstring, **kw):
3690 return unary.element._compiler_dispatch(self, **kw) + opstring
3691
3692 @util.memoized_property
3693 def _like_percent_literal(self):
3694 return elements.literal_column("'%'", type_=sqltypes.STRINGTYPE)
3695
3696 def visit_ilike_case_insensitive_operand(self, element, **kw):
3697 return f"lower({element.element._compiler_dispatch(self, **kw)})"
3698
3699 def visit_contains_op_binary(self, binary, operator, **kw):
3700 binary = binary._clone()
3701 percent = self._like_percent_literal
3702 binary.right = percent.concat(binary.right).concat(percent)
3703 return self.visit_like_op_binary(binary, operator, **kw)
3704
3705 def visit_not_contains_op_binary(self, binary, operator, **kw):
3706 binary = binary._clone()
3707 percent = self._like_percent_literal
3708 binary.right = percent.concat(binary.right).concat(percent)
3709 return self.visit_not_like_op_binary(binary, operator, **kw)
3710
3711 def visit_icontains_op_binary(self, binary, operator, **kw):
3712 binary = binary._clone()
3713 percent = self._like_percent_literal
3714 binary.left = ilike_case_insensitive(binary.left)
3715 binary.right = percent.concat(
3716 ilike_case_insensitive(binary.right)
3717 ).concat(percent)
3718 return self.visit_ilike_op_binary(binary, operator, **kw)
3719
3720 def visit_not_icontains_op_binary(self, binary, operator, **kw):
3721 binary = binary._clone()
3722 percent = self._like_percent_literal
3723 binary.left = ilike_case_insensitive(binary.left)
3724 binary.right = percent.concat(
3725 ilike_case_insensitive(binary.right)
3726 ).concat(percent)
3727 return self.visit_not_ilike_op_binary(binary, operator, **kw)
3728
3729 def visit_startswith_op_binary(self, binary, operator, **kw):
3730 binary = binary._clone()
3731 percent = self._like_percent_literal
3732 binary.right = percent._rconcat(binary.right)
3733 return self.visit_like_op_binary(binary, operator, **kw)
3734
3735 def visit_not_startswith_op_binary(self, binary, operator, **kw):
3736 binary = binary._clone()
3737 percent = self._like_percent_literal
3738 binary.right = percent._rconcat(binary.right)
3739 return self.visit_not_like_op_binary(binary, operator, **kw)
3740
3741 def visit_istartswith_op_binary(self, binary, operator, **kw):
3742 binary = binary._clone()
3743 percent = self._like_percent_literal
3744 binary.left = ilike_case_insensitive(binary.left)
3745 binary.right = percent._rconcat(ilike_case_insensitive(binary.right))
3746 return self.visit_ilike_op_binary(binary, operator, **kw)
3747
3748 def visit_not_istartswith_op_binary(self, binary, operator, **kw):
3749 binary = binary._clone()
3750 percent = self._like_percent_literal
3751 binary.left = ilike_case_insensitive(binary.left)
3752 binary.right = percent._rconcat(ilike_case_insensitive(binary.right))
3753 return self.visit_not_ilike_op_binary(binary, operator, **kw)
3754
3755 def visit_endswith_op_binary(self, binary, operator, **kw):
3756 binary = binary._clone()
3757 percent = self._like_percent_literal
3758 binary.right = percent.concat(binary.right)
3759 return self.visit_like_op_binary(binary, operator, **kw)
3760
3761 def visit_not_endswith_op_binary(self, binary, operator, **kw):
3762 binary = binary._clone()
3763 percent = self._like_percent_literal
3764 binary.right = percent.concat(binary.right)
3765 return self.visit_not_like_op_binary(binary, operator, **kw)
3766
3767 def visit_iendswith_op_binary(self, binary, operator, **kw):
3768 binary = binary._clone()
3769 percent = self._like_percent_literal
3770 binary.left = ilike_case_insensitive(binary.left)
3771 binary.right = percent.concat(ilike_case_insensitive(binary.right))
3772 return self.visit_ilike_op_binary(binary, operator, **kw)
3773
3774 def visit_not_iendswith_op_binary(self, binary, operator, **kw):
3775 binary = binary._clone()
3776 percent = self._like_percent_literal
3777 binary.left = ilike_case_insensitive(binary.left)
3778 binary.right = percent.concat(ilike_case_insensitive(binary.right))
3779 return self.visit_not_ilike_op_binary(binary, operator, **kw)
3780
3781 def visit_like_op_binary(self, binary, operator, **kw):
3782 escape = binary.modifiers.get("escape", None)
3783
3784 return "%s LIKE %s" % (
3785 binary.left._compiler_dispatch(self, **kw),
3786 binary.right._compiler_dispatch(self, **kw),
3787 ) + (
3788 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
3789 if escape is not None
3790 else ""
3791 )
3792
3793 def visit_not_like_op_binary(self, binary, operator, **kw):
3794 escape = binary.modifiers.get("escape", None)
3795 return "%s NOT LIKE %s" % (
3796 binary.left._compiler_dispatch(self, **kw),
3797 binary.right._compiler_dispatch(self, **kw),
3798 ) + (
3799 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
3800 if escape is not None
3801 else ""
3802 )
3803
3804 def visit_ilike_op_binary(self, binary, operator, **kw):
3805 if operator is operators.ilike_op:
3806 binary = binary._clone()
3807 binary.left = ilike_case_insensitive(binary.left)
3808 binary.right = ilike_case_insensitive(binary.right)
3809 # else we assume ilower() has been applied
3810
3811 return self.visit_like_op_binary(binary, operator, **kw)
3812
3813 def visit_not_ilike_op_binary(self, binary, operator, **kw):
3814 if operator is operators.not_ilike_op:
3815 binary = binary._clone()
3816 binary.left = ilike_case_insensitive(binary.left)
3817 binary.right = ilike_case_insensitive(binary.right)
3818 # else we assume ilower() has been applied
3819
3820 return self.visit_not_like_op_binary(binary, operator, **kw)
3821
3822 def visit_between_op_binary(self, binary, operator, **kw):
3823 symmetric = binary.modifiers.get("symmetric", False)
3824 return self._generate_generic_binary(
3825 binary, " BETWEEN SYMMETRIC " if symmetric else " BETWEEN ", **kw
3826 )
3827
3828 def visit_not_between_op_binary(self, binary, operator, **kw):
3829 symmetric = binary.modifiers.get("symmetric", False)
3830 return self._generate_generic_binary(
3831 binary,
3832 " NOT BETWEEN SYMMETRIC " if symmetric else " NOT BETWEEN ",
3833 **kw,
3834 )
3835
3836 def visit_regexp_match_op_binary(
3837 self, binary: BinaryExpression[Any], operator: Any, **kw: Any
3838 ) -> str:
3839 raise exc.CompileError(
3840 "%s dialect does not support regular expressions"
3841 % self.dialect.name
3842 )
3843
3844 def visit_not_regexp_match_op_binary(
3845 self, binary: BinaryExpression[Any], operator: Any, **kw: Any
3846 ) -> str:
3847 raise exc.CompileError(
3848 "%s dialect does not support regular expressions"
3849 % self.dialect.name
3850 )
3851
3852 def visit_regexp_replace_op_binary(
3853 self, binary: BinaryExpression[Any], operator: Any, **kw: Any
3854 ) -> str:
3855 raise exc.CompileError(
3856 "%s dialect does not support regular expression replacements"
3857 % self.dialect.name
3858 )
3859
3860 def visit_dmltargetcopy(self, element, *, bindmarkers=None, **kw):
3861 if bindmarkers is None:
3862 raise exc.CompileError(
3863 "DML target objects may only be used with "
3864 "compiled INSERT or UPDATE statements"
3865 )
3866
3867 bindmarkers[element.column.key] = element
3868 return f"__BINDMARKER_~~{element.column.key}~~"
3869
3870 def visit_bindparam(
3871 self,
3872 bindparam,
3873 within_columns_clause=False,
3874 literal_binds=False,
3875 skip_bind_expression=False,
3876 literal_execute=False,
3877 render_postcompile=False,
3878 is_upsert_set=False,
3879 **kwargs,
3880 ):
3881 # Detect parametrized bindparams in upsert SET clause for issue #13130
3882 if (
3883 is_upsert_set
3884 and bindparam.value is None
3885 and bindparam.callable is None
3886 and self._insertmanyvalues is not None
3887 ):
3888 self._insertmanyvalues = self._insertmanyvalues._replace(
3889 has_upsert_bound_parameters=True
3890 )
3891
3892 if not skip_bind_expression:
3893 impl = bindparam.type.dialect_impl(self.dialect)
3894 if impl._has_bind_expression:
3895 bind_expression = impl.bind_expression(bindparam)
3896 wrapped = self.process(
3897 bind_expression,
3898 skip_bind_expression=True,
3899 within_columns_clause=within_columns_clause,
3900 literal_binds=literal_binds and not bindparam.expanding,
3901 literal_execute=literal_execute,
3902 render_postcompile=render_postcompile,
3903 **kwargs,
3904 )
3905 if bindparam.expanding:
3906 # for postcompile w/ expanding, move the "wrapped" part
3907 # of this into the inside
3908
3909 m = re.match(
3910 r"^(.*)\(__\[POSTCOMPILE_(\S+?)\]\)(.*)$", wrapped
3911 )
3912 assert m, "unexpected format for expanding parameter"
3913 wrapped = "(__[POSTCOMPILE_%s~~%s~~REPL~~%s~~])" % (
3914 m.group(2),
3915 m.group(1),
3916 m.group(3),
3917 )
3918
3919 if literal_binds:
3920 ret = self.render_literal_bindparam(
3921 bindparam,
3922 within_columns_clause=True,
3923 bind_expression_template=wrapped,
3924 **kwargs,
3925 )
3926 return f"({ret})"
3927
3928 return wrapped
3929
3930 if not literal_binds:
3931 literal_execute = (
3932 literal_execute
3933 or bindparam.literal_execute
3934 or (within_columns_clause and self.ansi_bind_rules)
3935 )
3936 post_compile = literal_execute or bindparam.expanding
3937 else:
3938 post_compile = False
3939
3940 if literal_binds:
3941 ret = self.render_literal_bindparam(
3942 bindparam, within_columns_clause=True, **kwargs
3943 )
3944 if bindparam.expanding:
3945 ret = f"({ret})"
3946 return ret
3947
3948 name = self._truncate_bindparam(bindparam)
3949
3950 if name in self.binds:
3951 existing = self.binds[name]
3952 if existing is not bindparam:
3953 if (
3954 (existing.unique or bindparam.unique)
3955 and not existing.proxy_set.intersection(
3956 bindparam.proxy_set
3957 )
3958 and not existing._cloned_set.intersection(
3959 bindparam._cloned_set
3960 )
3961 ):
3962 raise exc.CompileError(
3963 "Bind parameter '%s' conflicts with "
3964 "unique bind parameter of the same name" % name
3965 )
3966 elif existing.expanding != bindparam.expanding:
3967 raise exc.CompileError(
3968 "Can't reuse bound parameter name '%s' in both "
3969 "'expanding' (e.g. within an IN expression) and "
3970 "non-expanding contexts. If this parameter is to "
3971 "receive a list/array value, set 'expanding=True' on "
3972 "it for expressions that aren't IN, otherwise use "
3973 "a different parameter name." % (name,)
3974 )
3975 elif existing._is_crud or bindparam._is_crud:
3976 if existing._is_crud and bindparam._is_crud:
3977 # TODO: this condition is not well understood.
3978 # see tests in test/sql/test_update.py
3979 raise exc.CompileError(
3980 "Encountered unsupported case when compiling an "
3981 "INSERT or UPDATE statement. If this is a "
3982 "multi-table "
3983 "UPDATE statement, please provide string-named "
3984 "arguments to the "
3985 "values() method with distinct names; support for "
3986 "multi-table UPDATE statements that "
3987 "target multiple tables for UPDATE is very "
3988 "limited",
3989 )
3990 else:
3991 raise exc.CompileError(
3992 f"bindparam() name '{bindparam.key}' is reserved "
3993 "for automatic usage in the VALUES or SET "
3994 "clause of this "
3995 "insert/update statement. Please use a "
3996 "name other than column name when using "
3997 "bindparam() "
3998 "with insert() or update() (for example, "
3999 f"'b_{bindparam.key}')."
4000 )
4001
4002 self.binds[bindparam.key] = self.binds[name] = bindparam
4003
4004 # if we are given a cache key that we're going to match against,
4005 # relate the bindparam here to one that is most likely present
4006 # in the "extracted params" portion of the cache key. this is used
4007 # to set up a positional mapping that is used to determine the
4008 # correct parameters for a subsequent use of this compiled with
4009 # a different set of parameter values. here, we accommodate for
4010 # parameters that may have been cloned both before and after the cache
4011 # key was been generated.
4012 ckbm_tuple = self._cache_key_bind_match
4013
4014 if ckbm_tuple:
4015 ckbm, cksm = ckbm_tuple
4016 for bp in bindparam._cloned_set:
4017 if bp.key in cksm:
4018 cb = cksm[bp.key]
4019 ckbm[cb].append(bindparam)
4020
4021 if bindparam.isoutparam:
4022 self.has_out_parameters = True
4023
4024 if post_compile:
4025 if render_postcompile:
4026 self._render_postcompile = True
4027
4028 if literal_execute:
4029 self.literal_execute_params |= {bindparam}
4030 else:
4031 self.post_compile_params |= {bindparam}
4032
4033 ret = self.bindparam_string(
4034 name,
4035 post_compile=post_compile,
4036 expanding=bindparam.expanding,
4037 bindparam_type=bindparam.type,
4038 **kwargs,
4039 )
4040
4041 if bindparam.expanding:
4042 ret = f"({ret})"
4043
4044 return ret
4045
4046 def render_bind_cast(self, type_, dbapi_type, sqltext):
4047 raise NotImplementedError()
4048
4049 def render_literal_bindparam(
4050 self,
4051 bindparam,
4052 render_literal_value=NO_ARG,
4053 bind_expression_template=None,
4054 **kw,
4055 ):
4056 if render_literal_value is not NO_ARG:
4057 value = render_literal_value
4058 else:
4059 if bindparam.value is None and bindparam.callable is None:
4060 op = kw.get("_binary_op", None)
4061 if op and op not in (operators.is_, operators.is_not):
4062 util.warn_limited(
4063 "Bound parameter '%s' rendering literal NULL in a SQL "
4064 "expression; comparisons to NULL should not use "
4065 "operators outside of 'is' or 'is not'",
4066 (bindparam.key,),
4067 )
4068 return self.process(sqltypes.NULLTYPE, **kw)
4069 value = bindparam.effective_value
4070
4071 if bindparam.expanding:
4072 leep = self._literal_execute_expanding_parameter_literal_binds
4073 to_update, replacement_expr = leep(
4074 bindparam,
4075 value,
4076 bind_expression_template=bind_expression_template,
4077 )
4078 return replacement_expr
4079 else:
4080 return self.render_literal_value(value, bindparam.type)
4081
4082 def render_literal_value(
4083 self, value: Any, type_: sqltypes.TypeEngine[Any]
4084 ) -> str:
4085 """Render the value of a bind parameter as a quoted literal.
4086
4087 This is used for statement sections that do not accept bind parameters
4088 on the target driver/database.
4089
4090 This should be implemented by subclasses using the quoting services
4091 of the DBAPI.
4092
4093 """
4094
4095 if value is None and not type_.should_evaluate_none:
4096 # issue #10535 - handle NULL in the compiler without placing
4097 # this onto each type, except for "evaluate None" types
4098 # (e.g. JSON)
4099 return self.process(elements.Null._instance())
4100
4101 processor = type_._cached_literal_processor(self.dialect)
4102 if processor:
4103 try:
4104 return processor(value)
4105 except Exception as e:
4106 raise exc.CompileError(
4107 f"Could not render literal value "
4108 f'"{sql_util._repr_single_value(value)}" '
4109 f"with datatype "
4110 f"{type_}; see parent stack trace for "
4111 "more detail."
4112 ) from e
4113
4114 else:
4115 raise exc.CompileError(
4116 f"No literal value renderer is available for literal value "
4117 f'"{sql_util._repr_single_value(value)}" '
4118 f"with datatype {type_}"
4119 )
4120
4121 def _truncate_bindparam(self, bindparam):
4122 if bindparam in self.bind_names:
4123 return self.bind_names[bindparam]
4124
4125 bind_name = bindparam.key
4126 if isinstance(bind_name, elements._truncated_label):
4127 bind_name = self._truncated_identifier("bindparam", bind_name)
4128
4129 # add to bind_names for translation
4130 self.bind_names[bindparam] = bind_name
4131
4132 return bind_name
4133
4134 def _truncated_identifier(
4135 self, ident_class: str, name: _truncated_label
4136 ) -> str:
4137 if (ident_class, name) in self.truncated_names:
4138 return self.truncated_names[(ident_class, name)]
4139
4140 anonname = name.apply_map(self.anon_map)
4141
4142 if len(anonname) > self.label_length - 6:
4143 counter = self._truncated_counters.get(ident_class, 1)
4144 truncname = (
4145 anonname[0 : max(self.label_length - 6, 0)]
4146 + "_"
4147 + hex(counter)[2:]
4148 )
4149 self._truncated_counters[ident_class] = counter + 1
4150 else:
4151 truncname = anonname
4152 self.truncated_names[(ident_class, name)] = truncname
4153 return truncname
4154
4155 def _anonymize(self, name: str) -> str:
4156 return name % self.anon_map
4157
4158 def bindparam_string(
4159 self,
4160 name: str,
4161 post_compile: bool = False,
4162 expanding: bool = False,
4163 escaped_from: Optional[str] = None,
4164 bindparam_type: Optional[TypeEngine[Any]] = None,
4165 accumulate_bind_names: Optional[Set[str]] = None,
4166 visited_bindparam: Optional[List[str]] = None,
4167 **kw: Any,
4168 ) -> str:
4169 # TODO: accumulate_bind_names is passed by crud.py to gather
4170 # names on a per-value basis, visited_bindparam is passed by
4171 # visit_insert() to collect all parameters in the statement.
4172 # see if this gathering can be simplified somehow
4173 if accumulate_bind_names is not None:
4174 accumulate_bind_names.add(name)
4175 if visited_bindparam is not None:
4176 visited_bindparam.append(name)
4177
4178 if not escaped_from:
4179 if self._bind_translate_re.search(name):
4180 # not quite the translate use case as we want to
4181 # also get a quick boolean if we even found
4182 # unusual characters in the name
4183 new_name = self._bind_translate_re.sub(
4184 lambda m: self._bind_translate_chars[m.group(0)],
4185 name,
4186 )
4187 escaped_from = name
4188 name = new_name
4189
4190 if escaped_from:
4191 self.escaped_bind_names = self.escaped_bind_names.union(
4192 {escaped_from: name}
4193 )
4194 if post_compile:
4195 ret = "__[POSTCOMPILE_%s]" % name
4196 if expanding:
4197 # for expanding, bound parameters or literal values will be
4198 # rendered per item
4199 return ret
4200
4201 # otherwise, for non-expanding "literal execute", apply
4202 # bind casts as determined by the datatype
4203 if bindparam_type is not None:
4204 type_impl = bindparam_type._unwrapped_dialect_impl(
4205 self.dialect
4206 )
4207 if type_impl.render_literal_cast:
4208 ret = self.render_bind_cast(bindparam_type, type_impl, ret)
4209 return ret
4210 elif self.state is CompilerState.COMPILING:
4211 ret = self.compilation_bindtemplate % {"name": name}
4212 else:
4213 ret = self.bindtemplate % {"name": name}
4214
4215 if (
4216 bindparam_type is not None
4217 and self.dialect._bind_typing_render_casts
4218 ):
4219 type_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
4220 if type_impl.render_bind_cast:
4221 ret = self.render_bind_cast(bindparam_type, type_impl, ret)
4222
4223 return ret
4224
4225 def _dispatch_independent_ctes(self, stmt, kw):
4226 local_kw = kw.copy()
4227 local_kw.pop("cte_opts", None)
4228 for cte, opt in zip(
4229 stmt._independent_ctes, stmt._independent_ctes_opts
4230 ):
4231 cte._compiler_dispatch(self, cte_opts=opt, **local_kw)
4232
4233 def visit_cte(
4234 self,
4235 cte: CTE,
4236 asfrom: bool = False,
4237 ashint: bool = False,
4238 fromhints: Optional[_FromHintsType] = None,
4239 visiting_cte: Optional[CTE] = None,
4240 from_linter: Optional[FromLinter] = None,
4241 cte_opts: selectable._CTEOpts = selectable._CTEOpts(False),
4242 **kwargs: Any,
4243 ) -> Optional[str]:
4244 self_ctes = self._init_cte_state()
4245 assert self_ctes is self.ctes
4246
4247 kwargs["visiting_cte"] = cte
4248
4249 cte_name = cte.name
4250
4251 if isinstance(cte_name, elements._truncated_label):
4252 cte_name = self._truncated_identifier("alias", cte_name)
4253
4254 is_new_cte = True
4255 embedded_in_current_named_cte = False
4256
4257 _reference_cte = cte._get_reference_cte()
4258
4259 nesting = cte.nesting or cte_opts.nesting
4260
4261 # check for CTE already encountered
4262 if _reference_cte in self.level_name_by_cte:
4263 cte_level, _, existing_cte_opts = self.level_name_by_cte[
4264 _reference_cte
4265 ]
4266 assert _ == cte_name
4267
4268 cte_level_name = (cte_level, cte_name)
4269 existing_cte = self.ctes_by_level_name[cte_level_name]
4270
4271 # check if we are receiving it here with a specific
4272 # "nest_here" location; if so, move it to this location
4273
4274 if cte_opts.nesting:
4275 if existing_cte_opts.nesting:
4276 raise exc.CompileError(
4277 "CTE is stated as 'nest_here' in "
4278 "more than one location"
4279 )
4280
4281 old_level_name = (cte_level, cte_name)
4282 cte_level = len(self.stack) if nesting else 1
4283 cte_level_name = new_level_name = (cte_level, cte_name)
4284
4285 del self.ctes_by_level_name[old_level_name]
4286 self.ctes_by_level_name[new_level_name] = existing_cte
4287 self.level_name_by_cte[_reference_cte] = new_level_name + (
4288 cte_opts,
4289 )
4290
4291 else:
4292 cte_level = len(self.stack) if nesting else 1
4293 cte_level_name = (cte_level, cte_name)
4294
4295 if cte_level_name in self.ctes_by_level_name:
4296 existing_cte = self.ctes_by_level_name[cte_level_name]
4297 else:
4298 existing_cte = None
4299
4300 if existing_cte is not None:
4301 embedded_in_current_named_cte = visiting_cte is existing_cte
4302
4303 # we've generated a same-named CTE that we are enclosed in,
4304 # or this is the same CTE. just return the name.
4305 if cte is existing_cte._restates or cte is existing_cte:
4306 is_new_cte = False
4307 elif existing_cte is cte._restates:
4308 # we've generated a same-named CTE that is
4309 # enclosed in us - we take precedence, so
4310 # discard the text for the "inner".
4311 del self_ctes[existing_cte]
4312
4313 existing_cte_reference_cte = existing_cte._get_reference_cte()
4314
4315 assert existing_cte_reference_cte is _reference_cte
4316 assert existing_cte_reference_cte is existing_cte
4317
4318 del self.level_name_by_cte[existing_cte_reference_cte]
4319 else:
4320 if (
4321 # if the two CTEs have the same hash, which we expect
4322 # here means that one/both is an annotated of the other
4323 (hash(cte) == hash(existing_cte))
4324 # or...
4325 or (
4326 (
4327 # if they are clones, i.e. they came from the ORM
4328 # or some other visit method
4329 cte._is_clone_of is not None
4330 or existing_cte._is_clone_of is not None
4331 )
4332 # and are deep-copy identical
4333 and cte.compare(existing_cte)
4334 )
4335 ):
4336 # then consider these two CTEs the same
4337 is_new_cte = False
4338 else:
4339 # otherwise these are two CTEs that either will render
4340 # differently, or were indicated separately by the user,
4341 # with the same name
4342 raise exc.CompileError(
4343 "Multiple, unrelated CTEs found with "
4344 "the same name: %r" % cte_name
4345 )
4346
4347 if not asfrom and not is_new_cte:
4348 return None
4349
4350 if cte._cte_alias is not None:
4351 pre_alias_cte = cte._cte_alias
4352 cte_pre_alias_name = cte._cte_alias.name
4353 if isinstance(cte_pre_alias_name, elements._truncated_label):
4354 cte_pre_alias_name = self._truncated_identifier(
4355 "alias", cte_pre_alias_name
4356 )
4357 else:
4358 pre_alias_cte = cte
4359 cte_pre_alias_name = None
4360
4361 if is_new_cte:
4362 self.ctes_by_level_name[cte_level_name] = cte
4363 self.level_name_by_cte[_reference_cte] = cte_level_name + (
4364 cte_opts,
4365 )
4366
4367 if pre_alias_cte not in self.ctes:
4368 self.visit_cte(pre_alias_cte, **kwargs)
4369
4370 if not cte_pre_alias_name and cte not in self_ctes:
4371 if cte.recursive:
4372 self.ctes_recursive = True
4373 text = self.preparer.format_alias(cte, cte_name)
4374 if cte.recursive or cte.element.name_cte_columns:
4375 col_source = cte.element
4376
4377 # TODO: can we get at the .columns_plus_names collection
4378 # that is already (or will be?) generated for the SELECT
4379 # rather than calling twice?
4380 recur_cols = [
4381 # TODO: proxy_name is not technically safe,
4382 # see test_cte->
4383 # test_with_recursive_no_name_currently_buggy. not
4384 # clear what should be done with such a case
4385 fallback_label_name or proxy_name
4386 for (
4387 _,
4388 proxy_name,
4389 fallback_label_name,
4390 c,
4391 repeated,
4392 ) in (col_source._generate_columns_plus_names(True))
4393 if not repeated
4394 ]
4395
4396 text += "(%s)" % (
4397 ", ".join(
4398 self.preparer.format_label_name(
4399 ident, anon_map=self.anon_map
4400 )
4401 for ident in recur_cols
4402 )
4403 )
4404
4405 assert kwargs.get("subquery", False) is False
4406
4407 if not self.stack:
4408 # toplevel, this is a stringify of the
4409 # cte directly. just compile the inner
4410 # the way alias() does.
4411 return cte.element._compiler_dispatch(
4412 self, asfrom=asfrom, **kwargs
4413 )
4414 else:
4415 prefixes = self._generate_prefixes(
4416 cte, cte._prefixes, **kwargs
4417 )
4418 inner = cte.element._compiler_dispatch(
4419 self, asfrom=True, **kwargs
4420 )
4421
4422 text += " AS %s\n(%s)" % (prefixes, inner)
4423
4424 if cte._suffixes:
4425 text += " " + self._generate_prefixes(
4426 cte, cte._suffixes, **kwargs
4427 )
4428
4429 self_ctes[cte] = text
4430
4431 if asfrom:
4432 if from_linter:
4433 from_linter.froms[cte._de_clone()] = cte_name
4434
4435 if not is_new_cte and embedded_in_current_named_cte:
4436 return self.preparer.format_alias(cte, cte_name)
4437
4438 if cte_pre_alias_name:
4439 text = self.preparer.format_alias(cte, cte_pre_alias_name)
4440 if self.preparer._requires_quotes(cte_name):
4441 cte_name = self.preparer.quote(cte_name)
4442 text += self.get_render_as_alias_suffix(cte_name)
4443 return text # type: ignore[no-any-return]
4444 else:
4445 return self.preparer.format_alias(cte, cte_name)
4446
4447 return None
4448
4449 def visit_table_valued_alias(self, element, **kw):
4450 if element.joins_implicitly:
4451 kw["from_linter"] = None
4452 if element._is_lateral:
4453 return self.visit_lateral(element, **kw)
4454 else:
4455 return self.visit_alias(element, **kw)
4456
4457 def visit_table_valued_column(self, element, **kw):
4458 return self.visit_column(element, **kw)
4459
4460 def visit_alias(
4461 self,
4462 alias,
4463 asfrom=False,
4464 ashint=False,
4465 iscrud=False,
4466 fromhints=None,
4467 subquery=False,
4468 lateral=False,
4469 enclosing_alias=None,
4470 from_linter=None,
4471 within_tstring=False,
4472 **kwargs,
4473 ):
4474 if lateral:
4475 if "enclosing_lateral" not in kwargs:
4476 # if lateral is set and enclosing_lateral is not
4477 # present, we assume we are being called directly
4478 # from visit_lateral() and we need to set enclosing_lateral.
4479 assert alias._is_lateral
4480 kwargs["enclosing_lateral"] = alias
4481
4482 # for lateral objects, we track a second from_linter that is...
4483 # lateral! to the level above us.
4484 if (
4485 from_linter
4486 and "lateral_from_linter" not in kwargs
4487 and "enclosing_lateral" in kwargs
4488 ):
4489 kwargs["lateral_from_linter"] = from_linter
4490
4491 if enclosing_alias is not None and enclosing_alias.element is alias:
4492 inner = alias.element._compiler_dispatch(
4493 self,
4494 asfrom=asfrom,
4495 ashint=ashint,
4496 iscrud=iscrud,
4497 fromhints=fromhints,
4498 lateral=lateral,
4499 enclosing_alias=alias,
4500 **kwargs,
4501 )
4502 if subquery and (asfrom or lateral):
4503 inner = "(%s)" % (inner,)
4504 return inner
4505 else:
4506 kwargs["enclosing_alias"] = alias
4507
4508 if asfrom or ashint or within_tstring:
4509 if isinstance(alias.name, elements._truncated_label):
4510 alias_name = self._truncated_identifier("alias", alias.name)
4511 else:
4512 alias_name = alias.name
4513
4514 if ashint:
4515 return self.preparer.format_alias(alias, alias_name)
4516 elif asfrom or within_tstring:
4517 if from_linter:
4518 from_linter.froms[alias._de_clone()] = alias_name
4519
4520 inner = alias.element._compiler_dispatch(
4521 self, asfrom=True, lateral=lateral, **kwargs
4522 )
4523 if subquery:
4524 inner = "(%s)" % (inner,)
4525
4526 ret = inner + self.get_render_as_alias_suffix(
4527 self.preparer.format_alias(alias, alias_name)
4528 )
4529
4530 if alias._supports_derived_columns and alias._render_derived:
4531 ret += "(%s)" % (
4532 ", ".join(
4533 "%s%s"
4534 % (
4535 self.preparer.quote(col.name),
4536 (
4537 " %s"
4538 % self.dialect.type_compiler_instance.process(
4539 col.type, **kwargs
4540 )
4541 if alias._render_derived_w_types
4542 else ""
4543 ),
4544 )
4545 for col in alias.c
4546 )
4547 )
4548
4549 if fromhints and alias in fromhints:
4550 ret = self.format_from_hint_text(
4551 ret, alias, fromhints[alias], iscrud
4552 )
4553
4554 return ret
4555 else:
4556 # note we cancel the "subquery" flag here as well
4557 return alias.element._compiler_dispatch(
4558 self, lateral=lateral, **kwargs
4559 )
4560
4561 def visit_subquery(self, subquery, **kw):
4562 kw["subquery"] = True
4563 return self.visit_alias(subquery, **kw)
4564
4565 def visit_lateral(self, lateral_, **kw):
4566 kw["lateral"] = True
4567 return "LATERAL %s" % self.visit_alias(lateral_, **kw)
4568
4569 def visit_tablesample(self, tablesample, asfrom=False, **kw):
4570 text = "%s TABLESAMPLE %s" % (
4571 self.visit_alias(tablesample, asfrom=True, **kw),
4572 tablesample._get_method()._compiler_dispatch(self, **kw),
4573 )
4574
4575 if tablesample.seed is not None:
4576 text += " REPEATABLE (%s)" % (
4577 tablesample.seed._compiler_dispatch(self, **kw)
4578 )
4579
4580 return text
4581
4582 def _render_values(self, element, **kw):
4583 kw.setdefault("literal_binds", element.literal_binds)
4584 tuples = ", ".join(
4585 self.process(
4586 elements.Tuple(
4587 types=element._column_types, *elem
4588 ).self_group(),
4589 **kw,
4590 )
4591 for chunk in element._data
4592 for elem in chunk
4593 )
4594 return f"VALUES {tuples}"
4595
4596 def visit_values(
4597 self, element, asfrom=False, from_linter=None, visiting_cte=None, **kw
4598 ):
4599
4600 if element._independent_ctes:
4601 self._dispatch_independent_ctes(element, kw)
4602
4603 v = self._render_values(element, **kw)
4604
4605 if element._unnamed:
4606 name = None
4607 elif isinstance(element.name, elements._truncated_label):
4608 name = self._truncated_identifier("values", element.name)
4609 else:
4610 name = element.name
4611
4612 if element._is_lateral:
4613 lateral = "LATERAL "
4614 else:
4615 lateral = ""
4616
4617 if asfrom:
4618 if from_linter:
4619 from_linter.froms[element._de_clone()] = (
4620 name if name is not None else "(unnamed VALUES element)"
4621 )
4622
4623 if visiting_cte is not None and visiting_cte.element is element:
4624 if element._is_lateral:
4625 raise exc.CompileError(
4626 "Can't use a LATERAL VALUES expression inside of a CTE"
4627 )
4628 elif name:
4629 kw["include_table"] = False
4630 v = "%s(%s)%s (%s)" % (
4631 lateral,
4632 v,
4633 self.get_render_as_alias_suffix(self.preparer.quote(name)),
4634 (
4635 ", ".join(
4636 c._compiler_dispatch(self, **kw)
4637 for c in element.columns
4638 )
4639 ),
4640 )
4641 else:
4642 v = "%s(%s)" % (lateral, v)
4643 return v
4644
4645 def visit_scalar_values(self, element, **kw):
4646 return f"({self._render_values(element, **kw)})"
4647
4648 def get_render_as_alias_suffix(self, alias_name_text):
4649 return " AS " + alias_name_text
4650
4651 def _add_to_result_map(
4652 self,
4653 keyname: str,
4654 name: str,
4655 objects: Tuple[Any, ...],
4656 type_: TypeEngine[Any],
4657 ) -> None:
4658
4659 # note objects must be non-empty for cursor.py to handle the
4660 # collection properly
4661 assert objects
4662
4663 if keyname is None or keyname == "*":
4664 self._ordered_columns = False
4665 self._ad_hoc_textual = True
4666 if type_._is_tuple_type:
4667 raise exc.CompileError(
4668 "Most backends don't support SELECTing "
4669 "from a tuple() object. If this is an ORM query, "
4670 "consider using the Bundle object."
4671 )
4672 self._result_columns.append(
4673 ResultColumnsEntry(keyname, name, objects, type_)
4674 )
4675
4676 def _label_returning_column(
4677 self, stmt, column, populate_result_map, column_clause_args=None, **kw
4678 ):
4679 """Render a column with necessary labels inside of a RETURNING clause.
4680
4681 This method is provided for individual dialects in place of calling
4682 the _label_select_column method directly, so that the two use cases
4683 of RETURNING vs. SELECT can be disambiguated going forward.
4684
4685 .. versionadded:: 1.4.21
4686
4687 """
4688 return self._label_select_column(
4689 None,
4690 column,
4691 populate_result_map,
4692 False,
4693 {} if column_clause_args is None else column_clause_args,
4694 **kw,
4695 )
4696
4697 def _label_select_column(
4698 self,
4699 select,
4700 column,
4701 populate_result_map,
4702 asfrom,
4703 column_clause_args,
4704 name=None,
4705 proxy_name=None,
4706 fallback_label_name=None,
4707 within_columns_clause=True,
4708 column_is_repeated=False,
4709 need_column_expressions=False,
4710 include_table=True,
4711 ):
4712 """produce labeled columns present in a select()."""
4713 impl = column.type.dialect_impl(self.dialect)
4714
4715 if impl._has_column_expression and (
4716 need_column_expressions or populate_result_map
4717 ):
4718 col_expr = impl.column_expression(column)
4719 else:
4720 col_expr = column
4721
4722 if populate_result_map:
4723 # pass an "add_to_result_map" callable into the compilation
4724 # of embedded columns. this collects information about the
4725 # column as it will be fetched in the result and is coordinated
4726 # with cursor.description when the query is executed.
4727 add_to_result_map = self._add_to_result_map
4728
4729 # if the SELECT statement told us this column is a repeat,
4730 # wrap the callable with one that prevents the addition of the
4731 # targets
4732 if column_is_repeated:
4733 _add_to_result_map = add_to_result_map
4734
4735 def add_to_result_map(keyname, name, objects, type_):
4736 _add_to_result_map(keyname, name, (keyname,), type_)
4737
4738 # if we redefined col_expr for type expressions, wrap the
4739 # callable with one that adds the original column to the targets
4740 elif col_expr is not column:
4741 _add_to_result_map = add_to_result_map
4742
4743 def add_to_result_map(keyname, name, objects, type_):
4744 _add_to_result_map(
4745 keyname, name, (column,) + objects, type_
4746 )
4747
4748 else:
4749 add_to_result_map = None
4750
4751 # this method is used by some of the dialects for RETURNING,
4752 # which has different inputs. _label_returning_column was added
4753 # as the better target for this now however for 1.4 we will keep
4754 # _label_select_column directly compatible with this use case.
4755 # these assertions right now set up the current expected inputs
4756 assert within_columns_clause, (
4757 "_label_select_column is only relevant within "
4758 "the columns clause of a SELECT or RETURNING"
4759 )
4760 result_expr: elements.Label[Any] | _CompileLabel
4761
4762 if isinstance(column, elements.Label):
4763 if col_expr is not column:
4764 result_expr = _CompileLabel(
4765 col_expr, column.name, alt_names=(column.element,)
4766 )
4767 else:
4768 result_expr = col_expr
4769
4770 elif name:
4771 # here, _columns_plus_names has determined there's an explicit
4772 # label name we need to use. this is the default for
4773 # tablenames_plus_columnnames as well as when columns are being
4774 # deduplicated on name
4775
4776 assert (
4777 proxy_name is not None
4778 ), "proxy_name is required if 'name' is passed"
4779
4780 result_expr = _CompileLabel(
4781 col_expr,
4782 name,
4783 alt_names=(
4784 proxy_name,
4785 # this is a hack to allow legacy result column lookups
4786 # to work as they did before; this goes away in 2.0.
4787 # TODO: this only seems to be tested indirectly
4788 # via test/orm/test_deprecations.py. should be a
4789 # resultset test for this
4790 column._tq_label,
4791 ),
4792 )
4793 else:
4794 # determine here whether this column should be rendered in
4795 # a labelled context or not, as we were given no required label
4796 # name from the caller. Here we apply heuristics based on the kind
4797 # of SQL expression involved.
4798
4799 if col_expr is not column:
4800 # type-specific expression wrapping the given column,
4801 # so we render a label
4802 render_with_label = True
4803 elif isinstance(column, elements.ColumnClause):
4804 # table-bound column, we render its name as a label if we are
4805 # inside of a subquery only
4806 render_with_label = (
4807 asfrom
4808 and not column.is_literal
4809 and column.table is not None
4810 )
4811 elif isinstance(column, elements.TextClause):
4812 render_with_label = False
4813 elif isinstance(column, elements.UnaryExpression):
4814 # unary expression. notes added as of #12681
4815 #
4816 # By convention, the visit_unary() method
4817 # itself does not add an entry to the result map, and relies
4818 # upon either the inner expression creating a result map
4819 # entry, or if not, by creating a label here that produces
4820 # the result map entry. Where that happens is based on whether
4821 # or not the element immediately inside the unary is a
4822 # NamedColumn subclass or not.
4823 #
4824 # Now, this also impacts how the SELECT is written; if
4825 # we decide to generate a label here, we get the usual
4826 # "~(x+y) AS anon_1" thing in the columns clause. If we
4827 # don't, we don't get an AS at all, we get like
4828 # "~table.column".
4829 #
4830 # But here is the important thing as of modernish (like 1.4)
4831 # versions of SQLAlchemy - **whether or not the AS <label>
4832 # is present in the statement is not actually important**.
4833 # We target result columns **positionally** for a fully
4834 # compiled ``Select()`` object; before 1.4 we needed those
4835 # labels to match in cursor.description etc etc but now it
4836 # really doesn't matter.
4837 # So really, we could set render_with_label True in all cases.
4838 # Or we could just have visit_unary() populate the result map
4839 # in all cases.
4840 #
4841 # What we're doing here is strictly trying to not rock the
4842 # boat too much with when we do/don't render "AS label";
4843 # labels being present helps in the edge cases that we
4844 # "fall back" to named cursor.description matching, labels
4845 # not being present for columns keeps us from having awkward
4846 # phrases like "SELECT DISTINCT table.x AS x".
4847 render_with_label = (
4848 (
4849 # exception case to detect if we render "not boolean"
4850 # as "not <col>" for native boolean or "<col> = 1"
4851 # for non-native boolean. this is controlled by
4852 # visit_is_<true|false>_unary_operator
4853 column.operator
4854 in (operators.is_false, operators.is_true)
4855 and not self.dialect.supports_native_boolean
4856 )
4857 or column._wraps_unnamed_column()
4858 or asfrom
4859 )
4860 elif (
4861 # general class of expressions that don't have a SQL-column
4862 # addressable name. includes scalar selects, bind parameters,
4863 # SQL functions, others
4864 not isinstance(column, elements.NamedColumn)
4865 # deeper check that indicates there's no natural "name" to
4866 # this element, which accommodates for custom SQL constructs
4867 # that might have a ".name" attribute (but aren't SQL
4868 # functions) but are not implementing this more recently added
4869 # base class. in theory the "NamedColumn" check should be
4870 # enough, however here we seek to maintain legacy behaviors
4871 # as well.
4872 and column._non_anon_label is None
4873 ):
4874 render_with_label = True
4875 else:
4876 render_with_label = False
4877
4878 if render_with_label:
4879 if not fallback_label_name:
4880 # used by the RETURNING case right now. we generate it
4881 # here as 3rd party dialects may be referring to
4882 # _label_select_column method directly instead of the
4883 # just-added _label_returning_column method
4884 assert not column_is_repeated
4885 fallback_label_name = column._anon_name_label
4886
4887 fallback_label_name = (
4888 elements._truncated_label(fallback_label_name)
4889 if not isinstance(
4890 fallback_label_name, elements._truncated_label
4891 )
4892 else fallback_label_name
4893 )
4894
4895 result_expr = _CompileLabel(
4896 col_expr, fallback_label_name, alt_names=(proxy_name,)
4897 )
4898 else:
4899 result_expr = col_expr
4900
4901 column_clause_args.update(
4902 within_columns_clause=within_columns_clause,
4903 add_to_result_map=add_to_result_map,
4904 include_table=include_table,
4905 within_tstring=False,
4906 )
4907 return result_expr._compiler_dispatch(self, **column_clause_args)
4908
4909 def format_from_hint_text(self, sqltext, table, hint, iscrud):
4910 hinttext = self.get_from_hint_text(table, hint)
4911 if hinttext:
4912 sqltext += " " + hinttext
4913 return sqltext
4914
4915 def get_select_hint_text(self, byfroms):
4916 return None
4917
4918 def get_from_hint_text(
4919 self, table: FromClause, text: Optional[str]
4920 ) -> Optional[str]:
4921 return None
4922
4923 def get_crud_hint_text(self, table, text):
4924 return None
4925
4926 def get_statement_hint_text(self, hint_texts):
4927 return " ".join(hint_texts)
4928
4929 _default_stack_entry: _CompilerStackEntry
4930
4931 if not typing.TYPE_CHECKING:
4932 _default_stack_entry = util.immutabledict(
4933 [("correlate_froms", frozenset()), ("asfrom_froms", frozenset())]
4934 )
4935
4936 def _display_froms_for_select(
4937 self, select_stmt, asfrom, lateral=False, **kw
4938 ):
4939 # utility method to help external dialects
4940 # get the correct from list for a select.
4941 # specifically the oracle dialect needs this feature
4942 # right now.
4943 toplevel = not self.stack
4944 entry = self._default_stack_entry if toplevel else self.stack[-1]
4945
4946 compile_state = select_stmt._compile_state_factory(select_stmt, self)
4947
4948 correlate_froms = entry["correlate_froms"]
4949 asfrom_froms = entry["asfrom_froms"]
4950
4951 if asfrom and not lateral:
4952 froms = compile_state._get_display_froms(
4953 explicit_correlate_froms=correlate_froms.difference(
4954 asfrom_froms
4955 ),
4956 implicit_correlate_froms=(),
4957 )
4958 else:
4959 froms = compile_state._get_display_froms(
4960 explicit_correlate_froms=correlate_froms,
4961 implicit_correlate_froms=asfrom_froms,
4962 )
4963 return froms
4964
4965 translate_select_structure: Any = None
4966 """if not ``None``, should be a callable which accepts ``(select_stmt,
4967 **kw)`` and returns a select object. this is used for structural changes
4968 mostly to accommodate for LIMIT/OFFSET schemes
4969
4970 """
4971
4972 def visit_select(
4973 self,
4974 select_stmt,
4975 asfrom=False,
4976 insert_into=False,
4977 fromhints=None,
4978 compound_index=None,
4979 select_wraps_for=None,
4980 lateral=False,
4981 from_linter=None,
4982 **kwargs,
4983 ):
4984 assert select_wraps_for is None, (
4985 "SQLAlchemy 1.4 requires use of "
4986 "the translate_select_structure hook for structural "
4987 "translations of SELECT objects"
4988 )
4989 if self._collect_params:
4990 self._add_to_params(select_stmt)
4991
4992 # initial setup of SELECT. the compile_state_factory may now
4993 # be creating a totally different SELECT from the one that was
4994 # passed in. for ORM use this will convert from an ORM-state
4995 # SELECT to a regular "Core" SELECT. other composed operations
4996 # such as computation of joins will be performed.
4997
4998 kwargs["within_columns_clause"] = False
4999
5000 compile_state = select_stmt._compile_state_factory(
5001 select_stmt, self, **kwargs
5002 )
5003 kwargs["ambiguous_table_name_map"] = (
5004 compile_state._ambiguous_table_name_map
5005 )
5006
5007 select_stmt = compile_state.statement
5008
5009 toplevel = not self.stack
5010
5011 if toplevel and not self.compile_state:
5012 self.compile_state = compile_state
5013
5014 is_embedded_select = compound_index is not None or insert_into
5015
5016 # translate step for Oracle, SQL Server which often need to
5017 # restructure the SELECT to allow for LIMIT/OFFSET and possibly
5018 # other conditions
5019 if self.translate_select_structure:
5020 new_select_stmt = self.translate_select_structure(
5021 select_stmt, asfrom=asfrom, **kwargs
5022 )
5023
5024 # if SELECT was restructured, maintain a link to the originals
5025 # and assemble a new compile state
5026 if new_select_stmt is not select_stmt:
5027 compile_state_wraps_for = compile_state
5028 select_wraps_for = select_stmt
5029 select_stmt = new_select_stmt
5030
5031 compile_state = select_stmt._compile_state_factory(
5032 select_stmt, self, **kwargs
5033 )
5034 select_stmt = compile_state.statement
5035
5036 entry = self._default_stack_entry if toplevel else self.stack[-1]
5037
5038 populate_result_map = need_column_expressions = (
5039 toplevel
5040 or entry.get("need_result_map_for_compound", False)
5041 or entry.get("need_result_map_for_nested", False)
5042 )
5043
5044 # indicates there is a CompoundSelect in play and we are not the
5045 # first select
5046 if compound_index:
5047 populate_result_map = False
5048
5049 # this was first proposed as part of #3372; however, it is not
5050 # reached in current tests and could possibly be an assertion
5051 # instead.
5052 if not populate_result_map and "add_to_result_map" in kwargs:
5053 del kwargs["add_to_result_map"]
5054
5055 froms = self._setup_select_stack(
5056 select_stmt, compile_state, entry, asfrom, lateral, compound_index
5057 )
5058
5059 column_clause_args = kwargs.copy()
5060 column_clause_args.update(
5061 {"within_label_clause": False, "within_columns_clause": False}
5062 )
5063
5064 text = "SELECT " # we're off to a good start !
5065
5066 if select_stmt._post_select_clause is not None:
5067 psc = self.process(select_stmt._post_select_clause, **kwargs)
5068 if psc is not None:
5069 text += psc + " "
5070
5071 if select_stmt._hints:
5072 hint_text, byfrom = self._setup_select_hints(select_stmt)
5073 if hint_text:
5074 text += hint_text + " "
5075 else:
5076 byfrom = None
5077
5078 if select_stmt._independent_ctes:
5079 self._dispatch_independent_ctes(select_stmt, kwargs)
5080
5081 if select_stmt._prefixes:
5082 text += self._generate_prefixes(
5083 select_stmt, select_stmt._prefixes, **kwargs
5084 )
5085
5086 text += self.get_select_precolumns(select_stmt, **kwargs)
5087
5088 if select_stmt._pre_columns_clause is not None:
5089 pcc = self.process(select_stmt._pre_columns_clause, **kwargs)
5090 if pcc is not None:
5091 text += pcc + " "
5092
5093 # the actual list of columns to print in the SELECT column list.
5094 inner_columns = [
5095 c
5096 for c in [
5097 self._label_select_column(
5098 select_stmt,
5099 column,
5100 populate_result_map,
5101 asfrom,
5102 column_clause_args,
5103 name=name,
5104 proxy_name=proxy_name,
5105 fallback_label_name=fallback_label_name,
5106 column_is_repeated=repeated,
5107 need_column_expressions=need_column_expressions,
5108 )
5109 for (
5110 name,
5111 proxy_name,
5112 fallback_label_name,
5113 column,
5114 repeated,
5115 ) in compile_state.columns_plus_names
5116 ]
5117 if c is not None
5118 ]
5119
5120 if populate_result_map and select_wraps_for is not None:
5121 # if this select was generated from translate_select,
5122 # rewrite the targeted columns in the result map
5123
5124 translate = dict(
5125 zip(
5126 [
5127 name
5128 for (
5129 key,
5130 proxy_name,
5131 fallback_label_name,
5132 name,
5133 repeated,
5134 ) in compile_state.columns_plus_names
5135 ],
5136 [
5137 name
5138 for (
5139 key,
5140 proxy_name,
5141 fallback_label_name,
5142 name,
5143 repeated,
5144 ) in compile_state_wraps_for.columns_plus_names
5145 ],
5146 )
5147 )
5148
5149 self._result_columns = [
5150 ResultColumnsEntry(
5151 key, name, tuple(translate.get(o, o) for o in obj), type_
5152 )
5153 for key, name, obj, type_ in self._result_columns
5154 ]
5155
5156 text = self._compose_select_body(
5157 text,
5158 select_stmt,
5159 compile_state,
5160 inner_columns,
5161 froms,
5162 byfrom,
5163 toplevel,
5164 kwargs,
5165 )
5166
5167 if select_stmt._post_body_clause is not None:
5168 pbc = self.process(select_stmt._post_body_clause, **kwargs)
5169 if pbc:
5170 text += " " + pbc
5171
5172 if select_stmt._statement_hints:
5173 per_dialect = [
5174 ht
5175 for (dialect_name, ht) in select_stmt._statement_hints
5176 if dialect_name in ("*", self.dialect.name)
5177 ]
5178 if per_dialect:
5179 text += " " + self.get_statement_hint_text(per_dialect)
5180
5181 # In compound query, CTEs are shared at the compound level
5182 if self.ctes and (not is_embedded_select or toplevel):
5183 nesting_level = len(self.stack) if not toplevel else None
5184 text = self._render_cte_clause(nesting_level=nesting_level) + text
5185
5186 if select_stmt._suffixes:
5187 text += " " + self._generate_prefixes(
5188 select_stmt, select_stmt._suffixes, **kwargs
5189 )
5190
5191 self.stack.pop(-1)
5192
5193 return text
5194
5195 def _setup_select_hints(
5196 self, select: Select[Unpack[TupleAny]]
5197 ) -> Tuple[str, _FromHintsType]:
5198 byfrom = {
5199 from_: hinttext
5200 % {"name": from_._compiler_dispatch(self, ashint=True)}
5201 for (from_, dialect), hinttext in select._hints.items()
5202 if dialect in ("*", self.dialect.name)
5203 }
5204 hint_text = self.get_select_hint_text(byfrom)
5205 return hint_text, byfrom
5206
5207 def _setup_select_stack(
5208 self, select, compile_state, entry, asfrom, lateral, compound_index
5209 ):
5210 correlate_froms = entry["correlate_froms"]
5211 asfrom_froms = entry["asfrom_froms"]
5212
5213 if compound_index == 0:
5214 entry["select_0"] = select
5215 elif compound_index:
5216 select_0 = entry["select_0"]
5217 numcols = len(select_0._all_selected_columns)
5218
5219 if len(compile_state.columns_plus_names) != numcols:
5220 raise exc.CompileError(
5221 "All selectables passed to "
5222 "CompoundSelect must have identical numbers of "
5223 "columns; select #%d has %d columns, select "
5224 "#%d has %d"
5225 % (
5226 1,
5227 numcols,
5228 compound_index + 1,
5229 len(select._all_selected_columns),
5230 )
5231 )
5232
5233 if asfrom and not lateral:
5234 froms = compile_state._get_display_froms(
5235 explicit_correlate_froms=correlate_froms.difference(
5236 asfrom_froms
5237 ),
5238 implicit_correlate_froms=(),
5239 )
5240 else:
5241 froms = compile_state._get_display_froms(
5242 explicit_correlate_froms=correlate_froms,
5243 implicit_correlate_froms=asfrom_froms,
5244 )
5245
5246 new_correlate_froms = set(_from_objects(*froms))
5247 all_correlate_froms = new_correlate_froms.union(correlate_froms)
5248
5249 new_entry: _CompilerStackEntry = {
5250 "asfrom_froms": new_correlate_froms,
5251 "correlate_froms": all_correlate_froms,
5252 "selectable": select,
5253 "compile_state": compile_state,
5254 }
5255 self.stack.append(new_entry)
5256
5257 return froms
5258
5259 def _compose_select_body(
5260 self,
5261 text,
5262 select,
5263 compile_state,
5264 inner_columns,
5265 froms,
5266 byfrom,
5267 toplevel,
5268 kwargs,
5269 ):
5270 text += ", ".join(inner_columns)
5271
5272 if self.linting & COLLECT_CARTESIAN_PRODUCTS:
5273 from_linter = FromLinter({}, set())
5274 warn_linting = self.linting & WARN_LINTING
5275 if toplevel:
5276 self.from_linter = from_linter
5277 else:
5278 from_linter = None
5279 warn_linting = False
5280
5281 # adjust the whitespace for no inner columns, part of #9440,
5282 # so that a no-col SELECT comes out as "SELECT WHERE..." or
5283 # "SELECT FROM ...".
5284 # while it would be better to have built the SELECT starting string
5285 # without trailing whitespace first, then add whitespace only if inner
5286 # cols were present, this breaks compatibility with various custom
5287 # compilation schemes that are currently being tested.
5288 if not inner_columns:
5289 text = text.rstrip()
5290
5291 if froms:
5292 text += " \nFROM "
5293
5294 if select._hints:
5295 text += ", ".join(
5296 [
5297 f._compiler_dispatch(
5298 self,
5299 asfrom=True,
5300 fromhints=byfrom,
5301 from_linter=from_linter,
5302 **kwargs,
5303 )
5304 for f in froms
5305 ]
5306 )
5307 else:
5308 text += ", ".join(
5309 [
5310 f._compiler_dispatch(
5311 self,
5312 asfrom=True,
5313 from_linter=from_linter,
5314 **kwargs,
5315 )
5316 for f in froms
5317 ]
5318 )
5319 else:
5320 text += self.default_from()
5321
5322 if select._where_criteria:
5323 t = self._generate_delimited_and_list(
5324 select._where_criteria, from_linter=from_linter, **kwargs
5325 )
5326 if t:
5327 text += " \nWHERE " + t
5328
5329 if warn_linting:
5330 assert from_linter is not None
5331 from_linter.warn()
5332
5333 if select._group_by_clauses:
5334 text += self.group_by_clause(select, **kwargs)
5335
5336 if select._having_criteria:
5337 t = self._generate_delimited_and_list(
5338 select._having_criteria, **kwargs
5339 )
5340 if t:
5341 text += " \nHAVING " + t
5342
5343 if select._post_criteria_clause is not None:
5344 pcc = self.process(select._post_criteria_clause, **kwargs)
5345 if pcc is not None:
5346 text += " \n" + pcc
5347
5348 if select._order_by_clauses:
5349 text += self.order_by_clause(select, **kwargs)
5350
5351 if select._has_row_limiting_clause:
5352 text += self._row_limit_clause(select, **kwargs)
5353
5354 if select._for_update_arg is not None:
5355 text += self.for_update_clause(select, **kwargs)
5356
5357 return text
5358
5359 def _generate_prefixes(self, stmt, prefixes, **kw):
5360 clause = " ".join(
5361 prefix._compiler_dispatch(self, **kw)
5362 for prefix, dialect_name in prefixes
5363 if dialect_name in (None, "*") or dialect_name == self.dialect.name
5364 )
5365 if clause:
5366 clause += " "
5367 return clause
5368
5369 def _render_cte_clause(
5370 self,
5371 nesting_level=None,
5372 include_following_stack=False,
5373 ):
5374 """
5375 include_following_stack
5376 Also render the nesting CTEs on the next stack. Useful for
5377 SQL structures like UNION or INSERT that can wrap SELECT
5378 statements containing nesting CTEs.
5379 """
5380 if not self.ctes:
5381 return ""
5382
5383 ctes: MutableMapping[CTE, str]
5384
5385 if nesting_level and nesting_level > 1:
5386 ctes = util.OrderedDict()
5387 for cte in list(self.ctes.keys()):
5388 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5389 cte._get_reference_cte()
5390 ]
5391 nesting = cte.nesting or cte_opts.nesting
5392 is_rendered_level = cte_level == nesting_level or (
5393 include_following_stack and cte_level == nesting_level + 1
5394 )
5395 if not (nesting and is_rendered_level):
5396 continue
5397
5398 ctes[cte] = self.ctes[cte]
5399
5400 else:
5401 ctes = self.ctes
5402
5403 if not ctes:
5404 return ""
5405 ctes_recursive = any([cte.recursive for cte in ctes])
5406
5407 cte_text = self.get_cte_preamble(ctes_recursive) + " "
5408 cte_text += ", \n".join([txt for txt in ctes.values()])
5409 cte_text += "\n "
5410
5411 if nesting_level and nesting_level > 1:
5412 for cte in list(ctes.keys()):
5413 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5414 cte._get_reference_cte()
5415 ]
5416 del self.ctes[cte]
5417 del self.ctes_by_level_name[(cte_level, cte_name)]
5418 del self.level_name_by_cte[cte._get_reference_cte()]
5419
5420 return cte_text
5421
5422 def get_cte_preamble(self, recursive):
5423 if recursive:
5424 return "WITH RECURSIVE"
5425 else:
5426 return "WITH"
5427
5428 def get_select_precolumns(self, select: Select[Any], **kw: Any) -> str:
5429 """Called when building a ``SELECT`` statement, position is just
5430 before column list.
5431
5432 """
5433 if select._distinct_on:
5434 util.warn_deprecated(
5435 "DISTINCT ON is currently supported only by the PostgreSQL "
5436 "dialect. Use of DISTINCT ON for other backends is currently "
5437 "silently ignored, however this usage is deprecated, and will "
5438 "raise CompileError in a future release for all backends "
5439 "that do not support this syntax.",
5440 version="1.4",
5441 )
5442 return "DISTINCT " if select._distinct else ""
5443
5444 def group_by_clause(self, select, **kw):
5445 """allow dialects to customize how GROUP BY is rendered."""
5446
5447 group_by = self._generate_delimited_list(
5448 select._group_by_clauses, OPERATORS[operators.comma_op], **kw
5449 )
5450 if group_by:
5451 return " GROUP BY " + group_by
5452 else:
5453 return ""
5454
5455 def order_by_clause(self, select, **kw):
5456 """allow dialects to customize how ORDER BY is rendered."""
5457
5458 order_by = self._generate_delimited_list(
5459 select._order_by_clauses, OPERATORS[operators.comma_op], **kw
5460 )
5461
5462 if order_by:
5463 return " ORDER BY " + order_by
5464 else:
5465 return ""
5466
5467 def for_update_clause(self, select, **kw):
5468 return " FOR UPDATE"
5469
5470 def returning_clause(
5471 self,
5472 stmt: UpdateBase,
5473 returning_cols: Sequence[_ColumnsClauseElement],
5474 *,
5475 populate_result_map: bool,
5476 **kw: Any,
5477 ) -> str:
5478 columns = [
5479 self._label_returning_column(
5480 stmt,
5481 column,
5482 populate_result_map,
5483 fallback_label_name=fallback_label_name,
5484 column_is_repeated=repeated,
5485 name=name,
5486 proxy_name=proxy_name,
5487 **kw,
5488 )
5489 for (
5490 name,
5491 proxy_name,
5492 fallback_label_name,
5493 column,
5494 repeated,
5495 ) in stmt._generate_columns_plus_names(
5496 True, cols=base._select_iterables(returning_cols)
5497 )
5498 ]
5499
5500 return "RETURNING " + ", ".join(columns)
5501
5502 def limit_clause(self, select, **kw):
5503 text = ""
5504 if select._limit_clause is not None:
5505 text += "\n LIMIT " + self.process(select._limit_clause, **kw)
5506 if select._offset_clause is not None:
5507 if select._limit_clause is None:
5508 text += "\n LIMIT -1"
5509 text += " OFFSET " + self.process(select._offset_clause, **kw)
5510 return text
5511
5512 def fetch_clause(
5513 self,
5514 select,
5515 fetch_clause=None,
5516 require_offset=False,
5517 use_literal_execute_for_simple_int=False,
5518 **kw,
5519 ):
5520 if fetch_clause is None:
5521 fetch_clause = select._fetch_clause
5522 fetch_clause_options = select._fetch_clause_options
5523 else:
5524 fetch_clause_options = {"percent": False, "with_ties": False}
5525
5526 text = ""
5527
5528 if select._offset_clause is not None:
5529 offset_clause = select._offset_clause
5530 if (
5531 use_literal_execute_for_simple_int
5532 and select._simple_int_clause(offset_clause)
5533 ):
5534 offset_clause = offset_clause.render_literal_execute()
5535 offset_str = self.process(offset_clause, **kw)
5536 text += "\n OFFSET %s ROWS" % offset_str
5537 elif require_offset:
5538 text += "\n OFFSET 0 ROWS"
5539
5540 if fetch_clause is not None:
5541 if (
5542 use_literal_execute_for_simple_int
5543 and select._simple_int_clause(fetch_clause)
5544 ):
5545 fetch_clause = fetch_clause.render_literal_execute()
5546 text += "\n FETCH FIRST %s%s ROWS %s" % (
5547 self.process(fetch_clause, **kw),
5548 " PERCENT" if fetch_clause_options["percent"] else "",
5549 "WITH TIES" if fetch_clause_options["with_ties"] else "ONLY",
5550 )
5551 return text
5552
5553 def visit_table(
5554 self,
5555 table,
5556 asfrom=False,
5557 iscrud=False,
5558 ashint=False,
5559 fromhints=None,
5560 use_schema=True,
5561 from_linter=None,
5562 ambiguous_table_name_map=None,
5563 enclosing_alias=None,
5564 within_tstring=False,
5565 **kwargs,
5566 ):
5567 if from_linter:
5568 from_linter.froms[table] = table.fullname
5569
5570 if asfrom or ashint or within_tstring:
5571 effective_schema = self.preparer.schema_for_object(table)
5572
5573 if use_schema and effective_schema:
5574 ret = (
5575 self.preparer.quote_schema(effective_schema)
5576 + "."
5577 + self.preparer.quote(table.name)
5578 )
5579 else:
5580 ret = self.preparer.quote(table.name)
5581
5582 if (
5583 (
5584 enclosing_alias is None
5585 or enclosing_alias.element is not table
5586 )
5587 and not effective_schema
5588 and ambiguous_table_name_map
5589 and table.name in ambiguous_table_name_map
5590 ):
5591 anon_name = self._truncated_identifier(
5592 "alias", ambiguous_table_name_map[table.name]
5593 )
5594
5595 ret = ret + self.get_render_as_alias_suffix(
5596 self.preparer.format_alias(None, anon_name)
5597 )
5598
5599 if fromhints and table in fromhints:
5600 ret = self.format_from_hint_text(
5601 ret, table, fromhints[table], iscrud
5602 )
5603 return ret
5604 else:
5605 return ""
5606
5607 def visit_join(self, join, asfrom=False, from_linter=None, **kwargs):
5608 if from_linter:
5609 from_linter.edges.update(
5610 itertools.product(
5611 _de_clone(join.left._from_objects),
5612 _de_clone(join.right._from_objects),
5613 )
5614 )
5615
5616 if join.full:
5617 join_type = " FULL OUTER JOIN "
5618 elif join.isouter:
5619 join_type = " LEFT OUTER JOIN "
5620 else:
5621 join_type = " JOIN "
5622 return (
5623 join.left._compiler_dispatch(
5624 self, asfrom=True, from_linter=from_linter, **kwargs
5625 )
5626 + join_type
5627 + join.right._compiler_dispatch(
5628 self, asfrom=True, from_linter=from_linter, **kwargs
5629 )
5630 + " ON "
5631 # TODO: likely need asfrom=True here?
5632 + join.onclause._compiler_dispatch(
5633 self, from_linter=from_linter, **kwargs
5634 )
5635 )
5636
5637 def _setup_crud_hints(self, stmt, table_text):
5638 dialect_hints = {
5639 table: hint_text
5640 for (table, dialect), hint_text in stmt._hints.items()
5641 if dialect in ("*", self.dialect.name)
5642 }
5643 if stmt.table in dialect_hints:
5644 table_text = self.format_from_hint_text(
5645 table_text, stmt.table, dialect_hints[stmt.table], True
5646 )
5647 return dialect_hints, table_text
5648
5649 # within the realm of "insertmanyvalues sentinel columns",
5650 # these lookups match different kinds of Column() configurations
5651 # to specific backend capabilities. they are broken into two
5652 # lookups, one for autoincrement columns and the other for non
5653 # autoincrement columns
5654 _sentinel_col_non_autoinc_lookup = util.immutabledict(
5655 {
5656 _SentinelDefaultCharacterization.CLIENTSIDE: (
5657 InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
5658 ),
5659 _SentinelDefaultCharacterization.SENTINEL_DEFAULT: (
5660 InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
5661 ),
5662 _SentinelDefaultCharacterization.NONE: (
5663 InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
5664 ),
5665 _SentinelDefaultCharacterization.IDENTITY: (
5666 InsertmanyvaluesSentinelOpts.IDENTITY
5667 ),
5668 _SentinelDefaultCharacterization.SEQUENCE: (
5669 InsertmanyvaluesSentinelOpts.SEQUENCE
5670 ),
5671 _SentinelDefaultCharacterization.MONOTONIC_FUNCTION: (
5672 InsertmanyvaluesSentinelOpts.MONOTONIC_FUNCTION
5673 ),
5674 }
5675 )
5676 _sentinel_col_autoinc_lookup = _sentinel_col_non_autoinc_lookup.union(
5677 {
5678 _SentinelDefaultCharacterization.NONE: (
5679 InsertmanyvaluesSentinelOpts.AUTOINCREMENT
5680 ),
5681 }
5682 )
5683
5684 def _get_sentinel_column_for_table(
5685 self, table: Table
5686 ) -> Optional[Sequence[Column[Any]]]:
5687 """given a :class:`.Table`, return a usable sentinel column or
5688 columns for this dialect if any.
5689
5690 Return None if no sentinel columns could be identified, or raise an
5691 error if a column was marked as a sentinel explicitly but isn't
5692 compatible with this dialect.
5693
5694 """
5695
5696 sentinel_opts = self.dialect.insertmanyvalues_implicit_sentinel
5697 sentinel_characteristics = table._sentinel_column_characteristics
5698
5699 sent_cols = sentinel_characteristics.columns
5700
5701 if sent_cols is None:
5702 return None
5703
5704 if sentinel_characteristics.is_autoinc:
5705 bitmask = self._sentinel_col_autoinc_lookup.get(
5706 sentinel_characteristics.default_characterization, 0
5707 )
5708 else:
5709 bitmask = self._sentinel_col_non_autoinc_lookup.get(
5710 sentinel_characteristics.default_characterization, 0
5711 )
5712
5713 if sentinel_opts & bitmask:
5714 return sent_cols
5715
5716 if sentinel_characteristics.is_explicit:
5717 # a column was explicitly marked as insert_sentinel=True,
5718 # however it is not compatible with this dialect. they should
5719 # not indicate this column as a sentinel if they need to include
5720 # this dialect.
5721
5722 # TODO: do we want non-primary key explicit sentinel cols
5723 # that can gracefully degrade for some backends?
5724 # insert_sentinel="degrade" perhaps. not for the initial release.
5725 # I am hoping people are generally not dealing with this sentinel
5726 # business at all.
5727
5728 # if is_explicit is True, there will be only one sentinel column.
5729
5730 raise exc.InvalidRequestError(
5731 f"Column {sent_cols[0]} can't be explicitly "
5732 "marked as a sentinel column when using the "
5733 f"{self.dialect.name} dialect, as the "
5734 "particular type of default generation on this column is "
5735 "not currently compatible with this dialect's specific "
5736 f"INSERT..RETURNING syntax which can receive the "
5737 "server-generated value in "
5738 "a deterministic way. To remove this error, remove "
5739 "insert_sentinel=True from primary key autoincrement "
5740 "columns; these columns are automatically used as "
5741 "sentinels for supported dialects in any case."
5742 )
5743
5744 return None
5745
5746 def _deliver_insertmanyvalues_batches(
5747 self,
5748 statement: str,
5749 parameters: _DBAPIMultiExecuteParams,
5750 compiled_parameters: List[_MutableCoreSingleExecuteParams],
5751 generic_setinputsizes: Optional[_GenericSetInputSizesType],
5752 batch_size: int,
5753 sort_by_parameter_order: bool,
5754 schema_translate_map: Optional[SchemaTranslateMapType],
5755 ) -> Iterator[_InsertManyValuesBatch]:
5756 imv = self._insertmanyvalues
5757 assert imv is not None
5758
5759 if not imv.sentinel_param_keys:
5760 _sentinel_from_params = None
5761 else:
5762 _sentinel_from_params = operator.itemgetter(
5763 *imv.sentinel_param_keys
5764 )
5765
5766 lenparams = len(parameters)
5767 if imv.is_default_expr and not self.dialect.supports_default_metavalue:
5768 # backend doesn't support
5769 # INSERT INTO table (pk_col) VALUES (DEFAULT), (DEFAULT), ...
5770 # at the moment this is basically SQL Server due to
5771 # not being able to use DEFAULT for identity column
5772 # just yield out that many single statements! still
5773 # faster than a whole connection.execute() call ;)
5774 #
5775 # note we still are taking advantage of the fact that we know
5776 # we are using RETURNING. The generalized approach of fetching
5777 # cursor.lastrowid etc. still goes through the more heavyweight
5778 # "ExecutionContext per statement" system as it isn't usable
5779 # as a generic "RETURNING" approach
5780 use_row_at_a_time = True
5781 downgraded = False
5782 elif not self.dialect.supports_multivalues_insert or (
5783 sort_by_parameter_order
5784 and self._result_columns
5785 and (
5786 imv.sentinel_columns is None
5787 or (
5788 imv.includes_upsert_behaviors
5789 and not imv.embed_values_counter
5790 )
5791 )
5792 ):
5793 # deterministic order was requested and the compiler could
5794 # not organize sentinel columns for this dialect/statement.
5795 # use row at a time. Note: if embed_values_counter is True,
5796 # the counter itself provides the ordering capability we need,
5797 # so we can use batch mode even with upsert behaviors.
5798 use_row_at_a_time = True
5799 downgraded = True
5800 elif (
5801 imv.has_upsert_bound_parameters
5802 and not imv.embed_values_counter
5803 and self._result_columns
5804 ):
5805 # For upsert behaviors (ON CONFLICT DO UPDATE, etc.) with RETURNING
5806 # and parametrized bindparams in the SET clause, we must use
5807 # row-at-a-time. Batching multiple rows in a single statement
5808 # doesn't work when the SET clause contains bound parameters that
5809 # will receive different values per row, as there's only one SET
5810 # clause per statement. See issue #13130.
5811 use_row_at_a_time = True
5812 downgraded = True
5813 else:
5814 use_row_at_a_time = False
5815 downgraded = False
5816
5817 if use_row_at_a_time:
5818 for batchnum, (param, compiled_param) in enumerate(
5819 cast(
5820 "Sequence[Tuple[_DBAPISingleExecuteParams, _MutableCoreSingleExecuteParams]]", # noqa: E501
5821 zip(parameters, compiled_parameters),
5822 ),
5823 1,
5824 ):
5825 yield _InsertManyValuesBatch(
5826 statement,
5827 param,
5828 generic_setinputsizes,
5829 [param],
5830 (
5831 [_sentinel_from_params(compiled_param)]
5832 if _sentinel_from_params
5833 else []
5834 ),
5835 1,
5836 batchnum,
5837 lenparams,
5838 sort_by_parameter_order,
5839 downgraded,
5840 )
5841 return
5842
5843 if schema_translate_map:
5844 rst = functools.partial(
5845 self.preparer._render_schema_translates,
5846 schema_translate_map=schema_translate_map,
5847 )
5848 else:
5849 rst = None
5850
5851 imv_single_values_expr = imv.single_values_expr
5852 if rst:
5853 imv_single_values_expr = rst(imv_single_values_expr)
5854
5855 executemany_values = f"({imv_single_values_expr})"
5856 statement = statement.replace(executemany_values, "__EXECMANY_TOKEN__")
5857
5858 # Use optional insertmanyvalues_max_parameters
5859 # to further shrink the batch size so that there are no more than
5860 # insertmanyvalues_max_parameters params.
5861 # Currently used by SQL Server, which limits statements to 2100 bound
5862 # parameters (actually 2099).
5863 max_params = self.dialect.insertmanyvalues_max_parameters
5864 if max_params:
5865 total_num_of_params = len(self.bind_names)
5866 num_params_per_batch = len(imv.insert_crud_params)
5867 num_params_outside_of_batch = (
5868 total_num_of_params - num_params_per_batch
5869 )
5870 batch_size = min(
5871 batch_size,
5872 (
5873 (max_params - num_params_outside_of_batch)
5874 // num_params_per_batch
5875 ),
5876 )
5877
5878 batches = cast("List[Sequence[Any]]", list(parameters))
5879 compiled_batches = cast(
5880 "List[Sequence[Any]]", list(compiled_parameters)
5881 )
5882
5883 processed_setinputsizes: Optional[_GenericSetInputSizesType] = None
5884 batchnum = 1
5885 total_batches = lenparams // batch_size + (
5886 1 if lenparams % batch_size else 0
5887 )
5888
5889 insert_crud_params = imv.insert_crud_params
5890 assert insert_crud_params is not None
5891
5892 if rst:
5893 insert_crud_params = [
5894 (col, key, rst(expr), st)
5895 for col, key, expr, st in insert_crud_params
5896 ]
5897
5898 escaped_bind_names: Mapping[str, str]
5899 expand_pos_lower_index = expand_pos_upper_index = 0
5900
5901 if not self.positional:
5902 if self.escaped_bind_names:
5903 escaped_bind_names = self.escaped_bind_names
5904 else:
5905 escaped_bind_names = {}
5906
5907 all_keys = set(parameters[0])
5908
5909 def apply_placeholders(keys, formatted):
5910 for key in keys:
5911 key = escaped_bind_names.get(key, key)
5912 formatted = formatted.replace(
5913 self.bindtemplate % {"name": key},
5914 self.bindtemplate
5915 % {"name": f"{key}__EXECMANY_INDEX__"},
5916 )
5917 return formatted
5918
5919 if imv.embed_values_counter:
5920 imv_values_counter = ", _IMV_VALUES_COUNTER"
5921 else:
5922 imv_values_counter = ""
5923 formatted_values_clause = f"""({', '.join(
5924 apply_placeholders(bind_keys, formatted)
5925 for _, _, formatted, bind_keys in insert_crud_params
5926 )}{imv_values_counter})"""
5927
5928 keys_to_replace = all_keys.intersection(
5929 escaped_bind_names.get(key, key)
5930 for _, _, _, bind_keys in insert_crud_params
5931 for key in bind_keys
5932 )
5933 base_parameters = {
5934 key: parameters[0][key]
5935 for key in all_keys.difference(keys_to_replace)
5936 }
5937
5938 executemany_values_w_comma = ""
5939 else:
5940 formatted_values_clause = ""
5941 keys_to_replace = set()
5942 base_parameters = {}
5943
5944 if imv.embed_values_counter:
5945 executemany_values_w_comma = (
5946 f"({imv_single_values_expr}, _IMV_VALUES_COUNTER), "
5947 )
5948 else:
5949 executemany_values_w_comma = f"({imv_single_values_expr}), "
5950
5951 all_names_we_will_expand: Set[str] = set()
5952 for elem in imv.insert_crud_params:
5953 all_names_we_will_expand.update(elem[3])
5954
5955 # get the start and end position in a particular list
5956 # of parameters where we will be doing the "expanding".
5957 # statements can have params on either side or both sides,
5958 # given RETURNING and CTEs
5959 if all_names_we_will_expand:
5960 positiontup = self.positiontup
5961 assert positiontup is not None
5962
5963 all_expand_positions = {
5964 idx
5965 for idx, name in enumerate(positiontup)
5966 if name in all_names_we_will_expand
5967 }
5968 expand_pos_lower_index = min(all_expand_positions)
5969 expand_pos_upper_index = max(all_expand_positions) + 1
5970 assert (
5971 len(all_expand_positions)
5972 == expand_pos_upper_index - expand_pos_lower_index
5973 )
5974
5975 if self._numeric_binds:
5976 escaped = re.escape(self._numeric_binds_identifier_char)
5977 executemany_values_w_comma = re.sub(
5978 rf"{escaped}\d+", "%s", executemany_values_w_comma
5979 )
5980
5981 while batches:
5982 batch = batches[0:batch_size]
5983 compiled_batch = compiled_batches[0:batch_size]
5984
5985 batches[0:batch_size] = []
5986 compiled_batches[0:batch_size] = []
5987
5988 if batches:
5989 current_batch_size = batch_size
5990 else:
5991 current_batch_size = len(batch)
5992
5993 if generic_setinputsizes:
5994 # if setinputsizes is present, expand this collection to
5995 # suit the batch length as well
5996 # currently this will be mssql+pyodbc for internal dialects
5997 processed_setinputsizes = [
5998 (new_key, len_, typ)
5999 for new_key, len_, typ in (
6000 (f"{key}_{index}", len_, typ)
6001 for index in range(current_batch_size)
6002 for key, len_, typ in generic_setinputsizes
6003 )
6004 ]
6005
6006 replaced_parameters: Any
6007 if self.positional:
6008 num_ins_params = imv.num_positional_params_counted
6009
6010 batch_iterator: Iterable[Sequence[Any]]
6011 extra_params_left: Sequence[Any]
6012 extra_params_right: Sequence[Any]
6013
6014 if num_ins_params == len(batch[0]):
6015 extra_params_left = extra_params_right = ()
6016 batch_iterator = batch
6017 else:
6018 extra_params_left = batch[0][:expand_pos_lower_index]
6019 extra_params_right = batch[0][expand_pos_upper_index:]
6020 batch_iterator = (
6021 b[expand_pos_lower_index:expand_pos_upper_index]
6022 for b in batch
6023 )
6024
6025 if imv.embed_values_counter:
6026 expanded_values_string = (
6027 "".join(
6028 executemany_values_w_comma.replace(
6029 "_IMV_VALUES_COUNTER", str(i)
6030 )
6031 for i, _ in enumerate(batch)
6032 )
6033 )[:-2]
6034 else:
6035 expanded_values_string = (
6036 (executemany_values_w_comma * current_batch_size)
6037 )[:-2]
6038
6039 if self._numeric_binds and num_ins_params > 0:
6040 # numeric will always number the parameters inside of
6041 # VALUES (and thus order self.positiontup) to be higher
6042 # than non-VALUES parameters, no matter where in the
6043 # statement those non-VALUES parameters appear (this is
6044 # ensured in _process_numeric by numbering first all
6045 # params that are not in _values_bindparam)
6046 # therefore all extra params are always
6047 # on the left side and numbered lower than the VALUES
6048 # parameters
6049 assert not extra_params_right
6050
6051 start = expand_pos_lower_index + 1
6052 end = num_ins_params * (current_batch_size) + start
6053
6054 # need to format here, since statement may contain
6055 # unescaped %, while values_string contains just (%s, %s)
6056 positions = tuple(
6057 f"{self._numeric_binds_identifier_char}{i}"
6058 for i in range(start, end)
6059 )
6060 expanded_values_string = expanded_values_string % positions
6061
6062 replaced_statement = statement.replace(
6063 "__EXECMANY_TOKEN__", expanded_values_string
6064 )
6065
6066 replaced_parameters = tuple(
6067 itertools.chain.from_iterable(batch_iterator)
6068 )
6069
6070 replaced_parameters = (
6071 extra_params_left
6072 + replaced_parameters
6073 + extra_params_right
6074 )
6075
6076 else:
6077 replaced_values_clauses = []
6078 replaced_parameters = base_parameters.copy()
6079
6080 for i, param in enumerate(batch):
6081 fmv = formatted_values_clause.replace(
6082 "EXECMANY_INDEX__", str(i)
6083 )
6084 if imv.embed_values_counter:
6085 fmv = fmv.replace("_IMV_VALUES_COUNTER", str(i))
6086
6087 replaced_values_clauses.append(fmv)
6088 replaced_parameters.update(
6089 {f"{key}__{i}": param[key] for key in keys_to_replace}
6090 )
6091
6092 replaced_statement = statement.replace(
6093 "__EXECMANY_TOKEN__",
6094 ", ".join(replaced_values_clauses),
6095 )
6096
6097 yield _InsertManyValuesBatch(
6098 replaced_statement,
6099 replaced_parameters,
6100 processed_setinputsizes,
6101 batch,
6102 (
6103 [_sentinel_from_params(cb) for cb in compiled_batch]
6104 if _sentinel_from_params
6105 else []
6106 ),
6107 current_batch_size,
6108 batchnum,
6109 total_batches,
6110 sort_by_parameter_order,
6111 False,
6112 )
6113 batchnum += 1
6114
6115 def visit_insert(
6116 self, insert_stmt, visited_bindparam=None, visiting_cte=None, **kw
6117 ):
6118 compile_state = insert_stmt._compile_state_factory(
6119 insert_stmt, self, **kw
6120 )
6121 insert_stmt = compile_state.statement
6122
6123 if visiting_cte is not None:
6124 kw["visiting_cte"] = visiting_cte
6125 toplevel = False
6126 else:
6127 toplevel = not self.stack
6128
6129 if toplevel:
6130 self.isinsert = True
6131 if not self.dml_compile_state:
6132 self.dml_compile_state = compile_state
6133 if not self.compile_state:
6134 self.compile_state = compile_state
6135
6136 self.stack.append(
6137 {
6138 "correlate_froms": set(),
6139 "asfrom_froms": set(),
6140 "selectable": insert_stmt,
6141 }
6142 )
6143
6144 counted_bindparam = 0
6145
6146 # reset any incoming "visited_bindparam" collection
6147 visited_bindparam = None
6148
6149 # for positional, insertmanyvalues needs to know how many
6150 # bound parameters are in the VALUES sequence; there's no simple
6151 # rule because default expressions etc. can have zero or more
6152 # params inside them. After multiple attempts to figure this out,
6153 # this very simplistic "count after" works and is
6154 # likely the least amount of callcounts, though looks clumsy
6155 if self.positional and visiting_cte is None:
6156 # if we are inside a CTE, don't count parameters
6157 # here since they won't be for insertmanyvalues. keep
6158 # visited_bindparam at None so no counting happens.
6159 # see #9173
6160 visited_bindparam = []
6161
6162 crud_params_struct = crud._get_crud_params(
6163 self,
6164 insert_stmt,
6165 compile_state,
6166 toplevel,
6167 visited_bindparam=visited_bindparam,
6168 **kw,
6169 )
6170
6171 if self.positional and visited_bindparam is not None:
6172 counted_bindparam = len(visited_bindparam)
6173 if self._numeric_binds:
6174 if self._values_bindparam is not None:
6175 self._values_bindparam += visited_bindparam
6176 else:
6177 self._values_bindparam = visited_bindparam
6178
6179 crud_params_single = crud_params_struct.single_params
6180
6181 if (
6182 not crud_params_single
6183 and not self.dialect.supports_default_values
6184 and not self.dialect.supports_default_metavalue
6185 and not self.dialect.supports_empty_insert
6186 ):
6187 raise exc.CompileError(
6188 "The '%s' dialect with current database "
6189 "version settings does not support empty "
6190 "inserts." % self.dialect.name
6191 )
6192
6193 if compile_state._has_multi_parameters:
6194 if not self.dialect.supports_multivalues_insert:
6195 raise exc.CompileError(
6196 "The '%s' dialect with current database "
6197 "version settings does not support "
6198 "in-place multirow inserts." % self.dialect.name
6199 )
6200 elif (
6201 self.implicit_returning or insert_stmt._returning
6202 ) and insert_stmt._sort_by_parameter_order:
6203 raise exc.CompileError(
6204 "RETURNING cannot be deterministically sorted when "
6205 "using an INSERT which includes multi-row values()."
6206 )
6207 crud_params_single = crud_params_struct.single_params
6208 else:
6209 crud_params_single = crud_params_struct.single_params
6210
6211 preparer = self.preparer
6212 supports_default_values = self.dialect.supports_default_values
6213
6214 text = "INSERT "
6215
6216 if insert_stmt._prefixes:
6217 text += self._generate_prefixes(
6218 insert_stmt, insert_stmt._prefixes, **kw
6219 )
6220
6221 text += "INTO "
6222 table_text = preparer.format_table(insert_stmt.table)
6223
6224 if insert_stmt._hints:
6225 _, table_text = self._setup_crud_hints(insert_stmt, table_text)
6226
6227 if insert_stmt._independent_ctes:
6228 self._dispatch_independent_ctes(insert_stmt, kw)
6229
6230 text += table_text
6231
6232 if crud_params_single or not supports_default_values:
6233 text += " (%s)" % ", ".join(
6234 [expr for _, expr, _, _ in crud_params_single]
6235 )
6236
6237 # look for insertmanyvalues attributes that would have been configured
6238 # by crud.py as it scanned through the columns to be part of the
6239 # INSERT
6240 use_insertmanyvalues = crud_params_struct.use_insertmanyvalues
6241 named_sentinel_params: Optional[Sequence[str]] = None
6242 add_sentinel_cols = None
6243 implicit_sentinel = False
6244
6245 returning_cols = self.implicit_returning or insert_stmt._returning
6246 if returning_cols:
6247 add_sentinel_cols = crud_params_struct.use_sentinel_columns
6248 if add_sentinel_cols is not None:
6249 assert use_insertmanyvalues
6250
6251 # search for the sentinel column explicitly present
6252 # in the INSERT columns list, and additionally check that
6253 # this column has a bound parameter name set up that's in the
6254 # parameter list. If both of these cases are present, it means
6255 # we will have a client side value for the sentinel in each
6256 # parameter set.
6257
6258 _params_by_col = {
6259 col: param_names
6260 for col, _, _, param_names in crud_params_single
6261 }
6262 named_sentinel_params = []
6263 for _add_sentinel_col in add_sentinel_cols:
6264 if _add_sentinel_col not in _params_by_col:
6265 named_sentinel_params = None
6266 break
6267 param_name = self._within_exec_param_key_getter(
6268 _add_sentinel_col
6269 )
6270 if param_name not in _params_by_col[_add_sentinel_col]:
6271 named_sentinel_params = None
6272 break
6273 named_sentinel_params.append(param_name)
6274
6275 if named_sentinel_params is None:
6276 # if we are not going to have a client side value for
6277 # the sentinel in the parameter set, that means it's
6278 # an autoincrement, an IDENTITY, or a server-side SQL
6279 # expression like nextval('seqname'). So this is
6280 # an "implicit" sentinel; we will look for it in
6281 # RETURNING
6282 # only, and then sort on it. For this case on PG,
6283 # SQL Server we have to use a special INSERT form
6284 # that guarantees the server side function lines up with
6285 # the entries in the VALUES.
6286 if (
6287 self.dialect.insertmanyvalues_implicit_sentinel
6288 & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
6289 ):
6290 implicit_sentinel = True
6291 else:
6292 # here, we are not using a sentinel at all
6293 # and we are likely the SQLite dialect.
6294 # The first add_sentinel_col that we have should not
6295 # be marked as "insert_sentinel=True". if it was,
6296 # an error should have been raised in
6297 # _get_sentinel_column_for_table.
6298 assert not add_sentinel_cols[0]._insert_sentinel, (
6299 "sentinel selection rules should have prevented "
6300 "us from getting here for this dialect"
6301 )
6302
6303 # always put the sentinel columns last. even if they are
6304 # in the returning list already, they will be there twice
6305 # then.
6306 returning_cols = list(returning_cols) + list(add_sentinel_cols)
6307
6308 returning_clause = self.returning_clause(
6309 insert_stmt,
6310 returning_cols,
6311 populate_result_map=toplevel,
6312 )
6313
6314 if self.returning_precedes_values:
6315 text += " " + returning_clause
6316
6317 else:
6318 returning_clause = None
6319
6320 if insert_stmt.select is not None:
6321 # placed here by crud.py
6322 select_text = self.process(
6323 self.stack[-1]["insert_from_select"], insert_into=True, **kw
6324 )
6325
6326 if self.ctes and self.dialect.cte_follows_insert:
6327 nesting_level = len(self.stack) if not toplevel else None
6328 text += " %s%s" % (
6329 self._render_cte_clause(
6330 nesting_level=nesting_level,
6331 include_following_stack=True,
6332 ),
6333 select_text,
6334 )
6335 else:
6336 text += " %s" % select_text
6337 elif not crud_params_single and supports_default_values:
6338 text += " DEFAULT VALUES"
6339 if use_insertmanyvalues:
6340 self._insertmanyvalues = _InsertManyValues(
6341 True,
6342 self.dialect.default_metavalue_token,
6343 crud_params_single,
6344 counted_bindparam,
6345 sort_by_parameter_order=(
6346 insert_stmt._sort_by_parameter_order
6347 ),
6348 includes_upsert_behaviors=(
6349 insert_stmt._post_values_clause is not None
6350 ),
6351 sentinel_columns=add_sentinel_cols,
6352 num_sentinel_columns=(
6353 len(add_sentinel_cols) if add_sentinel_cols else 0
6354 ),
6355 implicit_sentinel=implicit_sentinel,
6356 )
6357 elif compile_state._has_multi_parameters:
6358 text += " VALUES %s" % (
6359 ", ".join(
6360 "(%s)"
6361 % (", ".join(value for _, _, value, _ in crud_param_set))
6362 for crud_param_set in crud_params_struct.all_multi_params
6363 ),
6364 )
6365 elif use_insertmanyvalues:
6366 if (
6367 implicit_sentinel
6368 and (
6369 self.dialect.insertmanyvalues_implicit_sentinel
6370 & InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
6371 )
6372 # this is checking if we have
6373 # INSERT INTO table (id) VALUES (DEFAULT).
6374 and not (crud_params_struct.is_default_metavalue_only)
6375 ):
6376 # if we have a sentinel column that is server generated,
6377 # then for selected backends render the VALUES list as a
6378 # subquery. This is the orderable form supported by
6379 # PostgreSQL and in fewer cases SQL Server
6380 embed_sentinel_value = True
6381
6382 render_bind_casts = (
6383 self.dialect.insertmanyvalues_implicit_sentinel
6384 & InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
6385 )
6386
6387 add_sentinel_set = add_sentinel_cols or ()
6388
6389 insert_single_values_expr = ", ".join(
6390 [
6391 value
6392 for col, _, value, _ in crud_params_single
6393 if col not in add_sentinel_set
6394 ]
6395 )
6396
6397 colnames = ", ".join(
6398 f"p{i}"
6399 for i, cp in enumerate(crud_params_single)
6400 if cp[0] not in add_sentinel_set
6401 )
6402
6403 if render_bind_casts:
6404 # render casts for the SELECT list. For PG, we are
6405 # already rendering bind casts in the parameter list,
6406 # selectively for the more "tricky" types like ARRAY.
6407 # however, even for the "easy" types, if the parameter
6408 # is NULL for every entry, PG gives up and says
6409 # "it must be TEXT", which fails for other easy types
6410 # like ints. So we cast on this side too.
6411 colnames_w_cast = ", ".join(
6412 (
6413 self.render_bind_cast(
6414 col.type,
6415 col.type._unwrapped_dialect_impl(self.dialect),
6416 f"p{i}",
6417 )
6418 if col not in add_sentinel_set
6419 else expr
6420 )
6421 for i, (col, _, expr, _) in enumerate(
6422 crud_params_single
6423 )
6424 )
6425 else:
6426 colnames_w_cast = ", ".join(
6427 (f"p{i}" if col not in add_sentinel_set else expr)
6428 for i, (col, _, expr, _) in enumerate(
6429 crud_params_single
6430 )
6431 )
6432
6433 insert_crud_params = [
6434 elem
6435 for elem in crud_params_single
6436 if elem[0] not in add_sentinel_set
6437 ]
6438
6439 text += (
6440 f" SELECT {colnames_w_cast} FROM "
6441 f"(VALUES ({insert_single_values_expr})) "
6442 f"AS imp_sen({colnames}, sen_counter) "
6443 "ORDER BY sen_counter"
6444 )
6445
6446 else:
6447 # otherwise, if no sentinel or backend doesn't support
6448 # orderable subquery form, use a plain VALUES list
6449 embed_sentinel_value = False
6450 insert_crud_params = crud_params_single
6451 insert_single_values_expr = ", ".join(
6452 [value for _, _, value, _ in crud_params_single]
6453 )
6454
6455 text += f" VALUES ({insert_single_values_expr})"
6456
6457 self._insertmanyvalues = _InsertManyValues(
6458 is_default_expr=False,
6459 single_values_expr=insert_single_values_expr,
6460 insert_crud_params=insert_crud_params,
6461 num_positional_params_counted=counted_bindparam,
6462 sort_by_parameter_order=(insert_stmt._sort_by_parameter_order),
6463 includes_upsert_behaviors=(
6464 insert_stmt._post_values_clause is not None
6465 ),
6466 sentinel_columns=add_sentinel_cols,
6467 num_sentinel_columns=(
6468 len(add_sentinel_cols) if add_sentinel_cols else 0
6469 ),
6470 sentinel_param_keys=named_sentinel_params,
6471 implicit_sentinel=implicit_sentinel,
6472 embed_values_counter=embed_sentinel_value,
6473 )
6474
6475 else:
6476 insert_single_values_expr = ", ".join(
6477 [value for _, _, value, _ in crud_params_single]
6478 )
6479
6480 text += f" VALUES ({insert_single_values_expr})"
6481
6482 if insert_stmt._post_values_clause is not None:
6483 post_values_clause = self.process(
6484 insert_stmt._post_values_clause, **kw
6485 )
6486 if post_values_clause:
6487 text += " " + post_values_clause
6488
6489 if returning_clause and not self.returning_precedes_values:
6490 text += " " + returning_clause
6491
6492 if self.ctes and not self.dialect.cte_follows_insert:
6493 nesting_level = len(self.stack) if not toplevel else None
6494 text = (
6495 self._render_cte_clause(
6496 nesting_level=nesting_level,
6497 include_following_stack=True,
6498 )
6499 + text
6500 )
6501
6502 self.stack.pop(-1)
6503
6504 return text
6505
6506 def update_tables_clause(self, update_stmt, from_table, extra_froms, **kw):
6507 """Provide a hook to override the initial table clause
6508 in an UPDATE statement.
6509
6510 MySQL overrides this.
6511
6512 """
6513 kw["asfrom"] = True
6514 return from_table._compiler_dispatch(self, iscrud=True, **kw)
6515
6516 def update_from_clause(
6517 self, update_stmt, from_table, extra_froms, from_hints, **kw
6518 ):
6519 """Provide a hook to override the generation of an
6520 UPDATE..FROM clause.
6521 MySQL and MSSQL override this.
6522 """
6523 raise NotImplementedError(
6524 "This backend does not support multiple-table "
6525 "criteria within UPDATE"
6526 )
6527
6528 def update_post_criteria_clause(
6529 self, update_stmt: Update, **kw: Any
6530 ) -> Optional[str]:
6531 """provide a hook to override generation after the WHERE criteria
6532 in an UPDATE statement
6533
6534 .. versionadded:: 2.1
6535
6536 """
6537 if update_stmt._post_criteria_clause is not None:
6538 return self.process(
6539 update_stmt._post_criteria_clause,
6540 **kw,
6541 )
6542 else:
6543 return None
6544
6545 def delete_post_criteria_clause(
6546 self, delete_stmt: Delete, **kw: Any
6547 ) -> Optional[str]:
6548 """provide a hook to override generation after the WHERE criteria
6549 in a DELETE statement
6550
6551 .. versionadded:: 2.1
6552
6553 """
6554 if delete_stmt._post_criteria_clause is not None:
6555 return self.process(
6556 delete_stmt._post_criteria_clause,
6557 **kw,
6558 )
6559 else:
6560 return None
6561
6562 def visit_update(
6563 self,
6564 update_stmt: Update,
6565 visiting_cte: Optional[CTE] = None,
6566 **kw: Any,
6567 ) -> str:
6568 compile_state = update_stmt._compile_state_factory(
6569 update_stmt, self, **kw
6570 )
6571 if TYPE_CHECKING:
6572 assert isinstance(compile_state, UpdateDMLState)
6573 update_stmt = compile_state.statement # type: ignore[assignment]
6574
6575 if visiting_cte is not None:
6576 kw["visiting_cte"] = visiting_cte
6577 toplevel = False
6578 else:
6579 toplevel = not self.stack
6580
6581 if toplevel:
6582 self.isupdate = True
6583 if not self.dml_compile_state:
6584 self.dml_compile_state = compile_state
6585 if not self.compile_state:
6586 self.compile_state = compile_state
6587
6588 if self.linting & COLLECT_CARTESIAN_PRODUCTS:
6589 from_linter = FromLinter({}, set())
6590 warn_linting = self.linting & WARN_LINTING
6591 if toplevel:
6592 self.from_linter = from_linter
6593 else:
6594 from_linter = None
6595 warn_linting = False
6596
6597 extra_froms = compile_state._extra_froms
6598 is_multitable = bool(extra_froms)
6599
6600 if is_multitable:
6601 # main table might be a JOIN
6602 main_froms = set(_from_objects(update_stmt.table))
6603 render_extra_froms = [
6604 f for f in extra_froms if f not in main_froms
6605 ]
6606 correlate_froms = main_froms.union(extra_froms)
6607 else:
6608 render_extra_froms = []
6609 correlate_froms = {update_stmt.table}
6610
6611 self.stack.append(
6612 {
6613 "correlate_froms": correlate_froms,
6614 "asfrom_froms": correlate_froms,
6615 "selectable": update_stmt,
6616 }
6617 )
6618
6619 text = "UPDATE "
6620
6621 if update_stmt._prefixes:
6622 text += self._generate_prefixes(
6623 update_stmt, update_stmt._prefixes, **kw
6624 )
6625
6626 table_text = self.update_tables_clause(
6627 update_stmt,
6628 update_stmt.table,
6629 render_extra_froms,
6630 from_linter=from_linter,
6631 **kw,
6632 )
6633 crud_params_struct = crud._get_crud_params(
6634 self, update_stmt, compile_state, toplevel, **kw
6635 )
6636 crud_params = crud_params_struct.single_params
6637
6638 if update_stmt._hints:
6639 dialect_hints, table_text = self._setup_crud_hints(
6640 update_stmt, table_text
6641 )
6642 else:
6643 dialect_hints = None
6644
6645 if update_stmt._independent_ctes:
6646 self._dispatch_independent_ctes(update_stmt, kw)
6647
6648 text += table_text
6649
6650 text += " SET "
6651 text += ", ".join(
6652 expr + "=" + value
6653 for _, expr, value, _ in cast(
6654 "List[Tuple[Any, str, str, Any]]", crud_params
6655 )
6656 )
6657
6658 if self.implicit_returning or update_stmt._returning:
6659 if self.returning_precedes_values:
6660 text += " " + self.returning_clause(
6661 update_stmt,
6662 self.implicit_returning or update_stmt._returning,
6663 populate_result_map=toplevel,
6664 )
6665
6666 if extra_froms:
6667 extra_from_text = self.update_from_clause(
6668 update_stmt,
6669 update_stmt.table,
6670 render_extra_froms,
6671 dialect_hints,
6672 from_linter=from_linter,
6673 **kw,
6674 )
6675 if extra_from_text:
6676 text += " " + extra_from_text
6677
6678 if update_stmt._where_criteria:
6679 t = self._generate_delimited_and_list(
6680 update_stmt._where_criteria, from_linter=from_linter, **kw
6681 )
6682 if t:
6683 text += " WHERE " + t
6684
6685 ulc = self.update_post_criteria_clause(
6686 update_stmt, from_linter=from_linter, **kw
6687 )
6688 if ulc:
6689 text += " " + ulc
6690
6691 if (
6692 self.implicit_returning or update_stmt._returning
6693 ) and not self.returning_precedes_values:
6694 text += " " + self.returning_clause(
6695 update_stmt,
6696 self.implicit_returning or update_stmt._returning,
6697 populate_result_map=toplevel,
6698 )
6699
6700 if self.ctes:
6701 nesting_level = len(self.stack) if not toplevel else None
6702 text = self._render_cte_clause(nesting_level=nesting_level) + text
6703
6704 if warn_linting:
6705 assert from_linter is not None
6706 from_linter.warn(stmt_type="UPDATE")
6707
6708 self.stack.pop(-1)
6709
6710 return text # type: ignore[no-any-return]
6711
6712 def delete_extra_from_clause(
6713 self, delete_stmt, from_table, extra_froms, from_hints, **kw
6714 ):
6715 """Provide a hook to override the generation of an
6716 DELETE..FROM clause.
6717
6718 This can be used to implement DELETE..USING for example.
6719
6720 MySQL and MSSQL override this.
6721
6722 """
6723 raise NotImplementedError(
6724 "This backend does not support multiple-table "
6725 "criteria within DELETE"
6726 )
6727
6728 def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw):
6729 return from_table._compiler_dispatch(
6730 self, asfrom=True, iscrud=True, **kw
6731 )
6732
6733 def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
6734 compile_state = delete_stmt._compile_state_factory(
6735 delete_stmt, self, **kw
6736 )
6737 delete_stmt = compile_state.statement
6738
6739 if visiting_cte is not None:
6740 kw["visiting_cte"] = visiting_cte
6741 toplevel = False
6742 else:
6743 toplevel = not self.stack
6744
6745 if toplevel:
6746 self.isdelete = True
6747 if not self.dml_compile_state:
6748 self.dml_compile_state = compile_state
6749 if not self.compile_state:
6750 self.compile_state = compile_state
6751
6752 if self.linting & COLLECT_CARTESIAN_PRODUCTS:
6753 from_linter = FromLinter({}, set())
6754 warn_linting = self.linting & WARN_LINTING
6755 if toplevel:
6756 self.from_linter = from_linter
6757 else:
6758 from_linter = None
6759 warn_linting = False
6760
6761 extra_froms = compile_state._extra_froms
6762
6763 correlate_froms = {delete_stmt.table}.union(extra_froms)
6764 self.stack.append(
6765 {
6766 "correlate_froms": correlate_froms,
6767 "asfrom_froms": correlate_froms,
6768 "selectable": delete_stmt,
6769 }
6770 )
6771
6772 text = "DELETE "
6773
6774 if delete_stmt._prefixes:
6775 text += self._generate_prefixes(
6776 delete_stmt, delete_stmt._prefixes, **kw
6777 )
6778
6779 text += "FROM "
6780
6781 try:
6782 table_text = self.delete_table_clause(
6783 delete_stmt,
6784 delete_stmt.table,
6785 extra_froms,
6786 from_linter=from_linter,
6787 )
6788 except TypeError:
6789 # anticipate 3rd party dialects that don't include **kw
6790 # TODO: remove in 2.1
6791 table_text = self.delete_table_clause(
6792 delete_stmt, delete_stmt.table, extra_froms
6793 )
6794 if from_linter:
6795 _ = self.process(delete_stmt.table, from_linter=from_linter)
6796
6797 crud._get_crud_params(self, delete_stmt, compile_state, toplevel, **kw)
6798
6799 if delete_stmt._hints:
6800 dialect_hints, table_text = self._setup_crud_hints(
6801 delete_stmt, table_text
6802 )
6803 else:
6804 dialect_hints = None
6805
6806 if delete_stmt._independent_ctes:
6807 self._dispatch_independent_ctes(delete_stmt, kw)
6808
6809 text += table_text
6810
6811 if (
6812 self.implicit_returning or delete_stmt._returning
6813 ) and self.returning_precedes_values:
6814 text += " " + self.returning_clause(
6815 delete_stmt,
6816 self.implicit_returning or delete_stmt._returning,
6817 populate_result_map=toplevel,
6818 )
6819
6820 if extra_froms:
6821 extra_from_text = self.delete_extra_from_clause(
6822 delete_stmt,
6823 delete_stmt.table,
6824 extra_froms,
6825 dialect_hints,
6826 from_linter=from_linter,
6827 **kw,
6828 )
6829 if extra_from_text:
6830 text += " " + extra_from_text
6831
6832 if delete_stmt._where_criteria:
6833 t = self._generate_delimited_and_list(
6834 delete_stmt._where_criteria, from_linter=from_linter, **kw
6835 )
6836 if t:
6837 text += " WHERE " + t
6838
6839 dlc = self.delete_post_criteria_clause(
6840 delete_stmt, from_linter=from_linter, **kw
6841 )
6842 if dlc:
6843 text += " " + dlc
6844
6845 if (
6846 self.implicit_returning or delete_stmt._returning
6847 ) and not self.returning_precedes_values:
6848 text += " " + self.returning_clause(
6849 delete_stmt,
6850 self.implicit_returning or delete_stmt._returning,
6851 populate_result_map=toplevel,
6852 )
6853
6854 if self.ctes:
6855 nesting_level = len(self.stack) if not toplevel else None
6856 text = self._render_cte_clause(nesting_level=nesting_level) + text
6857
6858 if warn_linting:
6859 assert from_linter is not None
6860 from_linter.warn(stmt_type="DELETE")
6861
6862 self.stack.pop(-1)
6863
6864 return text
6865
6866 def visit_savepoint(self, savepoint_stmt, **kw):
6867 return "SAVEPOINT %s" % self.preparer.format_savepoint(savepoint_stmt)
6868
6869 def visit_rollback_to_savepoint(self, savepoint_stmt, **kw):
6870 return "ROLLBACK TO SAVEPOINT %s" % self.preparer.format_savepoint(
6871 savepoint_stmt
6872 )
6873
6874 def visit_release_savepoint(self, savepoint_stmt, **kw):
6875 return "RELEASE SAVEPOINT %s" % self.preparer.format_savepoint(
6876 savepoint_stmt
6877 )
6878
6879
6880class StrSQLCompiler(SQLCompiler):
6881 """A :class:`.SQLCompiler` subclass which allows a small selection
6882 of non-standard SQL features to render into a string value.
6883
6884 The :class:`.StrSQLCompiler` is invoked whenever a Core expression
6885 element is directly stringified without calling upon the
6886 :meth:`_expression.ClauseElement.compile` method.
6887 It can render a limited set
6888 of non-standard SQL constructs to assist in basic stringification,
6889 however for more substantial custom or dialect-specific SQL constructs,
6890 it will be necessary to make use of
6891 :meth:`_expression.ClauseElement.compile`
6892 directly.
6893
6894 .. seealso::
6895
6896 :ref:`faq_sql_expression_string`
6897
6898 """
6899
6900 def _fallback_column_name(self, column):
6901 return "<name unknown>"
6902
6903 @util.preload_module("sqlalchemy.engine.url")
6904 def visit_unsupported_compilation(self, element, err, **kw):
6905 if element.stringify_dialect != "default":
6906 url = util.preloaded.engine_url
6907 dialect = url.URL.create(element.stringify_dialect).get_dialect()()
6908
6909 compiler = dialect.statement_compiler(
6910 dialect, None, _supporting_against=self
6911 )
6912 if not isinstance(compiler, StrSQLCompiler):
6913 return compiler.process(element, **kw)
6914
6915 return super().visit_unsupported_compilation(element, err)
6916
6917 def visit_getitem_binary(self, binary, operator, **kw):
6918 return "%s[%s]" % (
6919 self.process(binary.left, **kw),
6920 self.process(binary.right, **kw),
6921 )
6922
6923 def visit_json_getitem_op_binary(self, binary, operator, **kw):
6924 return self.visit_getitem_binary(binary, operator, **kw)
6925
6926 def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
6927 return self.visit_getitem_binary(binary, operator, **kw)
6928
6929 def visit_sequence(self, sequence, **kw):
6930 return (
6931 f"<next sequence value: {self.preparer.format_sequence(sequence)}>"
6932 )
6933
6934 def returning_clause(
6935 self,
6936 stmt: UpdateBase,
6937 returning_cols: Sequence[_ColumnsClauseElement],
6938 *,
6939 populate_result_map: bool,
6940 **kw: Any,
6941 ) -> str:
6942 columns = [
6943 self._label_select_column(None, c, True, False, {})
6944 for c in base._select_iterables(returning_cols)
6945 ]
6946 return "RETURNING " + ", ".join(columns)
6947
6948 def update_from_clause(
6949 self, update_stmt, from_table, extra_froms, from_hints, **kw
6950 ):
6951 kw["asfrom"] = True
6952 return "FROM " + ", ".join(
6953 t._compiler_dispatch(self, fromhints=from_hints, **kw)
6954 for t in extra_froms
6955 )
6956
6957 def delete_extra_from_clause(
6958 self, delete_stmt, from_table, extra_froms, from_hints, **kw
6959 ):
6960 kw["asfrom"] = True
6961 return ", " + ", ".join(
6962 t._compiler_dispatch(self, fromhints=from_hints, **kw)
6963 for t in extra_froms
6964 )
6965
6966 def visit_empty_set_expr(self, element_types, **kw):
6967 return "SELECT 1 WHERE 1!=1"
6968
6969 def get_from_hint_text(self, table, text):
6970 return "[%s]" % text
6971
6972 def visit_regexp_match_op_binary(self, binary, operator, **kw):
6973 return self._generate_generic_binary(binary, " <regexp> ", **kw)
6974
6975 def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
6976 return self._generate_generic_binary(binary, " <not regexp> ", **kw)
6977
6978 def visit_regexp_replace_op_binary(self, binary, operator, **kw):
6979 return "<regexp replace>(%s, %s)" % (
6980 binary.left._compiler_dispatch(self, **kw),
6981 binary.right._compiler_dispatch(self, **kw),
6982 )
6983
6984 def visit_try_cast(self, cast, **kwargs):
6985 return "TRY_CAST(%s AS %s)" % (
6986 cast.clause._compiler_dispatch(self, **kwargs),
6987 cast.typeclause._compiler_dispatch(self, **kwargs),
6988 )
6989
6990
6991class DDLCompiler(Compiled):
6992 is_ddl = True
6993
6994 if TYPE_CHECKING:
6995
6996 def __init__(
6997 self,
6998 dialect: Dialect,
6999 statement: ExecutableDDLElement,
7000 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
7001 render_schema_translate: bool = ...,
7002 compile_kwargs: Mapping[str, Any] = ...,
7003 ): ...
7004
7005 @util.ro_memoized_property
7006 def sql_compiler(self) -> SQLCompiler:
7007 return self.dialect.statement_compiler(
7008 self.dialect, None, schema_translate_map=self.schema_translate_map
7009 )
7010
7011 @util.memoized_property
7012 def type_compiler(self):
7013 return self.dialect.type_compiler_instance
7014
7015 def construct_params(
7016 self,
7017 params: Optional[_CoreSingleExecuteParams] = None,
7018 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
7019 escape_names: bool = True,
7020 ) -> Optional[_MutableCoreSingleExecuteParams]:
7021 return None
7022
7023 def visit_ddl(self, ddl, **kwargs):
7024 # table events can substitute table and schema name
7025 context = ddl.context
7026 if isinstance(ddl.target, schema.Table):
7027 context = context.copy()
7028
7029 preparer = self.preparer
7030 path = preparer.format_table_seq(ddl.target)
7031 if len(path) == 1:
7032 table, sch = path[0], ""
7033 else:
7034 table, sch = path[-1], path[0]
7035
7036 context.setdefault("table", table)
7037 context.setdefault("schema", sch)
7038 context.setdefault("fullname", preparer.format_table(ddl.target))
7039
7040 return self.sql_compiler.post_process_text(ddl.statement % context)
7041
7042 def visit_create_schema(self, create, **kw):
7043 text = "CREATE SCHEMA "
7044 if create.if_not_exists:
7045 text += "IF NOT EXISTS "
7046 return text + self.preparer.format_schema(create.element)
7047
7048 def visit_drop_schema(self, drop, **kw):
7049 text = "DROP SCHEMA "
7050 if drop.if_exists:
7051 text += "IF EXISTS "
7052 text += self.preparer.format_schema(drop.element)
7053 if drop.cascade:
7054 text += " CASCADE"
7055 return text
7056
7057 def visit_create_table(self, create, **kw):
7058 table = create.element
7059 preparer = self.preparer
7060
7061 text = "\nCREATE "
7062 if table._prefixes:
7063 text += " ".join(table._prefixes) + " "
7064
7065 text += "TABLE "
7066 if create.if_not_exists:
7067 text += "IF NOT EXISTS "
7068
7069 text += preparer.format_table(table) + " "
7070
7071 create_table_suffix = self.create_table_suffix(table)
7072 if create_table_suffix:
7073 text += create_table_suffix + " "
7074
7075 text += "("
7076
7077 separator = "\n"
7078
7079 # if only one primary key, specify it along with the column
7080 first_pk = False
7081 for create_column in create.columns:
7082 column = create_column.element
7083 try:
7084 processed = self.process(
7085 create_column, first_pk=column.primary_key and not first_pk
7086 )
7087 if processed is not None:
7088 text += separator
7089 separator = ", \n"
7090 text += "\t" + processed
7091 if column.primary_key:
7092 first_pk = True
7093 except exc.CompileError as ce:
7094 raise exc.CompileError(
7095 "(in table '%s', column '%s'): %s"
7096 % (table.description, column.name, ce.args[0])
7097 ) from ce
7098
7099 const = self.create_table_constraints(
7100 table,
7101 _include_foreign_key_constraints=create.include_foreign_key_constraints, # noqa
7102 )
7103 if const:
7104 text += separator + "\t" + const
7105
7106 text += "\n)%s\n\n" % self.post_create_table(table)
7107 return text
7108
7109 def visit_create_view(self, element: CreateView, **kw: Any) -> str:
7110 return self._generate_table_select(element, "view", **kw)
7111
7112 def visit_create_table_as(self, element: CreateTableAs, **kw: Any) -> str:
7113 return self._generate_table_select(element, "create_table_as", **kw)
7114
7115 def _generate_table_select(
7116 self,
7117 element: _TableViaSelect,
7118 type_: str,
7119 if_not_exists: Optional[bool] = None,
7120 **kw: Any,
7121 ) -> str:
7122 prep = self.preparer
7123
7124 inner_kw = dict(kw)
7125 inner_kw["literal_binds"] = True
7126 select_sql = self.sql_compiler.process(element.selectable, **inner_kw)
7127
7128 # Use if_not_exists parameter if provided, otherwise use element's
7129 use_if_not_exists = (
7130 if_not_exists
7131 if if_not_exists is not None
7132 else element.if_not_exists
7133 )
7134
7135 parts = [
7136 "CREATE",
7137 "OR REPLACE" if getattr(element, "or_replace", False) else None,
7138 "TEMPORARY" if element.temporary else None,
7139 (
7140 "MATERIALIZED VIEW"
7141 if type_ == "view" and getattr(element, "materialized", False)
7142 else "TABLE" if type_ == "create_table_as" else "VIEW"
7143 ),
7144 "IF NOT EXISTS" if use_if_not_exists else None,
7145 prep.format_table(element.table),
7146 "AS",
7147 select_sql,
7148 ]
7149 return " ".join(p for p in parts if p)
7150
7151 def visit_create_column(self, create, first_pk=False, **kw):
7152 column = create.element
7153
7154 if column.system:
7155 return None
7156
7157 text = self.get_column_specification(column, first_pk=first_pk)
7158 const = " ".join(
7159 self.process(constraint) for constraint in column.constraints
7160 )
7161 if const:
7162 text += " " + const
7163
7164 return text
7165
7166 def create_table_constraints(
7167 self, table, _include_foreign_key_constraints=None, **kw
7168 ):
7169 # On some DB order is significant: visit PK first, then the
7170 # other constraints (engine.ReflectionTest.testbasic failed on FB2)
7171 constraints = []
7172 if table.primary_key:
7173 constraints.append(table.primary_key)
7174
7175 all_fkcs = table.foreign_key_constraints
7176 if _include_foreign_key_constraints is not None:
7177 omit_fkcs = all_fkcs.difference(_include_foreign_key_constraints)
7178 else:
7179 omit_fkcs = set()
7180
7181 constraints.extend(
7182 [
7183 c
7184 for c in table._sorted_constraints
7185 if c is not table.primary_key and c not in omit_fkcs
7186 ]
7187 )
7188
7189 return ", \n\t".join(
7190 p
7191 for p in (
7192 self.process(constraint)
7193 for constraint in constraints
7194 if (constraint._should_create_for_compiler(self))
7195 and (
7196 not self.dialect.supports_alter
7197 or not getattr(constraint, "use_alter", False)
7198 )
7199 )
7200 if p is not None
7201 )
7202
7203 def visit_drop_table(self, drop, **kw):
7204 text = "\nDROP TABLE "
7205 if drop.if_exists:
7206 text += "IF EXISTS "
7207 return text + self.preparer.format_table(drop.element)
7208
7209 def visit_drop_view(self, drop, **kw):
7210 text = "\nDROP "
7211 if drop.materialized:
7212 text += "MATERIALIZED VIEW "
7213 else:
7214 text += "VIEW "
7215 if drop.if_exists:
7216 text += "IF EXISTS "
7217 return text + self.preparer.format_table(drop.element)
7218
7219 def _verify_index_table(self, index: Index) -> None:
7220 if index.table is None:
7221 raise exc.CompileError(
7222 "Index '%s' is not associated with any table." % index.name
7223 )
7224
7225 def visit_create_index(
7226 self, create, include_schema=False, include_table_schema=True, **kw
7227 ):
7228 index = create.element
7229 self._verify_index_table(index)
7230 preparer = self.preparer
7231 text = "CREATE "
7232 if index.unique:
7233 text += "UNIQUE "
7234 if index.name is None:
7235 raise exc.CompileError(
7236 "CREATE INDEX requires that the index have a name"
7237 )
7238
7239 text += "INDEX "
7240 if create.if_not_exists:
7241 text += "IF NOT EXISTS "
7242
7243 text += "%s ON %s (%s)" % (
7244 self._prepared_index_name(index, include_schema=include_schema),
7245 preparer.format_table(
7246 index.table, use_schema=include_table_schema
7247 ),
7248 ", ".join(
7249 self.sql_compiler.process(
7250 expr, include_table=False, literal_binds=True
7251 )
7252 for expr in index.expressions
7253 ),
7254 )
7255 return text
7256
7257 def visit_drop_index(self, drop, **kw):
7258 index = drop.element
7259
7260 if index.name is None:
7261 raise exc.CompileError(
7262 "DROP INDEX requires that the index have a name"
7263 )
7264 text = "\nDROP INDEX "
7265 if drop.if_exists:
7266 text += "IF EXISTS "
7267
7268 return text + self._prepared_index_name(index, include_schema=True)
7269
7270 def _prepared_index_name(
7271 self, index: Index, include_schema: bool = False
7272 ) -> str:
7273 if index.table is not None:
7274 effective_schema = self.preparer.schema_for_object(index.table)
7275 else:
7276 effective_schema = None
7277 if include_schema and effective_schema:
7278 schema_name = self.preparer.quote_schema(effective_schema)
7279 else:
7280 schema_name = None
7281
7282 index_name: str = self.preparer.format_index(index)
7283
7284 if schema_name:
7285 index_name = schema_name + "." + index_name
7286 return index_name
7287
7288 def visit_add_constraint(self, create, **kw):
7289 return "ALTER TABLE %s ADD %s" % (
7290 self.preparer.format_table(create.element.table),
7291 self.process(create.element),
7292 )
7293
7294 def visit_set_table_comment(self, create, **kw):
7295 return "COMMENT ON TABLE %s IS %s" % (
7296 self.preparer.format_table(create.element),
7297 self.sql_compiler.render_literal_value(
7298 create.element.comment, sqltypes.String()
7299 ),
7300 )
7301
7302 def visit_drop_table_comment(self, drop, **kw):
7303 return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table(
7304 drop.element
7305 )
7306
7307 def visit_set_column_comment(self, create, **kw):
7308 return "COMMENT ON COLUMN %s IS %s" % (
7309 self.preparer.format_column(
7310 create.element, use_table=True, use_schema=True
7311 ),
7312 self.sql_compiler.render_literal_value(
7313 create.element.comment, sqltypes.String()
7314 ),
7315 )
7316
7317 def visit_drop_column_comment(self, drop, **kw):
7318 return "COMMENT ON COLUMN %s IS NULL" % self.preparer.format_column(
7319 drop.element, use_table=True
7320 )
7321
7322 def visit_set_constraint_comment(self, create, **kw):
7323 raise exc.UnsupportedCompilationError(self, type(create))
7324
7325 def visit_drop_constraint_comment(self, drop, **kw):
7326 raise exc.UnsupportedCompilationError(self, type(drop))
7327
7328 def get_identity_options(self, identity_options: IdentityOptions) -> str:
7329 text = []
7330 if identity_options.increment is not None:
7331 text.append("INCREMENT BY %d" % identity_options.increment)
7332 if identity_options.start is not None:
7333 text.append("START WITH %d" % identity_options.start)
7334 if identity_options.minvalue is not None:
7335 text.append("MINVALUE %d" % identity_options.minvalue)
7336 if identity_options.maxvalue is not None:
7337 text.append("MAXVALUE %d" % identity_options.maxvalue)
7338 if identity_options.nominvalue is not None:
7339 text.append("NO MINVALUE")
7340 if identity_options.nomaxvalue is not None:
7341 text.append("NO MAXVALUE")
7342 if identity_options.cache is not None:
7343 text.append("CACHE %d" % identity_options.cache)
7344 if identity_options.cycle is not None:
7345 text.append("CYCLE" if identity_options.cycle else "NO CYCLE")
7346 return " ".join(text)
7347
7348 def visit_create_sequence(self, create, prefix=None, **kw):
7349 text = "CREATE SEQUENCE "
7350 if create.if_not_exists:
7351 text += "IF NOT EXISTS "
7352 text += self.preparer.format_sequence(create.element)
7353
7354 if prefix:
7355 text += prefix
7356 options = self.get_identity_options(create.element)
7357 if options:
7358 text += " " + options
7359 return text
7360
7361 def visit_drop_sequence(self, drop, **kw):
7362 text = "DROP SEQUENCE "
7363 if drop.if_exists:
7364 text += "IF EXISTS "
7365 return text + self.preparer.format_sequence(drop.element)
7366
7367 def visit_drop_constraint(self, drop, **kw):
7368 constraint = drop.element
7369 if constraint.name is not None:
7370 formatted_name = self.preparer.format_constraint(constraint)
7371 else:
7372 formatted_name = None
7373
7374 if formatted_name is None:
7375 raise exc.CompileError(
7376 "Can't emit DROP CONSTRAINT for constraint %r; "
7377 "it has no name" % drop.element
7378 )
7379 return "ALTER TABLE %s DROP CONSTRAINT %s%s%s" % (
7380 self.preparer.format_table(drop.element.table),
7381 "IF EXISTS " if drop.if_exists else "",
7382 formatted_name,
7383 " CASCADE" if drop.cascade else "",
7384 )
7385
7386 def get_column_specification(
7387 self, column: Column[Any], **kwargs: Any
7388 ) -> str:
7389 colspec = (
7390 self.preparer.format_column(column)
7391 + " "
7392 + self.dialect.type_compiler_instance.process(
7393 column.type, type_expression=column
7394 )
7395 )
7396 default = self.get_column_default_string(column)
7397 if default is not None:
7398 colspec += " DEFAULT " + default
7399
7400 if column.computed is not None:
7401 colspec += " " + self.process(column.computed)
7402
7403 if (
7404 column.identity is not None
7405 and self.dialect.supports_identity_columns
7406 ):
7407 colspec += " " + self.process(column.identity)
7408
7409 if not column.nullable and (
7410 not column.identity or not self.dialect.supports_identity_columns
7411 ):
7412 colspec += " NOT NULL"
7413 return colspec
7414
7415 def create_table_suffix(self, table: Table) -> str:
7416 return ""
7417
7418 def post_create_table(self, table: Table) -> str:
7419 return ""
7420
7421 def get_column_default_string(self, column: Column[Any]) -> Optional[str]:
7422 if isinstance(column.server_default, schema.DefaultClause):
7423 return self.render_default_string(column.server_default.arg)
7424 else:
7425 return None
7426
7427 def render_default_string(self, default: Union[Visitable, str]) -> str:
7428 if isinstance(default, str):
7429 return self.sql_compiler.render_literal_value(
7430 default, sqltypes.STRINGTYPE
7431 )
7432 else:
7433 return self.sql_compiler.process(default, literal_binds=True)
7434
7435 def visit_table_or_column_check_constraint(self, constraint, **kw):
7436 if constraint.is_column_level:
7437 return self.visit_column_check_constraint(constraint)
7438 else:
7439 return self.visit_check_constraint(constraint)
7440
7441 def visit_check_constraint(self, constraint, **kw):
7442 text = self.define_constraint_preamble(constraint, **kw)
7443 text += self.define_check_body(constraint, **kw)
7444 text += self.define_constraint_deferrability(constraint)
7445 return text
7446
7447 def visit_column_check_constraint(self, constraint, **kw):
7448 text = self.define_constraint_preamble(constraint, **kw)
7449 text += self.define_check_body(constraint, **kw)
7450 text += self.define_constraint_deferrability(constraint)
7451 return text
7452
7453 def visit_primary_key_constraint(
7454 self, constraint: PrimaryKeyConstraint, **kw: Any
7455 ) -> str:
7456 if len(constraint) == 0:
7457 return ""
7458 text = self.define_constraint_preamble(constraint, **kw)
7459 text += self.define_primary_key_body(constraint, **kw)
7460 text += self.define_constraint_deferrability(constraint)
7461 return text
7462
7463 def visit_foreign_key_constraint(
7464 self, constraint: ForeignKeyConstraint, **kw: Any
7465 ) -> str:
7466 text = self.define_constraint_preamble(constraint, **kw)
7467 text += self.define_foreign_key_body(constraint, **kw)
7468 text += self.define_constraint_match(constraint)
7469 text += self.define_constraint_cascades(constraint)
7470 text += self.define_constraint_deferrability(constraint)
7471 return text
7472
7473 def define_constraint_remote_table(self, constraint, table, preparer):
7474 """Format the remote table clause of a CREATE CONSTRAINT clause."""
7475
7476 return preparer.format_table(table)
7477
7478 def visit_unique_constraint(
7479 self, constraint: UniqueConstraint, **kw: Any
7480 ) -> str:
7481 if len(constraint) == 0:
7482 return ""
7483 text = self.define_constraint_preamble(constraint, **kw)
7484 text += self.define_unique_body(constraint, **kw)
7485 text += self.define_constraint_deferrability(constraint)
7486 return text
7487
7488 def define_constraint_preamble(
7489 self, constraint: Constraint, **kw: Any
7490 ) -> str:
7491 text = ""
7492 if constraint.name is not None:
7493 formatted_name = self.preparer.format_constraint(constraint)
7494 if formatted_name is not None:
7495 text += "CONSTRAINT %s " % formatted_name
7496 return text
7497
7498 def define_primary_key_body(
7499 self, constraint: PrimaryKeyConstraint, **kw: Any
7500 ) -> str:
7501 text = ""
7502 text += "PRIMARY KEY "
7503 text += "(%s)" % ", ".join(
7504 self.preparer.quote(c.name)
7505 for c in (
7506 constraint.columns_autoinc_first
7507 if constraint._implicit_generated
7508 else constraint.columns
7509 )
7510 )
7511 return text
7512
7513 def define_foreign_key_body(
7514 self, constraint: ForeignKeyConstraint, **kw: Any
7515 ) -> str:
7516 preparer = self.preparer
7517 remote_table = list(constraint.elements)[0].column.table
7518 text = "FOREIGN KEY(%s) REFERENCES %s (%s)" % (
7519 ", ".join(
7520 preparer.quote(f.parent.name) for f in constraint.elements
7521 ),
7522 self.define_constraint_remote_table(
7523 constraint, remote_table, preparer
7524 ),
7525 ", ".join(
7526 preparer.quote(f.column.name) for f in constraint.elements
7527 ),
7528 )
7529 return text
7530
7531 def define_unique_body(
7532 self, constraint: UniqueConstraint, **kw: Any
7533 ) -> str:
7534 text = "UNIQUE %s(%s)" % (
7535 self.define_unique_constraint_distinct(constraint, **kw),
7536 ", ".join(self.preparer.quote(c.name) for c in constraint),
7537 )
7538 return text
7539
7540 def define_check_body(self, constraint: CheckConstraint, **kw: Any) -> str:
7541 text = "CHECK (%s)" % self.sql_compiler.process(
7542 constraint.sqltext, include_table=False, literal_binds=True
7543 )
7544 return text
7545
7546 def define_unique_constraint_distinct(
7547 self, constraint: UniqueConstraint, **kw: Any
7548 ) -> str:
7549 return ""
7550
7551 def define_constraint_cascades(
7552 self, constraint: ForeignKeyConstraint
7553 ) -> str:
7554 text = ""
7555 if constraint.ondelete is not None:
7556 text += self.define_constraint_ondelete_cascade(constraint)
7557
7558 if constraint.onupdate is not None:
7559 text += self.define_constraint_onupdate_cascade(constraint)
7560 return text
7561
7562 def define_constraint_ondelete_cascade(
7563 self, constraint: ForeignKeyConstraint
7564 ) -> str:
7565 return " ON DELETE %s" % self.preparer.validate_sql_phrase(
7566 constraint.ondelete, FK_ON_DELETE
7567 )
7568
7569 def define_constraint_onupdate_cascade(
7570 self, constraint: ForeignKeyConstraint
7571 ) -> str:
7572 return " ON UPDATE %s" % self.preparer.validate_sql_phrase(
7573 constraint.onupdate, FK_ON_UPDATE
7574 )
7575
7576 def define_constraint_deferrability(self, constraint: Constraint) -> str:
7577 text = ""
7578 if constraint.deferrable is not None:
7579 if constraint.deferrable:
7580 text += " DEFERRABLE"
7581 else:
7582 text += " NOT DEFERRABLE"
7583 if constraint.initially is not None:
7584 text += " INITIALLY %s" % self.preparer.validate_sql_phrase(
7585 constraint.initially, FK_INITIALLY
7586 )
7587 return text
7588
7589 def define_constraint_match(self, constraint: ForeignKeyConstraint) -> str:
7590 text = ""
7591 if constraint.match is not None:
7592 text += " MATCH %s" % constraint.match
7593 return text
7594
7595 def visit_computed_column(self, generated, **kw):
7596 text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
7597 generated.sqltext, include_table=False, literal_binds=True
7598 )
7599 if generated.persisted is True:
7600 text += " STORED"
7601 elif generated.persisted is False:
7602 text += " VIRTUAL"
7603 return text
7604
7605 def visit_identity_column(self, identity, **kw):
7606 text = "GENERATED %s AS IDENTITY" % (
7607 "ALWAYS" if identity.always else "BY DEFAULT",
7608 )
7609 options = self.get_identity_options(identity)
7610 if options:
7611 text += " (%s)" % options
7612 return text
7613
7614
7615class GenericTypeCompiler(TypeCompiler):
7616 def visit_FLOAT(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
7617 return "FLOAT"
7618
7619 def visit_DOUBLE(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
7620 return "DOUBLE"
7621
7622 def visit_DOUBLE_PRECISION(
7623 self, type_: sqltypes.DOUBLE_PRECISION[Any], **kw: Any
7624 ) -> str:
7625 return "DOUBLE PRECISION"
7626
7627 def visit_REAL(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
7628 return "REAL"
7629
7630 def visit_NUMERIC(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
7631 if type_.precision is None:
7632 return "NUMERIC"
7633 elif type_.scale is None:
7634 return "NUMERIC(%(precision)s)" % {"precision": type_.precision}
7635 else:
7636 return "NUMERIC(%(precision)s, %(scale)s)" % {
7637 "precision": type_.precision,
7638 "scale": type_.scale,
7639 }
7640
7641 def visit_DECIMAL(self, type_: sqltypes.DECIMAL[Any], **kw: Any) -> str:
7642 if type_.precision is None:
7643 return "DECIMAL"
7644 elif type_.scale is None:
7645 return "DECIMAL(%(precision)s)" % {"precision": type_.precision}
7646 else:
7647 return "DECIMAL(%(precision)s, %(scale)s)" % {
7648 "precision": type_.precision,
7649 "scale": type_.scale,
7650 }
7651
7652 def visit_INTEGER(self, type_: sqltypes.Integer, **kw: Any) -> str:
7653 return "INTEGER"
7654
7655 def visit_SMALLINT(self, type_: sqltypes.SmallInteger, **kw: Any) -> str:
7656 return "SMALLINT"
7657
7658 def visit_BIGINT(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
7659 return "BIGINT"
7660
7661 def visit_TIMESTAMP(self, type_: sqltypes.TIMESTAMP, **kw: Any) -> str:
7662 return "TIMESTAMP"
7663
7664 def visit_DATETIME(self, type_: sqltypes.DateTime, **kw: Any) -> str:
7665 return "DATETIME"
7666
7667 def visit_DATE(self, type_: sqltypes.Date, **kw: Any) -> str:
7668 return "DATE"
7669
7670 def visit_TIME(self, type_: sqltypes.Time, **kw: Any) -> str:
7671 return "TIME"
7672
7673 def visit_CLOB(self, type_: sqltypes.CLOB, **kw: Any) -> str:
7674 return "CLOB"
7675
7676 def visit_NCLOB(self, type_: sqltypes.Text, **kw: Any) -> str:
7677 return "NCLOB"
7678
7679 def _render_string_type(
7680 self, name: str, length: Optional[int], collation: Optional[str]
7681 ) -> str:
7682 text = name
7683 if length:
7684 text += f"({length})"
7685 if collation:
7686 text += f' COLLATE "{collation}"'
7687 return text
7688
7689 def visit_CHAR(self, type_: sqltypes.CHAR, **kw: Any) -> str:
7690 return self._render_string_type("CHAR", type_.length, type_.collation)
7691
7692 def visit_NCHAR(self, type_: sqltypes.NCHAR, **kw: Any) -> str:
7693 return self._render_string_type("NCHAR", type_.length, type_.collation)
7694
7695 def visit_VARCHAR(self, type_: sqltypes.String, **kw: Any) -> str:
7696 return self._render_string_type(
7697 "VARCHAR", type_.length, type_.collation
7698 )
7699
7700 def visit_NVARCHAR(self, type_: sqltypes.NVARCHAR, **kw: Any) -> str:
7701 return self._render_string_type(
7702 "NVARCHAR", type_.length, type_.collation
7703 )
7704
7705 def visit_TEXT(self, type_: sqltypes.Text, **kw: Any) -> str:
7706 return self._render_string_type("TEXT", type_.length, type_.collation)
7707
7708 def visit_UUID(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
7709 return "UUID"
7710
7711 def visit_BLOB(self, type_: sqltypes.LargeBinary, **kw: Any) -> str:
7712 return "BLOB"
7713
7714 def visit_BINARY(self, type_: sqltypes.BINARY, **kw: Any) -> str:
7715 return "BINARY" + (type_.length and "(%d)" % type_.length or "")
7716
7717 def visit_VARBINARY(self, type_: sqltypes.VARBINARY, **kw: Any) -> str:
7718 return "VARBINARY" + (type_.length and "(%d)" % type_.length or "")
7719
7720 def visit_BOOLEAN(self, type_: sqltypes.Boolean, **kw: Any) -> str:
7721 return "BOOLEAN"
7722
7723 def visit_uuid(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
7724 if not type_.native_uuid or not self.dialect.supports_native_uuid:
7725 return self._render_string_type("CHAR", length=32, collation=None)
7726 else:
7727 return self.visit_UUID(type_, **kw)
7728
7729 def visit_large_binary(
7730 self, type_: sqltypes.LargeBinary, **kw: Any
7731 ) -> str:
7732 return self.visit_BLOB(type_, **kw)
7733
7734 def visit_boolean(self, type_: sqltypes.Boolean, **kw: Any) -> str:
7735 return self.visit_BOOLEAN(type_, **kw)
7736
7737 def visit_time(self, type_: sqltypes.Time, **kw: Any) -> str:
7738 return self.visit_TIME(type_, **kw)
7739
7740 def visit_datetime(self, type_: sqltypes.DateTime, **kw: Any) -> str:
7741 return self.visit_DATETIME(type_, **kw)
7742
7743 def visit_date(self, type_: sqltypes.Date, **kw: Any) -> str:
7744 return self.visit_DATE(type_, **kw)
7745
7746 def visit_big_integer(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
7747 return self.visit_BIGINT(type_, **kw)
7748
7749 def visit_small_integer(
7750 self, type_: sqltypes.SmallInteger, **kw: Any
7751 ) -> str:
7752 return self.visit_SMALLINT(type_, **kw)
7753
7754 def visit_integer(self, type_: sqltypes.Integer, **kw: Any) -> str:
7755 return self.visit_INTEGER(type_, **kw)
7756
7757 def visit_real(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
7758 return self.visit_REAL(type_, **kw)
7759
7760 def visit_float(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
7761 return self.visit_FLOAT(type_, **kw)
7762
7763 def visit_double(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
7764 return self.visit_DOUBLE(type_, **kw)
7765
7766 def visit_numeric(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
7767 return self.visit_NUMERIC(type_, **kw)
7768
7769 def visit_string(self, type_: sqltypes.String, **kw: Any) -> str:
7770 return self.visit_VARCHAR(type_, **kw)
7771
7772 def visit_unicode(self, type_: sqltypes.Unicode, **kw: Any) -> str:
7773 return self.visit_VARCHAR(type_, **kw)
7774
7775 def visit_text(self, type_: sqltypes.Text, **kw: Any) -> str:
7776 return self.visit_TEXT(type_, **kw)
7777
7778 def visit_unicode_text(
7779 self, type_: sqltypes.UnicodeText, **kw: Any
7780 ) -> str:
7781 return self.visit_TEXT(type_, **kw)
7782
7783 def visit_enum(self, type_: sqltypes.Enum, **kw: Any) -> str:
7784 return self.visit_VARCHAR(type_, **kw)
7785
7786 def visit_null(self, type_, **kw):
7787 raise exc.CompileError(
7788 "Can't generate DDL for %r; "
7789 "did you forget to specify a "
7790 "type on this Column?" % type_
7791 )
7792
7793 def visit_type_decorator(
7794 self, type_: TypeDecorator[Any], **kw: Any
7795 ) -> str:
7796 return self.process(type_.type_engine(self.dialect), **kw)
7797
7798 def visit_user_defined(
7799 self, type_: UserDefinedType[Any], **kw: Any
7800 ) -> str:
7801 return type_.get_col_spec(**kw)
7802
7803
7804class StrSQLTypeCompiler(GenericTypeCompiler):
7805 def process(self, type_, **kw):
7806 try:
7807 _compiler_dispatch = type_._compiler_dispatch
7808 except AttributeError:
7809 return self._visit_unknown(type_, **kw)
7810 else:
7811 return _compiler_dispatch(self, **kw)
7812
7813 def __getattr__(self, key):
7814 if key.startswith("visit_"):
7815 return self._visit_unknown
7816 else:
7817 raise AttributeError(key)
7818
7819 def _visit_unknown(self, type_, **kw):
7820 if type_.__class__.__name__ == type_.__class__.__name__.upper():
7821 return type_.__class__.__name__
7822 else:
7823 return repr(type_)
7824
7825 def visit_null(self, type_, **kw):
7826 return "NULL"
7827
7828 def visit_user_defined(self, type_, **kw):
7829 try:
7830 get_col_spec = type_.get_col_spec
7831 except AttributeError:
7832 return repr(type_)
7833 else:
7834 return get_col_spec(**kw)
7835
7836
7837class _SchemaForObjectCallable(Protocol):
7838 def __call__(self, obj: Any, /) -> str: ...
7839
7840
7841class _BindNameForColProtocol(Protocol):
7842 def __call__(self, col: ColumnClause[Any]) -> str: ...
7843
7844
7845class IdentifierPreparer:
7846 """Handle quoting and case-folding of identifiers based on options."""
7847
7848 reserved_words = RESERVED_WORDS
7849
7850 legal_characters = LEGAL_CHARACTERS
7851
7852 illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS
7853
7854 initial_quote: str
7855
7856 final_quote: str
7857
7858 _strings: MutableMapping[str, str]
7859
7860 schema_for_object: _SchemaForObjectCallable = operator.attrgetter("schema")
7861 """Return the .schema attribute for an object.
7862
7863 For the default IdentifierPreparer, the schema for an object is always
7864 the value of the ".schema" attribute. if the preparer is replaced
7865 with one that has a non-empty schema_translate_map, the value of the
7866 ".schema" attribute is rendered a symbol that will be converted to a
7867 real schema name from the mapping post-compile.
7868
7869 """
7870
7871 _includes_none_schema_translate: bool = False
7872
7873 def __init__(
7874 self,
7875 dialect: Dialect,
7876 initial_quote: str = '"',
7877 final_quote: Optional[str] = None,
7878 escape_quote: str = '"',
7879 quote_case_sensitive_collations: bool = True,
7880 omit_schema: bool = False,
7881 ):
7882 """Construct a new ``IdentifierPreparer`` object.
7883
7884 initial_quote
7885 Character that begins a delimited identifier.
7886
7887 final_quote
7888 Character that ends a delimited identifier. Defaults to
7889 `initial_quote`.
7890
7891 omit_schema
7892 Prevent prepending schema name. Useful for databases that do
7893 not support schemae.
7894 """
7895
7896 self.dialect = dialect
7897 self.initial_quote = initial_quote
7898 self.final_quote = final_quote or self.initial_quote
7899 self.escape_quote = escape_quote
7900 self.escape_to_quote = self.escape_quote * 2
7901 self.omit_schema = omit_schema
7902 self.quote_case_sensitive_collations = quote_case_sensitive_collations
7903 self._strings = {}
7904 self._double_percents = self.dialect.paramstyle in (
7905 "format",
7906 "pyformat",
7907 )
7908
7909 def _with_schema_translate(self, schema_translate_map):
7910 prep = self.__class__.__new__(self.__class__)
7911 prep.__dict__.update(self.__dict__)
7912
7913 includes_none = None in schema_translate_map
7914
7915 def symbol_getter(obj):
7916 name = obj.schema
7917 if obj._use_schema_map and (name is not None or includes_none):
7918 if name is not None and ("[" in name or "]" in name):
7919 raise exc.CompileError(
7920 "Square bracket characters ([]) not supported "
7921 "in schema translate name '%s'" % name
7922 )
7923 return quoted_name(
7924 "__[SCHEMA_%s]" % (name or "_none"), quote=False
7925 )
7926 else:
7927 return obj.schema
7928
7929 prep.schema_for_object = symbol_getter
7930 prep._includes_none_schema_translate = includes_none
7931 return prep
7932
7933 def _render_schema_translates(
7934 self, statement: str, schema_translate_map: SchemaTranslateMapType
7935 ) -> str:
7936 d = schema_translate_map
7937 if None in d:
7938 if not self._includes_none_schema_translate:
7939 raise exc.InvalidRequestError(
7940 "schema translate map which previously did not have "
7941 "`None` present as a key now has `None` present; compiled "
7942 "statement may lack adequate placeholders. Please use "
7943 "consistent keys in successive "
7944 "schema_translate_map dictionaries."
7945 )
7946
7947 d["_none"] = d[None] # type: ignore[index]
7948
7949 def replace(m):
7950 name = m.group(2)
7951 if name in d:
7952 effective_schema = d[name]
7953 else:
7954 if name in (None, "_none"):
7955 raise exc.InvalidRequestError(
7956 "schema translate map which previously had `None` "
7957 "present as a key now no longer has it present; don't "
7958 "know how to apply schema for compiled statement. "
7959 "Please use consistent keys in successive "
7960 "schema_translate_map dictionaries."
7961 )
7962 effective_schema = name
7963
7964 if not effective_schema:
7965 effective_schema = self.dialect.default_schema_name
7966 if not effective_schema:
7967 # TODO: no coverage here
7968 raise exc.CompileError(
7969 "Dialect has no default schema name; can't "
7970 "use None as dynamic schema target."
7971 )
7972 return self.quote_schema(effective_schema)
7973
7974 return re.sub(r"(__\[SCHEMA_([^\]]+)\])", replace, statement)
7975
7976 def _escape_identifier(self, value: str) -> str:
7977 """Escape an identifier.
7978
7979 Subclasses should override this to provide database-dependent
7980 escaping behavior.
7981 """
7982
7983 value = value.replace(self.escape_quote, self.escape_to_quote)
7984 if self._double_percents:
7985 value = value.replace("%", "%%")
7986 return value
7987
7988 def _unescape_identifier(self, value: str) -> str:
7989 """Canonicalize an escaped identifier.
7990
7991 Subclasses should override this to provide database-dependent
7992 unescaping behavior that reverses _escape_identifier.
7993 """
7994
7995 return value.replace(self.escape_to_quote, self.escape_quote)
7996
7997 def validate_sql_phrase(self, element, reg):
7998 """keyword sequence filter.
7999
8000 a filter for elements that are intended to represent keyword sequences,
8001 such as "INITIALLY", "INITIALLY DEFERRED", etc. no special characters
8002 should be present.
8003
8004 """
8005
8006 if element is not None and not reg.match(element):
8007 raise exc.CompileError(
8008 "Unexpected SQL phrase: %r (matching against %r)"
8009 % (element, reg.pattern)
8010 )
8011 return element
8012
8013 def quote_identifier(self, value: str) -> str:
8014 """Quote an identifier.
8015
8016 Subclasses should override this to provide database-dependent
8017 quoting behavior.
8018 """
8019
8020 return (
8021 self.initial_quote
8022 + self._escape_identifier(value)
8023 + self.final_quote
8024 )
8025
8026 def _requires_quotes(self, value: str) -> bool:
8027 """Return True if the given identifier requires quoting."""
8028 lc_value = value.lower()
8029 return (
8030 lc_value in self.reserved_words
8031 or value[0] in self.illegal_initial_characters
8032 or not self.legal_characters.match(str(value))
8033 or (lc_value != value)
8034 )
8035
8036 def _requires_quotes_illegal_chars(self, value):
8037 """Return True if the given identifier requires quoting, but
8038 not taking case convention into account."""
8039 return not self.legal_characters.match(str(value))
8040
8041 def quote_schema(self, schema: str) -> str:
8042 """Conditionally quote a schema name.
8043
8044
8045 The name is quoted if it is a reserved word, contains quote-necessary
8046 characters, or is an instance of :class:`.quoted_name` which includes
8047 ``quote`` set to ``True``.
8048
8049 Subclasses can override this to provide database-dependent
8050 quoting behavior for schema names.
8051
8052 :param schema: string schema name
8053 """
8054 return self.quote(schema)
8055
8056 def quote(self, ident: str) -> str:
8057 """Conditionally quote an identifier.
8058
8059 The identifier is quoted if it is a reserved word, contains
8060 quote-necessary characters, or is an instance of
8061 :class:`.quoted_name` which includes ``quote`` set to ``True``.
8062
8063 Subclasses can override this to provide database-dependent
8064 quoting behavior for identifier names.
8065
8066 :param ident: string identifier
8067 """
8068 force = getattr(ident, "quote", None)
8069
8070 if force is None:
8071 if ident in self._strings:
8072 return self._strings[ident]
8073 else:
8074 if self._requires_quotes(ident):
8075 self._strings[ident] = self.quote_identifier(ident)
8076 else:
8077 self._strings[ident] = ident
8078 return self._strings[ident]
8079 elif force:
8080 return self.quote_identifier(ident)
8081 else:
8082 return ident
8083
8084 def format_collation(self, collation_name):
8085 if self.quote_case_sensitive_collations:
8086 return self.quote(collation_name)
8087 else:
8088 return collation_name
8089
8090 def format_sequence(
8091 self, sequence: schema.Sequence, use_schema: bool = True
8092 ) -> str:
8093 name = self.quote(sequence.name)
8094
8095 effective_schema = self.schema_for_object(sequence)
8096
8097 if (
8098 not self.omit_schema
8099 and use_schema
8100 and effective_schema is not None
8101 ):
8102 name = self.quote_schema(effective_schema) + "." + name
8103 return name
8104
8105 def format_label(
8106 self, label: Label[Any], name: Optional[str] = None
8107 ) -> str:
8108 return self.quote(name or label.name)
8109
8110 def format_alias(
8111 self, alias: Optional[AliasedReturnsRows], name: Optional[str] = None
8112 ) -> str:
8113 if name is None:
8114 assert alias is not None
8115 return self.quote(alias.name)
8116 else:
8117 return self.quote(name)
8118
8119 def format_savepoint(self, savepoint, name=None):
8120 # Running the savepoint name through quoting is unnecessary
8121 # for all known dialects. This is here to support potential
8122 # third party use cases
8123 ident = name or savepoint.ident
8124 if self._requires_quotes(ident):
8125 ident = self.quote_identifier(ident)
8126 return ident
8127
8128 @util.preload_module("sqlalchemy.sql.naming")
8129 def format_constraint(
8130 self, constraint: Union[Constraint, Index], _alembic_quote: bool = True
8131 ) -> Optional[str]:
8132 naming = util.preloaded.sql_naming
8133
8134 if constraint.name is _NONE_NAME:
8135 name = naming._constraint_name_for_table(
8136 constraint, constraint.table
8137 )
8138
8139 if name is None:
8140 return None
8141 else:
8142 name = constraint.name
8143
8144 assert name is not None
8145 if constraint.__visit_name__ == "index":
8146 return self.truncate_and_render_index_name(
8147 name, _alembic_quote=_alembic_quote
8148 )
8149 else:
8150 return self.truncate_and_render_constraint_name(
8151 name, _alembic_quote=_alembic_quote
8152 )
8153
8154 def truncate_and_render_index_name(
8155 self, name: str, _alembic_quote: bool = True
8156 ) -> str:
8157 # calculate these at format time so that ad-hoc changes
8158 # to dialect.max_identifier_length etc. can be reflected
8159 # as IdentifierPreparer is long lived
8160 max_ = (
8161 self.dialect.max_index_name_length
8162 or self.dialect.max_identifier_length
8163 )
8164 return self._truncate_and_render_maxlen_name(
8165 name, max_, _alembic_quote
8166 )
8167
8168 def truncate_and_render_constraint_name(
8169 self, name: str, _alembic_quote: bool = True
8170 ) -> str:
8171 # calculate these at format time so that ad-hoc changes
8172 # to dialect.max_identifier_length etc. can be reflected
8173 # as IdentifierPreparer is long lived
8174 max_ = (
8175 self.dialect.max_constraint_name_length
8176 or self.dialect.max_identifier_length
8177 )
8178 return self._truncate_and_render_maxlen_name(
8179 name, max_, _alembic_quote
8180 )
8181
8182 def _truncate_and_render_maxlen_name(
8183 self, name: str, max_: int, _alembic_quote: bool
8184 ) -> str:
8185 if isinstance(name, elements._truncated_label):
8186 if len(name) > max_:
8187 name = name[0 : max_ - 8] + "_" + util.md5_hex(name)[-4:]
8188 else:
8189 self.dialect.validate_identifier(name)
8190
8191 if not _alembic_quote:
8192 return name
8193 else:
8194 return self.quote(name)
8195
8196 def format_index(self, index: Index) -> str:
8197 name = self.format_constraint(index)
8198 assert name is not None
8199 return name
8200
8201 def format_table(
8202 self,
8203 table: FromClause,
8204 use_schema: bool = True,
8205 name: Optional[str] = None,
8206 ) -> str:
8207 """Prepare a quoted table and schema name."""
8208 if name is None:
8209 if TYPE_CHECKING:
8210 assert isinstance(table, NamedFromClause)
8211 name = table.name
8212
8213 result = self.quote(name)
8214
8215 effective_schema = self.schema_for_object(table)
8216
8217 if not self.omit_schema and use_schema and effective_schema:
8218 result = self.quote_schema(effective_schema) + "." + result
8219 return result
8220
8221 def format_schema(self, name):
8222 """Prepare a quoted schema name."""
8223
8224 return self.quote(name)
8225
8226 def format_label_name(
8227 self,
8228 name,
8229 anon_map=None,
8230 ):
8231 """Prepare a quoted column name."""
8232
8233 if anon_map is not None and isinstance(
8234 name, elements._truncated_label
8235 ):
8236 name = name.apply_map(anon_map)
8237
8238 return self.quote(name)
8239
8240 def format_column(
8241 self,
8242 column: ColumnElement[Any],
8243 use_table: bool = False,
8244 name: Optional[str] = None,
8245 table_name: Optional[str] = None,
8246 use_schema: bool = False,
8247 anon_map: Optional[Mapping[str, Any]] = None,
8248 ) -> str:
8249 """Prepare a quoted column name."""
8250
8251 if name is None:
8252 name = column.name
8253 assert name is not None
8254
8255 if anon_map is not None and isinstance(
8256 name, elements._truncated_label
8257 ):
8258 name = name.apply_map(anon_map)
8259
8260 if not getattr(column, "is_literal", False):
8261 if use_table:
8262 return (
8263 self.format_table(
8264 column.table, use_schema=use_schema, name=table_name
8265 )
8266 + "."
8267 + self.quote(name)
8268 )
8269 else:
8270 return self.quote(name)
8271 else:
8272 # literal textual elements get stuck into ColumnClause a lot,
8273 # which shouldn't get quoted
8274
8275 if use_table:
8276 return (
8277 self.format_table(
8278 column.table, use_schema=use_schema, name=table_name
8279 )
8280 + "."
8281 + name
8282 )
8283 else:
8284 return name
8285
8286 def format_table_seq(self, table, use_schema=True):
8287 """Format table name and schema as a tuple."""
8288
8289 # Dialects with more levels in their fully qualified references
8290 # ('database', 'owner', etc.) could override this and return
8291 # a longer sequence.
8292
8293 effective_schema = self.schema_for_object(table)
8294
8295 if not self.omit_schema and use_schema and effective_schema:
8296 return (
8297 self.quote_schema(effective_schema),
8298 self.format_table(table, use_schema=False),
8299 )
8300 else:
8301 return (self.format_table(table, use_schema=False),)
8302
8303 @util.memoized_property
8304 def _r_identifiers(self):
8305 initial, final, escaped_final = (
8306 re.escape(s)
8307 for s in (
8308 self.initial_quote,
8309 self.final_quote,
8310 self._escape_identifier(self.final_quote),
8311 )
8312 )
8313 r = re.compile(
8314 r"(?:"
8315 r"(?:%(initial)s((?:%(escaped)s|[^%(final)s])+)%(final)s"
8316 r"|([^\.]+))(?=\.|$))+"
8317 % {"initial": initial, "final": final, "escaped": escaped_final}
8318 )
8319 return r
8320
8321 def unformat_identifiers(self, identifiers: str) -> Sequence[str]:
8322 """Unpack 'schema.table.column'-like strings into components."""
8323
8324 r = self._r_identifiers
8325 return [
8326 self._unescape_identifier(i)
8327 for i in [a or b for a, b in r.findall(identifiers)]
8328 ]