1# sql/compiler.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Base SQL and DDL compiler implementations.
10
11Classes provided include:
12
13:class:`.compiler.SQLCompiler` - renders SQL
14strings
15
16:class:`.compiler.DDLCompiler` - renders DDL
17(data definition language) strings
18
19:class:`.compiler.GenericTypeCompiler` - renders
20type specification strings.
21
22To generate user-defined SQL strings, see
23:doc:`/ext/compiler`.
24
25"""
26from __future__ import annotations
27
28import collections
29import collections.abc as collections_abc
30import contextlib
31from enum import IntEnum
32import functools
33import itertools
34import operator
35import re
36from time import perf_counter
37import typing
38from typing import Any
39from typing import Callable
40from typing import cast
41from typing import ClassVar
42from typing import Dict
43from typing import Final
44from typing import FrozenSet
45from typing import Iterable
46from typing import Iterator
47from typing import List
48from typing import Literal
49from typing import Mapping
50from typing import MutableMapping
51from typing import NamedTuple
52from typing import NoReturn
53from typing import Optional
54from typing import Pattern
55from typing import Protocol
56from typing import Sequence
57from typing import Set
58from typing import Tuple
59from typing import Type
60from typing import TYPE_CHECKING
61from typing import TypedDict
62from typing import Union
63
64from . import base
65from . import coercions
66from . import crud
67from . import elements
68from . import functions
69from . import operators
70from . import roles
71from . import schema
72from . import selectable
73from . import sqltypes
74from . import util as sql_util
75from ._typing import is_column_element
76from ._typing import is_dml
77from .base import _de_clone
78from .base import _from_objects
79from .base import _NONE_NAME
80from .base import _SentinelDefaultCharacterization
81from .base import NO_ARG
82from .elements import quoted_name
83from .sqltypes import TupleType
84from .visitors import prefix_anon_map
85from .. import exc
86from .. import util
87from ..util import FastIntFlag
88from ..util.typing import Self
89from ..util.typing import TupleAny
90from ..util.typing import Unpack
91
92if typing.TYPE_CHECKING:
93 from .annotation import _AnnotationDict
94 from .base import _AmbiguousTableNameMap
95 from .base import CompileState
96 from .base import Executable
97 from .base import ExecutableStatement
98 from .cache_key import CacheKey
99 from .ddl import _TableViaSelect
100 from .ddl import CreateTableAs
101 from .ddl import CreateView
102 from .ddl import ExecutableDDLElement
103 from .dml import Delete
104 from .dml import Insert
105 from .dml import Update
106 from .dml import UpdateBase
107 from .dml import UpdateDMLState
108 from .dml import ValuesBase
109 from .elements import _truncated_label
110 from .elements import BinaryExpression
111 from .elements import BindParameter
112 from .elements import ClauseElement
113 from .elements import ColumnClause
114 from .elements import ColumnElement
115 from .elements import False_
116 from .elements import Label
117 from .elements import Null
118 from .elements import True_
119 from .functions import Function
120 from .schema import CheckConstraint
121 from .schema import Column
122 from .schema import Constraint
123 from .schema import ForeignKeyConstraint
124 from .schema import IdentityOptions
125 from .schema import Index
126 from .schema import PrimaryKeyConstraint
127 from .schema import Table
128 from .schema import UniqueConstraint
129 from .selectable import _ColumnsClauseElement
130 from .selectable import AliasedReturnsRows
131 from .selectable import CompoundSelectState
132 from .selectable import CTE
133 from .selectable import FromClause
134 from .selectable import NamedFromClause
135 from .selectable import ReturnsRows
136 from .selectable import Select
137 from .selectable import SelectState
138 from .type_api import _BindProcessorType
139 from .type_api import TypeDecorator
140 from .type_api import TypeEngine
141 from .type_api import UserDefinedType
142 from .visitors import Visitable
143 from ..engine.cursor import CursorResultMetaData
144 from ..engine.interfaces import _CoreSingleExecuteParams
145 from ..engine.interfaces import _DBAPIAnyExecuteParams
146 from ..engine.interfaces import _DBAPIMultiExecuteParams
147 from ..engine.interfaces import _DBAPISingleExecuteParams
148 from ..engine.interfaces import _ExecuteOptions
149 from ..engine.interfaces import _GenericSetInputSizesType
150 from ..engine.interfaces import _MutableCoreSingleExecuteParams
151 from ..engine.interfaces import Dialect
152 from ..engine.interfaces import SchemaTranslateMapType
153
154
# Type alias: a dictionary keyed by FROM clause objects with string values.
# NOTE(review): the exact meaning of the string (hint text) is inferred from
# the alias name only -- confirm against the code that consumes it.
_FromHintsType = Dict["FromClause", str]

# Set of lower-case SQL keywords that are treated as reserved words.
# NOTE(review): presumably consulted when deciding whether an identifier
# must be quoted; the consumer is outside this section -- confirm.
RESERVED_WORDS = {
    "all",
    "analyse",
    "analyze",
    "and",
    "any",
    "array",
    "as",
    "asc",
    "asymmetric",
    "authorization",
    "between",
    "binary",
    "both",
    "case",
    "cast",
    "check",
    "collate",
    "column",
    "constraint",
    "create",
    "cross",
    "current_date",
    "current_role",
    "current_time",
    "current_timestamp",
    "current_user",
    "default",
    "deferrable",
    "desc",
    "distinct",
    "do",
    "else",
    "end",
    "except",
    "false",
    "for",
    "foreign",
    "freeze",
    "from",
    "full",
    "grant",
    "group",
    "having",
    "ilike",
    "in",
    "initially",
    "inner",
    "intersect",
    "into",
    "is",
    "isnull",
    "join",
    "leading",
    "left",
    "like",
    "limit",
    "localtime",
    "localtimestamp",
    "natural",
    "new",
    "not",
    "notnull",
    "null",
    "off",
    "offset",
    "old",
    "on",
    "only",
    "or",
    "order",
    "outer",
    "overlaps",
    "placing",
    "primary",
    "references",
    "right",
    "select",
    "session_user",
    "set",
    "similar",
    "some",
    "symmetric",
    "table",
    "then",
    "to",
    "trailing",
    "true",
    "union",
    "unique",
    "user",
    "using",
    "verbose",
    "when",
    "where",
}
253
# Whole-string patterns (case-insensitive) for names made up only of ASCII
# letters, digits, underscore and "$"; the second pattern also admits spaces.
LEGAL_CHARACTERS = re.compile(r"^[A-Z0-9_$]+$", re.IGNORECASE)
LEGAL_CHARACTERS_PLUS_SPACE = re.compile(r"^[A-Z0-9_ $]+$", re.IGNORECASE)

# The ten decimal digit characters plus "$".
ILLEGAL_INITIAL_CHARACTERS = set("0123456789$")

# Whole-string, case-insensitive patterns enumerating the accepted keyword
# phrases for foreign key ON DELETE / ON UPDATE / INITIALLY options and for
# the window EXCLUDE option.
FK_ON_DELETE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$",
    re.IGNORECASE,
)
FK_ON_UPDATE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$",
    re.IGNORECASE,
)
FK_INITIALLY = re.compile(r"^(?:DEFERRED|IMMEDIATE)$", re.IGNORECASE)
_WINDOW_EXCLUDE_RE = re.compile(
    r"^(?:CURRENT ROW|GROUP|TIES|NO OTHERS)$",
    re.IGNORECASE,
)

# ":name" tokens: the colon must not be preceded by a colon, word character,
# "$" or backslash (\x5c), and the name must not be followed by a colon,
# word character or "$".  Group 1 captures the name without the colon.
BIND_PARAMS = re.compile(r"(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])", re.UNICODE)

# Backslash-escaped "\:name" tokens; group 1 captures ":name" without the
# leading backslash.
BIND_PARAMS_ESC = re.compile(r"\x5c(:[\w\$]*)(?![:\w\$])", re.UNICODE)
270
# Bound parameter placeholder templates keyed by paramstyle name.  The
# values are "%"-format templates; "[_POSITION]" appears literally in the
# two numeric styles.
_pyformat_template = "%%(%(name)s)s"
BIND_TEMPLATES = dict(
    pyformat=_pyformat_template,
    qmark="?",
    format="%%s",
    numeric=":[_POSITION]",
    numeric_dollar="$[_POSITION]",
    named=":%(name)s",
)
280
281
# Maps operator objects from the ``operators`` module to the SQL text used
# for them.  Infix strings carry a space on both sides, prefix strings a
# trailing space only, and modifier strings a leading space only; the
# spacing is part of the value.
OPERATORS = {
    # binary (infix)
    operators.and_: " AND ",
    operators.or_: " OR ",
    operators.add: " + ",
    operators.mul: " * ",
    operators.sub: " - ",
    operators.mod: " % ",
    operators.neg: "-",  # prefix sign, no surrounding whitespace
    operators.lt: " < ",
    operators.le: " <= ",
    operators.ne: " != ",
    operators.gt: " > ",
    operators.ge: " >= ",
    operators.eq: " = ",
    operators.is_distinct_from: " IS DISTINCT FROM ",
    operators.is_not_distinct_from: " IS NOT DISTINCT FROM ",
    operators.concat_op: " || ",
    operators.match_op: " MATCH ",
    operators.not_match_op: " NOT MATCH ",
    operators.in_op: " IN ",
    operators.not_in_op: " NOT IN ",
    operators.comma_op: ", ",
    operators.from_: " FROM ",
    operators.as_: " AS ",
    operators.is_: " IS ",
    operators.is_not: " IS NOT ",
    operators.collate: " COLLATE ",
    # unary (prefix keyword followed by a space)
    operators.exists: "EXISTS ",
    operators.distinct_op: "DISTINCT ",
    operators.inv: "NOT ",
    operators.any_op: "ANY ",
    operators.all_op: "ALL ",
    # modifiers (suffix keyword preceded by a space)
    operators.desc_op: " DESC",
    operators.asc_op: " ASC",
    operators.nulls_first_op: " NULLS FIRST",
    operators.nulls_last_op: " NULLS LAST",
    # bitwise
    operators.bitwise_xor_op: " ^ ",
    operators.bitwise_or_op: " | ",
    operators.bitwise_and_op: " & ",
    operators.bitwise_not_op: "~",  # prefix, no surrounding whitespace
    operators.bitwise_lshift_op: " << ",
    operators.bitwise_rshift_op: " >> ",
}
329
# Maps function classes from the ``functions`` module to the name string
# associated with each one.  The casing is part of the value: some names are
# lower case ("coalesce", "random", "sysdate"), the rest upper case.
# NOTE(review): how the compiler renders these names (e.g. with or without
# parentheses) is decided outside this section -- confirm at the call site.
FUNCTIONS: Dict[Type[Function[Any]], str] = {
    functions.coalesce: "coalesce",
    functions.current_date: "CURRENT_DATE",
    functions.current_time: "CURRENT_TIME",
    functions.current_timestamp: "CURRENT_TIMESTAMP",
    functions.current_user: "CURRENT_USER",
    functions.localtime: "LOCALTIME",
    functions.localtimestamp: "LOCALTIMESTAMP",
    functions.random: "random",
    functions.sysdate: "sysdate",
    functions.session_user: "SESSION_USER",
    functions.user: "USER",
    functions.cube: "CUBE",
    functions.rollup: "ROLLUP",
    functions.grouping_sets: "GROUPING SETS",
}
346
347
# Default EXTRACT field-name mapping.  Every field maps to itself here; the
# mapping is exposed as ``SQLCompiler.extract_map``.
EXTRACT_MAP = {
    field: field
    for field in (
        "month",
        "day",
        "year",
        "second",
        "hour",
        "doy",
        "minute",
        "quarter",
        "dow",
        "week",
        "epoch",
        "milliseconds",
        "microseconds",
        "timezone_hour",
        "timezone_minute",
    )
}
365
# Maps each ``selectable._CompoundSelectKeyword`` member listed here to the
# SQL keyword text for that set operation.
COMPOUND_KEYWORDS = {
    selectable._CompoundSelectKeyword.UNION: "UNION",
    selectable._CompoundSelectKeyword.UNION_ALL: "UNION ALL",
    selectable._CompoundSelectKeyword.EXCEPT: "EXCEPT",
    selectable._CompoundSelectKeyword.EXCEPT_ALL: "EXCEPT ALL",
    selectable._CompoundSelectKeyword.INTERSECT: "INTERSECT",
    selectable._CompoundSelectKeyword.INTERSECT_ALL: "INTERSECT ALL",
}
374
375
class ResultColumnsEntry(NamedTuple):
    """Record of one column expression expected in this statement's
    result rows.

    Most commonly this describes an entry in the columns clause of a
    SELECT, but it may equally describe a RETURNING clause or a
    dialect-specific emulation.

    """

    keyname: str
    """The string name expected to appear in ``cursor.description``."""

    name: str
    """The column name; this may be a label."""

    objects: Tuple[Any, ...]
    """Objects that should each be able to locate this column in a
    RowMapping; typically string names and aliases along with Column
    objects.

    """

    type: TypeEngine[Any]
    """The datatype associated with this column.

    This is the point at which "result processing" logic links the compiled
    statement to the rows coming back from the cursor.

    """
405
406
class _ResultMapAppender(Protocol):
    """Structural type for a callable that accepts the four values
    ``keyname``, ``name``, ``objects`` and ``type_`` and returns ``None``.

    NOTE(review): the arguments parallel the fields of
    ``ResultColumnsEntry``; presumably implementations record one such
    entry -- confirm against the implementations.

    """

    def __call__(
        self,
        keyname: str,
        name: str,
        objects: Sequence[Any],
        type_: TypeEngine[Any],
    ) -> None:
        """Accept one set of result column values; returns ``None``."""
415
416
# Integer indexes into ResultColumnsEntry, used by cursor.py.  Some
# profiling showed integer access to be faster than named tuple attribute
# access.  Each index follows the field order declared on
# ResultColumnsEntry.
RM_RENDERED_NAME: Literal[0] = 0  # ResultColumnsEntry.keyname
RM_NAME: Literal[1] = 1  # ResultColumnsEntry.name
RM_OBJECTS: Literal[2] = 2  # ResultColumnsEntry.objects
RM_TYPE: Literal[3] = 3  # ResultColumnsEntry.type
423
424
class _BaseCompilerStackEntry(TypedDict):
    """Typed dictionary declaring the keys that are always required.

    NOTE(review): judging by the name these dictionaries are the entries of
    the compiler's stack; the meaning of each key is not shown in this
    section -- confirm against the code that builds the entries.

    """

    # two sets of FROM clause objects
    asfrom_froms: Set[FromClause]
    correlate_froms: Set[FromClause]
    # the rows-returning construct this entry refers to
    selectable: ReturnsRows
429
430
class _CompilerStackEntry(_BaseCompilerStackEntry, total=False):
    """Typed dictionary extending ``_BaseCompilerStackEntry``.

    The inherited keys remain required; because this class is declared with
    ``total=False``, every key declared below is optional.

    NOTE(review): the meaning of each optional key is not shown in this
    section -- confirm against the code that reads the entries.

    """

    compile_state: CompileState
    need_result_map_for_nested: bool
    need_result_map_for_compound: bool
    select_0: ReturnsRows
    insert_from_select: Select[Unpack[TupleAny]]
437
438
class ExpandedState(NamedTuple):
    """State used when producing "expanded" and "post compile" bound
    parameters for a statement.

    "Expanded" parameters are generated at statement execution time to fit
    the number of values passed; the most prominent example is the set of
    individual elements inside of an IN expression.

    "Post compile" parameters are those whose SQL literal value is rendered
    into the SQL statement at execution time instead of being passed to the
    driver as separate parameters.

    An :class:`.ExpandedState` is created by calling the
    :meth:`.SQLCompiler.construct_expanded_state` method on any
    :class:`.SQLCompiler` instance.

    """

    statement: str
    """The SQL statement string, with parameters fully expanded."""

    parameters: _CoreSingleExecuteParams
    """The parameter dictionary, with parameters fully expanded.

    When the statement uses named parameters, the keys of this dictionary
    match the names in the statement exactly.  When it uses positional
    parameters, :attr:`.ExpandedState.positional_parameters` yields a tuple
    containing the positional parameter set.

    """

    processors: Mapping[str, _BindProcessorType[Any]]
    """Mapping of bound value processors."""

    positiontup: Optional[Sequence[str]]
    """Sequence of string names giving the order of positional
    parameters."""

    parameter_expansion: Mapping[str, List[str]]
    """Mapping that links each original parameter name to the list of
    "expanded" parameter names generated for it, for those parameters that
    were expanded."""

    @property
    def positional_parameters(self) -> Tuple[Any, ...]:
        """Tuple of positional parameters, for statements compiled with a
        positional paramstyle.

        The values are taken from :attr:`.ExpandedState.parameters` in the
        order given by :attr:`.ExpandedState.positiontup`.

        :raises InvalidRequestError: if ``positiontup`` is ``None``.

        """
        ordering = self.positiontup
        if ordering is None:
            raise exc.InvalidRequestError(
                "statement does not use a positional paramstyle"
            )
        values = self.parameters
        return tuple(values[key] for key in ordering)

    @property
    def additional_parameters(self) -> _CoreSingleExecuteParams:
        """Synonym for :attr:`.ExpandedState.parameters`."""
        return self.parameters
498
499
class _InsertManyValues(NamedTuple):
    """State used when executing an "insertmanyvalues" statement.

    This object is consumed primarily by the
    :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
    :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods.

    .. versionadded:: 2.0

    """

    is_default_expr: bool
    """True when the statement has the form
    ``INSERT INTO TABLE DEFAULT VALUES``, which cannot be rewritten as a
    "batch".

    """

    single_values_expr: str
    """The rendered "values" clause of the INSERT statement.

    Typically this is the parenthesized section, e.g. "(?, ?, ?)" or
    similar.  The insertmanyvalues logic uses this string as its search and
    replace target.

    """

    insert_crud_params: List[crud._CrudParamElementStr]
    """List of Column / bind names etc. used while rewriting the
    statement."""

    num_positional_params_counted: int
    """The number of bound parameters in a single-row statement.

    Because SQL expressions in the values list may embed zero or more
    parameters, this count may be larger or smaller than the number of
    columns actually targeted by the INSERT.

    The count is part of what is used to organize rewritten parameter lists
    when batching.

    """

    sort_by_parameter_order: bool = False
    """True if the ``sort_by_parameter_order`` option was used on the
    insert.

    All of the attributes following this one are only used when this is
    True.

    """

    includes_upsert_behaviors: bool = False
    """If True, upsert behaviors have to be accommodated.

    In some cases this will downgrade an "insertmanyvalues" that requests
    deterministic ordering.

    """

    sentinel_columns: Optional[Sequence[Column[Any]]] = None
    """List of the sentinel columns that were located.

    The list is only present if the INSERT asked for
    sort_by_parameter_order=True and dialect-appropriate sentinel columns
    were located.

    .. versionadded:: 2.0.10

    """

    num_sentinel_columns: int = 0
    """How many sentinel columns are in ``sentinel_columns``, if any.

    This is the same as
    ``len(sentinel_columns) if sentinel_columns is not None else 0``

    """

    sentinel_param_keys: Optional[Sequence[str]] = None
    """Parameter string keys in each param dictionary / tuple that link to
    the client side "sentinel" values for that row, which can be used to
    match up parameter sets to result rows.

    This is only present if sentinel_columns is present and the INSERT
    statement actually refers to client side values for these sentinel
    columns.

    .. versionadded:: 2.0.10

    .. versionchanged:: 2.0.29 - the sequence is now string dictionary keys
       only, used against the "compiled parameters" collection before
       the parameters were converted by bound parameter processors

    """

    implicit_sentinel: bool = False
    """If True, there is exactly one sentinel column and it uses a server
    side value, which currently has to generate an incrementing integer
    value.

    The dialect in question would have asserted that it supports receiving
    these values back and sorting on that value as a means of guaranteeing
    correlation with the incoming parameter list.

    .. versionadded:: 2.0.10

    """

    has_upsert_bound_parameters: bool = False
    """If True, the upsert SET clause contains bound parameters that will
    receive their values from the parameters dict (i.e., parametrized
    bindparams where value is None and callable is None).

    Multiple rows then cannot be batched in a single statement: each row
    would need different values in the SET clause, but there is only one
    SET clause per statement.  See issue #13130.

    .. versionadded:: 2.0.37

    """

    embed_values_counter: bool = False
    """Whether to embed an incrementing integer counter in each parameter
    set within the VALUES clause as parameters are batched over.

    This is only used for a specific INSERT..SELECT..VALUES..RETURNING
    syntax where a subquery is used to produce value tuples.  Current
    support includes PostgreSQL, Microsoft SQL Server.

    .. versionadded:: 2.0.10

    """
630
631
class _InsertManyValuesBatch(NamedTuple):
    """represents an individual batch SQL statement for insertmanyvalues.

    This is passed through the
    :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
    :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods out
    to the :class:`.Connection` within the
    :meth:`.Connection._exec_insertmany_context` method.

    All ten fields are required; none declares a default.

    .. versionadded:: 2.0.10

    """

    # NOTE(review): the per-field notes below are inferred from the field
    # names and annotations only; confirm against the code producing batches.

    # presumably the rewritten statement text and parameters for this batch
    replaced_statement: str
    replaced_parameters: _DBAPIAnyExecuteParams
    processed_setinputsizes: Optional[_GenericSetInputSizesType]
    # presumably the per-row parameter sets that make up this batch
    batch: Sequence[_DBAPISingleExecuteParams]
    sentinel_values: Sequence[Tuple[Any, ...]]
    current_batch_size: int
    # presumably this batch's position within the total number of batches
    batchnum: int
    total_batches: int
    rows_sorted: bool
    is_downgraded: bool
655
656
class InsertmanyvaluesSentinelOpts(FastIntFlag):
    """bitflag enum indicating styles of PK defaults
    which can work as implicit sentinel columns

    Each single flag occupies its own bit, so flags can be combined
    with ``|``.

    """

    NOT_SUPPORTED = 1
    AUTOINCREMENT = 2
    IDENTITY = 4
    SEQUENCE = 8
    MONOTONIC_FUNCTION = 16

    # union of the four default-generation styles above (2 | 4 | 8 | 16)
    ANY_AUTOINCREMENT = (
        AUTOINCREMENT | IDENTITY | SEQUENCE | MONOTONIC_FUNCTION
    )
    # NOT_SUPPORTED combined with every style in ANY_AUTOINCREMENT
    _SUPPORTED_OR_NOT = NOT_SUPPORTED | ANY_AUTOINCREMENT

    # further flags on their own bits, not part of the combinations above
    USE_INSERT_FROM_SELECT = 32
    RENDER_SELECT_COL_CASTS = 64
676
677
class AggregateOrderByStyle(IntEnum):
    """Enumerates what a backend database can do with ORDER BY for
    aggregate functions.

    .. versionadded:: 2.1

    """

    NONE = 0
    """The database has no ORDER BY for aggregate functions."""

    INLINE = 1
    """ORDER BY is rendered inside the function's argument list, typically
    as the last element."""

    WITHIN_GROUP = 2
    """The WITHIN GROUP (ORDER BY ...) phrase is used for all aggregate
    functions (not just the ordered set ones)."""
696
697
class CompilerState(IntEnum):
    """Enumerates the states recorded in the ``state`` attribute of a
    :class:`.Compiled` object."""

    COMPILING = 0
    """A statement is present and the compilation phase is in progress."""

    STRING_APPLIED = 1
    """A statement is present and its string form has been applied.

    Additional processors by subclasses may still be pending.

    """

    NO_STATEMENT = 2
    """The compiler has no statement to compile and is used for method
    access."""
712
713
class Linting(IntEnum):
    """Preferences for the 'SQL linting' feature.

    At present the feature supports flagging cartesian products in SQL
    statements.

    """

    NO_LINTING = 0
    """Disable all linting."""

    COLLECT_CARTESIAN_PRODUCTS = 1
    """Collect data on FROMs and cartesian products and gather into
    'self.from_linter'"""

    WARN_LINTING = 2
    """Emit warnings for linters that find problems"""

    FROM_LINTING = COLLECT_CARTESIAN_PRODUCTS | WARN_LINTING
    """Warn for cartesian products; combines COLLECT_CARTESIAN_PRODUCTS
    and WARN_LINTING"""


# Module-level names for the four members, in declaration order.
NO_LINTING = Linting.NO_LINTING
COLLECT_CARTESIAN_PRODUCTS = Linting.COLLECT_CARTESIAN_PRODUCTS
WARN_LINTING = Linting.WARN_LINTING
FROM_LINTING = Linting.FROM_LINTING
740
741
class FromLinter(collections.namedtuple("FromLinter", ["froms", "edges"])):
    """Current state for the "cartesian product" detection feature.

    ``froms`` is a mapping keyed by FROM element; its values are the text
    shown for each element in warning messages.  ``edges`` is a collection
    of two-element sequences, each linking a pair of FROM elements.

    """

    def lint(self, start=None):
        """Search for FROM elements that cannot be reached from a starting
        element by following ``edges``.

        :param start: FROM element to start from; when ``None`` an
         arbitrary element of ``froms`` is used.  Passing an element that
         is not a key of ``froms`` raises ``KeyError``.

        :return: a tuple ``(unreached, start_element)`` where ``unreached``
         is the non-empty set of elements that were never reached, or
         ``(None, None)`` when ``froms`` is empty or every element was
         reached.

        """
        if not self.froms:
            return None, None

        remaining_edges = set(self.edges)
        unreached = set(self.froms)

        if start is None:
            origin = unreached.pop()
        else:
            origin = start
            unreached.remove(origin)

        pending = collections.deque([origin])

        while pending and unreached:
            current = pending.popleft()
            unreached.discard(current)

            # nodes are compared against edges by hash equality, because
            # "annotated" elements match their non-annotated counterparts.
            # native containment routines ("node in edge",
            # "edge.index(node)") avoid in-python hash() calls.
            touching = {edge for edge in remaining_edges if current in edge}

            # queue the element at the opposite end of every touching edge;
            # edge.index() is 0 or 1, so "not index" selects the other slot.
            pending.extendleft(
                edge[not edge.index(current)] for edge in touching
            )
            remaining_edges.difference_update(touching)

        # anything still unreached is disconnected from the origin
        if unreached:
            return unreached, origin
        return None, None

    def warn(self, stmt_type="SELECT"):
        """Emit a warning via ``util.warn`` when :meth:`lint` reports
        unreached FROM elements; otherwise do nothing.

        :param stmt_type: statement type name placed at the start of the
         warning message.

        """
        unreached, origin = self.lint()
        if not unreached:
            return

        template = (
            "{stmt_type} statement has a cartesian product between "
            "FROM element(s) {froms} and "
            'FROM element "{start}". Apply join condition(s) '
            "between each element to resolve."
        )
        froms_str = ", ".join(
            f'"{self.froms[from_]}"' for from_ in unreached
        )
        message = template.format(
            stmt_type=stmt_type,
            froms=froms_str,
            start=self.froms[origin],
        )

        util.warn(message)
806
807
class Compiled:
    """Represent a compiled SQL or DDL expression.

    Calling ``__str__`` on a ``Compiled`` object should yield the actual
    text of the statement.  A ``Compiled`` object is specific to its
    underlying database dialect, and may or may not also be specific to the
    columns referenced within a particular set of bind parameters.  It
    should never depend on the actual values of those bind parameters,
    even though it may reference those values as defaults.
    """

    statement: Optional[ClauseElement] = None
    """The statement to compile."""

    string: str = ""
    """The string representation of the ``statement``"""

    state: CompilerState
    """Description of the compiler's state."""

    is_sql = False
    is_ddl = False

    _cached_metadata: Optional[CursorResultMetaData] = None

    _result_columns: Optional[List[ResultColumnsEntry]] = None

    schema_translate_map: Optional[SchemaTranslateMapType] = None

    execution_options: _ExecuteOptions = util.EMPTY_DICT
    """
    Execution options propagated from the statement.  Sub-elements of the
    statement can in some cases modify these.
    """

    preparer: IdentifierPreparer

    _annotations: _AnnotationDict = util.EMPTY_DICT

    compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` object that maintains additional
    state used by the compiler.

    Major executable objects such as :class:`_expression.Insert`,
    :class:`_expression.Update`, :class:`_expression.Delete`,
    :class:`_expression.Select` generate this state when compiled, in order
    to calculate additional information about the object.  For the top
    level object that is to be executed, the state can be stored here, where
    it can also have applicability towards result set processing.

    .. versionadded:: 1.4

    """

    dml_compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` assigned at the same point that
    .isinsert, .isupdate, or .isdelete is assigned.

    Normally this is the same object as .compile_state, with the exception
    of cases like the :class:`.ORMFromStatementCompileState` object.

    .. versionadded:: 1.4.40

    """

    cache_key: Optional[CacheKey] = None
    """The :class:`.CacheKey` that was generated ahead of creating this
    :class:`.Compiled` object.

    Routines that need the original :class:`.CacheKey` instance generated
    when the :class:`.Compiled` instance was first cached use this,
    typically in order to reconcile the original list of
    :class:`.BindParameter` objects with a per-statement list that's
    generated on each call.

    """

    _gen_time: float
    """Generation time of this :class:`.Compiled`, used for reporting
    cache stats."""

    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        schema_translate_map: Optional[SchemaTranslateMapType] = None,
        render_schema_translate: bool = False,
        compile_kwargs: Mapping[str, Any] = util.immutabledict(),
    ):
        """Construct a new :class:`.Compiled` object.

        When a statement is given it is compiled immediately via
        :meth:`.Compiled.process` and the result stored in ``self.string``;
        when ``statement`` is ``None`` the object is placed in the
        ``NO_STATEMENT`` state and nothing is compiled.

        :param dialect: :class:`.Dialect` to compile against.

        :param statement: :class:`_expression.ClauseElement` to be compiled.

        :param schema_translate_map: dictionary of schema names to be
         translated when forming the resultant SQL

         .. seealso::

            :ref:`schema_translating`

        :param render_schema_translate: if True, the compiled string is
         additionally passed through the preparer's
         ``_render_schema_translates()`` together with
         ``schema_translate_map``, which must then not be ``None``.

        :param compile_kwargs: additional kwargs that will be
         passed to the initial call to :meth:`.Compiled.process`.


        """
        self.dialect = dialect

        preparer = dialect.identifier_preparer
        if schema_translate_map:
            self.schema_translate_map = schema_translate_map
            preparer = preparer._with_schema_translate(schema_translate_map)
        self.preparer = preparer

        if statement is None:
            self.state = CompilerState.NO_STATEMENT
        else:
            self.state = CompilerState.COMPILING
            self.statement = statement
            self.can_execute = statement.supports_execution
            self._annotations = statement._annotations
            if self.can_execute:
                if TYPE_CHECKING:
                    assert isinstance(statement, Executable)
                self.execution_options = statement._execution_options

            # the string form is produced here, at construction time
            self.string = self.process(self.statement, **compile_kwargs)

            if render_schema_translate:
                assert schema_translate_map is not None
                self.string = self.preparer._render_schema_translates(
                    self.string, schema_translate_map
                )

            self.state = CompilerState.STRING_APPLIED

        # taken last, so it follows any compilation performed above
        self._gen_time = perf_counter()

    def __init_subclass__(cls) -> None:
        """Invoke the ``_init_compiler_cls()`` hook for each new
        subclass."""
        cls._init_compiler_cls()
        return super().__init_subclass__()

    @classmethod
    def _init_compiler_cls(cls):
        """Hook called once for every subclass as it is created; the base
        implementation does nothing."""
        pass

    def visit_unsupported_compilation(self, element, err, **kw):
        """Raise :class:`.UnsupportedCompilationError` for the type of
        ``element``, chained from ``err``."""
        raise exc.UnsupportedCompilationError(self, type(element)) from err

    @property
    def sql_compiler(self) -> SQLCompiler:
        """Return a Compiled that is capable of processing SQL expressions.

        If this compiler is one, it would likely just return 'self'.

        The base implementation raises ``NotImplementedError``.

        """

        raise NotImplementedError()

    def process(self, obj: Visitable, **kwargs: Any) -> str:
        """Dispatch ``obj`` to this compiler through its
        ``_compiler_dispatch()`` method and return the result."""
        return obj._compiler_dispatch(self, **kwargs)

    def __str__(self) -> str:
        """Return the string text of the generated SQL or DDL.

        An empty string is returned unless the state is
        ``STRING_APPLIED``.

        """

        return (
            self.string if self.state is CompilerState.STRING_APPLIED else ""
        )

    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        """Return the bind params for this compiled object.

        The base implementation raises ``NotImplementedError``.

        :param params: a dict of string/object pairs whose values will
                       override bind values compiled in to the
                       statement.
        """

        raise NotImplementedError()

    @property
    def params(self):
        """Return the bind params for this compiled object."""
        return self.construct_params()
1000
1001
class TypeCompiler(util.EnsureKWArg):
    """Produces DDL specification for TypeEngine objects."""

    ensure_kwarg = r"visit_\w+"

    def __init__(self, dialect: Dialect):
        """Store the dialect that types will be compiled for."""
        self.dialect = dialect

    def process(self, type_: TypeEngine[Any], **kw: Any) -> str:
        """Return the string produced by dispatching ``type_`` to this
        compiler.

        If ``type_`` has a non-empty ``_variant_mapping`` containing an
        entry for this dialect's name, that entry is dispatched in place of
        ``type_``.

        """
        variants = type_._variant_mapping
        if variants:
            dialect_name = self.dialect.name
            if dialect_name in variants:
                type_ = variants[dialect_name]
        return type_._compiler_dispatch(self, **kw)

    def visit_unsupported_compilation(
        self, element: Any, err: Exception, **kw: Any
    ) -> NoReturn:
        """Raise :class:`.UnsupportedCompilationError` for ``element``,
        chained from ``err``."""
        raise exc.UnsupportedCompilationError(self, element) from err
1022
1023
# This used to be a plain Visitable; it is a column element so that
# column elements can be detected accurately.
class _CompileLabel(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """lightweight label object which acts as an expression.Label."""

    __visit_name__ = "label"
    __slots__ = "element", "name", "_alt_names"

    def __init__(self, col, name, alt_names=()):
        """Wrap ``col`` under the label ``name``.

        :param col: the element being labeled; stored as ``element``.
        :param name: the label name.
        :param alt_names: tuple of further objects; ``_alt_names`` is
         ``col`` followed by these.

        """
        self.name = name
        self.element = col
        self._alt_names = (col,) + alt_names

    @property
    def proxy_set(self):
        """The ``proxy_set`` of the wrapped element."""
        return self.element.proxy_set

    @property
    def type(self):
        """The ``type`` of the wrapped element."""
        return self.element.type

    def self_group(self, **kw):
        """Return this object itself; no grouping is applied."""
        return self
1049
1050
class aggregate_orderby_inline(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """produce ORDER BY inside of function argument lists"""

    __visit_name__ = "aggregate_orderby_inline"
    __slots__ = "element", "aggregate_order_by"

    def __init__(self, element, orderby):
        """Pair ``element`` with the ordering ``orderby``, which is stored
        as ``aggregate_order_by``."""
        self.aggregate_order_by = orderby
        self.element = element

    def __iter__(self):
        """Iterate over the wrapped element."""
        return iter(self.element)

    @property
    def proxy_set(self):
        """The ``proxy_set`` of the wrapped element."""
        return self.element.proxy_set

    @property
    def type(self):
        """The ``type`` of the wrapped element."""
        return self.element.type

    def self_group(self, **kw):
        """Return this object itself; no grouping is applied."""
        return self

    def _with_binary_element_type(self, type_):
        """Return a new wrapper around the element as returned by its own
        ``_with_binary_element_type(type_)``, keeping the same ordering."""
        retyped = self.element._with_binary_element_type(type_)
        return aggregate_orderby_inline(retyped, self.aggregate_order_by)
1082
1083
class ilike_case_insensitive(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """Wrapping element for a case-insensitive portion of an ILIKE
    construct.

    The construct usually renders the ``lower()`` function, but on
    PostgreSQL it passes through silently on the assumption that "ILIKE"
    is being used.

    .. versionadded:: 2.0

    """

    __visit_name__ = "ilike_case_insensitive_operand"
    __slots__ = "element", "comparator"

    def __init__(self, element):
        """Wrap ``element``, sharing its ``comparator``."""
        self.comparator = element.comparator
        self.element = element

    @property
    def proxy_set(self):
        """The ``proxy_set`` of the wrapped element."""
        return self.element.proxy_set

    @property
    def type(self):
        """The ``type`` of the wrapped element."""
        return self.element.type

    def self_group(self, **kw):
        """Return this object itself; no grouping is applied."""
        return self

    def _with_binary_element_type(self, type_):
        """Return a new wrapper around the element as returned by its own
        ``_with_binary_element_type(type_)``."""
        retyped = self.element._with_binary_element_type(type_)
        return ilike_case_insensitive(retyped)
1120
1121
1122class SQLCompiler(Compiled):
1123 """Default implementation of :class:`.Compiled`.
1124
1125 Compiles :class:`_expression.ClauseElement` objects into SQL strings.
1126
1127 """
1128
1129 extract_map = EXTRACT_MAP
1130
1131 bindname_escape_characters: ClassVar[Mapping[str, str]] = (
1132 util.immutabledict(
1133 {
1134 "%": "P",
1135 "(": "A",
1136 ")": "Z",
1137 ":": "C",
1138 ".": "_",
1139 "[": "_",
1140 "]": "_",
1141 " ": "_",
1142 }
1143 )
1144 )
1145 """A mapping (e.g. dict or similar) containing a lookup of
1146 characters keyed to replacement characters which will be applied to all
1147 'bind names' used in SQL statements as a form of 'escaping'; the given
1148 characters are replaced entirely with the 'replacement' character when
1149 rendered in the SQL statement, and a similar translation is performed
1150 on the incoming names used in parameter dictionaries passed to methods
1151 like :meth:`_engine.Connection.execute`.
1152
1153 This allows bound parameter names used in :func:`_sql.bindparam` and
1154 other constructs to have any arbitrary characters present without any
1155 concern for characters that aren't allowed at all on the target database.
1156
1157 Third party dialects can establish their own dictionary here to replace the
1158 default mapping, which will ensure that the particular characters in the
1159 mapping will never appear in a bound parameter name.
1160
1161 The dictionary is evaluated at **class creation time**, so cannot be
1162 modified at runtime; it must be present on the class when the class
1163 is first declared.
1164
1165 Note that for dialects that have additional bound parameter rules such
1166 as additional restrictions on leading characters, the
1167 :meth:`_sql.SQLCompiler.bindparam_string` method may need to be augmented.
1168 See the cx_Oracle compiler for an example of this.
1169
1170 .. versionadded:: 2.0.0rc1
1171
1172 """
1173
1174 _bind_translate_re: ClassVar[Pattern[str]]
1175 _bind_translate_chars: ClassVar[Mapping[str, str]]
1176
1177 is_sql = True
1178
1179 compound_keywords = COMPOUND_KEYWORDS
1180
1181 isdelete: bool = False
1182 isinsert: bool = False
1183 isupdate: bool = False
1184 """class-level defaults which can be set at the instance
1185 level to define if this Compiled instance represents
1186 INSERT/UPDATE/DELETE
1187 """
1188
1189 postfetch: Optional[List[Column[Any]]]
1190 """list of columns that can be post-fetched after INSERT or UPDATE to
1191 receive server-updated values"""
1192
1193 insert_prefetch: Sequence[Column[Any]] = ()
1194 """list of columns for which default values should be evaluated before
1195 an INSERT takes place"""
1196
1197 update_prefetch: Sequence[Column[Any]] = ()
1198 """list of columns for which onupdate default values should be evaluated
1199 before an UPDATE takes place"""
1200
1201 implicit_returning: Optional[Sequence[ColumnElement[Any]]] = None
1202 """list of "implicit" returning columns for a toplevel INSERT or UPDATE
1203 statement, used to receive newly generated values of columns.
1204
1205 .. versionadded:: 2.0 ``implicit_returning`` replaces the previous
1206 ``returning`` collection, which was not a generalized RETURNING
1207 collection and instead was in fact specific to the "implicit returning"
1208 feature.
1209
1210 """
1211
1212 isplaintext: bool = False
1213
1214 binds: Dict[str, BindParameter[Any]]
1215 """a dictionary of bind parameter keys to BindParameter instances."""
1216
1217 bind_names: Dict[BindParameter[Any], str]
1218 """a dictionary of BindParameter instances to "compiled" names
1219 that are actually present in the generated SQL"""
1220
1221 stack: List[_CompilerStackEntry]
1222 """major statements such as SELECT, INSERT, UPDATE, DELETE are
1223 tracked in this stack using an entry format."""
1224
1225 returning_precedes_values: bool = False
1226 """set to True classwide to generate RETURNING
1227 clauses before the VALUES or WHERE clause (i.e. MSSQL)
1228 """
1229
1230 render_table_with_column_in_update_from: bool = False
1231 """set to True classwide to indicate the SET clause
1232 in a multi-table UPDATE statement should qualify
1233 columns with the table name (i.e. MySQL only)
1234 """
1235
1236 ansi_bind_rules: bool = False
1237 """SQL 92 doesn't allow bind parameters to be used
1238 in the columns clause of a SELECT, nor does it allow
1239 ambiguous expressions like "? = ?". A compiler
    subclass can set this flag to True if the target
    driver/DB enforces this
