1# sql/compiler.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Base SQL and DDL compiler implementations.
10
11Classes provided include:
12
13:class:`.compiler.SQLCompiler` - renders SQL
14strings
15
16:class:`.compiler.DDLCompiler` - renders DDL
17(data definition language) strings
18
19:class:`.compiler.GenericTypeCompiler` - renders
20type specification strings.
21
22To generate user-defined SQL strings, see
23:doc:`/ext/compiler`.
24
25"""
26
27from __future__ import annotations
28
29import collections
30import collections.abc as collections_abc
31import contextlib
32from enum import IntEnum
33import functools
34import itertools
35import operator
36import re
37from time import perf_counter
38import typing
39from typing import Any
40from typing import Callable
41from typing import cast
42from typing import ClassVar
43from typing import Dict
44from typing import Final
45from typing import FrozenSet
46from typing import Iterable
47from typing import Iterator
48from typing import List
49from typing import Literal
50from typing import Mapping
51from typing import MutableMapping
52from typing import NamedTuple
53from typing import NoReturn
54from typing import Optional
55from typing import Pattern
56from typing import Protocol
57from typing import Sequence
58from typing import Set
59from typing import Tuple
60from typing import Type
61from typing import TYPE_CHECKING
62from typing import TypedDict
63from typing import Union
64
65from . import base
66from . import coercions
67from . import crud
68from . import elements
69from . import functions
70from . import operators
71from . import roles
72from . import schema
73from . import selectable
74from . import sqltypes
75from . import util as sql_util
76from ._typing import is_column_element
77from ._typing import is_dml
78from .base import _de_clone
79from .base import _from_objects
80from .base import _NONE_NAME
81from .base import _SentinelDefaultCharacterization
82from .base import NO_ARG
83from .elements import quoted_name
84from .sqltypes import TupleType
85from .visitors import prefix_anon_map
86from .. import exc
87from .. import util
88from ..util import FastIntFlag
89from ..util.typing import Self
90from ..util.typing import TupleAny
91from ..util.typing import Unpack
92
93if typing.TYPE_CHECKING:
94 from .annotation import _AnnotationDict
95 from .base import _AmbiguousTableNameMap
96 from .base import CompileState
97 from .base import Executable
98 from .base import ExecutableStatement
99 from .cache_key import CacheKey
100 from .ddl import _TableViaSelect
101 from .ddl import CreateTableAs
102 from .ddl import CreateView
103 from .ddl import ExecutableDDLElement
104 from .dml import Delete
105 from .dml import Insert
106 from .dml import Update
107 from .dml import UpdateBase
108 from .dml import UpdateDMLState
109 from .dml import ValuesBase
110 from .elements import _truncated_label
111 from .elements import BinaryExpression
112 from .elements import BindParameter
113 from .elements import ClauseElement
114 from .elements import ColumnClause
115 from .elements import ColumnElement
116 from .elements import False_
117 from .elements import Label
118 from .elements import Null
119 from .elements import True_
120 from .functions import Function
121 from .schema import CheckConstraint
122 from .schema import Column
123 from .schema import Constraint
124 from .schema import ForeignKeyConstraint
125 from .schema import IdentityOptions
126 from .schema import Index
127 from .schema import PrimaryKeyConstraint
128 from .schema import Table
129 from .schema import UniqueConstraint
130 from .selectable import _ColumnsClauseElement
131 from .selectable import AliasedReturnsRows
132 from .selectable import CompoundSelectState
133 from .selectable import CTE
134 from .selectable import FromClause
135 from .selectable import NamedFromClause
136 from .selectable import ReturnsRows
137 from .selectable import Select
138 from .selectable import SelectState
139 from .type_api import _BindProcessorType
140 from .type_api import TypeDecorator
141 from .type_api import TypeEngine
142 from .type_api import UserDefinedType
143 from .visitors import Visitable
144 from ..engine.cursor import CursorResultMetaData
145 from ..engine.interfaces import _CoreSingleExecuteParams
146 from ..engine.interfaces import _DBAPIAnyExecuteParams
147 from ..engine.interfaces import _DBAPIMultiExecuteParams
148 from ..engine.interfaces import _DBAPISingleExecuteParams
149 from ..engine.interfaces import _ExecuteOptions
150 from ..engine.interfaces import _GenericSetInputSizesType
151 from ..engine.interfaces import _MutableCoreSingleExecuteParams
152 from ..engine.interfaces import Dialect
153 from ..engine.interfaces import SchemaTranslateMapType
154
155
# Type alias: a dictionary keyed by FromClause objects whose values are
# strings.  NOTE(review): presumably the strings are hint text to render
# for each FROM element; the consumer is not visible in this part of the file.
_FromHintsType = Dict["FromClause", str]
157
# Default set of reserved words, stored in lower case.
# NOTE(review): presumably consulted by the identifier quoting logic to
# decide when a name must be quoted; that consumer is not visible in this
# part of the file -- confirm before relying on it.
RESERVED_WORDS = {
    "all",
    "analyse",
    "analyze",
    "and",
    "any",
    "array",
    "as",
    "asc",
    "asymmetric",
    "authorization",
    "between",
    "binary",
    "both",
    "case",
    "cast",
    "check",
    "collate",
    "column",
    "constraint",
    "create",
    "cross",
    "current_date",
    "current_role",
    "current_time",
    "current_timestamp",
    "current_user",
    "default",
    "deferrable",
    "desc",
    "distinct",
    "do",
    "else",
    "end",
    "except",
    "false",
    "for",
    "foreign",
    "freeze",
    "from",
    "full",
    "grant",
    "group",
    "having",
    "ilike",
    "in",
    "initially",
    "inner",
    "intersect",
    "into",
    "is",
    "isnull",
    "join",
    "leading",
    "left",
    "like",
    "limit",
    "localtime",
    "localtimestamp",
    "natural",
    "new",
    "not",
    "notnull",
    "null",
    "off",
    "offset",
    "old",
    "on",
    "only",
    "or",
    "order",
    "outer",
    "overlaps",
    "placing",
    "primary",
    "references",
    "right",
    "select",
    "session_user",
    "set",
    "similar",
    "some",
    "symmetric",
    "table",
    "then",
    "to",
    "trailing",
    "true",
    "union",
    "unique",
    "user",
    "using",
    "verbose",
    "when",
    "where",
}
254
255LEGAL_CHARACTERS = re.compile(r"^[A-Z0-9_$]+$", re.I)
256LEGAL_CHARACTERS_PLUS_SPACE = re.compile(r"^[A-Z0-9_ $]+$", re.I)
257ILLEGAL_INITIAL_CHARACTERS = {str(x) for x in range(0, 10)}.union(["$"])
258
259FK_ON_DELETE = re.compile(
260 r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
261)
262FK_ON_UPDATE = re.compile(
263 r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
264)
265FK_INITIALLY = re.compile(r"^(?:DEFERRED|IMMEDIATE)$", re.I)
266_WINDOW_EXCLUDE_RE = re.compile(
267 r"^(?:CURRENT ROW|GROUP|TIES|NO OTHERS)$", re.I
268)
269BIND_PARAMS = re.compile(r"(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])", re.UNICODE)
270BIND_PARAMS_ESC = re.compile(r"\x5c(:[\w\$]*)(?![:\w\$])", re.UNICODE)
271
272_pyformat_template = "%%(%(name)s)s"
273BIND_TEMPLATES = {
274 "pyformat": _pyformat_template,
275 "qmark": "?",
276 "format": "%%s",
277 "numeric": ":[_POSITION]",
278 "numeric_dollar": "$[_POSITION]",
279 "named": ":%(name)s",
280}
281
282
# Maps operator objects from the ``operators`` module to the SQL text
# rendered for them.  Surrounding whitespace is part of each value: binary
# operators carry a space on both sides, unary prefix operators a trailing
# space (or none for "-" and "~"), and ordering modifiers a leading space.
OPERATORS = {
    # binary
    operators.and_: " AND ",
    operators.or_: " OR ",
    operators.add: " + ",
    operators.mul: " * ",
    operators.sub: " - ",
    operators.mod: " % ",
    operators.neg: "-",
    operators.lt: " < ",
    operators.le: " <= ",
    operators.ne: " != ",
    operators.gt: " > ",
    operators.ge: " >= ",
    operators.eq: " = ",
    operators.is_distinct_from: " IS DISTINCT FROM ",
    operators.is_not_distinct_from: " IS NOT DISTINCT FROM ",
    operators.concat_op: " || ",
    operators.match_op: " MATCH ",
    operators.not_match_op: " NOT MATCH ",
    operators.in_op: " IN ",
    operators.not_in_op: " NOT IN ",
    operators.comma_op: ", ",
    operators.from_: " FROM ",
    operators.as_: " AS ",
    operators.is_: " IS ",
    operators.is_not: " IS NOT ",
    operators.collate: " COLLATE ",
    # unary
    operators.exists: "EXISTS ",
    operators.distinct_op: "DISTINCT ",
    operators.inv: "NOT ",
    operators.any_op: "ANY ",
    operators.all_op: "ALL ",
    # modifiers
    operators.desc_op: " DESC",
    operators.asc_op: " ASC",
    operators.nulls_first_op: " NULLS FIRST",
    operators.nulls_last_op: " NULLS LAST",
    # bitwise
    operators.bitwise_xor_op: " ^ ",
    operators.bitwise_or_op: " | ",
    operators.bitwise_and_op: " & ",
    operators.bitwise_not_op: "~",
    operators.bitwise_lshift_op: " << ",
    operators.bitwise_rshift_op: " >> ",
}
330
# Maps function classes from the ``functions`` module to the name text
# associated with each one.  Values are stored in mixed case exactly as
# listed (e.g. "coalesce" lower case, "CURRENT_DATE" upper case).
FUNCTIONS: Dict[Type[Function[Any]], str] = {
    functions.coalesce: "coalesce",
    functions.current_date: "CURRENT_DATE",
    functions.current_time: "CURRENT_TIME",
    functions.current_timestamp: "CURRENT_TIMESTAMP",
    functions.current_user: "CURRENT_USER",
    functions.localtime: "LOCALTIME",
    functions.localtimestamp: "LOCALTIMESTAMP",
    functions.random: "random",
    functions.sysdate: "sysdate",
    functions.session_user: "SESSION_USER",
    functions.user: "USER",
    functions.cube: "CUBE",
    functions.rollup: "ROLLUP",
    functions.grouping_sets: "GROUPING SETS",
}
347
348
# Default field-name mapping; every field name maps to itself.  The mapping
# is installed as ``SQLCompiler.extract_map``.
EXTRACT_MAP = {
    field: field
    for field in (
        "month",
        "day",
        "year",
        "second",
        "hour",
        "doy",
        "minute",
        "quarter",
        "dow",
        "week",
        "epoch",
        "milliseconds",
        "microseconds",
        "timezone_hour",
        "timezone_minute",
    )
}
366
# Maps each ``selectable._CompoundSelectKeyword`` member to the SQL keyword
# text associated with it.
COMPOUND_KEYWORDS = {
    selectable._CompoundSelectKeyword.UNION: "UNION",
    selectable._CompoundSelectKeyword.UNION_ALL: "UNION ALL",
    selectable._CompoundSelectKeyword.EXCEPT: "EXCEPT",
    selectable._CompoundSelectKeyword.EXCEPT_ALL: "EXCEPT ALL",
    selectable._CompoundSelectKeyword.INTERSECT: "INTERSECT",
    selectable._CompoundSelectKeyword.INTERSECT_ALL: "INTERSECT ALL",
}
375
376
class ResultColumnsEntry(NamedTuple):
    """Tracks a column expression that is expected to be represented
    in the result rows for this statement.

    This normally refers to the columns clause of a SELECT statement,
    but may also refer to a RETURNING clause, as well as to
    dialect-specific emulations.

    The field order is significant: the module-level ``RM_*`` constants
    are integer indexes into this tuple.

    """

    # index 0 (RM_RENDERED_NAME)
    keyname: str
    """string name that's expected in cursor.description"""

    # index 1 (RM_NAME)
    name: str
    """column name, may be labeled"""

    # index 2 (RM_OBJECTS)
    objects: Tuple[Any, ...]
    """sequence of objects that should be able to locate this column
    in a RowMapping. This is typically string names and aliases
    as well as Column objects.

    """

    # index 3 (RM_TYPE)
    type: TypeEngine[Any]
    """Datatype to be associated with this column. This is where
    the "result processing" logic directly links the compiled statement
    to the rows that come back from the cursor.

    """
406
407
class _ResultMapAppender(Protocol):
    """Structural type for a callable that accepts one result-map entry.

    The four arguments mirror the fields of :class:`.ResultColumnsEntry`
    (``keyname``, ``name``, ``objects``, ``type``), with the last one
    spelled ``type_`` here.

    """

    def __call__(
        self,
        keyname: str,
        name: str,
        objects: Sequence[Any],
        type_: TypeEngine[Any],
    ) -> None: ...
416
417
# integer indexes into ResultColumnsEntry used by cursor.py.
# some profiling showed integer access faster than named tuple
# attribute access.  The values follow the field order of
# ResultColumnsEntry: keyname, name, objects, type.
RM_RENDERED_NAME: Literal[0] = 0
RM_NAME: Literal[1] = 1
RM_OBJECTS: Literal[2] = 2
RM_TYPE: Literal[3] = 3
424
425
class _BaseCompilerStackEntry(TypedDict):
    """Required keys of a compiler stack entry dictionary.

    All three keys must be present; see :class:`._CompilerStackEntry` for
    the optional keys layered on top.

    """

    # NOTE(review): key meanings are not established by this part of the
    # file; names suggest the FROM elements involved in "asfrom" rendering
    # and correlation respectively -- confirm against the code that builds
    # these entries.
    asfrom_froms: Set[FromClause]
    correlate_froms: Set[FromClause]
    selectable: ReturnsRows
430
431
class _CompilerStackEntry(_BaseCompilerStackEntry, total=False):
    """Full compiler stack entry dictionary.

    Inherits the required keys of :class:`._BaseCompilerStackEntry`; every
    key declared here is optional because of ``total=False``.

    """

    compile_state: CompileState
    need_result_map_for_nested: bool
    need_result_map_for_compound: bool
    select_0: ReturnsRows
    insert_from_select: Select[Unpack[TupleAny]]
438
439
class ExpandedState(NamedTuple):
    """State used when producing "expanded" and "post compile" bound
    parameters for a statement.

    "expanded" parameters are generated at statement execution time to
    suit the number of values passed; the most prominent example is the
    individual elements inside of an IN expression.

    "post compile" parameters are those whose SQL literal value is
    rendered into the SQL statement at execution time, rather than being
    passed as separate parameters to the driver.

    To create an :class:`.ExpandedState` instance, use the
    :meth:`.SQLCompiler.construct_expanded_state` method on any
    :class:`.SQLCompiler` instance.

    """

    statement: str
    """String SQL statement with parameters fully expanded"""

    parameters: _CoreSingleExecuteParams
    """Parameter dictionary with parameters fully expanded.

    For a statement that uses named parameters, this dictionary will map
    exactly to the names in the statement.  For a statement that uses
    positional parameters, the :attr:`.ExpandedState.positional_parameters`
    will yield a tuple with the positional parameter set.

    """

    processors: Mapping[str, _BindProcessorType[Any]]
    """mapping of bound value processors"""

    positiontup: Optional[Sequence[str]]
    """Sequence of string names indicating the order of positional
    parameters"""

    parameter_expansion: Mapping[str, List[str]]
    """Mapping representing the intermediary link from original parameter
    name to list of "expanded" parameter names, for those parameters that
    were expanded."""

    @property
    def positional_parameters(self) -> Tuple[Any, ...]:
        """Tuple of parameter values ordered per ``positiontup``.

        Only available for statements compiled with a positional
        paramstyle; raises :class:`.InvalidRequestError` when
        ``positiontup`` is ``None``.

        """
        ordering = self.positiontup
        if ordering is None:
            raise exc.InvalidRequestError(
                "statement does not use a positional paramstyle"
            )

        values = self.parameters
        return tuple(values[name] for name in ordering)

    @property
    def additional_parameters(self) -> _CoreSingleExecuteParams:
        """synonym for :attr:`.ExpandedState.parameters`."""
        return self.parameters
499
500
501class _InsertManyValues(NamedTuple):
502 """represents state to use for executing an "insertmanyvalues" statement.
503
504 The primary consumers of this object are the
505 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
506 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods.
507
508 .. versionadded:: 2.0
509
510 """
511
512 is_default_expr: bool
513 """if True, the statement is of the form
514 ``INSERT INTO TABLE DEFAULT VALUES``, and can't be rewritten as a "batch"
515
516 """
517
518 single_values_expr: str
519 """The rendered "values" clause of the INSERT statement.
520
521 This is typically the parenthesized section e.g. "(?, ?, ?)" or similar.
522 The insertmanyvalues logic uses this string as a search and replace
523 target.
524
525 """
526
527 insert_crud_params: List[crud._CrudParamElementStr]
528 """List of Column / bind names etc. used while rewriting the statement"""
529
530 num_positional_params_counted: int
531 """the number of bound parameters in a single-row statement.
532
533 This count may be larger or smaller than the actual number of columns
534 targeted in the INSERT, as it accommodates for SQL expressions
535 in the values list that may have zero or more parameters embedded
536 within them.
537
538 This count is part of what's used to organize rewritten parameter lists
539 when batching.
540
541 """
542
543 sort_by_parameter_order: bool = False
544 """if the deterministic_returnined_order parameter were used on the
545 insert.
546
547 All of the attributes following this will only be used if this is True.
548
549 """
550
551 includes_upsert_behaviors: bool = False
552 """if True, we have to accommodate for upsert behaviors.
553
554 This will in some cases downgrade "insertmanyvalues" that requests
555 deterministic ordering.
556
557 """
558
559 sentinel_columns: Optional[Sequence[Column[Any]]] = None
560 """List of sentinel columns that were located.
561
562 This list is only here if the INSERT asked for
563 sort_by_parameter_order=True,
564 and dialect-appropriate sentinel columns were located.
565
566 .. versionadded:: 2.0.10
567
568 """
569
570 num_sentinel_columns: int = 0
571 """how many sentinel columns are in the above list, if any.
572
573 This is the same as
574 ``len(sentinel_columns) if sentinel_columns is not None else 0``
575
576 """
577
578 sentinel_param_keys: Optional[Sequence[str]] = None
579 """parameter str keys in each param dictionary / tuple
580 that would link to the client side "sentinel" values for that row, which
581 we can use to match up parameter sets to result rows.
582
583 This is only present if sentinel_columns is present and the INSERT
584 statement actually refers to client side values for these sentinel
585 columns.
586
587 .. versionadded:: 2.0.10
588
589 .. versionchanged:: 2.0.29 - the sequence is now string dictionary keys
590 only, used against the "compiled parameteters" collection before
591 the parameters were converted by bound parameter processors
592
593 """
594
595 implicit_sentinel: bool = False
596 """if True, we have exactly one sentinel column and it uses a server side
597 value, currently has to generate an incrementing integer value.
598
599 The dialect in question would have asserted that it supports receiving
600 these values back and sorting on that value as a means of guaranteeing
601 correlation with the incoming parameter list.
602
603 .. versionadded:: 2.0.10
604
605 """
606
607 has_upsert_bound_parameters: bool = False
608 """if True, the upsert SET clause contains bound parameters that will
609 receive their values from the parameters dict (i.e., parametrized
610 bindparams where value is None and callable is None).
611
612 This means we can't batch multiple rows in a single statement, since
613 each row would need different values in the SET clause but there's only
614 one SET clause per statement. See issue #13130.
615
616 .. versionadded:: 2.0.37
617
618 """
619
620 embed_values_counter: bool = False
621 """Whether to embed an incrementing integer counter in each parameter
622 set within the VALUES clause as parameters are batched over.
623
624 This is only used for a specific INSERT..SELECT..VALUES..RETURNING syntax
625 where a subquery is used to produce value tuples. Current support
626 includes PostgreSQL, Microsoft SQL Server.
627
628 .. versionadded:: 2.0.10
629
630 """
631
632
class _InsertManyValuesBatch(NamedTuple):
    """represents an individual batch SQL statement for insertmanyvalues.

    This is passed through the
    :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
    :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods out
    to the :class:`.Connection` within the
    :meth:`.Connection._exec_insertmany_context` method.

    All ten fields are required; none has a default.

    .. versionadded:: 2.0.10

    """

    # NOTE(review): the per-field notes below are inferred from the field
    # names and annotations only -- confirm against the methods named in
    # the class docstring before relying on them.

    # presumably the statement / parameters as rewritten for this batch
    replaced_statement: str
    replaced_parameters: _DBAPIAnyExecuteParams
    processed_setinputsizes: Optional[_GenericSetInputSizesType]
    # presumably the parameter sets that make up this batch
    batch: Sequence[_DBAPISingleExecuteParams]
    sentinel_values: Sequence[Tuple[Any, ...]]
    # presumably batch size / position bookkeeping
    current_batch_size: int
    batchnum: int
    total_batches: int
    rows_sorted: bool
    is_downgraded: bool
656
657
class InsertmanyvaluesSentinelOpts(FastIntFlag):
    """bitflag enum indicating styles of PK defaults
    which can work as implicit sentinel columns

    """

    # individual single-bit flags
    NOT_SUPPORTED = 1
    AUTOINCREMENT = 2
    IDENTITY = 4
    SEQUENCE = 8
    MONOTONIC_FUNCTION = 16

    # combination of the four single-bit flags above other than
    # NOT_SUPPORTED (2 | 4 | 8 | 16 == 30)
    ANY_AUTOINCREMENT = (
        AUTOINCREMENT | IDENTITY | SEQUENCE | MONOTONIC_FUNCTION
    )
    # NOT_SUPPORTED combined with ANY_AUTOINCREMENT (== 31)
    _SUPPORTED_OR_NOT = NOT_SUPPORTED | ANY_AUTOINCREMENT

    # further single-bit flags, outside of _SUPPORTED_OR_NOT
    USE_INSERT_FROM_SELECT = 32
    RENDER_SELECT_COL_CASTS = 64
677
678
class AggregateOrderByStyle(IntEnum):
    """Describes a backend database's capabilities with ORDER BY for
    aggregate functions.

    .. versionadded:: 2.1

    """

    NONE = 0
    """database has no ORDER BY for aggregate functions"""

    INLINE = 1
    """ORDER BY is rendered inside the function's argument list, typically as
    the last element"""

    WITHIN_GROUP = 2
    """the WITHIN GROUP (ORDER BY ...) phrase is used for all aggregate
    functions (not just the ordered set ones)"""
697
698
class CompilerState(IntEnum):
    """Lifecycle states recorded on a :class:`.Compiled` object in its
    ``state`` attribute.

    :meth:`.Compiled.__init__` sets ``COMPILING`` before processing a
    statement, ``STRING_APPLIED`` once the string has been assigned, and
    ``NO_STATEMENT`` when it is constructed without a statement.

    """

    COMPILING = 0
    """statement is present, compilation phase in progress"""

    STRING_APPLIED = 1
    """statement is present, string form of the statement has been applied.

    Additional processors by subclasses may still be pending.

    """

    NO_STATEMENT = 2
    """compiler does not have a statement to compile, is used
    for method access"""
713
714
class Linting(IntEnum):
    """Represent preferences for the 'SQL linting' feature.

    This feature currently includes support for flagging cartesian products
    in SQL statements.

    The members are bit values: ``FROM_LINTING`` (3) is the bitwise OR of
    ``COLLECT_CARTESIAN_PRODUCTS`` (1) and ``WARN_LINTING`` (2).

    """

    NO_LINTING = 0
    "Disable all linting."

    COLLECT_CARTESIAN_PRODUCTS = 1
    """Collect data on FROMs and cartesian products and gather into
    'self.from_linter'"""

    WARN_LINTING = 2
    "Emit warnings for linters that find problems"

    FROM_LINTING = COLLECT_CARTESIAN_PRODUCTS | WARN_LINTING
    """Warn for cartesian products; combines COLLECT_CARTESIAN_PRODUCTS
    and WARN_LINTING"""
736
737
# module-level aliases for the four Linting members
NO_LINTING = Linting.NO_LINTING
COLLECT_CARTESIAN_PRODUCTS = Linting.COLLECT_CARTESIAN_PRODUCTS
WARN_LINTING = Linting.WARN_LINTING
FROM_LINTING = Linting.FROM_LINTING
741
742
class FromLinter(collections.namedtuple("FromLinter", ["froms", "edges"])):
    """State holder for the "cartesian product" detection feature.

    ``froms`` is a mapping keyed by FROM element (its values are used as
    the display text in warnings) and ``edges`` is a collection of
    two-element sequences, each linking a pair of FROM elements.

    """

    def lint(self, start=None):
        """Find FROM elements that are not linked to the starting element.

        :param start: optional FROM element to begin the traversal from;
         when omitted an arbitrary element of ``froms`` is used.  A
         ``start`` that is not a key of ``froms`` raises ``KeyError``.

        :return: a two-tuple ``(unreached, start_element)`` where
         ``unreached`` is the set of FROM elements that could not be
         reached by following edges, or ``(None, None)`` when ``froms`` is
         empty or every element was reached.

        """
        all_froms = self.froms
        if not all_froms:
            return None, None

        pending_edges = set(self.edges)
        unreached = set(all_froms)

        if start is None:
            origin = unreached.pop()
        else:
            origin = start
            unreached.remove(origin)

        queue = collections.deque([origin])

        while queue and unreached:
            current = queue.popleft()
            unreached.discard(current)

            # membership in an edge relies on hash equality, since
            # "annotated" elements match their non-annotated counterparts;
            # native containment ("current in edge", "edge.index(current)")
            # keeps hash() calls out of Python code.
            touching = {edge for edge in pending_edges if current in edge}

            # queue up the opposite endpoint of each matched edge:
            # index 0 -> take element 1, index 1 -> take element 0
            queue.extendleft(
                edge[not edge.index(current)] for edge in touching
            )
            pending_edges.difference_update(touching)

        # anything still unreached is part of a cartesian product
        if unreached:
            return unreached, origin
        return None, None

    def warn(self, stmt_type="SELECT"):
        """Emit a warning if :meth:`.lint` reports unlinked FROM elements.

        :param stmt_type: statement keyword placed at the start of the
         warning message.

        """
        unreached, origin = self.lint()
        if not unreached:
            return

        template = (
            "{stmt_type} statement has a cartesian product between "
            "FROM element(s) {froms} and "
            'FROM element "{start}". Apply join condition(s) '
            "between each element to resolve."
        )
        quoted_froms = [f'"{self.froms[from_]}"' for from_ in unreached]
        message = template.format(
            stmt_type=stmt_type,
            froms=", ".join(quoted_froms),
            start=self.froms[origin],
        )

        util.warn(message)
807
808
class Compiled:
    """Represent a compiled SQL or DDL expression.

    The ``__str__`` method of the ``Compiled`` object should produce
    the actual text of the statement.  ``Compiled`` objects are
    specific to their underlying database dialect, and also may
    or may not be specific to the columns referenced within a
    particular set of bind parameters.  In no case should the
    ``Compiled`` object be dependent on the actual values of those
    bind parameters, even though it may reference those values as
    defaults.
    """

    statement: Optional[ClauseElement] = None
    "The statement to compile."
    string: str = ""
    "The string representation of the ``statement``"

    state: CompilerState
    """description of the compiler's state"""

    is_sql = False
    is_ddl = False

    _cached_metadata: Optional[CursorResultMetaData] = None

    _result_columns: Optional[List[ResultColumnsEntry]] = None

    schema_translate_map: Optional[SchemaTranslateMapType] = None

    execution_options: _ExecuteOptions = util.EMPTY_DICT
    """
    Execution options propagated from the statement.   In some cases,
    sub-elements of the statement can modify these.
    """

    preparer: IdentifierPreparer

    _annotations: _AnnotationDict = util.EMPTY_DICT

    compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` object that maintains additional
    state used by the compiler.

    Major executable objects such as :class:`_expression.Insert`,
    :class:`_expression.Update`, :class:`_expression.Delete`,
    :class:`_expression.Select` will generate this
    state when compiled in order to calculate additional information about the
    object.   For the top level object that is to be executed, the state can be
    stored here where it can also have applicability towards result set
    processing.

    .. versionadded:: 1.4

    """

    dml_compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` assigned at the same point that
    .isinsert, .isupdate, or .isdelete is assigned.

    This will normally be the same object as .compile_state, with the
    exception of cases like the :class:`.ORMFromStatementCompileState`
    object.

    .. versionadded:: 1.4.40

    """

    cache_key: Optional[CacheKey] = None
    """The :class:`.CacheKey` that was generated ahead of creating this
    :class:`.Compiled` object.

    This is used for routines that need access to the original
    :class:`.CacheKey` instance generated when the :class:`.Compiled`
    instance was first cached, typically in order to reconcile
    the original list of :class:`.BindParameter` objects with a
    per-statement list that's generated on each call.

    """

    _gen_time: float
    """Generation time of this :class:`.Compiled`, used for reporting
    cache stats."""

    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        schema_translate_map: Optional[SchemaTranslateMapType] = None,
        render_schema_translate: bool = False,
        compile_kwargs: Mapping[str, Any] = util.immutabledict(),
    ):
        """Construct a new :class:`.Compiled` object.

        :param dialect: :class:`.Dialect` to compile against.

        :param statement: :class:`_expression.ClauseElement` to be compiled.
         When ``None``, no compilation takes place and the compiler is left
         in the :attr:`.CompilerState.NO_STATEMENT` state.

        :param schema_translate_map: dictionary of schema names to be
         translated when forming the resultant SQL

         .. seealso::

            :ref:`schema_translating`

        :param render_schema_translate: when True, the compiled string is
         additionally passed through the preparer's
         ``_render_schema_translates()`` method; ``schema_translate_map``
         must be given in that case.

        :param compile_kwargs: additional kwargs that will be
         passed to the initial call to :meth:`.Compiled.process`.

        """
        self.dialect = dialect
        self.preparer = self.dialect.identifier_preparer
        if schema_translate_map:
            self.schema_translate_map = schema_translate_map
            self.preparer = self.preparer._with_schema_translate(
                schema_translate_map
            )

        if statement is None:
            # nothing to compile; the instance is used for method access
            self.state = CompilerState.NO_STATEMENT
        else:
            self.state = CompilerState.COMPILING
            self.statement = statement
            self.can_execute = statement.supports_execution
            self._annotations = statement._annotations
            if self.can_execute:
                if TYPE_CHECKING:
                    assert isinstance(statement, Executable)
                self.execution_options = statement._execution_options

            # the attributes above are established first, since they are
            # in place by the time process() runs
            self.string = self.process(self.statement, **compile_kwargs)

            if render_schema_translate:
                assert schema_translate_map is not None
                self.string = self.preparer._render_schema_translates(
                    self.string, schema_translate_map
                )

            self.state = CompilerState.STRING_APPLIED

        # recorded in both branches, after any compilation has finished
        self._gen_time = perf_counter()

    def __init_subclass__(cls) -> None:
        """Run the per-class ``_init_compiler_cls()`` hook for each new
        subclass, then continue the normal ``__init_subclass__`` chain."""
        cls._init_compiler_cls()
        return super().__init_subclass__()

    @classmethod
    def _init_compiler_cls(cls):
        """Class-level setup hook invoked from ``__init_subclass__``;
        does nothing in this base class."""
        pass

    def visit_unsupported_compilation(self, element, err, **kw):
        """Raise :class:`.UnsupportedCompilationError` for the type of
        ``element``, chained from ``err``."""
        raise exc.UnsupportedCompilationError(self, type(element)) from err

    @property
    def sql_compiler(self) -> SQLCompiler:
        """Return a Compiled that is capable of processing SQL expressions.

        If this compiler is one, it would likely just return 'self'.

        """

        raise NotImplementedError()

    def process(self, obj: Visitable, **kwargs: Any) -> str:
        """Dispatch ``obj`` against this compiler and return the result."""
        return obj._compiler_dispatch(self, **kwargs)

    def __str__(self) -> str:
        """Return the string text of the generated SQL or DDL.

        An empty string is returned unless the compiler has reached the
        :attr:`.CompilerState.STRING_APPLIED` state.

        """

        if self.state is not CompilerState.STRING_APPLIED:
            return ""
        return self.string

    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        """Return the bind params for this compiled object.

        Not implemented in this base class.

        :param params: a dict of string/object pairs whose values will
                       override bind values compiled in to the
                       statement.
        """

        raise NotImplementedError()

    @property
    def params(self):
        """Return the bind params for this compiled object."""
        return self.construct_params()
1001
1002
class TypeCompiler(util.EnsureKWArg):
    """Produces DDL specification for TypeEngine objects."""

    ensure_kwarg = r"visit_\w+"

    def __init__(self, dialect: Dialect):
        self.dialect = dialect

    def process(self, type_: TypeEngine[Any], **kw: Any) -> str:
        """Dispatch ``type_`` against this compiler and return the result.

        When the type carries a variant registered under this dialect's
        name, that variant is dispatched in place of the type itself.

        """
        variants = type_._variant_mapping
        if variants:
            dialect_name = self.dialect.name
            if dialect_name in variants:
                type_ = variants[dialect_name]
        return type_._compiler_dispatch(self, **kw)

    def visit_unsupported_compilation(
        self, element: Any, err: Exception, **kw: Any
    ) -> NoReturn:
        """Raise :class:`.UnsupportedCompilationError` for ``element``,
        chained from ``err``."""
        raise exc.UnsupportedCompilationError(self, element) from err
1023
1024
1025# this was a Visitable, but to allow accurate detection of
1026# column elements this is actually a column element
class _CompileLabel(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """lightweight label object which acts as an expression.Label."""

    # uses the same visit name as a regular label
    __visit_name__ = "label"
    __slots__ = "element", "name", "_alt_names"

    def __init__(self, col, name, alt_names=()):
        """Wrap ``col`` under the label ``name``.

        ``alt_names`` must be a tuple; it is appended to ``(col,)`` to
        form ``_alt_names``, so the wrapped column is always the first
        entry.

        """
        self.element = col
        self.name = name
        self._alt_names = (col,) + alt_names

    @property
    def proxy_set(self):
        """The ``proxy_set`` of the wrapped element."""
        return self.element.proxy_set

    @property
    def type(self):
        """The ``type`` of the wrapped element."""
        return self.element.type

    def self_group(self, **kw):
        """Return this object unchanged; no grouping is applied."""
        return self
1050
1051
class aggregate_orderby_inline(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """produce ORDER BY inside of function argument lists"""

    __visit_name__ = "aggregate_orderby_inline"
    __slots__ = "element", "aggregate_order_by"

    def __init__(self, element, orderby):
        """Pair ``element`` with the ``orderby`` construct, stored as
        ``aggregate_order_by``."""
        self.element = element
        self.aggregate_order_by = orderby

    def __iter__(self):
        """Iterate over the wrapped element."""
        return iter(self.element)

    @property
    def proxy_set(self):
        """The ``proxy_set`` of the wrapped element."""
        return self.element.proxy_set

    @property
    def type(self):
        """The ``type`` of the wrapped element."""
        return self.element.type

    def self_group(self, **kw):
        """Return this object unchanged; no grouping is applied."""
        return self

    def _with_binary_element_type(self, type_):
        """Return a new wrapper around the re-typed element, keeping the
        same ``aggregate_order_by``."""
        return aggregate_orderby_inline(
            self.element._with_binary_element_type(type_),
            self.aggregate_order_by,
        )
1083
1084
class ilike_case_insensitive(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """produce a wrapping element for a case-insensitive portion of
    an ILIKE construct.

    The construct usually renders the ``lower()`` function, but on
    PostgreSQL will pass silently with the assumption that "ILIKE"
    is being used.

    .. versionadded:: 2.0

    """

    __visit_name__ = "ilike_case_insensitive_operand"
    __slots__ = "element", "comparator"

    def __init__(self, element):
        """Wrap ``element``, also exposing its ``comparator`` on this
        object."""
        self.element = element
        self.comparator = element.comparator

    @property
    def proxy_set(self):
        """The ``proxy_set`` of the wrapped element."""
        return self.element.proxy_set

    @property
    def type(self):
        """The ``type`` of the wrapped element."""
        return self.element.type

    def self_group(self, **kw):
        """Return this object unchanged; no grouping is applied."""
        return self

    def _with_binary_element_type(self, type_):
        """Return a new wrapper around the re-typed element."""
        return ilike_case_insensitive(
            self.element._with_binary_element_type(type_)
        )
1121
1122
1123class SQLCompiler(Compiled):
1124 """Default implementation of :class:`.Compiled`.
1125
1126 Compiles :class:`_expression.ClauseElement` objects into SQL strings.
1127
1128 """
1129
1130 extract_map = EXTRACT_MAP
1131
1132 bindname_escape_characters: ClassVar[Mapping[str, str]] = (
1133 util.immutabledict(
1134 {
1135 "%": "P",
1136 "(": "A",
1137 ")": "Z",
1138 ":": "C",
1139 ".": "_",
1140 "[": "_",
1141 "]": "_",
1142 " ": "_",
1143 }
1144 )
1145 )
1146 """A mapping (e.g. dict or similar) containing a lookup of
1147 characters keyed to replacement characters which will be applied to all
1148 'bind names' used in SQL statements as a form of 'escaping'; the given
1149 characters are replaced entirely with the 'replacement' character when
1150 rendered in the SQL statement, and a similar translation is performed
1151 on the incoming names used in parameter dictionaries passed to methods
1152 like :meth:`_engine.Connection.execute`.
1153
1154 This allows bound parameter names used in :func:`_sql.bindparam` and
1155 other constructs to have any arbitrary characters present without any
1156 concern for characters that aren't allowed at all on the target database.
1157
1158 Third party dialects can establish their own dictionary here to replace the
1159 default mapping, which will ensure that the particular characters in the
1160 mapping will never appear in a bound parameter name.
1161
1162 The dictionary is evaluated at **class creation time**, so cannot be
1163 modified at runtime; it must be present on the class when the class
1164 is first declared.
1165
1166 Note that for dialects that have additional bound parameter rules such
1167 as additional restrictions on leading characters, the
1168 :meth:`_sql.SQLCompiler.bindparam_string` method may need to be augmented.
1169 See the cx_Oracle compiler for an example of this.
1170
1171 .. versionadded:: 2.0.0rc1
1172
1173 """
1174
1175 _bind_translate_re: ClassVar[Pattern[str]]
1176 _bind_translate_chars: ClassVar[Mapping[str, str]]
1177
1178 is_sql = True
1179
1180 compound_keywords = COMPOUND_KEYWORDS
1181
1182 isdelete: bool = False
1183 isinsert: bool = False
1184 isupdate: bool = False
1185 """class-level defaults which can be set at the instance
1186 level to define if this Compiled instance represents
1187 INSERT/UPDATE/DELETE
1188 """
1189
1190 postfetch: Optional[List[Column[Any]]]
1191 """list of columns that can be post-fetched after INSERT or UPDATE to
1192 receive server-updated values"""
1193
1194 insert_prefetch: Sequence[Column[Any]] = ()
1195 """list of columns for which default values should be evaluated before
1196 an INSERT takes place"""
1197
1198 update_prefetch: Sequence[Column[Any]] = ()
1199 """list of columns for which onupdate default values should be evaluated
1200 before an UPDATE takes place"""
1201
1202 implicit_returning: Optional[Sequence[ColumnElement[Any]]] = None
1203 """list of "implicit" returning columns for a toplevel INSERT or UPDATE
1204 statement, used to receive newly generated values of columns.
1205
1206 .. versionadded:: 2.0 ``implicit_returning`` replaces the previous
1207 ``returning`` collection, which was not a generalized RETURNING
1208 collection and instead was in fact specific to the "implicit returning"
1209 feature.
1210
1211 """
1212
1213 isplaintext: bool = False
1214
1215 binds: Dict[str, BindParameter[Any]]
1216 """a dictionary of bind parameter keys to BindParameter instances."""
1217
1218 bind_names: Dict[BindParameter[Any], str]
1219 """a dictionary of BindParameter instances to "compiled" names
1220 that are actually present in the generated SQL"""
1221
1222 stack: List[_CompilerStackEntry]
1223 """major statements such as SELECT, INSERT, UPDATE, DELETE are
1224 tracked in this stack using an entry format."""
1225
1226 returning_precedes_values: bool = False
1227 """set to True classwide to generate RETURNING
1228 clauses before the VALUES or WHERE clause (i.e. MSSQL)
1229 """
1230
1231 render_table_with_column_in_update_from: bool = False
1232 """set to True classwide to indicate the SET clause
1233 in a multi-table UPDATE statement should qualify
1234 columns with the table name (i.e. MySQL only)
1235 """
1236
1237 ansi_bind_rules: bool = False
1238 """SQL 92 doesn't allow bind parameters to be used
1239 in the columns clause of a SELECT, nor does it allow
1240 ambiguous expressions like "? = ?". A compiler
    subclass can set this flag to True if the target
1242 driver/DB enforces this
1243 """
1244
1245 bindtemplate: str
1246 """template to render bound parameters based on paramstyle."""
1247
1248 compilation_bindtemplate: str
1249 """template used by compiler to render parameters before positional
1250 paramstyle application"""
1251
1252 _numeric_binds_identifier_char: str
    """Character that's used as the identifier of a numerical bind param.
1254 For example if this char is set to ``$``, numerical binds will be rendered
1255 in the form ``$1, $2, $3``.
1256 """
1257
1258 _result_columns: List[ResultColumnsEntry]
1259 """relates label names in the final SQL to a tuple of local
1260 column/label name, ColumnElement object (if any) and
1261 TypeEngine. CursorResult uses this for type processing and
1262 column targeting"""
1263
1264 _textual_ordered_columns: bool = False
1265 """tell the result object that the column names as rendered are important,
1266 but they are also "ordered" vs. what is in the compiled object here.
1267
1268 As of 1.4.42 this condition is only present when the statement is a
1269 TextualSelect, e.g. text("....").columns(...), where it is required
1270 that the columns are considered positionally and not by name.
1271
1272 """
1273
1274 _ad_hoc_textual: bool = False
1275 """tell the result that we encountered text() or '*' constructs in the
1276 middle of the result columns, but we also have compiled columns, so
1277 if the number of columns in cursor.description does not match how many
1278 expressions we have, that means we can't rely on positional at all and
1279 should match on name.
1280
1281 """
1282
1283 _ordered_columns: bool = True
1284 """
1285 if False, means we can't be sure the list of entries
1286 in _result_columns is actually the rendered order. Usually
1287 True unless using an unordered TextualSelect.
1288 """
1289
1290 _loose_column_name_matching: bool = False
1291 """tell the result object that the SQL statement is textual, wants to match
1292 up to Column objects, and may be using the ._tq_label in the SELECT rather
1293 than the base name.
1294
1295 """
1296
1297 _numeric_binds: bool = False
1298 """
1299 True if paramstyle is "numeric". This paramstyle is trickier than
1300 all the others.
1301
1302 """
1303
1304 _render_postcompile: bool = False
1305 """
1306 whether to render out POSTCOMPILE params during the compile phase.
1307
1308 This attribute is used only for end-user invocation of stmt.compile();
1309 it's never used for actual statement execution, where instead the
1310 dialect internals access and render the internal postcompile structure
1311 directly.
1312
1313 """
1314
1315 _post_compile_expanded_state: Optional[ExpandedState] = None
1316 """When render_postcompile is used, the ``ExpandedState`` used to create
1317 the "expanded" SQL is assigned here, and then used by the ``.params``
1318 accessor and ``.construct_params()`` methods for their return values.
1319
1320 .. versionadded:: 2.0.0rc1
1321
1322 """
1323
1324 _pre_expanded_string: Optional[str] = None
1325 """Stores the original string SQL before 'post_compile' is applied,
    for cases where 'post_compile' was used.
1327
1328 """
1329
1330 _pre_expanded_positiontup: Optional[List[str]] = None
1331
1332 _insertmanyvalues: Optional[_InsertManyValues] = None
1333
1334 _insert_crud_params: Optional[crud._CrudParamSequence] = None
1335
1336 literal_execute_params: FrozenSet[BindParameter[Any]] = frozenset()
1337 """bindparameter objects that are rendered as literal values at statement
1338 execution time.
1339
1340 """
1341
1342 post_compile_params: FrozenSet[BindParameter[Any]] = frozenset()
1343 """bindparameter objects that are rendered as bound parameter placeholders
1344 at statement execution time.
1345
1346 """
1347
1348 escaped_bind_names: util.immutabledict[str, str] = util.EMPTY_DICT
1349 """Late escaping of bound parameter names that has to be converted
1350 to the original name when looking in the parameter dictionary.
1351
1352 """
1353
1354 has_out_parameters = False
1355 """if True, there are bindparam() objects that have the isoutparam
1356 flag set."""
1357
1358 postfetch_lastrowid = False
    """if True, and this is an INSERT, use cursor.lastrowid to populate
1360 result.inserted_primary_key. """
1361
1362 _cache_key_bind_match: Optional[
1363 Tuple[
1364 Dict[
1365 BindParameter[Any],
1366 List[BindParameter[Any]],
1367 ],
1368 Dict[
1369 str,
1370 BindParameter[Any],
1371 ],
1372 ]
1373 ] = None
1374 """a mapping that will relate the BindParameter object we compile
1375 to those that are part of the extracted collection of parameters
1376 in the cache key, if we were given a cache key.
1377
1378 """
1379
1380 positiontup: Optional[List[str]] = None
1381 """for a compiled construct that uses a positional paramstyle, will be
1382 a sequence of strings, indicating the names of bound parameters in order.
1383
1384 This is used in order to render bound parameters in their correct order,
1385 and is combined with the :attr:`_sql.Compiled.params` dictionary to
1386 render parameters.
1387
1388 This sequence always contains the unescaped name of the parameters.
1389
1390 .. seealso::
1391
1392 :ref:`faq_sql_expression_string` - includes a usage example for
1393 debugging use cases.
1394
1395 """
1396 _values_bindparam: Optional[List[str]] = None
1397
1398 _visited_bindparam: Optional[List[str]] = None
1399
1400 inline: bool = False
1401
1402 ctes: Optional[MutableMapping[CTE, str]]
1403
1404 # Detect same CTE references - Dict[(level, name), cte]
1405 # Level is required for supporting nesting
1406 ctes_by_level_name: Dict[Tuple[int, str], CTE]
1407
1408 # To retrieve key/level in ctes_by_level_name -
1409 # Dict[cte_reference, (level, cte_name, cte_opts)]
1410 level_name_by_cte: Dict[CTE, Tuple[int, str, selectable._CTEOpts]]
1411
1412 ctes_recursive: bool
1413
1414 _post_compile_pattern = re.compile(r"__\[POSTCOMPILE_(\S+?)(~~.+?~~)?\]")
1415 _pyformat_pattern = re.compile(r"%\(([^)]+?)\)s")
1416 _positional_pattern = re.compile(
1417 f"{_pyformat_pattern.pattern}|{_post_compile_pattern.pattern}"
1418 )
1419 _collect_params: Final[bool]
1420 _collected_params: util.immutabledict[str, Any]
1421
@classmethod
def _init_compiler_cls(cls):
    """Run the class-level setup for this compiler class.

    Delegates to ``_init_bind_translate()``, which builds the regular
    expression and lookup used to translate bound parameter names.
    """
    cls._init_bind_translate()
1425
@classmethod
def _init_bind_translate(cls):
    """Build the bind name translation regex and lookup for this class.

    Sets ``cls._bind_translate_re`` to a pattern that matches any single
    character present as a key in ``cls.bindname_escape_characters``, and
    ``cls._bind_translate_chars`` to that mapping.

    If the mapping is empty, a pattern that never matches is used, since
    an empty character class is not a valid regular expression.
    """
    escape_chars = cls.bindname_escape_characters
    if escape_chars:
        # re.escape() keeps "]", "^", "-" and "\" literal inside the
        # character class
        char_class = re.escape("".join(escape_chars))
        pattern = f"[{char_class}]"
    else:
        # "[]" would raise re.error; an empty negative lookahead is
        # valid and can never match
        pattern = r"(?!)"
    cls._bind_translate_re = re.compile(pattern)
    cls._bind_translate_chars = escape_chars
1431
def __init__(
    self,
    dialect: Dialect,
    statement: Optional[ClauseElement],
    cache_key: Optional[CacheKey] = None,
    column_keys: Optional[Sequence[str]] = None,
    for_executemany: bool = False,
    linting: Linting = NO_LINTING,
    _supporting_against: Optional[SQLCompiler] = None,
    **kwargs: Any,
):
    """Construct a new :class:`.SQLCompiler` object.

    :param dialect: :class:`.Dialect` to be used

    :param statement: :class:`_expression.ClauseElement` to be compiled

    :param cache_key: optional cache key for the statement.  When given,
     the bound parameters in its second element (``cache_key[1]``) are
     indexed into ``_cache_key_bind_match``, and collection of statement
     parameters (``_collect_params``) is turned off.

    :param column_keys: a list of column names to be compiled into an
     INSERT or UPDATE statement.

    :param for_executemany: whether INSERT / UPDATE statements should
     expect that they are to be invoked in an "executemany" style,
     which may impact how the statement will be expected to return the
     values of defaults and autoincrement / sequences and similar.
     Depending on the backend and driver in use, support for retrieving
     these values may be disabled which means SQL expressions may
     be rendered inline, RETURNING may not be rendered, etc.

    :param linting: linting setting, stored as ``self.linting``.

    :param _supporting_against: optional existing :class:`.SQLCompiler`.
     When given, its instance ``__dict__`` is copied onto this compiler,
     except for the state, dialect, preparer and paramstyle-related
     attributes listed in the body below.

    :param kwargs: additional keyword arguments to be consumed by the
     superclass.

    """
    self.column_keys = column_keys

    self.cache_key = cache_key

    if cache_key:
        # index the cache key's bound parameters two ways:
        # cksm: parameter key -> parameter
        # ckbm: parameter -> list of matching parameters (initially
        # just itself)
        cksm = {b.key: b for b in cache_key[1]}
        ckbm = {b: [b] for b in cache_key[1]}
        self._cache_key_bind_match = (ckbm, cksm)

    # compile INSERT/UPDATE defaults/sequences to expect executemany
    # style execution, which may mean no pre-execute of defaults,
    # or no RETURNING
    self.for_executemany = for_executemany

    self.linting = linting

    # a dictionary of bind parameter keys to BindParameter
    # instances.
    self.binds = {}

    # a dictionary of BindParameter instances to "compiled" names
    # that are actually present in the generated SQL
    self.bind_names = util.column_dict()

    # stack which keeps track of nested SELECT statements
    self.stack = []

    self._result_columns = []

    # true if the paramstyle is positional
    self.positional = dialect.positional
    if self.positional:
        self._numeric_binds = nb = dialect.paramstyle.startswith("numeric")
        if nb:
            # "numeric_dollar" renders $1, $2, ...; otherwise :1, :2, ...
            self._numeric_binds_identifier_char = (
                "$" if dialect.paramstyle == "numeric_dollar" else ":"
            )

        # positional styles are compiled using the pyformat template
        # first; _process_positional() / _process_numeric() rewrite the
        # placeholders at the end of this constructor
        self.compilation_bindtemplate = _pyformat_template
    else:
        self.compilation_bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

    # CTE collections are created on demand by _init_cte_state()
    self.ctes = None

    self.label_length = (
        dialect.label_length or dialect.max_identifier_length
    )

    # a map which tracks "anonymous" identifiers that are created on
    # the fly here
    self.anon_map = prefix_anon_map()

    # a map which tracks "truncated" names based on
    # dialect.label_length or dialect.max_identifier_length
    self.truncated_names: Dict[Tuple[str, str], str] = {}
    self._truncated_counters: Dict[str, int] = {}
    # statement-level parameters are collected only when no cache key
    # was given
    if not cache_key:
        self._collect_params = True
        self._collected_params = util.EMPTY_DICT
    else:
        self._collect_params = False  # type: ignore[misc]

    # NOTE(review): the superclass constructor presumably performs the
    # compilation itself and sets self.state / self.string, which the
    # code below relies on -- confirm against Compiled.__init__
    Compiled.__init__(self, dialect, statement, **kwargs)

    if self.isinsert or self.isupdate or self.isdelete:
        if TYPE_CHECKING:
            assert isinstance(statement, UpdateBase)

        if self.isinsert or self.isupdate:
            if TYPE_CHECKING:
                assert isinstance(statement, ValuesBase)
            # "inline" is set when the statement asks for it, or for
            # executemany: always for UPDATE, and for INSERT only when
            # the dialect supports executemany RETURNING and the
            # statement uses return_defaults
            if statement._inline:
                self.inline = True
            elif self.for_executemany and (
                not self.isinsert
                or (
                    self.dialect.insert_executemany_returning
                    and statement._return_defaults
                )
            ):
                self.inline = True

    self.bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

    if _supporting_against:
        # take on the other compiler's state, keeping only our own
        # dialect-specific attributes
        self.__dict__.update(
            {
                k: v
                for k, v in _supporting_against.__dict__.items()
                if k
                not in {
                    "state",
                    "dialect",
                    "preparer",
                    "positional",
                    "_numeric_binds",
                    "compilation_bindtemplate",
                    "bindtemplate",
                }
            }
        )

    if self.state is CompilerState.STRING_APPLIED:
        if self.positional:
            # rewrite the pyformat placeholders into the dialect's
            # positional form
            if self._numeric_binds:
                self._process_numeric()
            else:
                self._process_positional()

        if self._render_postcompile:
            # expand post-compile parameters right away and store the
            # result on this compiler
            parameters = self.construct_params(
                escape_names=False,
                _no_postcompile=True,
            )

            self._process_parameters_for_postcompile(
                parameters, _populate_self=True
            )
1582
@property
def insert_single_values_expr(self) -> Optional[str]:
    """The string of a single-parameter-set VALUES expression, if any.

    When an INSERT is compiled with a single set of parameters inside
    a VALUES expression, the string is available here, where it can be
    used for insert batching schemes to rewrite the VALUES expression.
    ``None`` is returned when no "insertmanyvalues" data is present.

    .. versionchanged:: 2.0 This collection is no longer used by
        SQLAlchemy's built-in dialects, in favor of the currently
        internal ``_insertmanyvalues`` collection that is used only by
        :class:`.SQLCompiler`.

    """
    imv = self._insertmanyvalues
    return None if imv is None else imv.single_values_expr
1599
@util.ro_memoized_property
def effective_returning(self) -> Optional[Sequence[ColumnElement[Any]]]:
    """The effective "returning" columns for INSERT, UPDATE or DELETE.

    This is the "implicit returning" collection calculated by the
    compiler when that collection is non-empty; otherwise, for a DML
    statement, the column elements found in the statement's
    ``._all_selected_columns``, i.e. those set explicitly using the
    :meth:`.UpdateBase.returning` method.  ``None`` is returned when
    there is no statement or it is not DML.

    .. versionadded:: 2.0

    """
    if self.implicit_returning:
        return self.implicit_returning

    stmt = self.statement
    if stmt is not None and is_dml(stmt):
        return [
            col for col in stmt._all_selected_columns if is_column_element(col)
        ]

    return None
1624
@property
def returning(self):
    """Backwards-compatible alias for ``effective_returning``."""
    effective = self.effective_returning
    return effective
1632
@property
def current_executable(self):
    """Return the current 'executable' that is being compiled.

    This is the ``"selectable"`` member of the topmost entry of
    ``self.stack``, i.e. the :class:`_sql.Select`, :class:`_sql.Insert`,
    :class:`_sql.Update`, :class:`_sql.Delete` or
    :class:`_sql.CompoundSelect` object currently being compiled.

    ``visit_`` methods should use this accessor rather than the
    compiler's ``.statement`` attribute: SQL constructs are nestable, so
    ``.statement`` is neither guaranteed to be assigned nor guaranteed to
    correspond to the statement currently being compiled.

    :raises IndexError: if the stack is empty.

    """
    try:
        top_entry = self.stack[-1]
        return top_entry["selectable"]
    except IndexError as ie:
        raise IndexError("Compiler does not have a stack entry") from ie
1654
@property
def prefetch(self):
    """New list of the INSERT prefetch columns followed by the UPDATE
    prefetch columns."""
    return [*self.insert_prefetch, *self.update_prefetch]
1658
@util.memoized_property
def _global_attributes(self) -> Dict[Any, Any]:
    """An initially empty dictionary, memoized on this compiler."""
    attributes: Dict[Any, Any] = {}
    return attributes
1662
def _add_to_params(self, item: ExecutableStatement) -> None:
    """Merge the parameters of ``item`` into ``self._collected_params``.

    Expected to be called before the statement is traversed, so that
    calls happen outer to inner; parameters that were already collected
    therefore take precedence over those of ``item``.
    """
    incoming = item._params
    if incoming:
        # the right-hand operand wins on duplicate keys
        self._collected_params = incoming | self._collected_params
1669
@util.memoized_instancemethod
def _init_cte_state(self) -> MutableMapping[CTE, str]:
    """Create the CTE bookkeeping collections and return ``self.ctes``.

    These collections are initialized only if a CTE is located, to save
    on their overhead otherwise.

    """
    # Dict[cte, text_query]: the CTEs collected to tack on top of a
    # SELECT, along with the query text to print for each
    cte_text: MutableMapping[CTE, str] = util.OrderedDict()
    self.ctes = cte_text

    # Dict[(level, name), cte]: detects references to the same CTE;
    # the level is required for supporting nesting
    self.ctes_by_level_name = {}

    # Dict[cte_reference, (level, cte_name, cte_opts)]: retrieves the
    # key/level used in ctes_by_level_name
    self.level_name_by_cte = {}

    self.ctes_recursive = False

    return cte_text
1693
@contextlib.contextmanager
def _nested_result(self):
    """special API to support the use case of 'nested result sets'

    For the duration of the block, ``self._result_columns`` is replaced
    by a new empty list and ``self._ordered_columns`` by ``False``, and
    the topmost stack entry, if any, is flagged with
    ``need_result_map_for_nested``.  The new list and flag are yielded;
    the previous values are restored on exit.
    """
    saved_columns = self._result_columns
    saved_ordered = self._ordered_columns
    self._result_columns = []
    self._ordered_columns = False

    try:
        entry = self.stack[-1] if self.stack else None
        if entry is not None:
            entry["need_result_map_for_nested"] = True
        yield self._result_columns, self._ordered_columns
    finally:
        if entry:
            entry.pop("need_result_map_for_nested")
        self._result_columns = saved_columns
        self._ordered_columns = saved_ordered
1717
def _process_positional(self):
    """Convert the compiled string to the "format" / "qmark" paramstyle.

    Each pyformat-style placeholder in ``self.string`` is replaced with
    the dialect's positional placeholder, and the parameter names are
    recorded in order of appearance in ``self.positiontup``.  Post-compile
    tokens are recorded as well but are left in the string unchanged.
    When "insertmanyvalues" data is present, its SQL fragments receive
    the same placeholder rewrite.
    """
    assert not self.positiontup
    assert self.state is CompilerState.STRING_APPLIED
    assert not self._numeric_binds

    if self.dialect.paramstyle == "format":
        placeholder = "%s"
    else:
        assert self.dialect.paramstyle == "qmark"
        placeholder = "?"

    positions = []

    def find_position(m: re.Match[str]) -> str:
        # group 1 comes from the pyformat half of _positional_pattern,
        # group 2 is the name from the post-compile half
        normal_bind = m.group(1)
        if normal_bind:
            positions.append(normal_bind)
            return placeholder
        else:
            # this a post-compile bind
            positions.append(m.group(2))
            return m.group(0)

    self.string = re.sub(
        self._positional_pattern, find_position, self.string
    )

    if self.escaped_bind_names:
        # the string contains escaped names; positiontup holds the
        # original (unescaped) ones
        reverse_escape = {v: k for k, v in self.escaped_bind_names.items()}
        assert len(self.escaped_bind_names) == len(reverse_escape)
        self.positiontup = [
            reverse_escape.get(name, name) for name in positions
        ]
    else:
        self.positiontup = positions

    if self._insertmanyvalues:
        # rebind (do not clear) the closure variable: find_position now
        # appends to a throwaway list, so the list that may have been
        # assigned to self.positiontup above is left untouched
        positions = []

        single_values_expr = re.sub(
            self._positional_pattern,
            find_position,
            self._insertmanyvalues.single_values_expr,
        )
        insert_crud_params = [
            (
                v[0],
                v[1],
                re.sub(self._positional_pattern, find_position, v[2]),
                v[3],
            )
            for v in self._insertmanyvalues.insert_crud_params
        ]

        self._insertmanyvalues = self._insertmanyvalues._replace(
            single_values_expr=single_values_expr,
            insert_crud_params=insert_crud_params,
        )
1776
def _process_numeric(self):
    """Convert the compiled string to a "numeric" paramstyle.

    Every bound parameter name is assigned a numbered placeholder made of
    ``self._numeric_binds_identifier_char`` and a 1-based counter, and the
    pyformat placeholders in ``self.string`` are replaced accordingly.
    Post-compile and literal-execute parameters are recorded in
    ``self.positiontup`` but receive no number.  ``self.next_numeric_pos``
    is set to the next unused number.
    """
    assert self._numeric_binds
    assert self.state is CompilerState.STRING_APPLIED

    next_pos = 1
    placeholders: Dict[str, str] = {}
    ordered_names: Iterable[str]
    if self._insertmanyvalues and self._values_bindparam is not None:
        # bindparams that are not in values are always placed first.
        # this avoids the need of changing them when using executemany
        # values () ()
        non_values_names = (
            name
            for name in self.bind_names.values()
            if name not in self._values_bindparam
        )
        ordered_names = itertools.chain(
            non_values_names, self.bind_names.values()
        )
    else:
        ordered_names = self.bind_names.values()

    for name in ordered_names:
        if name not in placeholders:
            bind = self.binds[name]
            rendered_later = (
                bind in self.post_compile_params
                or bind in self.literal_execute_params
            )
            if rendered_later:
                # None only marks the name in positiontup; it is not
                # replaced in the string below
                placeholders[name] = None  # type: ignore
            else:
                placeholders[name] = (
                    f"{self._numeric_binds_identifier_char}{next_pos}"
                )
                next_pos += 1

    self.next_numeric_pos = next_pos

    self.positiontup = list(placeholders)
    if self.escaped_bind_names:
        # the string contains the escaped names, so key on those
        escaped = {
            self.escaped_bind_names.get(name, name): pos
            for name, pos in placeholders.items()
        }
        assert len(escaped) == len(placeholders)
        placeholders = escaped

    # Can't use format here since % chars are not escaped.
    self.string = self._pyformat_pattern.sub(
        lambda m: placeholders[m.group(1)], self.string
    )

    imv = self._insertmanyvalues
    if imv:
        # format is ok here since single_values_expr includes only
        # place-holders; the result has the numbers (:1, :2)
        numbered_values_expr = imv.single_values_expr % placeholders
        # The single binds are instead %s so they can be formatted
        formattable_crud_params = [
            (param[0], param[1], "%s", param[3])
            for param in imv.insert_crud_params
        ]

        self._insertmanyvalues = imv._replace(
            single_values_expr=numbered_values_expr,
            insert_crud_params=formattable_crud_params,
        )
1849
@util.memoized_property
def _bind_processors(
    self,
) -> MutableMapping[
    str, Union[_BindProcessorType[Any], Sequence[_BindProcessorType[Any]]]
]:
    """Map compiled bind names to their bind processors.

    For a tuple-typed parameter the value is a tuple holding one
    processor per element type; otherwise it is the single processor of
    the parameter's type.  Entries whose value is ``None`` are omitted.
    """
    # mypy is not able to see the two value types as the above Union,
    # it just sees "object". don't know how to resolve
    processors = {}
    for bindparam in self.bind_names:
        bind_name = self.bind_names[bindparam]
        if bindparam.type._is_tuple_type:
            processor = tuple(
                elem_type._cached_bind_processor(self.dialect)
                for elem_type in cast(TupleType, bindparam.type).types
            )
        else:
            processor = bindparam.type._cached_bind_processor(self.dialect)

        if processor is not None:
            processors[bind_name] = processor  # type: ignore
    return processors
1878
def is_subquery(self):
    """Return True if more than one entry is present on the compiler
    stack."""
    depth = len(self.stack)
    return depth > 1
1881
@property
def sql_compiler(self) -> Self:
    """The SQL compiler in use, which is this object itself."""
    return self
1885
def construct_expanded_state(
    self,
    params: Optional[_CoreSingleExecuteParams] = None,
    escape_names: bool = True,
) -> ExpandedState:
    """Return a new :class:`.ExpandedState` for a given parameter set.

    For queries that use "expanding" or other late-rendered parameters,
    this method will provide for both the finalized SQL string as well
    as the parameters that would be used for a particular parameter set.

    :param params: optional parameter values, passed through to
     :meth:`.SQLCompiler.construct_params`.

    :param escape_names: passed through to
     :meth:`.SQLCompiler.construct_params`.

    .. versionadded:: 2.0.0rc1

    """
    # build the plain parameter dictionary first, bypassing any stored
    # post-compile state, then run the post-compile expansion on it
    unexpanded = self.construct_params(
        params, escape_names=escape_names, _no_postcompile=True
    )
    return self._process_parameters_for_postcompile(unexpanded)
1908
def construct_params(
    self,
    params: Optional[_CoreSingleExecuteParams] = None,
    extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
    escape_names: bool = True,
    _group_number: Optional[int] = None,
    _check: bool = True,
    _no_postcompile: bool = False,
    _collected_params: _CoreSingleExecuteParams | None = None,
) -> _MutableCoreSingleExecuteParams:
    """Return a dictionary of bind parameter names and values.

    For each compiled bound parameter, the value is taken from ``params``
    when present there (looked up by the parameter's ``key``, then by its
    compiled name); otherwise it comes from the parameter itself, or from
    its counterpart in ``extracted_parameters`` if one is resolved.

    :param params: optional mapping of values overriding those held by
     the bound parameters.
    :param extracted_parameters: optional sequence of bound parameters
     that line up positionally with those of this compiler's cache key.
    :param escape_names: if True, keys of the result are translated
     through ``escaped_bind_names``.
    :param _group_number: parameter group number, used in the error
     message for a missing required value.
    :param _check: if True, a required parameter without a value raises.
    :param _no_postcompile: if True, the stored post-compile state is
     not consulted.
    :param _collected_params: parameters collected from the statement;
     may only be passed when this compiler does not collect its own.

    :raises InvalidRequestError: when a required parameter has no value,
     or when ``params`` is given while render_postcompile is in use.
    :raises CompileError: when ``extracted_parameters`` is given but the
     compiler has no cache key.
    """
    if _collected_params is not None:
        assert not self._collect_params
    elif self._collect_params:
        _collected_params = self._collected_params

    if _collected_params:
        # explicitly passed params win over the collected ones
        params = (
            {**_collected_params, **params}
            if params
            else _collected_params
        )

    if self._render_postcompile and not _no_postcompile:
        assert self._post_compile_expanded_state is not None
        if params:
            raise exc.InvalidRequestError(
                "can't construct new parameters when render_postcompile "
                "is used; the statement is hard-linked to the original "
                "parameters. Use construct_expanded_state to generate a "
                "new statement and parameters."
            )
        return dict(self._post_compile_expanded_state.parameters)

    has_escaped_names = escape_names and bool(self.escaped_bind_names)

    resolved_extracted = None
    if extracted_parameters:
        # related the bound parameters collected in the original cache key
        # to those collected in the incoming cache key. They will not have
        # matching names but they will line up positionally in the same
        # way. The parameters present in self.bind_names may be clones of
        # these original cache key params in the case of DML but the .key
        # will be guaranteed to match.
        if self.cache_key is None:
            raise exc.CompileError(
                "This compiled object has no original cache key; "
                "can't pass extracted_parameters to construct_params"
            )
        orig_extracted = self.cache_key[1]

        ckbm_tuple = self._cache_key_bind_match
        assert ckbm_tuple is not None
        ckbm, _ = ckbm_tuple
        resolved_extracted = {}
        for orig_bind, extracted in zip(orig_extracted, extracted_parameters):
            for bind in ckbm[orig_bind]:
                resolved_extracted[bind] = extracted

    pd = {}

    if params:
        for bindparam, name in self.bind_names.items():
            if has_escaped_names:
                escaped_name = self.escaped_bind_names.get(name, name)
            else:
                escaped_name = name

            if bindparam.key in params:
                pd[escaped_name] = params[bindparam.key]
                continue
            if name in params:
                pd[escaped_name] = params[name]
                continue

            if _check and bindparam.required:
                if _group_number:
                    message = (
                        "A value is required for bind parameter %r, "
                        "in parameter group %d"
                        % (bindparam.key, _group_number)
                    )
                else:
                    message = (
                        "A value is required for bind parameter %r"
                        % bindparam.key
                    )
                raise exc.InvalidRequestError(message, code="cd3x")

            value_param = (
                resolved_extracted.get(bindparam, bindparam)
                if resolved_extracted
                else bindparam
            )
            pd[escaped_name] = (
                value_param.effective_value
                if bindparam.callable
                else value_param.value
            )
        return pd

    for bindparam, name in self.bind_names.items():
        if has_escaped_names:
            escaped_name = self.escaped_bind_names.get(name, name)
        else:
            escaped_name = name

        if _check and bindparam.required:
            if _group_number:
                message = (
                    "A value is required for bind parameter %r, "
                    "in parameter group %d"
                    % (bindparam.key, _group_number)
                )
            else:
                message = (
                    "A value is required for bind parameter %r"
                    % bindparam.key
                )
            raise exc.InvalidRequestError(message, code="cd3x")

        value_param = (
            resolved_extracted.get(bindparam, bindparam)
            if resolved_extracted
            else bindparam
        )
        pd[escaped_name] = (
            value_param.effective_value
            if bindparam.callable
            else value_param.value
        )

    return pd
2047
@util.memoized_instancemethod
def _get_set_input_sizes_lookup(self):
    """Build the lookup of bound parameters to DBAPI types.

    Returns a dictionary keyed by each bound parameter of this compiler,
    other than literal-execute parameters.  The value is the DBAPI type
    reported for the parameter's type by the dialect-level type
    implementation, or ``None`` when there is no such type or it is
    filtered out by the dialect's ``include_set_input_sizes`` /
    ``exclude_set_input_sizes`` collections.  For a tuple-typed parameter
    the value is a list with one such entry per element type.
    """
    dialect = self.dialect

    include_types = dialect.include_set_input_sizes
    exclude_types = dialect.exclude_set_input_sizes

    dbapi = dialect.dbapi

    def lookup_type(typ):
        dbtype = typ._unwrapped_dialect_impl(dialect).get_dbapi_type(dbapi)

        if dbtype is None:
            return None
        if exclude_types is not None and dbtype in exclude_types:
            return None
        if include_types is not None and dbtype not in include_types:
            return None
        return dbtype

    skipped = self.literal_execute_params
    inputsizes = {}

    for bindparam in self.bind_names:
        if bindparam not in skipped:
            if bindparam.type._is_tuple_type:
                element_types = cast(TupleType, bindparam.type).types
                inputsizes[bindparam] = [
                    lookup_type(typ) for typ in element_types
                ]
            else:
                inputsizes[bindparam] = lookup_type(bindparam.type)

    return inputsizes
2086
@property
def params(self):
    """Return the bind param dictionary embedded into this
    compiled object, for those values that are present.

    The dictionary is built by :meth:`.SQLCompiler.construct_params`
    with the check for required parameters turned off.

    .. seealso::

        :ref:`faq_sql_expression_string` - includes a usage example for
        debugging use cases.

    """
    embedded = self.construct_params(_check=False)
    return embedded
2099
def _process_parameters_for_postcompile(
    self,
    parameters: _MutableCoreSingleExecuteParams,
    _populate_self: bool = False,
) -> ExpandedState:
    """handle special post compile parameters.

    These include:

    * "expanding" parameters -typically IN tuples that are rendered
      on a per-parameter basis for an otherwise fixed SQL statement string.

    * literal_binds compiled with the literal_execute flag.  Used for
      things like SQL Server "TOP N" where the driver does not accommodate
      N as a bound parameter.

    :param parameters: parameter dictionary for a single execution.  It
     is modified in place: literal-execute and expanding entries are
     popped, and the individual values generated for expanding
     parameters are added.
    :param _populate_self: when True, the expanded statement, the
     position tuple and the resulting state are also stored on this
     compiler.
    :return: an ``ExpandedState`` built from the rewritten statement,
     the (mutated) ``parameters``, the processors for generated names,
     the new position tuple (``None`` when not positional) and the
     mapping of expanded parameter names to their generated names.

    """

    # name from the iteration below -> list of names generated for it
    expanded_parameters = {}
    new_positiontup: Optional[List[str]]

    # start from the stored pre-expansion string if there is one,
    # otherwise from the current statement string
    pre_expanded_string = self._pre_expanded_string
    if pre_expanded_string is None:
        pre_expanded_string = self.string

    if self.positional:
        new_positiontup = []

        pre_expanded_positiontup = self._pre_expanded_positiontup
        if pre_expanded_positiontup is None:
            pre_expanded_positiontup = self.positiontup

    else:
        new_positiontup = pre_expanded_positiontup = None

    # the same processors mapping viewed two ways for typing purposes:
    # one processor per name, or a sequence of processors per name for
    # tuple types
    processors = self._bind_processors
    single_processors = cast(
        "Mapping[str, _BindProcessorType[Any]]", processors
    )
    tuple_processors = cast(
        "Mapping[str, Sequence[_BindProcessorType[Any]]]", processors
    )

    # processors keyed by the names generated for expanded parameters
    new_processors: Dict[str, _BindProcessorType[Any]] = {}

    # escaped name -> SQL text substituted into the statement
    replacement_expressions: Dict[str, Any] = {}
    # escaped name -> the (name, value) pairs produced by the expansion
    to_update_sets: Dict[str, Any] = {}

    # notes:
    # *unescaped* parameter names in:
    # self.bind_names, self.binds, self._bind_processors, self.positiontup
    #
    # *escaped* parameter names in:
    # construct_params(), replacement_expressions

    # collects generated names when numeric binds are in use; these are
    # appended to new_positiontup only after the statement is renumbered
    numeric_positiontup: Optional[List[str]] = None

    if self.positional and pre_expanded_positiontup is not None:
        names: Iterable[str] = pre_expanded_positiontup
        if self._numeric_binds:
            numeric_positiontup = []
    else:
        names = self.bind_names.values()

    ebn = self.escaped_bind_names
    for name in names:
        escaped_name = ebn.get(name, name) if ebn else name
        parameter = self.binds[name]

        if parameter in self.literal_execute_params:
            # rendered inline as a literal, once per escaped name; the
            # value is removed from the parameter dictionary
            if escaped_name not in replacement_expressions:
                replacement_expressions[escaped_name] = (
                    self.render_literal_bindparam(
                        parameter,
                        render_literal_value=parameters.pop(escaped_name),
                    )
                )
            continue

        if parameter in self.post_compile_params:
            if escaped_name in replacement_expressions:
                # this name was already expanded on an earlier
                # iteration; reuse the stored expansion
                to_update = to_update_sets[escaped_name]
                values = None
            else:
                # we are removing the parameter from parameters
                # because it is a list value, which is not expected by
                # TypeEngine objects that would otherwise be asked to
                # process it. the single name is being replaced with
                # individual numbered parameters for each value in the
                # param.
                #
                # note we are also inserting *escaped* parameter names
                # into the given dictionary.   default dialect will
                # use these param names directly as they will not be
                # in the escaped_bind_names dictionary.
                values = parameters.pop(name)

                leep_res = self._literal_execute_expanding_parameter(
                    escaped_name, parameter, values
                )
                to_update, replacement_expr = leep_res

                to_update_sets[escaped_name] = to_update
                replacement_expressions[escaped_name] = replacement_expr

            if not parameter.literal_execute:
                # to_update is iterated as (name, value) pairs below
                parameters.update(to_update)
                if parameter.type._is_tuple_type:
                    assert values is not None
                    # generated names follow "<name>_<i>_<j>", both
                    # indexes starting at 1; j selects the processor
                    # for that position within the tuple
                    new_processors.update(
                        (
                            "%s_%s_%s" % (name, i, j),
                            tuple_processors[name][j - 1],
                        )
                        for i, tuple_element in enumerate(values, 1)
                        for j, _ in enumerate(tuple_element, 1)
                        if name in tuple_processors
                        and tuple_processors[name][j - 1] is not None
                    )
                else:
                    # every generated name shares the processor of the
                    # original parameter, if it has one
                    new_processors.update(
                        (key, single_processors[name])
                        for key, _ in to_update
                        if name in single_processors
                    )
                if numeric_positiontup is not None:
                    numeric_positiontup.extend(
                        name for name, _ in to_update
                    )
                elif new_positiontup is not None:
                    # to_update has escaped names, but that's ok since
                    # these are new names, that aren't in the
                    # escaped_bind_names dict.
                    new_positiontup.extend(name for name, _ in to_update)
                expanded_parameters[name] = [
                    expand_key for expand_key, _ in to_update
                ]
        elif new_positiontup is not None:
            # ordinary parameter: keeps its place in the position tuple
            new_positiontup.append(name)

    def process_expanding(m):
        # regex callback: group 1 is the key into
        # replacement_expressions
        key = m.group(1)
        expr = replacement_expressions[key]

        # if POSTCOMPILE included a bind_expression, render that
        # around each element
        if m.group(2):
            # group 2 is split on "~~"; tokens 1 and 3 are used as the
            # text placed before and after each element
            tok = m.group(2).split("~~")
            be_left, be_right = tok[1], tok[3]
            expr = ", ".join(
                "%s%s%s" % (be_left, exp, be_right)
                for exp in expr.split(", ")
            )
        return expr

    statement = re.sub(
        self._post_compile_pattern, process_expanding, pre_expanded_string
    )

    if numeric_positiontup is not None:
        assert new_positiontup is not None
        # number the generated names starting at self.next_numeric_pos
        param_pos = {
            key: f"{self._numeric_binds_identifier_char}{num}"
            for num, key in enumerate(
                numeric_positiontup, self.next_numeric_pos
            )
        }
        # Can't use format here since % chars are not escaped.
        statement = self._pyformat_pattern.sub(
            lambda m: param_pos[m.group(1)], statement
        )
        new_positiontup.extend(numeric_positiontup)

    expanded_state = ExpandedState(
        statement,
        parameters,
        new_processors,
        new_positiontup,
        expanded_parameters,
    )

    if _populate_self:
        # this is for the "render_postcompile" flag, which is not
        # otherwise used internally and is for end-user debugging and
        # special use cases.
        self._pre_expanded_string = pre_expanded_string
        self._pre_expanded_positiontup = pre_expanded_positiontup
        self.string = expanded_state.statement
        self.positiontup = (
            list(expanded_state.positiontup or ())
            if self.positional
            else None
        )
        self._post_compile_expanded_state = expanded_state

    return expanded_state
2296
@util.preload_module("sqlalchemy.engine.cursor")
def _create_result_map(self):
    """utility method used for unit tests only."""
    metadata_cls = util.preloaded.engine_cursor.CursorResultMetaData
    return metadata_cls._create_description_match_map(self._result_columns)
2304
2305 # assigned by crud.py for insert/update statements
2306 _get_bind_name_for_col: _BindNameForColProtocol
2307
@util.memoized_property
def _within_exec_param_key_getter(self) -> Callable[[Any], str]:
    """Return this compiler's ``_get_bind_name_for_col`` callable."""
    return self._get_bind_name_for_col