1242 """
1243
1244 bindtemplate: str
1245 """template to render bound parameters based on paramstyle."""
1246
1247 compilation_bindtemplate: str
1248 """template used by compiler to render parameters before positional
1249 paramstyle application"""
1250
1251 _numeric_binds_identifier_char: str
    """Character that's used as the identifier of a numerical bind param.
1253 For example if this char is set to ``$``, numerical binds will be rendered
1254 in the form ``$1, $2, $3``.
1255 """
1256
1257 _result_columns: List[ResultColumnsEntry]
1258 """relates label names in the final SQL to a tuple of local
1259 column/label name, ColumnElement object (if any) and
1260 TypeEngine. CursorResult uses this for type processing and
1261 column targeting"""
1262
1263 _textual_ordered_columns: bool = False
1264 """tell the result object that the column names as rendered are important,
1265 but they are also "ordered" vs. what is in the compiled object here.
1266
1267 As of 1.4.42 this condition is only present when the statement is a
1268 TextualSelect, e.g. text("....").columns(...), where it is required
1269 that the columns are considered positionally and not by name.
1270
1271 """
1272
1273 _ad_hoc_textual: bool = False
1274 """tell the result that we encountered text() or '*' constructs in the
1275 middle of the result columns, but we also have compiled columns, so
1276 if the number of columns in cursor.description does not match how many
1277 expressions we have, that means we can't rely on positional at all and
1278 should match on name.
1279
1280 """
1281
1282 _ordered_columns: bool = True
1283 """
1284 if False, means we can't be sure the list of entries
1285 in _result_columns is actually the rendered order. Usually
1286 True unless using an unordered TextualSelect.
1287 """
1288
1289 _loose_column_name_matching: bool = False
1290 """tell the result object that the SQL statement is textual, wants to match
1291 up to Column objects, and may be using the ._tq_label in the SELECT rather
1292 than the base name.
1293
1294 """
1295
1296 _numeric_binds: bool = False
1297 """
1298 True if paramstyle is "numeric". This paramstyle is trickier than
1299 all the others.
1300
1301 """
1302
1303 _render_postcompile: bool = False
1304 """
1305 whether to render out POSTCOMPILE params during the compile phase.
1306
1307 This attribute is used only for end-user invocation of stmt.compile();
1308 it's never used for actual statement execution, where instead the
1309 dialect internals access and render the internal postcompile structure
1310 directly.
1311
1312 """
1313
1314 _post_compile_expanded_state: Optional[ExpandedState] = None
1315 """When render_postcompile is used, the ``ExpandedState`` used to create
1316 the "expanded" SQL is assigned here, and then used by the ``.params``
1317 accessor and ``.construct_params()`` methods for their return values.
1318
1319 .. versionadded:: 2.0.0rc1
1320
1321 """
1322
1323 _pre_expanded_string: Optional[str] = None
1324 """Stores the original string SQL before 'post_compile' is applied,
    for cases where 'post_compile' was used.
1326
1327 """
1328
1329 _pre_expanded_positiontup: Optional[List[str]] = None
1330
1331 _insertmanyvalues: Optional[_InsertManyValues] = None
1332
1333 _insert_crud_params: Optional[crud._CrudParamSequence] = None
1334
1335 literal_execute_params: FrozenSet[BindParameter[Any]] = frozenset()
1336 """bindparameter objects that are rendered as literal values at statement
1337 execution time.
1338
1339 """
1340
1341 post_compile_params: FrozenSet[BindParameter[Any]] = frozenset()
1342 """bindparameter objects that are rendered as bound parameter placeholders
1343 at statement execution time.
1344
1345 """
1346
1347 escaped_bind_names: util.immutabledict[str, str] = util.EMPTY_DICT
1348 """Late escaping of bound parameter names that has to be converted
1349 to the original name when looking in the parameter dictionary.
1350
1351 """
1352
1353 has_out_parameters = False
1354 """if True, there are bindparam() objects that have the isoutparam
1355 flag set."""
1356
1357 postfetch_lastrowid = False
    """if True, and this is an insert, use cursor.lastrowid to populate
1359 result.inserted_primary_key. """
1360
1361 _cache_key_bind_match: Optional[
1362 Tuple[
1363 Dict[
1364 BindParameter[Any],
1365 List[BindParameter[Any]],
1366 ],
1367 Dict[
1368 str,
1369 BindParameter[Any],
1370 ],
1371 ]
1372 ] = None
1373 """a mapping that will relate the BindParameter object we compile
1374 to those that are part of the extracted collection of parameters
1375 in the cache key, if we were given a cache key.
1376
1377 """
1378
1379 positiontup: Optional[List[str]] = None
1380 """for a compiled construct that uses a positional paramstyle, will be
1381 a sequence of strings, indicating the names of bound parameters in order.
1382
1383 This is used in order to render bound parameters in their correct order,
1384 and is combined with the :attr:`_sql.Compiled.params` dictionary to
1385 render parameters.
1386
1387 This sequence always contains the unescaped name of the parameters.
1388
1389 .. seealso::
1390
1391 :ref:`faq_sql_expression_string` - includes a usage example for
1392 debugging use cases.
1393
1394 """
1395 _values_bindparam: Optional[List[str]] = None
1396
1397 _visited_bindparam: Optional[List[str]] = None
1398
1399 inline: bool = False
1400
1401 ctes: Optional[MutableMapping[CTE, str]]
1402
1403 # Detect same CTE references - Dict[(level, name), cte]
1404 # Level is required for supporting nesting
1405 ctes_by_level_name: Dict[Tuple[int, str], CTE]
1406
1407 # To retrieve key/level in ctes_by_level_name -
1408 # Dict[cte_reference, (level, cte_name, cte_opts)]
1409 level_name_by_cte: Dict[CTE, Tuple[int, str, selectable._CTEOpts]]
1410
1411 ctes_recursive: bool
1412
1413 _post_compile_pattern = re.compile(r"__\[POSTCOMPILE_(\S+?)(~~.+?~~)?\]")
1414 _pyformat_pattern = re.compile(r"%\(([^)]+?)\)s")
1415 _positional_pattern = re.compile(
1416 f"{_pyformat_pattern.pattern}|{_post_compile_pattern.pattern}"
1417 )
1418 _collect_params: Final[bool]
1419 _collected_params: util.immutabledict[str, Any]
1420
    @classmethod
    def _init_compiler_cls(cls):
        """Run class-level setup for this compiler class.

        Currently this only establishes the bind-name translation
        attributes by calling ``cls._init_bind_translate()``.

        """
        cls._init_bind_translate()
1424
1425 @classmethod
1426 def _init_bind_translate(cls):
1427 reg = re.escape("".join(cls.bindname_escape_characters))
1428 cls._bind_translate_re = re.compile(f"[{reg}]")
1429 cls._bind_translate_chars = cls.bindname_escape_characters
1430
    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        cache_key: Optional[CacheKey] = None,
        column_keys: Optional[Sequence[str]] = None,
        for_executemany: bool = False,
        linting: Linting = NO_LINTING,
        _supporting_against: Optional[SQLCompiler] = None,
        **kwargs: Any,
    ):
        """Construct a new :class:`.SQLCompiler` object.

        :param dialect: :class:`.Dialect` to be used

        :param statement: :class:`_expression.ClauseElement` to be compiled

        :param cache_key: optional cache key for the statement.  When
         given, its second element is iterated to build the
         ``_cache_key_bind_match`` lookups, and the compiler does not
         collect parameters itself (``_collect_params`` is False).

        :param column_keys: a list of column names to be compiled into an
         INSERT or UPDATE statement.

        :param for_executemany: whether INSERT / UPDATE statements should
         expect that they are to be invoked in an "executemany" style,
         which may impact how the statement will be expected to return the
         values of defaults and autoincrement / sequences and similar.
         Depending on the backend and driver in use, support for retrieving
         these values may be disabled which means SQL expressions may
         be rendered inline, RETURNING may not be rendered, etc.

        :param linting: linting setting, stored as ``self.linting``.

        :param _supporting_against: optional other :class:`.SQLCompiler`
         whose instance ``__dict__`` is copied onto this one, except for
         the ``state``, ``dialect``, ``preparer``, ``positional``,
         ``_numeric_binds``, ``compilation_bindtemplate`` and
         ``bindtemplate`` entries.

        :param kwargs: additional keyword arguments to be consumed by the
         superclass.

        """
        self.column_keys = column_keys

        self.cache_key = cache_key

        if cache_key:
            # cksm: bind key -> bind; ckbm: bind -> list of binds, seeded
            # with the bind itself
            cksm = {b.key: b for b in cache_key[1]}
            ckbm = {b: [b] for b in cache_key[1]}
            self._cache_key_bind_match = (ckbm, cksm)

        # compile INSERT/UPDATE defaults/sequences to expect executemany
        # style execution, which may mean no pre-execute of defaults,
        # or no RETURNING
        self.for_executemany = for_executemany

        self.linting = linting

        # a dictionary of bind parameter keys to BindParameter
        # instances.
        self.binds = {}

        # a dictionary of BindParameter instances to "compiled" names
        # that are actually present in the generated SQL
        self.bind_names = util.column_dict()

        # stack which keeps track of nested SELECT statements
        self.stack = []

        self._result_columns = []

        # true if the paramstyle is positional
        self.positional = dialect.positional
        if self.positional:
            self._numeric_binds = nb = dialect.paramstyle.startswith("numeric")
            if nb:
                self._numeric_binds_identifier_char = (
                    "$" if dialect.paramstyle == "numeric_dollar" else ":"
                )

            # positional styles are first rendered with the pyformat
            # template; _process_positional() / _process_numeric() rewrite
            # the placeholders once the string is available
            self.compilation_bindtemplate = _pyformat_template
        else:
            self.compilation_bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

        self.ctes = None

        self.label_length = (
            dialect.label_length or dialect.max_identifier_length
        )

        # a map which tracks "anonymous" identifiers that are created on
        # the fly here
        self.anon_map = prefix_anon_map()

        # a map which tracks "truncated" names based on
        # dialect.label_length or dialect.max_identifier_length
        self.truncated_names: Dict[Tuple[str, str], str] = {}
        self._truncated_counters: Dict[str, int] = {}
        # parameters are collected by the compiler only when no cache key
        # was passed
        if not cache_key:
            self._collect_params = True
            self._collected_params = util.EMPTY_DICT
        else:
            self._collect_params = False  # type: ignore[misc]

        # NOTE(review): Compiled.__init__ presumably runs the compilation
        # and sets self.state / self.string, which the post-processing
        # below depends on -- confirm against Compiled
        Compiled.__init__(self, dialect, statement, **kwargs)

        if self.isinsert or self.isupdate or self.isdelete:
            if TYPE_CHECKING:
                assert isinstance(statement, UpdateBase)

            if self.isinsert or self.isupdate:
                if TYPE_CHECKING:
                    assert isinstance(statement, ValuesBase)
                if statement._inline:
                    self.inline = True
                elif self.for_executemany and (
                    not self.isinsert
                    or (
                        self.dialect.insert_executemany_returning
                        and statement._return_defaults
                    )
                ):
                    self.inline = True

        self.bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

        if _supporting_against:
            # take on the other compiler's state wholesale, keeping only
            # the dialect / paramstyle specific attributes of this one
            self.__dict__.update(
                {
                    k: v
                    for k, v in _supporting_against.__dict__.items()
                    if k
                    not in {
                        "state",
                        "dialect",
                        "preparer",
                        "positional",
                        "_numeric_binds",
                        "compilation_bindtemplate",
                        "bindtemplate",
                    }
                }
            )

        if self.state is CompilerState.STRING_APPLIED:
            # rewrite placeholders for positional paramstyles
            if self.positional:
                if self._numeric_binds:
                    self._process_numeric()
                else:
                    self._process_positional()

            if self._render_postcompile:
                parameters = self.construct_params(
                    escape_names=False,
                    _no_postcompile=True,
                )

                self._process_parameters_for_postcompile(
                    parameters, _populate_self=True
                )
1581
1582 @property
1583 def insert_single_values_expr(self) -> Optional[str]:
1584 """When an INSERT is compiled with a single set of parameters inside
1585 a VALUES expression, the string is assigned here, where it can be
1586 used for insert batching schemes to rewrite the VALUES expression.
1587
1588 .. versionchanged:: 2.0 This collection is no longer used by
1589 SQLAlchemy's built-in dialects, in favor of the currently
1590 internal ``_insertmanyvalues`` collection that is used only by
1591 :class:`.SQLCompiler`.
1592
1593 """
1594 if self._insertmanyvalues is None:
1595 return None
1596 else:
1597 return self._insertmanyvalues.single_values_expr
1598
1599 @util.ro_memoized_property
1600 def effective_returning(self) -> Optional[Sequence[ColumnElement[Any]]]:
1601 """The effective "returning" columns for INSERT, UPDATE or DELETE.
1602
1603 This is either the so-called "implicit returning" columns which are
1604 calculated by the compiler on the fly, or those present based on what's
1605 present in ``self.statement._returning`` (expanded into individual
1606 columns using the ``._all_selected_columns`` attribute) i.e. those set
1607 explicitly using the :meth:`.UpdateBase.returning` method.
1608
1609 .. versionadded:: 2.0
1610
1611 """
1612 if self.implicit_returning:
1613 return self.implicit_returning
1614 elif self.statement is not None and is_dml(self.statement):
1615 return [
1616 c
1617 for c in self.statement._all_selected_columns
1618 if is_column_element(c)
1619 ]
1620
1621 else:
1622 return None
1623
    @property
    def returning(self):
        """Backwards-compatible alias for :attr:`.effective_returning`.

        Returns exactly what ``self.effective_returning`` returns.

        """
        return self.effective_returning
1631
1632 @property
1633 def current_executable(self):
1634 """Return the current 'executable' that is being compiled.
1635
1636 This is currently the :class:`_sql.Select`, :class:`_sql.Insert`,
1637 :class:`_sql.Update`, :class:`_sql.Delete`,
1638 :class:`_sql.CompoundSelect` object that is being compiled.
1639 Specifically it's assigned to the ``self.stack`` list of elements.
1640
1641 When a statement like the above is being compiled, it normally
1642 is also assigned to the ``.statement`` attribute of the
1643 :class:`_sql.Compiler` object. However, all SQL constructs are
1644 ultimately nestable, and this attribute should never be consulted
1645 by a ``visit_`` method, as it is not guaranteed to be assigned
1646 nor guaranteed to correspond to the current statement being compiled.
1647
1648 """
1649 try:
1650 return self.stack[-1]["selectable"]
1651 except IndexError as ie:
1652 raise IndexError("Compiler does not have a stack entry") from ie
1653
1654 @property
1655 def prefetch(self):
1656 return list(self.insert_prefetch) + list(self.update_prefetch)
1657
    @util.memoized_property
    def _global_attributes(self) -> Dict[Any, Any]:
        """A dictionary created empty on first access.

        NOTE(review): presumably ``util.memoized_property`` stores the
        dictionary on the instance so later accesses see the same object
        -- confirm against the ``util`` implementation.

        """
        return {}
1661
1662 def _add_to_params(self, item: ExecutableStatement) -> None:
1663 # assumes that this is called before traversing the statement
1664 # so the call happens outer to inner, meaning that existing params
1665 # take precedence
1666 if item._params:
1667 self._collected_params = item._params | self._collected_params
1668
1669 @util.memoized_instancemethod
1670 def _init_cte_state(self) -> MutableMapping[CTE, str]:
1671 """Initialize collections related to CTEs only if
1672 a CTE is located, to save on the overhead of
1673 these collections otherwise.
1674
1675 """
1676 # collect CTEs to tack on top of a SELECT
1677 # To store the query to print - Dict[cte, text_query]
1678 ctes: MutableMapping[CTE, str] = util.OrderedDict()
1679 self.ctes = ctes
1680
1681 # Detect same CTE references - Dict[(level, name), cte]
1682 # Level is required for supporting nesting
1683 self.ctes_by_level_name = {}
1684
1685 # To retrieve key/level in ctes_by_level_name -
1686 # Dict[cte_reference, (level, cte_name, cte_opts)]
1687 self.level_name_by_cte = {}
1688
1689 self.ctes_recursive = False
1690
1691 return ctes
1692
1693 @contextlib.contextmanager
1694 def _nested_result(self):
1695 """special API to support the use case of 'nested result sets'"""
1696 result_columns, ordered_columns = (
1697 self._result_columns,
1698 self._ordered_columns,
1699 )
1700 self._result_columns, self._ordered_columns = [], False
1701
1702 try:
1703 if self.stack:
1704 entry = self.stack[-1]
1705 entry["need_result_map_for_nested"] = True
1706 else:
1707 entry = None
1708 yield self._result_columns, self._ordered_columns
1709 finally:
1710 if entry:
1711 entry.pop("need_result_map_for_nested")
1712 self._result_columns, self._ordered_columns = (
1713 result_columns,
1714 ordered_columns,
1715 )
1716
    def _process_positional(self):
        """Convert the compiled string to "format" or "qmark" placeholders.

        Every pyformat placeholder in ``self.string`` is replaced with the
        dialect's positional placeholder, and the bind names are recorded,
        in order of appearance, as ``self.positiontup`` (translated back
        to unescaped names when ``escaped_bind_names`` is in use).
        Post-compile markers are left in the string untouched, but their
        names are recorded in the same sequence.

        When ``_insertmanyvalues`` is present, its ``single_values_expr``
        and the third member of each ``insert_crud_params`` entry receive
        the same placeholder rewrite.

        """
        assert not self.positiontup
        assert self.state is CompilerState.STRING_APPLIED
        assert not self._numeric_binds

        if self.dialect.paramstyle == "format":
            placeholder = "%s"
        else:
            assert self.dialect.paramstyle == "qmark"
            placeholder = "?"

        positions = []

        def find_position(m: re.Match[str]) -> str:
            # group 1 is the name inside a pyformat placeholder; group 2
            # is the name inside a POSTCOMPILE marker.  names are appended
            # to whichever list ``positions`` refers to at call time.
            normal_bind = m.group(1)
            if normal_bind:
                positions.append(normal_bind)
                return placeholder
            else:
                # this is a post-compile bind
                positions.append(m.group(2))
                return m.group(0)

        self.string = re.sub(
            self._positional_pattern, find_position, self.string
        )

        if self.escaped_bind_names:
            # positiontup holds unescaped names, so map the escaped names
            # found in the string back to their originals
            reverse_escape = {v: k for k, v in self.escaped_bind_names.items()}
            assert len(self.escaped_bind_names) == len(reverse_escape)
            self.positiontup = [
                reverse_escape.get(name, name) for name in positions
            ]
        else:
            self.positiontup = positions

        if self._insertmanyvalues:
            # rebind ``positions`` to a throwaway list so that the
            # substitutions below don't add names to self.positiontup
            positions = []

            single_values_expr = re.sub(
                self._positional_pattern,
                find_position,
                self._insertmanyvalues.single_values_expr,
            )
            insert_crud_params = [
                (
                    v[0],
                    v[1],
                    re.sub(self._positional_pattern, find_position, v[2]),
                    v[3],
                )
                for v in self._insertmanyvalues.insert_crud_params
            ]

            self._insertmanyvalues = self._insertmanyvalues._replace(
                single_values_expr=single_values_expr,
                insert_crud_params=insert_crud_params,
            )
1775
1776 def _process_numeric(self):
1777 assert self._numeric_binds
1778 assert self.state is CompilerState.STRING_APPLIED
1779
1780 num = 1
1781 param_pos: Dict[str, str] = {}
1782 order: Iterable[str]
1783 if self._insertmanyvalues and self._values_bindparam is not None:
1784 # bindparams that are not in values are always placed first.
1785 # this avoids the need of changing them when using executemany
1786 # values () ()
1787 order = itertools.chain(
1788 (
1789 name
1790 for name in self.bind_names.values()
1791 if name not in self._values_bindparam
1792 ),
1793 self.bind_names.values(),
1794 )
1795 else:
1796 order = self.bind_names.values()
1797
1798 for bind_name in order:
1799 if bind_name in param_pos:
1800 continue
1801 bind = self.binds[bind_name]
1802 if (
1803 bind in self.post_compile_params
1804 or bind in self.literal_execute_params
1805 ):
1806 # set to None to just mark the in positiontup, it will not
1807 # be replaced below.
1808 param_pos[bind_name] = None # type: ignore
1809 else:
1810 ph = f"{self._numeric_binds_identifier_char}{num}"
1811 num += 1
1812 param_pos[bind_name] = ph
1813
1814 self.next_numeric_pos = num
1815
1816 self.positiontup = list(param_pos)
1817 if self.escaped_bind_names:
1818 len_before = len(param_pos)
1819 param_pos = {
1820 self.escaped_bind_names.get(name, name): pos
1821 for name, pos in param_pos.items()
1822 }
1823 assert len(param_pos) == len_before
1824
1825 # Can't use format here since % chars are not escaped.
1826 self.string = self._pyformat_pattern.sub(
1827 lambda m: param_pos[m.group(1)], self.string
1828 )
1829
1830 if self._insertmanyvalues:
1831 single_values_expr = (
1832 # format is ok here since single_values_expr includes only
1833 # place-holders
1834 self._insertmanyvalues.single_values_expr
1835 % param_pos
1836 )
1837 insert_crud_params = [
1838 (v[0], v[1], "%s", v[3])
1839 for v in self._insertmanyvalues.insert_crud_params
1840 ]
1841
1842 self._insertmanyvalues = self._insertmanyvalues._replace(
1843 # This has the numbers (:1, :2)
1844 single_values_expr=single_values_expr,
1845 # The single binds are instead %s so they can be formatted
1846 insert_crud_params=insert_crud_params,
1847 )
1848
1849 @util.memoized_property
1850 def _bind_processors(
1851 self,
1852 ) -> MutableMapping[
1853 str, Union[_BindProcessorType[Any], Sequence[_BindProcessorType[Any]]]
1854 ]:
1855 # mypy is not able to see the two value types as the above Union,
1856 # it just sees "object". don't know how to resolve
1857 return {
1858 key: value # type: ignore
1859 for key, value in (
1860 (
1861 self.bind_names[bindparam],
1862 (
1863 bindparam.type._cached_bind_processor(self.dialect)
1864 if not bindparam.type._is_tuple_type
1865 else tuple(
1866 elem_type._cached_bind_processor(self.dialect)
1867 for elem_type in cast(
1868 TupleType, bindparam.type
1869 ).types
1870 )
1871 ),
1872 )
1873 for bindparam in self.bind_names
1874 )
1875 if value is not None
1876 }
1877
    def is_subquery(self):
        """Return True if the compiler stack holds more than one entry."""
        return len(self.stack) > 1
1880
    @property
    def sql_compiler(self) -> Self:
        """Return this compiler itself."""
        return self
1884
1885 def construct_expanded_state(
1886 self,
1887 params: Optional[_CoreSingleExecuteParams] = None,
1888 escape_names: bool = True,
1889 ) -> ExpandedState:
1890 """Return a new :class:`.ExpandedState` for a given parameter set.
1891
1892 For queries that use "expanding" or other late-rendered parameters,
1893 this method will provide for both the finalized SQL string as well
1894 as the parameters that would be used for a particular parameter set.
1895
1896 .. versionadded:: 2.0.0rc1
1897
1898 """
1899 parameters = self.construct_params(
1900 params,
1901 escape_names=escape_names,
1902 _no_postcompile=True,
1903 )
1904 return self._process_parameters_for_postcompile(
1905 parameters,
1906 )
1907
1908 def construct_params(
1909 self,
1910 params: Optional[_CoreSingleExecuteParams] = None,
1911 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
1912 escape_names: bool = True,
1913 _group_number: Optional[int] = None,
1914 _check: bool = True,
1915 _no_postcompile: bool = False,
1916 _collected_params: _CoreSingleExecuteParams | None = None,
1917 ) -> _MutableCoreSingleExecuteParams:
1918 """return a dictionary of bind parameter keys and values"""
1919 if _collected_params is not None:
1920 assert not self._collect_params
1921 elif self._collect_params:
1922 _collected_params = self._collected_params
1923
1924 if _collected_params:
1925 if not params:
1926 params = _collected_params
1927 else:
1928 params = {**_collected_params, **params}
1929
1930 if self._render_postcompile and not _no_postcompile:
1931 assert self._post_compile_expanded_state is not None
1932 if not params:
1933 return dict(self._post_compile_expanded_state.parameters)
1934 else:
1935 raise exc.InvalidRequestError(
1936 "can't construct new parameters when render_postcompile "
1937 "is used; the statement is hard-linked to the original "
1938 "parameters. Use construct_expanded_state to generate a "
1939 "new statement and parameters."
1940 )
1941
1942 has_escaped_names = escape_names and bool(self.escaped_bind_names)
1943
1944 if extracted_parameters:
1945 # related the bound parameters collected in the original cache key
1946 # to those collected in the incoming cache key. They will not have
1947 # matching names but they will line up positionally in the same
1948 # way. The parameters present in self.bind_names may be clones of
1949 # these original cache key params in the case of DML but the .key
1950 # will be guaranteed to match.
1951 if self.cache_key is None:
1952 raise exc.CompileError(
1953 "This compiled object has no original cache key; "
1954 "can't pass extracted_parameters to construct_params"
1955 )
1956 else:
1957 orig_extracted = self.cache_key[1]
1958
1959 ckbm_tuple = self._cache_key_bind_match
1960 assert ckbm_tuple is not None
1961 ckbm, _ = ckbm_tuple
1962 resolved_extracted = {
1963 bind: extracted
1964 for b, extracted in zip(orig_extracted, extracted_parameters)
1965 for bind in ckbm[b]
1966 }
1967 else:
1968 resolved_extracted = None
1969
1970 if params:
1971 pd = {}
1972 for bindparam, name in self.bind_names.items():
1973 escaped_name = (
1974 self.escaped_bind_names.get(name, name)
1975 if has_escaped_names
1976 else name
1977 )
1978
1979 if bindparam.key in params:
1980 pd[escaped_name] = params[bindparam.key]
1981 elif name in params:
1982 pd[escaped_name] = params[name]
1983
1984 elif _check and bindparam.required:
1985 if _group_number:
1986 raise exc.InvalidRequestError(
1987 "A value is required for bind parameter %r, "
1988 "in parameter group %d"
1989 % (bindparam.key, _group_number),
1990 code="cd3x",
1991 )
1992 else:
1993 raise exc.InvalidRequestError(
1994 "A value is required for bind parameter %r"
1995 % bindparam.key,
1996 code="cd3x",
1997 )
1998 else:
1999 if resolved_extracted:
2000 value_param = resolved_extracted.get(
2001 bindparam, bindparam
2002 )
2003 else:
2004 value_param = bindparam
2005
2006 if bindparam.callable:
2007 pd[escaped_name] = value_param.effective_value
2008 else:
2009 pd[escaped_name] = value_param.value
2010 return pd
2011 else:
2012 pd = {}
2013 for bindparam, name in self.bind_names.items():
2014 escaped_name = (
2015 self.escaped_bind_names.get(name, name)
2016 if has_escaped_names
2017 else name
2018 )
2019
2020 if _check and bindparam.required:
2021 if _group_number:
2022 raise exc.InvalidRequestError(
2023 "A value is required for bind parameter %r, "
2024 "in parameter group %d"
2025 % (bindparam.key, _group_number),
2026 code="cd3x",
2027 )
2028 else:
2029 raise exc.InvalidRequestError(
2030 "A value is required for bind parameter %r"
2031 % bindparam.key,
2032 code="cd3x",
2033 )
2034
2035 if resolved_extracted:
2036 value_param = resolved_extracted.get(bindparam, bindparam)
2037 else:
2038 value_param = bindparam
2039
2040 if bindparam.callable:
2041 pd[escaped_name] = value_param.effective_value
2042 else:
2043 pd[escaped_name] = value_param.value
2044
2045 return pd
2046
2047 @util.memoized_instancemethod
2048 def _get_set_input_sizes_lookup(self):
2049 dialect = self.dialect
2050
2051 include_types = dialect.include_set_input_sizes
2052 exclude_types = dialect.exclude_set_input_sizes
2053
2054 dbapi = dialect.dbapi
2055
2056 def lookup_type(typ):
2057 dbtype = typ._unwrapped_dialect_impl(dialect).get_dbapi_type(dbapi)
2058
2059 if (
2060 dbtype is not None
2061 and (exclude_types is None or dbtype not in exclude_types)
2062 and (include_types is None or dbtype in include_types)
2063 ):
2064 return dbtype
2065 else:
2066 return None
2067
2068 inputsizes = {}
2069
2070 literal_execute_params = self.literal_execute_params
2071
2072 for bindparam in self.bind_names:
2073 if bindparam in literal_execute_params:
2074 continue
2075
2076 if bindparam.type._is_tuple_type:
2077 inputsizes[bindparam] = [
2078 lookup_type(typ)
2079 for typ in cast(TupleType, bindparam.type).types
2080 ]
2081 else:
2082 inputsizes[bindparam] = lookup_type(bindparam.type)
2083
2084 return inputsizes
2085
    @property
    def params(self):
        """Return the bind param dictionary embedded into this
        compiled object, for those values that are present.

        This is the result of :meth:`.construct_params` invoked with
        ``_check=False``, so a required parameter that has no value does
        not raise.

        .. seealso::

            :ref:`faq_sql_expression_string` - includes a usage example for
            debugging use cases.

        """
        return self.construct_params(_check=False)
2098
def _process_parameters_for_postcompile(
    self,
    parameters: _MutableCoreSingleExecuteParams,
    _populate_self: bool = False,
) -> ExpandedState:
    """handle special post compile parameters.

    These include:

    * "expanding" parameters - typically IN tuples that are rendered
      on a per-parameter basis for an otherwise fixed SQL statement string.

    * literal_binds compiled with the literal_execute flag.  Used for
      things like SQL Server "TOP N" where the driver does not accommodate
      N as a bound parameter.

    :param parameters: mutable parameter dictionary.  It is modified in
     place: entries for literal-execute and expanding parameters are
     popped, and the individual expanded entries are added.
    :param _populate_self: when True, the pre-expansion string and
     positiontup are saved on the compiler, and ``self.string``,
     ``self.positiontup`` and ``self._post_compile_expanded_state`` are
     replaced with the expanded results.
    :return: an ``ExpandedState`` built from the rewritten statement, the
     (mutated) ``parameters``, the processors for newly created parameter
     names, the new positiontup (``None`` when not positional) and a
     mapping of original parameter name to its expanded names.

    """

    expanded_parameters = {}
    new_positiontup: Optional[List[str]]

    # start from the saved pre-expansion string when one is present (it is
    # stored below when _populate_self is set), otherwise from the current
    # statement string
    pre_expanded_string = self._pre_expanded_string
    if pre_expanded_string is None:
        pre_expanded_string = self.string

    if self.positional:
        new_positiontup = []

        # same fallback as for the string above
        pre_expanded_positiontup = self._pre_expanded_positiontup
        if pre_expanded_positiontup is None:
            pre_expanded_positiontup = self.positiontup

    else:
        new_positiontup = pre_expanded_positiontup = None

    # the same mapping is viewed under two typing shapes: a single
    # processor per name, or a sequence of processors per name (used for
    # tuple types below)
    processors = self._bind_processors
    single_processors = cast(
        "Mapping[str, _BindProcessorType[Any]]", processors
    )
    tuple_processors = cast(
        "Mapping[str, Sequence[_BindProcessorType[Any]]]", processors
    )

    new_processors: Dict[str, _BindProcessorType[Any]] = {}

    # escaped name -> SQL text to substitute into the statement
    replacement_expressions: Dict[str, Any] = {}
    # escaped name -> expanded (name, value) pairs, kept so that a name
    # which occurs more than once is only expanded once
    to_update_sets: Dict[str, Any] = {}

    # notes:
    # *unescaped* parameter names in:
    # self.bind_names, self.binds, self._bind_processors, self.positiontup
    #
    # *escaped* parameter names in:
    # construct_params(), replacement_expressions

    numeric_positiontup: Optional[List[str]] = None

    if self.positional and pre_expanded_positiontup is not None:
        names: Iterable[str] = pre_expanded_positiontup
        if self._numeric_binds:
            # expanded names are collected separately and numbered after
            # the statement text has been rewritten
            numeric_positiontup = []
    else:
        names = self.bind_names.values()

    ebn = self.escaped_bind_names
    for name in names:
        escaped_name = ebn.get(name, name) if ebn else name
        parameter = self.binds[name]

        if parameter in self.literal_execute_params:
            # rendered inline as a literal; the value is removed from
            # the parameter dictionary the first time the name is seen
            if escaped_name not in replacement_expressions:
                replacement_expressions[escaped_name] = (
                    self.render_literal_bindparam(
                        parameter,
                        render_literal_value=parameters.pop(escaped_name),
                    )
                )
            continue

        if parameter in self.post_compile_params:
            if escaped_name in replacement_expressions:
                # already expanded on an earlier occurrence; reuse it
                to_update = to_update_sets[escaped_name]
                values = None
            else:
                # we are removing the parameter from parameters
                # because it is a list value, which is not expected by
                # TypeEngine objects that would otherwise be asked to
                # process it. the single name is being replaced with
                # individual numbered parameters for each value in the
                # param.
                #
                # note we are also inserting *escaped* parameter names
                # into the given dictionary.   default dialect will
                # use these param names directly as they will not be
                # in the escaped_bind_names dictionary.
                values = parameters.pop(name)

                leep_res = self._literal_execute_expanding_parameter(
                    escaped_name, parameter, values
                )
                (to_update, replacement_expr) = leep_res

                to_update_sets[escaped_name] = to_update
                replacement_expressions[escaped_name] = replacement_expr

            if not parameter.literal_execute:
                parameters.update(to_update)
                if parameter.type._is_tuple_type:
                    assert values is not None
                    # one processor per (row i, member j) of the tuple
                    # values, keyed "<name>_<i>_<j>" with 1-based indexes
                    new_processors.update(
                        (
                            "%s_%s_%s" % (name, i, j),
                            tuple_processors[name][j - 1],
                        )
                        for i, tuple_element in enumerate(values, 1)
                        for j, _ in enumerate(tuple_element, 1)
                        if name in tuple_processors
                        and tuple_processors[name][j - 1] is not None
                    )
                else:
                    # every expanded name shares the original processor
                    new_processors.update(
                        (key, single_processors[name])
                        for key, _ in to_update
                        if name in single_processors
                    )
                if numeric_positiontup is not None:
                    numeric_positiontup.extend(
                        name for name, _ in to_update
                    )
                elif new_positiontup is not None:
                    # to_update has escaped names, but that's ok since
                    # these are new names, that aren't in the
                    # escaped_bind_names dict.
                    new_positiontup.extend(name for name, _ in to_update)
                expanded_parameters[name] = [
                    expand_key for expand_key, _ in to_update
                ]
        elif new_positiontup is not None:
            # ordinary parameter; keeps its position unchanged
            new_positiontup.append(name)

    def process_expanding(m):
        # group(1) is the (escaped) parameter name used as the key into
        # replacement_expressions
        key = m.group(1)
        expr = replacement_expressions[key]

        # if POSTCOMPILE included a bind_expression, render that
        # around each element
        if m.group(2):
            # the wrapper text is "~~"-delimited; items 1 and 3 of the
            # split are used as the left and right sides
            tok = m.group(2).split("~~")
            be_left, be_right = tok[1], tok[3]
            expr = ", ".join(
                "%s%s%s" % (be_left, exp, be_right)
                for exp in expr.split(", ")
            )
        return expr

    statement = re.sub(
        self._post_compile_pattern, process_expanding, pre_expanded_string
    )

    if numeric_positiontup is not None:
        assert new_positiontup is not None
        # number the expanded names starting at self.next_numeric_pos
        param_pos = {
            key: f"{self._numeric_binds_identifier_char}{num}"
            for num, key in enumerate(
                numeric_positiontup, self.next_numeric_pos
            )
        }
        # Can't use format here since % chars are not escaped.
        statement = self._pyformat_pattern.sub(
            lambda m: param_pos[m.group(1)], statement
        )
        new_positiontup.extend(numeric_positiontup)

    expanded_state = ExpandedState(
        statement,
        parameters,
        new_processors,
        new_positiontup,
        expanded_parameters,
    )

    if _populate_self:
        # this is for the "render_postcompile" flag, which is not
        # otherwise used internally and is for end-user debugging and
        # special use cases.
        self._pre_expanded_string = pre_expanded_string
        self._pre_expanded_positiontup = pre_expanded_positiontup
        self.string = expanded_state.statement
        self.positiontup = (
            list(expanded_state.positiontup or ())
            if self.positional
            else None
        )
        self._post_compile_expanded_state = expanded_state

    return expanded_state
2295
@util.preload_module("sqlalchemy.engine.cursor")
def _create_result_map(self):
    """utility method used for unit tests only."""
    metadata_cls = util.preloaded.engine_cursor.CursorResultMetaData
    return metadata_cls._create_description_match_map(self._result_columns)
2303
# assigned by crud.py for insert/update statements.  this line is an
# annotation only; no value is set here.
_get_bind_name_for_col: _BindNameForColProtocol
2306
@util.memoized_property
def _within_exec_param_key_getter(self) -> Callable[[Any], str]:
    """Return the callable held in ``self._get_bind_name_for_col``."""
    return self._get_bind_name_for_col
2311
@util.memoized_property
@util.preload_module("sqlalchemy.engine.result")
def _inserted_primary_key_from_lastrowid_getter(self):
    """Return a ``get(lastrowid, parameters)`` callable that builds the
    inserted primary key "row" for the compiled statement.

    The returned callable combines the ``lastrowid`` value with values
    looked up in ``parameters`` under each primary key column's bind
    name.

    """
    result = util.preloaded.engine_result

    param_key_getter = self._within_exec_param_key_getter

    assert self.compile_state is not None
    statement = self.compile_state.statement

    if TYPE_CHECKING:
        assert isinstance(statement, Insert)

    table = statement.table

    # one (getter, column) pair per primary key column; each getter calls
    # parameters.get(<bind name for col>, None)
    getters = [
        (operator.methodcaller("get", param_key_getter(col), None), col)
        for col in table.primary_key
    ]

    autoinc_getter = None
    autoinc_col = table._autoincrement_column
    if autoinc_col is not None:
        # apply type post processors to the lastrowid
        lastrowid_processor = autoinc_col.type._cached_result_processor(
            self.dialect, None
        )
        autoinc_key = param_key_getter(autoinc_col)

        # if a bind value is present for the autoincrement column
        # in the parameters, we need to do the logic dictated by
        # #7998; honor a non-None user-passed parameter over lastrowid.
        # previously in the 1.4 series we weren't fetching lastrowid
        # at all if the key were present in the parameters
        if autoinc_key in self.binds:

            def _autoinc_getter(lastrowid, parameters):
                # falls back to lastrowid when the key is absent
                param_value = parameters.get(autoinc_key, lastrowid)
                if param_value is not None:
                    # they supplied non-None parameter, use that.
                    # SQLite at least is observed to return the wrong
                    # cursor.lastrowid for INSERT..ON CONFLICT so it
                    # can't be used in all cases
                    return param_value
                else:
                    # use lastrowid
                    return lastrowid

            # work around mypy https://github.com/python/mypy/issues/14027
            autoinc_getter = _autoinc_getter

    else:
        lastrowid_processor = None

    row_fn = result.result_tuple([col.key for col in table.primary_key])

    def get(lastrowid, parameters):
        """given cursor.lastrowid value and the parameters used for INSERT,
        return a "row" that represents the primary key, either by
        using the "lastrowid" or by extracting values from the parameters
        that were sent along with the INSERT.

        """
        if lastrowid_processor is not None:
            lastrowid = lastrowid_processor(lastrowid)

        if lastrowid is None:
            # no usable lastrowid: every column comes from the parameters
            return row_fn(getter(parameters) for getter, col in getters)
        else:
            # the autoincrement column takes lastrowid (or the user-passed
            # value via autoinc_getter); the others come from parameters
            return row_fn(
                (
                    (
                        autoinc_getter(lastrowid, parameters)
                        if autoinc_getter is not None
                        else lastrowid
                    )
                    if col is autoinc_col
                    else getter(parameters)
                )
                for getter, col in getters
            )

    return get
2395
@util.memoized_property
@util.preload_module("sqlalchemy.engine.result")
def _inserted_primary_key_from_returning_getter(self):
    """Return a ``get(row, parameters)`` callable that builds the
    inserted primary key "row" for the compiled statement.

    A primary key column that is part of ``self.implicit_returning`` is
    read from ``row`` by position; any other primary key column is looked
    up in ``parameters`` under its bind name, defaulting to ``None``.

    """
    result = util.preloaded.engine_result

    assert self.compile_state is not None
    statement = self.compile_state.statement

    if TYPE_CHECKING:
        assert isinstance(statement, Insert)

    param_key_getter = self._within_exec_param_key_getter
    table = statement.table

    returning = self.implicit_returning
    assert returning is not None

    # column -> position within the returned row
    returning_index = {}
    for position, returned_col in enumerate(returning):
        returning_index[returned_col] = position

    pk_cols = list(table.primary_key)

    # (getter, from_row) pairs; from_row selects which argument of get()
    # the getter is applied to
    getters: List[Tuple[Callable[[Any], Any], bool]] = []
    for col in pk_cols:
        if col in returning_index:
            getters.append(
                (operator.itemgetter(returning_index[col]), True)
            )
        else:
            getters.append(
                (
                    operator.methodcaller(
                        "get", param_key_getter(col), None
                    ),
                    False,
                )
            )

    row_fn = result.result_tuple([col.key for col in pk_cols])

    def get(row, parameters):
        return row_fn(
            getter(row if from_row else parameters)
            for getter, from_row in getters
        )

    return get
2440
def default_from(self) -> str:
    """Hook invoked for a SELECT statement that has no froms and for
    which no FROM clause is to be appended.

    Gives Oracle Database a chance to tack on a ``FROM DUAL`` to the string
    output.  This base implementation contributes no text.

    """
    no_from_text = ""
    return no_from_text
2450
def visit_override_binds(self, override_binds, **kw):
    """SQL compile the nested element of an _OverrideBinds with
    bindparams swapped out.

    The _OverrideBinds is not normally expected to be compiled; it
    is meant to be used when an already cached statement is to be used,
    the compilation was already performed, and only the bound params should
    be swapped in at execution time.

    However, there are test cases that exercise this object, and
    additionally the ORM subquery loader is known to feed in expressions
    which include this construct into new queries (discovered in #11173),
    so it has to do the right thing at compile time as well.

    :param override_binds: construct providing ``.element`` (compiled
     first) and ``.translate`` (mapping of bind key to replacement value).
    :return: the SQL text produced for ``override_binds.element``.

    """

    # get SQL text first
    sqltext = override_binds.element._compiler_dispatch(self, **kw)

    # for a test compile that is not for caching, change binds after the
    # fact.   note that we don't try to
    # swap the bindparam as we compile, because our element may be
    # elsewhere in the statement already (e.g. a subquery or perhaps a
    # CTE) and was already visited / compiled. See
    # test_relationship_criteria.py ->
    # test_selectinload_local_criteria_subquery
    for k in override_binds.translate:
        if k not in self.binds:
            continue
        bp = self.binds[k]

        # so this would work, just change the value of bp in place.
        # but we dont want to mutate things outside.
        # bp.value = override_binds.translate[bp.key]
        # continue

        # instead, need to replace bp with new_bp or otherwise accommodate
        # in all internal collections
        new_bp = bp._with_value(
            override_binds.translate[bp.key],
            maintain_key=True,
            required=False,
        )

        # self.binds is updated under both the translate key and the
        # rendered bind name; self.bind_names swaps the old object for
        # the new one
        name = self.bind_names[bp]
        self.binds[k] = self.binds[name] = new_bp
        self.bind_names[new_bp] = name
        self.bind_names.pop(bp, None)

        # the new parameter joins whichever collections the old one was in
        if bp in self.post_compile_params:
            self.post_compile_params |= {new_bp}
        if bp in self.literal_execute_params:
            self.literal_execute_params |= {new_bp}

        ckbm_tuple = self._cache_key_bind_match
        if ckbm_tuple:
            ckbm, cksm = ckbm_tuple
            # note: this inner loop rebinds the name ``bp``; it is
            # assigned again at the top of the next outer iteration
            for bp in bp._cloned_set:
                if bp.key in cksm:
                    cb = cksm[bp.key]
                    ckbm[cb].append(new_bp)

    return sqltext
2514
def visit_grouping(self, grouping, asfrom=False, **kwargs):
    """Render the grouped element wrapped in parentheses.

    ``asfrom`` is accepted but not passed on to the inner element.

    """
    inner_sql = grouping.element._compiler_dispatch(self, **kwargs)
    return "".join(("(", inner_sql, ")"))
2517
def visit_select_statement_grouping(self, grouping, **kwargs):
    """Render the grouped statement wrapped in parentheses."""
    inner_sql = grouping.element._compiler_dispatch(self, **kwargs)
    return "".join(("(", inner_sql, ")"))
2520
def visit_label_reference(
    self, element, within_columns_clause=False, **kwargs
):
    """Render ``element.element``, asking for it to be rendered as a bare
    label name when it can be matched against the enclosing statement.

    The match is attempted only when there is an enclosing stack entry
    and the dialect reports ``supports_simple_order_by_label``.

    :raises CompileError: if the enclosing stack entry has no
     ``"compile_state"``.

    """
    target = element.element

    if self.stack and self.dialect.supports_simple_order_by_label:
        try:
            compile_state = cast(
                "Union[SelectState, CompoundSelectState]",
                self.stack[-1]["compile_state"],
            )
        except KeyError as ke:
            raise exc.CompileError(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ) from ke

        _, only_froms, only_cols = compile_state._label_resolve_dict
        resolve_dict = only_froms if within_columns_clause else only_cols

        # this can be None in the case that a _label_reference()
        # were subject to a replacement operation, in which case
        # the replacement of the Label element may have changed
        # to something else like a ColumnClause expression.
        order_by_elem = target._order_by_label_element

        if order_by_elem is not None:
            label_name = order_by_elem.name
            if label_name in resolve_dict and order_by_elem.shares_lineage(
                resolve_dict[label_name]
            ):
                kwargs["render_label_as_label"] = order_by_elem

    return self.process(
        target,
        within_columns_clause=within_columns_clause,
        **kwargs,
    )
2567
def visit_textual_label_reference(
    self, element, within_columns_clause=False, **kwargs
):
    """Render a label reference whose name is ``element.element``,
    resolved against the enclosing statement's label dictionaries.

    With no enclosing stack entry, ``element._text_clause`` is rendered
    instead.  Otherwise the name is looked up in the compile state's
    ``_label_resolve_dict`` and the matching column is rendered with
    ``render_label_as_label`` set to that column.

    Lookup failures are handed to ``coercions._no_text_coercion`` with
    ``exc_cls=exc.CompileError``.

    """
    if not self.stack:
        # compiling the element outside of the context of a SELECT
        return self.process(element._text_clause)

    try:
        compile_state = cast(
            "Union[SelectState, CompoundSelectState]",
            self.stack[-1]["compile_state"],
        )
    except KeyError as ke:
        # NOTE(review): the code below relies on this call raising;
        # otherwise compile_state would be unbound -- confirm that
        # _no_text_coercion never returns
        coercions._no_text_coercion(
            element.element,
            extra=(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ),
            exc_cls=exc.CompileError,
            err=ke,
        )

    with_cols, only_froms, only_cols = compile_state._label_resolve_dict
    try:
        # inside the columns clause only the "froms" names are
        # considered; elsewhere the "with_cols" dictionary is used
        if within_columns_clause:
            col = only_froms[element.element]
        else:
            col = with_cols[element.element]
    except KeyError as err:
        coercions._no_text_coercion(
            element.element,
            extra=(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ),
            exc_cls=exc.CompileError,
            err=err,
        )
    else:
        kwargs["render_label_as_label"] = col
        return self.process(
            col, within_columns_clause=within_columns_clause, **kwargs
        )
2612
def visit_label(
    self,
    label,
    add_to_result_map=None,
    within_label_clause=False,
    within_columns_clause=False,
    render_label_as_label=None,
    result_map_targets=(),
    within_tstring=False,
    **kw,
):
    """Render a label in one of three forms.

    * ``<element> AS <name>`` when inside the columns clause and not
      already inside another label; the result map callable, if given,
      is invoked first.
    * the formatted label name alone when ``render_label_as_label`` is
      this very label.
    * otherwise just the labeled element, with
      ``within_columns_clause=False``.

    :raises CompileError: if ``within_tstring`` is set.

    """
    if within_tstring:
        raise exc.CompileError(
            "Using label() directly inside tstring is not supported "
            "as it is ambiguous how the label expression should be "
            "rendered without knowledge of how it's being used in SQL"
        )

    # only render labels within the columns clause
    # or ORDER BY clause of a select.  dialect-specific compilers
    # can modify this behavior.
    with_as = within_columns_clause and not within_label_clause
    name_only = render_label_as_label is label

    if not (name_only or with_as):
        return label.element._compiler_dispatch(
            self, within_columns_clause=False, **kw
        )

    raw_name = label.name
    if isinstance(raw_name, elements._truncated_label):
        labelname = self._truncated_identifier("colident", raw_name)
    else:
        labelname = raw_name

    if not with_as:
        return self.preparer.format_label(label, labelname)

    if add_to_result_map is not None:
        add_to_result_map(
            labelname,
            label.name,
            (label, labelname) + label._alt_names + result_map_targets,
            label.type,
        )

    element_sql = label.element._compiler_dispatch(
        self,
        within_columns_clause=True,
        within_label_clause=True,
        **kw,
    )
    return (
        element_sql
        + OPERATORS[operators.as_]
        + self.preparer.format_label(label, labelname)
    )
2668
def _fallback_column_name(self, column):
    """Called by ``visit_column`` for a column whose name is ``None``;
    this implementation always raises.

    :raises CompileError: unconditionally.

    """
    message = "Cannot compile Column object until its 'name' is assigned."
    raise exc.CompileError(message)
2673
def visit_lambda_element(self, element, **kw):
    """Render the lambda element by processing its ``_resolved`` value."""
    return self.process(element._resolved, **kw)
2677
def visit_column(
    self,
    column: ColumnClause[Any],
    add_to_result_map: Optional[_ResultMapAppender] = None,
    include_table: bool = True,
    result_map_targets: Tuple[Any, ...] = (),
    ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] = None,
    **kwargs: Any,
) -> str:
    """Render a column reference, optionally qualified by its table and
    schema.

    :param column: the column to render.
    :param add_to_result_map: if given, invoked with the rendered name,
     the original ``column.name``, a tuple of lookup targets and the
     column's type.
    :param include_table: when False, only the column name is returned.
    :param result_map_targets: extra targets appended to those passed to
     ``add_to_result_map``.
    :param ambiguous_table_name_map: optional mapping used to substitute
     the table name; consulted only when the table has no effective
     schema.
    :return: the column name, prefixed with ``<schema>.<table>.`` or
     ``<table>.`` when the column has a table that is named with the
     column and ``include_table`` is set.

    """
    name = orig_name = column.name
    if name is None:
        name = self._fallback_column_name(column)

    is_literal = column.is_literal
    if not is_literal and isinstance(name, elements._truncated_label):
        name = self._truncated_identifier("colident", name)

    if add_to_result_map is not None:
        # the result map receives the name as it stands before the
        # quoting / escaping applied below
        targets = (column, name, column.key) + result_map_targets
        if column._tq_label:
            targets += (column._tq_label,)

        add_to_result_map(name, orig_name, targets, column.type)

    if is_literal:
        # note we are not currently accommodating for
        # literal_column(quoted_name('ident', True)) here
        name = self.escape_literal_column(name)
    else:
        name = self.preparer.quote(name)
    table = column.table
    if table is None or not include_table or not table.named_with_column:
        return name
    else:
        effective_schema = self.preparer.schema_for_object(table)

        if effective_schema:
            schema_prefix = (
                self.preparer.quote_schema(effective_schema) + "."
            )
        else:
            schema_prefix = ""

        if TYPE_CHECKING:
            assert isinstance(table, NamedFromClause)
        tablename = table.name

        # substitute the table name from the map, but only for
        # schema-less tables
        if (
            not effective_schema
            and ambiguous_table_name_map
            and tablename in ambiguous_table_name_map
        ):
            tablename = ambiguous_table_name_map[tablename]

        if isinstance(tablename, elements._truncated_label):
            tablename = self._truncated_identifier("alias", tablename)

        return schema_prefix + self.preparer.quote(tablename) + "." + name
2736
def visit_collation(self, element, **kw):
    """Render the collation name using the preparer's
    ``format_collation()``."""
    collation_name = element.collation
    return self.preparer.format_collation(collation_name)
2739
def visit_fromclause(self, fromclause, **kwargs):
    """Return the ``name`` attribute of the FROM clause unchanged."""
    clause_name = fromclause.name
    return clause_name
2742
def visit_index(self, index, **kwargs):
    """Return the ``name`` attribute of the index unchanged."""
    index_name = index.name
    return index_name
2745
def visit_typeclause(self, typeclause, **kw):
    """Render the type held by ``typeclause`` via the dialect's type
    compiler.

    The type compiler additionally receives ``type_expression`` (the
    type clause itself) and ``identifier_preparer`` (this compiler's
    preparer) as keyword arguments.

    """
    kw.update(type_expression=typeclause, identifier_preparer=self.preparer)
    type_compiler = self.dialect.type_compiler_instance
    return type_compiler.process(typeclause.type, **kw)
2752
def post_process_text(self, text):
    """Return ``text`` with every ``%`` doubled when the preparer's
    ``_double_percents`` flag is set; otherwise return it unchanged."""
    return text.replace("%", "%%") if self.preparer._double_percents else text
2757
def escape_literal_column(self, text):
    """Return ``text`` with every ``%`` doubled when the preparer's
    ``_double_percents`` flag is set; otherwise return it unchanged."""
    return text.replace("%", "%%") if self.preparer._double_percents else text
2762
def visit_textclause(self, textclause, add_to_result_map=None, **kw):
    """Render a textual SQL clause, substituting its bind parameters.

    Each name matched by ``BIND_PARAMS`` is rendered either from the
    clause's own ``_bindparams`` entry or, when the clause has no entry
    for that name, through ``bindparam_string()``.  Matches of
    ``BIND_PARAMS_ESC`` are then replaced by their first group.

    """
    if self._collect_params:
        self._add_to_params(textclause)

    if not self.stack:
        self.isplaintext = True

    if add_to_result_map:
        # text() object is present in the columns clause of a
        # select().   Add a no-name entry to the result map so that
        # row[text()] produces a result
        add_to_result_map(None, None, (textclause,), sqltypes.NULLTYPE)

    def render_bind(match):
        bind_name = match.group(1)
        declared = textclause._bindparams
        if bind_name not in declared:
            return self.bindparam_string(bind_name, **kw)
        return self.process(declared[bind_name], **kw)

    processed_text = self.post_process_text(textclause.text)
    with_binds = BIND_PARAMS.sub(render_bind, processed_text)

    # un-escape any \:params
    return BIND_PARAMS_ESC.sub(lambda m: m.group(1), with_binds)
2790
def visit_tstring(self, tstring, add_to_result_map=None, **kw):
    """Render a tstring construct by processing each of its parts with
    ``within_tstring=True`` and concatenating the results."""
    if self._collect_params:
        self._add_to_params(tstring)

    if not self.stack:
        self.isplaintext = True

    if add_to_result_map:
        # tstring() object is present in the columns clause of a
        # select().   Add a no-name entry to the result map so that
        # row[tstring()] produces a result
        add_to_result_map(None, None, (tstring,), sqltypes.NULLTYPE)

    # every part is told that it is being rendered inside a tstring
    kw["within_tstring"] = True
    rendered_parts = [self.process(part, **kw) for part in tstring.parts]
    return "".join(rendered_parts)
2807
def visit_textual_select(
    self, taf, compound_index=None, asfrom=False, **kw
):
    """Render a textual SELECT construct (``taf``).

    A new entry is pushed onto ``self.stack`` for the duration of the
    rendering.  When a result map is required, the construct's
    ``column_args`` are processed to populate it.  Any collected CTEs are
    rendered in front of the statement text.

    :return: the rendered SQL text.

    """
    if self._collect_params:
        self._add_to_params(taf)
    toplevel = not self.stack
    # the enclosing entry is captured before the new one is pushed
    entry = self._default_stack_entry if toplevel else self.stack[-1]

    new_entry: _CompilerStackEntry = {
        "correlate_froms": set(),
        "asfrom_froms": set(),
        "selectable": taf,
    }
    self.stack.append(new_entry)

    if taf._independent_ctes:
        self._dispatch_independent_ctes(taf, kw)

    # a result map is built at the top level, for the first member of a
    # compound that asks for one, or when the enclosing entry asks for
    # one for nested statements
    populate_result_map = (
        toplevel
        or (
            compound_index == 0
            and entry.get("need_result_map_for_compound", False)
        )
        or entry.get("need_result_map_for_nested", False)
    )

    if populate_result_map:
        self._ordered_columns = self._textual_ordered_columns = (
            taf.positional
        )

        # enable looser result column matching when the SQL text links to
        # Column objects by name only
        self._loose_column_name_matching = not taf.positional and bool(
            taf.column_args
        )

        # the return value is discarded here; the calls are made for
        # the add_to_result_map callback
        for c in taf.column_args:
            self.process(
                c,
                within_columns_clause=True,
                add_to_result_map=self._add_to_result_map,
            )

    text = self.process(taf.element, **kw)
    if self.ctes:
        # no nesting level is passed for a top-level statement
        nesting_level = len(self.stack) if not toplevel else None
        text = self._render_cte_clause(nesting_level=nesting_level) + text

    self.stack.pop(-1)

    return text
2861
def visit_null(self, expr: Null, **kw: Any) -> str:
    """Render the SQL ``NULL`` keyword."""
    null_keyword = "NULL"
    return null_keyword
2864
def visit_true(self, expr: True_, **kw: Any) -> str:
    """Render boolean true: ``true`` when the dialect reports
    ``supports_native_boolean``, otherwise ``1``."""
    return "true" if self.dialect.supports_native_boolean else "1"
2870
def visit_false(self, expr: False_, **kw: Any) -> str:
    """Render boolean false: ``false`` when the dialect reports
    ``supports_native_boolean``, otherwise ``0``."""
    return "false" if self.dialect.supports_native_boolean else "0"
2876
def _generate_delimited_list(self, elements, separator, **kw):
    """Compile each element and join the results with ``separator``.

    Elements that compile to an empty (falsy) string are left out of the
    joined result.

    """
    join = separator.join
    compiled = (element._compiler_dispatch(self, **kw) for element in elements)
    return join(fragment for fragment in compiled if fragment)
2883
def _generate_delimited_and_list(self, clauses, **kw):
    """Compile ``clauses`` joined by the AND operator string.

    The clauses are first passed through
    ``BooleanClauseList._process_clauses_for_boolean``.  If that reports
    a single clause, it is compiled alone.  Otherwise every clause is
    compiled and the non-empty results are joined.

    """
    lcc, clauses = elements.BooleanClauseList._process_clauses_for_boolean(
        operators.and_,
        elements.True_._singleton,
        elements.False_._singleton,
        clauses,
    )
    if lcc == 1:
        return clauses[0]._compiler_dispatch(self, **kw)

    and_text = OPERATORS[operators.and_]
    compiled = (clause._compiler_dispatch(self, **kw) for clause in clauses)
    return and_text.join(fragment for fragment in compiled if fragment)
2900
def visit_tuple(self, clauselist, **kw):
    """Render the clause list wrapped in parentheses."""
    members_sql = self.visit_clauselist(clauselist, **kw)
    return f"({members_sql})"
2903
def visit_element_list(self, element, **kw):
    """Render the element's clauses separated by a single space."""
    members = element.clauses
    return self._generate_delimited_list(members, " ", **kw)
2906
def visit_order_by_list(self, element, **kw):
    """Render the element's clauses separated by a comma and a space."""
    members = element.clauses
    return self._generate_delimited_list(members, ", ", **kw)
2909
def visit_clauselist(self, clauselist, **kw):
    """Render the clauses joined by the clause list's operator string,
    or by a single space when it has no operator."""
    op = clauselist.operator
    sep = " " if op is None else OPERATORS[op]
    return self._generate_delimited_list(clauselist.clauses, sep, **kw)
2918
def visit_expression_clauselist(self, clauselist, **kw):
    """Render an expression clause list.

    A handler returned by ``_get_operator_dispatch`` for the operator
    takes precedence.  Without one, the clauses are joined by the
    operator's string with ``_in_operator_expression=True`` passed on.

    :raises UnsupportedCompilationError: if the operator has no entry in
     ``OPERATORS``.

    """
    operator_ = clauselist.operator

    handler = self._get_operator_dispatch(
        operator_, "expression_clauselist", None
    )
    if handler:
        return handler(clauselist, operator_, **kw)

    try:
        opstring = OPERATORS[operator_]
    except KeyError as err:
        raise exc.UnsupportedCompilationError(self, operator_) from err

    kw["_in_operator_expression"] = True
    return self._generate_delimited_list(clauselist.clauses, opstring, **kw)
2937
def visit_case(self, clause, **kwargs):
    """Render a ``CASE ... END`` expression.

    The optional value expression follows ``CASE``, then one
    ``WHEN ... THEN ...`` pair per entry of ``clause.whens``, then the
    optional ``ELSE`` expression.

    """
    pieces = ["CASE "]

    if clause.value is not None:
        pieces.extend((clause.value._compiler_dispatch(self, **kwargs), " "))

    for cond, result in clause.whens:
        cond_sql = cond._compiler_dispatch(self, **kwargs)
        result_sql = result._compiler_dispatch(self, **kwargs)
        pieces.extend(("WHEN ", cond_sql, " THEN ", result_sql, " "))

    if clause.else_ is not None:
        pieces.extend(
            ("ELSE ", clause.else_._compiler_dispatch(self, **kwargs), " ")
        )

    pieces.append("END")
    return "".join(pieces)
2956
def visit_type_coerce(self, type_coerce, **kw):
    """Render the ``typed_expression`` of the type coerce construct."""
    inner = type_coerce.typed_expression
    return inner._compiler_dispatch(self, **kw)
2959
def visit_cast(self, cast, **kwargs):
    """Render ``CAST(<expr> AS <type>)``.

    If the rendered type contains a `` COLLATE ...`` portion, that
    portion is placed after the closing parenthesis rather than inside
    the CAST.

    """
    type_sql = cast.typeclause._compiler_dispatch(self, **kwargs)

    collate_match = re.match("(.*)( COLLATE .*)", type_sql)
    if collate_match:
        type_part, collation = collate_match.group(1, 2)
    else:
        type_part, collation = type_sql, ""

    expr_sql = cast.clause._compiler_dispatch(self, **kwargs)
    return f"CAST({expr_sql} AS {type_part}){collation}"
2968
def visit_frame_clause(self, frameclause, **kw):
    """Render the two bounds of a window frame as ``<lower> AND <upper>``.

    Each bound is ``UNBOUNDED PRECEDING`` (lower) / ``UNBOUNDED
    FOLLOWING`` (upper), ``CURRENT ROW``, or the processed bind value
    followed by ``PRECEDING`` or ``FOLLOWING``.

    """
    frame_types = elements.FrameClauseType

    def render_bound(bound_type, get_bind, unbounded_text):
        if bound_type is frame_types.UNBOUNDED:
            return unbounded_text
        if bound_type is frame_types.CURRENT:
            return "CURRENT ROW"

        # the bind is only fetched and processed for value-based bounds
        val = self.process(get_bind(), **kw)
        if bound_type is frame_types.PRECEDING:
            return f"{val} PRECEDING"
        return f"{val} FOLLOWING"

    left = render_bound(
        frameclause.lower_type,
        lambda: frameclause.lower_bind,
        "UNBOUNDED PRECEDING",
    )
    right = render_bound(
        frameclause.upper_type,
        lambda: frameclause.upper_bind,
        "UNBOUNDED FOLLOWING",
    )

    return f"{left} AND {right}"
2994
def visit_over(self, over, **kwargs):
    """Render ``<element> OVER (...)``.

    The parenthesized part holds, in order and separated by spaces: a
    ``PARTITION BY`` clause and an ``ORDER BY`` clause (each only when
    present and non-empty), then a ``RANGE`` / ``ROWS`` / ``GROUPS``
    ``BETWEEN`` frame when one is set, with an optional validated
    ``EXCLUDE`` phrase appended to it.

    """
    func_sql = over.element._compiler_dispatch(self, **kwargs)

    # the first frame specification that is set wins
    if over.range_ is not None:
        frame_keyword, frame_spec = "RANGE", over.range_
    elif over.rows is not None:
        frame_keyword, frame_spec = "ROWS", over.rows
    elif over.groups is not None:
        frame_keyword, frame_spec = "GROUPS", over.groups
    else:
        frame_keyword = frame_spec = None

    frame_sql = None
    if frame_keyword is not None:
        frame_sql = (
            f"{frame_keyword} BETWEEN {self.process(frame_spec, **kwargs)}"
        )
        if over.exclude is not None:
            frame_sql += " EXCLUDE " + self.preparer.validate_sql_phrase(
                over.exclude, _WINDOW_EXCLUDE_RE
            )

    window_parts = []
    for word, clause in (
        ("PARTITION", over.partition_by),
        ("ORDER", over.order_by),
    ):
        if clause is not None and len(clause):
            window_parts.append(
                "%s BY %s" % (word, clause._compiler_dispatch(self, **kwargs))
            )
    if frame_sql:
        window_parts.append(frame_sql)

    return "%s OVER (%s)" % (func_sql, " ".join(window_parts))
3026
def visit_withingroup(self, withingroup, **kwargs):
    """Render ``<element> WITHIN GROUP (ORDER BY <order_by>)``."""
    element_sql = withingroup.element._compiler_dispatch(self, **kwargs)
    order_sql = withingroup.order_by._compiler_dispatch(self, **kwargs)
    return f"{element_sql} WITHIN GROUP (ORDER BY {order_sql})"
3032
def visit_funcfilter(self, funcfilter, **kwargs):
    """Render ``<func> FILTER (WHERE <criterion>)``."""
    func_sql = funcfilter.func._compiler_dispatch(self, **kwargs)
    criterion_sql = funcfilter.criterion._compiler_dispatch(self, **kwargs)
    return f"{func_sql} FILTER (WHERE {criterion_sql})"
3038
def visit_aggregateorderby(self, aggregateorderby, **kwargs):
    """Render an aggregate function carrying an ORDER BY, according to
    the dialect's ``aggregate_order_by_style``.

    * ``INLINE``: a clone of the function is compiled whose argument
      list is wrapped together with the ORDER BY.
    * any other style except ``NONE``: rendered through
      ``visit_withingroup()``.

    :raises CompileError: if the style is ``NONE``.

    """
    style = self.dialect.aggregate_order_by_style

    if style is AggregateOrderByStyle.NONE:
        raise exc.CompileError(
            "this dialect does not support "
            "ORDER BY within an aggregate function"
        )

    if style is not AggregateOrderByStyle.INLINE:
        return self.visit_withingroup(aggregateorderby, **kwargs)

    # work on a clone so the original function element is left untouched
    inlined_fn = aggregateorderby.element._clone()
    inner_args = inlined_fn.clause_expr.element
    inlined_fn.clause_expr = elements.Grouping(
        aggregate_orderby_inline(inner_args, aggregateorderby.order_by)
    )
    return inlined_fn._compiler_dispatch(self, **kwargs)
3059
def visit_aggregate_orderby_inline(self, element, **kw):
    """Render ``<element> ORDER BY <aggregate_order_by>``."""
    args_sql = self.process(element.element, **kw)
    order_sql = self.process(element.aggregate_order_by, **kw)
    return f"{args_sql} ORDER BY {order_sql}"
3065
def visit_aggregate_strings_func(self, fn, *, use_function_name, **kw):
    """Render a string aggregation call as
    ``<use_function_name>(<expr>, <delimiter>[ ORDER BY <order_by>])``.

    The delimiter (second argument) is compiled with
    ``literal_execute=True``.  The ORDER BY portion is rendered only
    when the function's clauses carry an ``aggregate_order_by`` and the
    dialect's aggregate ORDER BY style is ``INLINE``.

    """
    # aggregate_order_by attribute is present if visit_function
    # gave us a Function with aggregate_orderby_inline() as the inner
    # contents
    order_by = getattr(fn.clauses, "aggregate_order_by", None)

    # same keyword arguments, plus literal_execute for the delimiter
    delimiter_kw = {**kw, "literal_execute": True}

    # only the first two arguments (expression, delimiter) are rendered
    expr, delimiter = list(fn.clauses)[0:2]

    inline_order_by = (
        order_by is not None
        and self.dialect.aggregate_order_by_style
        is AggregateOrderByStyle.INLINE
    )

    expr_sql = expr._compiler_dispatch(self, **kw)
    delimiter_sql = delimiter._compiler_dispatch(self, **delimiter_kw)

    if inline_order_by:
        order_sql = order_by._compiler_dispatch(self, **kw)
        return (
            f"{use_function_name}({expr_sql}, "
            f"{delimiter_sql} "
            f"ORDER BY {order_sql})"
        )

    return f"{use_function_name}({expr_sql}, {delimiter_sql})"
3094
def visit_extract(self, extract, **kwargs):
    """Render ``EXTRACT(<field> FROM <expr>)``.

    The field is translated through ``self.extract_map``; a field with
    no entry there is used as given.

    """
    field = self.extract_map.get(extract.field, extract.field)
    expr_sql = extract.expr._compiler_dispatch(self, **kwargs)
    return f"EXTRACT({field} FROM {expr_sql})"
3101
def visit_scalar_function_column(self, element, **kw):
    """Render ``(<function>).<column>`` for a column of a function's
    result."""
    fn_sql = self.visit_function(element.fn, **kw)
    col_sql = self.visit_column(element, **kw)
    return f"({fn_sql}).{col_sql}"
3106
def visit_function(
    self,
    func: Function[Any],
    add_to_result_map: Optional[_ResultMapAppender] = None,
    **kwargs: Any,
) -> str:
    """Render a SQL function call.

    A compiler method named ``visit_<lowercased name>_func``, when
    present, renders the function.  Otherwise the name comes from the
    ``FUNCTIONS`` registry (keyed on the function's class) or from
    ``func.name``, is prefixed with the dot-separated
    ``func.packagenames`` and followed by the argument specification.

    :param func: the function element to render.
    :param add_to_result_map: if given, invoked with the function name
     (as both name and original name), a one-element targets tuple
     holding that name, and the function's type.
    :return: the rendered SQL text, with `` WITH ORDINALITY`` appended
     when ``func._with_ordinality`` is set.

    """
    if self._collect_params:
        self._add_to_params(func)
    if add_to_result_map is not None:
        add_to_result_map(func.name, func.name, (func.name,), func.type)

    disp = getattr(self, "visit_%s_func" % func.name.lower(), None)

    text: str

    if disp:
        text = disp(func, **kwargs)
    else:
        # "name" becomes a %-format template; "%(expr)s" marks where the
        # argument specification is substituted below
        name = FUNCTIONS.get(func._deannotate().__class__, None)
        if name:
            if func._has_args:
                name += "%(expr)s"
        else:
            name = func.name
            name = (
                self.preparer.quote(name)
                if self.preparer._requires_quotes_illegal_chars(name)
                or isinstance(name, elements.quoted_name)
                else name
            )
            name = name + "%(expr)s"
        text = ".".join(
            [
                (
                    self.preparer.quote(tok)
                    if self.preparer._requires_quotes_illegal_chars(tok)
                    # test the package token itself: "name" is always a
                    # plain str template by this point, so checking it
                    # here could never be true
                    or isinstance(tok, elements.quoted_name)
                    else tok
                )
                for tok in func.packagenames
            ]
            + [name]
        ) % {"expr": self.function_argspec(func, **kwargs)}

    if func._with_ordinality:
        text += " WITH ORDINALITY"
    return text
3154
def visit_next_value_func(self, next_value, **kw):
    """Render a ``next_value`` function by compiling its sequence.

    Delegates to :meth:`visit_sequence`; the keyword arguments received
    here are not forwarded.
    """
    target_sequence = next_value.sequence
    return self.visit_sequence(target_sequence)
3157
def visit_sequence(self, sequence, **kw):
    """Base implementation for sequences; always raises.

    :raises NotImplementedError: unconditionally, naming the dialect.
    """
    dialect_name = self.dialect.name
    message = (
        "Dialect '%s' does not support sequence increments." % dialect_name
    )
    raise NotImplementedError(message)
3163
def function_argspec(self, func: Function[Any], **kwargs: Any) -> str:
    """Return the compiled form of ``func.clause_expr``.

    The keyword arguments are forwarded unchanged to the clause's
    compiler dispatch.
    """
    argument_clause = func.clause_expr
    return argument_clause._compiler_dispatch(self, **kwargs)
3166
def visit_compound_select(
    self, cs, asfrom=False, compound_index=None, **kwargs
):
    """Compile a compound SELECT: member selects joined by a keyword.

    Each element of ``cs.selects`` is compiled with its position passed
    as ``compound_index`` and the results are joined with the keyword
    looked up in ``self.compound_keywords``.  GROUP BY, ORDER BY and an
    optional row-limiting clause are appended, and any collected CTEs are
    rendered in front of the text.

    A stack entry is pushed for the duration of the compilation and
    popped before returning.

    :param cs: the compound select construct.
    :param asfrom: forwarded to each member select and to
     ``group_by_clause``.
    :param compound_index: position of this construct inside an
     enclosing compound select, if any.
    :return: the compiled SQL text.
    """
    if self._collect_params:
        self._add_to_params(cs)
    # an empty compiler stack means nothing encloses this statement
    toplevel = not self.stack

    compile_state = cs._compile_state_factory(cs, self, **kwargs)

    # only record the compile state if none has been set already
    if toplevel and not self.compile_state:
        self.compile_state = compile_state

    compound_stmt = compile_state.statement

    entry = self._default_stack_entry if toplevel else self.stack[-1]
    # ``not compound_index`` is true for both None and 0, i.e. when this
    # construct is not nested in a compound or is its first member
    need_result_map = toplevel or (
        not compound_index
        and entry.get("need_result_map_for_compound", False)
    )

    # indicates there is already a CompoundSelect in play
    if compound_index == 0:
        entry["select_0"] = cs

    # correlation / FROM context is inherited from the enclosing entry
    self.stack.append(
        {
            "correlate_froms": entry["correlate_froms"],
            "asfrom_froms": entry["asfrom_froms"],
            "selectable": cs,
            "compile_state": compile_state,
            "need_result_map_for_compound": need_result_map,
        }
    )

    if compound_stmt._independent_ctes:
        self._dispatch_independent_ctes(compound_stmt, kwargs)

    keyword = self.compound_keywords[cs.keyword]

    text = (" " + keyword + " ").join(
        (
            c._compiler_dispatch(
                self, asfrom=asfrom, compound_index=i, **kwargs
            )
            for i, c in enumerate(cs.selects)
        )
    )

    # set only after the member selects are compiled, so it applies to
    # the trailing GROUP BY / ORDER BY / limit clauses below
    kwargs["include_table"] = False
    text += self.group_by_clause(cs, **dict(asfrom=asfrom, **kwargs))
    text += self.order_by_clause(cs, **kwargs)
    if cs._has_row_limiting_clause:
        text += self._row_limit_clause(cs, **kwargs)

    if self.ctes:
        # the stack depth is passed as the nesting level unless this is
        # the top-level statement
        nesting_level = len(self.stack) if not toplevel else None
        text = (
            self._render_cte_clause(
                nesting_level=nesting_level,
                include_following_stack=True,
            )
            + text
        )

    self.stack.pop(-1)
    return text
3233
3234 def _row_limit_clause(self, cs, **kwargs):
3235 if cs._fetch_clause is not None:
3236 return self.fetch_clause(cs, **kwargs)
3237 else:
3238 return self.limit_clause(cs, **kwargs)
3239
3240 def _get_operator_dispatch(self, operator_, qualifier1, qualifier2):
3241 attrname = "visit_%s_%s%s" % (
3242 operator_.__name__,
3243 qualifier1,
3244 "_" + qualifier2 if qualifier2 else "",
3245 )
3246 return getattr(self, attrname, None)
3247
3248 def _get_custom_operator_dispatch(self, operator_, qualifier1):
3249 attrname = "visit_%s_op_%s" % (operator_.visit_name, qualifier1)
3250 return getattr(self, attrname, None)
3251
def visit_unary(
    self, unary, add_to_result_map=None, result_map_targets=(), **kw
):
    """Compile a unary expression that has an operator or a modifier.

    A dedicated visitor found via :meth:`_get_operator_dispatch` is
    preferred; otherwise the string from ``OPERATORS`` is rendered with
    the generic unary operator / modifier helper.

    :raises CompileError: if the expression has both an operator and a
     modifier, or neither.
    """
    if add_to_result_map is not None:
        # record this element as a result-map target for inner visitors
        result_map_targets += (unary,)
        kw["add_to_result_map"] = add_to_result_map
        kw["result_map_targets"] = result_map_targets

    if unary.operator:
        if unary.modifier:
            raise exc.CompileError(
                "Unary expression does not support operator "
                "and modifier simultaneously"
            )
        role = "operator"
        op_fn = unary.operator
        generic_render = self._generate_generic_unary_operator
    elif unary.modifier:
        role = "modifier"
        op_fn = unary.modifier
        generic_render = self._generate_generic_unary_modifier
    else:
        raise exc.CompileError(
            "Unary expression has no operator or modifier"
        )

    disp = self._get_operator_dispatch(op_fn, "unary", role)
    if disp:
        return disp(unary, op_fn, **kw)
    return generic_render(unary, OPERATORS[op_fn], **kw)
3289
def visit_truediv_binary(self, binary, operator, **kw):
    """Render a true-division expression as ``left / right``.

    When ``self.dialect.div_is_floordiv`` is set, the right operand is
    wrapped in a CAST: to its own type if that type has Numeric or Float
    affinity, otherwise to ``Numeric``.
    """
    if not self.dialect.div_is_floordiv:
        numerator_sql = self.process(binary.left, **kw)
        return numerator_sql + " / " + self.process(binary.right, **kw)

    numerator_sql = self.process(binary.left, **kw)

    # TODO: would need a fast cast again here,
    # unless we want to use an implicit cast like "+ 0.0"
    divisor = binary.right
    if divisor.type._type_affinity in (sqltypes.Numeric, sqltypes.Float):
        cast_type = divisor.type
    else:
        cast_type = sqltypes.Numeric()

    divisor_sql = self.process(elements.Cast(divisor, cast_type), **kw)
    return numerator_sql + " / " + divisor_sql
3316
def visit_floordiv_binary(self, binary, operator, **kw):
    """Render a floor-division expression.

    A plain ``left / right`` is used when ``self.dialect.div_is_floordiv``
    is set and the right operand has Integer type affinity; in every
    other case the division is wrapped in ``FLOOR(...)``.
    """
    plain_division_floors = (
        self.dialect.div_is_floordiv
        and binary.right.type._type_affinity is sqltypes.Integer
    )

    quotient_sql = (
        self.process(binary.left, **kw)
        + " / "
        + self.process(binary.right, **kw)
    )

    if plain_division_floors:
        return quotient_sql
    return "FLOOR(%s)" % quotient_sql
3333
def visit_is_true_unary_operator(self, element, operator, **kw):
    """Render an "is true" test of ``element.element``.

    The operand is rendered bare when the element is implicitly boolean
    or the dialect supports native booleans; otherwise ``<operand> = 1``
    is rendered.
    """
    render_bare = (
        element._is_implicitly_boolean
        or self.dialect.supports_native_boolean
    )
    operand_sql = self.process(element.element, **kw)
    return operand_sql if render_bare else "%s = 1" % operand_sql
3342
def visit_is_false_unary_operator(self, element, operator, **kw):
    """Render an "is false" test of ``element.element``.

    ``NOT <operand>`` is rendered when the element is implicitly boolean
    or the dialect supports native booleans; otherwise ``<operand> = 0``
    is rendered.
    """
    use_not = (
        element._is_implicitly_boolean
        or self.dialect.supports_native_boolean
    )
    template = "NOT %s" if use_not else "%s = 0"
    return template % self.process(element.element, **kw)
3351
def visit_not_match_op_binary(self, binary, operator, **kw):
    """Render a negated MATCH as ``NOT <match expression>``.

    The expression is compiled through :meth:`visit_binary` with the
    operator overridden to ``operators.match_op``.  The compile-time
    keyword arguments are forwarded, so options such as ``literal_binds``
    reach the operands the same way they do for a non-negated match;
    previously they were dropped here.
    """
    return "NOT %s" % self.visit_binary(
        binary, override_operator=operators.match_op, **kw
    )
3356
def visit_not_in_op_binary(self, binary, operator, **kw):
    """Render a NOT IN expression wrapped in parenthesis."""
    # The brackets are required in the NOT IN operation because the empty
    # case is handled using the form "(col NOT IN (null) OR 1 = 1)".
    # The presence of the OR makes the brackets required.
    inner_sql = self._generate_generic_binary(
        binary, OPERATORS[operator], **kw
    )
    return "(%s)" % inner_sql
3364
def visit_empty_set_op_expr(self, type_, expand_op, **kw):
    """Render the replacement text for an empty IN / NOT IN list.

    For ``not_in_op`` and ``in_op`` a NULL (or a parenthesized tuple of
    NULLs, one per entry of ``type_``) is emitted followed by an
    always-true or always-false condition.  The text deliberately closes
    and re-opens a parenthesis; it is meant to be placed inside an
    existing pair.  Any other operator is handed to
    :meth:`visit_empty_set_expr`.
    """
    if expand_op is operators.not_in_op:
        multi_template, single_text = "(%s)) OR (1 = 1", "NULL) OR (1 = 1"
    elif expand_op is operators.in_op:
        multi_template, single_text = (
            "(%s)) AND (1 != 1",
            "NULL) AND (1 != 1",
        )
    else:
        return self.visit_empty_set_expr(type_)

    if len(type_) > 1:
        return multi_template % ", ".join("NULL" for _ in type_)
    return single_text
3382
def visit_empty_set_expr(self, element_types, **kw):
    """Base implementation for empty-set expressions; always raises.

    :raises NotImplementedError: unconditionally, naming the dialect.
    """
    dialect_name = self.dialect.name
    message = (
        "Dialect '%s' does not support empty set expression."
        % dialect_name
    )
    raise NotImplementedError(message)
3388
3389 def _literal_execute_expanding_parameter_literal_binds(
3390 self, parameter, values, bind_expression_template=None
3391 ):
3392 typ_dialect_impl = parameter.type._unwrapped_dialect_impl(self.dialect)
3393
3394 if not values:
3395 # empty IN expression. note we don't need to use
3396 # bind_expression_template here because there are no
3397 # expressions to render.
3398
3399 if typ_dialect_impl._is_tuple_type:
3400 replacement_expression = (
3401 "VALUES " if self.dialect.tuple_in_values else ""
3402 ) + self.visit_empty_set_op_expr(
3403 parameter.type.types, parameter.expand_op
3404 )
3405
3406 else:
3407 replacement_expression = self.visit_empty_set_op_expr(
3408 [parameter.type], parameter.expand_op
3409 )
3410
3411 elif typ_dialect_impl._is_tuple_type or (
3412 typ_dialect_impl._isnull
3413 and isinstance(values[0], collections_abc.Sequence)
3414 and not isinstance(values[0], (str, bytes))
3415 ):
3416 if typ_dialect_impl._has_bind_expression:
3417 raise NotImplementedError(
3418 "bind_expression() on TupleType not supported with "
3419 "literal_binds"
3420 )
3421
3422 replacement_expression = (
3423 "VALUES " if self.dialect.tuple_in_values else ""
3424 ) + ", ".join(
3425 "(%s)"
3426 % (
3427 ", ".join(
3428 self.render_literal_value(value, param_type)
3429 for value, param_type in zip(
3430 tuple_element, parameter.type.types
3431 )
3432 )
3433 )
3434 for i, tuple_element in enumerate(values)
3435 )
3436 else:
3437 if bind_expression_template:
3438 post_compile_pattern = self._post_compile_pattern
3439 m = post_compile_pattern.search(bind_expression_template)
3440 assert m and m.group(
3441 2
3442 ), "unexpected format for expanding parameter"
3443
3444 tok = m.group(2).split("~~")
3445 be_left, be_right = tok[1], tok[3]
3446 replacement_expression = ", ".join(
3447 "%s%s%s"
3448 % (
3449 be_left,
3450 self.render_literal_value(value, parameter.type),
3451 be_right,
3452 )
3453 for value in values
3454 )
3455 else:
3456 replacement_expression = ", ".join(
3457 self.render_literal_value(value, parameter.type)
3458 for value in values
3459 )
3460
3461 return (), replacement_expression
3462
def _literal_execute_expanding_parameter(self, name, parameter, values):
    """Expand one "expanding" parameter into per-value bound parameters.

    When ``parameter.literal_execute`` is set, the work is delegated to
    :meth:`_literal_execute_expanding_parameter_literal_binds`, which
    renders the values inline instead.

    :param name: base name used to derive the generated parameter names.
    :param parameter: the expanding bind parameter.
    :param values: the sequence of values to expand.
    :return: a tuple ``(to_update, replacement_expression)`` where
     ``to_update`` is a list of ``(generated_name, value)`` pairs and
     ``replacement_expression`` is the SQL text that stands in for the
     parameter.
    """
    if parameter.literal_execute:
        return self._literal_execute_expanding_parameter_literal_binds(
            parameter, values
        )

    dialect = self.dialect
    typ_dialect_impl = parameter.type._unwrapped_dialect_impl(dialect)

    # pick the bind template matching the compiler's numeric-bind mode
    if self._numeric_binds:
        bind_template = self.compilation_bindtemplate
    else:
        bind_template = self.bindtemplate

    # _render_bindtemplate() turns a parameter name into its placeholder
    # text, adding a bind cast when both the dialect and the type ask
    # for one
    if (
        self.dialect._bind_typing_render_casts
        and typ_dialect_impl.render_bind_cast
    ):

        def _render_bindtemplate(name):
            return self.render_bind_cast(
                parameter.type,
                typ_dialect_impl,
                bind_template % {"name": name},
            )

    else:

        def _render_bindtemplate(name):
            return bind_template % {"name": name}

    if not values:
        # empty list: no parameters are generated, only the dialect's
        # empty-set expression
        to_update = []
        if typ_dialect_impl._is_tuple_type:
            replacement_expression = self.visit_empty_set_op_expr(
                parameter.type.types, parameter.expand_op
            )
        else:
            replacement_expression = self.visit_empty_set_op_expr(
                [parameter.type], parameter.expand_op
            )

    elif typ_dialect_impl._is_tuple_type or (
        typ_dialect_impl._isnull
        and isinstance(values[0], collections_abc.Sequence)
        and not isinstance(values[0], (str, bytes))
    ):
        # tuple values: one parameter per tuple member, named
        # "<name>_<row>_<col>" with both indexes starting at 1
        assert not typ_dialect_impl._is_array
        to_update = [
            ("%s_%s_%s" % (name, i, j), value)
            for i, tuple_element in enumerate(values, 1)
            for j, value in enumerate(tuple_element, 1)
        ]

        # to_update is flat, in row order; the index below locates the
        # entry for row i / column j (zero-based here).
        # NOTE(review): this arithmetic appears to assume that every
        # tuple has the same length -- confirm against callers
        replacement_expression = (
            "VALUES " if dialect.tuple_in_values else ""
        ) + ", ".join(
            "(%s)"
            % (
                ", ".join(
                    _render_bindtemplate(
                        to_update[i * len(tuple_element) + j][0]
                    )
                    for j, value in enumerate(tuple_element)
                )
            )
            for i, tuple_element in enumerate(values)
        )
    else:
        # scalar values: one parameter per value, named "<name>_<n>"
        # with n starting at 1
        to_update = [
            ("%s_%s" % (name, i), value)
            for i, value in enumerate(values, 1)
        ]
        replacement_expression = ", ".join(
            _render_bindtemplate(key) for key, value in to_update
        )

    return to_update, replacement_expression
3541
def visit_binary(
    self,
    binary,
    override_operator=None,
    eager_grouping=False,
    from_linter=None,
    lateral_from_linter=None,
    **kw,
):
    """Compile a binary expression.

    When a FROM linter is present and the operator is a comparison, the
    FROM objects of both sides are recorded as linter edges.  The
    expression is then rendered by an operator-specific visitor if one
    exists, or generically using the string found in ``OPERATORS``.

    :param override_operator: operator to dispatch on instead of
     ``binary.operator``.
    :raises UnsupportedCompilationError: if there is neither a visitor
     nor an ``OPERATORS`` entry for the operator.
    """
    if from_linter and operators.is_comparison(binary.operator):
        if lateral_from_linter is not None:
            # inside a LATERAL: the enclosing lateral is linked to both
            # sides and the edges go to the lateral's own linter
            enclosing_lateral = kw["enclosing_lateral"]
            edge_target = lateral_from_linter
            left_froms = binary.left._from_objects + [enclosing_lateral]
            right_froms = binary.right._from_objects + [enclosing_lateral]
        else:
            edge_target = from_linter
            left_froms = binary.left._from_objects
            right_froms = binary.right._from_objects

        edge_target.edges.update(
            itertools.product(_de_clone(left_froms), _de_clone(right_froms))
        )

    # don't allow "? = ?" to render
    both_sides_are_binds = (
        self.ansi_bind_rules
        and isinstance(binary.left, elements.BindParameter)
        and isinstance(binary.right, elements.BindParameter)
    )
    if both_sides_are_binds:
        kw["literal_execute"] = True

    operator_ = override_operator or binary.operator
    disp = self._get_operator_dispatch(operator_, "binary", None)
    if disp:
        return disp(binary, operator_, **kw)

    try:
        opstring = OPERATORS[operator_]
    except KeyError as err:
        raise exc.UnsupportedCompilationError(self, operator_) from err

    return self._generate_generic_binary(
        binary,
        opstring,
        from_linter=from_linter,
        lateral_from_linter=lateral_from_linter,
        **kw,
    )
3597
def visit_function_as_comparison_op_binary(self, element, operator, **kw):
    """Render a function-as-comparison by compiling its SQL function."""
    wrapped_function = element.sql_function
    return self.process(wrapped_function, **kw)
3600
def visit_mod_binary(self, binary, operator, **kw):
    """Render a modulus expression.

    The percent sign is doubled when ``self.preparer._double_percents``
    is set.
    """
    mod_symbol = " %% " if self.preparer._double_percents else " % "
    left_sql = self.process(binary.left, **kw)
    right_sql = self.process(binary.right, **kw)
    return left_sql + mod_symbol + right_sql
3614
def visit_custom_op_binary(self, element, operator, **kw):
    """Render a binary expression that uses a custom operator.

    An operator with a ``visit_name`` is first offered to a matching
    ``visit_<visit_name>_op_binary`` method.  Otherwise the escaped
    operator string, surrounded by spaces, is rendered generically with
    the operator's ``eager_grouping`` setting.
    """
    handler = (
        self._get_custom_operator_dispatch(operator, "binary")
        if operator.visit_name
        else None
    )
    if handler:
        return handler(element, operator, **kw)

    kw["eager_grouping"] = operator.eager_grouping
    spaced_opstring = (
        " " + self.escape_literal_column(operator.opstring) + " "
    )
    return self._generate_generic_binary(element, spaced_opstring, **kw)
3627
def visit_custom_op_unary_operator(self, element, operator, **kw):
    """Render a custom operator placed in front of its operand.

    An operator with a ``visit_name`` is first offered to a matching
    ``visit_<visit_name>_op_unary`` method; otherwise the escaped
    operator string followed by a space is rendered generically.
    """
    handler = (
        self._get_custom_operator_dispatch(operator, "unary")
        if operator.visit_name
        else None
    )
    if handler:
        return handler(element, operator, **kw)

    prefix = self.escape_literal_column(operator.opstring) + " "
    return self._generate_generic_unary_operator(element, prefix, **kw)
3637
def visit_custom_op_unary_modifier(self, element, operator, **kw):
    """Render a custom operator placed after its operand.

    An operator with a ``visit_name`` is first offered to a matching
    ``visit_<visit_name>_op_unary`` method; otherwise a space followed by
    the escaped operator string is rendered generically.
    """
    handler = (
        self._get_custom_operator_dispatch(operator, "unary")
        if operator.visit_name
        else None
    )
    if handler:
        return handler(element, operator, **kw)

    suffix = " " + self.escape_literal_column(operator.opstring)
    return self._generate_generic_unary_modifier(element, suffix, **kw)
3647
3648 def _generate_generic_binary(
3649 self,
3650 binary: BinaryExpression[Any],
3651 opstring: str,
3652 eager_grouping: bool = False,
3653 **kw: Any,
3654 ) -> str:
3655 _in_operator_expression = kw.get("_in_operator_expression", False)
3656
3657 kw["_in_operator_expression"] = True
3658 kw["_binary_op"] = binary.operator
3659 text = (
3660 binary.left._compiler_dispatch(
3661 self, eager_grouping=eager_grouping, **kw
3662 )
3663 + opstring
3664 + binary.right._compiler_dispatch(
3665 self, eager_grouping=eager_grouping, **kw
3666 )
3667 )
3668
3669 if _in_operator_expression and eager_grouping:
3670 text = "(%s)" % text
3671 return text
3672
3673 def _generate_generic_unary_operator(self, unary, opstring, **kw):
3674 return opstring + unary.element._compiler_dispatch(self, **kw)
3675
3676 def _generate_generic_unary_modifier(self, unary, opstring, **kw):
3677 return unary.element._compiler_dispatch(self, **kw) + opstring
3678
@util.memoized_property
def _like_percent_literal(self):
    """Memoized literal column ``'%'`` of string type.

    Used by the contains / startswith / endswith visitors to build LIKE
    patterns.
    """
    percent_literal = elements.literal_column(
        "'%'", type_=sqltypes.STRINGTYPE
    )
    return percent_literal
3682
def visit_ilike_case_insensitive_operand(self, element, **kw):
    """Render the wrapped operand inside ``lower(...)``."""
    operand_sql = element.element._compiler_dispatch(self, **kw)
    return f"lower({operand_sql})"
3685
def visit_contains_op_binary(self, binary, operator, **kw):
    """Render "contains" as LIKE with ``'%'`` on both sides of the value.

    Works on a clone of ``binary``; the given expression is not modified.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard.concat(like_expr.right).concat(wildcard)
    return self.visit_like_op_binary(like_expr, operator, **kw)
3691
def visit_not_contains_op_binary(self, binary, operator, **kw):
    """Render "not contains" as NOT LIKE with ``'%'`` on both sides.

    Works on a clone of ``binary``; the given expression is not modified.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard.concat(like_expr.right).concat(wildcard)
    return self.visit_not_like_op_binary(like_expr, operator, **kw)
3697
def visit_icontains_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "contains".

    Both operands of a clone are wrapped with ``ilike_case_insensitive``
    and the right side is surrounded by ``'%'`` before being handed to
    :meth:`visit_ilike_op_binary`.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_value = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_value).concat(wildcard)
    return self.visit_ilike_op_binary(like_expr, operator, **kw)