2312
@util.memoized_property
@util.preload_module("sqlalchemy.engine.result")
def _inserted_primary_key_from_lastrowid_getter(self):
    """Build the function that assembles an inserted primary key row
    from ``cursor.lastrowid`` and the INSERT parameters.

    The returned ``get(lastrowid, parameters)`` callable produces a
    result tuple keyed by the ``key`` of each primary key column of the
    statement's table.

    """
    result = util.preloaded.engine_result

    param_key_getter = self._within_exec_param_key_getter

    assert self.compile_state is not None
    statement = self.compile_state.statement

    if TYPE_CHECKING:
        assert isinstance(statement, Insert)

    table = statement.table

    # one (getter, column) pair per primary key column; each getter
    # calls .get(<bind name for the column>, None) on the parameters
    getters = [
        (operator.methodcaller("get", param_key_getter(col), None), col)
        for col in table.primary_key
    ]

    autoinc_getter = None
    autoinc_col = table._autoincrement_column
    if autoinc_col is not None:
        # apply type post processors to the lastrowid
        lastrowid_processor = autoinc_col.type._cached_result_processor(
            self.dialect, None
        )
        autoinc_key = param_key_getter(autoinc_col)

        # if a bind value is present for the autoincrement column
        # in the parameters, we need to do the logic dictated by
        # #7998; honor a non-None user-passed parameter over lastrowid.
        # previously in the 1.4 series we weren't fetching lastrowid
        # at all if the key were present in the parameters
        if autoinc_key in self.binds:

            def _autoinc_getter(lastrowid, parameters):
                # falls back to lastrowid when the key is absent from
                # the parameters, and also when its value is None
                param_value = parameters.get(autoinc_key, lastrowid)
                if param_value is not None:
                    # they supplied non-None parameter, use that.
                    # SQLite at least is observed to return the wrong
                    # cursor.lastrowid for INSERT..ON CONFLICT so it
                    # can't be used in all cases
                    return param_value
                else:
                    # use lastrowid
                    return lastrowid

            # work around mypy https://github.com/python/mypy/issues/14027
            autoinc_getter = _autoinc_getter

    else:
        lastrowid_processor = None

    row_fn = result.result_tuple([col.key for col in table.primary_key])

    def get(lastrowid, parameters):
        """given cursor.lastrowid value and the parameters used for INSERT,
        return a "row" that represents the primary key, either by
        using the "lastrowid" or by extracting values from the parameters
        that were sent along with the INSERT.

        """
        if lastrowid_processor is not None:
            lastrowid = lastrowid_processor(lastrowid)

        if lastrowid is None:
            # no lastrowid available: every column comes from the
            # parameters
            return row_fn(getter(parameters) for getter, col in getters)
        else:
            # the autoincrement column takes lastrowid (subject to
            # autoinc_getter when one was set up); all other columns
            # come from the parameters
            return row_fn(
                (
                    (
                        autoinc_getter(lastrowid, parameters)
                        if autoinc_getter is not None
                        else lastrowid
                    )
                    if col is autoinc_col
                    else getter(parameters)
                )
                for getter, col in getters
            )

    return get
2396
@util.memoized_property
@util.preload_module("sqlalchemy.engine.result")
def _inserted_primary_key_from_returning_getter(self):
    """Build the function that assembles an inserted primary key row
    from a returned row and the INSERT parameters.

    For each primary key column of the statement's table, the value is
    read by position from the row when the column is part of
    ``self.implicit_returning``; otherwise it is looked up in the
    parameters under the column's bind name, defaulting to ``None``.

    """
    result = util.preloaded.engine_result

    assert self.compile_state is not None
    statement = self.compile_state.statement

    if TYPE_CHECKING:
        assert isinstance(statement, Insert)

    param_key_getter = self._within_exec_param_key_getter
    table = statement.table

    returning = self.implicit_returning
    assert returning is not None

    # column -> position within the returned row
    ret = {}
    for idx, col in enumerate(returning):
        ret[col] = idx

    def _getter_for(col):
        # second element tells get() whether to read from the row
        if col in ret:
            return operator.itemgetter(ret[col]), True
        return (
            operator.methodcaller("get", param_key_getter(col), None),
            False,
        )

    getters = cast(
        "List[Tuple[Callable[[Any], Any], bool]]",
        [_getter_for(col) for col in table.primary_key],
    )

    row_fn = result.result_tuple([col.key for col in table.primary_key])

    def get(row, parameters):
        return row_fn(
            getter(row if use_row else parameters)
            for getter, use_row in getters
        )

    return get
2441
def default_from(self) -> str:
    """Return the text to use when a SELECT statement has no froms and
    no FROM clause is to be appended.

    This base implementation returns an empty string.  It is a hook that
    gives Oracle Database a chance to tack on a ``FROM DUAL`` to the
    string output.

    """
    nothing_to_append = ""
    return nothing_to_append
2451
def visit_override_binds(self, override_binds, **kw):
    """SQL compile the nested element of an _OverrideBinds with
    bindparams swapped out.

    The _OverrideBinds is not normally expected to be compiled; it
    is meant to be used when an already cached statement is to be used,
    the compilation was already performed, and only the bound params should
    be swapped in at execution time.

    However, there are test cases that exercise this object, and
    additionally the ORM subquery loader is known to feed in expressions
    which include this construct into new queries (discovered in #11173),
    so it has to do the right thing at compile time as well.

    Returns the SQL text of the nested element; the compiler's bind
    collections are updated as a side effect.

    """

    # get SQL text first
    sqltext = override_binds.element._compiler_dispatch(self, **kw)

    # for a test compile that is not for caching, change binds after the
    # fact.  note that we don't try to
    # swap the bindparam as we compile, because our element may be
    # elsewhere in the statement already (e.g. a subquery or perhaps a
    # CTE) and was already visited / compiled. See
    # test_relationship_criteria.py ->
    # test_selectinload_local_criteria_subquery
    for k in override_binds.translate:
        # keys with no matching entry in self.binds are ignored
        if k not in self.binds:
            continue
        bp = self.binds[k]

        # so this would work, just change the value of bp in place.
        # but we don't want to mutate things outside.
        # bp.value = override_binds.translate[bp.key]
        # continue

        # instead, need to replace bp with new_bp or otherwise accommodate
        # in all internal collections
        new_bp = bp._with_value(
            override_binds.translate[bp.key],
            maintain_key=True,
            required=False,
        )

        # re-point both the key and the rendered name at the new
        # parameter, and move the name entry from bp to new_bp
        name = self.bind_names[bp]
        self.binds[k] = self.binds[name] = new_bp
        self.bind_names[new_bp] = name
        self.bind_names.pop(bp, None)

        # carry over membership in the post-compile / literal-execute
        # collections
        if bp in self.post_compile_params:
            self.post_compile_params |= {new_bp}
        if bp in self.literal_execute_params:
            self.literal_execute_params |= {new_bp}

        ckbm_tuple = self._cache_key_bind_match
        if ckbm_tuple:
            ckbm, cksm = ckbm_tuple
            # note: this loop rebinds ``bp``; it is not read again
            # before the outer loop reassigns it
            for bp in bp._cloned_set:
                if bp.key in cksm:
                    cb = cksm[bp.key]
                    ckbm[cb].append(new_bp)

    return sqltext
2515
def visit_grouping(self, grouping, asfrom=False, **kwargs):
    """Render the grouped element enclosed in parentheses.

    ``asfrom`` is accepted but not forwarded to the inner element.
    """
    inner = grouping.element._compiler_dispatch(self, **kwargs)
    return "(" + inner + ")"
2518
def visit_select_statement_grouping(self, grouping, **kwargs):
    """Render the grouped statement enclosed in parentheses."""
    inner = grouping.element._compiler_dispatch(self, **kwargs)
    return "(" + inner + ")"
2521
def visit_label_reference(
    self, element, within_columns_clause=False, **kwargs
):
    """Compile the element wrapped by a label reference.

    When a compiler stack is present and the dialect has
    ``supports_simple_order_by_label`` set, the wrapped element's
    ``_order_by_label_element`` is passed along as
    ``render_label_as_label`` if its name resolves, in the enclosing
    compile state's label dictionary, to an element it shares lineage
    with.

    :raises CompileError: if the current stack entry has no
     ``"compile_state"``.
    """
    target = element.element

    if self.stack and self.dialect.supports_simple_order_by_label:
        try:
            compile_state = cast(
                "Union[SelectState, CompoundSelectState]",
                self.stack[-1]["compile_state"],
            )
        except KeyError as ke:
            raise exc.CompileError(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ) from ke

        _, only_froms, only_cols = compile_state._label_resolve_dict
        resolve_dict = only_froms if within_columns_clause else only_cols

        # this can be None in the case that a _label_reference()
        # were subject to a replacement operation, in which case
        # the replacement of the Label element may have changed
        # to something else like a ColumnClause expression.
        order_by_elem = target._order_by_label_element

        if order_by_elem is not None:
            label_name = order_by_elem.name
            if label_name in resolve_dict and order_by_elem.shares_lineage(
                resolve_dict[label_name]
            ):
                kwargs["render_label_as_label"] = order_by_elem

    return self.process(
        target,
        within_columns_clause=within_columns_clause,
        **kwargs,
    )
2568
def visit_textual_label_reference(
    self, element, within_columns_clause=False, **kwargs
):
    """Compile a label referenced by its textual name.

    With no compiler stack, the element's ``_text_clause`` is compiled
    directly.  Otherwise the name is looked up in the enclosing compile
    state's label dictionary and the column found is compiled with
    ``render_label_as_label`` set to it.  A missing compile state or an
    unknown name is reported through ``coercions._no_text_coercion``
    using ``CompileError``.
    """
    if not self.stack:
        # compiling the element outside of the context of a SELECT
        return self.process(element._text_clause)

    unresolvable = (
        "Can't resolve label reference for ORDER BY / "
        "GROUP BY / DISTINCT etc."
    )

    try:
        compile_state = cast(
            "Union[SelectState, CompoundSelectState]",
            self.stack[-1]["compile_state"],
        )
    except KeyError as ke:
        coercions._no_text_coercion(
            element.element,
            extra=unresolvable,
            exc_cls=exc.CompileError,
            err=ke,
        )

    with_cols, only_froms, _ = compile_state._label_resolve_dict
    lookup = only_froms if within_columns_clause else with_cols
    try:
        col = lookup[element.element]
    except KeyError as err:
        coercions._no_text_coercion(
            element.element,
            extra=unresolvable,
            exc_cls=exc.CompileError,
            err=err,
        )
    else:
        kwargs["render_label_as_label"] = col
        return self.process(
            col, within_columns_clause=within_columns_clause, **kwargs
        )
2613
def visit_label(
    self,
    label,
    add_to_result_map=None,
    within_label_clause=False,
    within_columns_clause=False,
    render_label_as_label=None,
    result_map_targets=(),
    within_tstring=False,
    **kw,
):
    """Compile a label.

    Three renderings are possible:

    * ``<element> AS <name>`` when inside the columns clause and not
      already inside another label; the result map callable, if given,
      is invoked first.
    * the formatted label name alone, when ``render_label_as_label`` is
      this very label.
    * the inner element only, in all other cases.

    :raises CompileError: when ``within_tstring`` is set.
    """
    if within_tstring:
        raise exc.CompileError(
            "Using label() directly inside tstring is not supported "
            "as it is ambiguous how the label expression should be "
            "rendered without knowledge of how it's being used in SQL"
        )

    # only render labels within the columns clause
    # or ORDER BY clause of a select.  dialect-specific compilers
    # can modify this behavior.
    render_label_with_as = (
        within_columns_clause and not within_label_clause
    )
    render_label_only = render_label_as_label is label

    if not (render_label_only or render_label_with_as):
        return label.element._compiler_dispatch(
            self, within_columns_clause=False, **kw
        )

    if isinstance(label.name, elements._truncated_label):
        labelname = self._truncated_identifier("colident", label.name)
    else:
        labelname = label.name

    if not render_label_with_as:
        return self.preparer.format_label(label, labelname)

    if add_to_result_map is not None:
        add_to_result_map(
            labelname,
            label.name,
            (label, labelname) + label._alt_names + result_map_targets,
            label.type,
        )

    rendered_element = label.element._compiler_dispatch(
        self,
        within_columns_clause=True,
        within_label_clause=True,
        **kw,
    )
    return (
        rendered_element
        + OPERATORS[operators.as_]
        + self.preparer.format_label(label, labelname)
    )
2669
def _fallback_column_name(self, column):
    """Always raise ``CompileError``; this base implementation offers
    no substitute name for the given column."""
    message = "Cannot compile Column object until its 'name' is assigned."
    raise exc.CompileError(message)
2674
def visit_lambda_element(self, element, **kw):
    """Compile the element's ``_resolved`` SQL construct."""
    return self.process(element._resolved, **kw)
2678
def visit_column(
    self,
    column: ColumnClause[Any],
    add_to_result_map: Optional[_ResultMapAppender] = None,
    include_table: bool = True,
    result_map_targets: Tuple[Any, ...] = (),
    ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] = None,
    **kwargs: Any,
) -> str:
    """Render a column reference, qualified by table and schema where
    applicable.

    :param column: the column to render.
    :param add_to_result_map: optional callable that receives the
     rendered name, the original name, the match targets and the type.
    :param include_table: when False, only the column name is rendered.
    :param result_map_targets: extra targets appended to those passed to
     ``add_to_result_map``.
    :param ambiguous_table_name_map: optional replacement table names,
     consulted only when there is no effective schema.
    :return: the rendered column expression.
    """
    orig_name = column.name
    name = orig_name
    if name is None:
        name = self._fallback_column_name(column)

    is_literal = column.is_literal
    if not is_literal and isinstance(name, elements._truncated_label):
        name = self._truncated_identifier("colident", name)

    if add_to_result_map is not None:
        targets = (column, name, column.key) + result_map_targets
        if column._tq_label:
            targets += (column._tq_label,)

        add_to_result_map(name, orig_name, targets, column.type)

    # note we are not currently accommodating for
    # literal_column(quoted_name('ident', True)) in the literal case
    name = (
        self.escape_literal_column(name)
        if is_literal
        else self.preparer.quote(name)
    )

    table = column.table
    if table is None or not include_table or not table.named_with_column:
        return name

    effective_schema = self.preparer.schema_for_object(table)

    schema_prefix = ""
    if effective_schema:
        schema_prefix = self.preparer.quote_schema(effective_schema) + "."

    if TYPE_CHECKING:
        assert isinstance(table, NamedFromClause)
    tablename = table.name

    use_replacement = (
        not effective_schema
        and ambiguous_table_name_map
        and tablename in ambiguous_table_name_map
    )
    if use_replacement:
        tablename = ambiguous_table_name_map[tablename]

    if isinstance(tablename, elements._truncated_label):
        tablename = self._truncated_identifier("alias", tablename)

    return schema_prefix + self.preparer.quote(tablename) + "." + name
2737
def visit_collation(self, element, **kw):
    """Render the collation name via the preparer's
    ``format_collation()``."""
    collation_name = element.collation
    return self.preparer.format_collation(collation_name)
2740
def visit_fromclause(self, fromclause, **kwargs):
    """Return the ``name`` of the given FROM clause as-is."""
    clause_name = fromclause.name
    return clause_name
2743
def visit_index(self, index, **kwargs):
    """Return the ``name`` of the given index as-is."""
    index_name = index.name
    return index_name
2746
def visit_typeclause(self, typeclause, **kw):
    """Render the type of a type clause with the dialect's type compiler.

    The type clause itself and this compiler's preparer are passed along
    as ``type_expression`` and ``identifier_preparer``.
    """
    kw.update(type_expression=typeclause, identifier_preparer=self.preparer)
    type_compiler = self.dialect.type_compiler_instance
    return type_compiler.process(typeclause.type, **kw)
2753
def post_process_text(self, text):
    """Double every ``%`` in ``text`` when the preparer has
    ``_double_percents`` set; otherwise return ``text`` unchanged."""
    if not self.preparer._double_percents:
        return text
    return text.replace("%", "%%")
2758
def escape_literal_column(self, text):
    """Double every ``%`` in a literal column's text when the preparer
    has ``_double_percents`` set; otherwise return it unchanged."""
    if not self.preparer._double_percents:
        return text
    return text.replace("%", "%%")
2763
def visit_textclause(self, textclause, add_to_result_map=None, **kw):
    """Compile a textual SQL clause.

    Each match of ``BIND_PARAMS`` in the text is replaced either by the
    compiled form of the bind parameter registered on the clause under
    that name or, when there is none, by ``bindparam_string()`` for the
    bare name.  Escaped ``\\:name`` sequences are then un-escaped.
    """
    if self._collect_params:
        self._add_to_params(textclause)

    def render_bind(match):
        bind_name = match.group(1)
        if bind_name not in textclause._bindparams:
            return self.bindparam_string(bind_name, **kw)
        return self.process(textclause._bindparams[bind_name], **kw)

    if not self.stack:
        self.isplaintext = True

    if add_to_result_map:
        # text() object is present in the columns clause of a
        # select().   Add a no-name entry to the result map so that
        # row[text()] produces a result
        add_to_result_map(None, None, (textclause,), sqltypes.NULLTYPE)

    with_binds = BIND_PARAMS.sub(
        render_bind, self.post_process_text(textclause.text)
    )

    # un-escape any \:params
    return BIND_PARAMS_ESC.sub(lambda m: m.group(1), with_binds)
2791
def visit_tstring(self, tstring, add_to_result_map=None, **kw):
    """Compile a tstring construct by compiling each of its parts with
    ``within_tstring=True`` and concatenating the results."""
    if self._collect_params:
        self._add_to_params(tstring)

    if not self.stack:
        self.isplaintext = True

    if add_to_result_map:
        # tstring() object is present in the columns clause of a
        # select().   Add a no-name entry to the result map so that
        # row[tstring()] produces a result
        add_to_result_map(None, None, (tstring,), sqltypes.NULLTYPE)

    # every part is compiled with the within_tstring flag turned on
    part_kw = dict(kw, within_tstring=True)
    rendered_parts = [self.process(part, **part_kw) for part in tstring.parts]
    return "".join(rendered_parts)
2808
def visit_textual_select(
    self, taf, compound_index=None, asfrom=False, **kw
):
    """Compile a textual SELECT construct.

    A new entry is pushed on the compiler stack for the duration of the
    compilation.  When a result map is wanted (top level, first member
    of a compound that asks for one, or a nested statement that asks
    for one), the construct's ``column_args`` are compiled into the
    result map.  Any collected CTEs are rendered in front of the text.
    """
    if self._collect_params:
        self._add_to_params(taf)

    toplevel = not self.stack
    entry = self._default_stack_entry if toplevel else self.stack[-1]

    new_entry: _CompilerStackEntry = {
        "correlate_froms": set(),
        "asfrom_froms": set(),
        "selectable": taf,
    }
    self.stack.append(new_entry)

    if taf._independent_ctes:
        self._dispatch_independent_ctes(taf, kw)

    if toplevel:
        populate_result_map = True
    elif compound_index == 0 and entry.get(
        "need_result_map_for_compound", False
    ):
        populate_result_map = True
    else:
        populate_result_map = entry.get("need_result_map_for_nested", False)

    if populate_result_map:
        self._ordered_columns = self._textual_ordered_columns = (
            taf.positional
        )

        # enable looser result column matching when the SQL text links to
        # Column objects by name only
        self._loose_column_name_matching = not taf.positional and bool(
            taf.column_args
        )

        for column_arg in taf.column_args:
            self.process(
                column_arg,
                within_columns_clause=True,
                add_to_result_map=self._add_to_result_map,
            )

    text = self.process(taf.element, **kw)
    if self.ctes:
        nesting_level = None if toplevel else len(self.stack)
        text = self._render_cte_clause(nesting_level=nesting_level) + text

    self.stack.pop(-1)

    return text
2862
def visit_null(self, expr: Null, **kw: Any) -> str:
    """Render the SQL ``NULL`` keyword."""
    null_keyword = "NULL"
    return null_keyword
2865
def visit_true(self, expr: True_, **kw: Any) -> str:
    """Render a true constant: ``true`` when the dialect has
    ``supports_native_boolean`` set, ``1`` otherwise."""
    return "true" if self.dialect.supports_native_boolean else "1"
2871
def visit_false(self, expr: False_, **kw: Any) -> str:
    """Render a false constant: ``false`` when the dialect has
    ``supports_native_boolean`` set, ``0`` otherwise."""
    return "false" if self.dialect.supports_native_boolean else "0"
2877
2878 def _generate_delimited_list(self, elements, separator, **kw):
2879 return separator.join(
2880 s
2881 for s in (c._compiler_dispatch(self, **kw) for c in elements)
2882 if s
2883 )
2884
def _generate_delimited_and_list(self, clauses, **kw):
    """Compile ``clauses`` joined by the AND operator.

    The clauses are first passed through
    ``BooleanClauseList._process_clauses_for_boolean``; when that
    reports exactly one clause, it is compiled on its own.  Otherwise
    the non-empty compiled clauses are joined with the AND operator
    string.
    """
    lcc, clauses = elements.BooleanClauseList._process_clauses_for_boolean(
        operators.and_,
        elements.True_._singleton,
        elements.False_._singleton,
        clauses,
    )
    if lcc == 1:
        return clauses[0]._compiler_dispatch(self, **kw)

    separator = OPERATORS[operators.and_]
    compiled = (c._compiler_dispatch(self, **kw) for c in clauses)
    return separator.join(filter(None, compiled))
2901
def visit_tuple(self, clauselist, **kw):
    """Render the clause list enclosed in parentheses."""
    members = self.visit_clauselist(clauselist, **kw)
    return "(%s)" % members
2904
def visit_element_list(self, element, **kw):
    """Compile the element's clauses separated by single spaces."""
    space = " "
    return self._generate_delimited_list(element.clauses, space, **kw)
2907
def visit_order_by_list(self, element, **kw):
    """Compile the element's clauses separated by ``", "``."""
    comma = ", "
    return self._generate_delimited_list(element.clauses, comma, **kw)
2910
def visit_clauselist(self, clauselist, **kw):
    """Compile a clause list, joining its clauses with the string for
    its operator, or with a single space when it has no operator."""
    if clauselist.operator is None:
        sep = " "
    else:
        sep = OPERATORS[clauselist.operator]
    return self._generate_delimited_list(clauselist.clauses, sep, **kw)
2919
def visit_expression_clauselist(self, clauselist, **kw):
    """Compile a clause list that represents an operator expression.

    An operator-specific handler from ``_get_operator_dispatch`` takes
    precedence.  Otherwise the clauses are joined by the operator's
    string with ``_in_operator_expression`` set.

    :raises UnsupportedCompilationError: if the operator has no entry
     in ``OPERATORS``.
    """
    operator_ = clauselist.operator

    handler = self._get_operator_dispatch(
        operator_, "expression_clauselist", None
    )
    if handler:
        return handler(clauselist, operator_, **kw)

    try:
        opstring = OPERATORS[operator_]
    except KeyError as err:
        raise exc.UnsupportedCompilationError(self, operator_) from err

    kw["_in_operator_expression"] = True
    return self._generate_delimited_list(clauselist.clauses, opstring, **kw)
2938
def visit_case(self, clause, **kwargs):
    """Render a ``CASE [value] WHEN .. THEN .. [ELSE ..] END``
    expression from the clause's ``value``, ``whens`` and ``else_``."""

    def render(expr):
        return expr._compiler_dispatch(self, **kwargs)

    sql = "CASE "
    if clause.value is not None:
        sql += render(clause.value) + " "
    for condition, outcome in clause.whens:
        sql += (
            "WHEN " + render(condition) + " THEN " + render(outcome) + " "
        )
    if clause.else_ is not None:
        sql += "ELSE " + render(clause.else_) + " "
    return sql + "END"
2957
def visit_type_coerce(self, type_coerce, **kw):
    """Compile the ``typed_expression`` of a type_coerce construct."""
    typed = type_coerce.typed_expression
    return typed._compiler_dispatch(self, **kw)
2960
def visit_cast(self, cast, **kwargs):
    """Render ``CAST(<expr> AS <type>)``.

    If the compiled type text contains a `` COLLATE ...`` portion, that
    portion is moved outside of the closing parenthesis.
    """
    type_clause = cast.typeclause._compiler_dispatch(self, **kwargs)
    match = re.match("(.*)( COLLATE .*)", type_clause)
    if match:
        type_text, collation = match.group(1), match.group(2)
    else:
        type_text, collation = type_clause, ""
    inner = cast.clause._compiler_dispatch(self, **kwargs)
    return "CAST(%s AS %s)%s" % (inner, type_text, collation)
2969
def visit_frame_clause(self, frameclause, **kw):
    """Render ``<lower bound> AND <upper bound>`` for a window frame.

    An unbounded lower bound renders as ``UNBOUNDED PRECEDING`` and an
    unbounded upper bound as ``UNBOUNDED FOLLOWING``; a current-row
    bound renders as ``CURRENT ROW``; any other bound renders its
    compiled bind value followed by ``PRECEDING`` or ``FOLLOWING``.
    """
    frame_types = elements.FrameClauseType

    def render_bound(type_attr, bind_attr, unbounded_text):
        if getattr(frameclause, type_attr) is frame_types.UNBOUNDED:
            return unbounded_text
        if getattr(frameclause, type_attr) is frame_types.CURRENT:
            return "CURRENT ROW"

        val = self.process(getattr(frameclause, bind_attr), **kw)
        if getattr(frameclause, type_attr) is frame_types.PRECEDING:
            return f"{val} PRECEDING"
        return f"{val} FOLLOWING"

    left = render_bound("lower_type", "lower_bind", "UNBOUNDED PRECEDING")
    right = render_bound("upper_type", "upper_bind", "UNBOUNDED FOLLOWING")

    return f"{left} AND {right}"
2995
def visit_over(self, over, **kwargs):
    """Render ``<element> OVER (...)``.

    The parenthesized part holds, in order and separated by spaces, the
    ``PARTITION BY`` and ``ORDER BY`` lists (each only when non-empty)
    and the frame specification taken from the first of ``range_``,
    ``rows`` or ``groups`` that is set.  An ``EXCLUDE`` phrase is added
    to the frame when ``over.exclude`` is set.
    """
    text = over.element._compiler_dispatch(self, **kwargs)

    if over.range_ is not None:
        frame_keyword, frame = "RANGE", over.range_
    elif over.rows is not None:
        frame_keyword, frame = "ROWS", over.rows
    elif over.groups is not None:
        frame_keyword, frame = "GROUPS", over.groups
    else:
        frame_keyword = frame = None

    if frame is None:
        range_ = None
    else:
        range_ = f"{frame_keyword} BETWEEN {self.process(frame, **kwargs)}"
        if over.exclude is not None:
            range_ += " EXCLUDE " + self.preparer.validate_sql_phrase(
                over.exclude, _WINDOW_EXCLUDE_RE
            )

    window_parts = []
    for word, clause in (
        ("PARTITION", over.partition_by),
        ("ORDER", over.order_by),
    ):
        if clause is not None and len(clause):
            window_parts.append(
                "%s BY %s" % (word, clause._compiler_dispatch(self, **kwargs))
            )
    if range_:
        window_parts.append(range_)

    return "%s OVER (%s)" % (text, " ".join(window_parts))
3027
def visit_withingroup(self, withingroup, **kwargs):
    """Render ``<element> WITHIN GROUP (ORDER BY <order_by>)``."""
    target = withingroup.element._compiler_dispatch(self, **kwargs)
    ordering = withingroup.order_by._compiler_dispatch(self, **kwargs)
    return "%s WITHIN GROUP (ORDER BY %s)" % (target, ordering)
3033
def visit_funcfilter(self, funcfilter, **kwargs):
    """Render ``<func> FILTER (WHERE <criterion>)``."""
    target = funcfilter.func._compiler_dispatch(self, **kwargs)
    condition = funcfilter.criterion._compiler_dispatch(self, **kwargs)
    return "%s FILTER (WHERE %s)" % (target, condition)
3039
def visit_aggregateorderby(self, aggregateorderby, **kwargs):
    """Render an aggregate function that carries an ORDER BY.

    Depending on the dialect's ``aggregate_order_by_style``, the ORDER
    BY is either placed inside the function's argument list (INLINE) or
    rendered through ``visit_withingroup``.

    :raises CompileError: when the style is ``NONE``.
    """
    if self.dialect.aggregate_order_by_style is AggregateOrderByStyle.NONE:
        raise exc.CompileError(
            "this dialect does not support "
            "ORDER BY within an aggregate function"
        )

    inline = (
        self.dialect.aggregate_order_by_style is AggregateOrderByStyle.INLINE
    )
    if not inline:
        return self.visit_withingroup(aggregateorderby, **kwargs)

    # compile a clone of the function whose arguments are wrapped
    # together with the ORDER BY
    new_fn = aggregateorderby.element._clone()
    inline_args = aggregate_orderby_inline(
        new_fn.clause_expr.element, aggregateorderby.order_by
    )
    new_fn.clause_expr = elements.Grouping(inline_args)

    return new_fn._compiler_dispatch(self, **kwargs)
3060
def visit_aggregate_orderby_inline(self, element, **kw):
    """Render ``<element> ORDER BY <aggregate_order_by>``."""
    arguments = self.process(element.element, **kw)
    ordering = self.process(element.aggregate_order_by, **kw)
    return "%s ORDER BY %s" % (arguments, ordering)
3066
def visit_aggregate_strings_func(self, fn, *, use_function_name, **kw):
    """Render an aggregate-strings call under the given function name.

    The first two clauses of ``fn`` are the expression and the
    delimiter; the delimiter is compiled with ``literal_execute=True``.
    An ``ORDER BY`` is appended inside the parentheses when the clauses
    carry an ``aggregate_order_by`` and the dialect's
    ``aggregate_order_by_style`` is INLINE.
    """
    # aggregate_order_by attribute is present if visit_function
    # gave us a Function with aggregate_orderby_inline() as the inner
    # contents
    order_by = getattr(fn.clauses, "aggregate_order_by", None)

    literal_exec = {**kw, "literal_execute": True}

    # break up the function into its components so we can apply
    # literal_execute to the second argument (the delimiter)
    expr, delimiter = list(fn.clauses)[0:2]

    inline_order_by = (
        order_by is not None
        and self.dialect.aggregate_order_by_style
        is AggregateOrderByStyle.INLINE
    )

    arguments = (
        f"{expr._compiler_dispatch(self, **kw)}, "
        f"{delimiter._compiler_dispatch(self, **literal_exec)}"
    )
    if inline_order_by:
        arguments += f" ORDER BY {order_by._compiler_dispatch(self, **kw)}"

    return f"{use_function_name}({arguments})"
3095
def visit_extract(self, extract, **kwargs):
    """Render ``EXTRACT(<field> FROM <expr>)``; the field name is
    translated through ``self.extract_map`` when it has an entry."""
    field = self.extract_map.get(extract.field, extract.field)
    source = extract.expr._compiler_dispatch(self, **kwargs)
    return "EXTRACT(%s FROM %s)" % (field, source)
3102
def visit_scalar_function_column(self, element, **kw):
    """Render ``(<function>).<column>`` for a column accessed from a
    scalar function result."""
    function_sql = self.visit_function(element.fn, **kw)
    column_sql = self.visit_column(element, **kw)
    return "(%s).%s" % (function_sql, column_sql)
3107
def visit_function(
    self,
    func: Function[Any],
    add_to_result_map: Optional[_ResultMapAppender] = None,
    **kwargs: Any,
) -> str:
    """Render a SQL function call.

    A compiler method named ``visit_<lowercased name>_func`` is used
    when present.  Otherwise the function name comes from the
    ``FUNCTIONS`` lookup for the function's class or, failing that,
    from ``func.name``; it is prefixed with the dot-separated
    ``func.packagenames`` and followed by the argument list from
    ``function_argspec()``.

    :param func: the function construct to render.
    :param add_to_result_map: optional callable that is given the
     function name (as rendered name, original name and sole target)
     and the function's type.
    :return: the rendered SQL text, with `` WITH ORDINALITY`` appended
     when ``func._with_ordinality`` is set.
    """
    if self._collect_params:
        self._add_to_params(func)
    if add_to_result_map is not None:
        add_to_result_map(func.name, func.name, (func.name,), func.type)

    # look for a name-specific handler on this compiler
    disp = getattr(self, "visit_%s_func" % func.name.lower(), None)

    text: str

    # this flag is passed on to whichever path renders the function
    kwargs["within_aggregate_function"] = True

    if disp:
        text = disp(func, **kwargs)
    else:
        name = FUNCTIONS.get(func._deannotate().__class__, None)
        if name:
            # known function: the "%(expr)s" placeholder for the
            # argument list is added only when the function has args
            if func._has_args:
                name += "%(expr)s"
        else:
            # function not in FUNCTIONS: use its own name, quoted if
            # it has illegal characters or is a quoted_name, and always
            # followed by the argument list placeholder
            name = func.name
            name = (
                self.preparer.quote(name)
                if self.preparer._requires_quotes_illegal_chars(name)
                or isinstance(name, elements.quoted_name)
                else name
            )
            name = name + "%(expr)s"
        # NOTE(review): the isinstance() check below tests ``name``
        # rather than ``tok`` -- possibly intended to test the package
        # token itself; confirm before changing, as it affects quoting
        text = ".".join(
            [
                (
                    self.preparer.quote(tok)
                    if self.preparer._requires_quotes_illegal_chars(tok)
                    or isinstance(name, elements.quoted_name)
                    else tok
                )
                for tok in func.packagenames
            ]
            + [name]
        ) % {"expr": self.function_argspec(func, **kwargs)}

    if func._with_ordinality:
        text += " WITH ORDINALITY"
    return text
3157
def visit_next_value_func(self, next_value, **kw):
    """Render a "next value" function via :meth:`visit_sequence`.

    Only the sequence is handed over; the keyword arguments received here
    are not forwarded.

    """
    target_sequence = next_value.sequence
    return self.visit_sequence(target_sequence)
3160
def visit_sequence(self, sequence, **kw):
    """Base implementation for sequence rendering; always raises.

    :raises NotImplementedError: unconditionally, naming the dialect.

    """
    dialect_name = self.dialect.name
    raise NotImplementedError(
        f"Dialect '{dialect_name}' does not support sequence increments."
    )
3166
def function_argspec(self, func: Function[Any], **kwargs: Any) -> str:
    """Return the compiled form of ``func.clause_expr``.

    The keyword arguments are forwarded unchanged to the clause's
    ``_compiler_dispatch``.

    """
    argument_clause = func.clause_expr
    return argument_clause._compiler_dispatch(self, **kwargs)