3706
def visit_not_icontains_op_binary(self, binary, operator, **kw):
    """Render negated case-insensitive "contains".

    Both operands of a clone are wrapped with ``ilike_case_insensitive``
    and the right side is surrounded by ``'%'`` before being handed to
    :meth:`visit_not_ilike_op_binary`.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_value = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_value).concat(wildcard)
    return self.visit_not_ilike_op_binary(like_expr, operator, **kw)
3715
def visit_startswith_op_binary(self, binary, operator, **kw):
    """Render "startswith" as LIKE with ``'%'`` after the value.

    Works on a clone of ``binary``; the given expression is not modified.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard._rconcat(like_expr.right)
    return self.visit_like_op_binary(like_expr, operator, **kw)
3721
def visit_not_startswith_op_binary(self, binary, operator, **kw):
    """Render "not startswith" as NOT LIKE with ``'%'`` after the value.

    Works on a clone of ``binary``; the given expression is not modified.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard._rconcat(like_expr.right)
    return self.visit_not_like_op_binary(like_expr, operator, **kw)
3727
def visit_istartswith_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "startswith".

    Both operands of a clone are wrapped with ``ilike_case_insensitive``
    and ``'%'`` is placed after the right side before being handed to
    :meth:`visit_ilike_op_binary`.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_value = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard._rconcat(folded_value)
    return self.visit_ilike_op_binary(like_expr, operator, **kw)
3734
def visit_not_istartswith_op_binary(self, binary, operator, **kw):
    """Render negated case-insensitive "startswith".

    Both operands of a clone are wrapped with ``ilike_case_insensitive``
    and ``'%'`` is placed after the right side before being handed to
    :meth:`visit_not_ilike_op_binary`.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_value = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard._rconcat(folded_value)
    return self.visit_not_ilike_op_binary(like_expr, operator, **kw)
3741
def visit_endswith_op_binary(self, binary, operator, **kw):
    """Render "endswith" as LIKE with ``'%'`` in front of the value.

    Works on a clone of ``binary``; the given expression is not modified.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard.concat(like_expr.right)
    return self.visit_like_op_binary(like_expr, operator, **kw)
3747
def visit_not_endswith_op_binary(self, binary, operator, **kw):
    """Render "not endswith" as NOT LIKE with ``'%'`` before the value.

    Works on a clone of ``binary``; the given expression is not modified.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard.concat(like_expr.right)
    return self.visit_not_like_op_binary(like_expr, operator, **kw)
3753
def visit_iendswith_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "endswith".

    Both operands of a clone are wrapped with ``ilike_case_insensitive``
    and ``'%'`` is placed in front of the right side before being handed
    to :meth:`visit_ilike_op_binary`.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_value = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_value)
    return self.visit_ilike_op_binary(like_expr, operator, **kw)
3760
def visit_not_iendswith_op_binary(self, binary, operator, **kw):
    """Render negated case-insensitive "endswith".

    Both operands of a clone are wrapped with ``ilike_case_insensitive``
    and ``'%'`` is placed in front of the right side before being handed
    to :meth:`visit_not_ilike_op_binary`.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_value = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_value)
    return self.visit_not_ilike_op_binary(like_expr, operator, **kw)
3767
def visit_like_op_binary(self, binary, operator, **kw):
    """Render ``<left> LIKE <right>``.

    When the expression carries an ``escape`` modifier, an ``ESCAPE``
    clause with the escape value rendered as a string literal is
    appended.
    """
    escape = binary.modifiers.get("escape", None)

    text = "%s LIKE %s" % (
        binary.left._compiler_dispatch(self, **kw),
        binary.right._compiler_dispatch(self, **kw),
    )
    if escape is not None:
        text += " ESCAPE " + self.render_literal_value(
            escape, sqltypes.STRINGTYPE
        )
    return text
3779
def visit_not_like_op_binary(self, binary, operator, **kw):
    """Render ``<left> NOT LIKE <right>``.

    When the expression carries an ``escape`` modifier, an ``ESCAPE``
    clause with the escape value rendered as a string literal is
    appended.
    """
    escape = binary.modifiers.get("escape", None)

    text = "%s NOT LIKE %s" % (
        binary.left._compiler_dispatch(self, **kw),
        binary.right._compiler_dispatch(self, **kw),
    )
    if escape is not None:
        text += " ESCAPE " + self.render_literal_value(
            escape, sqltypes.STRINGTYPE
        )
    return text
3790
def visit_ilike_op_binary(self, binary, operator, **kw):
    """Render ILIKE as a LIKE over case-folded operands.

    When called for ``operators.ilike_op`` directly, both operands of a
    clone are wrapped with ``ilike_case_insensitive``.  For any other
    operator the operands are used as given; the calling visitor is
    assumed to have applied the wrapping already.
    """
    if operator is operators.ilike_op:
        folded = binary._clone()
        folded.left = ilike_case_insensitive(folded.left)
        folded.right = ilike_case_insensitive(folded.right)
        binary = folded

    return self.visit_like_op_binary(binary, operator, **kw)
3799
def visit_not_ilike_op_binary(self, binary, operator, **kw):
    """Render NOT ILIKE as a NOT LIKE over case-folded operands.

    When called for ``operators.not_ilike_op`` directly, both operands of
    a clone are wrapped with ``ilike_case_insensitive``.  For any other
    operator the operands are used as given; the calling visitor is
    assumed to have applied the wrapping already.
    """
    if operator is operators.not_ilike_op:
        folded = binary._clone()
        folded.left = ilike_case_insensitive(folded.left)
        folded.right = ilike_case_insensitive(folded.right)
        binary = folded

    return self.visit_not_like_op_binary(binary, operator, **kw)
3808
def visit_between_op_binary(self, binary, operator, **kw):
    """Render BETWEEN, or BETWEEN SYMMETRIC when the modifier is set."""
    if binary.modifiers.get("symmetric", False):
        opstring = " BETWEEN SYMMETRIC "
    else:
        opstring = " BETWEEN "
    return self._generate_generic_binary(binary, opstring, **kw)
3814
def visit_not_between_op_binary(self, binary, operator, **kw):
    """Render NOT BETWEEN, with SYMMETRIC when the modifier is set."""
    if binary.modifiers.get("symmetric", False):
        opstring = " NOT BETWEEN SYMMETRIC "
    else:
        opstring = " NOT BETWEEN "
    return self._generate_generic_binary(binary, opstring, **kw)
3822
def visit_regexp_match_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Base implementation for regular expression match; always raises.

    :raises CompileError: unconditionally, naming the dialect.
    """
    dialect_name = self.dialect.name
    raise exc.CompileError(
        "%s dialect does not support regular expressions" % dialect_name
    )
3830
def visit_not_regexp_match_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Base implementation for negated regular expression match.

    :raises CompileError: unconditionally, naming the dialect.
    """
    dialect_name = self.dialect.name
    raise exc.CompileError(
        "%s dialect does not support regular expressions" % dialect_name
    )
3838
def visit_regexp_replace_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Base implementation for regular expression replace; always raises.

    :raises CompileError: unconditionally, naming the dialect.
    """
    dialect_name = self.dialect.name
    raise exc.CompileError(
        "%s dialect does not support regular expression replacements"
        % dialect_name
    )
3846
def visit_dmltargetcopy(self, element, *, bindmarkers=None, **kw):
    """Render a placeholder marker for a DML target column copy.

    The element is recorded in ``bindmarkers`` under its column key and a
    ``__BINDMARKER_~~<key>~~`` token is returned.

    :raises CompileError: if no ``bindmarkers`` collection was passed.
    """
    if bindmarkers is None:
        raise exc.CompileError(
            "DML target objects may only be used with "
            "compiled INSERT or UPDATE statements"
        )

    column_key = element.column.key
    bindmarkers[column_key] = element
    return f"__BINDMARKER_~~{column_key}~~"
3856
def visit_bindparam(
    self,
    bindparam,
    within_columns_clause=False,
    literal_binds=False,
    skip_bind_expression=False,
    literal_execute=False,
    render_postcompile=False,
    is_upsert_set=False,
    **kwargs,
):
    """Compile a bound parameter into placeholder or literal SQL text.

    Three outcomes are possible:

    * the parameter's type has a bind expression (and
      ``skip_bind_expression`` is not set): the bind expression is
      compiled and returned in place of the bare parameter;
    * ``literal_binds`` is set: the value is rendered inline via
      :meth:`render_literal_bindparam`;
    * otherwise the parameter is registered in ``self.binds`` and the
      placeholder produced by :meth:`bindparam_string` is returned.

    Expanding parameters are wrapped in parenthesis.

    :raises CompileError: when the parameter's name collides with a
     different, already registered parameter in an unsupported way.
    """
    # Detect parametrized bindparams in upsert SET clause for issue #13130
    if (
        is_upsert_set
        and bindparam.value is None
        and bindparam.callable is None
        and self._insertmanyvalues is not None
    ):
        self._insertmanyvalues = self._insertmanyvalues._replace(
            has_upsert_bound_parameters=True
        )

    if not skip_bind_expression:
        impl = bindparam.type.dialect_impl(self.dialect)
        if impl._has_bind_expression:
            bind_expression = impl.bind_expression(bindparam)
            # compile the wrapping expression; skip_bind_expression=True
            # keeps the nested visit of this same parameter from
            # wrapping again.  literal rendering of an expanding
            # parameter is deferred to the literal_binds step below.
            wrapped = self.process(
                bind_expression,
                skip_bind_expression=True,
                within_columns_clause=within_columns_clause,
                literal_binds=literal_binds and not bindparam.expanding,
                literal_execute=literal_execute,
                render_postcompile=render_postcompile,
                **kwargs,
            )
            if bindparam.expanding:
                # for postcompile w/ expanding, move the "wrapped" part
                # of this into the inside

                m = re.match(
                    r"^(.*)\(__\[POSTCOMPILE_(\S+?)\]\)(.*)$", wrapped
                )
                assert m, "unexpected format for expanding parameter"
                # group 1 / group 3 are the text before / after the
                # token; they are carried inside the token, delimited
                # by "~~", around a REPL marker
                wrapped = "(__[POSTCOMPILE_%s~~%s~~REPL~~%s~~])" % (
                    m.group(2),
                    m.group(1),
                    m.group(3),
                )

                if literal_binds:
                    ret = self.render_literal_bindparam(
                        bindparam,
                        within_columns_clause=True,
                        bind_expression_template=wrapped,
                        **kwargs,
                    )
                    return f"({ret})"

            return wrapped

    if not literal_binds:
        # literal execution is requested explicitly, by the parameter
        # itself, or by ansi_bind_rules inside the columns clause
        literal_execute = (
            literal_execute
            or bindparam.literal_execute
            or (within_columns_clause and self.ansi_bind_rules)
        )
        post_compile = literal_execute or bindparam.expanding
    else:
        post_compile = False

    if literal_binds:
        ret = self.render_literal_bindparam(
            bindparam, within_columns_clause=True, **kwargs
        )
        if bindparam.expanding:
            ret = f"({ret})"
        return ret

    name = self._truncate_bindparam(bindparam)

    # a different parameter object already registered under this name
    # is only acceptable in certain cases; check for conflicts
    if name in self.binds:
        existing = self.binds[name]
        if existing is not bindparam:
            if (
                (existing.unique or bindparam.unique)
                and not existing.proxy_set.intersection(
                    bindparam.proxy_set
                )
                and not existing._cloned_set.intersection(
                    bindparam._cloned_set
                )
            ):
                raise exc.CompileError(
                    "Bind parameter '%s' conflicts with "
                    "unique bind parameter of the same name" % name
                )
            elif existing.expanding != bindparam.expanding:
                raise exc.CompileError(
                    "Can't reuse bound parameter name '%s' in both "
                    "'expanding' (e.g. within an IN expression) and "
                    "non-expanding contexts.  If this parameter is to "
                    "receive a list/array value, set 'expanding=True' on "
                    "it for expressions that aren't IN, otherwise use "
                    "a different parameter name." % (name,)
                )
            elif existing._is_crud or bindparam._is_crud:
                if existing._is_crud and bindparam._is_crud:
                    # TODO: this condition is not well understood.
                    # see tests in test/sql/test_update.py
                    raise exc.CompileError(
                        "Encountered unsupported case when compiling an "
                        "INSERT or UPDATE statement.  If this is a "
                        "multi-table "
                        "UPDATE statement, please provide string-named "
                        "arguments to the "
                        "values() method with distinct names; support for "
                        "multi-table UPDATE statements that "
                        "target multiple tables for UPDATE is very "
                        "limited",
                    )
                else:
                    raise exc.CompileError(
                        f"bindparam() name '{bindparam.key}' is reserved "
                        "for automatic usage in the VALUES or SET "
                        "clause of this "
                        "insert/update statement.   Please use a "
                        "name other than column name when using "
                        "bindparam() "
                        "with insert() or update() (for example, "
                        f"'b_{bindparam.key}')."
                    )

    # register under both the original key and the (possibly truncated)
    # name
    self.binds[bindparam.key] = self.binds[name] = bindparam

    # if we are given a cache key that we're going to match against,
    # relate the bindparam here to one that is most likely present
    # in the "extracted params" portion of the cache key.  this is used
    # to set up a positional mapping that is used to determine the
    # correct parameters for a subsequent use of this compiled with
    # a different set of parameter values.   here, we accommodate for
    # parameters that may have been cloned both before and after the cache
    # key has been generated.
    ckbm_tuple = self._cache_key_bind_match

    if ckbm_tuple:
        ckbm, cksm = ckbm_tuple
        for bp in bindparam._cloned_set:
            if bp.key in cksm:
                cb = cksm[bp.key]
                ckbm[cb].append(bindparam)

    if bindparam.isoutparam:
        self.has_out_parameters = True

    if post_compile:
        if render_postcompile:
            self._render_postcompile = True

        # track the parameter in the set matching how it will be
        # processed after compilation
        if literal_execute:
            self.literal_execute_params |= {bindparam}
        else:
            self.post_compile_params |= {bindparam}

    ret = self.bindparam_string(
        name,
        post_compile=post_compile,
        expanding=bindparam.expanding,
        bindparam_type=bindparam.type,
        **kwargs,
    )

    if bindparam.expanding:
        ret = f"({ret})"

    return ret
4032
def render_bind_cast(self, type_, dbapi_type, sqltext):
    """Base implementation for bind casts; always raises.

    :raises NotImplementedError: unconditionally.
    """
    raise NotImplementedError()
4035
def render_literal_bindparam(
    self,
    bindparam,
    render_literal_value=NO_ARG,
    bind_expression_template=None,
    **kw,
):
    """Render the value of a bound parameter inline in the SQL text.

    :param bindparam: the parameter whose value is rendered.
    :param render_literal_value: explicit value to render instead of
     the parameter's effective value.
    :param bind_expression_template: passed to the expanding-parameter
     renderer when the parameter is expanding.
    :return: the rendered literal text.  A parameter with neither value
     nor callable renders as NULL, with a warning if it is used with an
     operator other than ``is`` / ``is not``.
    """
    if render_literal_value is NO_ARG:
        if bindparam.value is None and bindparam.callable is None:
            op = kw.get("_binary_op", None)
            if op and op not in (operators.is_, operators.is_not):
                util.warn_limited(
                    "Bound parameter '%s' rendering literal NULL in a SQL "
                    "expression; comparisons to NULL should not use "
                    "operators outside of 'is' or 'is not'",
                    (bindparam.key,),
                )
            return self.process(sqltypes.NULLTYPE, **kw)
        value = bindparam.effective_value
    else:
        value = render_literal_value

    if not bindparam.expanding:
        return self.render_literal_value(value, bindparam.type)

    # expanding: every member of the value is rendered individually;
    # the first member of the returned pair is not needed here
    expand = self._literal_execute_expanding_parameter_literal_binds
    _, replacement_expr = expand(
        bindparam,
        value,
        bind_expression_template=bind_expression_template,
    )
    return replacement_expr
4068
def render_literal_value(
    self, value: Any, type_: sqltypes.TypeEngine[Any]
) -> str:
    """Render the value of a bind parameter as a quoted literal.

    This is used for statement sections that do not accept bind parameters
    on the target driver/database.

    This should be implemented by subclasses using the quoting services
    of the DBAPI.

    :raises CompileError: if the type provides no literal processor, or
     if the processor fails for the given value.
    """

    if value is None and not type_.should_evaluate_none:
        # issue #10535 - handle NULL in the compiler without placing
        # this onto each type, except for "evaluate None" types
        # (e.g. JSON)
        return self.process(elements.Null._instance())

    processor = type_._cached_literal_processor(self.dialect)
    if not processor:
        raise exc.CompileError(
            f"No literal value renderer is available for literal value "
            f'"{sql_util._repr_single_value(value)}" '
            f"with datatype {type_}"
        )

    try:
        return processor(value)
    except Exception as e:
        # re-raise as a compile error; the cause stays chained
        raise exc.CompileError(
            f"Could not render literal value "
            f'"{sql_util._repr_single_value(value)}" '
            f"with datatype "
            f"{type_}; see parent stack trace for "
            "more detail."
        ) from e