3169
def visit_compound_select(
    self, cs, asfrom=False, compound_index=None, **kwargs
):
    """Render a compound SELECT.

    :param cs: the compound select construct; its ``selects`` are
     rendered and joined with the keyword looked up from
     ``self.compound_keywords``.
    :param asfrom: forwarded to each member select and to the GROUP BY
     rendering.
    :param compound_index: position of this construct inside an enclosing
     compound select, if any.
    :return: the SQL string, prefixed by the CTE clause when
     ``self.ctes`` is non-empty.

    A stack entry is pushed for the duration of the rendering and popped
    before returning.

    """
    if self._collect_params:
        self._add_to_params(cs)
    # an empty compiler stack means this is the outermost statement
    toplevel = not self.stack

    compile_state = cs._compile_state_factory(cs, self, **kwargs)

    if toplevel and not self.compile_state:
        self.compile_state = compile_state

    compound_stmt = compile_state.statement

    entry = self._default_stack_entry if toplevel else self.stack[-1]
    # a result map is needed at the top level, or for the first
    # (or un-indexed) element when the enclosing entry asks for one
    need_result_map = toplevel or (
        not compound_index
        and entry.get("need_result_map_for_compound", False)
    )

    # indicates there is already a CompoundSelect in play
    if compound_index == 0:
        entry["select_0"] = cs

    self.stack.append(
        {
            "correlate_froms": entry["correlate_froms"],
            "asfrom_froms": entry["asfrom_froms"],
            "selectable": cs,
            "compile_state": compile_state,
            "need_result_map_for_compound": need_result_map,
        }
    )

    if compound_stmt._independent_ctes:
        self._dispatch_independent_ctes(compound_stmt, kwargs)

    keyword = self.compound_keywords[cs.keyword]

    # each member select is told its position via compound_index
    text = (" " + keyword + " ").join(
        (
            c._compiler_dispatch(
                self, asfrom=asfrom, compound_index=i, **kwargs
            )
            for i, c in enumerate(cs.selects)
        )
    )

    # the trailing GROUP BY / ORDER BY / row limit are rendered with
    # include_table=False
    kwargs["include_table"] = False
    text += self.group_by_clause(cs, **dict(asfrom=asfrom, **kwargs))
    text += self.order_by_clause(cs, **kwargs)
    if cs._has_row_limiting_clause:
        text += self._row_limit_clause(cs, **kwargs)

    if self.ctes:
        # nesting level is the current stack depth unless at top level
        nesting_level = len(self.stack) if not toplevel else None
        text = (
            self._render_cte_clause(
                nesting_level=nesting_level,
                include_following_stack=True,
            )
            + text
        )

    self.stack.pop(-1)
    return text
3236
3237 def _row_limit_clause(self, cs, **kwargs):
3238 if cs._fetch_clause is not None:
3239 return self.fetch_clause(cs, **kwargs)
3240 else:
3241 return self.limit_clause(cs, **kwargs)
3242
3243 def _get_operator_dispatch(self, operator_, qualifier1, qualifier2):
3244 attrname = "visit_%s_%s%s" % (
3245 operator_.__name__,
3246 qualifier1,
3247 "_" + qualifier2 if qualifier2 else "",
3248 )
3249 return getattr(self, attrname, None)
3250
3251 def _get_custom_operator_dispatch(self, operator_, qualifier1):
3252 attrname = "visit_%s_op_%s" % (operator_.visit_name, qualifier1)
3253 return getattr(self, attrname, None)
3254
def visit_unary(
    self, unary, add_to_result_map=None, result_map_targets=(), **kw
):
    """Render a unary expression (prefix operator or postfix modifier).

    :param unary: the unary element; exactly one of ``operator`` or
     ``modifier`` is expected to be set.
    :param add_to_result_map: optional result-map callable; when given it
     is forwarded, together with ``result_map_targets`` extended by this
     element, to the nested compilation.
    :raises CompileError: if both operator and modifier are set, or
     neither is.

    A warning is emitted when ``distinct_op`` is compiled without the
    ``within_aggregate_function`` flag in the keyword arguments.

    """
    if add_to_result_map is not None:
        result_map_targets += (unary,)
        kw["add_to_result_map"] = add_to_result_map
        kw["result_map_targets"] = result_map_targets

    if unary.operator is operators.distinct_op and not kw.get(
        "within_aggregate_function", False
    ):
        # note the trailing space after "support": the adjacent literals
        # are concatenated without any separator
        util.warn(
            "Column-expression-level unary distinct() "
            "should not be used outside of an aggregate "
            "function. For general 'SELECT DISTINCT' support "
            "use select().distinct()."
        )

    if unary.operator:
        if unary.modifier:
            raise exc.CompileError(
                "Unary expression does not support operator "
                "and modifier simultaneously"
            )
        disp = self._get_operator_dispatch(
            unary.operator, "unary", "operator"
        )
        if disp:
            return disp(unary, unary.operator, **kw)
        else:
            # no specific visit method; use the generic operator string
            return self._generate_generic_unary_operator(
                unary, OPERATORS[unary.operator], **kw
            )
    elif unary.modifier:
        disp = self._get_operator_dispatch(
            unary.modifier, "unary", "modifier"
        )
        if disp:
            return disp(unary, unary.modifier, **kw)
        else:
            return self._generate_generic_unary_modifier(
                unary, OPERATORS[unary.modifier], **kw
            )
    else:
        raise exc.CompileError(
            "Unary expression has no operator or modifier"
        )
3302
def visit_truediv_binary(self, binary, operator, **kw):
    """Render true division as ``<left> / <right>``.

    When ``self.dialect.div_is_floordiv`` is set, the right operand is
    wrapped in a CAST: to its own type if that type's affinity is Numeric
    or Float, otherwise to ``Numeric()``.

    """
    if not self.dialect.div_is_floordiv:
        dividend_sql = self.process(binary.left, **kw)
        divisor_sql = self.process(binary.right, **kw)
        return dividend_sql + " / " + divisor_sql

    dividend_sql = self.process(binary.left, **kw)

    # TODO: would need a fast cast again here,
    # unless we want to use an implicit cast like "+ 0.0"
    divisor = binary.right
    if divisor.type._type_affinity in (sqltypes.Numeric, sqltypes.Float):
        cast_type = divisor.type
    else:
        cast_type = sqltypes.Numeric()
    divisor_sql = self.process(elements.Cast(divisor, cast_type), **kw)
    return dividend_sql + " / " + divisor_sql
3329
def visit_floordiv_binary(self, binary, operator, **kw):
    """Render floor division.

    A plain ``<left> / <right>`` is used when the dialect's division
    already floors and both operand types have Integer affinity;
    otherwise the division is wrapped in ``FLOOR(...)``.

    """
    plain_division_floors = (
        self.dialect.div_is_floordiv
        and binary.right.type._type_affinity is sqltypes.Integer
        and binary.left.type._type_affinity is sqltypes.Integer
    )
    quotient_sql = (
        self.process(binary.left, **kw)
        + " / "
        + self.process(binary.right, **kw)
    )
    if plain_division_floors:
        return quotient_sql
    return f"FLOOR({quotient_sql})"
3347
def visit_is_true_unary_operator(self, element, operator, **kw):
    """Render an "is true" test.

    The operand is rendered as-is when the element is implicitly boolean
    or the dialect supports native booleans; otherwise ``= 1`` is
    appended.

    """
    renders_bare = (
        element._is_implicitly_boolean
        or self.dialect.supports_native_boolean
    )
    operand_sql = self.process(element.element, **kw)
    return operand_sql if renders_bare else f"{operand_sql} = 1"
3356
def visit_is_false_unary_operator(self, element, operator, **kw):
    """Render an "is false" test.

    ``NOT <operand>`` is used when the element is implicitly boolean or
    the dialect supports native booleans; otherwise ``<operand> = 0``.

    """
    uses_not = (
        element._is_implicitly_boolean
        or self.dialect.supports_native_boolean
    )
    operand_sql = self.process(element.element, **kw)
    if uses_not:
        return f"NOT {operand_sql}"
    return f"{operand_sql} = 0"
3365
def visit_not_match_op_binary(self, binary, operator, **kw):
    """Render a negated MATCH as ``NOT <match expression>``.

    The match expression itself is produced by :meth:`visit_binary` with
    the operator overridden to ``operators.match_op``.  The compiler
    keyword arguments are forwarded so that options given to this method
    also apply to the nested rendering.

    """
    return "NOT %s" % self.visit_binary(
        binary, override_operator=operators.match_op, **kw
    )
3370
def visit_not_in_op_binary(self, binary, operator, **kw):
    """Render NOT IN, always enclosed in parenthesis."""
    # The brackets are required in the NOT IN operation because the empty
    # case is handled using the form "(col NOT IN (null) OR 1 = 1)".
    # The presence of the OR makes the brackets required.
    inner_sql = self._generate_generic_binary(
        binary, OPERATORS[operator], **kw
    )
    return f"({inner_sql})"
3378
def visit_empty_set_op_expr(self, type_, expand_op, **kw):
    """Render the replacement text for an empty IN / NOT IN list.

    For ``not_in_op`` the text ends in ``OR (1 = 1``, for ``in_op`` in
    ``AND (1 != 1``; the fragment deliberately leaves its parenthesis
    unbalanced.  One ``NULL`` is rendered per entry when ``type_`` has
    more than one entry.  Any other operator is delegated to
    :meth:`visit_empty_set_expr`.

    """
    if expand_op is operators.not_in_op:
        tail = "OR (1 = 1"
    elif expand_op is operators.in_op:
        tail = "AND (1 != 1"
    else:
        return self.visit_empty_set_expr(type_)

    if len(type_) > 1:
        nulls = ", ".join("NULL" for _ in type_)
        return f"({nulls})) {tail}"
    return f"NULL) {tail}"
3396
def visit_empty_set_expr(self, element_types, **kw):
    """Base implementation for empty-set rendering; always raises.

    :raises NotImplementedError: unconditionally, naming the dialect.

    """
    dialect_name = self.dialect.name
    raise NotImplementedError(
        f"Dialect '{dialect_name}' does not support empty set expression."
    )
3402
3403 def _literal_execute_expanding_parameter_literal_binds(
3404 self, parameter, values, bind_expression_template=None
3405 ):
3406 typ_dialect_impl = parameter.type._unwrapped_dialect_impl(self.dialect)
3407
3408 if not values:
3409 # empty IN expression. note we don't need to use
3410 # bind_expression_template here because there are no
3411 # expressions to render.
3412
3413 if typ_dialect_impl._is_tuple_type:
3414 replacement_expression = (
3415 "VALUES " if self.dialect.tuple_in_values else ""
3416 ) + self.visit_empty_set_op_expr(
3417 parameter.type.types, parameter.expand_op
3418 )
3419
3420 else:
3421 replacement_expression = self.visit_empty_set_op_expr(
3422 [parameter.type], parameter.expand_op
3423 )
3424
3425 elif typ_dialect_impl._is_tuple_type or (
3426 typ_dialect_impl._isnull
3427 and isinstance(values[0], collections_abc.Sequence)
3428 and not isinstance(values[0], (str, bytes))
3429 ):
3430 if typ_dialect_impl._has_bind_expression:
3431 raise NotImplementedError(
3432 "bind_expression() on TupleType not supported with "
3433 "literal_binds"
3434 )
3435
3436 replacement_expression = (
3437 "VALUES " if self.dialect.tuple_in_values else ""
3438 ) + ", ".join(
3439 "(%s)"
3440 % (
3441 ", ".join(
3442 self.render_literal_value(value, param_type)
3443 for value, param_type in zip(
3444 tuple_element, parameter.type.types
3445 )
3446 )
3447 )
3448 for i, tuple_element in enumerate(values)
3449 )
3450 else:
3451 if bind_expression_template:
3452 post_compile_pattern = self._post_compile_pattern
3453 m = post_compile_pattern.search(bind_expression_template)
3454 assert m and m.group(
3455 2
3456 ), "unexpected format for expanding parameter"
3457
3458 tok = m.group(2).split("~~")
3459 be_left, be_right = tok[1], tok[3]
3460 replacement_expression = ", ".join(
3461 "%s%s%s"
3462 % (
3463 be_left,
3464 self.render_literal_value(value, parameter.type),
3465 be_right,
3466 )
3467 for value in values
3468 )
3469 else:
3470 replacement_expression = ", ".join(
3471 self.render_literal_value(value, parameter.type)
3472 for value in values
3473 )
3474
3475 return (), replacement_expression
3476
3477 def _literal_execute_expanding_parameter(self, name, parameter, values):
3478 if parameter.literal_execute:
3479 return self._literal_execute_expanding_parameter_literal_binds(
3480 parameter, values
3481 )
3482
3483 dialect = self.dialect
3484 typ_dialect_impl = parameter.type._unwrapped_dialect_impl(dialect)
3485
3486 if self._numeric_binds:
3487 bind_template = self.compilation_bindtemplate
3488 else:
3489 bind_template = self.bindtemplate
3490
3491 if (
3492 self.dialect._bind_typing_render_casts
3493 and typ_dialect_impl.render_bind_cast
3494 ):
3495
3496 def _render_bindtemplate(name):
3497 return self.render_bind_cast(
3498 parameter.type,
3499 typ_dialect_impl,
3500 bind_template % {"name": name},
3501 )
3502
3503 else:
3504
3505 def _render_bindtemplate(name):
3506 return bind_template % {"name": name}
3507
3508 if not values:
3509 to_update = []
3510 if typ_dialect_impl._is_tuple_type:
3511 replacement_expression = self.visit_empty_set_op_expr(
3512 parameter.type.types, parameter.expand_op
3513 )
3514 else:
3515 replacement_expression = self.visit_empty_set_op_expr(
3516 [parameter.type], parameter.expand_op
3517 )
3518
3519 elif typ_dialect_impl._is_tuple_type or (
3520 typ_dialect_impl._isnull
3521 and isinstance(values[0], collections_abc.Sequence)
3522 and not isinstance(values[0], (str, bytes))
3523 ):
3524 assert not typ_dialect_impl._is_array
3525 to_update = [
3526 ("%s_%s_%s" % (name, i, j), value)
3527 for i, tuple_element in enumerate(values, 1)
3528 for j, value in enumerate(tuple_element, 1)
3529 ]
3530
3531 replacement_expression = (
3532 "VALUES " if dialect.tuple_in_values else ""
3533 ) + ", ".join(
3534 "(%s)"
3535 % (
3536 ", ".join(
3537 _render_bindtemplate(
3538 to_update[i * len(tuple_element) + j][0]
3539 )
3540 for j, value in enumerate(tuple_element)
3541 )
3542 )
3543 for i, tuple_element in enumerate(values)
3544 )
3545 else:
3546 to_update = [
3547 ("%s_%s" % (name, i), value)
3548 for i, value in enumerate(values, 1)
3549 ]
3550 replacement_expression = ", ".join(
3551 _render_bindtemplate(key) for key, value in to_update
3552 )
3553
3554 return to_update, replacement_expression
3555
def visit_binary(
    self,
    binary,
    override_operator=None,
    eager_grouping=False,
    from_linter=None,
    lateral_from_linter=None,
    **kw,
):
    """Render a binary expression.

    :param binary: the binary element.
    :param override_operator: operator to render instead of
     ``binary.operator``.
    :param eager_grouping: accepted but not forwarded by this method.
    :param from_linter: when given and the operator is a comparison, the
     FROM objects of both sides are recorded as linter edges.
    :param lateral_from_linter: when given, edges are recorded on it
     instead, with ``kw["enclosing_lateral"]`` added to both sides.
    :raises UnsupportedCompilationError: if no visit method exists and the
     operator is not in ``OPERATORS``.

    """
    if from_linter and operators.is_comparison(binary.operator):
        if lateral_from_linter is None:
            left_froms = _de_clone(binary.left._from_objects)
            right_froms = _de_clone(binary.right._from_objects)
            from_linter.edges.update(
                itertools.product(left_froms, right_froms)
            )
        else:
            enclosing_lateral = kw["enclosing_lateral"]
            left_froms = _de_clone(
                binary.left._from_objects + [enclosing_lateral]
            )
            right_froms = _de_clone(
                binary.right._from_objects + [enclosing_lateral]
            )
            lateral_from_linter.edges.update(
                itertools.product(left_froms, right_froms)
            )

    # don't allow "? = ?" to render
    both_sides_bound = (
        self.ansi_bind_rules
        and isinstance(binary.left, elements.BindParameter)
        and isinstance(binary.right, elements.BindParameter)
    )
    if both_sides_bound:
        kw["literal_execute"] = True

    effective_op = override_operator or binary.operator
    handler = self._get_operator_dispatch(effective_op, "binary", None)
    if handler:
        return handler(binary, effective_op, **kw)

    try:
        opstring = OPERATORS[effective_op]
    except KeyError as err:
        raise exc.UnsupportedCompilationError(self, effective_op) from err

    return self._generate_generic_binary(
        binary,
        opstring,
        from_linter=from_linter,
        lateral_from_linter=lateral_from_linter,
        **kw,
    )
3611
def visit_function_as_comparison_op_binary(self, element, operator, **kw):
    """Render the element by compiling its ``sql_function``."""
    wrapped_function = element.sql_function
    return self.process(wrapped_function, **kw)
3614
def visit_mod_binary(self, binary, operator, **kw):
    """Render the modulus operator.

    The percent sign is doubled when ``self.preparer._double_percents``
    is set.

    """
    symbol = " %% " if self.preparer._double_percents else " % "
    left_sql = self.process(binary.left, **kw)
    right_sql = self.process(binary.right, **kw)
    return left_sql + symbol + right_sql
3628
def visit_custom_op_binary(self, element, operator, **kw):
    """Render a binary expression that uses a custom operator.

    If the operator has a ``visit_name`` and the compiler provides the
    matching ``visit_<name>_op_binary`` method, that method is used.
    Otherwise the escaped ``opstring`` is rendered between the operands,
    using the operator's own ``eager_grouping`` setting.

    """
    if operator.visit_name:
        handler = self._get_custom_operator_dispatch(operator, "binary")
        if handler:
            return handler(element, operator, **kw)

    kw["eager_grouping"] = operator.eager_grouping
    padded_opstring = (
        " " + self.escape_literal_column(operator.opstring) + " "
    )
    return self._generate_generic_binary(element, padded_opstring, **kw)
3641
def visit_custom_op_unary_operator(self, element, operator, **kw):
    """Render a custom operator in prefix position.

    A ``visit_<name>_op_unary`` method is preferred when the operator has
    a ``visit_name``; otherwise the escaped ``opstring`` followed by a
    space is rendered before the operand.

    """
    if operator.visit_name:
        handler = self._get_custom_operator_dispatch(operator, "unary")
        if handler:
            return handler(element, operator, **kw)

    prefix = self.escape_literal_column(operator.opstring) + " "
    return self._generate_generic_unary_operator(element, prefix, **kw)
3651
def visit_custom_op_unary_modifier(self, element, operator, **kw):
    """Render a custom operator in postfix position.

    A ``visit_<name>_op_unary`` method is preferred when the operator has
    a ``visit_name``; otherwise a space and the escaped ``opstring`` are
    rendered after the operand.

    """
    if operator.visit_name:
        handler = self._get_custom_operator_dispatch(operator, "unary")
        if handler:
            return handler(element, operator, **kw)

    suffix = " " + self.escape_literal_column(operator.opstring)
    return self._generate_generic_unary_modifier(element, suffix, **kw)
3661
3662 def _generate_generic_binary(
3663 self,
3664 binary: BinaryExpression[Any],
3665 opstring: str,
3666 eager_grouping: bool = False,
3667 **kw: Any,
3668 ) -> str:
3669 _in_operator_expression = kw.get("_in_operator_expression", False)
3670
3671 kw["_in_operator_expression"] = True
3672 kw["_binary_op"] = binary.operator
3673 text = (
3674 binary.left._compiler_dispatch(
3675 self, eager_grouping=eager_grouping, **kw
3676 )
3677 + opstring
3678 + binary.right._compiler_dispatch(
3679 self, eager_grouping=eager_grouping, **kw
3680 )
3681 )
3682
3683 if _in_operator_expression and eager_grouping:
3684 text = "(%s)" % text
3685 return text
3686
3687 def _generate_generic_unary_operator(self, unary, opstring, **kw):
3688 return opstring + unary.element._compiler_dispatch(self, **kw)
3689
3690 def _generate_generic_unary_modifier(self, unary, opstring, **kw):
3691 return unary.element._compiler_dispatch(self, **kw) + opstring
3692
@util.memoized_property
def _like_percent_literal(self):
    """Memoized ``'%'`` literal column of string type.

    Used by the contains / startswith / endswith visitors to build LIKE
    patterns.

    """
    percent_literal = elements.literal_column(
        "'%'", type_=sqltypes.STRINGTYPE
    )
    return percent_literal
3696
def visit_ilike_case_insensitive_operand(self, element, **kw):
    """Render the wrapped operand inside ``lower(...)``."""
    operand_sql = element.element._compiler_dispatch(self, **kw)
    return f"lower({operand_sql})"
3699
def visit_contains_op_binary(self, binary, operator, **kw):
    """Render "contains" as LIKE with ``'%'`` on both sides of the right
    operand.

    A clone of ``binary`` is modified; the original is left untouched.

    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.right = wildcard.concat(like_expr.right).concat(wildcard)
    return self.visit_like_op_binary(like_expr, operator, **kw)
3705
def visit_not_contains_op_binary(self, binary, operator, **kw):
    """Render "not contains" as NOT LIKE with ``'%'`` on both sides of the
    right operand.

    A clone of ``binary`` is modified; the original is left untouched.

    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.right = wildcard.concat(like_expr.right).concat(wildcard)
    return self.visit_not_like_op_binary(like_expr, operator, **kw)
3711
def visit_icontains_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "contains".

    Both operands of a clone are wrapped with ``ilike_case_insensitive``
    and the right one is surrounded by ``'%'`` before being handed to
    :meth:`visit_ilike_op_binary`.

    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_right).concat(wildcard)
    return self.visit_ilike_op_binary(like_expr, operator, **kw)
3720
def visit_not_icontains_op_binary(self, binary, operator, **kw):
    """Render negated case-insensitive "contains".

    Both operands of a clone are wrapped with ``ilike_case_insensitive``
    and the right one is surrounded by ``'%'`` before being handed to
    :meth:`visit_not_ilike_op_binary`.

    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_right).concat(wildcard)
    return self.visit_not_ilike_op_binary(like_expr, operator, **kw)
3729
def visit_startswith_op_binary(self, binary, operator, **kw):
    """Render "startswith" as LIKE, combining the right operand with the
    ``'%'`` literal through ``_rconcat``.

    A clone of ``binary`` is modified; the original is left untouched.

    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.right = wildcard._rconcat(like_expr.right)
    return self.visit_like_op_binary(like_expr, operator, **kw)
3735
def visit_not_startswith_op_binary(self, binary, operator, **kw):
    """Render "not startswith" as NOT LIKE, combining the right operand
    with the ``'%'`` literal through ``_rconcat``.

    A clone of ``binary`` is modified; the original is left untouched.

    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.right = wildcard._rconcat(like_expr.right)
    return self.visit_not_like_op_binary(like_expr, operator, **kw)
3741
def visit_istartswith_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "startswith".

    Both operands of a clone are wrapped with ``ilike_case_insensitive``;
    the right one is combined with the ``'%'`` literal through
    ``_rconcat`` and the result goes to :meth:`visit_ilike_op_binary`.

    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard._rconcat(folded_right)
    return self.visit_ilike_op_binary(like_expr, operator, **kw)
3748
def visit_not_istartswith_op_binary(self, binary, operator, **kw):
    """Render negated case-insensitive "startswith".

    Both operands of a clone are wrapped with ``ilike_case_insensitive``;
    the right one is combined with the ``'%'`` literal through
    ``_rconcat`` and the result goes to :meth:`visit_not_ilike_op_binary`.

    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard._rconcat(folded_right)
    return self.visit_not_ilike_op_binary(like_expr, operator, **kw)
3755
def visit_endswith_op_binary(self, binary, operator, **kw):
    """Render "endswith" as LIKE with ``'%'`` concatenated in front of the
    right operand.

    A clone of ``binary`` is modified; the original is left untouched.

    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.right = wildcard.concat(like_expr.right)
    return self.visit_like_op_binary(like_expr, operator, **kw)
3761
def visit_not_endswith_op_binary(self, binary, operator, **kw):
    """Render "not endswith" as NOT LIKE with ``'%'`` concatenated in
    front of the right operand.

    A clone of ``binary`` is modified; the original is left untouched.

    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.right = wildcard.concat(like_expr.right)
    return self.visit_not_like_op_binary(like_expr, operator, **kw)
3767
def visit_iendswith_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "endswith".

    Both operands of a clone are wrapped with ``ilike_case_insensitive``;
    ``'%'`` is concatenated in front of the right one and the result goes
    to :meth:`visit_ilike_op_binary`.

    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_right)
    return self.visit_ilike_op_binary(like_expr, operator, **kw)
3774
def visit_not_iendswith_op_binary(self, binary, operator, **kw):
    """Render negated case-insensitive "endswith".

    Both operands of a clone are wrapped with ``ilike_case_insensitive``;
    ``'%'`` is concatenated in front of the right one and the result goes
    to :meth:`visit_not_ilike_op_binary`.

    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_right)
    return self.visit_not_ilike_op_binary(like_expr, operator, **kw)
3781
def visit_like_op_binary(self, binary, operator, **kw):
    """Render ``<left> LIKE <right>``.

    An ``ESCAPE <literal>`` suffix is added when the expression's
    modifiers contain a non-``None`` ``"escape"`` entry.

    """
    escape_char = binary.modifiers.get("escape", None)

    left_sql = binary.left._compiler_dispatch(self, **kw)
    right_sql = binary.right._compiler_dispatch(self, **kw)
    rendered = f"{left_sql} LIKE {right_sql}"

    if escape_char is not None:
        rendered += " ESCAPE " + self.render_literal_value(
            escape_char, sqltypes.STRINGTYPE
        )
    return rendered
3793
def visit_not_like_op_binary(self, binary, operator, **kw):
    """Render ``<left> NOT LIKE <right>``.

    An ``ESCAPE <literal>`` suffix is added when the expression's
    modifiers contain a non-``None`` ``"escape"`` entry.

    """
    escape_char = binary.modifiers.get("escape", None)

    left_sql = binary.left._compiler_dispatch(self, **kw)
    right_sql = binary.right._compiler_dispatch(self, **kw)
    rendered = f"{left_sql} NOT LIKE {right_sql}"

    if escape_char is not None:
        rendered += " ESCAPE " + self.render_literal_value(
            escape_char, sqltypes.STRINGTYPE
        )
    return rendered
3804
def visit_ilike_op_binary(self, binary, operator, **kw):
    """Render a case-insensitive LIKE via :meth:`visit_like_op_binary`.

    When invoked for ``operators.ilike_op`` itself, both operands of a
    clone are wrapped with ``ilike_case_insensitive`` first.

    """
    if operator is operators.ilike_op:
        folded = binary._clone()
        folded.left = ilike_case_insensitive(folded.left)
        folded.right = ilike_case_insensitive(folded.right)
        binary = folded
    # else we assume ilower() has been applied

    return self.visit_like_op_binary(binary, operator, **kw)
3813
def visit_not_ilike_op_binary(self, binary, operator, **kw):
    """Render a case-insensitive NOT LIKE via
    :meth:`visit_not_like_op_binary`.

    When invoked for ``operators.not_ilike_op`` itself, both operands of
    a clone are wrapped with ``ilike_case_insensitive`` first.

    """
    if operator is operators.not_ilike_op:
        folded = binary._clone()
        folded.left = ilike_case_insensitive(folded.left)
        folded.right = ilike_case_insensitive(folded.right)
        binary = folded
    # else we assume ilower() has been applied

    return self.visit_not_like_op_binary(binary, operator, **kw)
3822
def visit_between_op_binary(self, binary, operator, **kw):
    """Render BETWEEN, or BETWEEN SYMMETRIC when the ``"symmetric"``
    modifier is set."""
    if binary.modifiers.get("symmetric", False):
        opstring = " BETWEEN SYMMETRIC "
    else:
        opstring = " BETWEEN "
    return self._generate_generic_binary(binary, opstring, **kw)
3828
def visit_not_between_op_binary(self, binary, operator, **kw):
    """Render NOT BETWEEN, or NOT BETWEEN SYMMETRIC when the
    ``"symmetric"`` modifier is set."""
    if binary.modifiers.get("symmetric", False):
        opstring = " NOT BETWEEN SYMMETRIC "
    else:
        opstring = " NOT BETWEEN "
    return self._generate_generic_binary(binary, opstring, **kw)
3836
def visit_regexp_match_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Base implementation for regular expression matching; always raises.

    :raises CompileError: unconditionally, naming the dialect.

    """
    dialect_name = self.dialect.name
    raise exc.CompileError(
        f"{dialect_name} dialect does not support regular expressions"
    )
3844
def visit_not_regexp_match_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Base implementation for negated regular expression matching;
    always raises.

    :raises CompileError: unconditionally, naming the dialect.

    """
    dialect_name = self.dialect.name
    raise exc.CompileError(
        f"{dialect_name} dialect does not support regular expressions"
    )
3852
def visit_regexp_replace_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Base implementation for regular expression replacement; always
    raises.

    :raises CompileError: unconditionally, naming the dialect.

    """
    dialect_name = self.dialect.name
    raise exc.CompileError(
        f"{dialect_name} dialect does not support "
        "regular expression replacements"
    )
3860
def visit_dmltargetcopy(self, element, *, bindmarkers=None, **kw):
    """Render a placeholder marker for a DML target column.

    The element is recorded in ``bindmarkers`` under its column key and a
    ``__BINDMARKER_~~<key>~~`` token is returned.

    :raises CompileError: if no ``bindmarkers`` dictionary was passed.

    """
    if bindmarkers is None:
        raise exc.CompileError(
            "DML target objects may only be used with "
            "compiled INSERT or UPDATE statements"
        )

    column_key = element.column.key
    bindmarkers[column_key] = element
    return f"__BINDMARKER_~~{element.column.key}~~"
3870
def visit_bindparam(
    self,
    bindparam,
    within_columns_clause=False,
    literal_binds=False,
    skip_bind_expression=False,
    literal_execute=False,
    render_postcompile=False,
    is_upsert_set=False,
    **kwargs,
):
    """Render a bound parameter and register it with the compiler.

    :param bindparam: the bind parameter element.
    :param within_columns_clause: forwarded to nested rendering; together
     with ``self.ansi_bind_rules`` it also switches on literal execution.
    :param literal_binds: render the value inline instead of a
     placeholder.
    :param skip_bind_expression: do not apply the type's bind expression.
    :param literal_execute: render the parameter as a post-compile token
     that is registered in ``self.literal_execute_params``.
    :param render_postcompile: when a post-compile token is produced, set
     ``self._render_postcompile``.
    :param is_upsert_set: indicates the parameter is rendered in an upsert
     SET clause.
    :return: the SQL text for the parameter; expanding parameters are
     wrapped in parenthesis.
    :raises CompileError: if the parameter's name conflicts with a
     different parameter already registered under that name.

    """
    # Detect parametrized bindparams in upsert SET clause for issue #13130
    if (
        is_upsert_set
        and bindparam.value is None
        and bindparam.callable is None
        and self._insertmanyvalues is not None
    ):
        self._insertmanyvalues = self._insertmanyvalues._replace(
            has_upsert_bound_parameters=True
        )

    if not skip_bind_expression:
        impl = bindparam.type.dialect_impl(self.dialect)
        if impl._has_bind_expression:
            bind_expression = impl.bind_expression(bindparam)
            # compile the wrapping expression; the bind parameter nested
            # inside is visited again with skip_bind_expression=True
            wrapped = self.process(
                bind_expression,
                skip_bind_expression=True,
                within_columns_clause=within_columns_clause,
                literal_binds=literal_binds and not bindparam.expanding,
                literal_execute=literal_execute,
                render_postcompile=render_postcompile,
                **kwargs,
            )
            if bindparam.expanding:
                # for postcompile w/ expanding, move the "wrapped" part
                # of this into the inside

                m = re.match(
                    r"^(.*)\(__\[POSTCOMPILE_(\S+?)\]\)(.*)$", wrapped
                )
                assert m, "unexpected format for expanding parameter"
                wrapped = "(__[POSTCOMPILE_%s~~%s~~REPL~~%s~~])" % (
                    m.group(2),
                    m.group(1),
                    m.group(3),
                )

                if literal_binds:
                    ret = self.render_literal_bindparam(
                        bindparam,
                        within_columns_clause=True,
                        bind_expression_template=wrapped,
                        **kwargs,
                    )
                    return f"({ret})"

            return wrapped

    if not literal_binds:
        literal_execute = (
            literal_execute
            or bindparam.literal_execute
            or (within_columns_clause and self.ansi_bind_rules)
        )
        post_compile = literal_execute or bindparam.expanding
    else:
        post_compile = False

    if literal_binds:
        # inline rendering; the parameter is not registered in self.binds
        ret = self.render_literal_bindparam(
            bindparam, within_columns_clause=True, **kwargs
        )
        if bindparam.expanding:
            ret = f"({ret})"
        return ret

    name = self._truncate_bindparam(bindparam)

    if name in self.binds:
        existing = self.binds[name]
        if existing is not bindparam:
            # a different parameter object already uses this name; that
            # is only accepted in the cases not rejected below
            if (
                (existing.unique or bindparam.unique)
                and not existing.proxy_set.intersection(
                    bindparam.proxy_set
                )
                and not existing._cloned_set.intersection(
                    bindparam._cloned_set
                )
            ):
                raise exc.CompileError(
                    "Bind parameter '%s' conflicts with "
                    "unique bind parameter of the same name" % name
                )
            elif existing.expanding != bindparam.expanding:
                raise exc.CompileError(
                    "Can't reuse bound parameter name '%s' in both "
                    "'expanding' (e.g. within an IN expression) and "
                    "non-expanding contexts.  If this parameter is to "
                    "receive a list/array value, set 'expanding=True' on "
                    "it for expressions that aren't IN, otherwise use "
                    "a different parameter name." % (name,)
                )
            elif existing._is_crud or bindparam._is_crud:
                if existing._is_crud and bindparam._is_crud:
                    # TODO: this condition is not well understood.
                    # see tests in test/sql/test_update.py
                    raise exc.CompileError(
                        "Encountered unsupported case when compiling an "
                        "INSERT or UPDATE statement.  If this is a "
                        "multi-table "
                        "UPDATE statement, please provide string-named "
                        "arguments to the "
                        "values() method with distinct names; support for "
                        "multi-table UPDATE statements that "
                        "target multiple tables for UPDATE is very "
                        "limited",
                    )
                else:
                    raise exc.CompileError(
                        f"bindparam() name '{bindparam.key}' is reserved "
                        "for automatic usage in the VALUES or SET "
                        "clause of this "
                        "insert/update statement.   Please use a "
                        "name other than column name when using "
                        "bindparam() "
                        "with insert() or update() (for example, "
                        f"'b_{bindparam.key}')."
                    )

    # register under both the original key and the (possibly truncated)
    # name
    self.binds[bindparam.key] = self.binds[name] = bindparam

    # if we are given a cache key that we're going to match against,
    # relate the bindparam here to one that is most likely present
    # in the "extracted params" portion of the cache key.  this is used
    # to set up a positional mapping that is used to determine the
    # correct parameters for a subsequent use of this compiled with
    # a different set of parameter values.   here, we accommodate for
    # parameters that may have been cloned both before and after the cache
    # key has been generated.
    ckbm_tuple = self._cache_key_bind_match

    if ckbm_tuple:
        ckbm, cksm = ckbm_tuple
        for bp in bindparam._cloned_set:
            if bp.key in cksm:
                cb = cksm[bp.key]
                ckbm[cb].append(bindparam)

    if bindparam.isoutparam:
        self.has_out_parameters = True

    if post_compile:
        if render_postcompile:
            self._render_postcompile = True

        # literal-execute parameters and plain expanding parameters are
        # tracked in separate collections
        if literal_execute:
            self.literal_execute_params |= {bindparam}
        else:
            self.post_compile_params |= {bindparam}

    ret = self.bindparam_string(
        name,
        post_compile=post_compile,
        expanding=bindparam.expanding,
        bindparam_type=bindparam.type,
        **kwargs,
    )

    if bindparam.expanding:
        ret = f"({ret})"

    return ret
4046
def render_bind_cast(self, type_, dbapi_type, sqltext):
    """Hook for rendering a cast around a bound parameter placeholder.

    Not implemented at this level.

    :raises NotImplementedError: always.

    """
    raise NotImplementedError
4049
def render_literal_bindparam(
    self,
    bindparam,
    render_literal_value=NO_ARG,
    bind_expression_template=None,
    **kw,
):
    """Render the value of a bind parameter inline.

    :param bindparam: the bind parameter whose value is rendered.
    :param render_literal_value: explicit value to render instead of the
     parameter's own effective value.
    :param bind_expression_template: forwarded to the expanding-parameter
     renderer.
    :return: the literal SQL text.

    A parameter with neither value nor callable renders NULL; a warning
    is emitted if that happens under a binary operator other than
    ``is_`` / ``is_not``.

    """
    if render_literal_value is NO_ARG:
        if bindparam.value is None and bindparam.callable is None:
            binary_op = kw.get("_binary_op", None)
            if binary_op and binary_op not in (
                operators.is_,
                operators.is_not,
            ):
                util.warn_limited(
                    "Bound parameter '%s' rendering literal NULL in a SQL "
                    "expression; comparisons to NULL should not use "
                    "operators outside of 'is' or 'is not'",
                    (bindparam.key,),
                )
            return self.process(sqltypes.NULLTYPE, **kw)
        value = bindparam.effective_value
    else:
        value = render_literal_value

    if not bindparam.expanding:
        return self.render_literal_value(value, bindparam.type)

    expand = self._literal_execute_expanding_parameter_literal_binds
    _, replacement_expr = expand(
        bindparam,
        value,
        bind_expression_template=bind_expression_template,
    )
    return replacement_expr