4107
4108 def _truncate_bindparam(self, bindparam):
4109 if bindparam in self.bind_names:
4110 return self.bind_names[bindparam]
4111
4112 bind_name = bindparam.key
4113 if isinstance(bind_name, elements._truncated_label):
4114 bind_name = self._truncated_identifier("bindparam", bind_name)
4115
4116 # add to bind_names for translation
4117 self.bind_names[bindparam] = bind_name
4118
4119 return bind_name
4120
4121 def _truncated_identifier(
4122 self, ident_class: str, name: _truncated_label
4123 ) -> str:
4124 if (ident_class, name) in self.truncated_names:
4125 return self.truncated_names[(ident_class, name)]
4126
4127 anonname = name.apply_map(self.anon_map)
4128
4129 if len(anonname) > self.label_length - 6:
4130 counter = self._truncated_counters.get(ident_class, 1)
4131 truncname = (
4132 anonname[0 : max(self.label_length - 6, 0)]
4133 + "_"
4134 + hex(counter)[2:]
4135 )
4136 self._truncated_counters[ident_class] = counter + 1
4137 else:
4138 truncname = anonname
4139 self.truncated_names[(ident_class, name)] = truncname
4140 return truncname
4141
4142 def _anonymize(self, name: str) -> str:
4143 return name % self.anon_map
4144
def bindparam_string(
    self,
    name: str,
    post_compile: bool = False,
    expanding: bool = False,
    escaped_from: Optional[str] = None,
    bindparam_type: Optional[TypeEngine[Any]] = None,
    accumulate_bind_names: Optional[Set[str]] = None,
    visited_bindparam: Optional[List[str]] = None,
    **kw: Any,
) -> str:
    """Return the placeholder text for the bound parameter ``name``.

    :param name: bind name; recorded in the two collectors below exactly
     as passed, before any escaping is applied.
    :param post_compile: when true, a ``__[POSTCOMPILE_<name>]`` token is
     returned rather than a bind template.
    :param expanding: with ``post_compile``, return the token immediately
     without applying any literal cast.
    :param escaped_from: the original name if the caller already escaped
     it; when not given, ``self._bind_translate_re`` is used to detect and
     translate characters in ``name``.
    :param bindparam_type: optional type consulted for bind / literal
     casts via ``self.render_bind_cast()``.
    :param accumulate_bind_names: optional set that receives ``name``.
    :param visited_bindparam: optional list that receives ``name``.
    :return: the rendered placeholder string.
    """
    # TODO: accumulate_bind_names is passed by crud.py to gather
    # names on a per-value basis, visited_bindparam is passed by
    # visit_insert() to collect all parameters in the statement.
    # see if this gathering can be simplified somehow
    if accumulate_bind_names is not None:
        accumulate_bind_names.add(name)
    if visited_bindparam is not None:
        visited_bindparam.append(name)

    if not escaped_from and self._bind_translate_re.search(name):
        # not quite the translate use case as we want to
        # also get a quick boolean if we even found
        # unusual characters in the name
        def _swap_char(match):
            return self._bind_translate_chars[match.group(0)]

        escaped_from, name = (
            name,
            self._bind_translate_re.sub(_swap_char, name),
        )

    if escaped_from:
        # remember original -> escaped so it can be translated back
        self.escaped_bind_names = self.escaped_bind_names.union(
            {escaped_from: name}
        )

    if post_compile:
        token = "__[POSTCOMPILE_%s]" % name

        # for expanding, bound parameters or literal values will be
        # rendered per item, so the bare token is returned; it is also
        # returned as-is when there is no type to derive a cast from
        if expanding or bindparam_type is None:
            return token

        # otherwise, for non-expanding "literal execute", apply
        # bind casts as determined by the datatype
        literal_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
        if literal_impl.render_literal_cast:
            token = self.render_bind_cast(bindparam_type, literal_impl, token)
        return token

    if self.state is CompilerState.COMPILING:
        template = self.compilation_bindtemplate
    else:
        template = self.bindtemplate
    rendered = template % {"name": name}

    if (
        bindparam_type is not None
        and self.dialect._bind_typing_render_casts
    ):
        bind_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
        if bind_impl.render_bind_cast:
            rendered = self.render_bind_cast(
                bindparam_type, bind_impl, rendered
            )

    return rendered
4211
4212 def _dispatch_independent_ctes(self, stmt, kw):
4213 local_kw = kw.copy()
4214 local_kw.pop("cte_opts", None)
4215 for cte, opt in zip(
4216 stmt._independent_ctes, stmt._independent_ctes_opts
4217 ):
4218 cte._compiler_dispatch(self, cte_opts=opt, **local_kw)
4219
def visit_cte(
    self,
    cte: CTE,
    asfrom: bool = False,
    ashint: bool = False,
    fromhints: Optional[_FromHintsType] = None,
    visiting_cte: Optional[CTE] = None,
    from_linter: Optional[FromLinter] = None,
    cte_opts: selectable._CTEOpts = selectable._CTEOpts(False),
    **kwargs: Any,
) -> Optional[str]:
    """Register a CTE's text and, when used as a FROM, render its name.

    The rendered body of the CTE (``<name>[(cols)] AS <prefixes>\\n(...)``)
    is stored in the ``self.ctes`` collection rather than returned; the
    collections ``self.ctes_by_level_name`` and ``self.level_name_by_cte``
    track which CTE occupies each ``(level, name)`` slot.

    :param cte: the CTE being visited.
    :param asfrom: when true, the (possibly aliased) name of the CTE is
     returned for use in a FROM list.
    :param ashint: accepted but not referenced in this method body.
    :param fromhints: accepted but not referenced in this method body.
    :param visiting_cte: the CTE currently being compiled by an enclosing
     call, used to detect that we are embedded in a same-named CTE.
    :param from_linter: if given and ``asfrom`` is set, the CTE is
     recorded in ``from_linter.froms``.
    :param cte_opts: options for this particular reference; its
     ``nesting`` flag is OR'ed with ``cte.nesting``.
    :return: the name text when ``asfrom`` is true; the compiled inner
     element when the compiler stack is empty and a new CTE is rendered;
     otherwise ``None``.
    :raises CompileError: if the CTE is marked "nest_here" in more than
     one location, or if two unrelated CTEs share a name.
    """
    self_ctes = self._init_cte_state()
    assert self_ctes is self.ctes

    # nested dispatches see this CTE as the one being visited
    kwargs["visiting_cte"] = cte

    cte_name = cte.name

    if isinstance(cte_name, elements._truncated_label):
        cte_name = self._truncated_identifier("alias", cte_name)

    is_new_cte = True
    embedded_in_current_named_cte = False

    _reference_cte = cte._get_reference_cte()

    nesting = cte.nesting or cte_opts.nesting

    # check for CTE already encountered
    if _reference_cte in self.level_name_by_cte:
        cte_level, _, existing_cte_opts = self.level_name_by_cte[
            _reference_cte
        ]
        assert _ == cte_name

        cte_level_name = (cte_level, cte_name)
        existing_cte = self.ctes_by_level_name[cte_level_name]

        # check if we are receiving it here with a specific
        # "nest_here" location; if so, move it to this location

        if cte_opts.nesting:
            if existing_cte_opts.nesting:
                raise exc.CompileError(
                    "CTE is stated as 'nest_here' in "
                    "more than one location"
                )

            old_level_name = (cte_level, cte_name)
            # nested CTEs live at the current stack depth, others at 1
            cte_level = len(self.stack) if nesting else 1
            cte_level_name = new_level_name = (cte_level, cte_name)

            del self.ctes_by_level_name[old_level_name]
            self.ctes_by_level_name[new_level_name] = existing_cte
            self.level_name_by_cte[_reference_cte] = new_level_name + (
                cte_opts,
            )

    else:
        cte_level = len(self.stack) if nesting else 1
        cte_level_name = (cte_level, cte_name)

        # a different CTE object may already occupy this (level, name)
        if cte_level_name in self.ctes_by_level_name:
            existing_cte = self.ctes_by_level_name[cte_level_name]
        else:
            existing_cte = None

    if existing_cte is not None:
        embedded_in_current_named_cte = visiting_cte is existing_cte

        # we've generated a same-named CTE that we are enclosed in,
        # or this is the same CTE.  just return the name.
        if cte is existing_cte._restates or cte is existing_cte:
            is_new_cte = False
        elif existing_cte is cte._restates:
            # we've generated a same-named CTE that is
            # enclosed in us - we take precedence, so
            # discard the text for the "inner".
            del self_ctes[existing_cte]

            existing_cte_reference_cte = existing_cte._get_reference_cte()

            assert existing_cte_reference_cte is _reference_cte
            assert existing_cte_reference_cte is existing_cte

            del self.level_name_by_cte[existing_cte_reference_cte]
        else:
            if (
                # if the two CTEs have the same hash, which we expect
                # here means that one/both is an annotated of the other
                (hash(cte) == hash(existing_cte))
                # or...
                or (
                    (
                        # if they are clones, i.e. they came from the ORM
                        # or some other visit method
                        cte._is_clone_of is not None
                        or existing_cte._is_clone_of is not None
                    )
                    # and are deep-copy identical
                    and cte.compare(existing_cte)
                )
            ):
                # then consider these two CTEs the same
                is_new_cte = False
            else:
                # otherwise these are two CTEs that either will render
                # differently, or were indicated separately by the user,
                # with the same name
                raise exc.CompileError(
                    "Multiple, unrelated CTEs found with "
                    "the same name: %r" % cte_name
                )

    # already registered and no name requested: nothing to render
    if not asfrom and not is_new_cte:
        return None

    # a CTE that is an alias of another CTE renders as
    # "<original name> AS <alias name>" in the FROM list
    if cte._cte_alias is not None:
        pre_alias_cte = cte._cte_alias
        cte_pre_alias_name = cte._cte_alias.name
        if isinstance(cte_pre_alias_name, elements._truncated_label):
            cte_pre_alias_name = self._truncated_identifier(
                "alias", cte_pre_alias_name
            )
    else:
        pre_alias_cte = cte
        cte_pre_alias_name = None

    if is_new_cte:
        self.ctes_by_level_name[cte_level_name] = cte
        self.level_name_by_cte[_reference_cte] = cte_level_name + (
            cte_opts,
        )

        # make sure the aliased (original) CTE gets rendered as well
        if pre_alias_cte not in self.ctes:
            self.visit_cte(pre_alias_cte, **kwargs)

        if not cte_pre_alias_name and cte not in self_ctes:
            if cte.recursive:
                self.ctes_recursive = True
            text = self.preparer.format_alias(cte, cte_name)
            if cte.recursive or cte.element.name_cte_columns:
                col_source = cte.element

                # TODO: can we get at the .columns_plus_names collection
                # that is already (or will be?) generated for the SELECT
                # rather than calling twice?
                recur_cols = [
                    # TODO: proxy_name is not technically safe,
                    # see test_cte->
                    # test_with_recursive_no_name_currently_buggy.  not
                    # clear what should be done with such a case
                    fallback_label_name or proxy_name
                    for (
                        _,
                        proxy_name,
                        fallback_label_name,
                        c,
                        repeated,
                    ) in (col_source._generate_columns_plus_names(True))
                    if not repeated
                ]

                # explicit column list: "name(col1, col2, ...)"
                text += "(%s)" % (
                    ", ".join(
                        self.preparer.format_label_name(
                            ident, anon_map=self.anon_map
                        )
                        for ident in recur_cols
                    )
                )

            assert kwargs.get("subquery", False) is False

            if not self.stack:
                # toplevel, this is a stringify of the
                # cte directly.  just compile the inner
                # the way alias() does.
                return cte.element._compiler_dispatch(
                    self, asfrom=asfrom, **kwargs
                )
            else:
                prefixes = self._generate_prefixes(
                    cte, cte._prefixes, **kwargs
                )
                inner = cte.element._compiler_dispatch(
                    self, asfrom=True, **kwargs
                )

                text += " AS %s\n(%s)" % (prefixes, inner)

            if cte._suffixes:
                text += " " + self._generate_prefixes(
                    cte, cte._suffixes, **kwargs
                )

            # store the full text under this CTE in the self.ctes collection
            self_ctes[cte] = text

    if asfrom:
        if from_linter:
            from_linter.froms[cte._de_clone()] = cte_name

        if not is_new_cte and embedded_in_current_named_cte:
            return self.preparer.format_alias(cte, cte_name)

        if cte_pre_alias_name:
            text = self.preparer.format_alias(cte, cte_pre_alias_name)
            if self.preparer._requires_quotes(cte_name):
                cte_name = self.preparer.quote(cte_name)
            text += self.get_render_as_alias_suffix(cte_name)
            return text  # type: ignore[no-any-return]
        else:
            return self.preparer.format_alias(cte, cte_name)

    return None
4435
def visit_table_valued_alias(self, element, **kw):
    """Render a table-valued alias via the lateral or plain alias visitor.

    When ``element.joins_implicitly`` is set, ``from_linter`` is forced to
    ``None`` in the forwarded keyword arguments.
    """
    if element.joins_implicitly:
        kw["from_linter"] = None

    renderer = self.visit_lateral if element._is_lateral else self.visit_alias
    return renderer(element, **kw)
4443
def visit_table_valued_column(self, element, **kw):
    """Render a table-valued column by delegating to ``visit_column()``."""
    rendered = self.visit_column(element, **kw)
    return rendered
4446
def visit_alias(
    self,
    alias,
    asfrom=False,
    ashint=False,
    iscrud=False,
    fromhints=None,
    subquery=False,
    lateral=False,
    enclosing_alias=None,
    from_linter=None,
    within_tstring=False,
    **kwargs,
):
    """Render an alias construct.

    :param alias: the alias being rendered; ``alias.element`` is the
     construct it wraps.
    :param asfrom: render as a FROM element, i.e. ``<inner> AS <name>``.
    :param ashint: return only the formatted alias name.
    :param iscrud: forwarded to ``format_from_hint_text()`` and to the
     inner element in the "enclosing alias" case.
    :param fromhints: optional mapping; if ``alias`` is present its value
     is applied through ``format_from_hint_text()``.
    :param subquery: parenthesize the inner element.
    :param lateral: forwarded to the inner element; also drives the
     ``enclosing_lateral`` / ``lateral_from_linter`` bookkeeping.
    :param enclosing_alias: the alias whose compilation led here, if any.
    :param from_linter: if given, the alias is recorded in its ``froms``
     when rendered as a FROM element.
    :param within_tstring: treated like ``asfrom`` for the purposes of
     naming and rendering the alias.
    :return: the rendered SQL string.
    """
    if lateral:
        if "enclosing_lateral" not in kwargs:
            # if lateral is set and enclosing_lateral is not
            # present, we assume we are being called directly
            # from visit_lateral() and we need to set enclosing_lateral.
            assert alias._is_lateral
            kwargs["enclosing_lateral"] = alias

    # for lateral objects, we track a second from_linter that is...
    # lateral!  to the level above us.
    if (
        from_linter
        and "lateral_from_linter" not in kwargs
        and "enclosing_lateral" in kwargs
    ):
        kwargs["lateral_from_linter"] = from_linter

    if enclosing_alias is not None and enclosing_alias.element is alias:
        # this alias is the direct element of the enclosing alias: render
        # only the inner element, without adding a second "AS <name>"
        inner = alias.element._compiler_dispatch(
            self,
            asfrom=asfrom,
            ashint=ashint,
            iscrud=iscrud,
            fromhints=fromhints,
            lateral=lateral,
            enclosing_alias=alias,
            **kwargs,
        )
        if subquery and (asfrom or lateral):
            inner = "(%s)" % (inner,)
        return inner
    else:
        kwargs["enclosing_alias"] = alias

    # alias_name is only computed (and only needed) for the branches
    # below that render the name
    if asfrom or ashint or within_tstring:
        if isinstance(alias.name, elements._truncated_label):
            alias_name = self._truncated_identifier("alias", alias.name)
        else:
            alias_name = alias.name

    if ashint:
        return self.preparer.format_alias(alias, alias_name)
    elif asfrom or within_tstring:
        if from_linter:
            from_linter.froms[alias._de_clone()] = alias_name

        inner = alias.element._compiler_dispatch(
            self, asfrom=True, lateral=lateral, **kwargs
        )
        if subquery:
            inner = "(%s)" % (inner,)

        ret = inner + self.get_render_as_alias_suffix(
            self.preparer.format_alias(alias, alias_name)
        )

        # optional derived column list: "AS name(col1 [type], ...)"
        if alias._supports_derived_columns and alias._render_derived:
            ret += "(%s)" % (
                ", ".join(
                    "%s%s"
                    % (
                        self.preparer.quote(col.name),
                        (
                            " %s"
                            % self.dialect.type_compiler_instance.process(
                                col.type, **kwargs
                            )
                            if alias._render_derived_w_types
                            else ""
                        ),
                    )
                    for col in alias.c
                )
            )

        if fromhints and alias in fromhints:
            ret = self.format_from_hint_text(
                ret, alias, fromhints[alias], iscrud
            )

        return ret
    else:
        # note we cancel the "subquery" flag here as well
        return alias.element._compiler_dispatch(
            self, lateral=lateral, **kwargs
        )
4547
4548 def visit_subquery(self, subquery, **kw):
4549 kw["subquery"] = True
4550 return self.visit_alias(subquery, **kw)
4551
def visit_lateral(self, lateral_, **kw):
    """Render ``LATERAL <alias>``, passing ``lateral=True`` to visit_alias()."""
    kw.update(lateral=True)
    aliased = self.visit_alias(lateral_, **kw)
    return "LATERAL %s" % aliased
4555
def visit_tablesample(self, tablesample, asfrom=False, **kw):
    """Render ``<alias> TABLESAMPLE <method>[ REPEATABLE (<seed>)]``.

    The alias portion is always rendered with ``asfrom=True``; the
    ``asfrom`` argument received here is not forwarded.
    """
    aliased = self.visit_alias(tablesample, asfrom=True, **kw)
    sampling = tablesample._get_method()._compiler_dispatch(self, **kw)
    pieces = ["%s TABLESAMPLE %s" % (aliased, sampling)]

    seed = tablesample.seed
    if seed is not None:
        pieces.append(
            " REPEATABLE (%s)" % (seed._compiler_dispatch(self, **kw))
        )

    return "".join(pieces)
4568
4569 def _render_values(self, element, **kw):
4570 kw.setdefault("literal_binds", element.literal_binds)
4571 tuples = ", ".join(
4572 self.process(
4573 elements.Tuple(
4574 types=element._column_types, *elem
4575 ).self_group(),
4576 **kw,
4577 )
4578 for chunk in element._data
4579 for elem in chunk
4580 )
4581 return f"VALUES {tuples}"
4582
def visit_values(
    self, element, asfrom=False, from_linter=None, visiting_cte=None, **kw
):
    """Render a VALUES construct.

    Outside of a FROM context the bare ``VALUES ...`` text is returned.
    As a FROM element it is parenthesized, optionally prefixed with
    ``LATERAL`` and, when named, followed by ``AS <name> (<columns>)``.
    When the element is the direct body of the CTE being visited, the
    bare text is returned and a LATERAL element raises ``CompileError``.
    """
    if element._independent_ctes:
        self._dispatch_independent_ctes(element, kw)

    rendered = self._render_values(element, **kw)

    if element._unnamed:
        name = None
    else:
        name = element.name
        if isinstance(name, elements._truncated_label):
            name = self._truncated_identifier("values", name)

    lateral_prefix = "LATERAL " if element._is_lateral else ""

    if not asfrom:
        return rendered

    if from_linter:
        from_linter.froms[element._de_clone()] = (
            "(unnamed VALUES element)" if name is None else name
        )

    if visiting_cte is not None and visiting_cte.element is element:
        if element._is_lateral:
            raise exc.CompileError(
                "Can't use a LATERAL VALUES expression inside of a CTE"
            )
        return rendered

    if not name:
        return "%s(%s)" % (lateral_prefix, rendered)

    # columns in the derived column list are rendered without table name
    kw["include_table"] = False
    alias_suffix = self.get_render_as_alias_suffix(self.preparer.quote(name))
    column_list = ", ".join(
        col._compiler_dispatch(self, **kw) for col in element.columns
    )
    return "%s(%s)%s (%s)" % (
        lateral_prefix,
        rendered,
        alias_suffix,
        column_list,
    )
4631
def visit_scalar_values(self, element, **kw):
    """Render a scalar VALUES construct as a parenthesized ``VALUES`` list."""
    values_text = self._render_values(element, **kw)
    return f"({values_text})"
4634
def get_render_as_alias_suffix(self, alias_name_text):
    """Return the ``" AS <name>"`` text appended after an aliased element."""
    keyword = " AS "
    return keyword + alias_name_text
4637
4638 def _add_to_result_map(
4639 self,
4640 keyname: str,
4641 name: str,
4642 objects: Tuple[Any, ...],
4643 type_: TypeEngine[Any],
4644 ) -> None:
4645
4646 # note objects must be non-empty for cursor.py to handle the
4647 # collection properly
4648 assert objects
4649
4650 if keyname is None or keyname == "*":
4651 self._ordered_columns = False
4652 self._ad_hoc_textual = True
4653 if type_._is_tuple_type:
4654 raise exc.CompileError(
4655 "Most backends don't support SELECTing "
4656 "from a tuple() object. If this is an ORM query, "
4657 "consider using the Bundle object."
4658 )
4659 self._result_columns.append(
4660 ResultColumnsEntry(keyname, name, objects, type_)
4661 )
4662
4663 def _label_returning_column(
4664 self, stmt, column, populate_result_map, column_clause_args=None, **kw
4665 ):
4666 """Render a column with necessary labels inside of a RETURNING clause.
4667
4668 This method is provided for individual dialects in place of calling
4669 the _label_select_column method directly, so that the two use cases
4670 of RETURNING vs. SELECT can be disambiguated going forward.
4671
4672 .. versionadded:: 1.4.21
4673
4674 """
4675 return self._label_select_column(
4676 None,
4677 column,
4678 populate_result_map,
4679 False,
4680 {} if column_clause_args is None else column_clause_args,
4681 **kw,
4682 )
4683
def _label_select_column(
    self,
    select,
    column,
    populate_result_map,
    asfrom,
    column_clause_args,
    name=None,
    proxy_name=None,
    fallback_label_name=None,
    within_columns_clause=True,
    column_is_repeated=False,
    need_column_expressions=False,
    include_table=True,
):
    """produce labeled columns present in a select().

    :param select: not referenced in this method body.
    :param column: the column expression to render.
    :param populate_result_map: when true, an ``add_to_result_map``
     callable based on ``self._add_to_result_map`` is passed into the
     compilation of the column.
    :param asfrom: whether the enclosing SELECT is being rendered as a
     FROM element; used by the labeling heuristics below.
    :param column_clause_args: dictionary of keyword arguments for the
     column's compilation; it is updated in place by this method.
    :param name: explicit label name to render, if any; requires
     ``proxy_name``.
    :param proxy_name: alternate name recorded on the generated label.
    :param fallback_label_name: label name used when the heuristics
     decide a label is needed; defaults to ``column._anon_name_label``.
    :param within_columns_clause: must be true (asserted).
    :param column_is_repeated: when true, result map entries for this
     column record only the key name as their target.
    :param need_column_expressions: apply the type's
     ``column_expression()`` even when not populating the result map.
    :param include_table: forwarded to the column's compilation.
    :return: the string produced by compiling the (possibly labeled)
     expression.
    """
    impl = column.type.dialect_impl(self.dialect)

    # the datatype may wrap the column in a SQL expression on SELECT
    if impl._has_column_expression and (
        need_column_expressions or populate_result_map
    ):
        col_expr = impl.column_expression(column)
    else:
        col_expr = column

    if populate_result_map:
        # pass an "add_to_result_map" callable into the compilation
        # of embedded columns.  this collects information about the
        # column as it will be fetched in the result and is coordinated
        # with cursor.description when the query is executed.
        add_to_result_map = self._add_to_result_map

        # if the SELECT statement told us this column is a repeat,
        # wrap the callable with one that prevents the addition of the
        # targets
        if column_is_repeated:
            _add_to_result_map = add_to_result_map

            def add_to_result_map(keyname, name, objects, type_):
                _add_to_result_map(keyname, name, (keyname,), type_)

        # if we redefined col_expr for type expressions, wrap the
        # callable with one that adds the original column to the targets
        elif col_expr is not column:
            _add_to_result_map = add_to_result_map

            def add_to_result_map(keyname, name, objects, type_):
                _add_to_result_map(
                    keyname, name, (column,) + objects, type_
                )

    else:
        add_to_result_map = None

    # this method is used by some of the dialects for RETURNING,
    # which has different inputs.  _label_returning_column was added
    # as the better target for this now however for 1.4 we will keep
    # _label_select_column directly compatible with this use case.
    # these assertions right now set up the current expected inputs
    assert within_columns_clause, (
        "_label_select_column is only relevant within "
        "the columns clause of a SELECT or RETURNING"
    )
    if isinstance(column, elements.Label):
        # already a Label: re-wrap only if the type changed the expression
        if col_expr is not column:
            result_expr = _CompileLabel(
                col_expr, column.name, alt_names=(column.element,)
            )
        else:
            result_expr = col_expr

    elif name:
        # here, _columns_plus_names has determined there's an explicit
        # label name we need to use.  this is the default for
        # tablenames_plus_columnnames as well as when columns are being
        # deduplicated on name

        assert (
            proxy_name is not None
        ), "proxy_name is required if 'name' is passed"

        result_expr = _CompileLabel(
            col_expr,
            name,
            alt_names=(
                proxy_name,
                # this is a hack to allow legacy result column lookups
                # to work as they did before; this goes away in 2.0.
                # TODO: this only seems to be tested indirectly
                # via test/orm/test_deprecations.py.   should be a
                # resultset test for this
                column._tq_label,
            ),
        )
    else:
        # determine here whether this column should be rendered in
        # a labelled context or not, as we were given no required label
        # name from the caller. Here we apply heuristics based on the kind
        # of SQL expression involved.

        if col_expr is not column:
            # type-specific expression wrapping the given column,
            # so we render a label
            render_with_label = True
        elif isinstance(column, elements.ColumnClause):
            # table-bound column, we render its name as a label if we are
            # inside of a subquery only
            render_with_label = (
                asfrom
                and not column.is_literal
                and column.table is not None
            )
        elif isinstance(column, elements.TextClause):
            render_with_label = False
        elif isinstance(column, elements.UnaryExpression):
            # unary expression.  notes added as of #12681
            #
            # By convention, the visit_unary() method
            # itself does not add an entry to the result map, and relies
            # upon either the inner expression creating a result map
            # entry, or if not, by creating a label here that produces
            # the result map entry.  Where that happens is based on whether
            # or not the element immediately inside the unary is a
            # NamedColumn subclass or not.
            #
            # Now, this also impacts how the SELECT is written; if
            # we decide to generate a label here, we get the usual
            # "~(x+y) AS anon_1" thing in the columns clause.   If we
            # don't, we don't get an AS at all, we get like
            # "~table.column".
            #
            # But here is the important thing as of modernish (like 1.4)
            # versions of SQLAlchemy - **whether or not the AS <label>
            # is present in the statement is not actually important**.
            # We target result columns **positionally** for a fully
            # compiled ``Select()`` object; before 1.4 we needed those
            # labels to match in cursor.description etc etc but now it
            # really doesn't matter.
            # So really, we could set render_with_label True in all cases.
            # Or we could just have visit_unary() populate the result map
            # in all cases.
            #
            # What we're doing here is strictly trying to not rock the
            # boat too much with when we do/don't render "AS label";
            # labels being present helps in the edge cases that we
            # "fall back" to named cursor.description matching, labels
            # not being present for columns keeps us from having awkward
            # phrases like "SELECT DISTINCT table.x AS x".
            render_with_label = (
                (
                    # exception case to detect if we render "not boolean"
                    # as "not <col>" for native boolean or "<col> = 1"
                    # for non-native boolean.   this is controlled by
                    # visit_is_<true|false>_unary_operator
                    column.operator
                    in (operators.is_false, operators.is_true)
                    and not self.dialect.supports_native_boolean
                )
                or column._wraps_unnamed_column()
                or asfrom
            )
        elif (
            # general class of expressions that don't have a SQL-column
            # addressable name.  includes scalar selects, bind parameters,
            # SQL functions, others
            not isinstance(column, elements.NamedColumn)
            # deeper check that indicates there's no natural "name" to
            # this element, which accommodates for custom SQL constructs
            # that might have a ".name" attribute (but aren't SQL
            # functions) but are not implementing this more recently added
            # base class.  in theory the "NamedColumn" check should be
            # enough, however here we seek to maintain legacy behaviors
            # as well.
            and column._non_anon_label is None
        ):
            render_with_label = True
        else:
            render_with_label = False

        if render_with_label:
            if not fallback_label_name:
                # used by the RETURNING case right now.  we generate it
                # here as 3rd party dialects may be referring to
                # _label_select_column method directly instead of the
                # just-added _label_returning_column method
                assert not column_is_repeated
                fallback_label_name = column._anon_name_label

            # normalize to a _truncated_label if it isn't one already
            fallback_label_name = (
                elements._truncated_label(fallback_label_name)
                if not isinstance(
                    fallback_label_name, elements._truncated_label
                )
                else fallback_label_name
            )

            result_expr = _CompileLabel(
                col_expr, fallback_label_name, alt_names=(proxy_name,)
            )
        else:
            result_expr = col_expr

    # note: this mutates the dictionary passed in by the caller
    column_clause_args.update(
        within_columns_clause=within_columns_clause,
        add_to_result_map=add_to_result_map,
        include_table=include_table,
        within_tstring=False,
    )
    return result_expr._compiler_dispatch(self, **column_clause_args)
4893
def format_from_hint_text(self, sqltext, table, hint, iscrud):
    """Append the hint text for ``table`` to ``sqltext``, if there is any.

    The hint text comes from ``get_from_hint_text(table, hint)``; when it
    is empty or ``None``, ``sqltext`` is returned unchanged.  ``iscrud``
    is not used by this implementation.
    """
    hint_fragment = self.get_from_hint_text(table, hint)
    if not hint_fragment:
        return sqltext
    return sqltext + (" " + hint_fragment)
4899
def get_select_hint_text(self, byfroms):
    """Return hint text rendered right after ``SELECT``, or ``None``.

    ``byfroms`` is the mapping of FROM objects to their hint text, as
    built by ``_setup_select_hints()``.  This base implementation
    produces no hint text.
    """
    no_hint = None
    return no_hint
4902
def get_from_hint_text(
    self, table: FromClause, text: Optional[str]
) -> Optional[str]:
    """Return hint text to follow ``table`` in the FROM clause, or ``None``.

    This base implementation produces no hint text regardless of the
    arguments.
    """
    no_hint = None
    return no_hint
4907
def get_crud_hint_text(self, table, text):
    """Return hint text for a DML statement against ``table``, or ``None``.

    This base implementation produces no hint text regardless of the
    arguments.
    """
    no_hint = None
    return no_hint
4910
def get_statement_hint_text(self, hint_texts):
    """Combine statement-level hint strings into one space-separated string."""
    separator = " "
    return separator.join(hint_texts)
4913
# Stack entry used in place of ``self.stack[-1]`` when the compiler stack is
# empty (see _display_froms_for_select() and visit_select()).  It carries
# empty "correlate_froms" and "asfrom_froms" collections.
_default_stack_entry: _CompilerStackEntry

# the value is only assigned at runtime; under TYPE_CHECKING just the
# annotation above is visible
if not typing.TYPE_CHECKING:
    _default_stack_entry = util.immutabledict(
        [("correlate_froms", frozenset()), ("asfrom_froms", frozenset())]
    )
4920
4921 def _display_froms_for_select(
4922 self, select_stmt, asfrom, lateral=False, **kw
4923 ):
4924 # utility method to help external dialects
4925 # get the correct from list for a select.
4926 # specifically the oracle dialect needs this feature
4927 # right now.
4928 toplevel = not self.stack
4929 entry = self._default_stack_entry if toplevel else self.stack[-1]
4930
4931 compile_state = select_stmt._compile_state_factory(select_stmt, self)
4932
4933 correlate_froms = entry["correlate_froms"]
4934 asfrom_froms = entry["asfrom_froms"]
4935
4936 if asfrom and not lateral:
4937 froms = compile_state._get_display_froms(
4938 explicit_correlate_froms=correlate_froms.difference(
4939 asfrom_froms
4940 ),
4941 implicit_correlate_froms=(),
4942 )
4943 else:
4944 froms = compile_state._get_display_froms(
4945 explicit_correlate_froms=correlate_froms,
4946 implicit_correlate_froms=asfrom_froms,
4947 )
4948 return froms
4949
# Optional hook consulted by visit_select(): when truthy it is invoked as
# ``self.translate_select_structure(select_stmt, asfrom=asfrom, **kwargs)``
# and, if it returns a different statement, that statement is compiled
# in place of the original.
translate_select_structure: Any = None
"""if not ``None``, should be a callable which accepts ``(select_stmt,
**kw)`` and returns a select object. this is used for structural changes
mostly to accommodate for LIMIT/OFFSET schemes

"""
4956
def visit_select(
    self,
    select_stmt,
    asfrom=False,
    insert_into=False,
    fromhints=None,
    compound_index=None,
    select_wraps_for=None,
    lateral=False,
    from_linter=None,
    **kwargs,
):
    """Render a SELECT statement.

    :param select_stmt: the SELECT construct; it is replaced by
     ``compile_state.statement`` once the compile state is built.
    :param asfrom: whether this SELECT is rendered as a FROM element.
    :param insert_into: marks the SELECT as embedded (together with
     ``compound_index``), which affects where the CTE clause is rendered.
    :param fromhints: accepted but not referenced in this method body.
    :param compound_index: position of this SELECT within a compound
     select, or ``None``; a nonzero index turns off result map population.
    :param select_wraps_for: must be ``None`` (asserted); it is assigned
     internally when ``translate_select_structure`` restructures the
     statement.
    :param lateral: forwarded to ``_setup_select_stack()``.
    :param from_linter: accepted but not referenced in this method body.
    :return: the rendered SQL string.
    """
    assert select_wraps_for is None, (
        "SQLAlchemy 1.4 requires use of "
        "the translate_select_structure hook for structural "
        "translations of SELECT objects"
    )
    if self._collect_params:
        self._add_to_params(select_stmt)

    # initial setup of SELECT.  the compile_state_factory may now
    # be creating a totally different SELECT from the one that was
    # passed in.  for ORM use this will convert from an ORM-state
    # SELECT to a regular "Core" SELECT.  other composed operations
    # such as computation of joins will be performed.

    kwargs["within_columns_clause"] = False

    compile_state = select_stmt._compile_state_factory(
        select_stmt, self, **kwargs
    )
    kwargs["ambiguous_table_name_map"] = (
        compile_state._ambiguous_table_name_map
    )

    select_stmt = compile_state.statement

    # an empty stack means this is the outermost statement
    toplevel = not self.stack

    if toplevel and not self.compile_state:
        self.compile_state = compile_state

    is_embedded_select = compound_index is not None or insert_into

    # translate step for Oracle, SQL Server which often need to
    # restructure the SELECT to allow for LIMIT/OFFSET and possibly
    # other conditions
    if self.translate_select_structure:
        new_select_stmt = self.translate_select_structure(
            select_stmt, asfrom=asfrom, **kwargs
        )

        # if SELECT was restructured, maintain a link to the originals
        # and assemble a new compile state
        if new_select_stmt is not select_stmt:
            compile_state_wraps_for = compile_state
            select_wraps_for = select_stmt
            select_stmt = new_select_stmt

            compile_state = select_stmt._compile_state_factory(
                select_stmt, self, **kwargs
            )
            select_stmt = compile_state.statement

    entry = self._default_stack_entry if toplevel else self.stack[-1]

    populate_result_map = need_column_expressions = (
        toplevel
        or entry.get("need_result_map_for_compound", False)
        or entry.get("need_result_map_for_nested", False)
    )

    # indicates there is a CompoundSelect in play and we are not the
    # first select
    if compound_index:
        populate_result_map = False

    # this was first proposed as part of #3372; however, it is not
    # reached in current tests and could possibly be an assertion
    # instead.
    if not populate_result_map and "add_to_result_map" in kwargs:
        del kwargs["add_to_result_map"]

    # pushes a new entry onto self.stack; popped at the end of this method
    froms = self._setup_select_stack(
        select_stmt, compile_state, entry, asfrom, lateral, compound_index
    )

    column_clause_args = kwargs.copy()
    column_clause_args.update(
        {"within_label_clause": False, "within_columns_clause": False}
    )

    text = "SELECT "  # we're off to a good start !

    if select_stmt._post_select_clause is not None:
        psc = self.process(select_stmt._post_select_clause, **kwargs)
        if psc is not None:
            text += psc + " "

    if select_stmt._hints:
        hint_text, byfrom = self._setup_select_hints(select_stmt)
        if hint_text:
            text += hint_text + " "
    else:
        byfrom = None

    if select_stmt._independent_ctes:
        self._dispatch_independent_ctes(select_stmt, kwargs)

    if select_stmt._prefixes:
        text += self._generate_prefixes(
            select_stmt, select_stmt._prefixes, **kwargs
        )

    text += self.get_select_precolumns(select_stmt, **kwargs)

    if select_stmt._pre_columns_clause is not None:
        pcc = self.process(select_stmt._pre_columns_clause, **kwargs)
        if pcc is not None:
            text += pcc + " "

    # the actual list of columns to print in the SELECT column list.
    inner_columns = [
        c
        for c in [
            self._label_select_column(
                select_stmt,
                column,
                populate_result_map,
                asfrom,
                column_clause_args,
                name=name,
                proxy_name=proxy_name,
                fallback_label_name=fallback_label_name,
                column_is_repeated=repeated,
                need_column_expressions=need_column_expressions,
            )
            for (
                name,
                proxy_name,
                fallback_label_name,
                column,
                repeated,
            ) in compile_state.columns_plus_names
        ]
        if c is not None
    ]

    if populate_result_map and select_wraps_for is not None:
        # if this select was generated from translate_select,
        # rewrite the targeted columns in the result map

        # maps each column of the restructured SELECT to the column at
        # the same position in the original SELECT (the loop variable
        # called "name" here holds the column object)
        translate = dict(
            zip(
                [
                    name
                    for (
                        key,
                        proxy_name,
                        fallback_label_name,
                        name,
                        repeated,
                    ) in compile_state.columns_plus_names
                ],
                [
                    name
                    for (
                        key,
                        proxy_name,
                        fallback_label_name,
                        name,
                        repeated,
                    ) in compile_state_wraps_for.columns_plus_names
                ],
            )
        )

        self._result_columns = [
            ResultColumnsEntry(
                key, name, tuple(translate.get(o, o) for o in obj), type_
            )
            for key, name, obj, type_ in self._result_columns
        ]

    text = self._compose_select_body(
        text,
        select_stmt,
        compile_state,
        inner_columns,
        froms,
        byfrom,
        toplevel,
        kwargs,
    )

    if select_stmt._post_body_clause is not None:
        pbc = self.process(select_stmt._post_body_clause, **kwargs)
        if pbc:
            text += " " + pbc

    if select_stmt._statement_hints:
        # keep hints addressed to any dialect ("*") or to this dialect
        per_dialect = [
            ht
            for (dialect_name, ht) in select_stmt._statement_hints
            if dialect_name in ("*", self.dialect.name)
        ]
        if per_dialect:
            text += " " + self.get_statement_hint_text(per_dialect)

    # In compound query, CTEs are shared at the compound level
    if self.ctes and (not is_embedded_select or toplevel):
        nesting_level = len(self.stack) if not toplevel else None
        text = self._render_cte_clause(nesting_level=nesting_level) + text

    if select_stmt._suffixes:
        text += " " + self._generate_prefixes(
            select_stmt, select_stmt._suffixes, **kwargs
        )

    # remove the stack entry pushed by _setup_select_stack()
    self.stack.pop(-1)

    return text