4082
def render_literal_value(
    self, value: Any, type_: sqltypes.TypeEngine[Any]
) -> str:
    """Render the value of a bind parameter as a quoted literal.

    This is used for statement sections that do not accept bind parameters
    on the target driver/database.

    This should be implemented by subclasses using the quoting services
    of the DBAPI.

    :raises CompileError: if the type provides no literal processor, or
     if the processor raises for the given value.

    """

    if value is None and not type_.should_evaluate_none:
        # issue #10535 - handle NULL in the compiler without placing
        # this onto each type, except for "evaluate None" types
        # (e.g. JSON)
        return self.process(elements.Null._instance())

    literal_processor = type_._cached_literal_processor(self.dialect)
    if not literal_processor:
        raise exc.CompileError(
            f"No literal value renderer is available for literal value "
            f'"{sql_util._repr_single_value(value)}" '
            f"with datatype {type_}"
        )

    try:
        return literal_processor(value)
    except Exception as err:
        raise exc.CompileError(
            f"Could not render literal value "
            f'"{sql_util._repr_single_value(value)}" '
            f"with datatype "
            f"{type_}; see parent stack trace for "
            "more detail."
        ) from err
4121
4122 def _truncate_bindparam(self, bindparam):
4123 if bindparam in self.bind_names:
4124 return self.bind_names[bindparam]
4125
4126 bind_name = bindparam.key
4127 if isinstance(bind_name, elements._truncated_label):
4128 bind_name = self._truncated_identifier("bindparam", bind_name)
4129
4130 # add to bind_names for translation
4131 self.bind_names[bindparam] = bind_name
4132
4133 return bind_name
4134
4135 def _truncated_identifier(
4136 self, ident_class: str, name: _truncated_label
4137 ) -> str:
4138 if (ident_class, name) in self.truncated_names:
4139 return self.truncated_names[(ident_class, name)]
4140
4141 anonname = name.apply_map(self.anon_map)
4142
4143 if len(anonname) > self.label_length - 6:
4144 counter = self._truncated_counters.get(ident_class, 1)
4145 truncname = (
4146 anonname[0 : max(self.label_length - 6, 0)]
4147 + "_"
4148 + hex(counter)[2:]
4149 )
4150 self._truncated_counters[ident_class] = counter + 1
4151 else:
4152 truncname = anonname
4153 self.truncated_names[(ident_class, name)] = truncname
4154 return truncname
4155
4156 def _anonymize(self, name: str) -> str:
4157 return name % self.anon_map
4158
def bindparam_string(
    self,
    name: str,
    post_compile: bool = False,
    expanding: bool = False,
    escaped_from: Optional[str] = None,
    bindparam_type: Optional[TypeEngine[Any]] = None,
    accumulate_bind_names: Optional[Set[str]] = None,
    visited_bindparam: Optional[List[str]] = None,
    **kw: Any,
) -> str:
    """Return the placeholder text for the bound parameter ``name``.

    :param name: parameter name; characters matched by
     ``self._bind_translate_re`` are replaced and the mapping from the
     original to the escaped name is recorded in
     ``self.escaped_bind_names``.
    :param post_compile: render a ``__[POSTCOMPILE_<name>]`` token instead
     of a driver placeholder.
    :param expanding: with ``post_compile``, return the token without any
     cast applied.
    :param escaped_from: original name, if the caller already escaped it.
    :param bindparam_type: datatype consulted for optional cast rendering.
    :param accumulate_bind_names: if given, ``name`` is added to this set.
    :param visited_bindparam: if given, ``name`` is appended to this list.

    """
    # TODO: accumulate_bind_names is passed by crud.py to gather
    # names on a per-value basis, visited_bindparam is passed by
    # visit_insert() to collect all parameters in the statement.
    # see if this gathering can be simplified somehow
    if accumulate_bind_names is not None:
        accumulate_bind_names.add(name)
    if visited_bindparam is not None:
        visited_bindparam.append(name)

    # search first so that we learn cheaply whether the name contains
    # any unusual characters at all, then substitute them
    if not escaped_from and self._bind_translate_re.search(name):
        escaped_from = name
        name = self._bind_translate_re.sub(
            lambda match: self._bind_translate_chars[match.group(0)],
            escaped_from,
        )

    if escaped_from:
        self.escaped_bind_names = self.escaped_bind_names.union(
            {escaped_from: name}
        )

    if post_compile:
        token = "__[POSTCOMPILE_%s]" % name
        # for expanding, bound parameters or literal values are rendered
        # per item later, so the bare token is returned; otherwise, for
        # non-expanding "literal execute", bind casts apply as
        # determined by the datatype
        if not expanding and bindparam_type is not None:
            literal_impl = bindparam_type._unwrapped_dialect_impl(
                self.dialect
            )
            if literal_impl.render_literal_cast:
                token = self.render_bind_cast(
                    bindparam_type, literal_impl, token
                )
        return token

    if self.state is CompilerState.COMPILING:
        template = self.compilation_bindtemplate
    else:
        template = self.bindtemplate
    placeholder = template % {"name": name}

    if bindparam_type is not None and self.dialect._bind_typing_render_casts:
        bind_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
        if bind_impl.render_bind_cast:
            placeholder = self.render_bind_cast(
                bindparam_type, bind_impl, placeholder
            )

    return placeholder
4225
4226 def _dispatch_independent_ctes(self, stmt, kw):
4227 local_kw = kw.copy()
4228 local_kw.pop("cte_opts", None)
4229 for cte, opt in zip(
4230 stmt._independent_ctes, stmt._independent_ctes_opts
4231 ):
4232 cte._compiler_dispatch(self, cte_opts=opt, **local_kw)
4233
def visit_cte(
    self,
    cte: CTE,
    asfrom: bool = False,
    ashint: bool = False,
    fromhints: Optional[_FromHintsType] = None,
    visiting_cte: Optional[CTE] = None,
    from_linter: Optional[FromLinter] = None,
    cte_opts: selectable._CTEOpts = selectable._CTEOpts(False),
    **kwargs: Any,
) -> Optional[str]:
    """Compile a CTE, registering its definition text on the compiler.

    The definition text ("name[(cols)] AS [prefixes] (inner)" plus any
    suffixes) is stored in ``self.ctes`` keyed by the CTE object; it is
    not part of the return value.  The CTE is also tracked in
    ``self.ctes_by_level_name`` and ``self.level_name_by_cte`` so that a
    later visit of the same or a same-named CTE can be recognized.

    :param cte: the CTE being compiled.
    :param asfrom: when true, the name by which the CTE is referenced
     (including an "AS" alias suffix for an aliased CTE) is returned.
    :param visiting_cte: the CTE currently being compiled by an enclosing
     call, used to detect a reference to the CTE we are inside of.  It is
     replaced by ``cte`` in the keyword arguments passed down.
    :param from_linter: when present and ``asfrom`` is set, the CTE is
     recorded in ``from_linter.froms``.
    :param cte_opts: per-use options; ``cte_opts.nesting`` or
     ``cte.nesting`` places the CTE at level ``len(self.stack)`` rather
     than level 1.
    :return: the reference text when ``asfrom`` is set; ``None``
     otherwise, except that when the compiler stack is empty and a new
     definition is being rendered, the compiled inner element is
     returned directly.
    :raises CompileError: if the CTE is requested as 'nest_here' in more
     than one location, or if two unrelated CTEs share a name at the same
     level.

    ``ashint`` and ``fromhints`` are accepted but not referenced in this
    method's body.

    """
    self_ctes = self._init_cte_state()
    assert self_ctes is self.ctes

    # nested compilation below sees this CTE as the one being visited
    kwargs["visiting_cte"] = cte

    cte_name = cte.name

    if isinstance(cte_name, elements._truncated_label):
        cte_name = self._truncated_identifier("alias", cte_name)

    is_new_cte = True
    embedded_in_current_named_cte = False

    _reference_cte = cte._get_reference_cte()

    nesting = cte.nesting or cte_opts.nesting

    # check for CTE already encountered
    if _reference_cte in self.level_name_by_cte:
        cte_level, _, existing_cte_opts = self.level_name_by_cte[
            _reference_cte
        ]
        assert _ == cte_name

        cte_level_name = (cte_level, cte_name)
        existing_cte = self.ctes_by_level_name[cte_level_name]

        # check if we are receiving it here with a specific
        # "nest_here" location; if so, move it to this location

        if cte_opts.nesting:
            if existing_cte_opts.nesting:
                raise exc.CompileError(
                    "CTE is stated as 'nest_here' in "
                    "more than one location"
                )

            # re-key both registries under the new level
            old_level_name = (cte_level, cte_name)
            cte_level = len(self.stack) if nesting else 1
            cte_level_name = new_level_name = (cte_level, cte_name)

            del self.ctes_by_level_name[old_level_name]
            self.ctes_by_level_name[new_level_name] = existing_cte
            self.level_name_by_cte[_reference_cte] = new_level_name + (
                cte_opts,
            )

    else:
        cte_level = len(self.stack) if nesting else 1
        cte_level_name = (cte_level, cte_name)

        # a different CTE object may already hold this (level, name)
        if cte_level_name in self.ctes_by_level_name:
            existing_cte = self.ctes_by_level_name[cte_level_name]
        else:
            existing_cte = None

    if existing_cte is not None:
        embedded_in_current_named_cte = visiting_cte is existing_cte

        # we've generated a same-named CTE that we are enclosed in,
        # or this is the same CTE.  just return the name.
        if cte is existing_cte._restates or cte is existing_cte:
            is_new_cte = False
        elif existing_cte is cte._restates:
            # we've generated a same-named CTE that is
            # enclosed in us - we take precedence, so
            # discard the text for the "inner".
            del self_ctes[existing_cte]

            existing_cte_reference_cte = existing_cte._get_reference_cte()

            assert existing_cte_reference_cte is _reference_cte
            assert existing_cte_reference_cte is existing_cte

            del self.level_name_by_cte[existing_cte_reference_cte]
        else:
            if (
                # if the two CTEs have the same hash, which we expect
                # here means that one/both is an annotated of the other
                (hash(cte) == hash(existing_cte))
                # or...
                or (
                    (
                        # if they are clones, i.e. they came from the ORM
                        # or some other visit method
                        cte._is_clone_of is not None
                        or existing_cte._is_clone_of is not None
                    )
                    # and are deep-copy identical
                    and cte.compare(existing_cte)
                )
            ):
                # then consider these two CTEs the same
                is_new_cte = False
            else:
                # otherwise these are two CTEs that either will render
                # differently, or were indicated separately by the user,
                # with the same name
                raise exc.CompileError(
                    "Multiple, unrelated CTEs found with "
                    "the same name: %r" % cte_name
                )

    # already registered and no reference text requested: nothing to do
    if not asfrom and not is_new_cte:
        return None

    # an aliased CTE renders its definition under the pre-alias CTE;
    # cte_pre_alias_name is non-None only in that case
    if cte._cte_alias is not None:
        pre_alias_cte = cte._cte_alias
        cte_pre_alias_name = cte._cte_alias.name
        if isinstance(cte_pre_alias_name, elements._truncated_label):
            cte_pre_alias_name = self._truncated_identifier(
                "alias", cte_pre_alias_name
            )
    else:
        pre_alias_cte = cte
        cte_pre_alias_name = None

    if is_new_cte:
        self.ctes_by_level_name[cte_level_name] = cte
        self.level_name_by_cte[_reference_cte] = cte_level_name + (
            cte_opts,
        )

        if pre_alias_cte not in self.ctes:
            self.visit_cte(pre_alias_cte, **kwargs)

        if not cte_pre_alias_name and cte not in self_ctes:
            if cte.recursive:
                self.ctes_recursive = True
            text = self.preparer.format_alias(cte, cte_name)
            if cte.recursive or cte.element.name_cte_columns:
                col_source = cte.element

                # TODO: can we get at the .columns_plus_names collection
                # that is already (or will be?) generated for the SELECT
                # rather than calling twice?
                recur_cols = [
                    # TODO: proxy_name is not technically safe,
                    # see test_cte->
                    # test_with_recursive_no_name_currently_buggy.  not
                    # clear what should be done with such a case
                    fallback_label_name or proxy_name
                    for (
                        _,
                        proxy_name,
                        fallback_label_name,
                        c,
                        repeated,
                    ) in (col_source._generate_columns_plus_names(True))
                    if not repeated
                ]

                # explicit column list following the CTE name
                text += "(%s)" % (
                    ", ".join(
                        self.preparer.format_label_name(
                            ident, anon_map=self.anon_map
                        )
                        for ident in recur_cols
                    )
                )

            assert kwargs.get("subquery", False) is False

            if not self.stack:
                # toplevel, this is a stringify of the
                # cte directly.  just compile the inner
                # the way alias() does.
                return cte.element._compiler_dispatch(
                    self, asfrom=asfrom, **kwargs
                )
            else:
                prefixes = self._generate_prefixes(
                    cte, cte._prefixes, **kwargs
                )
                inner = cte.element._compiler_dispatch(
                    self, asfrom=True, **kwargs
                )

                text += " AS %s\n(%s)" % (prefixes, inner)

            if cte._suffixes:
                text += " " + self._generate_prefixes(
                    cte, cte._suffixes, **kwargs
                )

            # store the definition text keyed by the CTE
            self_ctes[cte] = text

    if asfrom:
        if from_linter:
            from_linter.froms[cte._de_clone()] = cte_name

        # a reference to the CTE we are currently inside of renders as
        # the plain name
        if not is_new_cte and embedded_in_current_named_cte:
            return self.preparer.format_alias(cte, cte_name)

        if cte_pre_alias_name:
            # "<pre-alias name> AS <alias name>"
            text = self.preparer.format_alias(cte, cte_pre_alias_name)
            if self.preparer._requires_quotes(cte_name):
                cte_name = self.preparer.quote(cte_name)
            text += self.get_render_as_alias_suffix(cte_name)
            return text  # type: ignore[no-any-return]
        else:
            return self.preparer.format_alias(cte, cte_name)

    return None
4449
def visit_table_valued_alias(self, element, **kw):
    """Compile a table-valued alias as either a LATERAL or a plain alias.

    When the element joins implicitly, the from linter is disabled for
    it by passing ``from_linter=None`` downward.
    """
    if element.joins_implicitly:
        kw["from_linter"] = None

    if element._is_lateral:
        visitor = self.visit_lateral
    else:
        visitor = self.visit_alias
    return visitor(element, **kw)
4457
def visit_table_valued_column(self, element, **kw):
    """Compile a table-valued column by delegating to ``visit_column``."""
    rendered = self.visit_column(element, **kw)
    return rendered
4460
def visit_alias(
    self,
    alias,
    asfrom=False,
    ashint=False,
    iscrud=False,
    fromhints=None,
    subquery=False,
    lateral=False,
    enclosing_alias=None,
    from_linter=None,
    within_tstring=False,
    **kwargs,
):
    """Compile an alias-like FROM element.

    The result depends on the rendering context:

    * ``enclosing_alias.element is alias``: only the inner element is
      compiled (with this alias passed as the enclosing alias) and
      returned, parenthesized when ``subquery`` and either ``asfrom`` or
      ``lateral`` is set.
    * ``ashint``: only the formatted alias name is returned.
    * ``asfrom`` or ``within_tstring``: the inner element is compiled as a
      FROM, parenthesized when ``subquery`` is set, followed by the alias
      suffix, an optional derived column list, and any matching hint
      text from ``fromhints``.
    * otherwise: the inner element is compiled without an alias; the
      ``subquery`` flag is not passed along.

    :param lateral: set when called for a LATERAL; establishes
     ``enclosing_lateral`` and ``lateral_from_linter`` in the keyword
     arguments passed to nested compilation if not already present.
    :param from_linter: when present and rendering as a FROM, the alias
     is recorded in ``from_linter.froms`` under its rendered name.

    """
    if lateral:
        if "enclosing_lateral" not in kwargs:
            # if lateral is set and enclosing_lateral is not
            # present, we assume we are being called directly
            # from visit_lateral() and we need to set enclosing_lateral.
            assert alias._is_lateral
            kwargs["enclosing_lateral"] = alias

        # for lateral objects, we track a second from_linter that is...
        # lateral!  to the level above us.
        if (
            from_linter
            and "lateral_from_linter" not in kwargs
            and "enclosing_lateral" in kwargs
        ):
            kwargs["lateral_from_linter"] = from_linter

    if enclosing_alias is not None and enclosing_alias.element is alias:
        # the enclosing alias wraps this one directly; render only the
        # inner element, forwarding the caller's rendering flags
        inner = alias.element._compiler_dispatch(
            self,
            asfrom=asfrom,
            ashint=ashint,
            iscrud=iscrud,
            fromhints=fromhints,
            lateral=lateral,
            enclosing_alias=alias,
            **kwargs,
        )
        if subquery and (asfrom or lateral):
            inner = "(%s)" % (inner,)
        return inner
    else:
        kwargs["enclosing_alias"] = alias

    # alias_name is only needed, and only bound, for the branches below
    # that render the name
    if asfrom or ashint or within_tstring:
        if isinstance(alias.name, elements._truncated_label):
            alias_name = self._truncated_identifier("alias", alias.name)
        else:
            alias_name = alias.name

    if ashint:
        return self.preparer.format_alias(alias, alias_name)
    elif asfrom or within_tstring:
        if from_linter:
            from_linter.froms[alias._de_clone()] = alias_name

        inner = alias.element._compiler_dispatch(
            self, asfrom=True, lateral=lateral, **kwargs
        )
        if subquery:
            inner = "(%s)" % (inner,)

        ret = inner + self.get_render_as_alias_suffix(
            self.preparer.format_alias(alias, alias_name)
        )

        # derived column list: "(col1, col2, ...)", each name optionally
        # followed by its compiled type
        if alias._supports_derived_columns and alias._render_derived:
            ret += "(%s)" % (
                ", ".join(
                    "%s%s"
                    % (
                        self.preparer.quote(col.name),
                        (
                            " %s"
                            % self.dialect.type_compiler_instance.process(
                                col.type, **kwargs
                            )
                            if alias._render_derived_w_types
                            else ""
                        ),
                    )
                    for col in alias.c
                )
            )

        if fromhints and alias in fromhints:
            ret = self.format_from_hint_text(
                ret, alias, fromhints[alias], iscrud
            )

        return ret
    else:
        # note we cancel the "subquery" flag here as well
        return alias.element._compiler_dispatch(
            self, lateral=lateral, **kwargs
        )
4561
4562 def visit_subquery(self, subquery, **kw):
4563 kw["subquery"] = True
4564 return self.visit_alias(subquery, **kw)
4565
def visit_lateral(self, lateral_, **kw):
    """Compile a LATERAL element as ``LATERAL <aliased element>``.

    The element itself is rendered by ``visit_alias`` with
    ``lateral=True``.
    """
    aliased = self.visit_alias(lateral_, **{**kw, "lateral": True})
    return "LATERAL %s" % aliased
4569
def visit_tablesample(self, tablesample, asfrom=False, **kw):
    """Compile a TABLESAMPLE element.

    Renders ``<aliased FROM> TABLESAMPLE <method>``, followed by
    ``REPEATABLE (<seed>)`` when a seed is present.  The aliased element
    is always rendered with ``asfrom=True``; the incoming ``asfrom``
    value is not used.
    """
    aliased = self.visit_alias(tablesample, asfrom=True, **kw)
    method = tablesample._get_method()._compiler_dispatch(self, **kw)
    pieces = ["%s TABLESAMPLE %s" % (aliased, method)]

    seed = tablesample.seed
    if seed is not None:
        pieces.append(
            " REPEATABLE (%s)" % (seed._compiler_dispatch(self, **kw))
        )

    return "".join(pieces)
4582
4583 def _render_values(self, element, **kw):
4584 kw.setdefault("literal_binds", element.literal_binds)
4585 tuples = ", ".join(
4586 self.process(
4587 elements.Tuple(
4588 types=element._column_types, *elem
4589 ).self_group(),
4590 **kw,
4591 )
4592 for chunk in element._data
4593 for elem in chunk
4594 )
4595 return f"VALUES {tuples}"
4596
def visit_values(
    self, element, asfrom=False, from_linter=None, visiting_cte=None, **kw
):
    """Compile a VALUES element.

    Outside of a FROM context the bare ``VALUES ...`` text is returned.
    As a FROM it is parenthesized, prefixed with ``LATERAL`` when
    applicable and, if named, followed by the alias and its column list.
    When the element is the direct body of the CTE being visited, the
    bare text is returned.

    :raises CompileError: for a LATERAL VALUES that is the direct body of
     the CTE being visited.

    """
    if element._independent_ctes:
        self._dispatch_independent_ctes(element, kw)

    rendered = self._render_values(element, **kw)

    if element._unnamed:
        name = None
    else:
        name = element.name
        if isinstance(name, elements._truncated_label):
            name = self._truncated_identifier("values", name)

    lateral_kw = "LATERAL " if element._is_lateral else ""

    if not asfrom:
        return rendered

    if from_linter:
        from_linter.froms[element._de_clone()] = (
            "(unnamed VALUES element)" if name is None else name
        )

    if visiting_cte is not None and visiting_cte.element is element:
        # body of a CTE: the CTE supplies the name and parenthesization
        if element._is_lateral:
            raise exc.CompileError(
                "Can't use a LATERAL VALUES expression inside of a CTE"
            )
        return rendered

    if not name:
        return "%s(%s)" % (lateral_kw, rendered)

    # column names in the derived column list render without a table
    kw["include_table"] = False
    alias_suffix = self.get_render_as_alias_suffix(self.preparer.quote(name))
    column_list = ", ".join(
        col._compiler_dispatch(self, **kw) for col in element.columns
    )
    return "%s(%s)%s (%s)" % (lateral_kw, rendered, alias_suffix, column_list)
4645
def visit_scalar_values(self, element, **kw):
    """Compile a scalar VALUES element as a parenthesized VALUES list."""
    values_text = self._render_values(element, **kw)
    return "({})".format(values_text)
4648
def get_render_as_alias_suffix(self, alias_name_text):
    """Return the text appended to a FROM element to name it as an alias.

    This implementation prefixes the given name text with `` AS ``.
    """
    keyword = " AS "
    return keyword + alias_name_text
4651
4652 def _add_to_result_map(
4653 self,
4654 keyname: str,
4655 name: str,
4656 objects: Tuple[Any, ...],
4657 type_: TypeEngine[Any],
4658 ) -> None:
4659
4660 # note objects must be non-empty for cursor.py to handle the
4661 # collection properly
4662 assert objects
4663
4664 if keyname is None or keyname == "*":
4665 self._ordered_columns = False
4666 self._ad_hoc_textual = True
4667 if type_._is_tuple_type:
4668 raise exc.CompileError(
4669 "Most backends don't support SELECTing "
4670 "from a tuple() object. If this is an ORM query, "
4671 "consider using the Bundle object."
4672 )
4673 self._result_columns.append(
4674 ResultColumnsEntry(keyname, name, objects, type_)
4675 )
4676
4677 def _label_returning_column(
4678 self, stmt, column, populate_result_map, column_clause_args=None, **kw
4679 ):
4680 """Render a column with necessary labels inside of a RETURNING clause.
4681
4682 This method is provided for individual dialects in place of calling
4683 the _label_select_column method directly, so that the two use cases
4684 of RETURNING vs. SELECT can be disambiguated going forward.
4685
4686 .. versionadded:: 1.4.21
4687
4688 """
4689 return self._label_select_column(
4690 None,
4691 column,
4692 populate_result_map,
4693 False,
4694 {} if column_clause_args is None else column_clause_args,
4695 **kw,
4696 )
4697
def _label_select_column(
    self,
    select,
    column,
    populate_result_map,
    asfrom,
    column_clause_args,
    name=None,
    proxy_name=None,
    fallback_label_name=None,
    within_columns_clause=True,
    column_is_repeated=False,
    need_column_expressions=False,
    include_table=True,
):
    """Compile one column of a columns clause, labeling it if needed.

    Decides whether ``column`` is rendered as-is or wrapped in a compile
    time label, arranges for result map population, and returns the
    compiled text.

    :param select: the enclosing statement; not referenced in this
     method's body.
    :param column: the column expression to render.
    :param populate_result_map: when true, an ``add_to_result_map``
     callable is passed down so result column entries are recorded.
    :param asfrom: whether the enclosing SELECT is rendered as a FROM;
     used by the labeling heuristics.
    :param column_clause_args: keyword arguments for the column's
     compilation; this dictionary is updated in place.
    :param name: explicit label name required by the caller, if any;
     ``proxy_name`` must accompany it.
    :param fallback_label_name: label to use when the heuristics decide a
     label is needed; generated from the column when not given.
    :param within_columns_clause: must be true; asserted.
    :param column_is_repeated: the column repeats an earlier one; its
     result map entry then targets only the key name.
    :param need_column_expressions: apply the type's column expression
     even when the result map is not being populated.
    :param include_table: passed through to the column's compilation.

    """
    impl = column.type.dialect_impl(self.dialect)

    # types may wrap the column in a SQL expression on the way out
    if impl._has_column_expression and (
        need_column_expressions or populate_result_map
    ):
        col_expr = impl.column_expression(column)
    else:
        col_expr = column

    if populate_result_map:
        # pass an "add_to_result_map" callable into the compilation
        # of embedded columns.  this collects information about the
        # column as it will be fetched in the result and is coordinated
        # with cursor.description when the query is executed.
        add_to_result_map = self._add_to_result_map

        # if the SELECT statement told us this column is a repeat,
        # wrap the callable with one that prevents the addition of the
        # targets
        if column_is_repeated:
            _add_to_result_map = add_to_result_map

            def add_to_result_map(keyname, name, objects, type_):
                _add_to_result_map(keyname, name, (keyname,), type_)

        # if we redefined col_expr for type expressions, wrap the
        # callable with one that adds the original column to the targets
        elif col_expr is not column:
            _add_to_result_map = add_to_result_map

            def add_to_result_map(keyname, name, objects, type_):
                _add_to_result_map(
                    keyname, name, (column,) + objects, type_
                )

    else:
        add_to_result_map = None

    # this method is used by some of the dialects for RETURNING,
    # which has different inputs.  _label_returning_column was added
    # as the better target for this now however for 1.4 we will keep
    # _label_select_column directly compatible with this use case.
    # these assertions right now set up the current expected inputs
    assert within_columns_clause, (
        "_label_select_column is only relevant within "
        "the columns clause of a SELECT or RETURNING"
    )
    result_expr: elements.Label[Any] | _CompileLabel

    if isinstance(column, elements.Label):
        # already labeled; re-wrap only if the type expression replaced
        # the labeled element
        if col_expr is not column:
            result_expr = _CompileLabel(
                col_expr, column.name, alt_names=(column.element,)
            )
        else:
            result_expr = col_expr

    elif name:
        # here, _columns_plus_names has determined there's an explicit
        # label name we need to use.  this is the default for
        # tablenames_plus_columnnames as well as when columns are being
        # deduplicated on name

        assert (
            proxy_name is not None
        ), "proxy_name is required if 'name' is passed"

        result_expr = _CompileLabel(
            col_expr,
            name,
            alt_names=(
                proxy_name,
                # this is a hack to allow legacy result column lookups
                # to work as they did before; this goes away in 2.0.
                # TODO: this only seems to be tested indirectly
                # via test/orm/test_deprecations.py.   should be a
                # resultset test for this
                column._tq_label,
            ),
        )
    else:
        # determine here whether this column should be rendered in
        # a labelled context or not, as we were given no required label
        # name from the caller. Here we apply heuristics based on the kind
        # of SQL expression involved.

        if col_expr is not column:
            # type-specific expression wrapping the given column,
            # so we render a label
            render_with_label = True
        elif isinstance(column, elements.ColumnClause):
            # table-bound column, we render its name as a label if we are
            # inside of a subquery only
            render_with_label = (
                asfrom
                and not column.is_literal
                and column.table is not None
            )
        elif isinstance(column, elements.TextClause):
            render_with_label = False
        elif isinstance(column, elements.UnaryExpression):
            # unary expression.  notes added as of #12681
            #
            # By convention, the visit_unary() method
            # itself does not add an entry to the result map, and relies
            # upon either the inner expression creating a result map
            # entry, or if not, by creating a label here that produces
            # the result map entry.  Where that happens is based on whether
            # or not the element immediately inside the unary is a
            # NamedColumn subclass or not.
            #
            # Now, this also impacts how the SELECT is written; if
            # we decide to generate a label here, we get the usual
            # "~(x+y) AS anon_1" thing in the columns clause.   If we
            # don't, we don't get an AS at all, we get like
            # "~table.column".
            #
            # But here is the important thing as of modernish (like 1.4)
            # versions of SQLAlchemy - **whether or not the AS <label>
            # is present in the statement is not actually important**.
            # We target result columns **positionally** for a fully
            # compiled ``Select()`` object; before 1.4 we needed those
            # labels to match in cursor.description etc etc but now it
            # really doesn't matter.
            # So really, we could set render_with_label True in all cases.
            # Or we could just have visit_unary() populate the result map
            # in all cases.
            #
            # What we're doing here is strictly trying to not rock the
            # boat too much with when we do/don't render "AS label";
            # labels being present helps in the edge cases that we
            # "fall back" to named cursor.description matching, labels
            # not being present for columns keeps us from having awkward
            # phrases like "SELECT DISTINCT table.x AS x".
            render_with_label = (
                (
                    # exception case to detect if we render "not boolean"
                    # as "not <col>" for native boolean or "<col> = 1"
                    # for non-native boolean.   this is controlled by
                    # visit_is_<true|false>_unary_operator
                    column.operator
                    in (operators.is_false, operators.is_true)
                    and not self.dialect.supports_native_boolean
                )
                or column._wraps_unnamed_column()
                or asfrom
            )
        elif (
            # general class of expressions that don't have a SQL-column
            # addressable name.  includes scalar selects, bind parameters,
            # SQL functions, others
            not isinstance(column, elements.NamedColumn)
            # deeper check that indicates there's no natural "name" to
            # this element, which accommodates for custom SQL constructs
            # that might have a ".name" attribute (but aren't SQL
            # functions) but are not implementing this more recently added
            # base class.  in theory the "NamedColumn" check should be
            # enough, however here we seek to maintain legacy behaviors
            # as well.
            and column._non_anon_label is None
        ):
            render_with_label = True
        else:
            render_with_label = False

        if render_with_label:
            if not fallback_label_name:
                # used by the RETURNING case right now.  we generate it
                # here as 3rd party dialects may be referring to
                # _label_select_column method directly instead of the
                # just-added _label_returning_column method
                assert not column_is_repeated
                fallback_label_name = column._anon_name_label

            # ensure the label name is a _truncated_label instance
            fallback_label_name = (
                elements._truncated_label(fallback_label_name)
                if not isinstance(
                    fallback_label_name, elements._truncated_label
                )
                else fallback_label_name
            )

            result_expr = _CompileLabel(
                col_expr, fallback_label_name, alt_names=(proxy_name,)
            )
        else:
            result_expr = col_expr

    # note: mutates the caller's dictionary
    column_clause_args.update(
        within_columns_clause=within_columns_clause,
        add_to_result_map=add_to_result_map,
        include_table=include_table,
        within_tstring=False,
    )
    return result_expr._compiler_dispatch(self, **column_clause_args)
4909
def format_from_hint_text(self, sqltext, table, hint, iscrud):
    """Return ``sqltext`` with the table's hint text appended, if any.

    The hint text comes from ``get_from_hint_text``; when that returns
    an empty or ``None`` value, ``sqltext`` is returned unchanged.
    ``iscrud`` is not used by this implementation.
    """
    hint_sql = self.get_from_hint_text(table, hint)
    if not hint_sql:
        return sqltext
    return sqltext + (" " + hint_sql)
4915
def get_select_hint_text(self, byfroms):
    """Return statement-level hint text for a SELECT, given per-FROM hints.

    This implementation renders no hint text and returns ``None``.
    """
    return None
4918
def get_from_hint_text(
    self, table: FromClause, text: Optional[str]
) -> Optional[str]:
    """Return the hint text to append to ``table`` in a FROM clause.

    This implementation renders no hint text and returns ``None``.
    """
    return None
4923
def get_crud_hint_text(self, table, text):
    """Return the hint text for ``table`` in an INSERT/UPDATE/DELETE.

    This implementation renders no hint text and returns ``None``.
    """
    return None
4926
def get_statement_hint_text(self, hint_texts):
    """Combine statement-level hint strings into one space-separated text."""
    separator = " "
    return separator.join(hint_texts)
4929
# Stack entry used when the compiler stack is empty (toplevel): an
# immutable mapping with no correlated and no enclosing FROM objects.
_default_stack_entry: _CompilerStackEntry

if not typing.TYPE_CHECKING:
    _default_stack_entry = util.immutabledict(
        {"correlate_froms": frozenset(), "asfrom_froms": frozenset()}
    )
4936
4937 def _display_froms_for_select(
4938 self, select_stmt, asfrom, lateral=False, **kw
4939 ):
4940 # utility method to help external dialects
4941 # get the correct from list for a select.
4942 # specifically the oracle dialect needs this feature
4943 # right now.
4944 toplevel = not self.stack
4945 entry = self._default_stack_entry if toplevel else self.stack[-1]
4946
4947 compile_state = select_stmt._compile_state_factory(select_stmt, self)
4948
4949 correlate_froms = entry["correlate_froms"]
4950 asfrom_froms = entry["asfrom_froms"]
4951
4952 if asfrom and not lateral:
4953 froms = compile_state._get_display_froms(
4954 explicit_correlate_froms=correlate_froms.difference(
4955 asfrom_froms
4956 ),
4957 implicit_correlate_froms=(),
4958 )
4959 else:
4960 froms = compile_state._get_display_froms(
4961 explicit_correlate_froms=correlate_froms,
4962 implicit_correlate_froms=asfrom_froms,
4963 )
4964 return froms
4965
# Optional hook, unset by default.  When set, visit_select() calls it as
# ``translate_select_structure(select_stmt, asfrom=asfrom, **kwargs)`` and,
# if a different statement object comes back, compiles that statement in
# place of the original.
translate_select_structure: Any = None
"""if not ``None``, should be a callable which accepts ``(select_stmt,
**kw)`` and returns a select object. this is used for structural changes
mostly to accommodate for LIMIT/OFFSET schemes

"""
4972
def visit_select(
    self,
    select_stmt,
    asfrom=False,
    insert_into=False,
    fromhints=None,
    compound_index=None,
    select_wraps_for=None,
    lateral=False,
    from_linter=None,
    **kwargs,
):
    """Compile a SELECT statement and return its SQL text.

    :param select_stmt: the statement to compile; it is replaced by the
     statement produced by its compile state, and possibly again by the
     ``translate_select_structure`` hook.
    :param asfrom: the SELECT is being rendered as a FROM element.
    :param insert_into: the SELECT is embedded in another statement;
     together with ``compound_index`` this defers CTE rendering to the
     enclosing statement unless this is the toplevel.
    :param compound_index: position of this SELECT within a compound
     select, or ``None``; a non-zero position disables result map
     population.
    :param select_wraps_for: must be ``None`` on entry; asserted.
    :param lateral: the SELECT is being rendered as a LATERAL.

    ``fromhints`` and ``from_linter`` are accepted but not referenced in
    this method's body.

    The compiler stack entry that is popped at the end of this method is
    presumably pushed by ``_setup_select_stack()`` - confirm against
    that method.

    """
    assert select_wraps_for is None, (
        "SQLAlchemy 1.4 requires use of "
        "the translate_select_structure hook for structural "
        "translations of SELECT objects"
    )
    if self._collect_params:
        self._add_to_params(select_stmt)

    # initial setup of SELECT.  the compile_state_factory may now
    # be creating a totally different SELECT from the one that was
    # passed in.  for ORM use this will convert from an ORM-state
    # SELECT to a regular "Core" SELECT.  other composed operations
    # such as computation of joins will be performed.

    kwargs["within_columns_clause"] = False

    compile_state = select_stmt._compile_state_factory(
        select_stmt, self, **kwargs
    )
    kwargs["ambiguous_table_name_map"] = (
        compile_state._ambiguous_table_name_map
    )

    select_stmt = compile_state.statement

    toplevel = not self.stack

    # the first toplevel compile state becomes the compiler's own
    if toplevel and not self.compile_state:
        self.compile_state = compile_state

    is_embedded_select = compound_index is not None or insert_into

    # translate step for Oracle, SQL Server which often need to
    # restructure the SELECT to allow for LIMIT/OFFSET and possibly
    # other conditions
    if self.translate_select_structure:
        new_select_stmt = self.translate_select_structure(
            select_stmt, asfrom=asfrom, **kwargs
        )

        # if SELECT was restructured, maintain a link to the originals
        # and assemble a new compile state
        if new_select_stmt is not select_stmt:
            compile_state_wraps_for = compile_state
            select_wraps_for = select_stmt
            select_stmt = new_select_stmt

            compile_state = select_stmt._compile_state_factory(
                select_stmt, self, **kwargs
            )
            select_stmt = compile_state.statement

    entry = self._default_stack_entry if toplevel else self.stack[-1]

    populate_result_map = need_column_expressions = (
        toplevel
        or entry.get("need_result_map_for_compound", False)
        or entry.get("need_result_map_for_nested", False)
    )

    # indicates there is a CompoundSelect in play and we are not the
    # first select
    if compound_index:
        populate_result_map = False

    # this was first proposed as part of #3372; however, it is not
    # reached in current tests and could possibly be an assertion
    # instead.
    if not populate_result_map and "add_to_result_map" in kwargs:
        del kwargs["add_to_result_map"]

    froms = self._setup_select_stack(
        select_stmt, compile_state, entry, asfrom, lateral, compound_index
    )

    # separate copy for the columns clause; _label_select_column()
    # updates this dictionary as columns are rendered
    column_clause_args = kwargs.copy()
    column_clause_args.update(
        {"within_label_clause": False, "within_columns_clause": False}
    )

    text = "SELECT "  # we're off to a good start !

    if select_stmt._post_select_clause is not None:
        psc = self.process(select_stmt._post_select_clause, **kwargs)
        if psc is not None:
            text += psc + " "

    if select_stmt._hints:
        hint_text, byfrom = self._setup_select_hints(select_stmt)
        if hint_text:
            text += hint_text + " "
    else:
        byfrom = None

    if select_stmt._independent_ctes:
        self._dispatch_independent_ctes(select_stmt, kwargs)

    if select_stmt._prefixes:
        text += self._generate_prefixes(
            select_stmt, select_stmt._prefixes, **kwargs
        )

    text += self.get_select_precolumns(select_stmt, **kwargs)

    if select_stmt._pre_columns_clause is not None:
        pcc = self.process(select_stmt._pre_columns_clause, **kwargs)
        if pcc is not None:
            text += pcc + " "

    # the actual list of columns to print in the SELECT column list.
    inner_columns = [
        c
        for c in [
            self._label_select_column(
                select_stmt,
                column,
                populate_result_map,
                asfrom,
                column_clause_args,
                name=name,
                proxy_name=proxy_name,
                fallback_label_name=fallback_label_name,
                column_is_repeated=repeated,
                need_column_expressions=need_column_expressions,
            )
            for (
                name,
                proxy_name,
                fallback_label_name,
                column,
                repeated,
            ) in compile_state.columns_plus_names
        ]
        if c is not None
    ]

    if populate_result_map and select_wraps_for is not None:
        # if this select was generated from translate_select,
        # rewrite the targeted columns in the result map

        # maps each column object of the translated SELECT to the
        # column object at the same position in the original SELECT
        translate = dict(
            zip(
                [
                    name
                    for (
                        key,
                        proxy_name,
                        fallback_label_name,
                        name,
                        repeated,
                    ) in compile_state.columns_plus_names
                ],
                [
                    name
                    for (
                        key,
                        proxy_name,
                        fallback_label_name,
                        name,
                        repeated,
                    ) in compile_state_wraps_for.columns_plus_names
                ],
            )
        )

        self._result_columns = [
            ResultColumnsEntry(
                key, name, tuple(translate.get(o, o) for o in obj), type_
            )
            for key, name, obj, type_ in self._result_columns
        ]

    text = self._compose_select_body(
        text,
        select_stmt,
        compile_state,
        inner_columns,
        froms,
        byfrom,
        toplevel,
        kwargs,
    )

    if select_stmt._post_body_clause is not None:
        pbc = self.process(select_stmt._post_body_clause, **kwargs)
        if pbc:
            text += " " + pbc

    if select_stmt._statement_hints:
        # keep hints addressed to any dialect ("*") or to this one
        per_dialect = [
            ht
            for (dialect_name, ht) in select_stmt._statement_hints
            if dialect_name in ("*", self.dialect.name)
        ]
        if per_dialect:
            text += " " + self.get_statement_hint_text(per_dialect)

    # In compound query, CTEs are shared at the compound level
    if self.ctes and (not is_embedded_select or toplevel):
        nesting_level = len(self.stack) if not toplevel else None
        text = self._render_cte_clause(nesting_level=nesting_level) + text

    if select_stmt._suffixes:
        text += " " + self._generate_prefixes(
            select_stmt, select_stmt._suffixes, **kwargs
        )

    self.stack.pop(-1)

    return text