5179
5180 def _setup_select_hints(
5181 self, select: Select[Unpack[TupleAny]]
5182 ) -> Tuple[str, _FromHintsType]:
5183 byfrom = {
5184 from_: hinttext
5185 % {"name": from_._compiler_dispatch(self, ashint=True)}
5186 for (from_, dialect), hinttext in select._hints.items()
5187 if dialect in ("*", self.dialect.name)
5188 }
5189 hint_text = self.get_select_hint_text(byfrom)
5190 return hint_text, byfrom
5191
def _setup_select_stack(
    self, select, compile_state, entry, asfrom, lateral, compound_index
):
    """Push a compiler stack entry for ``select`` and return its FROMs.

    ``entry`` is the enclosing stack entry.  When ``compound_index`` is
    ``0`` the select is recorded in ``entry`` as ``"select_0"``; for any
    later member of a compound select, the number of columns is checked
    against that first select and :class:`.CompileError` is raised on a
    mismatch.

    :return: the FROM objects produced by
     ``compile_state._get_display_froms()`` for this select.

    """
    outer_correlate_froms = entry["correlate_froms"]
    outer_asfrom_froms = entry["asfrom_froms"]

    if compound_index == 0:
        entry["select_0"] = select
    elif compound_index:
        expected_cols = len(entry["select_0"]._all_selected_columns)

        if len(compile_state.columns_plus_names) != expected_cols:
            message_args = (
                1,
                expected_cols,
                compound_index + 1,
                len(select._all_selected_columns),
            )
            raise exc.CompileError(
                "All selectables passed to "
                "CompoundSelect must have identical numbers of "
                "columns; select #%d has %d columns, select "
                "#%d has %d" % message_args
            )

    # a non-lateral subquery in a FROM clause correlates explicitly only
    # to the enclosing correlate froms minus the enclosing "asfrom"
    # froms; every other case correlates implicitly to the latter
    if asfrom and not lateral:
        explicit = outer_correlate_froms.difference(outer_asfrom_froms)
        implicit = ()
    else:
        explicit = outer_correlate_froms
        implicit = outer_asfrom_froms

    froms = compile_state._get_display_froms(
        explicit_correlate_froms=explicit,
        implicit_correlate_froms=implicit,
    )

    displayed_from_objects = set(_from_objects(*froms))

    new_entry: _CompilerStackEntry = {
        "asfrom_froms": displayed_from_objects,
        "correlate_froms": displayed_from_objects.union(
            outer_correlate_froms
        ),
        "selectable": select,
        "compile_state": compile_state,
    }
    self.stack.append(new_entry)

    return froms
5243
def _compose_select_body(
    self,
    text,
    select,
    compile_state,
    inner_columns,
    froms,
    byfrom,
    toplevel,
    kwargs,
):
    """Append the columns and trailing clauses of a SELECT to ``text``.

    Renders, in order: the column list, FROM (or ``default_from()`` when
    there are no FROM objects), WHERE, GROUP BY, HAVING, the post-criteria
    clause, ORDER BY, the row limiting clause and the FOR UPDATE clause.
    ``kwargs`` is passed as a plain dictionary and forwarded to each
    rendering call.

    :return: the extended statement text.

    """
    text += ", ".join(inner_columns)

    if self.linting & COLLECT_CARTESIAN_PRODUCTS:
        linter = FromLinter({}, set())
        should_warn = self.linting & WARN_LINTING
        if toplevel:
            self.from_linter = linter
    else:
        linter = None
        should_warn = False

    # part of #9440: with no inner columns, strip the trailing whitespace
    # so that the statement comes out as "SELECT WHERE..." or
    # "SELECT FROM ...".  building the SELECT prefix without trailing
    # whitespace in the first place would be cleaner, but that breaks
    # compatibility with various custom compilation schemes that are
    # currently being tested.
    if not inner_columns:
        text = text.rstrip()

    if not froms:
        text += self.default_from()
    else:
        # "fromhints" is only passed along when the statement has hints
        hint_kw = {"fromhints": byfrom} if select._hints else {}
        rendered_froms = [
            from_._compiler_dispatch(
                self,
                asfrom=True,
                **hint_kw,
                from_linter=linter,
                **kwargs,
            )
            for from_ in froms
        ]
        text += " \nFROM " + ", ".join(rendered_froms)

    where_criteria = select._where_criteria
    if where_criteria:
        where_sql = self._generate_delimited_and_list(
            where_criteria, from_linter=linter, **kwargs
        )
        if where_sql:
            text += " \nWHERE " + where_sql

    if should_warn:
        assert linter is not None
        linter.warn()

    if select._group_by_clauses:
        text += self.group_by_clause(select, **kwargs)

    having_criteria = select._having_criteria
    if having_criteria:
        having_sql = self._generate_delimited_and_list(
            having_criteria, **kwargs
        )
        if having_sql:
            text += " \nHAVING " + having_sql

    post_criteria = select._post_criteria_clause
    if post_criteria is not None:
        post_criteria_sql = self.process(post_criteria, **kwargs)
        if post_criteria_sql is not None:
            text += " \n" + post_criteria_sql

    if select._order_by_clauses:
        text += self.order_by_clause(select, **kwargs)

    if select._has_row_limiting_clause:
        text += self._row_limit_clause(select, **kwargs)

    if select._for_update_arg is not None:
        text += self.for_update_clause(select, **kwargs)

    return text
5343
5344 def _generate_prefixes(self, stmt, prefixes, **kw):
5345 clause = " ".join(
5346 prefix._compiler_dispatch(self, **kw)
5347 for prefix, dialect_name in prefixes
5348 if dialect_name in (None, "*") or dialect_name == self.dialect.name
5349 )
5350 if clause:
5351 clause += " "
5352 return clause
5353
5354 def _render_cte_clause(
5355 self,
5356 nesting_level=None,
5357 include_following_stack=False,
5358 ):
5359 """
5360 include_following_stack
5361 Also render the nesting CTEs on the next stack. Useful for
5362 SQL structures like UNION or INSERT that can wrap SELECT
5363 statements containing nesting CTEs.
5364 """
5365 if not self.ctes:
5366 return ""
5367
5368 ctes: MutableMapping[CTE, str]
5369
5370 if nesting_level and nesting_level > 1:
5371 ctes = util.OrderedDict()
5372 for cte in list(self.ctes.keys()):
5373 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5374 cte._get_reference_cte()
5375 ]
5376 nesting = cte.nesting or cte_opts.nesting
5377 is_rendered_level = cte_level == nesting_level or (
5378 include_following_stack and cte_level == nesting_level + 1
5379 )
5380 if not (nesting and is_rendered_level):
5381 continue
5382
5383 ctes[cte] = self.ctes[cte]
5384
5385 else:
5386 ctes = self.ctes
5387
5388 if not ctes:
5389 return ""
5390 ctes_recursive = any([cte.recursive for cte in ctes])
5391
5392 cte_text = self.get_cte_preamble(ctes_recursive) + " "
5393 cte_text += ", \n".join([txt for txt in ctes.values()])
5394 cte_text += "\n "
5395
5396 if nesting_level and nesting_level > 1:
5397 for cte in list(ctes.keys()):
5398 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5399 cte._get_reference_cte()
5400 ]
5401 del self.ctes[cte]
5402 del self.ctes_by_level_name[(cte_level, cte_name)]
5403 del self.level_name_by_cte[cte._get_reference_cte()]
5404
5405 return cte_text
5406
def get_cte_preamble(self, recursive):
    """Return the keyword(s) that open a CTE clause.

    :param recursive: truthy if ``WITH RECURSIVE`` should be rendered
     rather than plain ``WITH``.

    """
    return "WITH RECURSIVE" if recursive else "WITH"
5412
def get_select_precolumns(self, select: Select[Any], **kw: Any) -> str:
    """Return the text rendered between ``SELECT`` and the column list.

    This base implementation renders ``DISTINCT`` when the statement
    requests it.  ``DISTINCT ON`` criteria are not rendered here; their
    presence only emits a deprecation warning.

    """
    if select._distinct_on:
        util.warn_deprecated(
            "DISTINCT ON is currently supported only by the PostgreSQL "
            "dialect. Use of DISTINCT ON for other backends is currently "
            "silently ignored, however this usage is deprecated, and will "
            "raise CompileError in a future release for all backends "
            "that do not support this syntax.",
            version="1.4",
        )

    if select._distinct:
        return "DISTINCT "
    return ""
5428
def group_by_clause(self, select, **kw):
    """allow dialects to customize how GROUP BY is rendered.

    Returns the `` GROUP BY ...`` text, or an empty string when the
    GROUP BY elements render to nothing.

    """
    rendered = self._generate_delimited_list(
        select._group_by_clauses, OPERATORS[operators.comma_op], **kw
    )
    return " GROUP BY " + rendered if rendered else ""
5439
def order_by_clause(self, select, **kw):
    """allow dialects to customize how ORDER BY is rendered.

    Returns the `` ORDER BY ...`` text, or an empty string when the
    ORDER BY elements render to nothing.

    """
    rendered = self._generate_delimited_list(
        select._order_by_clauses, OPERATORS[operators.comma_op], **kw
    )
    return " ORDER BY " + rendered if rendered else ""
5451
def for_update_clause(self, select, **kw):
    """Return the locking clause appended to a SELECT statement.

    This base implementation ignores its arguments and always renders
    `` FOR UPDATE``.

    """
    return " FOR UPDATE"
5454
def returning_clause(
    self,
    stmt: UpdateBase,
    returning_cols: Sequence[_ColumnsClauseElement],
    *,
    populate_result_map: bool,
    **kw: Any,
) -> str:
    """Render the ``RETURNING`` clause of a DML statement.

    Each column produced by ``stmt._generate_columns_plus_names()`` for
    ``returning_cols`` is rendered through
    ``_label_returning_column()``.

    :param stmt: the DML statement being compiled.
    :param returning_cols: the column expressions to be returned.
    :param populate_result_map: passed along to
     ``_label_returning_column()`` for each column.
    :return: ``"RETURNING "`` followed by the comma-separated columns.

    """
    columns_plus_names = stmt._generate_columns_plus_names(
        True, cols=base._select_iterables(returning_cols)
    )

    rendered_columns = []
    for (
        name,
        proxy_name,
        fallback_label_name,
        column,
        repeated,
    ) in columns_plus_names:
        rendered_columns.append(
            self._label_returning_column(
                stmt,
                column,
                populate_result_map,
                fallback_label_name=fallback_label_name,
                column_is_repeated=repeated,
                name=name,
                proxy_name=proxy_name,
                **kw,
            )
        )

    return "RETURNING " + ", ".join(rendered_columns)
5486
def limit_clause(self, select, **kw):
    """Render the ``LIMIT`` / ``OFFSET`` portion of a SELECT.

    When an offset is present without a limit, ``LIMIT -1`` is rendered
    ahead of the ``OFFSET``.  Returns an empty string if the statement
    has neither.

    """
    limit = select._limit_clause
    offset = select._offset_clause

    pieces = []
    if limit is not None:
        pieces.append("\n LIMIT " + self.process(limit, **kw))
    if offset is not None:
        if limit is None:
            pieces.append("\n LIMIT -1")
        pieces.append(" OFFSET " + self.process(offset, **kw))
    return "".join(pieces)
5496
def fetch_clause(
    self,
    select,
    fetch_clause=None,
    require_offset=False,
    use_literal_execute_for_simple_int=False,
    **kw,
):
    """Render ``OFFSET n ROWS`` / ``FETCH FIRST n ROWS`` for a SELECT.

    :param select: the statement supplying the offset and, unless
     ``fetch_clause`` is given, the fetch expression and its options.
    :param fetch_clause: optional fetch expression overriding the one on
     the statement; when given, the ``percent`` and ``with_ties``
     options are both treated as off.
    :param require_offset: render ``OFFSET 0 ROWS`` when the statement
     has no offset.
    :param use_literal_execute_for_simple_int: when true, an offset or
     fetch expression for which ``select._simple_int_clause()`` is true
     is rendered via its ``render_literal_execute()`` form.
    :return: the rendered text, possibly empty.

    """
    if fetch_clause is None:
        fetch_clause = select._fetch_clause
        fetch_clause_options = select._fetch_clause_options
    else:
        fetch_clause_options = {"percent": False, "with_ties": False}

    def literal_if_simple_int(clause):
        if use_literal_execute_for_simple_int and select._simple_int_clause(
            clause
        ):
            return clause.render_literal_execute()
        return clause

    pieces = []

    offset_clause = select._offset_clause
    if offset_clause is not None:
        offset_str = self.process(literal_if_simple_int(offset_clause), **kw)
        pieces.append("\n OFFSET %s ROWS" % offset_str)
    elif require_offset:
        pieces.append("\n OFFSET 0 ROWS")

    if fetch_clause is not None:
        fetch_str = self.process(literal_if_simple_int(fetch_clause), **kw)
        percent = " PERCENT" if fetch_clause_options["percent"] else ""
        ties = "WITH TIES" if fetch_clause_options["with_ties"] else "ONLY"
        pieces.append(
            "\n FETCH FIRST %s%s ROWS %s" % (fetch_str, percent, ties)
        )

    return "".join(pieces)
5537
def visit_table(
    self,
    table,
    asfrom=False,
    iscrud=False,
    ashint=False,
    fromhints=None,
    use_schema=True,
    from_linter=None,
    ambiguous_table_name_map=None,
    enclosing_alias=None,
    within_tstring=False,
    **kwargs,
):
    """Render a table name for a FROM clause, hint or t-string context.

    The table is always registered with ``from_linter`` when one is
    given.  Text is only produced when one of ``asfrom``, ``ashint`` or
    ``within_tstring`` is set; otherwise an empty string is returned.

    The rendered name is schema-qualified when ``use_schema`` is true
    and the preparer reports a schema for the table.  A schema-less
    table whose name appears in ``ambiguous_table_name_map`` gets an
    anonymous alias suffix, unless ``enclosing_alias`` is already an
    alias of this same table.  Finally, any hint present for the table
    in ``fromhints`` is applied via ``format_from_hint_text()``.

    """
    if from_linter:
        from_linter.froms[table] = table.fullname

    if not (asfrom or ashint or within_tstring):
        return ""

    preparer = self.preparer
    effective_schema = preparer.schema_for_object(table)

    if use_schema and effective_schema:
        rendered = ".".join(
            (
                preparer.quote_schema(effective_schema),
                preparer.quote(table.name),
            )
        )
    else:
        rendered = preparer.quote(table.name)

    already_aliased = (
        enclosing_alias is not None and enclosing_alias.element is table
    )
    if (
        not already_aliased
        and not effective_schema
        and ambiguous_table_name_map
        and table.name in ambiguous_table_name_map
    ):
        anon_name = self._truncated_identifier(
            "alias", ambiguous_table_name_map[table.name]
        )
        alias_text = preparer.format_alias(None, anon_name)
        rendered = rendered + self.get_render_as_alias_suffix(alias_text)

    if fromhints and table in fromhints:
        rendered = self.format_from_hint_text(
            rendered, table, fromhints[table], iscrud
        )
    return rendered
5591
def visit_join(self, join, asfrom=False, from_linter=None, **kwargs):
    """Render a JOIN as ``<left> [FULL OUTER|LEFT OUTER] JOIN <right> ON
    <onclause>``.

    When a ``from_linter`` is given, an edge is recorded between every
    FROM object of the left side and every FROM object of the right
    side.  Both sides are always rendered with ``asfrom=True``; the
    ``asfrom`` argument itself is not consulted.

    """
    if from_linter:
        left_froms = _de_clone(join.left._from_objects)
        right_froms = _de_clone(join.right._from_objects)
        from_linter.edges.update(itertools.product(left_froms, right_froms))

    if join.full:
        join_keyword = " FULL OUTER JOIN "
    elif join.isouter:
        join_keyword = " LEFT OUTER JOIN "
    else:
        join_keyword = " JOIN "

    left_sql = join.left._compiler_dispatch(
        self, asfrom=True, from_linter=from_linter, **kwargs
    )
    right_sql = join.right._compiler_dispatch(
        self, asfrom=True, from_linter=from_linter, **kwargs
    )
    # TODO: likely need asfrom=True here?
    on_sql = join.onclause._compiler_dispatch(
        self, from_linter=from_linter, **kwargs
    )

    return left_sql + join_keyword + right_sql + " ON " + on_sql
5621
5622 def _setup_crud_hints(self, stmt, table_text):
5623 dialect_hints = {
5624 table: hint_text
5625 for (table, dialect), hint_text in stmt._hints.items()
5626 if dialect in ("*", self.dialect.name)
5627 }
5628 if stmt.table in dialect_hints:
5629 table_text = self.format_from_hint_text(
5630 table_text, stmt.table, dialect_hints[stmt.table], True
5631 )
5632 return dialect_hints, table_text
5633
# within the realm of "insertmanyvalues sentinel columns",
# these lookups match different kinds of Column() configurations
# to specific backend capabilities. they are broken into two
# lookups, one for autoincrement columns and the other for non
# autoincrement columns.
#
# each lookup is keyed by a _SentinelDefaultCharacterization member and
# yields an InsertmanyvaluesSentinelOpts value; in
# _get_sentinel_column_for_table() that value is combined using bitwise
# AND with the dialect's insertmanyvalues_implicit_sentinel setting in
# order to decide whether the table's sentinel columns are usable.
_sentinel_col_non_autoinc_lookup = util.immutabledict(
    {
        _SentinelDefaultCharacterization.CLIENTSIDE: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.SENTINEL_DEFAULT: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.NONE: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.IDENTITY: (
            InsertmanyvaluesSentinelOpts.IDENTITY
        ),
        _SentinelDefaultCharacterization.SEQUENCE: (
            InsertmanyvaluesSentinelOpts.SEQUENCE
        ),
        _SentinelDefaultCharacterization.MONOTONIC_FUNCTION: (
            InsertmanyvaluesSentinelOpts.MONOTONIC_FUNCTION
        ),
    }
)
# the autoincrement lookup is derived from the non-autoincrement one via
# union() with a single entry that maps NONE (i.e. no default
# characterization) to AUTOINCREMENT.
_sentinel_col_autoinc_lookup = _sentinel_col_non_autoinc_lookup.union(
    {
        _SentinelDefaultCharacterization.NONE: (
            InsertmanyvaluesSentinelOpts.AUTOINCREMENT
        ),
    }
)
5668
5669 def _get_sentinel_column_for_table(
5670 self, table: Table
5671 ) -> Optional[Sequence[Column[Any]]]:
5672 """given a :class:`.Table`, return a usable sentinel column or
5673 columns for this dialect if any.
5674
5675 Return None if no sentinel columns could be identified, or raise an
5676 error if a column was marked as a sentinel explicitly but isn't
5677 compatible with this dialect.
5678
5679 """
5680
5681 sentinel_opts = self.dialect.insertmanyvalues_implicit_sentinel
5682 sentinel_characteristics = table._sentinel_column_characteristics
5683
5684 sent_cols = sentinel_characteristics.columns
5685
5686 if sent_cols is None:
5687 return None
5688
5689 if sentinel_characteristics.is_autoinc:
5690 bitmask = self._sentinel_col_autoinc_lookup.get(
5691 sentinel_characteristics.default_characterization, 0
5692 )
5693 else:
5694 bitmask = self._sentinel_col_non_autoinc_lookup.get(
5695 sentinel_characteristics.default_characterization, 0
5696 )
5697
5698 if sentinel_opts & bitmask:
5699 return sent_cols
5700
5701 if sentinel_characteristics.is_explicit:
5702 # a column was explicitly marked as insert_sentinel=True,
5703 # however it is not compatible with this dialect. they should
5704 # not indicate this column as a sentinel if they need to include
5705 # this dialect.
5706
5707 # TODO: do we want non-primary key explicit sentinel cols
5708 # that can gracefully degrade for some backends?
5709 # insert_sentinel="degrade" perhaps. not for the initial release.
5710 # I am hoping people are generally not dealing with this sentinel
5711 # business at all.
5712
5713 # if is_explicit is True, there will be only one sentinel column.
5714
5715 raise exc.InvalidRequestError(
5716 f"Column {sent_cols[0]} can't be explicitly "
5717 "marked as a sentinel column when using the "
5718 f"{self.dialect.name} dialect, as the "
5719 "particular type of default generation on this column is "
5720 "not currently compatible with this dialect's specific "
5721 f"INSERT..RETURNING syntax which can receive the "
5722 "server-generated value in "
5723 "a deterministic way. To remove this error, remove "
5724 "insert_sentinel=True from primary key autoincrement "
5725 "columns; these columns are automatically used as "
5726 "sentinels for supported dialects in any case."
5727 )
5728
5729 return None
5730
def _deliver_insertmanyvalues_batches(
    self,
    statement: str,
    parameters: _DBAPIMultiExecuteParams,
    compiled_parameters: List[_MutableCoreSingleExecuteParams],
    generic_setinputsizes: Optional[_GenericSetInputSizesType],
    batch_size: int,
    sort_by_parameter_order: bool,
    schema_translate_map: Optional[SchemaTranslateMapType],
) -> Iterator[_InsertManyValuesBatch]:
    """Generate the statements and parameter sets for an
    "insertmanyvalues" execution.

    This is a generator which yields one :class:`._InsertManyValuesBatch`
    per statement to be executed.  It operates in one of two modes:

    * "row at a time" - the given ``statement`` is yielded unchanged,
      once per parameter set.  This mode is used when the INSERT consists
      of default expressions only and the dialect does not support the
      ``DEFAULT`` metavalue, when the dialect does not support multi-row
      VALUES, when deterministic ordering was requested for a statement
      that returns rows but no usable sentinel is available, or when an
      upsert with bound parameters returns rows without an embedded
      values counter.

    * batched - the single ``(...)`` VALUES expression in ``statement``
      is replaced with up to ``batch_size`` copies of itself, and the
      parameters of that many parameter sets are merged into one
      positional tuple or one named dictionary.

    :param statement: the compiled INSERT statement containing a single
     VALUES expression.
    :param parameters: the DBAPI-level parameter sets, one per row.
    :param compiled_parameters: the corresponding compiler-level
     parameter sets, from which sentinel values are extracted.
    :param generic_setinputsizes: optional ``(key, len, type)`` entries
     which are expanded to match each batch.
    :param batch_size: maximum number of rows per batch; may be reduced
     further by the dialect's ``insertmanyvalues_max_parameters``.
    :param sort_by_parameter_order: whether deterministic ordering of
     returned rows was requested; also passed along in each batch.
    :param schema_translate_map: optional schema translate map applied
     to the VALUES expressions before they are located in the statement.

    """
    imv = self._insertmanyvalues
    assert imv is not None

    # callable which extracts the sentinel value(s) from one compiled
    # parameter set, if sentinel parameter keys were established
    if not imv.sentinel_param_keys:
        _sentinel_from_params = None
    else:
        _sentinel_from_params = operator.itemgetter(
            *imv.sentinel_param_keys
        )

    lenparams = len(parameters)
    if imv.is_default_expr and not self.dialect.supports_default_metavalue:
        # backend doesn't support
        # INSERT INTO table (pk_col) VALUES (DEFAULT), (DEFAULT), ...
        # at the moment this is basically SQL Server due to
        # not being able to use DEFAULT for identity column
        # just yield out that many single statements! still
        # faster than a whole connection.execute() call ;)
        #
        # note we still are taking advantage of the fact that we know
        # we are using RETURNING. The generalized approach of fetching
        # cursor.lastrowid etc. still goes through the more heavyweight
        # "ExecutionContext per statement" system as it isn't usable
        # as a generic "RETURNING" approach
        use_row_at_a_time = True
        downgraded = False
    elif not self.dialect.supports_multivalues_insert or (
        sort_by_parameter_order
        and self._result_columns
        and (
            imv.sentinel_columns is None
            or (
                imv.includes_upsert_behaviors
                and not imv.embed_values_counter
            )
        )
    ):
        # deterministic order was requested and the compiler could
        # not organize sentinel columns for this dialect/statement.
        # use row at a time. Note: if embed_values_counter is True,
        # the counter itself provides the ordering capability we need,
        # so we can use batch mode even with upsert behaviors.
        use_row_at_a_time = True
        downgraded = True
    elif (
        imv.has_upsert_bound_parameters
        and not imv.embed_values_counter
        and self._result_columns
    ):
        # For upsert behaviors (ON CONFLICT DO UPDATE, etc.) with RETURNING
        # and parametrized bindparams in the SET clause, we must use
        # row-at-a-time. Batching multiple rows in a single statement
        # doesn't work when the SET clause contains bound parameters that
        # will receive different values per row, as there's only one SET
        # clause per statement. See issue #13130.
        use_row_at_a_time = True
        downgraded = True
    else:
        use_row_at_a_time = False
        downgraded = False

    if use_row_at_a_time:
        # one batch per parameter set, statement unchanged; batch
        # numbers start at 1 and the total is the number of
        # parameter sets
        for batchnum, (param, compiled_param) in enumerate(
            cast(
                "Sequence[Tuple[_DBAPISingleExecuteParams, _MutableCoreSingleExecuteParams]]",  # noqa: E501
                zip(parameters, compiled_parameters),
            ),
            1,
        ):
            yield _InsertManyValuesBatch(
                statement,
                param,
                generic_setinputsizes,
                [param],
                (
                    [_sentinel_from_params(compiled_param)]
                    if _sentinel_from_params
                    else []
                ),
                1,
                batchnum,
                lenparams,
                sort_by_parameter_order,
                downgraded,
            )
        return

    # "rst" applies the schema translate map to a string, if a map
    # was given
    if schema_translate_map:
        rst = functools.partial(
            self.preparer._render_schema_translates,
            schema_translate_map=schema_translate_map,
        )
    else:
        rst = None

    imv_single_values_expr = imv.single_values_expr
    if rst:
        imv_single_values_expr = rst(imv_single_values_expr)

    # swap the parenthesized VALUES expression in the statement for a
    # token, which is replaced by the expanded VALUES for each batch
    executemany_values = f"({imv_single_values_expr})"
    statement = statement.replace(executemany_values, "__EXECMANY_TOKEN__")

    # Use optional insertmanyvalues_max_parameters
    # to further shrink the batch size so that there are no more than
    # insertmanyvalues_max_parameters params.
    # Currently used by SQL Server, which limits statements to 2100 bound
    # parameters (actually 2099).
    max_params = self.dialect.insertmanyvalues_max_parameters
    if max_params:
        total_num_of_params = len(self.bind_names)
        num_params_per_batch = len(imv.insert_crud_params)
        num_params_outside_of_batch = (
            total_num_of_params - num_params_per_batch
        )
        batch_size = min(
            batch_size,
            (
                (max_params - num_params_outside_of_batch)
                // num_params_per_batch
            ),
        )

    # working copies; these lists are consumed from the front as
    # batches are produced
    batches = cast("List[Sequence[Any]]", list(parameters))
    compiled_batches = cast(
        "List[Sequence[Any]]", list(compiled_parameters)
    )

    processed_setinputsizes: Optional[_GenericSetInputSizesType] = None
    batchnum = 1
    # ceiling division of the number of parameter sets by batch_size
    total_batches = lenparams // batch_size + (
        1 if lenparams % batch_size else 0
    )

    insert_crud_params = imv.insert_crud_params
    assert insert_crud_params is not None

    if rst:
        insert_crud_params = [
            (col, key, rst(expr), st)
            for col, key, expr, st in insert_crud_params
        ]

    escaped_bind_names: Mapping[str, str]
    expand_pos_lower_index = expand_pos_upper_index = 0

    if not self.positional:
        # named parameters: build a VALUES template in which each bound
        # parameter name of the VALUES clause carries an
        # "__EXECMANY_INDEX__" suffix, to be replaced with the row index
        # within each batch
        if self.escaped_bind_names:
            escaped_bind_names = self.escaped_bind_names
        else:
            escaped_bind_names = {}

        all_keys = set(parameters[0])

        def apply_placeholders(keys, formatted):
            for key in keys:
                key = escaped_bind_names.get(key, key)
                formatted = formatted.replace(
                    self.bindtemplate % {"name": key},
                    self.bindtemplate
                    % {"name": f"{key}__EXECMANY_INDEX__"},
                )
            return formatted

        if imv.embed_values_counter:
            imv_values_counter = ", _IMV_VALUES_COUNTER"
        else:
            imv_values_counter = ""
        formatted_values_clause = f"""({', '.join(
            apply_placeholders(bind_keys, formatted)
            for _, _, formatted, bind_keys in insert_crud_params
        )}{imv_values_counter})"""

        # keys that belong to the VALUES clause are re-keyed per row;
        # all other keys are taken once, from the first parameter set
        keys_to_replace = all_keys.intersection(
            escaped_bind_names.get(key, key)
            for _, _, _, bind_keys in insert_crud_params
            for key in bind_keys
        )
        base_parameters = {
            key: parameters[0][key]
            for key in all_keys.difference(keys_to_replace)
        }

        executemany_values_w_comma = ""
    else:
        # positional parameters: the VALUES expression, followed by a
        # comma and space, is repeated once per row of the batch
        formatted_values_clause = ""
        keys_to_replace = set()
        base_parameters = {}

        if imv.embed_values_counter:
            executemany_values_w_comma = (
                f"({imv_single_values_expr}, _IMV_VALUES_COUNTER), "
            )
        else:
            executemany_values_w_comma = f"({imv_single_values_expr}), "

        all_names_we_will_expand: Set[str] = set()
        for elem in imv.insert_crud_params:
            all_names_we_will_expand.update(elem[3])

        # get the start and end position in a particular list
        # of parameters where we will be doing the "expanding".
        # statements can have params on either side or both sides,
        # given RETURNING and CTEs
        if all_names_we_will_expand:
            positiontup = self.positiontup
            assert positiontup is not None

            all_expand_positions = {
                idx
                for idx, name in enumerate(positiontup)
                if name in all_names_we_will_expand
            }
            expand_pos_lower_index = min(all_expand_positions)
            expand_pos_upper_index = max(all_expand_positions) + 1
            # the expanded parameters must be contiguous within
            # positiontup
            assert (
                len(all_expand_positions)
                == expand_pos_upper_index - expand_pos_lower_index
            )

        if self._numeric_binds:
            # replace each numbered placeholder with "%s"; the actual
            # numbers are filled in per batch further below
            escaped = re.escape(self._numeric_binds_identifier_char)
            executemany_values_w_comma = re.sub(
                rf"{escaped}\d+", "%s", executemany_values_w_comma
            )

    while batches:
        # take the next batch_size parameter sets off the front of
        # both lists
        batch = batches[0:batch_size]
        compiled_batch = compiled_batches[0:batch_size]

        batches[0:batch_size] = []
        compiled_batches[0:batch_size] = []

        # only the final batch may be shorter than batch_size
        if batches:
            current_batch_size = batch_size
        else:
            current_batch_size = len(batch)

        if generic_setinputsizes:
            # if setinputsizes is present, expand this collection to
            # suit the batch length as well
            # currently this will be mssql+pyodbc for internal dialects
            processed_setinputsizes = [
                (new_key, len_, typ)
                for new_key, len_, typ in (
                    (f"{key}_{index}", len_, typ)
                    for index in range(current_batch_size)
                    for key, len_, typ in generic_setinputsizes
                )
            ]

        replaced_parameters: Any
        if self.positional:
            num_ins_params = imv.num_positional_params_counted

            batch_iterator: Iterable[Sequence[Any]]
            extra_params_left: Sequence[Any]
            extra_params_right: Sequence[Any]

            if num_ins_params == len(batch[0]):
                # every positional parameter belongs to VALUES
                extra_params_left = extra_params_right = ()
                batch_iterator = batch
            else:
                # parameters outside of VALUES are taken from the first
                # parameter set of the batch; only the VALUES slice of
                # each parameter set is expanded
                extra_params_left = batch[0][:expand_pos_lower_index]
                extra_params_right = batch[0][expand_pos_upper_index:]
                batch_iterator = (
                    b[expand_pos_lower_index:expand_pos_upper_index]
                    for b in batch
                )

            # the [:-2] slices remove the trailing ", " left by the
            # last repetition
            if imv.embed_values_counter:
                expanded_values_string = (
                    "".join(
                        executemany_values_w_comma.replace(
                            "_IMV_VALUES_COUNTER", str(i)
                        )
                        for i, _ in enumerate(batch)
                    )
                )[:-2]
            else:
                expanded_values_string = (
                    (executemany_values_w_comma * current_batch_size)
                )[:-2]

            if self._numeric_binds and num_ins_params > 0:
                # numeric will always number the parameters inside of
                # VALUES (and thus order self.positiontup) to be higher
                # than non-VALUES parameters, no matter where in the
                # statement those non-VALUES parameters appear (this is
                # ensured in _process_numeric by numbering first all
                # params that are not in _values_bindparam)
                # therefore all extra params are always
                # on the left side and numbered lower than the VALUES
                # parameters
                assert not extra_params_right

                start = expand_pos_lower_index + 1
                end = num_ins_params * (current_batch_size) + start

                # need to format here, since statement may contain
                # unescaped %, while values_string contains just (%s, %s)
                positions = tuple(
                    f"{self._numeric_binds_identifier_char}{i}"
                    for i in range(start, end)
                )
                expanded_values_string = expanded_values_string % positions

            replaced_statement = statement.replace(
                "__EXECMANY_TOKEN__", expanded_values_string
            )

            # flatten the per-row VALUES parameters into one tuple, then
            # surround them with the non-VALUES parameters
            replaced_parameters = tuple(
                itertools.chain.from_iterable(batch_iterator)
            )

            replaced_parameters = (
                extra_params_left
                + replaced_parameters
                + extra_params_right
            )

        else:
            replaced_values_clauses = []
            replaced_parameters = base_parameters.copy()

            for i, param in enumerate(batch):
                # "<key>__EXECMANY_INDEX__" in the template becomes
                # "<key>__<i>", matching the parameter keys generated
                # below
                fmv = formatted_values_clause.replace(
                    "EXECMANY_INDEX__", str(i)
                )
                if imv.embed_values_counter:
                    fmv = fmv.replace("_IMV_VALUES_COUNTER", str(i))

                replaced_values_clauses.append(fmv)
                replaced_parameters.update(
                    {f"{key}__{i}": param[key] for key in keys_to_replace}
                )

            replaced_statement = statement.replace(
                "__EXECMANY_TOKEN__",
                ", ".join(replaced_values_clauses),
            )

        yield _InsertManyValuesBatch(
            replaced_statement,
            replaced_parameters,
            processed_setinputsizes,
            batch,
            (
                [_sentinel_from_params(cb) for cb in compiled_batch]
                if _sentinel_from_params
                else []
            ),
            current_batch_size,
            batchnum,
            total_batches,
            sort_by_parameter_order,
            False,
        )
        batchnum += 1
6099
6100 def visit_insert(
6101 self, insert_stmt, visited_bindparam=None, visiting_cte=None, **kw
6102 ):
6103 compile_state = insert_stmt._compile_state_factory(
6104 insert_stmt, self, **kw
6105 )
6106 insert_stmt = compile_state.statement
6107
6108 if visiting_cte is not None:
6109 kw["visiting_cte"] = visiting_cte
6110 toplevel = False
6111 else:
6112 toplevel = not self.stack
6113
6114 if toplevel:
6115 self.isinsert = True
6116 if not self.dml_compile_state:
6117 self.dml_compile_state = compile_state
6118 if not self.compile_state:
6119 self.compile_state = compile_state
6120
6121 self.stack.append(
6122 {
6123 "correlate_froms": set(),
6124 "asfrom_froms": set(),
6125 "selectable": insert_stmt,
6126 }
6127 )
6128
6129 counted_bindparam = 0
6130
6131 # reset any incoming "visited_bindparam" collection
6132 visited_bindparam = None
6133
6134 # for positional, insertmanyvalues needs to know how many
6135 # bound parameters are in the VALUES sequence; there's no simple
6136 # rule because default expressions etc. can have zero or more
6137 # params inside them. After multiple attempts to figure this out,
6138 # this very simplistic "count after" works and is
6139 # likely the least amount of callcounts, though looks clumsy
6140 if self.positional and visiting_cte is None:
6141 # if we are inside a CTE, don't count parameters
6142 # here since they won't be for insertmanyvalues. keep
6143 # visited_bindparam at None so no counting happens.
6144 # see #9173
6145 visited_bindparam = []
6146
6147 crud_params_struct = crud._get_crud_params(
6148 self,
6149 insert_stmt,
6150 compile_state,
6151 toplevel,
6152 visited_bindparam=visited_bindparam,
6153 **kw,
6154 )
6155
6156 if self.positional and visited_bindparam is not None:
6157 counted_bindparam = len(visited_bindparam)
6158 if self._numeric_binds:
6159 if self._values_bindparam is not None:
6160 self._values_bindparam += visited_bindparam
6161 else:
6162 self._values_bindparam = visited_bindparam
6163
6164 crud_params_single = crud_params_struct.single_params
6165
6166 if (
6167 not crud_params_single
6168 and not self.dialect.supports_default_values
6169 and not self.dialect.supports_default_metavalue
6170 and not self.dialect.supports_empty_insert
6171 ):
6172 raise exc.CompileError(
6173 "The '%s' dialect with current database "
6174 "version settings does not support empty "
6175 "inserts." % self.dialect.name
6176 )
6177
6178 if compile_state._has_multi_parameters:
6179 if not self.dialect.supports_multivalues_insert:
6180 raise exc.CompileError(
6181 "The '%s' dialect with current database "
6182 "version settings does not support "
6183 "in-place multirow inserts." % self.dialect.name
6184 )
6185 elif (
6186 self.implicit_returning or insert_stmt._returning
6187 ) and insert_stmt._sort_by_parameter_order:
6188 raise exc.CompileError(
6189 "RETURNING cannot be deterministically sorted when "
6190 "using an INSERT which includes multi-row values()."
6191 )
6192 crud_params_single = crud_params_struct.single_params
6193 else:
6194 crud_params_single = crud_params_struct.single_params
6195
6196 preparer = self.preparer
6197 supports_default_values = self.dialect.supports_default_values
6198
6199 text = "INSERT "
6200
6201 if insert_stmt._prefixes:
6202 text += self._generate_prefixes(
6203 insert_stmt, insert_stmt._prefixes, **kw
6204 )
6205
6206 text += "INTO "
6207 table_text = preparer.format_table(insert_stmt.table)
6208
6209 if insert_stmt._hints:
6210 _, table_text = self._setup_crud_hints(insert_stmt, table_text)
6211
6212 if insert_stmt._independent_ctes:
6213 self._dispatch_independent_ctes(insert_stmt, kw)
6214
6215 text += table_text
6216
6217 if crud_params_single or not supports_default_values:
6218 text += " (%s)" % ", ".join(
6219 [expr for _, expr, _, _ in crud_params_single]
6220 )
6221
6222 # look for insertmanyvalues attributes that would have been configured
6223 # by crud.py as it scanned through the columns to be part of the
6224 # INSERT
6225 use_insertmanyvalues = crud_params_struct.use_insertmanyvalues
6226 named_sentinel_params: Optional[Sequence[str]] = None
6227 add_sentinel_cols = None
6228 implicit_sentinel = False
6229
6230 returning_cols = self.implicit_returning or insert_stmt._returning
6231 if returning_cols:
6232 add_sentinel_cols = crud_params_struct.use_sentinel_columns
6233 if add_sentinel_cols is not None:
6234 assert use_insertmanyvalues
6235
6236 # search for the sentinel column explicitly present
6237 # in the INSERT columns list, and additionally check that
6238 # this column has a bound parameter name set up that's in the
6239 # parameter list. If both of these cases are present, it means
6240 # we will have a client side value for the sentinel in each
6241 # parameter set.
6242
6243 _params_by_col = {
6244 col: param_names
6245 for col, _, _, param_names in crud_params_single
6246 }
6247 named_sentinel_params = []
6248 for _add_sentinel_col in add_sentinel_cols:
6249 if _add_sentinel_col not in _params_by_col:
6250 named_sentinel_params = None
6251 break
6252 param_name = self._within_exec_param_key_getter(
6253 _add_sentinel_col
6254 )
6255 if param_name not in _params_by_col[_add_sentinel_col]:
6256 named_sentinel_params = None
6257 break
6258 named_sentinel_params.append(param_name)
6259
6260 if named_sentinel_params is None:
6261 # if we are not going to have a client side value for
6262 # the sentinel in the parameter set, that means it's
6263 # an autoincrement, an IDENTITY, or a server-side SQL
6264 # expression like nextval('seqname'). So this is
6265 # an "implicit" sentinel; we will look for it in
6266 # RETURNING
6267 # only, and then sort on it. For this case on PG,
6268 # SQL Server we have to use a special INSERT form
6269 # that guarantees the server side function lines up with
6270 # the entries in the VALUES.
6271 if (
6272 self.dialect.insertmanyvalues_implicit_sentinel
6273 & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
6274 ):
6275 implicit_sentinel = True
6276 else:
6277 # here, we are not using a sentinel at all
6278 # and we are likely the SQLite dialect.
6279 # The first add_sentinel_col that we have should not
6280 # be marked as "insert_sentinel=True". if it was,
6281 # an error should have been raised in
6282 # _get_sentinel_column_for_table.
6283 assert not add_sentinel_cols[0]._insert_sentinel, (
6284 "sentinel selection rules should have prevented "
6285 "us from getting here for this dialect"
6286 )
6287
6288 # always put the sentinel columns last. even if they are
6289 # in the returning list already, they will be there twice
6290 # then.
6291 returning_cols = list(returning_cols) + list(add_sentinel_cols)
6292
6293 returning_clause = self.returning_clause(
6294 insert_stmt,
6295 returning_cols,
6296 populate_result_map=toplevel,
6297 )
6298
6299 if self.returning_precedes_values:
6300 text += " " + returning_clause
6301
6302 else:
6303 returning_clause = None
6304
6305 if insert_stmt.select is not None:
6306 # placed here by crud.py
6307 select_text = self.process(
6308 self.stack[-1]["insert_from_select"], insert_into=True, **kw
6309 )
6310
6311 if self.ctes and self.dialect.cte_follows_insert:
6312 nesting_level = len(self.stack) if not toplevel else None
6313 text += " %s%s" % (
6314 self._render_cte_clause(
6315 nesting_level=nesting_level,
6316 include_following_stack=True,
6317 ),
6318 select_text,
6319 )
6320 else:
6321 text += " %s" % select_text
6322 elif not crud_params_single and supports_default_values:
6323 text += " DEFAULT VALUES"
6324 if use_insertmanyvalues:
6325 self._insertmanyvalues = _InsertManyValues(
6326 True,
6327 self.dialect.default_metavalue_token,
6328 crud_params_single,
6329 counted_bindparam,
6330 sort_by_parameter_order=(
6331 insert_stmt._sort_by_parameter_order
6332 ),
6333 includes_upsert_behaviors=(
6334 insert_stmt._post_values_clause is not None
6335 ),
6336 sentinel_columns=add_sentinel_cols,
6337 num_sentinel_columns=(
6338 len(add_sentinel_cols) if add_sentinel_cols else 0
6339 ),
6340 implicit_sentinel=implicit_sentinel,
6341 )
6342 elif compile_state._has_multi_parameters:
6343 text += " VALUES %s" % (
6344 ", ".join(
6345 "(%s)"
6346 % (", ".join(value for _, _, value, _ in crud_param_set))
6347 for crud_param_set in crud_params_struct.all_multi_params
6348 ),
6349 )
6350 elif use_insertmanyvalues:
6351 if (
6352 implicit_sentinel
6353 and (
6354 self.dialect.insertmanyvalues_implicit_sentinel
6355 & InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
6356 )
6357 # this is checking if we have
6358 # INSERT INTO table (id) VALUES (DEFAULT).
6359 and not (crud_params_struct.is_default_metavalue_only)
6360 ):
6361 # if we have a sentinel column that is server generated,
6362 # then for selected backends render the VALUES list as a
6363 # subquery. This is the orderable form supported by
6364 # PostgreSQL and in fewer cases SQL Server
6365 embed_sentinel_value = True
6366
6367 render_bind_casts = (
6368 self.dialect.insertmanyvalues_implicit_sentinel
6369 & InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
6370 )
6371
6372 add_sentinel_set = add_sentinel_cols or ()
6373
6374 insert_single_values_expr = ", ".join(
6375 [
6376 value
6377 for col, _, value, _ in crud_params_single
6378 if col not in add_sentinel_set
6379 ]
6380 )
6381
6382 colnames = ", ".join(
6383 f"p{i}"
6384 for i, cp in enumerate(crud_params_single)
6385 if cp[0] not in add_sentinel_set
6386 )
6387
6388 if render_bind_casts:
6389 # render casts for the SELECT list. For PG, we are
6390 # already rendering bind casts in the parameter list,
6391 # selectively for the more "tricky" types like ARRAY.
6392 # however, even for the "easy" types, if the parameter
6393 # is NULL for every entry, PG gives up and says
6394 # "it must be TEXT", which fails for other easy types
6395 # like ints. So we cast on this side too.
6396 colnames_w_cast = ", ".join(
6397 (
6398 self.render_bind_cast(
6399 col.type,
6400 col.type._unwrapped_dialect_impl(self.dialect),
6401 f"p{i}",
6402 )
6403 if col not in add_sentinel_set
6404 else expr
6405 )
6406 for i, (col, _, expr, _) in enumerate(
6407 crud_params_single
6408 )
6409 )
6410 else:
6411 colnames_w_cast = ", ".join(
6412 (f"p{i}" if col not in add_sentinel_set else expr)
6413 for i, (col, _, expr, _) in enumerate(
6414 crud_params_single
6415 )
6416 )
6417
6418 insert_crud_params = [
6419 elem
6420 for elem in crud_params_single
6421 if elem[0] not in add_sentinel_set
6422 ]
6423
6424 text += (
6425 f" SELECT {colnames_w_cast} FROM "
6426 f"(VALUES ({insert_single_values_expr})) "
6427 f"AS imp_sen({colnames}, sen_counter) "
6428 "ORDER BY sen_counter"
6429 )
6430
6431 else:
6432 # otherwise, if no sentinel or backend doesn't support
6433 # orderable subquery form, use a plain VALUES list
6434 embed_sentinel_value = False
6435 insert_crud_params = crud_params_single
6436 insert_single_values_expr = ", ".join(
6437 [value for _, _, value, _ in crud_params_single]
6438 )
6439
6440 text += f" VALUES ({insert_single_values_expr})"
6441
6442 self._insertmanyvalues = _InsertManyValues(
6443 is_default_expr=False,
6444 single_values_expr=insert_single_values_expr,
6445 insert_crud_params=insert_crud_params,
6446 num_positional_params_counted=counted_bindparam,
6447 sort_by_parameter_order=(insert_stmt._sort_by_parameter_order),
6448 includes_upsert_behaviors=(
6449 insert_stmt._post_values_clause is not None
6450 ),
6451 sentinel_columns=add_sentinel_cols,
6452 num_sentinel_columns=(
6453 len(add_sentinel_cols) if add_sentinel_cols else 0
6454 ),
6455 sentinel_param_keys=named_sentinel_params,
6456 implicit_sentinel=implicit_sentinel,
6457 embed_values_counter=embed_sentinel_value,
6458 )
6459
6460 else:
6461 insert_single_values_expr = ", ".join(
6462 [value for _, _, value, _ in crud_params_single]
6463 )
6464
6465 text += f" VALUES ({insert_single_values_expr})"
6466
6467 if insert_stmt._post_values_clause is not None:
6468 post_values_clause = self.process(
6469 insert_stmt._post_values_clause, **kw
6470 )
6471 if post_values_clause:
6472 text += " " + post_values_clause
6473
6474 if returning_clause and not self.returning_precedes_values:
6475 text += " " + returning_clause
6476
6477 if self.ctes and not self.dialect.cte_follows_insert:
6478 nesting_level = len(self.stack) if not toplevel else None
6479 text = (
6480 self._render_cte_clause(
6481 nesting_level=nesting_level,
6482 include_following_stack=True,
6483 )
6484 + text
6485 )
6486
6487 self.stack.pop(-1)
6488
6489 return text
6490
6491 def update_tables_clause(self, update_stmt, from_table, extra_froms, **kw):
6492 """Provide a hook to override the initial table clause
6493 in an UPDATE statement.
6494
6495 MySQL overrides this.
6496
6497 """
6498 kw["asfrom"] = True
6499 return from_table._compiler_dispatch(self, iscrud=True, **kw)
6500
6501 def update_from_clause(
6502 self, update_stmt, from_table, extra_froms, from_hints, **kw
6503 ):
6504 """Provide a hook to override the generation of an
6505 UPDATE..FROM clause.
6506 MySQL and MSSQL override this.
6507 """
6508 raise NotImplementedError(
6509 "This backend does not support multiple-table "
6510 "criteria within UPDATE"
6511 )
6512
6513 def update_post_criteria_clause(
6514 self, update_stmt: Update, **kw: Any
6515 ) -> Optional[str]:
6516 """provide a hook to override generation after the WHERE criteria
6517 in an UPDATE statement
6518
6519 .. versionadded:: 2.1
6520
6521 """
6522 if update_stmt._post_criteria_clause is not None:
6523 return self.process(
6524 update_stmt._post_criteria_clause,
6525 **kw,
6526 )
6527 else:
6528 return None
6529
6530 def delete_post_criteria_clause(
6531 self, delete_stmt: Delete, **kw: Any
6532 ) -> Optional[str]:
6533 """provide a hook to override generation after the WHERE criteria
6534 in a DELETE statement
6535
6536 .. versionadded:: 2.1
6537
6538 """
6539 if delete_stmt._post_criteria_clause is not None:
6540 return self.process(
6541 delete_stmt._post_criteria_clause,
6542 **kw,
6543 )
6544 else:
6545 return None
6546
    def visit_update(
        self,
        update_stmt: Update,
        visiting_cte: Optional[CTE] = None,
        **kw: Any,
    ) -> str:
        """Compile an UPDATE statement into its SQL string.

        The text is assembled in this order: ``UPDATE``, optional prefixes,
        the table clause, the ``SET`` assignments, a RETURNING clause when
        ``returning_precedes_values`` is set, an extra FROM clause for
        multiple-table updates, the ``WHERE`` criteria, the post-criteria
        clause, and a trailing RETURNING clause otherwise.  Any CTEs
        collected on the compiler are rendered in front of the result.

        :param update_stmt: the UPDATE construct to compile.
        :param visiting_cte: when not None, it is forwarded to nested
         compilation as ``kw["visiting_cte"]`` and the statement is treated
         as not being the top-level statement.
        :param kw: keyword arguments forwarded to nested compilation calls.
        :return: the rendered SQL string.

        """
        compile_state = update_stmt._compile_state_factory(
            update_stmt, self, **kw
        )
        if TYPE_CHECKING:
            assert isinstance(compile_state, UpdateDMLState)
        # continue with the statement object held by the compile state
        update_stmt = compile_state.statement  # type: ignore[assignment]

        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            # an empty compiler stack means nothing encloses this statement
            toplevel = not self.stack

        if toplevel:
            self.isupdate = True
            # only fill in the compile state attributes if not already set
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        extra_froms = compile_state._extra_froms
        is_multitable = bool(extra_froms)

        if is_multitable:
            # main table might be a JOIN
            main_froms = set(_from_objects(update_stmt.table))
            # extra FROMs already covered by the main table are not
            # rendered a second time
            render_extra_froms = [
                f for f in extra_froms if f not in main_froms
            ]
            correlate_froms = main_froms.union(extra_froms)
        else:
            render_extra_froms = []
            correlate_froms = {update_stmt.table}

        # stack entry for this statement; popped again before returning
        self.stack.append(
            {
                "correlate_froms": correlate_froms,
                "asfrom_froms": correlate_froms,
                "selectable": update_stmt,
            }
        )

        text = "UPDATE "

        if update_stmt._prefixes:
            text += self._generate_prefixes(
                update_stmt, update_stmt._prefixes, **kw
            )

        table_text = self.update_tables_clause(
            update_stmt,
            update_stmt.table,
            render_extra_froms,
            from_linter=from_linter,
            **kw,
        )
        crud_params_struct = crud._get_crud_params(
            self, update_stmt, compile_state, toplevel, **kw
        )
        crud_params = crud_params_struct.single_params

        if update_stmt._hints:
            dialect_hints, table_text = self._setup_crud_hints(
                update_stmt, table_text
            )
        else:
            dialect_hints = None

        if update_stmt._independent_ctes:
            self._dispatch_independent_ctes(update_stmt, kw)

        text += table_text

        # each crud parameter entry contributes one "<expr>=<value>" pair
        text += " SET "
        text += ", ".join(
            expr + "=" + value
            for _, expr, value, _ in cast(
                "List[Tuple[Any, str, str, Any]]", crud_params
            )
        )

        # RETURNING rendered directly after SET for dialects that set
        # returning_precedes_values
        if self.implicit_returning or update_stmt._returning:
            if self.returning_precedes_values:
                text += " " + self.returning_clause(
                    update_stmt,
                    self.implicit_returning or update_stmt._returning,
                    populate_result_map=toplevel,
                )

        if extra_froms:
            extra_from_text = self.update_from_clause(
                update_stmt,
                update_stmt.table,
                render_extra_froms,
                dialect_hints,
                from_linter=from_linter,
                **kw,
            )
            if extra_from_text:
                text += " " + extra_from_text

        if update_stmt._where_criteria:
            t = self._generate_delimited_and_list(
                update_stmt._where_criteria, from_linter=from_linter, **kw
            )
            if t:
                text += " WHERE " + t

        ulc = self.update_post_criteria_clause(
            update_stmt, from_linter=from_linter, **kw
        )
        if ulc:
            text += " " + ulc

        # otherwise RETURNING goes at the end of the statement
        if (
            self.implicit_returning or update_stmt._returning
        ) and not self.returning_precedes_values:
            text += " " + self.returning_clause(
                update_stmt,
                self.implicit_returning or update_stmt._returning,
                populate_result_map=toplevel,
            )

        if self.ctes:
            # the CTE clause is placed in front of the UPDATE text
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if warn_linting:
            assert from_linter is not None
            from_linter.warn(stmt_type="UPDATE")

        self.stack.pop(-1)

        return text  # type: ignore[no-any-return]
6696
6697 def delete_extra_from_clause(
6698 self, delete_stmt, from_table, extra_froms, from_hints, **kw
6699 ):
6700 """Provide a hook to override the generation of an
6701 DELETE..FROM clause.
6702
6703 This can be used to implement DELETE..USING for example.
6704
6705 MySQL and MSSQL override this.
6706
6707 """
6708 raise NotImplementedError(
6709 "This backend does not support multiple-table "
6710 "criteria within DELETE"
6711 )
6712
6713 def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw):
6714 return from_table._compiler_dispatch(
6715 self, asfrom=True, iscrud=True, **kw
6716 )
6717
    def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
        """Compile a DELETE statement into its SQL string.

        The text is assembled in this order: ``DELETE``, optional prefixes,
        ``FROM`` and the table clause, a RETURNING clause when
        ``returning_precedes_values`` is set, an extra FROM clause for
        multiple-table deletes, the ``WHERE`` criteria, the post-criteria
        clause, and a trailing RETURNING clause otherwise.  Any CTEs
        collected on the compiler are rendered in front of the result.

        :param delete_stmt: the DELETE construct to compile.
        :param visiting_cte: when not None, it is forwarded to nested
         compilation as ``kw["visiting_cte"]`` and the statement is treated
         as not being the top-level statement.
        :param kw: keyword arguments forwarded to nested compilation calls.
        :return: the rendered SQL string.

        """
        compile_state = delete_stmt._compile_state_factory(
            delete_stmt, self, **kw
        )
        # continue with the statement object held by the compile state
        delete_stmt = compile_state.statement

        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            # an empty compiler stack means nothing encloses this statement
            toplevel = not self.stack

        if toplevel:
            self.isdelete = True
            # only fill in the compile state attributes if not already set
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        extra_froms = compile_state._extra_froms

        correlate_froms = {delete_stmt.table}.union(extra_froms)
        # stack entry for this statement; popped again before returning
        self.stack.append(
            {
                "correlate_froms": correlate_froms,
                "asfrom_froms": correlate_froms,
                "selectable": delete_stmt,
            }
        )

        text = "DELETE "

        if delete_stmt._prefixes:
            text += self._generate_prefixes(
                delete_stmt, delete_stmt._prefixes, **kw
            )

        text += "FROM "

        try:
            table_text = self.delete_table_clause(
                delete_stmt,
                delete_stmt.table,
                extra_froms,
                from_linter=from_linter,
            )
        except TypeError:
            # anticipate 3rd party dialects that don't include **kw
            # TODO: remove in 2.1
            table_text = self.delete_table_clause(
                delete_stmt, delete_stmt.table, extra_froms
            )
            if from_linter:
                # the fallback call above did not receive the linter, so
                # process the table once more with it; the rendered text
                # of this call is discarded
                _ = self.process(delete_stmt.table, from_linter=from_linter)

        # return value is not used here; the call is made for its effects
        # on the compiler
        crud._get_crud_params(self, delete_stmt, compile_state, toplevel, **kw)

        if delete_stmt._hints:
            dialect_hints, table_text = self._setup_crud_hints(
                delete_stmt, table_text
            )
        else:
            dialect_hints = None

        if delete_stmt._independent_ctes:
            self._dispatch_independent_ctes(delete_stmt, kw)

        text += table_text

        # RETURNING rendered directly after the table for dialects that
        # set returning_precedes_values
        if (
            self.implicit_returning or delete_stmt._returning
        ) and self.returning_precedes_values:
            text += " " + self.returning_clause(
                delete_stmt,
                self.implicit_returning or delete_stmt._returning,
                populate_result_map=toplevel,
            )

        if extra_froms:
            extra_from_text = self.delete_extra_from_clause(
                delete_stmt,
                delete_stmt.table,
                extra_froms,
                dialect_hints,
                from_linter=from_linter,
                **kw,
            )
            if extra_from_text:
                text += " " + extra_from_text

        if delete_stmt._where_criteria:
            t = self._generate_delimited_and_list(
                delete_stmt._where_criteria, from_linter=from_linter, **kw
            )
            if t:
                text += " WHERE " + t

        dlc = self.delete_post_criteria_clause(
            delete_stmt, from_linter=from_linter, **kw
        )
        if dlc:
            text += " " + dlc

        # otherwise RETURNING goes at the end of the statement
        if (
            self.implicit_returning or delete_stmt._returning
        ) and not self.returning_precedes_values:
            text += " " + self.returning_clause(
                delete_stmt,
                self.implicit_returning or delete_stmt._returning,
                populate_result_map=toplevel,
            )

        if self.ctes:
            # the CTE clause is placed in front of the DELETE text
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if warn_linting:
            assert from_linter is not None
            from_linter.warn(stmt_type="DELETE")

        self.stack.pop(-1)

        return text