5195
def _setup_select_hints(
    self, select: Select[Unpack[TupleAny]]
) -> Tuple[str, _FromHintsType]:
    """Collect the per-FROM hints of ``select`` that apply to this dialect.

    Each entry of ``select._hints`` is keyed by ``(from_object,
    dialect_name)``; entries are kept when the dialect name is ``"*"``
    or equals ``self.dialect.name``.  The hint template is formatted
    with a ``name`` value obtained by dispatching the FROM object to
    this compiler with ``ashint=True``.

    :return: a two-tuple ``(hint_text, byfrom)`` where ``byfrom`` maps
     FROM objects to their formatted hint strings and ``hint_text`` is
     whatever ``self.get_select_hint_text(byfrom)`` produces.

    """
    accepted_dialects = ("*", self.dialect.name)
    hints_by_from = {}

    for (from_clause, hint_dialect), template in select._hints.items():
        if hint_dialect not in accepted_dialects:
            continue
        rendered_name = from_clause._compiler_dispatch(self, ashint=True)
        hints_by_from[from_clause] = template % {"name": rendered_name}

    return self.get_select_hint_text(hints_by_from), hints_by_from
5207
def _setup_select_stack(
    self, select, compile_state, entry, asfrom, lateral, compound_index
):
    """Push a stack entry for ``select`` and return its display FROMs.

    ``entry`` is the enclosing stack entry; its ``"correlate_froms"``
    and ``"asfrom_froms"`` collections drive correlation.  When
    ``compound_index`` is ``0`` the select is recorded on ``entry`` as
    ``"select_0"``; for any other truthy index the column count is
    checked against that first select and ``CompileError`` is raised
    on mismatch.

    :return: the FROM list from ``compile_state._get_display_froms()``.

    """
    outer_correlate = entry["correlate_froms"]
    outer_asfrom = entry["asfrom_froms"]

    if compound_index == 0:
        # first member of a compound; remember it for later members
        entry["select_0"] = select
    elif compound_index:
        first_select = entry["select_0"]
        numcols = len(first_select._all_selected_columns)

        if len(compile_state.columns_plus_names) != numcols:
            counts = (
                1,
                numcols,
                compound_index + 1,
                len(select._all_selected_columns),
            )
            raise exc.CompileError(
                "All selectables passed to "
                "CompoundSelect must have identical numbers of "
                "columns; select #%d has %d columns, select "
                "#%d has %d" % counts
            )

    # a non-lateral subquery in a FROM clause does not correlate
    # implicitly, and drops the enclosing FROMs from explicit correlation
    if asfrom and not lateral:
        explicit = outer_correlate.difference(outer_asfrom)
        implicit = ()
    else:
        explicit = outer_correlate
        implicit = outer_asfrom

    froms = compile_state._get_display_froms(
        explicit_correlate_froms=explicit,
        implicit_correlate_froms=implicit,
    )

    own_froms = set(_from_objects(*froms))

    new_entry: _CompilerStackEntry = {
        "asfrom_froms": own_froms,
        "correlate_froms": own_froms.union(outer_correlate),
        "selectable": select,
        "compile_state": compile_state,
    }
    self.stack.append(new_entry)

    return froms
5259
def _compose_select_body(
    self,
    text,
    select,
    compile_state,
    inner_columns,
    froms,
    byfrom,
    toplevel,
    kwargs,
):
    """Append the column list and trailing clauses to a SELECT string.

    ``text`` is the statement rendered so far.  The column expressions
    in ``inner_columns`` are appended, followed in order by FROM (or
    ``self.default_from()`` when ``froms`` is empty), WHERE, GROUP BY,
    HAVING, the post-criteria clause, ORDER BY, the row limiting clause
    and FOR UPDATE, each only when the statement has it.

    ``kwargs`` is a plain dictionary, expanded into the nested
    compilation calls.

    :return: the extended statement string.

    """
    text += ", ".join(inner_columns)

    # cartesian-product linting is off unless the compiler's linting
    # flags request collection
    from_linter = None
    warn_linting = False
    if self.linting & COLLECT_CARTESIAN_PRODUCTS:
        from_linter = FromLinter({}, set())
        warn_linting = self.linting & WARN_LINTING
        if toplevel:
            self.from_linter = from_linter

    # adjust the whitespace for no inner columns, part of #9440,
    # so that a no-col SELECT comes out as "SELECT WHERE..." or
    # "SELECT FROM ...".
    # building the leading string without trailing whitespace would be
    # cleaner, but breaks compatibility with various custom compilation
    # schemes that are currently being tested.
    if not inner_columns:
        text = text.rstrip()

    if not froms:
        text += self.default_from()
    else:
        # per-FROM hints are only passed along when the statement
        # has hints at all
        hint_kw = {"fromhints": byfrom} if select._hints else {}
        rendered_froms = [
            from_obj._compiler_dispatch(
                self,
                asfrom=True,
                from_linter=from_linter,
                **hint_kw,
                **kwargs,
            )
            for from_obj in froms
        ]
        text += " \nFROM " + ", ".join(rendered_froms)

    if select._where_criteria:
        where_sql = self._generate_delimited_and_list(
            select._where_criteria, from_linter=from_linter, **kwargs
        )
        if where_sql:
            text += " \nWHERE " + where_sql

    if warn_linting:
        assert from_linter is not None
        from_linter.warn()

    if select._group_by_clauses:
        text += self.group_by_clause(select, **kwargs)

    if select._having_criteria:
        having_sql = self._generate_delimited_and_list(
            select._having_criteria, **kwargs
        )
        if having_sql:
            text += " \nHAVING " + having_sql

    if select._post_criteria_clause is not None:
        post_criteria = self.process(select._post_criteria_clause, **kwargs)
        if post_criteria is not None:
            text += " \n" + post_criteria

    if select._order_by_clauses:
        text += self.order_by_clause(select, **kwargs)

    if select._has_row_limiting_clause:
        text += self._row_limit_clause(select, **kwargs)

    if select._for_update_arg is not None:
        text += self.for_update_clause(select, **kwargs)

    return text
5359
def _generate_prefixes(self, stmt, prefixes, **kw):
    """Render the prefix (or suffix) elements that apply to this dialect.

    ``prefixes`` is an iterable of ``(element, dialect_name)`` pairs.
    A pair is rendered when its dialect name is ``None``, ``"*"`` or
    equal to ``self.dialect.name``.  The rendered pieces are joined
    with single spaces and, if the result is non-empty, a trailing
    space is added.

    """
    rendered = []
    for element, target_dialect in prefixes:
        if (
            target_dialect in (None, "*")
            or target_dialect == self.dialect.name
        ):
            rendered.append(element._compiler_dispatch(self, **kw))

    joined = " ".join(rendered)
    return joined + " " if joined else joined
5369
def _render_cte_clause(
    self,
    nesting_level=None,
    include_following_stack=False,
):
    """Render the WITH clause for the CTEs collected on this compiler.

    nesting_level
      When greater than 1, only nesting CTEs registered at that level
      are rendered, and they are removed from the compiler's CTE
      collections afterwards.  Otherwise every collected CTE is
      rendered and nothing is removed.

    include_following_stack
      Also render the nesting CTEs on the next stack. Useful for
      SQL structures like UNION or INSERT that can wrap SELECT
      statements containing nesting CTEs.

    Returns an empty string when there is nothing to render.
    """
    if not self.ctes:
        return ""

    nested_only = bool(nesting_level and nesting_level > 1)

    ctes: MutableMapping[CTE, str]

    if not nested_only:
        ctes = self.ctes
    else:
        ctes = util.OrderedDict()
        for cte, cte_sql in list(self.ctes.items()):
            cte_level, _, cte_opts = self.level_name_by_cte[
                cte._get_reference_cte()
            ]
            if not (cte.nesting or cte_opts.nesting):
                continue

            at_level = cte_level == nesting_level
            at_next_level = (
                include_following_stack and cte_level == nesting_level + 1
            )
            if at_level or at_next_level:
                ctes[cte] = cte_sql

    if not ctes:
        return ""

    recursive_flags = [cte.recursive for cte in ctes]
    preamble = self.get_cte_preamble(any(recursive_flags))
    cte_text = preamble + " " + ", \n".join(list(ctes.values())) + "\n "

    if nested_only:
        # the nested CTEs are now rendered; drop them from the
        # compiler-wide collections
        for cte in list(ctes.keys()):
            cte_level, cte_name, _ = self.level_name_by_cte[
                cte._get_reference_cte()
            ]
            del self.ctes[cte]
            del self.ctes_by_level_name[(cte_level, cte_name)]
            del self.level_name_by_cte[cte._get_reference_cte()]

    return cte_text
5422
def get_cte_preamble(self, recursive):
    """Return the keyword(s) that introduce a CTE clause.

    ``"WITH RECURSIVE"`` when ``recursive`` is truthy, else ``"WITH"``.
    """
    return "WITH RECURSIVE" if recursive else "WITH"
5428
def get_select_precolumns(self, select: Select[Any], **kw: Any) -> str:
    """Return the text rendered just before a SELECT's column list.

    This base implementation renders ``"DISTINCT "`` when the
    statement is marked distinct and an empty string otherwise.  If
    the statement carries DISTINCT ON expressions, a deprecation
    warning is emitted and the expressions are not rendered.

    """
    if select._distinct_on:
        util.warn_deprecated(
            "DISTINCT ON is currently supported only by the PostgreSQL "
            "dialect. Use of DISTINCT ON for other backends is currently "
            "silently ignored, however this usage is deprecated, and will "
            "raise CompileError in a future release for all backends "
            "that do not support this syntax.",
            version="1.4",
        )

    if select._distinct:
        return "DISTINCT "
    return ""
5444
def group_by_clause(self, select, **kw):
    """Render the GROUP BY clause; a hook dialects may override.

    Returns an empty string when the rendered expression list is empty.
    """
    rendered = self._generate_delimited_list(
        select._group_by_clauses, OPERATORS[operators.comma_op], **kw
    )
    return " GROUP BY " + rendered if rendered else ""
5455
def order_by_clause(self, select, **kw):
    """Render the ORDER BY clause; a hook dialects may override.

    Returns an empty string when the rendered expression list is empty.
    """
    rendered = self._generate_delimited_list(
        select._order_by_clauses, OPERATORS[operators.comma_op], **kw
    )
    return " ORDER BY " + rendered if rendered else ""
5467
def for_update_clause(self, select, **kw):
    """Return the FOR UPDATE suffix; this base form ignores ``select``."""
    locking_suffix = " FOR UPDATE"
    return locking_suffix
5470
def returning_clause(
    self,
    stmt: UpdateBase,
    returning_cols: Sequence[_ColumnsClauseElement],
    *,
    populate_result_map: bool,
    **kw: Any,
) -> str:
    """Render a ``RETURNING`` clause for a DML statement.

    ``returning_cols`` is expanded through ``base._select_iterables()``
    and ``stmt._generate_columns_plus_names()``; each resulting column
    is rendered with ``self._label_returning_column()``.

    :return: ``"RETURNING "`` followed by the comma-separated columns.

    """
    plus_names = stmt._generate_columns_plus_names(
        True, cols=base._select_iterables(returning_cols)
    )

    rendered = []
    for (
        name,
        proxy_name,
        fallback_label_name,
        column,
        repeated,
    ) in plus_names:
        rendered.append(
            self._label_returning_column(
                stmt,
                column,
                populate_result_map,
                fallback_label_name=fallback_label_name,
                column_is_repeated=repeated,
                name=name,
                proxy_name=proxy_name,
                **kw,
            )
        )

    return "RETURNING " + ", ".join(rendered)
5502
def limit_clause(self, select, **kw):
    """Render ``LIMIT`` / ``OFFSET`` for ``select``.

    When only an offset is present, ``LIMIT -1`` is rendered ahead of
    it.  Returns an empty string when neither clause is set.
    """
    limit = select._limit_clause
    offset = select._offset_clause
    pieces = []

    if limit is not None:
        pieces.append("\n LIMIT " + self.process(limit, **kw))

    if offset is not None:
        if limit is None:
            pieces.append("\n LIMIT -1")
        pieces.append(" OFFSET " + self.process(offset, **kw))

    return "".join(pieces)
5512
def fetch_clause(
    self,
    select,
    fetch_clause=None,
    require_offset=False,
    use_literal_execute_for_simple_int=False,
    **kw,
):
    """Render ``OFFSET n ROWS`` / ``FETCH FIRST n ROWS`` for ``select``.

    :param fetch_clause: explicit fetch expression; when omitted, the
     statement's own ``_fetch_clause`` and ``_fetch_clause_options``
     are used.  An explicit expression is rendered without
     ``PERCENT`` and with ``ONLY``.
    :param require_offset: render ``OFFSET 0 ROWS`` when the statement
     has no offset.
    :param use_literal_execute_for_simple_int: when true, clauses for
     which ``select._simple_int_clause()`` is true are rendered via
     ``render_literal_execute()``.

    """
    if fetch_clause is not None:
        options = {"percent": False, "with_ties": False}
    else:
        fetch_clause = select._fetch_clause
        options = select._fetch_clause_options

    def _as_rendered(clause):
        # optionally swap the clause for its literal-execute form
        if use_literal_execute_for_simple_int and select._simple_int_clause(
            clause
        ):
            clause = clause.render_literal_execute()
        return self.process(clause, **kw)

    segments = []

    if select._offset_clause is not None:
        offset_str = _as_rendered(select._offset_clause)
        segments.append("\n OFFSET %s ROWS" % offset_str)
    elif require_offset:
        segments.append("\n OFFSET 0 ROWS")

    if fetch_clause is not None:
        fetch_str = _as_rendered(fetch_clause)
        percent = " PERCENT" if options["percent"] else ""
        ties = "WITH TIES" if options["with_ties"] else "ONLY"
        segments.append(
            "\n FETCH FIRST %s%s ROWS %s" % (fetch_str, percent, ties)
        )

    return "".join(segments)
5553
def visit_table(
    self,
    table,
    asfrom=False,
    iscrud=False,
    ashint=False,
    fromhints=None,
    use_schema=True,
    from_linter=None,
    ambiguous_table_name_map=None,
    enclosing_alias=None,
    within_tstring=False,
    **kwargs,
):
    """Render a table name for a FROM clause, a hint or a t-string.

    An empty string is returned unless one of ``asfrom``, ``ashint``
    or ``within_tstring`` is set.  Otherwise the quoted name is
    rendered, schema-qualified when ``use_schema`` is set and the
    preparer reports a schema for the table.  An anonymous alias is
    appended for schema-less tables whose name appears in
    ``ambiguous_table_name_map``, unless ``enclosing_alias`` already
    wraps this table.  Any hint in ``fromhints`` for this table is
    applied last.

    When given, ``from_linter`` records the table under its
    ``fullname`` regardless of what is rendered.

    """
    if from_linter:
        from_linter.froms[table] = table.fullname

    if not (asfrom or ashint or within_tstring):
        return ""

    preparer = self.preparer
    effective_schema = preparer.schema_for_object(table)

    if use_schema and effective_schema:
        schema_part = preparer.quote_schema(effective_schema)
        rendered = schema_part + "." + preparer.quote(table.name)
    else:
        rendered = preparer.quote(table.name)

    already_aliased = (
        enclosing_alias is not None and enclosing_alias.element is table
    )
    if (
        not already_aliased
        and not effective_schema
        and ambiguous_table_name_map
        and table.name in ambiguous_table_name_map
    ):
        anon_name = self._truncated_identifier(
            "alias", ambiguous_table_name_map[table.name]
        )
        alias_sql = self.preparer.format_alias(None, anon_name)
        rendered = rendered + self.get_render_as_alias_suffix(alias_sql)

    if fromhints and table in fromhints:
        rendered = self.format_from_hint_text(
            rendered, table, fromhints[table], iscrud
        )

    return rendered
5607
def visit_join(self, join, asfrom=False, from_linter=None, **kwargs):
    """Render a JOIN as ``<left> <join keyword> <right> ON <onclause>``.

    The keyword is ``FULL OUTER JOIN`` when ``join.full`` is set,
    ``LEFT OUTER JOIN`` when ``join.isouter`` is set, otherwise
    ``JOIN``.  When a ``from_linter`` is given, every pairing of a
    left-side FROM object with a right-side FROM object is recorded
    as an edge.

    """
    if from_linter:
        left_froms = _de_clone(join.left._from_objects)
        right_froms = _de_clone(join.right._from_objects)
        from_linter.edges.update(itertools.product(left_froms, right_froms))

    join_type = (
        " FULL OUTER JOIN "
        if join.full
        else (" LEFT OUTER JOIN " if join.isouter else " JOIN ")
    )

    left_sql = join.left._compiler_dispatch(
        self, asfrom=True, from_linter=from_linter, **kwargs
    )
    right_sql = join.right._compiler_dispatch(
        self, asfrom=True, from_linter=from_linter, **kwargs
    )
    # TODO: likely need asfrom=True here?
    on_sql = join.onclause._compiler_dispatch(
        self, from_linter=from_linter, **kwargs
    )

    return left_sql + join_type + right_sql + " ON " + on_sql
5637
def _setup_crud_hints(self, stmt, table_text):
    """Select the DML hints for this dialect and apply the table's own.

    Hints in ``stmt._hints`` are keyed by ``(table, dialect_name)``
    and kept when the dialect name is ``"*"`` or ``self.dialect.name``.
    If ``stmt.table`` has a hint, ``table_text`` is passed through
    ``self.format_from_hint_text()`` with its final argument ``True``.

    :return: ``(dialect_hints, table_text)``.

    """
    accepted_dialects = ("*", self.dialect.name)
    dialect_hints = {}
    for (hinted_table, hint_dialect), hint_text in stmt._hints.items():
        if hint_dialect in accepted_dialects:
            dialect_hints[hinted_table] = hint_text

    target = stmt.table
    if target in dialect_hints:
        table_text = self.format_from_hint_text(
            table_text, target, dialect_hints[target], True
        )

    return dialect_hints, table_text
5649
# within the realm of "insertmanyvalues sentinel columns",
# these lookups match different kinds of Column() configurations
# to specific backend capabilities.  they are broken into two
# lookups, one for autoincrement columns and the other for non
# autoincrement columns.
#
# keys are _SentinelDefaultCharacterization members and values are
# InsertmanyvaluesSentinelOpts members.  _get_sentinel_column_for_table()
# looks up a table's default characterization in one of these mappings
# and bitwise-ANDs the result with the dialect's
# insertmanyvalues_implicit_sentinel setting; a non-zero result means
# the sentinel column is usable on that dialect.
_sentinel_col_non_autoinc_lookup = util.immutabledict(
    {
        _SentinelDefaultCharacterization.CLIENTSIDE: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.SENTINEL_DEFAULT: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.NONE: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.IDENTITY: (
            InsertmanyvaluesSentinelOpts.IDENTITY
        ),
        _SentinelDefaultCharacterization.SEQUENCE: (
            InsertmanyvaluesSentinelOpts.SEQUENCE
        ),
        _SentinelDefaultCharacterization.MONOTONIC_FUNCTION: (
            InsertmanyvaluesSentinelOpts.MONOTONIC_FUNCTION
        ),
    }
)
# the autoincrement variant only supplies a different value for the
# NONE characterization, which maps to AUTOINCREMENT.
# NOTE(review): presumably .union() returns a new mapping in which the
# entry given here overrides the one from the non-autoinc lookup and
# all other entries carry over -- confirm against util.immutabledict.
_sentinel_col_autoinc_lookup = _sentinel_col_non_autoinc_lookup.union(
    {
        _SentinelDefaultCharacterization.NONE: (
            InsertmanyvaluesSentinelOpts.AUTOINCREMENT
        ),
    }
)
5684
def _get_sentinel_column_for_table(
    self, table: Table
) -> Optional[Sequence[Column[Any]]]:
    """Return the sentinel column(s) of ``table`` usable on this dialect.

    The table's sentinel characterization is translated to a
    capability bitmask through the autoincrement or non-autoincrement
    lookup, and that mask is tested against the dialect's
    ``insertmanyvalues_implicit_sentinel`` setting.

    :return: the sentinel columns when the dialect supports them;
     ``None`` when the table has no sentinel candidates or the
     (non-explicit) candidates are unsupported.
    :raises InvalidRequestError: if a column was explicitly marked as
     a sentinel but isn't compatible with this dialect.

    """
    dialect_opts = self.dialect.insertmanyvalues_implicit_sentinel
    characteristics = table._sentinel_column_characteristics

    candidates = characteristics.columns
    if candidates is None:
        return None

    lookup = (
        self._sentinel_col_autoinc_lookup
        if characteristics.is_autoinc
        else self._sentinel_col_non_autoinc_lookup
    )
    # characterizations absent from the lookup yield an empty mask
    bitmask = lookup.get(characteristics.default_characterization, 0)

    if dialect_opts & bitmask:
        return candidates

    if not characteristics.is_explicit:
        return None

    # a column was explicitly marked as insert_sentinel=True, however
    # it is not compatible with this dialect.  users should not mark
    # such a column as a sentinel if they need to include this dialect.
    #
    # TODO: do we want non-primary key explicit sentinel cols
    # that can gracefully degrade for some backends?
    # insert_sentinel="degrade" perhaps.  not for the initial release.
    # I am hoping people are generally not dealing with this sentinel
    # business at all.
    #
    # if is_explicit is True, there will be only one sentinel column.
    raise exc.InvalidRequestError(
        f"Column {candidates[0]} can't be explicitly "
        "marked as a sentinel column when using the "
        f"{self.dialect.name} dialect, as the "
        "particular type of default generation on this column is "
        "not currently compatible with this dialect's specific "
        f"INSERT..RETURNING syntax which can receive the "
        "server-generated value in "
        "a deterministic way.  To remove this error, remove "
        "insert_sentinel=True from primary key autoincrement "
        "columns; these columns are automatically used as "
        "sentinels for supported dialects in any case."
    )
5746
def _deliver_insertmanyvalues_batches(
    self,
    statement: str,
    parameters: _DBAPIMultiExecuteParams,
    compiled_parameters: List[_MutableCoreSingleExecuteParams],
    generic_setinputsizes: Optional[_GenericSetInputSizesType],
    batch_size: int,
    sort_by_parameter_order: bool,
    schema_translate_map: Optional[SchemaTranslateMapType],
) -> Iterator[_InsertManyValuesBatch]:
    """Yield the ``_InsertManyValuesBatch`` objects needed to deliver
    ``parameters`` for an "insertmanyvalues" INSERT.

    Two delivery modes are implemented:

    * **row at a time** - ``statement`` is yielded unchanged, once per
      parameter set.  Chosen when the statement is a default-expression
      INSERT on a dialect without ``supports_default_metavalue``, when
      the dialect lacks multi-values INSERT support, when deterministic
      ordering was requested but can't be provided, or when upsert
      bound parameters are combined with RETURNING.

    * **batched** - the single parenthesized VALUES group within
      ``statement`` is swapped for a token; for each slice of at most
      ``batch_size`` parameter sets the token is replaced with one
      VALUES group per set.  Positional parameter sets are flattened
      into one tuple; named parameters are re-keyed as
      ``<key>__<index>``.

    :param statement: the string INSERT statement containing one
     VALUES group.
    :param parameters: the parameter sets in the form passed to the
     DBAPI.
    :param compiled_parameters: parameter sets parallel to
     ``parameters``; sentinel values are read from these using
     ``imv.sentinel_param_keys``.
    :param generic_setinputsizes: optional ``(key, len, type)`` entries,
     expanded per batch with ``<key>_<index>`` names.
    :param batch_size: maximum number of parameter sets per batch; may
     be reduced by ``dialect.insertmanyvalues_max_parameters``.
    :param sort_by_parameter_order: whether deterministic ordering of
     results was requested; passed through to each yielded batch.
    :param schema_translate_map: when set, applied to the VALUES
     expressions via ``preparer._render_schema_translates`` before
     they are located in ``statement``.

    """
    imv = self._insertmanyvalues
    assert imv is not None

    # callable extracting the sentinel value(s) from one compiled
    # parameter set, or None if there are no sentinel parameter keys
    if not imv.sentinel_param_keys:
        _sentinel_from_params = None
    else:
        _sentinel_from_params = operator.itemgetter(
            *imv.sentinel_param_keys
        )

    lenparams = len(parameters)
    if imv.is_default_expr and not self.dialect.supports_default_metavalue:
        # backend doesn't support
        # INSERT INTO table (pk_col) VALUES (DEFAULT), (DEFAULT), ...
        # at the moment this is basically SQL Server due to
        # not being able to use DEFAULT for identity column
        # just yield out that many single statements! still
        # faster than a whole connection.execute() call ;)
        #
        # note we still are taking advantage of the fact that we know
        # we are using RETURNING.   The generalized approach of fetching
        # cursor.lastrowid etc. still goes through the more heavyweight
        # "ExecutionContext per statement" system as it isn't usable
        # as a generic "RETURNING" approach
        use_row_at_a_time = True
        downgraded = False
    elif not self.dialect.supports_multivalues_insert or (
        sort_by_parameter_order
        and self._result_columns
        and (
            imv.sentinel_columns is None
            or (
                imv.includes_upsert_behaviors
                and not imv.embed_values_counter
            )
        )
    ):
        # deterministic order was requested and the compiler could
        # not organize sentinel columns for this dialect/statement.
        # use row at a time.  Note: if embed_values_counter is True,
        # the counter itself provides the ordering capability we need,
        # so we can use batch mode even with upsert behaviors.
        use_row_at_a_time = True
        downgraded = True
    elif (
        imv.has_upsert_bound_parameters
        and not imv.embed_values_counter
        and self._result_columns
    ):
        # For upsert behaviors (ON CONFLICT DO UPDATE, etc.) with RETURNING
        # and parametrized bindparams in the SET clause, we must use
        # row-at-a-time. Batching multiple rows in a single statement
        # doesn't work when the SET clause contains bound parameters that
        # will receive different values per row, as there's only one SET
        # clause per statement. See issue #13130.
        use_row_at_a_time = True
        downgraded = True
    else:
        use_row_at_a_time = False
        downgraded = False

    if use_row_at_a_time:
        # one batch per parameter set, numbered from 1; the statement
        # is delivered as-is and the batch count equals the number of
        # parameter sets
        for batchnum, (param, compiled_param) in enumerate(
            cast(
                "Sequence[Tuple[_DBAPISingleExecuteParams, _MutableCoreSingleExecuteParams]]",  # noqa: E501
                zip(parameters, compiled_parameters),
            ),
            1,
        ):
            yield _InsertManyValuesBatch(
                statement,
                param,
                generic_setinputsizes,
                [param],
                (
                    [_sentinel_from_params(compiled_param)]
                    if _sentinel_from_params
                    else []
                ),
                1,
                batchnum,
                lenparams,
                sort_by_parameter_order,
                downgraded,
            )
        return

    # --- batched mode from here on ---

    if schema_translate_map:
        rst = functools.partial(
            self.preparer._render_schema_translates,
            schema_translate_map=schema_translate_map,
        )
    else:
        rst = None

    imv_single_values_expr = imv.single_values_expr
    if rst:
        imv_single_values_expr = rst(imv_single_values_expr)

    # swap the single "(...)" VALUES group for a token that each
    # batch replaces with its expanded VALUES groups
    executemany_values = f"({imv_single_values_expr})"
    statement = statement.replace(executemany_values, "__EXECMANY_TOKEN__")

    # Use optional insertmanyvalues_max_parameters
    # to further shrink the batch size so that there are no more than
    # insertmanyvalues_max_parameters params.
    # Currently used by SQL Server, which limits statements to 2100 bound
    # parameters (actually 2099).
    max_params = self.dialect.insertmanyvalues_max_parameters
    if max_params:
        total_num_of_params = len(self.bind_names)
        num_params_per_batch = len(imv.insert_crud_params)
        num_params_outside_of_batch = (
            total_num_of_params - num_params_per_batch
        )
        batch_size = min(
            batch_size,
            (
                (max_params - num_params_outside_of_batch)
                // num_params_per_batch
            ),
        )

    # mutable copies; the loop below consumes these from the front
    batches = cast("List[Sequence[Any]]", list(parameters))
    compiled_batches = cast(
        "List[Sequence[Any]]", list(compiled_parameters)
    )

    processed_setinputsizes: Optional[_GenericSetInputSizesType] = None
    batchnum = 1
    # ceiling division of lenparams by batch_size
    total_batches = lenparams // batch_size + (
        1 if lenparams % batch_size else 0
    )

    insert_crud_params = imv.insert_crud_params
    assert insert_crud_params is not None

    if rst:
        insert_crud_params = [
            (col, key, rst(expr), st)
            for col, key, expr, st in insert_crud_params
        ]

    escaped_bind_names: Mapping[str, str]
    expand_pos_lower_index = expand_pos_upper_index = 0

    if not self.positional:
        # named parameters: build a VALUES template in which each bind
        # name inside VALUES carries an "__EXECMANY_INDEX__" suffix, to
        # be replaced by the row index for every row of a batch
        if self.escaped_bind_names:
            escaped_bind_names = self.escaped_bind_names
        else:
            escaped_bind_names = {}

        all_keys = set(parameters[0])

        def apply_placeholders(keys, formatted):
            # rewrite the rendered bind for each key in ``formatted``
            # to its indexed-placeholder form
            for key in keys:
                key = escaped_bind_names.get(key, key)
                formatted = formatted.replace(
                    self.bindtemplate % {"name": key},
                    self.bindtemplate
                    % {"name": f"{key}__EXECMANY_INDEX__"},
                )
            return formatted

        if imv.embed_values_counter:
            imv_values_counter = ", _IMV_VALUES_COUNTER"
        else:
            imv_values_counter = ""
        formatted_values_clause = f"""({', '.join(
            apply_placeholders(bind_keys, formatted)
            for _, _, formatted, bind_keys in insert_crud_params
        )}{imv_values_counter})"""

        # keys that vary per row vs. keys taken once from the first
        # parameter set and shared by the whole batch
        keys_to_replace = all_keys.intersection(
            escaped_bind_names.get(key, key)
            for _, _, _, bind_keys in insert_crud_params
            for key in bind_keys
        )
        base_parameters = {
            key: parameters[0][key]
            for key in all_keys.difference(keys_to_replace)
        }

        executemany_values_w_comma = ""
    else:
        # positional parameters: the VALUES group is repeated as a
        # string and the parameter tuples are concatenated
        formatted_values_clause = ""
        keys_to_replace = set()
        base_parameters = {}

        if imv.embed_values_counter:
            executemany_values_w_comma = (
                f"({imv_single_values_expr}, _IMV_VALUES_COUNTER), "
            )
        else:
            executemany_values_w_comma = f"({imv_single_values_expr}), "

        all_names_we_will_expand: Set[str] = set()
        for elem in imv.insert_crud_params:
            all_names_we_will_expand.update(elem[3])

        # get the start and end position in a particular list
        # of parameters where we will be doing the "expanding".
        # statements can have params on either side or both sides,
        # given RETURNING and CTEs
        if all_names_we_will_expand:
            positiontup = self.positiontup
            assert positiontup is not None

            all_expand_positions = {
                idx
                for idx, name in enumerate(positiontup)
                if name in all_names_we_will_expand
            }
            expand_pos_lower_index = min(all_expand_positions)
            expand_pos_upper_index = max(all_expand_positions) + 1
            # the expanded positions must form one contiguous run
            assert (
                len(all_expand_positions)
                == expand_pos_upper_index - expand_pos_lower_index
            )

        if self._numeric_binds:
            # replace numbered binds with "%s" so the repeated VALUES
            # string can be %-formatted with new numbers per batch
            escaped = re.escape(self._numeric_binds_identifier_char)
            executemany_values_w_comma = re.sub(
                rf"{escaped}\d+", "%s", executemany_values_w_comma
            )

    while batches:
        # take the next slice of parameter sets off the front
        batch = batches[0:batch_size]
        compiled_batch = compiled_batches[0:batch_size]

        batches[0:batch_size] = []
        compiled_batches[0:batch_size] = []

        # only the final batch can be shorter than batch_size
        if batches:
            current_batch_size = batch_size
        else:
            current_batch_size = len(batch)

        if generic_setinputsizes:
            # if setinputsizes is present, expand this collection to
            # suit the batch length as well
            # currently this will be mssql+pyodbc for internal dialects
            processed_setinputsizes = [
                (new_key, len_, typ)
                for new_key, len_, typ in (
                    (f"{key}_{index}", len_, typ)
                    for index in range(current_batch_size)
                    for key, len_, typ in generic_setinputsizes
                )
            ]

        replaced_parameters: Any
        if self.positional:
            num_ins_params = imv.num_positional_params_counted

            batch_iterator: Iterable[Sequence[Any]]
            extra_params_left: Sequence[Any]
            extra_params_right: Sequence[Any]

            if num_ins_params == len(batch[0]):
                # every positional parameter belongs to VALUES
                extra_params_left = extra_params_right = ()
                batch_iterator = batch
            else:
                # parameters outside of VALUES are taken once, from
                # the first parameter set of the batch
                extra_params_left = batch[0][:expand_pos_lower_index]
                extra_params_right = batch[0][expand_pos_upper_index:]
                batch_iterator = (
                    b[expand_pos_lower_index:expand_pos_upper_index]
                    for b in batch
                )

            # repeat the VALUES group once per row; [:-2] strips the
            # trailing ", " of the last repetition
            if imv.embed_values_counter:
                expanded_values_string = (
                    "".join(
                        executemany_values_w_comma.replace(
                            "_IMV_VALUES_COUNTER", str(i)
                        )
                        for i, _ in enumerate(batch)
                    )
                )[:-2]
            else:
                expanded_values_string = (
                    (executemany_values_w_comma * current_batch_size)
                )[:-2]

            if self._numeric_binds and num_ins_params > 0:
                # numeric will always number the parameters inside of
                # VALUES (and thus order self.positiontup) to be higher
                # than non-VALUES parameters, no matter where in the
                # statement those non-VALUES parameters appear (this is
                # ensured in _process_numeric by numbering first all
                # params that are not in _values_bindparam)
                # therefore all extra params are always
                # on the left side and numbered lower than the VALUES
                # parameters
                assert not extra_params_right

                start = expand_pos_lower_index + 1
                end = num_ins_params * (current_batch_size) + start

                # need to format here, since statement may contain
                # unescaped %, while values_string contains just (%s, %s)
                positions = tuple(
                    f"{self._numeric_binds_identifier_char}{i}"
                    for i in range(start, end)
                )
                expanded_values_string = expanded_values_string % positions

            replaced_statement = statement.replace(
                "__EXECMANY_TOKEN__", expanded_values_string
            )

            # flatten the per-row VALUES parameters into one tuple
            replaced_parameters = tuple(
                itertools.chain.from_iterable(batch_iterator)
            )

            replaced_parameters = (
                extra_params_left
                + replaced_parameters
                + extra_params_right
            )

        else:
            replaced_values_clauses = []
            replaced_parameters = base_parameters.copy()

            for i, param in enumerate(batch):
                # the template holds "<key>__EXECMANY_INDEX__"; replacing
                # the "EXECMANY_INDEX__" part leaves "<key>__<i>", the
                # same key used for the parameters below
                fmv = formatted_values_clause.replace(
                    "EXECMANY_INDEX__", str(i)
                )
                if imv.embed_values_counter:
                    fmv = fmv.replace("_IMV_VALUES_COUNTER", str(i))

                replaced_values_clauses.append(fmv)
                replaced_parameters.update(
                    {f"{key}__{i}": param[key] for key in keys_to_replace}
                )

            replaced_statement = statement.replace(
                "__EXECMANY_TOKEN__",
                ", ".join(replaced_values_clauses),
            )

        # the final positional argument is always False in batched
        # mode; the row-at-a-time path passes ``downgraded`` there
        yield _InsertManyValuesBatch(
            replaced_statement,
            replaced_parameters,
            processed_setinputsizes,
            batch,
            (
                [_sentinel_from_params(cb) for cb in compiled_batch]
                if _sentinel_from_params
                else []
            ),
            current_batch_size,
            batchnum,
            total_batches,
            sort_by_parameter_order,
            False,
        )
        batchnum += 1
6115
6116 def visit_insert(
6117 self, insert_stmt, visited_bindparam=None, visiting_cte=None, **kw
6118 ):
6119 compile_state = insert_stmt._compile_state_factory(
6120 insert_stmt, self, **kw
6121 )
6122 insert_stmt = compile_state.statement
6123
6124 if visiting_cte is not None:
6125 kw["visiting_cte"] = visiting_cte
6126 toplevel = False
6127 else:
6128 toplevel = not self.stack
6129
6130 if toplevel:
6131 self.isinsert = True
6132 if not self.dml_compile_state:
6133 self.dml_compile_state = compile_state
6134 if not self.compile_state:
6135 self.compile_state = compile_state
6136
6137 self.stack.append(
6138 {
6139 "correlate_froms": set(),
6140 "asfrom_froms": set(),
6141 "selectable": insert_stmt,
6142 }
6143 )
6144
6145 counted_bindparam = 0
6146
6147 # reset any incoming "visited_bindparam" collection
6148 visited_bindparam = None
6149
6150 # for positional, insertmanyvalues needs to know how many
6151 # bound parameters are in the VALUES sequence; there's no simple
6152 # rule because default expressions etc. can have zero or more
6153 # params inside them. After multiple attempts to figure this out,
6154 # this very simplistic "count after" works and is
6155 # likely the least amount of callcounts, though looks clumsy
6156 if self.positional and visiting_cte is None:
6157 # if we are inside a CTE, don't count parameters
6158 # here since they won't be for insertmanyvalues. keep
6159 # visited_bindparam at None so no counting happens.
6160 # see #9173
6161 visited_bindparam = []
6162
6163 crud_params_struct = crud._get_crud_params(
6164 self,
6165 insert_stmt,
6166 compile_state,
6167 toplevel,
6168 visited_bindparam=visited_bindparam,
6169 **kw,
6170 )
6171
6172 if self.positional and visited_bindparam is not None:
6173 counted_bindparam = len(visited_bindparam)
6174 if self._numeric_binds:
6175 if self._values_bindparam is not None:
6176 self._values_bindparam += visited_bindparam
6177 else:
6178 self._values_bindparam = visited_bindparam
6179
6180 crud_params_single = crud_params_struct.single_params
6181
6182 if (
6183 not crud_params_single
6184 and not self.dialect.supports_default_values
6185 and not self.dialect.supports_default_metavalue
6186 and not self.dialect.supports_empty_insert
6187 ):
6188 raise exc.CompileError(
6189 "The '%s' dialect with current database "
6190 "version settings does not support empty "
6191 "inserts." % self.dialect.name
6192 )
6193
6194 if compile_state._has_multi_parameters:
6195 if not self.dialect.supports_multivalues_insert:
6196 raise exc.CompileError(
6197 "The '%s' dialect with current database "
6198 "version settings does not support "
6199 "in-place multirow inserts." % self.dialect.name
6200 )
6201 elif (
6202 self.implicit_returning or insert_stmt._returning
6203 ) and insert_stmt._sort_by_parameter_order:
6204 raise exc.CompileError(
6205 "RETURNING cannot be deterministically sorted when "
6206 "using an INSERT which includes multi-row values()."
6207 )
6208 crud_params_single = crud_params_struct.single_params
6209 else:
6210 crud_params_single = crud_params_struct.single_params
6211
6212 preparer = self.preparer
6213 supports_default_values = self.dialect.supports_default_values
6214
6215 text = "INSERT "
6216
6217 if insert_stmt._prefixes:
6218 text += self._generate_prefixes(
6219 insert_stmt, insert_stmt._prefixes, **kw
6220 )
6221
6222 text += "INTO "
6223 table_text = preparer.format_table(insert_stmt.table)
6224
6225 if insert_stmt._hints:
6226 _, table_text = self._setup_crud_hints(insert_stmt, table_text)
6227
6228 if insert_stmt._independent_ctes:
6229 self._dispatch_independent_ctes(insert_stmt, kw)
6230
6231 text += table_text
6232
6233 if crud_params_single or not supports_default_values:
6234 text += " (%s)" % ", ".join(
6235 [expr for _, expr, _, _ in crud_params_single]
6236 )
6237
6238 # look for insertmanyvalues attributes that would have been configured
6239 # by crud.py as it scanned through the columns to be part of the
6240 # INSERT
6241 use_insertmanyvalues = crud_params_struct.use_insertmanyvalues
6242 named_sentinel_params: Optional[Sequence[str]] = None
6243 add_sentinel_cols = None
6244 implicit_sentinel = False
6245
6246 returning_cols = self.implicit_returning or insert_stmt._returning
6247 if returning_cols:
6248 add_sentinel_cols = crud_params_struct.use_sentinel_columns
6249 if add_sentinel_cols is not None:
6250 assert use_insertmanyvalues
6251
6252 # search for the sentinel column explicitly present
6253 # in the INSERT columns list, and additionally check that
6254 # this column has a bound parameter name set up that's in the
6255 # parameter list. If both of these cases are present, it means
6256 # we will have a client side value for the sentinel in each
6257 # parameter set.
6258
6259 _params_by_col = {
6260 col: param_names
6261 for col, _, _, param_names in crud_params_single
6262 }
6263 named_sentinel_params = []
6264 for _add_sentinel_col in add_sentinel_cols:
6265 if _add_sentinel_col not in _params_by_col:
6266 named_sentinel_params = None
6267 break
6268 param_name = self._within_exec_param_key_getter(
6269 _add_sentinel_col
6270 )
6271 if param_name not in _params_by_col[_add_sentinel_col]:
6272 named_sentinel_params = None
6273 break
6274 named_sentinel_params.append(param_name)
6275
6276 if named_sentinel_params is None:
6277 # if we are not going to have a client side value for
6278 # the sentinel in the parameter set, that means it's
6279 # an autoincrement, an IDENTITY, or a server-side SQL
6280 # expression like nextval('seqname'). So this is
6281 # an "implicit" sentinel; we will look for it in
6282 # RETURNING
6283 # only, and then sort on it. For this case on PG,
6284 # SQL Server we have to use a special INSERT form
6285 # that guarantees the server side function lines up with
6286 # the entries in the VALUES.
6287 if (
6288 self.dialect.insertmanyvalues_implicit_sentinel
6289 & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
6290 ):
6291 implicit_sentinel = True
6292 else:
6293 # here, we are not using a sentinel at all
6294 # and we are likely the SQLite dialect.
6295 # The first add_sentinel_col that we have should not
6296 # be marked as "insert_sentinel=True". if it was,
6297 # an error should have been raised in
6298 # _get_sentinel_column_for_table.
6299 assert not add_sentinel_cols[0]._insert_sentinel, (
6300 "sentinel selection rules should have prevented "
6301 "us from getting here for this dialect"
6302 )
6303
6304 # always put the sentinel columns last. even if they are
6305 # in the returning list already, they will be there twice
6306 # then.
6307 returning_cols = list(returning_cols) + list(add_sentinel_cols)
6308
6309 returning_clause = self.returning_clause(
6310 insert_stmt,
6311 returning_cols,
6312 populate_result_map=toplevel,
6313 )
6314
6315 if self.returning_precedes_values:
6316 text += " " + returning_clause
6317
6318 else:
6319 returning_clause = None
6320
6321 if insert_stmt.select is not None:
6322 # placed here by crud.py
6323 select_text = self.process(
6324 self.stack[-1]["insert_from_select"], insert_into=True, **kw
6325 )
6326
6327 if self.ctes and self.dialect.cte_follows_insert:
6328 nesting_level = len(self.stack) if not toplevel else None
6329 text += " %s%s" % (
6330 self._render_cte_clause(
6331 nesting_level=nesting_level,
6332 include_following_stack=True,
6333 ),
6334 select_text,
6335 )
6336 else:
6337 text += " %s" % select_text
6338 elif not crud_params_single and supports_default_values:
6339 text += " DEFAULT VALUES"
6340 if use_insertmanyvalues:
6341 self._insertmanyvalues = _InsertManyValues(
6342 True,
6343 self.dialect.default_metavalue_token,
6344 crud_params_single,
6345 counted_bindparam,
6346 sort_by_parameter_order=(
6347 insert_stmt._sort_by_parameter_order
6348 ),
6349 includes_upsert_behaviors=(
6350 insert_stmt._post_values_clause is not None
6351 ),
6352 sentinel_columns=add_sentinel_cols,
6353 num_sentinel_columns=(
6354 len(add_sentinel_cols) if add_sentinel_cols else 0
6355 ),
6356 implicit_sentinel=implicit_sentinel,
6357 )
6358 elif compile_state._has_multi_parameters:
6359 text += " VALUES %s" % (
6360 ", ".join(
6361 "(%s)"
6362 % (", ".join(value for _, _, value, _ in crud_param_set))
6363 for crud_param_set in crud_params_struct.all_multi_params
6364 ),
6365 )
6366 elif use_insertmanyvalues:
6367 if (
6368 implicit_sentinel
6369 and (
6370 self.dialect.insertmanyvalues_implicit_sentinel
6371 & InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
6372 )
6373 # this is checking if we have
6374 # INSERT INTO table (id) VALUES (DEFAULT).
6375 and not (crud_params_struct.is_default_metavalue_only)
6376 ):
6377 # if we have a sentinel column that is server generated,
6378 # then for selected backends render the VALUES list as a
6379 # subquery. This is the orderable form supported by
6380 # PostgreSQL and in fewer cases SQL Server
6381 embed_sentinel_value = True
6382
6383 render_bind_casts = (
6384 self.dialect.insertmanyvalues_implicit_sentinel
6385 & InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
6386 )
6387
6388 add_sentinel_set = add_sentinel_cols or ()
6389
6390 insert_single_values_expr = ", ".join(
6391 [
6392 value
6393 for col, _, value, _ in crud_params_single
6394 if col not in add_sentinel_set
6395 ]
6396 )
6397
6398 colnames = ", ".join(
6399 f"p{i}"
6400 for i, cp in enumerate(crud_params_single)
6401 if cp[0] not in add_sentinel_set
6402 )
6403
6404 if render_bind_casts:
6405 # render casts for the SELECT list. For PG, we are
6406 # already rendering bind casts in the parameter list,
6407 # selectively for the more "tricky" types like ARRAY.
6408 # however, even for the "easy" types, if the parameter
6409 # is NULL for every entry, PG gives up and says
6410 # "it must be TEXT", which fails for other easy types
6411 # like ints. So we cast on this side too.
6412 colnames_w_cast = ", ".join(
6413 (
6414 self.render_bind_cast(
6415 col.type,
6416 col.type._unwrapped_dialect_impl(self.dialect),
6417 f"p{i}",
6418 )
6419 if col not in add_sentinel_set
6420 else expr
6421 )
6422 for i, (col, _, expr, _) in enumerate(
6423 crud_params_single
6424 )
6425 )
6426 else:
6427 colnames_w_cast = ", ".join(
6428 (f"p{i}" if col not in add_sentinel_set else expr)
6429 for i, (col, _, expr, _) in enumerate(
6430 crud_params_single
6431 )
6432 )
6433
6434 insert_crud_params = [
6435 elem
6436 for elem in crud_params_single
6437 if elem[0] not in add_sentinel_set
6438 ]
6439
6440 text += (
6441 f" SELECT {colnames_w_cast} FROM "
6442 f"(VALUES ({insert_single_values_expr})) "
6443 f"AS imp_sen({colnames}, sen_counter) "
6444 "ORDER BY sen_counter"
6445 )
6446
6447 else:
6448 # otherwise, if no sentinel or backend doesn't support
6449 # orderable subquery form, use a plain VALUES list
6450 embed_sentinel_value = False
6451 insert_crud_params = crud_params_single
6452 insert_single_values_expr = ", ".join(
6453 [value for _, _, value, _ in crud_params_single]
6454 )
6455
6456 text += f" VALUES ({insert_single_values_expr})"
6457
6458 self._insertmanyvalues = _InsertManyValues(
6459 is_default_expr=False,
6460 single_values_expr=insert_single_values_expr,
6461 insert_crud_params=insert_crud_params,
6462 num_positional_params_counted=counted_bindparam,
6463 sort_by_parameter_order=(insert_stmt._sort_by_parameter_order),
6464 includes_upsert_behaviors=(
6465 insert_stmt._post_values_clause is not None
6466 ),
6467 sentinel_columns=add_sentinel_cols,
6468 num_sentinel_columns=(
6469 len(add_sentinel_cols) if add_sentinel_cols else 0
6470 ),
6471 sentinel_param_keys=named_sentinel_params,
6472 implicit_sentinel=implicit_sentinel,
6473 embed_values_counter=embed_sentinel_value,
6474 )
6475
6476 else:
6477 insert_single_values_expr = ", ".join(
6478 [value for _, _, value, _ in crud_params_single]
6479 )
6480
6481 text += f" VALUES ({insert_single_values_expr})"
6482
6483 if insert_stmt._post_values_clause is not None:
6484 post_values_clause = self.process(
6485 insert_stmt._post_values_clause, **kw
6486 )
6487 if post_values_clause:
6488 text += " " + post_values_clause
6489
6490 if returning_clause and not self.returning_precedes_values:
6491 text += " " + returning_clause
6492
6493 if self.ctes and not self.dialect.cte_follows_insert:
6494 nesting_level = len(self.stack) if not toplevel else None
6495 text = (
6496 self._render_cte_clause(
6497 nesting_level=nesting_level,
6498 include_following_stack=True,
6499 )
6500 + text
6501 )
6502
6503 self.stack.pop(-1)
6504
6505 return text
6506
6507 def update_tables_clause(self, update_stmt, from_table, extra_froms, **kw):
6508 """Provide a hook to override the initial table clause
6509 in an UPDATE statement.
6510
6511 MySQL overrides this.
6512
6513 """
6514 kw["asfrom"] = True
6515 return from_table._compiler_dispatch(self, iscrud=True, **kw)
6516
6517 def update_from_clause(
6518 self, update_stmt, from_table, extra_froms, from_hints, **kw
6519 ):
6520 """Provide a hook to override the generation of an
6521 UPDATE..FROM clause.
6522 MySQL and MSSQL override this.
6523 """
6524 raise NotImplementedError(
6525 "This backend does not support multiple-table "
6526 "criteria within UPDATE"
6527 )
6528
6529 def update_post_criteria_clause(
6530 self, update_stmt: Update, **kw: Any
6531 ) -> Optional[str]:
6532 """provide a hook to override generation after the WHERE criteria
6533 in an UPDATE statement
6534
6535 .. versionadded:: 2.1
6536
6537 """
6538 if update_stmt._post_criteria_clause is not None:
6539 return self.process(
6540 update_stmt._post_criteria_clause,
6541 **kw,
6542 )
6543 else:
6544 return None
6545
6546 def delete_post_criteria_clause(
6547 self, delete_stmt: Delete, **kw: Any
6548 ) -> Optional[str]:
6549 """provide a hook to override generation after the WHERE criteria
6550 in a DELETE statement
6551
6552 .. versionadded:: 2.1
6553
6554 """
6555 if delete_stmt._post_criteria_clause is not None:
6556 return self.process(
6557 delete_stmt._post_criteria_clause,
6558 **kw,
6559 )
6560 else:
6561 return None
6562
    def visit_update(
        self,
        update_stmt: Update,
        visiting_cte: Optional[CTE] = None,
        **kw: Any,
    ) -> str:
        """Render an UPDATE statement as a string.

        The text is assembled in this order: ``UPDATE``, any statement
        prefixes, the table clause from :meth:`update_tables_clause`, the
        ``SET`` list built from the CRUD parameters, a RETURNING clause
        (placed before the FROM/WHERE text when
        ``returning_precedes_values`` is set, otherwise at the end), the
        extra-FROM text from :meth:`update_from_clause`, the ``WHERE``
        criteria and the text from :meth:`update_post_criteria_clause`.
        If CTEs were collected, their clause is prepended last.

        :param update_stmt: the UPDATE construct to compile; it is replaced
         by ``compile_state.statement`` before rendering.
        :param visiting_cte: when not ``None`` it is forwarded through
         ``kw`` and the statement is treated as not being at the top level.
        :param kw: additional keyword arguments forwarded to the
         sub-compilation calls.
        :return: the rendered statement text.
        """
        compile_state = update_stmt._compile_state_factory(
            update_stmt, self, **kw
        )
        if TYPE_CHECKING:
            assert isinstance(compile_state, UpdateDMLState)
        update_stmt = compile_state.statement  # type: ignore[assignment]

        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            # top level means nothing else is on the compiler stack yet
            toplevel = not self.stack

        if toplevel:
            # only a top-level statement records its state on the compiler,
            # and only if no state was recorded previously
            self.isupdate = True
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        extra_froms = compile_state._extra_froms
        is_multitable = bool(extra_froms)

        if is_multitable:
            # main table might be a JOIN
            main_froms = set(_from_objects(update_stmt.table))
            # FROM objects already covered by the main table are not
            # rendered a second time as extra FROMs
            render_extra_froms = [
                f for f in extra_froms if f not in main_froms
            ]
            correlate_froms = main_froms.union(extra_froms)
        else:
            render_extra_froms = []
            correlate_froms = {update_stmt.table}

        # this stack entry is popped again right before returning
        self.stack.append(
            {
                "correlate_froms": correlate_froms,
                "asfrom_froms": correlate_froms,
                "selectable": update_stmt,
            }
        )

        text = "UPDATE "

        if update_stmt._prefixes:
            text += self._generate_prefixes(
                update_stmt, update_stmt._prefixes, **kw
            )

        table_text = self.update_tables_clause(
            update_stmt,
            update_stmt.table,
            render_extra_froms,
            from_linter=from_linter,
            **kw,
        )
        crud_params_struct = crud._get_crud_params(
            self, update_stmt, compile_state, toplevel, **kw
        )
        crud_params = crud_params_struct.single_params

        if update_stmt._hints:
            # hints may rewrite the table text and are also handed to
            # update_from_clause() below
            dialect_hints, table_text = self._setup_crud_hints(
                update_stmt, table_text
            )
        else:
            dialect_hints = None

        if update_stmt._independent_ctes:
            self._dispatch_independent_ctes(update_stmt, kw)

        text += table_text

        text += " SET "
        # each CRUD parameter entry supplies the column expression (second
        # element) and the rendered value (third element)
        text += ", ".join(
            expr + "=" + value
            for _, expr, value, _ in cast(
                "List[Tuple[Any, str, str, Any]]", crud_params
            )
        )

        if self.implicit_returning or update_stmt._returning:
            if self.returning_precedes_values:
                text += " " + self.returning_clause(
                    update_stmt,
                    self.implicit_returning or update_stmt._returning,
                    populate_result_map=toplevel,
                )

        if extra_froms:
            extra_from_text = self.update_from_clause(
                update_stmt,
                update_stmt.table,
                render_extra_froms,
                dialect_hints,
                from_linter=from_linter,
                **kw,
            )
            if extra_from_text:
                text += " " + extra_from_text

        if update_stmt._where_criteria:
            t = self._generate_delimited_and_list(
                update_stmt._where_criteria, from_linter=from_linter, **kw
            )
            if t:
                text += " WHERE " + t

        ulc = self.update_post_criteria_clause(
            update_stmt, from_linter=from_linter, **kw
        )
        if ulc:
            text += " " + ulc

        # RETURNING goes at the end unless it was already rendered above
        if (
            self.implicit_returning or update_stmt._returning
        ) and not self.returning_precedes_values:
            text += " " + self.returning_clause(
                update_stmt,
                self.implicit_returning or update_stmt._returning,
                populate_result_map=toplevel,
            )

        if self.ctes:
            # the CTE clause is prepended to the statement text; a nesting
            # level is only passed when this is not the top-level statement
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if warn_linting:
            assert from_linter is not None
            from_linter.warn(stmt_type="UPDATE")

        self.stack.pop(-1)

        return text  # type: ignore[no-any-return]
6712
6713 def delete_extra_from_clause(
6714 self, delete_stmt, from_table, extra_froms, from_hints, **kw
6715 ):
6716 """Provide a hook to override the generation of an
6717 DELETE..FROM clause.
6718
6719 This can be used to implement DELETE..USING for example.
6720
6721 MySQL and MSSQL override this.
6722
6723 """
6724 raise NotImplementedError(
6725 "This backend does not support multiple-table "
6726 "criteria within DELETE"
6727 )
6728
6729 def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw):
6730 return from_table._compiler_dispatch(
6731 self, asfrom=True, iscrud=True, **kw
6732 )
6733
    def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
        """Render a DELETE statement as a string.

        The text is assembled in this order: ``DELETE``, any statement
        prefixes, ``FROM`` plus the table clause from
        :meth:`delete_table_clause`, a RETURNING clause (placed here when
        ``returning_precedes_values`` is set, otherwise at the end), the
        extra-FROM text from :meth:`delete_extra_from_clause`, the
        ``WHERE`` criteria and the text from
        :meth:`delete_post_criteria_clause`.  If CTEs were collected, their
        clause is prepended last.

        :param delete_stmt: the DELETE construct to compile; it is replaced
         by ``compile_state.statement`` before rendering.
        :param visiting_cte: when not ``None`` it is forwarded through
         ``kw`` and the statement is treated as not being at the top level.
        :param kw: additional keyword arguments forwarded to the
         sub-compilation calls.
        :return: the rendered statement text.
        """
        compile_state = delete_stmt._compile_state_factory(
            delete_stmt, self, **kw
        )
        delete_stmt = compile_state.statement

        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            # top level means nothing else is on the compiler stack yet
            toplevel = not self.stack

        if toplevel:
            # only a top-level statement records its state on the compiler,
            # and only if no state was recorded previously
            self.isdelete = True
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        extra_froms = compile_state._extra_froms

        correlate_froms = {delete_stmt.table}.union(extra_froms)
        # this stack entry is popped again right before returning
        self.stack.append(
            {
                "correlate_froms": correlate_froms,
                "asfrom_froms": correlate_froms,
                "selectable": delete_stmt,
            }
        )

        text = "DELETE "

        if delete_stmt._prefixes:
            text += self._generate_prefixes(
                delete_stmt, delete_stmt._prefixes, **kw
            )

        text += "FROM "

        try:
            table_text = self.delete_table_clause(
                delete_stmt,
                delete_stmt.table,
                extra_froms,
                from_linter=from_linter,
            )
        except TypeError:
            # anticipate 3rd party dialects that don't include **kw
            # TODO: remove in 2.1
            table_text = self.delete_table_clause(
                delete_stmt, delete_stmt.table, extra_froms
            )
            # the hook could not receive the linter, so the table is
            # processed once more with it and the rendered text discarded
            if from_linter:
                _ = self.process(delete_stmt.table, from_linter=from_linter)

        # called for its effects on the compiler only; the returned
        # structure is not used for DELETE
        crud._get_crud_params(self, delete_stmt, compile_state, toplevel, **kw)

        if delete_stmt._hints:
            # hints may rewrite the table text and are also handed to
            # delete_extra_from_clause() below
            dialect_hints, table_text = self._setup_crud_hints(
                delete_stmt, table_text
            )
        else:
            dialect_hints = None

        if delete_stmt._independent_ctes:
            self._dispatch_independent_ctes(delete_stmt, kw)

        text += table_text

        if (
            self.implicit_returning or delete_stmt._returning
        ) and self.returning_precedes_values:
            text += " " + self.returning_clause(
                delete_stmt,
                self.implicit_returning or delete_stmt._returning,
                populate_result_map=toplevel,
            )

        if extra_froms:
            extra_from_text = self.delete_extra_from_clause(
                delete_stmt,
                delete_stmt.table,
                extra_froms,
                dialect_hints,
                from_linter=from_linter,
                **kw,
            )
            if extra_from_text:
                text += " " + extra_from_text

        if delete_stmt._where_criteria:
            t = self._generate_delimited_and_list(
                delete_stmt._where_criteria, from_linter=from_linter, **kw
            )
            if t:
                text += " WHERE " + t

        dlc = self.delete_post_criteria_clause(
            delete_stmt, from_linter=from_linter, **kw
        )
        if dlc:
            text += " " + dlc

        # RETURNING goes at the end unless it was already rendered above
        if (
            self.implicit_returning or delete_stmt._returning
        ) and not self.returning_precedes_values:
            text += " " + self.returning_clause(
                delete_stmt,
                self.implicit_returning or delete_stmt._returning,
                populate_result_map=toplevel,
            )

        if self.ctes:
            # the CTE clause is prepended to the statement text; a nesting
            # level is only passed when this is not the top-level statement
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if warn_linting:
            assert from_linter is not None
            from_linter.warn(stmt_type="DELETE")

        self.stack.pop(-1)

        return text