6850
6851 def visit_savepoint(self, savepoint_stmt, **kw):
6852 return "SAVEPOINT %s" % self.preparer.format_savepoint(savepoint_stmt)
6853
6854 def visit_rollback_to_savepoint(self, savepoint_stmt, **kw):
6855 return "ROLLBACK TO SAVEPOINT %s" % self.preparer.format_savepoint(
6856 savepoint_stmt
6857 )
6858
6859 def visit_release_savepoint(self, savepoint_stmt, **kw):
6860 return "RELEASE SAVEPOINT %s" % self.preparer.format_savepoint(
6861 savepoint_stmt
6862 )
6863
6864
class StrSQLCompiler(SQLCompiler):
    """A :class:`.SQLCompiler` subclass used for plain stringification.

    The :class:`.StrSQLCompiler` is invoked whenever a Core expression
    element is directly stringified without calling upon the
    :meth:`_expression.ClauseElement.compile` method.  It renders a small
    selection of non-standard SQL features into a string value to assist
    in basic stringification; for more substantial custom or
    dialect-specific SQL constructs, use
    :meth:`_expression.ClauseElement.compile` directly.

    .. seealso::

        :ref:`faq_sql_expression_string`

    """

    def _fallback_column_name(self, column):
        """Return the placeholder used for a column without a name."""
        return "<name unknown>"

    @util.preload_module("sqlalchemy.engine.url")
    def visit_unsupported_compilation(self, element, err, **kw):
        """Try the element's own ``stringify_dialect`` before giving up.

        When the element names a dialect other than ``"default"`` and that
        dialect's statement compiler is not itself a
        :class:`.StrSQLCompiler`, the element is compiled with it.
        Otherwise the superclass handling is used.

        """
        dialect_name = element.stringify_dialect
        if dialect_name != "default":
            engine_url = util.preloaded.engine_url
            dialect_cls = engine_url.URL.create(dialect_name).get_dialect()
            dialect = dialect_cls()

            alt_compiler = dialect.statement_compiler(
                dialect, None, _supporting_against=self
            )
            if not isinstance(alt_compiler, StrSQLCompiler):
                return alt_compiler.process(element, **kw)

        return super().visit_unsupported_compilation(element, err)

    def visit_getitem_binary(self, binary, operator, **kw):
        """Render an index operation as ``left[right]``."""
        left = self.process(binary.left, **kw)
        right = self.process(binary.right, **kw)
        return "%s[%s]" % (left, right)

    def visit_json_getitem_op_binary(self, binary, operator, **kw):
        """Render JSON index access the same way as a plain getitem."""
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
        """Render JSON path access the same way as a plain getitem."""
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_sequence(self, sequence, **kw):
        """Render a descriptive placeholder for a sequence's next value."""
        return (
            f"<next sequence value: {self.preparer.format_sequence(sequence)}>"
        )

    def returning_clause(
        self,
        stmt: UpdateBase,
        returning_cols: Sequence[_ColumnsClauseElement],
        *,
        populate_result_map: bool,
        **kw: Any,
    ) -> str:
        """Render a generic ``RETURNING`` clause for the given columns."""
        rendered = []
        for col in base._select_iterables(returning_cols):
            rendered.append(
                self._label_select_column(None, col, True, False, {})
            )
        return "RETURNING " + ", ".join(rendered)

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render ``FROM`` plus the comma-separated extra tables."""
        kw["asfrom"] = True
        rendered = [
            extra._compiler_dispatch(self, fromhints=from_hints, **kw)
            for extra in extra_froms
        ]
        return "FROM " + ", ".join(rendered)

    def delete_extra_from_clause(
        self, delete_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the extra tables as a comma-led, comma-separated list."""
        kw["asfrom"] = True
        rendered = [
            extra._compiler_dispatch(self, fromhints=from_hints, **kw)
            for extra in extra_froms
        ]
        return ", " + ", ".join(rendered)

    def visit_empty_set_expr(self, element_types, **kw):
        """Render a SELECT that produces no rows."""
        return "SELECT 1 WHERE 1!=1"

    def get_from_hint_text(self, table, text):
        """Render a FROM hint enclosed in square brackets."""
        return "[%s]" % text

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        """Render a regular expression match with a placeholder operator."""
        return self._generate_generic_binary(binary, " <regexp> ", **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        """Render a negated regular expression match placeholder."""
        return self._generate_generic_binary(binary, " <not regexp> ", **kw)

    def visit_regexp_replace_op_binary(self, binary, operator, **kw):
        """Render a regular expression replace as a placeholder call."""
        left = binary.left._compiler_dispatch(self, **kw)
        right = binary.right._compiler_dispatch(self, **kw)
        return "<regexp replace>(%s, %s)" % (left, right)

    def visit_try_cast(self, cast, **kwargs):
        """Render ``TRY_CAST(<clause> AS <type>)``."""
        clause_text = cast.clause._compiler_dispatch(self, **kwargs)
        type_text = cast.typeclause._compiler_dispatch(self, **kwargs)
        return "TRY_CAST(%s AS %s)" % (clause_text, type_text)
6974
6975
6976class DDLCompiler(Compiled):
6977 is_ddl = True
6978
    if TYPE_CHECKING:

        # Typing-only declaration of the constructor signature; being
        # under ``TYPE_CHECKING`` it is not defined at runtime.
        def __init__(
            self,
            dialect: Dialect,
            statement: ExecutableDDLElement,
            schema_translate_map: Optional[SchemaTranslateMapType] = ...,
            render_schema_translate: bool = ...,
            compile_kwargs: Mapping[str, Any] = ...,
        ): ...
6989
6990 @util.ro_memoized_property
6991 def sql_compiler(self) -> SQLCompiler:
6992 return self.dialect.statement_compiler(
6993 self.dialect, None, schema_translate_map=self.schema_translate_map
6994 )
6995
    @util.memoized_property
    def type_compiler(self):
        """Return the dialect's ``type_compiler_instance``."""
        return self.dialect.type_compiler_instance
6999
    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        """Return ``None``; this implementation ignores all arguments and
        produces no parameter dictionary.

        """
        return None
7007
7008 def visit_ddl(self, ddl, **kwargs):
7009 # table events can substitute table and schema name
7010 context = ddl.context
7011 if isinstance(ddl.target, schema.Table):
7012 context = context.copy()
7013
7014 preparer = self.preparer
7015 path = preparer.format_table_seq(ddl.target)
7016 if len(path) == 1:
7017 table, sch = path[0], ""
7018 else:
7019 table, sch = path[-1], path[0]
7020
7021 context.setdefault("table", table)
7022 context.setdefault("schema", sch)
7023 context.setdefault("fullname", preparer.format_table(ddl.target))
7024
7025 return self.sql_compiler.post_process_text(ddl.statement % context)
7026
7027 def visit_create_schema(self, create, **kw):
7028 text = "CREATE SCHEMA "
7029 if create.if_not_exists:
7030 text += "IF NOT EXISTS "
7031 return text + self.preparer.format_schema(create.element)
7032
7033 def visit_drop_schema(self, drop, **kw):
7034 text = "DROP SCHEMA "
7035 if drop.if_exists:
7036 text += "IF EXISTS "
7037 text += self.preparer.format_schema(drop.element)
7038 if drop.cascade:
7039 text += " CASCADE"
7040 return text
7041
    def visit_create_table(self, create, **kw):
        """Render a ``CREATE TABLE`` statement.

        The output consists of ``CREATE``, any table prefixes, ``TABLE``,
        an optional ``IF NOT EXISTS``, the formatted table name, the
        result of ``create_table_suffix()``, a parenthesized list holding
        one line per rendered column followed by the table-level
        constraints, and finally the result of ``post_create_table()``.

        :raises CompileError: re-raised with the table description and
         column name prepended when compiling a column fails with
         :class:`.CompileError`.

        """
        table = create.element
        preparer = self.preparer

        text = "\nCREATE "
        if table._prefixes:
            text += " ".join(table._prefixes) + " "

        text += "TABLE "
        if create.if_not_exists:
            text += "IF NOT EXISTS "

        text += preparer.format_table(table) + " "

        create_table_suffix = self.create_table_suffix(table)
        if create_table_suffix:
            text += create_table_suffix + " "

        text += "("

        # the first rendered entry follows a bare newline; every later
        # entry is preceded by a comma and newline
        separator = "\n"

        # if only one primary key, specify it along with the column
        first_pk = False
        for create_column in create.columns:
            column = create_column.element
            try:
                processed = self.process(
                    create_column, first_pk=column.primary_key and not first_pk
                )
                # columns that process to None are left out entirely
                if processed is not None:
                    text += separator
                    separator = ", \n"
                    text += "\t" + processed
                if column.primary_key:
                    first_pk = True
            except exc.CompileError as ce:
                raise exc.CompileError(
                    "(in table '%s', column '%s'): %s"
                    % (table.description, column.name, ce.args[0])
                ) from ce

        const = self.create_table_constraints(
            table,
            _include_foreign_key_constraints=create.include_foreign_key_constraints,  # noqa
        )
        if const:
            text += separator + "\t" + const

        text += "\n)%s\n\n" % self.post_create_table(table)
        return text
7093
    def visit_create_view(self, element: CreateView, **kw: Any) -> str:
        """Render a CREATE VIEW construct by delegating to
        ``_generate_table_select()`` with type ``"view"``.

        """
        return self._generate_table_select(element, "view", **kw)
7096
    def visit_create_table_as(self, element: CreateTableAs, **kw: Any) -> str:
        """Render a CREATE TABLE .. AS construct by delegating to
        ``_generate_table_select()`` with type ``"create_table_as"``.

        """
        return self._generate_table_select(element, "create_table_as", **kw)
7099
7100 def _generate_table_select(
7101 self,
7102 element: _TableViaSelect,
7103 type_: str,
7104 if_not_exists: Optional[bool] = None,
7105 **kw: Any,
7106 ) -> str:
7107 prep = self.preparer
7108
7109 inner_kw = dict(kw)
7110 inner_kw["literal_binds"] = True
7111 select_sql = self.sql_compiler.process(element.selectable, **inner_kw)
7112
7113 # Use if_not_exists parameter if provided, otherwise use element's
7114 use_if_not_exists = (
7115 if_not_exists
7116 if if_not_exists is not None
7117 else element.if_not_exists
7118 )
7119
7120 parts = [
7121 "CREATE",
7122 "OR REPLACE" if getattr(element, "or_replace", False) else None,
7123 "TEMPORARY" if element.temporary else None,
7124 (
7125 "MATERIALIZED VIEW"
7126 if type_ == "view" and getattr(element, "materialized", False)
7127 else "TABLE" if type_ == "create_table_as" else "VIEW"
7128 ),
7129 "IF NOT EXISTS" if use_if_not_exists else None,
7130 prep.format_table(element.table),
7131 "AS",
7132 select_sql,
7133 ]
7134 return " ".join(p for p in parts if p)
7135
7136 def visit_create_column(self, create, first_pk=False, **kw):
7137 column = create.element
7138
7139 if column.system:
7140 return None
7141
7142 text = self.get_column_specification(column, first_pk=first_pk)
7143 const = " ".join(
7144 self.process(constraint) for constraint in column.constraints
7145 )
7146 if const:
7147 text += " " + const
7148
7149 return text
7150
7151 def create_table_constraints(
7152 self, table, _include_foreign_key_constraints=None, **kw
7153 ):
7154 # On some DB order is significant: visit PK first, then the
7155 # other constraints (engine.ReflectionTest.testbasic failed on FB2)
7156 constraints = []
7157 if table.primary_key:
7158 constraints.append(table.primary_key)
7159
7160 all_fkcs = table.foreign_key_constraints
7161 if _include_foreign_key_constraints is not None:
7162 omit_fkcs = all_fkcs.difference(_include_foreign_key_constraints)
7163 else:
7164 omit_fkcs = set()
7165
7166 constraints.extend(
7167 [
7168 c
7169 for c in table._sorted_constraints
7170 if c is not table.primary_key and c not in omit_fkcs
7171 ]
7172 )
7173
7174 return ", \n\t".join(
7175 p
7176 for p in (
7177 self.process(constraint)
7178 for constraint in constraints
7179 if (constraint._should_create_for_compiler(self))
7180 and (
7181 not self.dialect.supports_alter
7182 or not getattr(constraint, "use_alter", False)
7183 )
7184 )
7185 if p is not None
7186 )
7187
7188 def visit_drop_table(self, drop, **kw):
7189 text = "\nDROP TABLE "
7190 if drop.if_exists:
7191 text += "IF EXISTS "
7192 return text + self.preparer.format_table(drop.element)
7193
7194 def visit_drop_view(self, drop, **kw):
7195 text = "\nDROP "
7196 if drop.materialized:
7197 text += "MATERIALIZED VIEW "
7198 else:
7199 text += "VIEW "
7200 if drop.if_exists:
7201 text += "IF EXISTS "
7202 return text + self.preparer.format_table(drop.element)
7203
7204 def _verify_index_table(self, index: Index) -> None:
7205 if index.table is None:
7206 raise exc.CompileError(
7207 "Index '%s' is not associated with any table." % index.name
7208 )
7209
7210 def visit_create_index(
7211 self, create, include_schema=False, include_table_schema=True, **kw
7212 ):
7213 index = create.element
7214 self._verify_index_table(index)
7215 preparer = self.preparer
7216 text = "CREATE "
7217 if index.unique:
7218 text += "UNIQUE "
7219 if index.name is None:
7220 raise exc.CompileError(
7221 "CREATE INDEX requires that the index have a name"
7222 )
7223
7224 text += "INDEX "
7225 if create.if_not_exists:
7226 text += "IF NOT EXISTS "
7227
7228 text += "%s ON %s (%s)" % (
7229 self._prepared_index_name(index, include_schema=include_schema),
7230 preparer.format_table(
7231 index.table, use_schema=include_table_schema
7232 ),
7233 ", ".join(
7234 self.sql_compiler.process(
7235 expr, include_table=False, literal_binds=True
7236 )
7237 for expr in index.expressions
7238 ),
7239 )
7240 return text
7241
7242 def visit_drop_index(self, drop, **kw):
7243 index = drop.element
7244
7245 if index.name is None:
7246 raise exc.CompileError(
7247 "DROP INDEX requires that the index have a name"
7248 )
7249 text = "\nDROP INDEX "
7250 if drop.if_exists:
7251 text += "IF EXISTS "
7252
7253 return text + self._prepared_index_name(index, include_schema=True)
7254
7255 def _prepared_index_name(
7256 self, index: Index, include_schema: bool = False
7257 ) -> str:
7258 if index.table is not None:
7259 effective_schema = self.preparer.schema_for_object(index.table)
7260 else:
7261 effective_schema = None
7262 if include_schema and effective_schema:
7263 schema_name = self.preparer.quote_schema(effective_schema)
7264 else:
7265 schema_name = None
7266
7267 index_name: str = self.preparer.format_index(index)
7268
7269 if schema_name:
7270 index_name = schema_name + "." + index_name
7271 return index_name
7272
7273 def visit_add_constraint(self, create, **kw):
7274 return "ALTER TABLE %s ADD %s" % (
7275 self.preparer.format_table(create.element.table),
7276 self.process(create.element),
7277 )
7278
7279 def visit_set_table_comment(self, create, **kw):
7280 return "COMMENT ON TABLE %s IS %s" % (
7281 self.preparer.format_table(create.element),
7282 self.sql_compiler.render_literal_value(
7283 create.element.comment, sqltypes.String()
7284 ),
7285 )
7286
7287 def visit_drop_table_comment(self, drop, **kw):
7288 return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table(
7289 drop.element
7290 )
7291
7292 def visit_set_column_comment(self, create, **kw):
7293 return "COMMENT ON COLUMN %s IS %s" % (
7294 self.preparer.format_column(
7295 create.element, use_table=True, use_schema=True
7296 ),
7297 self.sql_compiler.render_literal_value(
7298 create.element.comment, sqltypes.String()
7299 ),
7300 )
7301
7302 def visit_drop_column_comment(self, drop, **kw):
7303 return "COMMENT ON COLUMN %s IS NULL" % self.preparer.format_column(
7304 drop.element, use_table=True
7305 )
7306
    def visit_set_constraint_comment(self, create, **kw):
        """Raise :class:`.UnsupportedCompilationError`; this compiler has
        no rendering for setting a constraint comment.

        """
        raise exc.UnsupportedCompilationError(self, type(create))
7309
    def visit_drop_constraint_comment(self, drop, **kw):
        """Raise :class:`.UnsupportedCompilationError`; this compiler has
        no rendering for dropping a constraint comment.

        """
        raise exc.UnsupportedCompilationError(self, type(drop))
7312
def get_identity_options(self, identity_options: IdentityOptions) -> str:
    """Render the option keywords shared by sequences and identity columns.

    :param identity_options: object exposing ``increment``, ``start``,
     ``minvalue``, ``maxvalue``, ``nominvalue``, ``nomaxvalue``, ``cache``
     and ``cycle`` attributes.
    :return: the space-separated option clauses, or an empty string when no
     option is set.

    ``NO MINVALUE`` / ``NO MAXVALUE`` are emitted only when the
    corresponding flag is true; an explicit ``False`` is treated the same
    as ``None`` instead of rendering the keyword.
    """
    opts = identity_options
    clauses = []

    # numeric options render as "<KEYWORD> <integer>" when present
    for keyword, value in (
        ("INCREMENT BY", opts.increment),
        ("START WITH", opts.start),
        ("MINVALUE", opts.minvalue),
        ("MAXVALUE", opts.maxvalue),
    ):
        if value is not None:
            clauses.append("%s %d" % (keyword, value))

    # boolean flags: only a true value requests the keyword
    if opts.nominvalue:
        clauses.append("NO MINVALUE")
    if opts.nomaxvalue:
        clauses.append("NO MAXVALUE")

    if opts.cache is not None:
        clauses.append("CACHE %d" % opts.cache)

    # cycle is tri-state: None renders nothing, False renders NO CYCLE
    if opts.cycle is not None:
        clauses.append("CYCLE" if opts.cycle else "NO CYCLE")
    return " ".join(clauses)
7332
def visit_create_sequence(self, create, prefix=None, **kw):
    """Render ``CREATE SEQUENCE [IF NOT EXISTS] <name>[prefix][ options]``.

    :param prefix: optional text appended directly after the sequence
     name, before any identity options.
    """
    sequence = create.element
    pieces = ["CREATE SEQUENCE "]
    if create.if_not_exists:
        pieces.append("IF NOT EXISTS ")
    pieces.append(self.preparer.format_sequence(sequence))

    if prefix:
        pieces.append(prefix)

    options = self.get_identity_options(sequence)
    if options:
        pieces.append(" " + options)
    return "".join(pieces)
7345
def visit_drop_sequence(self, drop, **kw):
    """Render ``DROP SEQUENCE [IF EXISTS] <name>``."""
    if_exists = "IF EXISTS " if drop.if_exists else ""
    sequence_name = self.preparer.format_sequence(drop.element)
    return "DROP SEQUENCE " + if_exists + sequence_name
7351
def visit_drop_constraint(self, drop, **kw):
    """Render ``ALTER TABLE <t> DROP CONSTRAINT [IF EXISTS] <name> [CASCADE]``.

    :raises CompileError: if the constraint has no name, or if the preparer
     cannot produce a name for it.
    """
    constraint = drop.element
    formatted_name = (
        None
        if constraint.name is None
        else self.preparer.format_constraint(constraint)
    )

    if formatted_name is None:
        raise exc.CompileError(
            "Can't emit DROP CONSTRAINT for constraint %r; "
            "it has no name" % drop.element
        )

    table_name = self.preparer.format_table(drop.element.table)
    if_exists = "IF EXISTS " if drop.if_exists else ""
    cascade = " CASCADE" if drop.cascade else ""
    return "ALTER TABLE %s DROP CONSTRAINT %s%s%s" % (
        table_name,
        if_exists,
        formatted_name,
        cascade,
    )
7370
def get_column_specification(
    self, column: Column[Any], **kwargs: Any
) -> str:
    """Render the column definition used inside ``CREATE TABLE``.

    The pieces, in order and separated by single spaces: the formatted
    column name, the compiled type, ``DEFAULT <value>`` when a default
    string is available, the computed construct, the identity construct
    (only when the dialect supports identity columns) and ``NOT NULL``.
    ``NOT NULL`` is omitted for an identity column rendered on a dialect
    that supports identity columns.
    """
    pieces = [
        self.preparer.format_column(column),
        self.dialect.type_compiler_instance.process(
            column.type, type_expression=column
        ),
    ]

    default = self.get_column_default_string(column)
    if default is not None:
        pieces.append("DEFAULT " + default)

    if column.computed is not None:
        pieces.append(self.process(column.computed))

    if (
        column.identity is not None
        and self.dialect.supports_identity_columns
    ):
        pieces.append(self.process(column.identity))

    if not column.nullable and not (
        column.identity and self.dialect.supports_identity_columns
    ):
        pieces.append("NOT NULL")
    return " ".join(pieces)
7399
def create_table_suffix(self, table: Table) -> str:
    """Return the ``CREATE TABLE`` suffix text; empty in this base version."""
    suffix = ""
    return suffix
7402
def post_create_table(self, table: Table) -> str:
    """Return text following the table definition; empty in this base version."""
    trailer = ""
    return trailer
7405
def get_column_default_string(self, column: Column[Any]) -> Optional[str]:
    """Return the rendered server default for ``column``, or ``None``.

    Only a ``server_default`` that is a ``DefaultClause`` is rendered; any
    other value yields ``None``.
    """
    if not isinstance(column.server_default, schema.DefaultClause):
        return None
    return self.render_default_string(column.server_default.arg)
7411
def render_default_string(self, default: Union[Visitable, str]) -> str:
    """Render a server default value as inline SQL text.

    A plain string is rendered as a string literal; anything else is
    compiled by the SQL compiler with ``literal_binds=True``.
    """
    if not isinstance(default, str):
        return self.sql_compiler.process(default, literal_binds=True)
    return self.sql_compiler.render_literal_value(
        default, sqltypes.STRINGTYPE
    )
7419
def visit_table_or_column_check_constraint(self, constraint, **kw):
    """Dispatch a CHECK constraint to its column- or table-level visitor.

    ``constraint.is_column_level`` selects
    ``visit_column_check_constraint()``; otherwise
    ``visit_check_constraint()`` is used.  Keyword arguments are forwarded
    to the selected visitor rather than being discarded.
    """
    if constraint.is_column_level:
        return self.visit_column_check_constraint(constraint, **kw)
    return self.visit_check_constraint(constraint, **kw)
7425
def visit_check_constraint(self, constraint, **kw):
    """Render a CHECK constraint: preamble, CHECK body, deferrability."""
    preamble = self.define_constraint_preamble(constraint, **kw)
    body = self.define_check_body(constraint, **kw)
    deferrability = self.define_constraint_deferrability(constraint)
    return preamble + body + deferrability
7431
def visit_column_check_constraint(self, constraint, **kw):
    """Render a column-level CHECK: preamble, CHECK body, deferrability."""
    preamble = self.define_constraint_preamble(constraint, **kw)
    body = self.define_check_body(constraint, **kw)
    deferrability = self.define_constraint_deferrability(constraint)
    return preamble + body + deferrability
7437
def visit_primary_key_constraint(
    self, constraint: PrimaryKeyConstraint, **kw: Any
) -> str:
    """Render a PRIMARY KEY constraint, or ``""`` when it has no columns."""
    if not len(constraint):
        return ""
    preamble = self.define_constraint_preamble(constraint, **kw)
    body = self.define_primary_key_body(constraint, **kw)
    deferrability = self.define_constraint_deferrability(constraint)
    return preamble + body + deferrability
7447
def visit_foreign_key_constraint(
    self, constraint: ForeignKeyConstraint, **kw: Any
) -> str:
    """Render a FOREIGN KEY constraint.

    Concatenates, in order: preamble, FOREIGN KEY body, MATCH clause,
    ON DELETE / ON UPDATE clauses and deferrability.
    """
    return "".join(
        [
            self.define_constraint_preamble(constraint, **kw),
            self.define_foreign_key_body(constraint, **kw),
            self.define_constraint_match(constraint),
            self.define_constraint_cascades(constraint),
            self.define_constraint_deferrability(constraint),
        ]
    )
7457
def define_constraint_remote_table(self, constraint, table, preparer):
    """Return the referenced table's name as formatted by ``preparer``."""
    remote_name = preparer.format_table(table)
    return remote_name
7462
def visit_unique_constraint(
    self, constraint: UniqueConstraint, **kw: Any
) -> str:
    """Render a UNIQUE constraint, or ``""`` when it has no columns."""
    if not len(constraint):
        return ""
    preamble = self.define_constraint_preamble(constraint, **kw)
    body = self.define_unique_body(constraint, **kw)
    deferrability = self.define_constraint_deferrability(constraint)
    return preamble + body + deferrability
7472
def define_constraint_preamble(
    self, constraint: Constraint, **kw: Any
) -> str:
    """Return ``"CONSTRAINT <name> "`` for a named constraint, else ``""``.

    An empty string is also returned when the preparer yields ``None`` for
    the constraint's name.
    """
    if constraint.name is None:
        return ""
    formatted_name = self.preparer.format_constraint(constraint)
    if formatted_name is None:
        return ""
    return "CONSTRAINT %s " % formatted_name
7482
def define_primary_key_body(
    self, constraint: PrimaryKeyConstraint, **kw: Any
) -> str:
    """Render ``PRIMARY KEY (<col>, ...)`` with quoted column names.

    When the constraint is flagged ``_implicit_generated`` its
    ``columns_autoinc_first`` ordering is used, otherwise ``columns``.
    """
    if constraint._implicit_generated:
        ordered_columns = constraint.columns_autoinc_first
    else:
        ordered_columns = constraint.columns

    quoted_names = [self.preparer.quote(col.name) for col in ordered_columns]
    return "PRIMARY KEY " + "(%s)" % ", ".join(quoted_names)
7497
def define_foreign_key_body(
    self, constraint: ForeignKeyConstraint, **kw: Any
) -> str:
    """Render ``FOREIGN KEY(<cols>) REFERENCES <table> (<cols>)``.

    The referenced table is taken from the first element of
    ``constraint.elements``; local and remote column names are quoted.
    """
    preparer = self.preparer
    remote_table = list(constraint.elements)[0].column.table

    local_columns = ", ".join(
        [preparer.quote(fk.parent.name) for fk in constraint.elements]
    )
    referenced_table = self.define_constraint_remote_table(
        constraint, remote_table, preparer
    )
    remote_columns = ", ".join(
        [preparer.quote(fk.column.name) for fk in constraint.elements]
    )
    return "FOREIGN KEY(%s) REFERENCES %s (%s)" % (
        local_columns,
        referenced_table,
        remote_columns,
    )
7515
def define_unique_body(
    self, constraint: UniqueConstraint, **kw: Any
) -> str:
    """Render ``UNIQUE <distinct>(<col>, ...)`` with quoted column names.

    ``<distinct>`` is whatever ``define_unique_constraint_distinct()``
    returns for the constraint.
    """
    distinct_clause = self.define_unique_constraint_distinct(constraint, **kw)
    quoted_names = [self.preparer.quote(col.name) for col in constraint]
    return "UNIQUE %s(%s)" % (distinct_clause, ", ".join(quoted_names))
7524
def define_check_body(self, constraint: CheckConstraint, **kw: Any) -> str:
    """Render ``CHECK (<expression>)``.

    The expression is compiled by the SQL compiler with
    ``include_table=False`` and ``literal_binds=True``.
    """
    expression = self.sql_compiler.process(
        constraint.sqltext, include_table=False, literal_binds=True
    )
    return "CHECK (%s)" % expression
7530
def define_unique_constraint_distinct(
    self, constraint: UniqueConstraint, **kw: Any
) -> str:
    """Return the text placed between ``UNIQUE`` and the column list.

    This base version renders nothing.
    """
    distinct_clause = ""
    return distinct_clause
7535
def define_constraint_cascades(
    self, constraint: ForeignKeyConstraint
) -> str:
    """Render the ON DELETE and ON UPDATE clauses that are set.

    ON DELETE comes first; an unset (``None``) rule renders nothing.
    """
    clauses = []
    if constraint.ondelete is not None:
        clauses.append(self.define_constraint_ondelete_cascade(constraint))

    if constraint.onupdate is not None:
        clauses.append(self.define_constraint_onupdate_cascade(constraint))
    return "".join(clauses)
7546
def define_constraint_ondelete_cascade(
    self, constraint: ForeignKeyConstraint
) -> str:
    """Render `` ON DELETE <rule>`` after validating the rule phrase.

    The rule is checked by the preparer's ``validate_sql_phrase()`` against
    ``FK_ON_DELETE``.
    """
    rule = self.preparer.validate_sql_phrase(
        constraint.ondelete, FK_ON_DELETE
    )
    return " ON DELETE %s" % rule
7553
def define_constraint_onupdate_cascade(
    self, constraint: ForeignKeyConstraint
) -> str:
    """Render `` ON UPDATE <rule>`` after validating the rule phrase.

    The rule is checked by the preparer's ``validate_sql_phrase()`` against
    ``FK_ON_UPDATE``.
    """
    rule = self.preparer.validate_sql_phrase(
        constraint.onupdate, FK_ON_UPDATE
    )
    return " ON UPDATE %s" % rule
7560
def define_constraint_deferrability(self, constraint: Constraint) -> str:
    """Render the ``[NOT] DEFERRABLE`` and ``INITIALLY <phrase>`` clauses.

    ``deferrable`` is tri-state: ``None`` renders nothing.  The
    ``initially`` phrase is validated against ``FK_INITIALLY`` by the
    preparer before being rendered.
    """
    clauses = []
    if constraint.deferrable is not None:
        clauses.append(
            " DEFERRABLE" if constraint.deferrable else " NOT DEFERRABLE"
        )
    if constraint.initially is not None:
        clauses.append(
            " INITIALLY %s"
            % self.preparer.validate_sql_phrase(
                constraint.initially, FK_INITIALLY
            )
        )
    return "".join(clauses)
7573
def define_constraint_match(self, constraint: ForeignKeyConstraint) -> str:
    """Render `` MATCH <value>`` when ``constraint.match`` is set."""
    if constraint.match is None:
        return ""
    return " MATCH %s" % constraint.match
7579
def visit_computed_column(self, generated, **kw):
    """Render ``GENERATED ALWAYS AS (<expr>) [STORED | VIRTUAL]``.

    ``persisted`` is tri-state: ``True`` adds ``STORED``, ``False`` adds
    ``VIRTUAL`` and any other value adds nothing.
    """
    expression = self.sql_compiler.process(
        generated.sqltext, include_table=False, literal_binds=True
    )
    if generated.persisted is True:
        storage = " STORED"
    elif generated.persisted is False:
        storage = " VIRTUAL"
    else:
        storage = ""
    return ("GENERATED ALWAYS AS (%s)" % expression) + storage
7589
def visit_identity_column(self, identity, **kw):
    """Render ``GENERATED {ALWAYS | BY DEFAULT} AS IDENTITY [(<options>)]``."""
    mode = "ALWAYS" if identity.always else "BY DEFAULT"
    rendered = "GENERATED %s AS IDENTITY" % (mode,)

    options = self.get_identity_options(identity)
    if not options:
        return rendered
    return rendered + " (%s)" % options
7598
7599
class GenericTypeCompiler(TypeCompiler):
    """Render type specification strings using generic SQL type names.

    The upper-case ``visit_XYZ`` methods render a specific SQL type name;
    the lower-case ``visit_xyz`` methods delegate to one of the upper-case
    renderers.
    """

    # -- numeric types ----------------------------------------------------

    def visit_FLOAT(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return "FLOAT"

    def visit_DOUBLE(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return "DOUBLE"

    def visit_DOUBLE_PRECISION(
        self, type_: sqltypes.DOUBLE_PRECISION[Any], **kw: Any
    ) -> str:
        return "DOUBLE PRECISION"

    def visit_REAL(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return "REAL"

    def visit_NUMERIC(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        """Render NUMERIC with precision and scale when they are set."""
        if type_.precision is None:
            return "NUMERIC"
        if type_.scale is None:
            return "NUMERIC(%(precision)s)" % {"precision": type_.precision}
        return "NUMERIC(%(precision)s, %(scale)s)" % {
            "precision": type_.precision,
            "scale": type_.scale,
        }

    def visit_DECIMAL(self, type_: sqltypes.DECIMAL[Any], **kw: Any) -> str:
        """Render DECIMAL with precision and scale when they are set."""
        if type_.precision is None:
            return "DECIMAL"
        if type_.scale is None:
            return "DECIMAL(%(precision)s)" % {"precision": type_.precision}
        return "DECIMAL(%(precision)s, %(scale)s)" % {
            "precision": type_.precision,
            "scale": type_.scale,
        }

    def visit_INTEGER(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return "INTEGER"

    def visit_SMALLINT(self, type_: sqltypes.SmallInteger, **kw: Any) -> str:
        return "SMALLINT"

    def visit_BIGINT(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return "BIGINT"

    # -- date / time types ------------------------------------------------

    def visit_TIMESTAMP(self, type_: sqltypes.TIMESTAMP, **kw: Any) -> str:
        return "TIMESTAMP"

    def visit_DATETIME(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return "DATETIME"

    def visit_DATE(self, type_: sqltypes.Date, **kw: Any) -> str:
        return "DATE"

    def visit_TIME(self, type_: sqltypes.Time, **kw: Any) -> str:
        return "TIME"

    # -- character types --------------------------------------------------

    def visit_CLOB(self, type_: sqltypes.CLOB, **kw: Any) -> str:
        return "CLOB"

    def visit_NCLOB(self, type_: sqltypes.Text, **kw: Any) -> str:
        return "NCLOB"

    def _render_string_type(
        self, name: str, length: Optional[int], collation: Optional[str]
    ) -> str:
        """Render ``name`` followed by an optional length and collation.

        A falsy ``length`` or ``collation`` is omitted; the collation name
        is wrapped in double quotes.
        """
        pieces = [name]
        if length:
            pieces.append(f"({length})")
        if collation:
            pieces.append(f' COLLATE "{collation}"')
        return "".join(pieces)

    def visit_CHAR(self, type_: sqltypes.CHAR, **kw: Any) -> str:
        return self._render_string_type("CHAR", type_.length, type_.collation)

    def visit_NCHAR(self, type_: sqltypes.NCHAR, **kw: Any) -> str:
        return self._render_string_type("NCHAR", type_.length, type_.collation)

    def visit_VARCHAR(self, type_: sqltypes.String, **kw: Any) -> str:
        return self._render_string_type(
            "VARCHAR", type_.length, type_.collation
        )

    def visit_NVARCHAR(self, type_: sqltypes.NVARCHAR, **kw: Any) -> str:
        return self._render_string_type(
            "NVARCHAR", type_.length, type_.collation
        )

    def visit_TEXT(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self._render_string_type("TEXT", type_.length, type_.collation)

    # -- misc / binary types ----------------------------------------------

    def visit_UUID(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        return "UUID"

    def visit_BLOB(self, type_: sqltypes.LargeBinary, **kw: Any) -> str:
        return "BLOB"

    def visit_BINARY(self, type_: sqltypes.BINARY, **kw: Any) -> str:
        """Render BINARY, with ``(length)`` when a truthy length is set."""
        length_spec = "(%d)" % type_.length if type_.length else ""
        return "BINARY" + length_spec

    def visit_VARBINARY(self, type_: sqltypes.VARBINARY, **kw: Any) -> str:
        """Render VARBINARY, with ``(length)`` when a truthy length is set."""
        length_spec = "(%d)" % type_.length if type_.length else ""
        return "VARBINARY" + length_spec

    def visit_BOOLEAN(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return "BOOLEAN"

    # -- generic ("lower case") types delegate to a SQL type name ----------

    def visit_uuid(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        """Render UUID natively when both the type and dialect allow it.

        Otherwise the type is rendered as ``CHAR(32)``.
        """
        if type_.native_uuid and self.dialect.supports_native_uuid:
            return self.visit_UUID(type_, **kw)
        return self._render_string_type("CHAR", length=32, collation=None)

    def visit_large_binary(
        self, type_: sqltypes.LargeBinary, **kw: Any
    ) -> str:
        return self.visit_BLOB(type_, **kw)

    def visit_boolean(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return self.visit_BOOLEAN(type_, **kw)

    def visit_time(self, type_: sqltypes.Time, **kw: Any) -> str:
        return self.visit_TIME(type_, **kw)

    def visit_datetime(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return self.visit_DATETIME(type_, **kw)

    def visit_date(self, type_: sqltypes.Date, **kw: Any) -> str:
        return self.visit_DATE(type_, **kw)

    def visit_big_integer(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return self.visit_BIGINT(type_, **kw)

    def visit_small_integer(
        self, type_: sqltypes.SmallInteger, **kw: Any
    ) -> str:
        return self.visit_SMALLINT(type_, **kw)

    def visit_integer(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return self.visit_INTEGER(type_, **kw)

    def visit_real(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return self.visit_REAL(type_, **kw)

    def visit_float(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return self.visit_FLOAT(type_, **kw)

    def visit_double(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return self.visit_DOUBLE(type_, **kw)

    def visit_numeric(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        return self.visit_NUMERIC(type_, **kw)

    def visit_string(self, type_: sqltypes.String, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_unicode(self, type_: sqltypes.Unicode, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_text(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_unicode_text(
        self, type_: sqltypes.UnicodeText, **kw: Any
    ) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_enum(self, type_: sqltypes.Enum, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_null(self, type_, **kw):
        """Always raise: a null-typed column has no DDL representation.

        :raises CompileError: unconditionally.
        """
        raise exc.CompileError(
            "Can't generate DDL for %r; "
            "did you forget to specify a "
            "type on this Column?" % type_
        )

    def visit_type_decorator(
        self, type_: TypeDecorator[Any], **kw: Any
    ) -> str:
        """Compile the type the decorator resolves to for this dialect."""
        underlying = type_.type_engine(self.dialect)
        return self.process(underlying, **kw)

    def visit_user_defined(
        self, type_: UserDefinedType[Any], **kw: Any
    ) -> str:
        """Return the type's own ``get_col_spec()`` rendering."""
        return type_.get_col_spec(**kw)
7787
7788
class StrSQLTypeCompiler(GenericTypeCompiler):
    """Type compiler that falls back to a best-effort string for any type.

    Objects without a compiler dispatch, and ``visit_*`` methods that are
    not defined, are routed to ``_visit_unknown()`` instead of failing.
    """

    def process(self, type_, **kw):
        """Compile ``type_``, using ``_visit_unknown()`` if it can't dispatch."""
        try:
            dispatch = type_._compiler_dispatch
        except AttributeError:
            return self._visit_unknown(type_, **kw)
        return dispatch(self, **kw)

    def __getattr__(self, key):
        """Resolve any missing ``visit_*`` attribute to ``_visit_unknown``."""
        if not key.startswith("visit_"):
            raise AttributeError(key)
        return self._visit_unknown

    def _visit_unknown(self, type_, **kw):
        """Return the class name if it is all upper case, else ``repr()``."""
        class_name = type_.__class__.__name__
        if class_name == class_name.upper():
            return class_name
        return repr(type_)

    def visit_null(self, type_, **kw):
        return "NULL"

    def visit_user_defined(self, type_, **kw):
        """Use ``get_col_spec()`` when the type has one, else ``repr()``."""
        try:
            col_spec = type_.get_col_spec
        except AttributeError:
            return repr(type_)
        return col_spec(**kw)
7820
7821
class _SchemaForObjectCallable(Protocol):
    """Callable taking one positional-only object and returning a ``str``."""

    def __call__(self, obj: Any, /) -> str:
        ...
7824
7825
class _BindNameForColProtocol(Protocol):
    """Callable taking a ``ColumnClause`` as ``col`` and returning a ``str``."""

    def __call__(self, col: ColumnClause[Any]) -> str:
        ...
7828
7829
7830class IdentifierPreparer:
7831 """Handle quoting and case-folding of identifiers based on options."""
7832
7833 reserved_words = RESERVED_WORDS
7834
7835 legal_characters = LEGAL_CHARACTERS
7836
7837 illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS
7838
7839 initial_quote: str
7840
7841 final_quote: str
7842
7843 _strings: MutableMapping[str, str]
7844
7845 schema_for_object: _SchemaForObjectCallable = operator.attrgetter("schema")
7846 """Return the .schema attribute for an object.
7847
7848 For the default IdentifierPreparer, the schema for an object is always
the value of the ".schema" attribute.  If the preparer is replaced
with one that has a non-empty schema_translate_map, the value of the
".schema" attribute is rendered as a symbol that will be converted to
a real schema name from the mapping post-compile.
7853
7854 """
7855
7856 _includes_none_schema_translate: bool = False
7857
def __init__(
    self,
    dialect: Dialect,
    initial_quote: str = '"',
    final_quote: Optional[str] = None,
    escape_quote: str = '"',
    quote_case_sensitive_collations: bool = True,
    omit_schema: bool = False,
):
    """Construct a new ``IdentifierPreparer`` object.

    :param dialect: the dialect this preparer belongs to; its
     ``paramstyle`` decides whether percent signs in identifiers are
     doubled when escaping.
    :param initial_quote: character that begins a delimited identifier.
    :param final_quote: character that ends a delimited identifier.
     Defaults to ``initial_quote``.
    :param escape_quote: character that is doubled when it appears
     inside an identifier being escaped.
    :param quote_case_sensitive_collations: when false,
     ``format_collation()`` returns collation names unquoted.
    :param omit_schema: prevent prepending the schema name.  Useful for
     databases that do not support schemas.
    """

    self.dialect = dialect

    # quoting characters; the closing quote falls back to the opening one
    self.initial_quote = initial_quote
    self.final_quote = final_quote if final_quote else self.initial_quote
    self.escape_quote = escape_quote
    self.escape_to_quote = self.escape_quote * 2

    self.omit_schema = omit_schema
    self.quote_case_sensitive_collations = quote_case_sensitive_collations

    # cache of identifier -> rendered identifier used by quote()
    self._strings = {}

    percent_paramstyles = ("format", "pyformat")
    self._double_percents = self.dialect.paramstyle in percent_paramstyles
7893
def _with_schema_translate(self, schema_translate_map):
    """Return a copy of this preparer that renders schema placeholders.

    The copy shares this preparer's instance attributes, but its
    ``schema_for_object`` returns a ``__[SCHEMA_<name>]`` symbol for
    objects that participate in schema translation.  Whether ``None`` is a
    key of ``schema_translate_map`` is recorded on the copy as
    ``_includes_none_schema_translate``.
    """
    cls = self.__class__
    prep = cls.__new__(cls)
    prep.__dict__.update(self.__dict__)

    includes_none = None in schema_translate_map

    def symbol_getter(obj):
        name = obj.schema
        if not obj._use_schema_map or (name is None and not includes_none):
            # not subject to translation; hand back the plain schema
            return obj.schema

        if name is not None and ("[" in name or "]" in name):
            raise exc.CompileError(
                "Square bracket characters ([]) not supported "
                "in schema translate name '%s'" % name
            )
        return quoted_name("__[SCHEMA_%s]" % (name or "_none"), quote=False)

    prep.schema_for_object = symbol_getter
    prep._includes_none_schema_translate = includes_none
    return prep
7917
def _render_schema_translates(
    self, statement: str, schema_translate_map: SchemaTranslateMapType
) -> str:
    """Replace ``__[SCHEMA_<name>]`` placeholders with quoted schema names.

    :param statement: compiled SQL text containing the placeholders.
    :param schema_translate_map: mapping of schema name (or ``None``) to
     the schema to render.  The mapping is not modified.
    :return: the statement with each placeholder replaced by the result of
     ``quote_schema()`` for its target schema.
    :raises InvalidRequestError: if the presence of the ``None`` key is
     inconsistent with the map this preparer was built for.
    :raises CompileError: if a placeholder resolves to an empty schema and
     the dialect has no default schema name.
    """
    d = schema_translate_map
    if None in d:
        if not self._includes_none_schema_translate:
            raise exc.InvalidRequestError(
                "schema translate map which previously did not have "
                "`None` present as a key now has `None` present; compiled "
                "statement may lack adequate placeholders. Please use "
                "consistent keys in successive "
                "schema_translate_map dictionaries."
            )

        # the None key is rendered as the "_none" placeholder; add that
        # alias to a private copy so the caller's mapping is left intact
        d = {**d, "_none": d[None]}

    def replace(m):
        name = m.group(2)
        if name in d:
            effective_schema = d[name]
        else:
            if name in (None, "_none"):
                raise exc.InvalidRequestError(
                    "schema translate map which previously had `None` "
                    "present as a key now no longer has it present; don't "
                    "know how to apply schema for compiled statement. "
                    "Please use consistent keys in successive "
                    "schema_translate_map dictionaries."
                )
            # names without a translation render as themselves
            effective_schema = name

        if not effective_schema:
            effective_schema = self.dialect.default_schema_name
            if not effective_schema:
                # TODO: no coverage here
                raise exc.CompileError(
                    "Dialect has no default schema name; can't "
                    "use None as dynamic schema target."
                )
        return self.quote_schema(effective_schema)

    return re.sub(r"(__\[SCHEMA_([^\]]+)\])", replace, statement)
7960
def _escape_identifier(self, value: str) -> str:
    """Escape an identifier.

    Each occurrence of the escape quote is doubled, and percent signs are
    doubled as well when ``_double_percents`` is set.

    Subclasses should override this to provide database-dependent
    escaping behavior.
    """

    escaped = value.replace(self.escape_quote, self.escape_to_quote)
    if not self._double_percents:
        return escaped
    return escaped.replace("%", "%%")
7972
def _unescape_identifier(self, value: str) -> str:
    """Canonicalize an escaped identifier.

    Doubled escape quotes are collapsed back to a single escape quote.

    Subclasses should override this to provide database-dependent
    unescaping behavior that reverses _escape_identifier.
    """

    doubled, single = self.escape_to_quote, self.escape_quote
    return value.replace(doubled, single)
7981
def validate_sql_phrase(self, element, reg):
    """Keyword sequence filter.

    A filter for elements that are intended to represent keyword
    sequences, such as "INITIALLY", "INITIALLY DEFERRED", etc.  No special
    characters should be present.

    :param element: the phrase to check; ``None`` is passed through.
    :param reg: compiled pattern the phrase must match.
    :return: ``element`` unchanged.
    :raises CompileError: if ``element`` is not ``None`` and does not match
     ``reg``.
    """

    if element is None or reg.match(element):
        return element

    raise exc.CompileError(
        "Unexpected SQL phrase: %r (matching against %r)"
        % (element, reg.pattern)
    )
7997
def quote_identifier(self, value: str) -> str:
    """Quote an identifier.

    The escaped identifier is wrapped between ``initial_quote`` and
    ``final_quote``.

    Subclasses should override this to provide database-dependent
    quoting behavior.
    """

    opening = self.initial_quote
    escaped = self._escape_identifier(value)
    return opening + escaped + self.final_quote
8010
def _requires_quotes(self, value: str) -> bool:
    """Return True if the given identifier requires quoting.

    Quoting is required for a reserved word (compared in lower case), an
    illegal first character, a name that does not match
    ``legal_characters``, or a name that is not all lower case.
    """
    lowered = value.lower()
    if lowered in self.reserved_words:
        return True
    if value[0] in self.illegal_initial_characters:
        return True
    if not self.legal_characters.match(str(value)):
        return True
    return lowered != value
8020
def _requires_quotes_illegal_chars(self, value):
    """Return True if the given identifier requires quoting, but
    not taking case convention into account."""
    matched = self.legal_characters.match(str(value))
    return matched is None
8025
def quote_schema(self, schema: str) -> str:
    """Conditionally quote a schema name.

    This base version applies the same rules as :meth:`quote`: the name is
    quoted if it is a reserved word, contains quote-necessary characters,
    or is an instance of :class:`.quoted_name` which includes ``quote``
    set to ``True``.

    Subclasses can override this to provide database-dependent
    quoting behavior for schema names.

    :param schema: string schema name
    """
    quoted = self.quote(schema)
    return quoted
8040
def quote(self, ident: str) -> str:
    """Conditionally quote an identifier.

    The identifier is quoted if it is a reserved word, contains
    quote-necessary characters, or is an instance of
    :class:`.quoted_name` which includes ``quote`` set to ``True``.

    Identifiers without an explicit ``quote`` preference are cached in
    ``_strings`` after the first decision.

    Subclasses can override this to provide database-dependent
    quoting behavior for identifier names.

    :param ident: string identifier
    """
    force = getattr(ident, "quote", None)

    if force is not None:
        # an explicit preference on the name bypasses the cache entirely
        return self.quote_identifier(ident) if force else ident

    if ident in self._strings:
        return self._strings[ident]

    rendered = (
        self.quote_identifier(ident)
        if self._requires_quotes(ident)
        else ident
    )
    self._strings[ident] = rendered
    return self._strings[ident]
8068
def format_collation(self, collation_name):
    """Return the collation name, conditionally quoted.

    The name goes through :meth:`quote` only when
    ``quote_case_sensitive_collations`` is set; otherwise it is returned
    unchanged.
    """
    if not self.quote_case_sensitive_collations:
        return collation_name
    return self.quote(collation_name)
8074
def format_sequence(
    self, sequence: schema.Sequence, use_schema: bool = True
) -> str:
    """Prepare a quoted sequence name, schema-qualified when applicable.

    The schema is prepended unless ``omit_schema`` is set, ``use_schema``
    is false, or the sequence's effective schema is ``None``.
    """
    quoted_name_ = self.quote(sequence.name)

    effective_schema = self.schema_for_object(sequence)

    if self.omit_schema or not use_schema or effective_schema is None:
        return quoted_name_
    return self.quote_schema(effective_schema) + "." + quoted_name_
8089
def format_label(
    self, label: Label[Any], name: Optional[str] = None
) -> str:
    """Quote ``name`` if given (and truthy), otherwise the label's name."""
    effective_name = name or label.name
    return self.quote(effective_name)
8094
def format_alias(
    self, alias: Optional[AliasedReturnsRows], name: Optional[str] = None
) -> str:
    """Quote ``name`` when given, otherwise the alias's own name.

    ``alias`` must not be ``None`` when ``name`` is ``None``.
    """
    if name is not None:
        return self.quote(name)
    assert alias is not None
    return self.quote(alias.name)
8103
def format_savepoint(self, savepoint, name=None):
    """Return the savepoint identifier, quoted only if it requires quotes.

    ``name`` takes precedence over ``savepoint.ident`` when truthy.
    """
    # Running the savepoint name through quoting is unnecessary
    # for all known dialects.  This is here to support potential
    # third party use cases
    ident = name or savepoint.ident
    if not self._requires_quotes(ident):
        return ident
    return self.quote_identifier(ident)
8112
@util.preload_module("sqlalchemy.sql.naming")
def format_constraint(
    self, constraint: Union[Constraint, Index], _alembic_quote: bool = True
) -> Optional[str]:
    """Return the rendered name of a constraint or index.

    When the name is the ``_NONE_NAME`` marker, the name is computed from
    the naming module for the constraint's table; ``None`` is returned if
    that yields no name.  Index names and constraint names are truncated
    and rendered by their respective ``truncate_and_render_*`` methods.
    """
    naming = util.preloaded.sql_naming

    if constraint.name is _NONE_NAME:
        name = naming._constraint_name_for_table(
            constraint, constraint.table
        )
        if name is None:
            return None
    else:
        name = constraint.name

    assert name is not None

    # indexes and constraints may have different length limits
    if constraint.__visit_name__ == "index":
        render = self.truncate_and_render_index_name
    else:
        render = self.truncate_and_render_constraint_name
    return render(name, _alembic_quote=_alembic_quote)
8138
def truncate_and_render_index_name(
    self, name: str, _alembic_quote: bool = True
) -> str:
    """Render an index name within the dialect's index name length limit.

    The limit is ``max_index_name_length`` when set, otherwise
    ``max_identifier_length``.
    """
    # calculate these at format time so that ad-hoc changes
    # to dialect.max_identifier_length etc. can be reflected
    # as IdentifierPreparer is long lived
    limit = (
        self.dialect.max_index_name_length
        or self.dialect.max_identifier_length
    )
    return self._truncate_and_render_maxlen_name(name, limit, _alembic_quote)
8152
def truncate_and_render_constraint_name(
    self, name: str, _alembic_quote: bool = True
) -> str:
    """Render a constraint name within the dialect's length limit.

    The limit is ``max_constraint_name_length`` when set, otherwise
    ``max_identifier_length``.
    """
    # calculate these at format time so that ad-hoc changes
    # to dialect.max_identifier_length etc. can be reflected
    # as IdentifierPreparer is long lived
    limit = (
        self.dialect.max_constraint_name_length
        or self.dialect.max_identifier_length
    )
    return self._truncate_and_render_maxlen_name(name, limit, _alembic_quote)
8166
def _truncate_and_render_maxlen_name(
    self, name: str, max_: int, _alembic_quote: bool
) -> str:
    """Shorten or validate ``name`` against ``max_``, then render it.

    A ``_truncated_label`` longer than ``max_`` is cut to ``max_ - 8``
    characters and suffixed with an underscore plus the last four hex
    digits of its MD5; any other name is passed to the dialect's
    ``validate_identifier()``.  The result is quoted unless
    ``_alembic_quote`` is false.
    """
    if not isinstance(name, elements._truncated_label):
        self.dialect.validate_identifier(name)
    elif len(name) > max_:
        name = name[0 : max_ - 8] + "_" + util.md5_hex(name)[-4:]

    return self.quote(name) if _alembic_quote else name
8180
def format_index(self, index: Index) -> str:
    """Return the rendered index name; it is asserted not to be ``None``."""
    rendered = self.format_constraint(index)
    assert rendered is not None
    return rendered
8185
def format_table(
    self,
    table: FromClause,
    use_schema: bool = True,
    name: Optional[str] = None,
) -> str:
    """Prepare a quoted table and schema name.

    :param table: the table; its ``name`` is used when ``name`` is not
     given.
    :param use_schema: when false, the schema is never prepended.
    :param name: optional name to render in place of the table's own.
    """
    if name is None:
        if TYPE_CHECKING:
            assert isinstance(table, NamedFromClause)
        name = table.name

    quoted_table = self.quote(name)

    effective_schema = self.schema_for_object(table)

    if self.omit_schema or not use_schema or not effective_schema:
        return quoted_table
    return self.quote_schema(effective_schema) + "." + quoted_table
8205
def format_schema(self, name):
    """Prepare a quoted schema name."""

    quoted = self.quote(name)
    return quoted
8210
def format_label_name(
    self,
    name,
    anon_map=None,
):
    """Prepare a quoted label name.

    When ``anon_map`` is given and ``name`` is a ``_truncated_label``, the
    map is applied to the name before quoting.
    """

    needs_mapping = anon_map is not None and isinstance(
        name, elements._truncated_label
    )
    if needs_mapping:
        name = name.apply_map(anon_map)

    return self.quote(name)
8224
def format_column(
    self,
    column: ColumnElement[Any],
    use_table: bool = False,
    name: Optional[str] = None,
    table_name: Optional[str] = None,
    use_schema: bool = False,
    anon_map: Optional[Mapping[str, Any]] = None,
) -> str:
    """Prepare a quoted column name.

    :param column: the column; its ``name`` is used when ``name`` is not
     given.
    :param use_table: prefix the result with the formatted table name and
     a dot.
    :param name: optional name to render in place of the column's own.
    :param table_name: optional table name passed to ``format_table()``.
    :param use_schema: passed to ``format_table()`` when ``use_table`` is
     set.
    :param anon_map: when given, applied to a ``_truncated_label`` name.
    """

    if name is None:
        name = column.name
        assert name is not None

    if anon_map is not None and isinstance(
        name, elements._truncated_label
    ):
        name = name.apply_map(anon_map)

    # literal textual elements get stuck into ColumnClause a lot,
    # which shouldn't get quoted
    is_literal = getattr(column, "is_literal", False)

    if not use_table:
        return name if is_literal else self.quote(name)

    table_prefix = (
        self.format_table(
            column.table, use_schema=use_schema, name=table_name
        )
        + "."
    )
    return table_prefix + (name if is_literal else self.quote(name))
8270
def format_table_seq(self, table, use_schema=True):
    """Format table name and schema as a tuple.

    The tuple is ``(schema, table)`` when a schema is rendered and
    ``(table,)`` otherwise.
    """

    # Dialects with more levels in their fully qualified references
    # ('database', 'owner', etc.) could override this and return
    # a longer sequence.

    effective_schema = self.schema_for_object(table)

    if self.omit_schema or not use_schema or not effective_schema:
        return (self.format_table(table, use_schema=False),)

    quoted_schema = self.quote_schema(effective_schema)
    return (quoted_schema, self.format_table(table, use_schema=False))
8287
@util.memoized_property
def _r_identifiers(self):
    """Compiled pattern matching dot-separated identifier components.

    Group 1 captures the inside of a quoted component and group 2 an
    unquoted component, built from this preparer's quote characters.
    """
    substitutions = {
        "initial": re.escape(self.initial_quote),
        "final": re.escape(self.final_quote),
        "escaped": re.escape(self._escape_identifier(self.final_quote)),
    }
    pattern = (
        r"(?:"
        r"(?:%(initial)s((?:%(escaped)s|[^%(final)s])+)%(final)s"
        r"|([^\.]+))(?=\.|$))+"
    ) % substitutions
    return re.compile(pattern)
8305
def unformat_identifiers(self, identifiers: str) -> Sequence[str]:
    """Unpack 'schema.table.column'-like strings into components.

    Each component found by ``_r_identifiers`` (quoted form preferred over
    unquoted) is passed through ``_unescape_identifier()``.
    """

    components = []
    for quoted, bare in self._r_identifiers.findall(identifiers):
        components.append(self._unescape_identifier(quoted or bare))
    return components