6866
6867 def visit_savepoint(self, savepoint_stmt, **kw):
6868 return "SAVEPOINT %s" % self.preparer.format_savepoint(savepoint_stmt)
6869
6870 def visit_rollback_to_savepoint(self, savepoint_stmt, **kw):
6871 return "ROLLBACK TO SAVEPOINT %s" % self.preparer.format_savepoint(
6872 savepoint_stmt
6873 )
6874
6875 def visit_release_savepoint(self, savepoint_stmt, **kw):
6876 return "RELEASE SAVEPOINT %s" % self.preparer.format_savepoint(
6877 savepoint_stmt
6878 )
6879
6880
class StrSQLCompiler(SQLCompiler):
    """A :class:`.SQLCompiler` subclass that can render a small selection
    of non-standard SQL features into a string value.

    The :class:`.StrSQLCompiler` is what gets invoked when a Core
    expression element is stringified directly, i.e. without going through
    the :meth:`_expression.ClauseElement.compile` method.
    It knows how to render a limited set of non-standard SQL constructs so
    that basic stringification works; for more substantial custom or
    dialect-specific SQL constructs,
    :meth:`_expression.ClauseElement.compile` needs to be used directly.

    .. seealso::

        :ref:`faq_sql_expression_string`

    """

    def get_select_precolumns(self, select: Select[Any], **kw: Any) -> str:
        """Return ``"DISTINCT "`` for a distinct SELECT, else ``""``."""
        if select._distinct:
            return "DISTINCT "
        return ""

    def _fallback_column_name(self, column):
        """Return a placeholder used when a column has no usable name."""
        return "<name unknown>"

    @util.preload_module("sqlalchemy.engine.url")
    def visit_unsupported_compilation(self, element, err, **kw):
        """Try the element's own ``stringify_dialect`` before giving up.

        When the element names a dialect other than ``"default"``, a
        compiler for that dialect is created and used to render the
        element, unless that compiler is itself a
        :class:`.StrSQLCompiler`.  Otherwise the superclass handling is
        invoked.
        """
        dialect_name = element.stringify_dialect
        if dialect_name != "default":
            engine_url = util.preloaded.engine_url
            dialect_cls = engine_url.URL.create(dialect_name).get_dialect()
            dialect = dialect_cls()

            compiler = dialect.statement_compiler(
                dialect, None, _supporting_against=self
            )
            if not isinstance(compiler, StrSQLCompiler):
                return compiler.process(element, **kw)

        return super().visit_unsupported_compilation(element, err)

    def visit_getitem_binary(self, binary, operator, **kw):
        """Render an index operation as ``left[right]``."""
        target = self.process(binary.left, **kw)
        index = self.process(binary.right, **kw)
        return f"{target}[{index}]"

    def visit_json_getitem_op_binary(self, binary, operator, **kw):
        """Render JSON index access the same way as a plain getitem."""
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
        """Render JSON path access the same way as a plain getitem."""
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_sequence(self, sequence, **kw):
        """Render a readable placeholder for a sequence's next value."""
        sequence_name = self.preparer.format_sequence(sequence)
        return f"<next sequence value: {sequence_name}>"

    def returning_clause(
        self,
        stmt: UpdateBase,
        returning_cols: Sequence[_ColumnsClauseElement],
        *,
        populate_result_map: bool,
        **kw: Any,
    ) -> str:
        """Render a generic ``RETURNING col, col, ...`` clause."""
        rendered = []
        for col in base._select_iterables(returning_cols):
            rendered.append(
                self._label_select_column(None, col, True, False, {})
            )
        return "RETURNING " + ", ".join(rendered)

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the extra FROM objects of an UPDATE as ``FROM a, b``."""
        kw["asfrom"] = True
        froms = [
            from_obj._compiler_dispatch(self, fromhints=from_hints, **kw)
            for from_obj in extra_froms
        ]
        return "FROM " + ", ".join(froms)

    def delete_extra_from_clause(
        self, delete_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the extra FROM objects of a DELETE as ``, a, b``."""
        kw["asfrom"] = True
        froms = [
            from_obj._compiler_dispatch(self, fromhints=from_hints, **kw)
            for from_obj in extra_froms
        ]
        return ", " + ", ".join(froms)

    def visit_empty_set_expr(self, element_types, **kw):
        """Return a SELECT that yields no rows."""
        return "SELECT 1 WHERE 1!=1"

    def get_from_hint_text(self, table, text):
        """Wrap the hint text in square brackets."""
        return "[%s]" % text

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        """Render a regexp match using the ``<regexp>`` placeholder."""
        placeholder = " <regexp> "
        return self._generate_generic_binary(binary, placeholder, **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        """Render a negated regexp match using ``<not regexp>``."""
        placeholder = " <not regexp> "
        return self._generate_generic_binary(binary, placeholder, **kw)

    def visit_regexp_replace_op_binary(self, binary, operator, **kw):
        """Render a regexp replace using a function-like placeholder."""
        left = binary.left._compiler_dispatch(self, **kw)
        right = binary.right._compiler_dispatch(self, **kw)
        return f"<regexp replace>({left}, {right})"

    def visit_try_cast(self, cast, **kwargs):
        """Render ``TRY_CAST(<clause> AS <type>)``."""
        clause = cast.clause._compiler_dispatch(self, **kwargs)
        type_clause = cast.typeclause._compiler_dispatch(self, **kwargs)
        return f"TRY_CAST({clause} AS {type_clause})"
6993
6994
6995class DDLCompiler(Compiled):
6996 is_ddl = True
6997
    if TYPE_CHECKING:

        # Typing-only declaration of the constructor signature: it sits
        # under the TYPE_CHECKING guard and its body is just ``...``.
        def __init__(
            self,
            dialect: Dialect,
            statement: ExecutableDDLElement,
            schema_translate_map: Optional[SchemaTranslateMapType] = ...,
            render_schema_translate: bool = ...,
            compile_kwargs: Mapping[str, Any] = ...,
        ): ...
7008
7009 @util.ro_memoized_property
7010 def sql_compiler(self) -> SQLCompiler:
7011 return self.dialect.statement_compiler(
7012 self.dialect, None, schema_translate_map=self.schema_translate_map
7013 )
7014
7015 @util.memoized_property
7016 def type_compiler(self):
7017 return self.dialect.type_compiler_instance
7018
    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        """Return ``None``; no parameter dictionary is built here.

        All arguments are accepted and ignored by this implementation.
        """
        return None
7026
7027 def visit_ddl(self, ddl, **kwargs):
7028 # table events can substitute table and schema name
7029 context = ddl.context
7030 if isinstance(ddl.target, schema.Table):
7031 context = context.copy()
7032
7033 preparer = self.preparer
7034 path = preparer.format_table_seq(ddl.target)
7035 if len(path) == 1:
7036 table, sch = path[0], ""
7037 else:
7038 table, sch = path[-1], path[0]
7039
7040 context.setdefault("table", table)
7041 context.setdefault("schema", sch)
7042 context.setdefault("fullname", preparer.format_table(ddl.target))
7043
7044 return self.sql_compiler.post_process_text(ddl.statement % context)
7045
7046 def visit_create_schema(self, create, **kw):
7047 text = "CREATE SCHEMA "
7048 if create.if_not_exists:
7049 text += "IF NOT EXISTS "
7050 return text + self.preparer.format_schema(create.element)
7051
7052 def visit_drop_schema(self, drop, **kw):
7053 text = "DROP SCHEMA "
7054 if drop.if_exists:
7055 text += "IF EXISTS "
7056 text += self.preparer.format_schema(drop.element)
7057 if drop.cascade:
7058 text += " CASCADE"
7059 return text
7060
7061 def visit_create_table(self, create, **kw):
7062 table = create.element
7063 preparer = self.preparer
7064
7065 text = "\nCREATE "
7066 if table._prefixes:
7067 text += " ".join(table._prefixes) + " "
7068
7069 text += "TABLE "
7070 if create.if_not_exists:
7071 text += "IF NOT EXISTS "
7072
7073 text += preparer.format_table(table) + " "
7074
7075 create_table_suffix = self.create_table_suffix(table)
7076 if create_table_suffix:
7077 text += create_table_suffix + " "
7078
7079 text += "("
7080
7081 separator = "\n"
7082
7083 # if only one primary key, specify it along with the column
7084 first_pk = False
7085 for create_column in create.columns:
7086 column = create_column.element
7087 try:
7088 processed = self.process(
7089 create_column, first_pk=column.primary_key and not first_pk
7090 )
7091 if processed is not None:
7092 text += separator
7093 separator = ", \n"
7094 text += "\t" + processed
7095 if column.primary_key:
7096 first_pk = True
7097 except exc.CompileError as ce:
7098 raise exc.CompileError(
7099 "(in table '%s', column '%s'): %s"
7100 % (table.description, column.name, ce.args[0])
7101 ) from ce
7102
7103 const = self.create_table_constraints(
7104 table,
7105 _include_foreign_key_constraints=create.include_foreign_key_constraints, # noqa
7106 )
7107 if const:
7108 text += separator + "\t" + const
7109
7110 text += "\n)%s\n\n" % self.post_create_table(table)
7111 return text
7112
7113 def visit_create_view(self, element: CreateView, **kw: Any) -> str:
7114 return self._generate_table_select(element, "view", **kw)
7115
7116 def visit_create_table_as(self, element: CreateTableAs, **kw: Any) -> str:
7117 return self._generate_table_select(element, "create_table_as", **kw)
7118
7119 def _generate_table_select(
7120 self,
7121 element: _TableViaSelect,
7122 type_: str,
7123 if_not_exists: Optional[bool] = None,
7124 **kw: Any,
7125 ) -> str:
7126 prep = self.preparer
7127
7128 inner_kw = dict(kw)
7129 inner_kw["literal_binds"] = True
7130 select_sql = self.sql_compiler.process(element.selectable, **inner_kw)
7131
7132 # Use if_not_exists parameter if provided, otherwise use element's
7133 use_if_not_exists = (
7134 if_not_exists
7135 if if_not_exists is not None
7136 else element.if_not_exists
7137 )
7138
7139 parts = [
7140 "CREATE",
7141 "OR REPLACE" if getattr(element, "or_replace", False) else None,
7142 "TEMPORARY" if element.temporary else None,
7143 (
7144 "MATERIALIZED VIEW"
7145 if type_ == "view" and getattr(element, "materialized", False)
7146 else "TABLE" if type_ == "create_table_as" else "VIEW"
7147 ),
7148 "IF NOT EXISTS" if use_if_not_exists else None,
7149 prep.format_table(element.table),
7150 "AS",
7151 select_sql,
7152 ]
7153 return " ".join(p for p in parts if p)
7154
7155 def visit_create_column(self, create, first_pk=False, **kw):
7156 column = create.element
7157
7158 if column.system:
7159 return None
7160
7161 text = self.get_column_specification(column, first_pk=first_pk)
7162 const = " ".join(
7163 self.process(constraint) for constraint in column.constraints
7164 )
7165 if const:
7166 text += " " + const
7167
7168 return text
7169
7170 def create_table_constraints(
7171 self, table, _include_foreign_key_constraints=None, **kw
7172 ):
7173 # On some DB order is significant: visit PK first, then the
7174 # other constraints (engine.ReflectionTest.testbasic failed on FB2)
7175 constraints = []
7176 if table.primary_key:
7177 constraints.append(table.primary_key)
7178
7179 all_fkcs = table.foreign_key_constraints
7180 if _include_foreign_key_constraints is not None:
7181 omit_fkcs = all_fkcs.difference(_include_foreign_key_constraints)
7182 else:
7183 omit_fkcs = set()
7184
7185 constraints.extend(
7186 [
7187 c
7188 for c in table._sorted_constraints
7189 if c is not table.primary_key and c not in omit_fkcs
7190 ]
7191 )
7192
7193 return ", \n\t".join(
7194 p
7195 for p in (
7196 self.process(constraint)
7197 for constraint in constraints
7198 if (constraint._should_create_for_compiler(self))
7199 and (
7200 not self.dialect.supports_alter
7201 or not getattr(constraint, "use_alter", False)
7202 )
7203 )
7204 if p is not None
7205 )
7206
7207 def visit_drop_table(self, drop, **kw):
7208 text = "\nDROP TABLE "
7209 if drop.if_exists:
7210 text += "IF EXISTS "
7211 return text + self.preparer.format_table(drop.element)
7212
7213 def visit_drop_view(self, drop, **kw):
7214 text = "\nDROP "
7215 if drop.materialized:
7216 text += "MATERIALIZED VIEW "
7217 else:
7218 text += "VIEW "
7219 if drop.if_exists:
7220 text += "IF EXISTS "
7221 return text + self.preparer.format_table(drop.element)
7222
7223 def _verify_index_table(self, index: Index) -> None:
7224 if index.table is None:
7225 raise exc.CompileError(
7226 "Index '%s' is not associated with any table." % index.name
7227 )
7228
7229 def visit_create_index(
7230 self, create, include_schema=False, include_table_schema=True, **kw
7231 ):
7232 index = create.element
7233 self._verify_index_table(index)
7234 preparer = self.preparer
7235 text = "CREATE "
7236 if index.unique:
7237 text += "UNIQUE "
7238 if index.name is None:
7239 raise exc.CompileError(
7240 "CREATE INDEX requires that the index have a name"
7241 )
7242
7243 text += "INDEX "
7244 if create.if_not_exists:
7245 text += "IF NOT EXISTS "
7246
7247 text += "%s ON %s (%s)" % (
7248 self._prepared_index_name(index, include_schema=include_schema),
7249 preparer.format_table(
7250 index.table, use_schema=include_table_schema
7251 ),
7252 ", ".join(
7253 self.sql_compiler.process(
7254 expr, include_table=False, literal_binds=True
7255 )
7256 for expr in index.expressions
7257 ),
7258 )
7259 return text
7260
7261 def visit_drop_index(self, drop, **kw):
7262 index = drop.element
7263
7264 if index.name is None:
7265 raise exc.CompileError(
7266 "DROP INDEX requires that the index have a name"
7267 )
7268 text = "\nDROP INDEX "
7269 if drop.if_exists:
7270 text += "IF EXISTS "
7271
7272 return text + self._prepared_index_name(index, include_schema=True)
7273
7274 def _prepared_index_name(
7275 self, index: Index, include_schema: bool = False
7276 ) -> str:
7277 if index.table is not None:
7278 effective_schema = self.preparer.schema_for_object(index.table)
7279 else:
7280 effective_schema = None
7281 if include_schema and effective_schema:
7282 schema_name = self.preparer.quote_schema(effective_schema)
7283 else:
7284 schema_name = None
7285
7286 index_name: str = self.preparer.format_index(index)
7287
7288 if schema_name:
7289 index_name = schema_name + "." + index_name
7290 return index_name
7291
7292 def visit_add_constraint(self, create, **kw):
7293 return "ALTER TABLE %s ADD %s" % (
7294 self.preparer.format_table(create.element.table),
7295 self.process(create.element),
7296 )
7297
7298 def visit_set_table_comment(self, create, **kw):
7299 return "COMMENT ON TABLE %s IS %s" % (
7300 self.preparer.format_table(create.element),
7301 self.sql_compiler.render_literal_value(
7302 create.element.comment, sqltypes.String()
7303 ),
7304 )
7305
7306 def visit_drop_table_comment(self, drop, **kw):
7307 return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table(
7308 drop.element
7309 )
7310
7311 def visit_set_column_comment(self, create, **kw):
7312 return "COMMENT ON COLUMN %s IS %s" % (
7313 self.preparer.format_column(
7314 create.element, use_table=True, use_schema=True
7315 ),
7316 self.sql_compiler.render_literal_value(
7317 create.element.comment, sqltypes.String()
7318 ),
7319 )
7320
7321 def visit_drop_column_comment(self, drop, **kw):
7322 return "COMMENT ON COLUMN %s IS NULL" % self.preparer.format_column(
7323 drop.element, use_table=True
7324 )
7325
7326 def visit_set_constraint_comment(self, create, **kw):
7327 raise exc.UnsupportedCompilationError(self, type(create))
7328
7329 def visit_drop_constraint_comment(self, drop, **kw):
7330 raise exc.UnsupportedCompilationError(self, type(drop))
7331
7332 def get_identity_options(self, identity_options: IdentityOptions) -> str:
7333 text = []
7334 if identity_options.increment is not None:
7335 text.append("INCREMENT BY %d" % identity_options.increment)
7336 if identity_options.start is not None:
7337 text.append("START WITH %d" % identity_options.start)
7338 if identity_options.minvalue is not None:
7339 text.append("MINVALUE %d" % identity_options.minvalue)
7340 if identity_options.maxvalue is not None:
7341 text.append("MAXVALUE %d" % identity_options.maxvalue)
7342 if identity_options.nominvalue is not None:
7343 text.append("NO MINVALUE")
7344 if identity_options.nomaxvalue is not None:
7345 text.append("NO MAXVALUE")
7346 if identity_options.cache is not None:
7347 text.append("CACHE %d" % identity_options.cache)
7348 if identity_options.cycle is not None:
7349 text.append("CYCLE" if identity_options.cycle else "NO CYCLE")
7350 return " ".join(text)
7351
7352 def visit_create_sequence(self, create, prefix=None, **kw):
7353 text = "CREATE SEQUENCE "
7354 if create.if_not_exists:
7355 text += "IF NOT EXISTS "
7356 text += self.preparer.format_sequence(create.element)
7357
7358 if prefix:
7359 text += prefix
7360 options = self.get_identity_options(create.element)
7361 if options:
7362 text += " " + options
7363 return text
7364
7365 def visit_drop_sequence(self, drop, **kw):
7366 text = "DROP SEQUENCE "
7367 if drop.if_exists:
7368 text += "IF EXISTS "
7369 return text + self.preparer.format_sequence(drop.element)
7370
7371 def visit_drop_constraint(self, drop, **kw):
7372 constraint = drop.element
7373 if constraint.name is not None:
7374 formatted_name = self.preparer.format_constraint(constraint)
7375 else:
7376 formatted_name = None
7377
7378 if formatted_name is None:
7379 raise exc.CompileError(
7380 "Can't emit DROP CONSTRAINT for constraint %r; "
7381 "it has no name" % drop.element
7382 )
7383 return "ALTER TABLE %s DROP CONSTRAINT %s%s%s" % (
7384 self.preparer.format_table(drop.element.table),
7385 "IF EXISTS " if drop.if_exists else "",
7386 formatted_name,
7387 " CASCADE" if drop.cascade else "",
7388 )
7389
7390 def get_column_specification(
7391 self, column: Column[Any], **kwargs: Any
7392 ) -> str:
7393 colspec = (
7394 self.preparer.format_column(column)
7395 + " "
7396 + self.dialect.type_compiler_instance.process(
7397 column.type, type_expression=column
7398 )
7399 )
7400 default = self.get_column_default_string(column)
7401 if default is not None:
7402 colspec += " DEFAULT " + default
7403
7404 if column.computed is not None:
7405 colspec += " " + self.process(column.computed)
7406
7407 if (
7408 column.identity is not None
7409 and self.dialect.supports_identity_columns
7410 ):
7411 colspec += " " + self.process(column.identity)
7412
7413 if not column.nullable and (
7414 not column.identity or not self.dialect.supports_identity_columns
7415 ):
7416 colspec += " NOT NULL"
7417 return colspec
7418
7419 def create_table_suffix(self, table: Table) -> str:
7420 return ""
7421
7422 def post_create_table(self, table: Table) -> str:
7423 return ""
7424
7425 def get_column_default_string(self, column: Column[Any]) -> Optional[str]:
7426 if isinstance(column.server_default, schema.DefaultClause):
7427 return self.render_default_string(column.server_default.arg)
7428 else:
7429 return None
7430
7431 def render_default_string(self, default: Union[Visitable, str]) -> str:
7432 if isinstance(default, str):
7433 return self.sql_compiler.render_literal_value(
7434 default, sqltypes.STRINGTYPE
7435 )
7436 else:
7437 return self.sql_compiler.process(default, literal_binds=True)
7438
7439 def visit_table_or_column_check_constraint(self, constraint, **kw):
7440 if constraint.is_column_level:
7441 return self.visit_column_check_constraint(constraint)
7442 else:
7443 return self.visit_check_constraint(constraint)
7444
7445 def visit_check_constraint(self, constraint, **kw):
7446 text = self.define_constraint_preamble(constraint, **kw)
7447 text += self.define_check_body(constraint, **kw)
7448 text += self.define_constraint_deferrability(constraint)
7449 return text
7450
7451 def visit_column_check_constraint(self, constraint, **kw):
7452 text = self.define_constraint_preamble(constraint, **kw)
7453 text += self.define_check_body(constraint, **kw)
7454 text += self.define_constraint_deferrability(constraint)
7455 return text
7456
7457 def visit_primary_key_constraint(
7458 self, constraint: PrimaryKeyConstraint, **kw: Any
7459 ) -> str:
7460 if len(constraint) == 0:
7461 return ""
7462 text = self.define_constraint_preamble(constraint, **kw)
7463 text += self.define_primary_key_body(constraint, **kw)
7464 text += self.define_constraint_deferrability(constraint)
7465 return text
7466
7467 def visit_foreign_key_constraint(
7468 self, constraint: ForeignKeyConstraint, **kw: Any
7469 ) -> str:
7470 text = self.define_constraint_preamble(constraint, **kw)
7471 text += self.define_foreign_key_body(constraint, **kw)
7472 text += self.define_constraint_match(constraint)
7473 text += self.define_constraint_cascades(constraint)
7474 text += self.define_constraint_deferrability(constraint)
7475 return text
7476
7477 def define_constraint_remote_table(self, constraint, table, preparer):
7478 """Format the remote table clause of a CREATE CONSTRAINT clause."""
7479
7480 return preparer.format_table(table)
7481
7482 def visit_unique_constraint(
7483 self, constraint: UniqueConstraint, **kw: Any
7484 ) -> str:
7485 if len(constraint) == 0:
7486 return ""
7487 text = self.define_constraint_preamble(constraint, **kw)
7488 text += self.define_unique_body(constraint, **kw)
7489 text += self.define_constraint_deferrability(constraint)
7490 return text
7491
7492 def define_constraint_preamble(
7493 self, constraint: Constraint, **kw: Any
7494 ) -> str:
7495 text = ""
7496 if constraint.name is not None:
7497 formatted_name = self.preparer.format_constraint(constraint)
7498 if formatted_name is not None:
7499 text += "CONSTRAINT %s " % formatted_name
7500 return text
7501
7502 def define_primary_key_body(
7503 self, constraint: PrimaryKeyConstraint, **kw: Any
7504 ) -> str:
7505 text = ""
7506 text += "PRIMARY KEY "
7507 text += "(%s)" % ", ".join(
7508 self.preparer.quote(c.name)
7509 for c in (
7510 constraint.columns_autoinc_first
7511 if constraint._implicit_generated
7512 else constraint.columns
7513 )
7514 )
7515 return text
7516
7517 def define_foreign_key_body(
7518 self, constraint: ForeignKeyConstraint, **kw: Any
7519 ) -> str:
7520 preparer = self.preparer
7521 remote_table = list(constraint.elements)[0].column.table
7522 text = "FOREIGN KEY(%s) REFERENCES %s (%s)" % (
7523 ", ".join(
7524 preparer.quote(f.parent.name) for f in constraint.elements
7525 ),
7526 self.define_constraint_remote_table(
7527 constraint, remote_table, preparer
7528 ),
7529 ", ".join(
7530 preparer.quote(f.column.name) for f in constraint.elements
7531 ),
7532 )
7533 return text
7534
7535 def define_unique_body(
7536 self, constraint: UniqueConstraint, **kw: Any
7537 ) -> str:
7538 text = "UNIQUE %s(%s)" % (
7539 self.define_unique_constraint_distinct(constraint, **kw),
7540 ", ".join(self.preparer.quote(c.name) for c in constraint),
7541 )
7542 return text
7543
7544 def define_check_body(self, constraint: CheckConstraint, **kw: Any) -> str:
7545 text = "CHECK (%s)" % self.sql_compiler.process(
7546 constraint.sqltext, include_table=False, literal_binds=True
7547 )
7548 return text
7549
7550 def define_unique_constraint_distinct(
7551 self, constraint: UniqueConstraint, **kw: Any
7552 ) -> str:
7553 return ""
7554
7555 def define_constraint_cascades(
7556 self, constraint: ForeignKeyConstraint
7557 ) -> str:
7558 text = ""
7559 if constraint.ondelete is not None:
7560 text += self.define_constraint_ondelete_cascade(constraint)
7561
7562 if constraint.onupdate is not None:
7563 text += self.define_constraint_onupdate_cascade(constraint)
7564 return text
7565
7566 def define_constraint_ondelete_cascade(
7567 self, constraint: ForeignKeyConstraint
7568 ) -> str:
7569 return " ON DELETE %s" % self.preparer.validate_sql_phrase(
7570 constraint.ondelete, FK_ON_DELETE
7571 )
7572
7573 def define_constraint_onupdate_cascade(
7574 self, constraint: ForeignKeyConstraint
7575 ) -> str:
7576 return " ON UPDATE %s" % self.preparer.validate_sql_phrase(
7577 constraint.onupdate, FK_ON_UPDATE
7578 )
7579
7580 def define_constraint_deferrability(self, constraint: Constraint) -> str:
7581 text = ""
7582 if constraint.deferrable is not None:
7583 if constraint.deferrable:
7584 text += " DEFERRABLE"
7585 else:
7586 text += " NOT DEFERRABLE"
7587 if constraint.initially is not None:
7588 text += " INITIALLY %s" % self.preparer.validate_sql_phrase(
7589 constraint.initially, FK_INITIALLY
7590 )
7591 return text
7592
7593 def define_constraint_match(self, constraint: ForeignKeyConstraint) -> str:
7594 text = ""
7595 if constraint.match is not None:
7596 text += " MATCH %s" % constraint.match
7597 return text
7598
7599 def visit_computed_column(self, generated, **kw):
7600 text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
7601 generated.sqltext, include_table=False, literal_binds=True
7602 )
7603 if generated.persisted is True:
7604 text += " STORED"
7605 elif generated.persisted is False:
7606 text += " VIRTUAL"
7607 return text
7608
7609 def visit_identity_column(self, identity, **kw):
7610 text = "GENERATED %s AS IDENTITY" % (
7611 "ALWAYS" if identity.always else "BY DEFAULT",
7612 )
7613 options = self.get_identity_options(identity)
7614 if options:
7615 text += " (%s)" % options
7616 return text
7617
7618
class GenericTypeCompiler(TypeCompiler):
    """Render type objects as generic SQL type specification strings.

    The ``visit_<UPPERCASE>`` methods emit the SQL type keyword itself,
    with length / precision / collation arguments where the type carries
    them.  The ``visit_<lowercase>`` methods route the generic types to one
    of the uppercase renderers.
    """

    # -- SQL type keywords: numeric ---------------------------------------

    def visit_FLOAT(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return "FLOAT"

    def visit_DOUBLE(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return "DOUBLE"

    def visit_DOUBLE_PRECISION(
        self, type_: sqltypes.DOUBLE_PRECISION[Any], **kw: Any
    ) -> str:
        return "DOUBLE PRECISION"

    def visit_REAL(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return "REAL"

    def visit_NUMERIC(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        """Render ``NUMERIC``, ``NUMERIC(p)`` or ``NUMERIC(p, s)``."""
        precision = type_.precision
        if precision is None:
            return "NUMERIC"

        scale = type_.scale
        if scale is None:
            return f"NUMERIC({precision!s})"
        return f"NUMERIC({precision!s}, {scale!s})"

    def visit_DECIMAL(self, type_: sqltypes.DECIMAL[Any], **kw: Any) -> str:
        """Render ``DECIMAL``, ``DECIMAL(p)`` or ``DECIMAL(p, s)``."""
        precision = type_.precision
        if precision is None:
            return "DECIMAL"

        scale = type_.scale
        if scale is None:
            return f"DECIMAL({precision!s})"
        return f"DECIMAL({precision!s}, {scale!s})"

    def visit_INTEGER(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return "INTEGER"

    def visit_SMALLINT(self, type_: sqltypes.SmallInteger, **kw: Any) -> str:
        return "SMALLINT"

    def visit_BIGINT(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return "BIGINT"

    # -- SQL type keywords: date and time ---------------------------------

    def visit_TIMESTAMP(self, type_: sqltypes.TIMESTAMP, **kw: Any) -> str:
        return "TIMESTAMP"

    def visit_DATETIME(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return "DATETIME"

    def visit_DATE(self, type_: sqltypes.Date, **kw: Any) -> str:
        return "DATE"

    def visit_TIME(self, type_: sqltypes.Time, **kw: Any) -> str:
        return "TIME"

    # -- SQL type keywords: character data --------------------------------

    def visit_CLOB(self, type_: sqltypes.CLOB, **kw: Any) -> str:
        return "CLOB"

    def visit_NCLOB(self, type_: sqltypes.Text, **kw: Any) -> str:
        return "NCLOB"

    def _render_string_type(
        self, name: str, length: Optional[int], collation: Optional[str]
    ) -> str:
        """Render ``name`` with optional ``(length)`` and ``COLLATE``.

        A falsy ``length`` or ``collation`` is left out.  The collation is
        placed between double quotes as given.
        """
        pieces = [name]
        if length:
            pieces.append(f"({length})")
        if collation:
            pieces.append(f' COLLATE "{collation}"')
        return "".join(pieces)

    def visit_CHAR(self, type_: sqltypes.CHAR, **kw: Any) -> str:
        return self._render_string_type("CHAR", type_.length, type_.collation)

    def visit_NCHAR(self, type_: sqltypes.NCHAR, **kw: Any) -> str:
        return self._render_string_type("NCHAR", type_.length, type_.collation)

    def visit_VARCHAR(self, type_: sqltypes.String, **kw: Any) -> str:
        return self._render_string_type(
            "VARCHAR", type_.length, type_.collation
        )

    def visit_NVARCHAR(self, type_: sqltypes.NVARCHAR, **kw: Any) -> str:
        return self._render_string_type(
            "NVARCHAR", type_.length, type_.collation
        )

    def visit_TEXT(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self._render_string_type("TEXT", type_.length, type_.collation)

    # -- SQL type keywords: uuid, binary, boolean -------------------------

    def visit_UUID(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        return "UUID"

    def visit_BLOB(self, type_: sqltypes.LargeBinary, **kw: Any) -> str:
        return "BLOB"

    def visit_BINARY(self, type_: sqltypes.BINARY, **kw: Any) -> str:
        """Render ``BINARY``, with ``(length)`` when the length is truthy."""
        if type_.length:
            return "BINARY" + "(%d)" % type_.length
        return "BINARY"

    def visit_VARBINARY(self, type_: sqltypes.VARBINARY, **kw: Any) -> str:
        """Render ``VARBINARY``, with ``(length)`` when the length is truthy."""
        if type_.length:
            return "VARBINARY" + "(%d)" % type_.length
        return "VARBINARY"

    def visit_BOOLEAN(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return "BOOLEAN"

    # -- generic types, routed to the keyword renderers above -------------

    def visit_uuid(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        """Render ``UUID`` when native UUIDs apply, else ``CHAR(32)``.

        The native form needs both ``type_.native_uuid`` and the dialect's
        ``supports_native_uuid`` to be true.
        """
        if type_.native_uuid and self.dialect.supports_native_uuid:
            return self.visit_UUID(type_, **kw)
        return self._render_string_type("CHAR", length=32, collation=None)

    def visit_large_binary(
        self, type_: sqltypes.LargeBinary, **kw: Any
    ) -> str:
        return self.visit_BLOB(type_, **kw)

    def visit_boolean(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return self.visit_BOOLEAN(type_, **kw)

    def visit_time(self, type_: sqltypes.Time, **kw: Any) -> str:
        return self.visit_TIME(type_, **kw)

    def visit_datetime(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return self.visit_DATETIME(type_, **kw)

    def visit_date(self, type_: sqltypes.Date, **kw: Any) -> str:
        return self.visit_DATE(type_, **kw)

    def visit_big_integer(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return self.visit_BIGINT(type_, **kw)

    def visit_small_integer(
        self, type_: sqltypes.SmallInteger, **kw: Any
    ) -> str:
        return self.visit_SMALLINT(type_, **kw)

    def visit_integer(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return self.visit_INTEGER(type_, **kw)

    def visit_real(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return self.visit_REAL(type_, **kw)

    def visit_float(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return self.visit_FLOAT(type_, **kw)

    def visit_double(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return self.visit_DOUBLE(type_, **kw)

    def visit_numeric(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        return self.visit_NUMERIC(type_, **kw)

    def visit_string(self, type_: sqltypes.String, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_unicode(self, type_: sqltypes.Unicode, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_text(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_unicode_text(
        self, type_: sqltypes.UnicodeText, **kw: Any
    ) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_enum(self, type_: sqltypes.Enum, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    # -- special cases ------------------------------------------------------

    def visit_null(self, type_, **kw):
        """Refuse to render DDL for a column whose type was never given."""
        message = (
            "Can't generate DDL for %r; "
            "did you forget to specify a "
            "type on this Column?" % type_
        )
        raise exc.CompileError(message)

    def visit_type_decorator(
        self, type_: TypeDecorator[Any], **kw: Any
    ) -> str:
        """Render the type the decorator resolves to for this dialect."""
        resolved = type_.type_engine(self.dialect)
        return self.process(resolved, **kw)

    def visit_user_defined(
        self, type_: UserDefinedType[Any], **kw: Any
    ) -> str:
        """Render whatever the type's own ``get_col_spec()`` returns."""
        col_spec = type_.get_col_spec(**kw)
        return col_spec
7806
7807
class StrSQLTypeCompiler(GenericTypeCompiler):
    """Type compiler that always produces some string for a type.

    Objects without a ``_compiler_dispatch`` attribute, and any
    ``visit_*`` method that does not exist, fall back to
    ``_visit_unknown()`` instead of failing.
    """

    def process(self, type_, **kw):
        """Dispatch ``type_`` to its visitor, or to ``_visit_unknown()``."""
        try:
            dispatch = type_._compiler_dispatch
        except AttributeError:
            return self._visit_unknown(type_, **kw)
        return dispatch(self, **kw)

    def __getattr__(self, key):
        """Resolve any missing ``visit_*`` attribute to ``_visit_unknown``."""
        if not key.startswith("visit_"):
            raise AttributeError(key)
        return self._visit_unknown

    def _visit_unknown(self, type_, **kw):
        """Return the class name if it is all upper case, else the repr."""
        class_name = type_.__class__.__name__
        if class_name == class_name.upper():
            return type_.__class__.__name__
        return repr(type_)

    def visit_null(self, type_, **kw):
        """Render the null type as ``NULL`` rather than raising."""
        return "NULL"

    def visit_user_defined(self, type_, **kw):
        """Use the type's ``get_col_spec()`` if it has one, else its repr."""
        try:
            col_spec_fn = type_.get_col_spec
        except AttributeError:
            return repr(type_)
        return col_spec_fn(**kw)
7839
7840
class _SchemaForObjectCallable(Protocol):
    """Callable taking one positional-only object and returning a ``str``."""

    def __call__(self, obj: Any, /) -> str:
        ...
7843
7844
class _BindNameForColProtocol(Protocol):
    """Callable taking a ``col`` column clause and returning a ``str``."""

    def __call__(self, col: ColumnClause[Any]) -> str:
        ...
7847
7848
class IdentifierPreparer:
    """Handle quoting and case-folding of identifiers based on options."""

    # words that force quoting; ``_requires_quotes()`` tests the
    # lower-cased identifier for membership
    reserved_words = RESERVED_WORDS

    # pattern an identifier has to match to be rendered without quotes
    legal_characters = LEGAL_CHARACTERS

    # an identifier whose first character is in here is always quoted
    illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS

    # opening delimiter of a quoted identifier; assigned in ``__init__``
    initial_quote: str

    # closing delimiter of a quoted identifier; assigned in ``__init__``
    final_quote: str

    # cache mapping an identifier to its rendered form; filled by
    # ``quote()``
    _strings: MutableMapping[str, str]

    schema_for_object: _SchemaForObjectCallable = operator.attrgetter("schema")
    """Return the .schema attribute for an object.

    For the default IdentifierPreparer, the schema for an object is always
    the value of the ".schema" attribute. if the preparer is replaced
    with one that has a non-empty schema_translate_map, the value of the
    ".schema" attribute is rendered a symbol that will be converted to a
    real schema name from the mapping post-compile.

    """

    # set by ``_with_schema_translate()`` on the copy it returns: whether
    # the schema translate map it was given had ``None`` as a key
    _includes_none_schema_translate: bool = False
7876
7877 def __init__(
7878 self,
7879 dialect: Dialect,
7880 initial_quote: str = '"',
7881 final_quote: Optional[str] = None,
7882 escape_quote: str = '"',
7883 quote_case_sensitive_collations: bool = True,
7884 omit_schema: bool = False,
7885 ):
7886 """Construct a new ``IdentifierPreparer`` object.
7887
7888 initial_quote
7889 Character that begins a delimited identifier.
7890
7891 final_quote
7892 Character that ends a delimited identifier. Defaults to
7893 `initial_quote`.
7894
7895 omit_schema
7896 Prevent prepending schema name. Useful for databases that do
7897 not support schemae.
7898 """
7899
7900 self.dialect = dialect
7901 self.initial_quote = initial_quote
7902 self.final_quote = final_quote or self.initial_quote
7903 self.escape_quote = escape_quote
7904 self.escape_to_quote = self.escape_quote * 2
7905 self.omit_schema = omit_schema
7906 self.quote_case_sensitive_collations = quote_case_sensitive_collations
7907 self._strings = {}
7908 self._double_percents = self.dialect.paramstyle in (
7909 "format",
7910 "pyformat",
7911 )
7912
7913 def _with_schema_translate(self, schema_translate_map):
7914 prep = self.__class__.__new__(self.__class__)
7915 prep.__dict__.update(self.__dict__)
7916
7917 includes_none = None in schema_translate_map
7918
7919 def symbol_getter(obj):
7920 name = obj.schema
7921 if obj._use_schema_map and (name is not None or includes_none):
7922 if name is not None and ("[" in name or "]" in name):
7923 raise exc.CompileError(
7924 "Square bracket characters ([]) not supported "
7925 "in schema translate name '%s'" % name
7926 )
7927 return quoted_name(
7928 "__[SCHEMA_%s]" % (name or "_none"), quote=False
7929 )
7930 else:
7931 return obj.schema
7932
7933 prep.schema_for_object = symbol_getter
7934 prep._includes_none_schema_translate = includes_none
7935 return prep
7936
7937 def _render_schema_translates(
7938 self, statement: str, schema_translate_map: SchemaTranslateMapType
7939 ) -> str:
7940 d = schema_translate_map
7941 if None in d:
7942 if not self._includes_none_schema_translate:
7943 raise exc.InvalidRequestError(
7944 "schema translate map which previously did not have "
7945 "`None` present as a key now has `None` present; compiled "
7946 "statement may lack adequate placeholders. Please use "
7947 "consistent keys in successive "
7948 "schema_translate_map dictionaries."
7949 )
7950
7951 d["_none"] = d[None] # type: ignore[index]
7952
7953 def replace(m):
7954 name = m.group(2)
7955 if name in d:
7956 effective_schema = d[name]
7957 else:
7958 if name in (None, "_none"):
7959 raise exc.InvalidRequestError(
7960 "schema translate map which previously had `None` "
7961 "present as a key now no longer has it present; don't "
7962 "know how to apply schema for compiled statement. "
7963 "Please use consistent keys in successive "
7964 "schema_translate_map dictionaries."
7965 )
7966 effective_schema = name
7967
7968 if not effective_schema:
7969 effective_schema = self.dialect.default_schema_name
7970 if not effective_schema:
7971 # TODO: no coverage here
7972 raise exc.CompileError(
7973 "Dialect has no default schema name; can't "
7974 "use None as dynamic schema target."
7975 )
7976 return self.quote_schema(effective_schema)
7977
7978 return re.sub(r"(__\[SCHEMA_([^\]]+)\])", replace, statement)
7979
7980 def _escape_identifier(self, value: str) -> str:
7981 """Escape an identifier.
7982
7983 Subclasses should override this to provide database-dependent
7984 escaping behavior.
7985 """
7986
7987 value = value.replace(self.escape_quote, self.escape_to_quote)
7988 if self._double_percents:
7989 value = value.replace("%", "%%")
7990 return value
7991
7992 def _unescape_identifier(self, value: str) -> str:
7993 """Canonicalize an escaped identifier.
7994
7995 Subclasses should override this to provide database-dependent
7996 unescaping behavior that reverses _escape_identifier.
7997 """
7998
7999 return value.replace(self.escape_to_quote, self.escape_quote)
8000
8001 def validate_sql_phrase(self, element, reg):
8002 """keyword sequence filter.
8003
8004 a filter for elements that are intended to represent keyword sequences,
8005 such as "INITIALLY", "INITIALLY DEFERRED", etc. no special characters
8006 should be present.
8007
8008 """
8009
8010 if element is not None and not reg.match(element):
8011 raise exc.CompileError(
8012 "Unexpected SQL phrase: %r (matching against %r)"
8013 % (element, reg.pattern)
8014 )
8015 return element
8016
8017 def quote_identifier(self, value: str) -> str:
8018 """Quote an identifier.
8019
8020 Subclasses should override this to provide database-dependent
8021 quoting behavior.
8022 """
8023
8024 return (
8025 self.initial_quote
8026 + self._escape_identifier(value)
8027 + self.final_quote
8028 )
8029
8030 def _requires_quotes(self, value: str) -> bool:
8031 """Return True if the given identifier requires quoting."""
8032 lc_value = value.lower()
8033 return (
8034 lc_value in self.reserved_words
8035 or value[0] in self.illegal_initial_characters
8036 or not self.legal_characters.match(str(value))
8037 or (lc_value != value)
8038 )
8039
8040 def _requires_quotes_illegal_chars(self, value):
8041 """Return True if the given identifier requires quoting, but
8042 not taking case convention into account."""
8043 return not self.legal_characters.match(str(value))
8044
8045 def quote_schema(self, schema: str) -> str:
8046 """Conditionally quote a schema name.
8047
8048
8049 The name is quoted if it is a reserved word, contains quote-necessary
8050 characters, or is an instance of :class:`.quoted_name` which includes
8051 ``quote`` set to ``True``.
8052
8053 Subclasses can override this to provide database-dependent
8054 quoting behavior for schema names.
8055
8056 :param schema: string schema name
8057 """
8058 return self.quote(schema)
8059
8060 def quote(self, ident: str) -> str:
8061 """Conditionally quote an identifier.
8062
8063 The identifier is quoted if it is a reserved word, contains
8064 quote-necessary characters, or is an instance of
8065 :class:`.quoted_name` which includes ``quote`` set to ``True``.
8066
8067 Subclasses can override this to provide database-dependent
8068 quoting behavior for identifier names.
8069
8070 :param ident: string identifier
8071 """
8072 force = getattr(ident, "quote", None)
8073
8074 if force is None:
8075 if ident in self._strings:
8076 return self._strings[ident]
8077 else:
8078 if self._requires_quotes(ident):
8079 self._strings[ident] = self.quote_identifier(ident)
8080 else:
8081 self._strings[ident] = ident
8082 return self._strings[ident]
8083 elif force:
8084 return self.quote_identifier(ident)
8085 else:
8086 return ident
8087
8088 def format_collation(self, collation_name):
8089 if self.quote_case_sensitive_collations:
8090 return self.quote(collation_name)
8091 else:
8092 return collation_name
8093
8094 def format_sequence(
8095 self, sequence: schema.Sequence, use_schema: bool = True
8096 ) -> str:
8097 name = self.quote(sequence.name)
8098
8099 effective_schema = self.schema_for_object(sequence)
8100
8101 if (
8102 not self.omit_schema
8103 and use_schema
8104 and effective_schema is not None
8105 ):
8106 name = self.quote_schema(effective_schema) + "." + name
8107 return name
8108
8109 def format_label(
8110 self, label: Label[Any], name: Optional[str] = None
8111 ) -> str:
8112 return self.quote(name or label.name)
8113
8114 def format_alias(
8115 self, alias: Optional[AliasedReturnsRows], name: Optional[str] = None
8116 ) -> str:
8117 if name is None:
8118 assert alias is not None
8119 return self.quote(alias.name)
8120 else:
8121 return self.quote(name)
8122
8123 def format_savepoint(self, savepoint, name=None):
8124 # Running the savepoint name through quoting is unnecessary
8125 # for all known dialects. This is here to support potential
8126 # third party use cases
8127 ident = name or savepoint.ident
8128 if self._requires_quotes(ident):
8129 ident = self.quote_identifier(ident)
8130 return ident
8131
8132 @util.preload_module("sqlalchemy.sql.naming")
8133 def format_constraint(
8134 self, constraint: Union[Constraint, Index], _alembic_quote: bool = True
8135 ) -> Optional[str]:
8136 naming = util.preloaded.sql_naming
8137
8138 if constraint.name is _NONE_NAME:
8139 name = naming._constraint_name_for_table(
8140 constraint, constraint.table
8141 )
8142
8143 if name is None:
8144 return None
8145 else:
8146 name = constraint.name
8147
8148 assert name is not None
8149 if constraint.__visit_name__ == "index":
8150 return self.truncate_and_render_index_name(
8151 name, _alembic_quote=_alembic_quote
8152 )
8153 else:
8154 return self.truncate_and_render_constraint_name(
8155 name, _alembic_quote=_alembic_quote
8156 )
8157
8158 def truncate_and_render_index_name(
8159 self, name: str, _alembic_quote: bool = True
8160 ) -> str:
8161 # calculate these at format time so that ad-hoc changes
8162 # to dialect.max_identifier_length etc. can be reflected
8163 # as IdentifierPreparer is long lived
8164 max_ = (
8165 self.dialect.max_index_name_length
8166 or self.dialect.max_identifier_length
8167 )
8168 return self._truncate_and_render_maxlen_name(
8169 name, max_, _alembic_quote
8170 )
8171
8172 def truncate_and_render_constraint_name(
8173 self, name: str, _alembic_quote: bool = True
8174 ) -> str:
8175 # calculate these at format time so that ad-hoc changes
8176 # to dialect.max_identifier_length etc. can be reflected
8177 # as IdentifierPreparer is long lived
8178 max_ = (
8179 self.dialect.max_constraint_name_length
8180 or self.dialect.max_identifier_length
8181 )
8182 return self._truncate_and_render_maxlen_name(
8183 name, max_, _alembic_quote
8184 )
8185
8186 def _truncate_and_render_maxlen_name(
8187 self, name: str, max_: int, _alembic_quote: bool
8188 ) -> str:
8189 if isinstance(name, elements._truncated_label):
8190 if len(name) > max_:
8191 name = name[0 : max_ - 8] + "_" + util.md5_hex(name)[-4:]
8192 else:
8193 self.dialect.validate_identifier(name)
8194
8195 if not _alembic_quote:
8196 return name
8197 else:
8198 return self.quote(name)
8199
8200 def format_index(self, index: Index) -> str:
8201 name = self.format_constraint(index)
8202 assert name is not None
8203 return name
8204
8205 def format_table(
8206 self,
8207 table: FromClause,
8208 use_schema: bool = True,
8209 name: Optional[str] = None,
8210 ) -> str:
8211 """Prepare a quoted table and schema name."""
8212 if name is None:
8213 if TYPE_CHECKING:
8214 assert isinstance(table, NamedFromClause)
8215 name = table.name
8216
8217 result = self.quote(name)
8218
8219 effective_schema = self.schema_for_object(table)
8220
8221 if not self.omit_schema and use_schema and effective_schema:
8222 result = self.quote_schema(effective_schema) + "." + result
8223 return result
8224
8225 def format_schema(self, name):
8226 """Prepare a quoted schema name."""
8227
8228 return self.quote(name)
8229
8230 def format_label_name(
8231 self,
8232 name,
8233 anon_map=None,
8234 ):
8235 """Prepare a quoted column name."""
8236
8237 if anon_map is not None and isinstance(
8238 name, elements._truncated_label
8239 ):
8240 name = name.apply_map(anon_map)
8241
8242 return self.quote(name)
8243
8244 def format_column(
8245 self,
8246 column: ColumnElement[Any],
8247 use_table: bool = False,
8248 name: Optional[str] = None,
8249 table_name: Optional[str] = None,
8250 use_schema: bool = False,
8251 anon_map: Optional[Mapping[str, Any]] = None,
8252 ) -> str:
8253 """Prepare a quoted column name."""
8254
8255 if name is None:
8256 name = column.name
8257 assert name is not None
8258
8259 if anon_map is not None and isinstance(
8260 name, elements._truncated_label
8261 ):
8262 name = name.apply_map(anon_map)
8263
8264 if not getattr(column, "is_literal", False):
8265 if use_table:
8266 return (
8267 self.format_table(
8268 column.table, use_schema=use_schema, name=table_name
8269 )
8270 + "."
8271 + self.quote(name)
8272 )
8273 else:
8274 return self.quote(name)
8275 else:
8276 # literal textual elements get stuck into ColumnClause a lot,
8277 # which shouldn't get quoted
8278
8279 if use_table:
8280 return (
8281 self.format_table(
8282 column.table, use_schema=use_schema, name=table_name
8283 )
8284 + "."
8285 + name
8286 )
8287 else:
8288 return name
8289
8290 def format_table_seq(self, table, use_schema=True):
8291 """Format table name and schema as a tuple."""
8292
8293 # Dialects with more levels in their fully qualified references
8294 # ('database', 'owner', etc.) could override this and return
8295 # a longer sequence.
8296
8297 effective_schema = self.schema_for_object(table)
8298
8299 if not self.omit_schema and use_schema and effective_schema:
8300 return (
8301 self.quote_schema(effective_schema),
8302 self.format_table(table, use_schema=False),
8303 )
8304 else:
8305 return (self.format_table(table, use_schema=False),)
8306
8307 @util.memoized_property
8308 def _r_identifiers(self):
8309 initial, final, escaped_final = (
8310 re.escape(s)
8311 for s in (
8312 self.initial_quote,
8313 self.final_quote,
8314 self._escape_identifier(self.final_quote),
8315 )
8316 )
8317 r = re.compile(
8318 r"(?:"
8319 r"(?:%(initial)s((?:%(escaped)s|[^%(final)s])+)%(final)s"
8320 r"|([^\.]+))(?=\.|$))+"
8321 % {"initial": initial, "final": final, "escaped": escaped_final}
8322 )
8323 return r
8324
8325 def unformat_identifiers(self, identifiers: str) -> Sequence[str]:
8326 """Unpack 'schema.table.column'-like strings into components."""
8327
8328 r = self._r_identifiers
8329 return [
8330 self._unescape_identifier(i)
8331 for i in [a or b for a, b in r.findall(identifiers)]
8332 ]