1# sql/compiler.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Base SQL and DDL compiler implementations.
10
11Classes provided include:
12
13:class:`.compiler.SQLCompiler` - renders SQL
14strings
15
16:class:`.compiler.DDLCompiler` - renders DDL
17(data definition language) strings
18
19:class:`.compiler.GenericTypeCompiler` - renders
20type specification strings.
21
22To generate user-defined SQL strings, see
23:doc:`/ext/compiler`.
24
25"""
26from __future__ import annotations
27
28import collections
29import collections.abc as collections_abc
30import contextlib
31from enum import IntEnum
32import functools
33import itertools
34import operator
35import re
36from time import perf_counter
37import typing
38from typing import Any
39from typing import Callable
40from typing import cast
41from typing import ClassVar
42from typing import Dict
43from typing import FrozenSet
44from typing import Iterable
45from typing import Iterator
46from typing import List
47from typing import Mapping
48from typing import MutableMapping
49from typing import NamedTuple
50from typing import NoReturn
51from typing import Optional
52from typing import Pattern
53from typing import Sequence
54from typing import Set
55from typing import Tuple
56from typing import Type
57from typing import TYPE_CHECKING
58from typing import Union
59
60from . import base
61from . import coercions
62from . import crud
63from . import elements
64from . import functions
65from . import operators
66from . import roles
67from . import schema
68from . import selectable
69from . import sqltypes
70from . import util as sql_util
71from ._typing import is_column_element
72from ._typing import is_dml
73from .base import _de_clone
74from .base import _from_objects
75from .base import _NONE_NAME
76from .base import _SentinelDefaultCharacterization
77from .base import NO_ARG
78from .elements import quoted_name
79from .sqltypes import TupleType
80from .visitors import prefix_anon_map
81from .. import exc
82from .. import util
83from ..util import FastIntFlag
84from ..util.typing import Literal
85from ..util.typing import Protocol
86from ..util.typing import Self
87from ..util.typing import TypedDict
88
89if typing.TYPE_CHECKING:
90 from .annotation import _AnnotationDict
91 from .base import _AmbiguousTableNameMap
92 from .base import CompileState
93 from .base import Executable
94 from .cache_key import CacheKey
95 from .ddl import ExecutableDDLElement
96 from .dml import Insert
97 from .dml import Update
98 from .dml import UpdateBase
99 from .dml import UpdateDMLState
100 from .dml import ValuesBase
101 from .elements import _truncated_label
102 from .elements import BinaryExpression
103 from .elements import BindParameter
104 from .elements import ClauseElement
105 from .elements import ColumnClause
106 from .elements import ColumnElement
107 from .elements import False_
108 from .elements import Label
109 from .elements import Null
110 from .elements import True_
111 from .functions import Function
112 from .schema import CheckConstraint
113 from .schema import Column
114 from .schema import Constraint
115 from .schema import ForeignKeyConstraint
116 from .schema import IdentityOptions
117 from .schema import Index
118 from .schema import PrimaryKeyConstraint
119 from .schema import Table
120 from .schema import UniqueConstraint
121 from .selectable import _ColumnsClauseElement
122 from .selectable import AliasedReturnsRows
123 from .selectable import CompoundSelectState
124 from .selectable import CTE
125 from .selectable import FromClause
126 from .selectable import NamedFromClause
127 from .selectable import ReturnsRows
128 from .selectable import Select
129 from .selectable import SelectState
130 from .type_api import _BindProcessorType
131 from .type_api import TypeDecorator
132 from .type_api import TypeEngine
133 from .type_api import UserDefinedType
134 from .visitors import Visitable
135 from ..engine.cursor import CursorResultMetaData
136 from ..engine.interfaces import _CoreSingleExecuteParams
137 from ..engine.interfaces import _DBAPIAnyExecuteParams
138 from ..engine.interfaces import _DBAPIMultiExecuteParams
139 from ..engine.interfaces import _DBAPISingleExecuteParams
140 from ..engine.interfaces import _ExecuteOptions
141 from ..engine.interfaces import _GenericSetInputSizesType
142 from ..engine.interfaces import _MutableCoreSingleExecuteParams
143 from ..engine.interfaces import Dialect
144 from ..engine.interfaces import SchemaTranslateMapType
145
146
147_FromHintsType = Dict["FromClause", str]
148
149RESERVED_WORDS = {
150 "all",
151 "analyse",
152 "analyze",
153 "and",
154 "any",
155 "array",
156 "as",
157 "asc",
158 "asymmetric",
159 "authorization",
160 "between",
161 "binary",
162 "both",
163 "case",
164 "cast",
165 "check",
166 "collate",
167 "column",
168 "constraint",
169 "create",
170 "cross",
171 "current_date",
172 "current_role",
173 "current_time",
174 "current_timestamp",
175 "current_user",
176 "default",
177 "deferrable",
178 "desc",
179 "distinct",
180 "do",
181 "else",
182 "end",
183 "except",
184 "false",
185 "for",
186 "foreign",
187 "freeze",
188 "from",
189 "full",
190 "grant",
191 "group",
192 "having",
193 "ilike",
194 "in",
195 "initially",
196 "inner",
197 "intersect",
198 "into",
199 "is",
200 "isnull",
201 "join",
202 "leading",
203 "left",
204 "like",
205 "limit",
206 "localtime",
207 "localtimestamp",
208 "natural",
209 "new",
210 "not",
211 "notnull",
212 "null",
213 "off",
214 "offset",
215 "old",
216 "on",
217 "only",
218 "or",
219 "order",
220 "outer",
221 "overlaps",
222 "placing",
223 "primary",
224 "references",
225 "right",
226 "select",
227 "session_user",
228 "set",
229 "similar",
230 "some",
231 "symmetric",
232 "table",
233 "then",
234 "to",
235 "trailing",
236 "true",
237 "union",
238 "unique",
239 "user",
240 "using",
241 "verbose",
242 "when",
243 "where",
244}
245
246LEGAL_CHARACTERS = re.compile(r"^[A-Z0-9_$]+$", re.I)
247LEGAL_CHARACTERS_PLUS_SPACE = re.compile(r"^[A-Z0-9_ $]+$", re.I)
248ILLEGAL_INITIAL_CHARACTERS = {str(x) for x in range(0, 10)}.union(["$"])
249
250FK_ON_DELETE = re.compile(
251 r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
252)
253FK_ON_UPDATE = re.compile(
254 r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
255)
256FK_INITIALLY = re.compile(r"^(?:DEFERRED|IMMEDIATE)$", re.I)
257BIND_PARAMS = re.compile(r"(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])", re.UNICODE)
258BIND_PARAMS_ESC = re.compile(r"\x5c(:[\w\$]*)(?![:\w\$])", re.UNICODE)
259
260_pyformat_template = "%%(%(name)s)s"
261BIND_TEMPLATES = {
262 "pyformat": _pyformat_template,
263 "qmark": "?",
264 "format": "%%s",
265 "numeric": ":[_POSITION]",
266 "numeric_dollar": "$[_POSITION]",
267 "named": ":%(name)s",
268}
269
270
271OPERATORS = {
272 # binary
273 operators.and_: " AND ",
274 operators.or_: " OR ",
275 operators.add: " + ",
276 operators.mul: " * ",
277 operators.sub: " - ",
278 operators.mod: " % ",
279 operators.neg: "-",
280 operators.lt: " < ",
281 operators.le: " <= ",
282 operators.ne: " != ",
283 operators.gt: " > ",
284 operators.ge: " >= ",
285 operators.eq: " = ",
286 operators.is_distinct_from: " IS DISTINCT FROM ",
287 operators.is_not_distinct_from: " IS NOT DISTINCT FROM ",
288 operators.concat_op: " || ",
289 operators.match_op: " MATCH ",
290 operators.not_match_op: " NOT MATCH ",
291 operators.in_op: " IN ",
292 operators.not_in_op: " NOT IN ",
293 operators.comma_op: ", ",
294 operators.from_: " FROM ",
295 operators.as_: " AS ",
296 operators.is_: " IS ",
297 operators.is_not: " IS NOT ",
298 operators.collate: " COLLATE ",
299 # unary
300 operators.exists: "EXISTS ",
301 operators.distinct_op: "DISTINCT ",
302 operators.inv: "NOT ",
303 operators.any_op: "ANY ",
304 operators.all_op: "ALL ",
305 # modifiers
306 operators.desc_op: " DESC",
307 operators.asc_op: " ASC",
308 operators.nulls_first_op: " NULLS FIRST",
309 operators.nulls_last_op: " NULLS LAST",
310 # bitwise
311 operators.bitwise_xor_op: " ^ ",
312 operators.bitwise_or_op: " | ",
313 operators.bitwise_and_op: " & ",
314 operators.bitwise_not_op: "~",
315 operators.bitwise_lshift_op: " << ",
316 operators.bitwise_rshift_op: " >> ",
317}
318
319FUNCTIONS: Dict[Type[Function[Any]], str] = {
320 functions.coalesce: "coalesce",
321 functions.current_date: "CURRENT_DATE",
322 functions.current_time: "CURRENT_TIME",
323 functions.current_timestamp: "CURRENT_TIMESTAMP",
324 functions.current_user: "CURRENT_USER",
325 functions.localtime: "LOCALTIME",
326 functions.localtimestamp: "LOCALTIMESTAMP",
327 functions.random: "random",
328 functions.sysdate: "sysdate",
329 functions.session_user: "SESSION_USER",
330 functions.user: "USER",
331 functions.cube: "CUBE",
332 functions.rollup: "ROLLUP",
333 functions.grouping_sets: "GROUPING SETS",
334}
335
336
337EXTRACT_MAP = {
338 "month": "month",
339 "day": "day",
340 "year": "year",
341 "second": "second",
342 "hour": "hour",
343 "doy": "doy",
344 "minute": "minute",
345 "quarter": "quarter",
346 "dow": "dow",
347 "week": "week",
348 "epoch": "epoch",
349 "milliseconds": "milliseconds",
350 "microseconds": "microseconds",
351 "timezone_hour": "timezone_hour",
352 "timezone_minute": "timezone_minute",
353}
354
355COMPOUND_KEYWORDS = {
356 selectable._CompoundSelectKeyword.UNION: "UNION",
357 selectable._CompoundSelectKeyword.UNION_ALL: "UNION ALL",
358 selectable._CompoundSelectKeyword.EXCEPT: "EXCEPT",
359 selectable._CompoundSelectKeyword.EXCEPT_ALL: "EXCEPT ALL",
360 selectable._CompoundSelectKeyword.INTERSECT: "INTERSECT",
361 selectable._CompoundSelectKeyword.INTERSECT_ALL: "INTERSECT ALL",
362}
363
364
365class ResultColumnsEntry(NamedTuple):
366 """Tracks a column expression that is expected to be represented
367 in the result rows for this statement.
368
369 This normally refers to the columns clause of a SELECT statement
370 but may also refer to a RETURNING clause, as well as for dialect-specific
371 emulations.
372
373 """
374
375 keyname: str
376 """string name that's expected in cursor.description"""
377
378 name: str
379 """column name, may be labeled"""
380
381 objects: Tuple[Any, ...]
382 """sequence of objects that should be able to locate this column
383 in a RowMapping. This is typically string names and aliases
384 as well as Column objects.
385
386 """
387
388 type: TypeEngine[Any]
389 """Datatype to be associated with this column. This is where
390 the "result processing" logic directly links the compiled statement
391 to the rows that come back from the cursor.
392
393 """
394
395
396class _ResultMapAppender(Protocol):
397 def __call__(
398 self,
399 keyname: str,
400 name: str,
401 objects: Sequence[Any],
402 type_: TypeEngine[Any],
403 ) -> None: ...
404
405
406# integer indexes into ResultColumnsEntry used by cursor.py.
407# some profiling showed integer access faster than named tuple
408RM_RENDERED_NAME: Literal[0] = 0
409RM_NAME: Literal[1] = 1
410RM_OBJECTS: Literal[2] = 2
411RM_TYPE: Literal[3] = 3
412
413
414class _BaseCompilerStackEntry(TypedDict):
415 asfrom_froms: Set[FromClause]
416 correlate_froms: Set[FromClause]
417 selectable: ReturnsRows
418
419
420class _CompilerStackEntry(_BaseCompilerStackEntry, total=False):
421 compile_state: CompileState
422 need_result_map_for_nested: bool
423 need_result_map_for_compound: bool
424 select_0: ReturnsRows
425 insert_from_select: Select[Any]
426
427
428class ExpandedState(NamedTuple):
429 """represents state to use when producing "expanded" and
430 "post compile" bound parameters for a statement.
431
432 "expanded" parameters are parameters that are generated at
433 statement execution time to suit a number of parameters passed, the most
434 prominent example being the individual elements inside of an IN expression.
435
436 "post compile" parameters are parameters where the SQL literal value
437 will be rendered into the SQL statement at execution time, rather than
438 being passed as separate parameters to the driver.
439
440 To create an :class:`.ExpandedState` instance, use the
441 :meth:`.SQLCompiler.construct_expanded_state` method on any
442 :class:`.SQLCompiler` instance.
443
444 """
445
446 statement: str
447 """String SQL statement with parameters fully expanded"""
448
449 parameters: _CoreSingleExecuteParams
450 """Parameter dictionary with parameters fully expanded.
451
452 For a statement that uses named parameters, this dictionary will map
453 exactly to the names in the statement. For a statement that uses
454 positional parameters, the :attr:`.ExpandedState.positional_parameters`
455 will yield a tuple with the positional parameter set.
456
457 """
458
459 processors: Mapping[str, _BindProcessorType[Any]]
460 """mapping of bound value processors"""
461
462 positiontup: Optional[Sequence[str]]
463 """Sequence of string names indicating the order of positional
464 parameters"""
465
466 parameter_expansion: Mapping[str, List[str]]
467 """Mapping representing the intermediary link from original parameter
468 name to list of "expanded" parameter names, for those parameters that
469 were expanded."""
470
471 @property
472 def positional_parameters(self) -> Tuple[Any, ...]:
473 """Tuple of positional parameters, for statements that were compiled
474 using a positional paramstyle.
475
476 """
477 if self.positiontup is None:
478 raise exc.InvalidRequestError(
479 "statement does not use a positional paramstyle"
480 )
481 return tuple(self.parameters[key] for key in self.positiontup)
482
483 @property
484 def additional_parameters(self) -> _CoreSingleExecuteParams:
485 """synonym for :attr:`.ExpandedState.parameters`."""
486 return self.parameters
487
488
489class _InsertManyValues(NamedTuple):
490 """represents state to use for executing an "insertmanyvalues" statement.
491
492 The primary consumers of this object are the
493 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
494 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods.
495
496 .. versionadded:: 2.0
497
498 """
499
500 is_default_expr: bool
501 """if True, the statement is of the form
502 ``INSERT INTO TABLE DEFAULT VALUES``, and can't be rewritten as a "batch"
503
504 """
505
506 single_values_expr: str
507 """The rendered "values" clause of the INSERT statement.
508
509 This is typically the parenthesized section e.g. "(?, ?, ?)" or similar.
510 The insertmanyvalues logic uses this string as a search and replace
511 target.
512
513 """
514
515 insert_crud_params: List[crud._CrudParamElementStr]
516 """List of Column / bind names etc. used while rewriting the statement"""
517
518 num_positional_params_counted: int
519 """the number of bound parameters in a single-row statement.
520
521 This count may be larger or smaller than the actual number of columns
522 targeted in the INSERT, as it accommodates for SQL expressions
523 in the values list that may have zero or more parameters embedded
524 within them.
525
526 This count is part of what's used to organize rewritten parameter lists
527 when batching.
528
529 """
530
531 sort_by_parameter_order: bool = False
532 """if the deterministic_returnined_order parameter were used on the
533 insert.
534
535 All of the attributes following this will only be used if this is True.
536
537 """
538
539 includes_upsert_behaviors: bool = False
540 """if True, we have to accommodate for upsert behaviors.
541
542 This will in some cases downgrade "insertmanyvalues" that requests
543 deterministic ordering.
544
545 """
546
547 sentinel_columns: Optional[Sequence[Column[Any]]] = None
548 """List of sentinel columns that were located.
549
550 This list is only here if the INSERT asked for
551 sort_by_parameter_order=True,
552 and dialect-appropriate sentinel columns were located.
553
554 .. versionadded:: 2.0.10
555
556 """
557
558 num_sentinel_columns: int = 0
559 """how many sentinel columns are in the above list, if any.
560
561 This is the same as
562 ``len(sentinel_columns) if sentinel_columns is not None else 0``
563
564 """
565
566 sentinel_param_keys: Optional[Sequence[str]] = None
567 """parameter str keys in each param dictionary / tuple
568 that would link to the client side "sentinel" values for that row, which
569 we can use to match up parameter sets to result rows.
570
571 This is only present if sentinel_columns is present and the INSERT
572 statement actually refers to client side values for these sentinel
573 columns.
574
575 .. versionadded:: 2.0.10
576
577 .. versionchanged:: 2.0.29 - the sequence is now string dictionary keys
578 only, used against the "compiled parameteters" collection before
579 the parameters were converted by bound parameter processors
580
581 """
582
583 implicit_sentinel: bool = False
584 """if True, we have exactly one sentinel column and it uses a server side
585 value, currently has to generate an incrementing integer value.
586
587 The dialect in question would have asserted that it supports receiving
588 these values back and sorting on that value as a means of guaranteeing
589 correlation with the incoming parameter list.
590
591 .. versionadded:: 2.0.10
592
593 """
594
595 embed_values_counter: bool = False
596 """Whether to embed an incrementing integer counter in each parameter
597 set within the VALUES clause as parameters are batched over.
598
599 This is only used for a specific INSERT..SELECT..VALUES..RETURNING syntax
600 where a subquery is used to produce value tuples. Current support
601 includes PostgreSQL, Microsoft SQL Server.
602
603 .. versionadded:: 2.0.10
604
605 """
606
607
608class _InsertManyValuesBatch(NamedTuple):
609 """represents an individual batch SQL statement for insertmanyvalues.
610
611 This is passed through the
612 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
613 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods out
614 to the :class:`.Connection` within the
615 :meth:`.Connection._exec_insertmany_context` method.
616
617 .. versionadded:: 2.0.10
618
619 """
620
621 replaced_statement: str
622 replaced_parameters: _DBAPIAnyExecuteParams
623 processed_setinputsizes: Optional[_GenericSetInputSizesType]
624 batch: Sequence[_DBAPISingleExecuteParams]
625 sentinel_values: Sequence[Tuple[Any, ...]]
626 current_batch_size: int
627 batchnum: int
628 total_batches: int
629 rows_sorted: bool
630 is_downgraded: bool
631
632
633class InsertmanyvaluesSentinelOpts(FastIntFlag):
634 """bitflag enum indicating styles of PK defaults
635 which can work as implicit sentinel columns
636
637 """
638
639 NOT_SUPPORTED = 1
640 AUTOINCREMENT = 2
641 IDENTITY = 4
642 SEQUENCE = 8
643
644 ANY_AUTOINCREMENT = AUTOINCREMENT | IDENTITY | SEQUENCE
645 _SUPPORTED_OR_NOT = NOT_SUPPORTED | ANY_AUTOINCREMENT
646
647 USE_INSERT_FROM_SELECT = 16
648 RENDER_SELECT_COL_CASTS = 64
649
650
651class CompilerState(IntEnum):
652 COMPILING = 0
653 """statement is present, compilation phase in progress"""
654
655 STRING_APPLIED = 1
656 """statement is present, string form of the statement has been applied.
657
658 Additional processors by subclasses may still be pending.
659
660 """
661
662 NO_STATEMENT = 2
663 """compiler does not have a statement to compile, is used
664 for method access"""
665
666
667class Linting(IntEnum):
668 """represent preferences for the 'SQL linting' feature.
669
670 this feature currently includes support for flagging cartesian products
671 in SQL statements.
672
673 """
674
675 NO_LINTING = 0
676 "Disable all linting."
677
678 COLLECT_CARTESIAN_PRODUCTS = 1
679 """Collect data on FROMs and cartesian products and gather into
680 'self.from_linter'"""
681
682 WARN_LINTING = 2
683 "Emit warnings for linters that find problems"
684
685 FROM_LINTING = COLLECT_CARTESIAN_PRODUCTS | WARN_LINTING
686 """Warn for cartesian products; combines COLLECT_CARTESIAN_PRODUCTS
687 and WARN_LINTING"""
688
689
690NO_LINTING, COLLECT_CARTESIAN_PRODUCTS, WARN_LINTING, FROM_LINTING = tuple(
691 Linting
692)
693
694
695class FromLinter(collections.namedtuple("FromLinter", ["froms", "edges"])):
696 """represents current state for the "cartesian product" detection
697 feature."""
698
699 def lint(self, start=None):
700 froms = self.froms
701 if not froms:
702 return None, None
703
704 edges = set(self.edges)
705 the_rest = set(froms)
706
707 if start is not None:
708 start_with = start
709 the_rest.remove(start_with)
710 else:
711 start_with = the_rest.pop()
712
713 stack = collections.deque([start_with])
714
715 while stack and the_rest:
716 node = stack.popleft()
717 the_rest.discard(node)
718
719 # comparison of nodes in edges here is based on hash equality, as
720 # there are "annotated" elements that match the non-annotated ones.
721 # to remove the need for in-python hash() calls, use native
722 # containment routines (e.g. "node in edge", "edge.index(node)")
723 to_remove = {edge for edge in edges if node in edge}
724
725 # appendleft the node in each edge that is not
726 # the one that matched.
727 stack.extendleft(edge[not edge.index(node)] for edge in to_remove)
728 edges.difference_update(to_remove)
729
730 # FROMS left over? boom
731 if the_rest:
732 return the_rest, start_with
733 else:
734 return None, None
735
736 def warn(self, stmt_type="SELECT"):
737 the_rest, start_with = self.lint()
738
739 # FROMS left over? boom
740 if the_rest:
741 froms = the_rest
742 if froms:
743 template = (
744 "{stmt_type} statement has a cartesian product between "
745 "FROM element(s) {froms} and "
746 'FROM element "{start}". Apply join condition(s) '
747 "between each element to resolve."
748 )
749 froms_str = ", ".join(
750 f'"{self.froms[from_]}"' for from_ in froms
751 )
752 message = template.format(
753 stmt_type=stmt_type,
754 froms=froms_str,
755 start=self.froms[start_with],
756 )
757
758 util.warn(message)
759
760
761class Compiled:
762 """Represent a compiled SQL or DDL expression.
763
764 The ``__str__`` method of the ``Compiled`` object should produce
765 the actual text of the statement. ``Compiled`` objects are
766 specific to their underlying database dialect, and also may
767 or may not be specific to the columns referenced within a
768 particular set of bind parameters. In no case should the
769 ``Compiled`` object be dependent on the actual values of those
770 bind parameters, even though it may reference those values as
771 defaults.
772 """
773
774 statement: Optional[ClauseElement] = None
775 "The statement to compile."
776 string: str = ""
777 "The string representation of the ``statement``"
778
779 state: CompilerState
780 """description of the compiler's state"""
781
782 is_sql = False
783 is_ddl = False
784
785 _cached_metadata: Optional[CursorResultMetaData] = None
786
787 _result_columns: Optional[List[ResultColumnsEntry]] = None
788
789 schema_translate_map: Optional[SchemaTranslateMapType] = None
790
791 execution_options: _ExecuteOptions = util.EMPTY_DICT
792 """
793 Execution options propagated from the statement. In some cases,
794 sub-elements of the statement can modify these.
795 """
796
797 preparer: IdentifierPreparer
798
799 _annotations: _AnnotationDict = util.EMPTY_DICT
800
801 compile_state: Optional[CompileState] = None
802 """Optional :class:`.CompileState` object that maintains additional
803 state used by the compiler.
804
805 Major executable objects such as :class:`_expression.Insert`,
806 :class:`_expression.Update`, :class:`_expression.Delete`,
807 :class:`_expression.Select` will generate this
808 state when compiled in order to calculate additional information about the
809 object. For the top level object that is to be executed, the state can be
810 stored here where it can also have applicability towards result set
811 processing.
812
813 .. versionadded:: 1.4
814
815 """
816
817 dml_compile_state: Optional[CompileState] = None
818 """Optional :class:`.CompileState` assigned at the same point that
819 .isinsert, .isupdate, or .isdelete is assigned.
820
821 This will normally be the same object as .compile_state, with the
822 exception of cases like the :class:`.ORMFromStatementCompileState`
823 object.
824
825 .. versionadded:: 1.4.40
826
827 """
828
829 cache_key: Optional[CacheKey] = None
830 """The :class:`.CacheKey` that was generated ahead of creating this
831 :class:`.Compiled` object.
832
833 This is used for routines that need access to the original
834 :class:`.CacheKey` instance generated when the :class:`.Compiled`
835 instance was first cached, typically in order to reconcile
836 the original list of :class:`.BindParameter` objects with a
837 per-statement list that's generated on each call.
838
839 """
840
841 _gen_time: float
842 """Generation time of this :class:`.Compiled`, used for reporting
843 cache stats."""
844
845 def __init__(
846 self,
847 dialect: Dialect,
848 statement: Optional[ClauseElement],
849 schema_translate_map: Optional[SchemaTranslateMapType] = None,
850 render_schema_translate: bool = False,
851 compile_kwargs: Mapping[str, Any] = util.immutabledict(),
852 ):
853 """Construct a new :class:`.Compiled` object.
854
855 :param dialect: :class:`.Dialect` to compile against.
856
857 :param statement: :class:`_expression.ClauseElement` to be compiled.
858
859 :param schema_translate_map: dictionary of schema names to be
860 translated when forming the resultant SQL
861
862 .. seealso::
863
864 :ref:`schema_translating`
865
866 :param compile_kwargs: additional kwargs that will be
867 passed to the initial call to :meth:`.Compiled.process`.
868
869
870 """
871 self.dialect = dialect
872 self.preparer = self.dialect.identifier_preparer
873 if schema_translate_map:
874 self.schema_translate_map = schema_translate_map
875 self.preparer = self.preparer._with_schema_translate(
876 schema_translate_map
877 )
878
879 if statement is not None:
880 self.state = CompilerState.COMPILING
881 self.statement = statement
882 self.can_execute = statement.supports_execution
883 self._annotations = statement._annotations
884 if self.can_execute:
885 if TYPE_CHECKING:
886 assert isinstance(statement, Executable)
887 self.execution_options = statement._execution_options
888 self.string = self.process(self.statement, **compile_kwargs)
889
890 if render_schema_translate:
891 assert schema_translate_map is not None
892 self.string = self.preparer._render_schema_translates(
893 self.string, schema_translate_map
894 )
895
896 self.state = CompilerState.STRING_APPLIED
897 else:
898 self.state = CompilerState.NO_STATEMENT
899
900 self._gen_time = perf_counter()
901
902 def __init_subclass__(cls) -> None:
903 cls._init_compiler_cls()
904 return super().__init_subclass__()
905
906 @classmethod
907 def _init_compiler_cls(cls):
908 pass
909
910 def _execute_on_connection(
911 self, connection, distilled_params, execution_options
912 ):
913 if self.can_execute:
914 return connection._execute_compiled(
915 self, distilled_params, execution_options
916 )
917 else:
918 raise exc.ObjectNotExecutableError(self.statement)
919
920 def visit_unsupported_compilation(self, element, err, **kw):
921 raise exc.UnsupportedCompilationError(self, type(element)) from err
922
923 @property
924 def sql_compiler(self) -> SQLCompiler:
925 """Return a Compiled that is capable of processing SQL expressions.
926
927 If this compiler is one, it would likely just return 'self'.
928
929 """
930
931 raise NotImplementedError()
932
933 def process(self, obj: Visitable, **kwargs: Any) -> str:
934 return obj._compiler_dispatch(self, **kwargs)
935
936 def __str__(self) -> str:
937 """Return the string text of the generated SQL or DDL."""
938
939 if self.state is CompilerState.STRING_APPLIED:
940 return self.string
941 else:
942 return ""
943
944 def construct_params(
945 self,
946 params: Optional[_CoreSingleExecuteParams] = None,
947 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
948 escape_names: bool = True,
949 ) -> Optional[_MutableCoreSingleExecuteParams]:
950 """Return the bind params for this compiled object.
951
952 :param params: a dict of string/object pairs whose values will
953 override bind values compiled in to the
954 statement.
955 """
956
957 raise NotImplementedError()
958
959 @property
960 def params(self):
961 """Return the bind params for this compiled object."""
962 return self.construct_params()
963
964
965class TypeCompiler(util.EnsureKWArg):
966 """Produces DDL specification for TypeEngine objects."""
967
968 ensure_kwarg = r"visit_\w+"
969
970 def __init__(self, dialect: Dialect):
971 self.dialect = dialect
972
973 def process(self, type_: TypeEngine[Any], **kw: Any) -> str:
974 if (
975 type_._variant_mapping
976 and self.dialect.name in type_._variant_mapping
977 ):
978 type_ = type_._variant_mapping[self.dialect.name]
979 return type_._compiler_dispatch(self, **kw)
980
981 def visit_unsupported_compilation(
982 self, element: Any, err: Exception, **kw: Any
983 ) -> NoReturn:
984 raise exc.UnsupportedCompilationError(self, element) from err
985
986
987# this was a Visitable, but to allow accurate detection of
988# column elements this is actually a column element
989class _CompileLabel(
990 roles.BinaryElementRole[Any], elements.CompilerColumnElement
991):
992 """lightweight label object which acts as an expression.Label."""
993
994 __visit_name__ = "label"
995 __slots__ = "element", "name", "_alt_names"
996
997 def __init__(self, col, name, alt_names=()):
998 self.element = col
999 self.name = name
1000 self._alt_names = (col,) + alt_names
1001
1002 @property
1003 def proxy_set(self):
1004 return self.element.proxy_set
1005
1006 @property
1007 def type(self):
1008 return self.element.type
1009
1010 def self_group(self, **kw):
1011 return self
1012
1013
1014class ilike_case_insensitive(
1015 roles.BinaryElementRole[Any], elements.CompilerColumnElement
1016):
1017 """produce a wrapping element for a case-insensitive portion of
1018 an ILIKE construct.
1019
1020 The construct usually renders the ``lower()`` function, but on
1021 PostgreSQL will pass silently with the assumption that "ILIKE"
1022 is being used.
1023
1024 .. versionadded:: 2.0
1025
1026 """
1027
1028 __visit_name__ = "ilike_case_insensitive_operand"
1029 __slots__ = "element", "comparator"
1030
1031 def __init__(self, element):
1032 self.element = element
1033 self.comparator = element.comparator
1034
1035 @property
1036 def proxy_set(self):
1037 return self.element.proxy_set
1038
1039 @property
1040 def type(self):
1041 return self.element.type
1042
1043 def self_group(self, **kw):
1044 return self
1045
1046 def _with_binary_element_type(self, type_):
1047 return ilike_case_insensitive(
1048 self.element._with_binary_element_type(type_)
1049 )
1050
1051
1052class SQLCompiler(Compiled):
1053 """Default implementation of :class:`.Compiled`.
1054
1055 Compiles :class:`_expression.ClauseElement` objects into SQL strings.
1056
1057 """
1058
1059 extract_map = EXTRACT_MAP
1060
1061 bindname_escape_characters: ClassVar[Mapping[str, str]] = (
1062 util.immutabledict(
1063 {
1064 "%": "P",
1065 "(": "A",
1066 ")": "Z",
1067 ":": "C",
1068 ".": "_",
1069 "[": "_",
1070 "]": "_",
1071 " ": "_",
1072 }
1073 )
1074 )
1075 """A mapping (e.g. dict or similar) containing a lookup of
1076 characters keyed to replacement characters which will be applied to all
1077 'bind names' used in SQL statements as a form of 'escaping'; the given
1078 characters are replaced entirely with the 'replacement' character when
1079 rendered in the SQL statement, and a similar translation is performed
1080 on the incoming names used in parameter dictionaries passed to methods
1081 like :meth:`_engine.Connection.execute`.
1082
1083 This allows bound parameter names used in :func:`_sql.bindparam` and
1084 other constructs to have any arbitrary characters present without any
1085 concern for characters that aren't allowed at all on the target database.
1086
1087 Third party dialects can establish their own dictionary here to replace the
1088 default mapping, which will ensure that the particular characters in the
1089 mapping will never appear in a bound parameter name.
1090
1091 The dictionary is evaluated at **class creation time**, so cannot be
1092 modified at runtime; it must be present on the class when the class
1093 is first declared.
1094
1095 Note that for dialects that have additional bound parameter rules such
1096 as additional restrictions on leading characters, the
1097 :meth:`_sql.SQLCompiler.bindparam_string` method may need to be augmented.
1098 See the cx_Oracle compiler for an example of this.
1099
1100 .. versionadded:: 2.0.0rc1
1101
1102 """
1103
1104 _bind_translate_re: ClassVar[Pattern[str]]
1105 _bind_translate_chars: ClassVar[Mapping[str, str]]
1106
1107 is_sql = True
1108
1109 compound_keywords = COMPOUND_KEYWORDS
1110
1111 isdelete: bool = False
1112 isinsert: bool = False
1113 isupdate: bool = False
1114 """class-level defaults which can be set at the instance
1115 level to define if this Compiled instance represents
1116 INSERT/UPDATE/DELETE
1117 """
1118
1119 postfetch: Optional[List[Column[Any]]]
1120 """list of columns that can be post-fetched after INSERT or UPDATE to
1121 receive server-updated values"""
1122
1123 insert_prefetch: Sequence[Column[Any]] = ()
1124 """list of columns for which default values should be evaluated before
1125 an INSERT takes place"""
1126
1127 update_prefetch: Sequence[Column[Any]] = ()
1128 """list of columns for which onupdate default values should be evaluated
1129 before an UPDATE takes place"""
1130
1131 implicit_returning: Optional[Sequence[ColumnElement[Any]]] = None
1132 """list of "implicit" returning columns for a toplevel INSERT or UPDATE
1133 statement, used to receive newly generated values of columns.
1134
1135 .. versionadded:: 2.0 ``implicit_returning`` replaces the previous
1136 ``returning`` collection, which was not a generalized RETURNING
1137 collection and instead was in fact specific to the "implicit returning"
1138 feature.
1139
1140 """
1141
1142 isplaintext: bool = False
1143
1144 binds: Dict[str, BindParameter[Any]]
1145 """a dictionary of bind parameter keys to BindParameter instances."""
1146
1147 bind_names: Dict[BindParameter[Any], str]
1148 """a dictionary of BindParameter instances to "compiled" names
1149 that are actually present in the generated SQL"""
1150
1151 stack: List[_CompilerStackEntry]
1152 """major statements such as SELECT, INSERT, UPDATE, DELETE are
1153 tracked in this stack using an entry format."""
1154
1155 returning_precedes_values: bool = False
1156 """set to True classwide to generate RETURNING
1157 clauses before the VALUES or WHERE clause (i.e. MSSQL)
1158 """
1159
1160 render_table_with_column_in_update_from: bool = False
1161 """set to True classwide to indicate the SET clause
1162 in a multi-table UPDATE statement should qualify
1163 columns with the table name (i.e. MySQL only)
1164 """
1165
1166 ansi_bind_rules: bool = False
1167 """SQL 92 doesn't allow bind parameters to be used
1168 in the columns clause of a SELECT, nor does it allow
1169 ambiguous expressions like "? = ?". A compiler
1170 subclass can set this flag to False if the target
1171 driver/DB enforces this
1172 """
1173
1174 bindtemplate: str
1175 """template to render bound parameters based on paramstyle."""
1176
1177 compilation_bindtemplate: str
1178 """template used by compiler to render parameters before positional
1179 paramstyle application"""
1180
1181 _numeric_binds_identifier_char: str
1182 """Character that's used to as the identifier of a numerical bind param.
1183 For example if this char is set to ``$``, numerical binds will be rendered
1184 in the form ``$1, $2, $3``.
1185 """
1186
1187 _result_columns: List[ResultColumnsEntry]
1188 """relates label names in the final SQL to a tuple of local
1189 column/label name, ColumnElement object (if any) and
1190 TypeEngine. CursorResult uses this for type processing and
1191 column targeting"""
1192
1193 _textual_ordered_columns: bool = False
1194 """tell the result object that the column names as rendered are important,
1195 but they are also "ordered" vs. what is in the compiled object here.
1196
1197 As of 1.4.42 this condition is only present when the statement is a
1198 TextualSelect, e.g. text("....").columns(...), where it is required
1199 that the columns are considered positionally and not by name.
1200
1201 """
1202
1203 _ad_hoc_textual: bool = False
1204 """tell the result that we encountered text() or '*' constructs in the
1205 middle of the result columns, but we also have compiled columns, so
1206 if the number of columns in cursor.description does not match how many
1207 expressions we have, that means we can't rely on positional at all and
1208 should match on name.
1209
1210 """
1211
1212 _ordered_columns: bool = True
1213 """
1214 if False, means we can't be sure the list of entries
1215 in _result_columns is actually the rendered order. Usually
1216 True unless using an unordered TextualSelect.
1217 """
1218
1219 _loose_column_name_matching: bool = False
1220 """tell the result object that the SQL statement is textual, wants to match
1221 up to Column objects, and may be using the ._tq_label in the SELECT rather
1222 than the base name.
1223
1224 """
1225
1226 _numeric_binds: bool = False
1227 """
1228 True if paramstyle is "numeric". This paramstyle is trickier than
1229 all the others.
1230
1231 """
1232
1233 _render_postcompile: bool = False
1234 """
1235 whether to render out POSTCOMPILE params during the compile phase.
1236
1237 This attribute is used only for end-user invocation of stmt.compile();
1238 it's never used for actual statement execution, where instead the
1239 dialect internals access and render the internal postcompile structure
1240 directly.
1241
1242 """
1243
1244 _post_compile_expanded_state: Optional[ExpandedState] = None
1245 """When render_postcompile is used, the ``ExpandedState`` used to create
1246 the "expanded" SQL is assigned here, and then used by the ``.params``
1247 accessor and ``.construct_params()`` methods for their return values.
1248
1249 .. versionadded:: 2.0.0rc1
1250
1251 """
1252
1253 _pre_expanded_string: Optional[str] = None
1254 """Stores the original string SQL before 'post_compile' is applied,
1255 for cases where 'post_compile' were used.
1256
1257 """
1258
1259 _pre_expanded_positiontup: Optional[List[str]] = None
1260
1261 _insertmanyvalues: Optional[_InsertManyValues] = None
1262
1263 _insert_crud_params: Optional[crud._CrudParamSequence] = None
1264
1265 literal_execute_params: FrozenSet[BindParameter[Any]] = frozenset()
1266 """bindparameter objects that are rendered as literal values at statement
1267 execution time.
1268
1269 """
1270
1271 post_compile_params: FrozenSet[BindParameter[Any]] = frozenset()
1272 """bindparameter objects that are rendered as bound parameter placeholders
1273 at statement execution time.
1274
1275 """
1276
1277 escaped_bind_names: util.immutabledict[str, str] = util.EMPTY_DICT
1278 """Late escaping of bound parameter names that has to be converted
1279 to the original name when looking in the parameter dictionary.
1280
1281 """
1282
1283 has_out_parameters = False
1284 """if True, there are bindparam() objects that have the isoutparam
1285 flag set."""
1286
1287 postfetch_lastrowid = False
1288 """if True, and this in insert, use cursor.lastrowid to populate
1289 result.inserted_primary_key. """
1290
1291 _cache_key_bind_match: Optional[
1292 Tuple[
1293 Dict[
1294 BindParameter[Any],
1295 List[BindParameter[Any]],
1296 ],
1297 Dict[
1298 str,
1299 BindParameter[Any],
1300 ],
1301 ]
1302 ] = None
1303 """a mapping that will relate the BindParameter object we compile
1304 to those that are part of the extracted collection of parameters
1305 in the cache key, if we were given a cache key.
1306
1307 """
1308
1309 positiontup: Optional[List[str]] = None
1310 """for a compiled construct that uses a positional paramstyle, will be
1311 a sequence of strings, indicating the names of bound parameters in order.
1312
1313 This is used in order to render bound parameters in their correct order,
1314 and is combined with the :attr:`_sql.Compiled.params` dictionary to
1315 render parameters.
1316
1317 This sequence always contains the unescaped name of the parameters.
1318
1319 .. seealso::
1320
1321 :ref:`faq_sql_expression_string` - includes a usage example for
1322 debugging use cases.
1323
1324 """
1325 _values_bindparam: Optional[List[str]] = None
1326
1327 _visited_bindparam: Optional[List[str]] = None
1328
1329 inline: bool = False
1330
1331 ctes: Optional[MutableMapping[CTE, str]]
1332
1333 # Detect same CTE references - Dict[(level, name), cte]
1334 # Level is required for supporting nesting
1335 ctes_by_level_name: Dict[Tuple[int, str], CTE]
1336
1337 # To retrieve key/level in ctes_by_level_name -
1338 # Dict[cte_reference, (level, cte_name, cte_opts)]
1339 level_name_by_cte: Dict[CTE, Tuple[int, str, selectable._CTEOpts]]
1340
1341 ctes_recursive: bool
1342
1343 _post_compile_pattern = re.compile(r"__\[POSTCOMPILE_(\S+?)(~~.+?~~)?\]")
1344 _pyformat_pattern = re.compile(r"%\(([^)]+?)\)s")
1345 _positional_pattern = re.compile(
1346 f"{_pyformat_pattern.pattern}|{_post_compile_pattern.pattern}"
1347 )
1348
1349 @classmethod
1350 def _init_compiler_cls(cls):
1351 cls._init_bind_translate()
1352
1353 @classmethod
1354 def _init_bind_translate(cls):
1355 reg = re.escape("".join(cls.bindname_escape_characters))
1356 cls._bind_translate_re = re.compile(f"[{reg}]")
1357 cls._bind_translate_chars = cls.bindname_escape_characters
1358
1359 def __init__(
1360 self,
1361 dialect: Dialect,
1362 statement: Optional[ClauseElement],
1363 cache_key: Optional[CacheKey] = None,
1364 column_keys: Optional[Sequence[str]] = None,
1365 for_executemany: bool = False,
1366 linting: Linting = NO_LINTING,
1367 _supporting_against: Optional[SQLCompiler] = None,
1368 **kwargs: Any,
1369 ):
1370 """Construct a new :class:`.SQLCompiler` object.
1371
1372 :param dialect: :class:`.Dialect` to be used
1373
1374 :param statement: :class:`_expression.ClauseElement` to be compiled
1375
1376 :param column_keys: a list of column names to be compiled into an
1377 INSERT or UPDATE statement.
1378
1379 :param for_executemany: whether INSERT / UPDATE statements should
1380 expect that they are to be invoked in an "executemany" style,
1381 which may impact how the statement will be expected to return the
1382 values of defaults and autoincrement / sequences and similar.
1383 Depending on the backend and driver in use, support for retrieving
1384 these values may be disabled which means SQL expressions may
1385 be rendered inline, RETURNING may not be rendered, etc.
1386
1387 :param kwargs: additional keyword arguments to be consumed by the
1388 superclass.
1389
1390 """
1391 self.column_keys = column_keys
1392
1393 self.cache_key = cache_key
1394
1395 if cache_key:
1396 cksm = {b.key: b for b in cache_key[1]}
1397 ckbm = {b: [b] for b in cache_key[1]}
1398 self._cache_key_bind_match = (ckbm, cksm)
1399
1400 # compile INSERT/UPDATE defaults/sequences to expect executemany
1401 # style execution, which may mean no pre-execute of defaults,
1402 # or no RETURNING
1403 self.for_executemany = for_executemany
1404
1405 self.linting = linting
1406
1407 # a dictionary of bind parameter keys to BindParameter
1408 # instances.
1409 self.binds = {}
1410
1411 # a dictionary of BindParameter instances to "compiled" names
1412 # that are actually present in the generated SQL
1413 self.bind_names = util.column_dict()
1414
1415 # stack which keeps track of nested SELECT statements
1416 self.stack = []
1417
1418 self._result_columns = []
1419
1420 # true if the paramstyle is positional
1421 self.positional = dialect.positional
1422 if self.positional:
1423 self._numeric_binds = nb = dialect.paramstyle.startswith("numeric")
1424 if nb:
1425 self._numeric_binds_identifier_char = (
1426 "$" if dialect.paramstyle == "numeric_dollar" else ":"
1427 )
1428
1429 self.compilation_bindtemplate = _pyformat_template
1430 else:
1431 self.compilation_bindtemplate = BIND_TEMPLATES[dialect.paramstyle]
1432
1433 self.ctes = None
1434
1435 self.label_length = (
1436 dialect.label_length or dialect.max_identifier_length
1437 )
1438
1439 # a map which tracks "anonymous" identifiers that are created on
1440 # the fly here
1441 self.anon_map = prefix_anon_map()
1442
1443 # a map which tracks "truncated" names based on
1444 # dialect.label_length or dialect.max_identifier_length
1445 self.truncated_names: Dict[Tuple[str, str], str] = {}
1446 self._truncated_counters: Dict[str, int] = {}
1447
1448 Compiled.__init__(self, dialect, statement, **kwargs)
1449
1450 if self.isinsert or self.isupdate or self.isdelete:
1451 if TYPE_CHECKING:
1452 assert isinstance(statement, UpdateBase)
1453
1454 if self.isinsert or self.isupdate:
1455 if TYPE_CHECKING:
1456 assert isinstance(statement, ValuesBase)
1457 if statement._inline:
1458 self.inline = True
1459 elif self.for_executemany and (
1460 not self.isinsert
1461 or (
1462 self.dialect.insert_executemany_returning
1463 and statement._return_defaults
1464 )
1465 ):
1466 self.inline = True
1467
1468 self.bindtemplate = BIND_TEMPLATES[dialect.paramstyle]
1469
1470 if _supporting_against:
1471 self.__dict__.update(
1472 {
1473 k: v
1474 for k, v in _supporting_against.__dict__.items()
1475 if k
1476 not in {
1477 "state",
1478 "dialect",
1479 "preparer",
1480 "positional",
1481 "_numeric_binds",
1482 "compilation_bindtemplate",
1483 "bindtemplate",
1484 }
1485 }
1486 )
1487
1488 if self.state is CompilerState.STRING_APPLIED:
1489 if self.positional:
1490 if self._numeric_binds:
1491 self._process_numeric()
1492 else:
1493 self._process_positional()
1494
1495 if self._render_postcompile:
1496 parameters = self.construct_params(
1497 escape_names=False,
1498 _no_postcompile=True,
1499 )
1500
1501 self._process_parameters_for_postcompile(
1502 parameters, _populate_self=True
1503 )
1504
1505 @property
1506 def insert_single_values_expr(self) -> Optional[str]:
1507 """When an INSERT is compiled with a single set of parameters inside
1508 a VALUES expression, the string is assigned here, where it can be
1509 used for insert batching schemes to rewrite the VALUES expression.
1510
1511 .. versionadded:: 1.3.8
1512
1513 .. versionchanged:: 2.0 This collection is no longer used by
1514 SQLAlchemy's built-in dialects, in favor of the currently
1515 internal ``_insertmanyvalues`` collection that is used only by
1516 :class:`.SQLCompiler`.
1517
1518 """
1519 if self._insertmanyvalues is None:
1520 return None
1521 else:
1522 return self._insertmanyvalues.single_values_expr
1523
1524 @util.ro_memoized_property
1525 def effective_returning(self) -> Optional[Sequence[ColumnElement[Any]]]:
1526 """The effective "returning" columns for INSERT, UPDATE or DELETE.
1527
1528 This is either the so-called "implicit returning" columns which are
1529 calculated by the compiler on the fly, or those present based on what's
1530 present in ``self.statement._returning`` (expanded into individual
1531 columns using the ``._all_selected_columns`` attribute) i.e. those set
1532 explicitly using the :meth:`.UpdateBase.returning` method.
1533
1534 .. versionadded:: 2.0
1535
1536 """
1537 if self.implicit_returning:
1538 return self.implicit_returning
1539 elif self.statement is not None and is_dml(self.statement):
1540 return [
1541 c
1542 for c in self.statement._all_selected_columns
1543 if is_column_element(c)
1544 ]
1545
1546 else:
1547 return None
1548
1549 @property
1550 def returning(self):
1551 """backwards compatibility; returns the
1552 effective_returning collection.
1553
1554 """
1555 return self.effective_returning
1556
1557 @property
1558 def current_executable(self):
1559 """Return the current 'executable' that is being compiled.
1560
1561 This is currently the :class:`_sql.Select`, :class:`_sql.Insert`,
1562 :class:`_sql.Update`, :class:`_sql.Delete`,
1563 :class:`_sql.CompoundSelect` object that is being compiled.
1564 Specifically it's assigned to the ``self.stack`` list of elements.
1565
1566 When a statement like the above is being compiled, it normally
1567 is also assigned to the ``.statement`` attribute of the
1568 :class:`_sql.Compiler` object. However, all SQL constructs are
1569 ultimately nestable, and this attribute should never be consulted
1570 by a ``visit_`` method, as it is not guaranteed to be assigned
1571 nor guaranteed to correspond to the current statement being compiled.
1572
1573 .. versionadded:: 1.3.21
1574
1575 For compatibility with previous versions, use the following
1576 recipe::
1577
1578 statement = getattr(self, "current_executable", False)
1579 if statement is False:
1580 statement = self.stack[-1]["selectable"]
1581
1582 For versions 1.4 and above, ensure only .current_executable
1583 is used; the format of "self.stack" may change.
1584
1585
1586 """
1587 try:
1588 return self.stack[-1]["selectable"]
1589 except IndexError as ie:
1590 raise IndexError("Compiler does not have a stack entry") from ie
1591
1592 @property
1593 def prefetch(self):
1594 return list(self.insert_prefetch) + list(self.update_prefetch)
1595
1596 @util.memoized_property
1597 def _global_attributes(self) -> Dict[Any, Any]:
1598 return {}
1599
1600 @util.memoized_instancemethod
1601 def _init_cte_state(self) -> MutableMapping[CTE, str]:
1602 """Initialize collections related to CTEs only if
1603 a CTE is located, to save on the overhead of
1604 these collections otherwise.
1605
1606 """
1607 # collect CTEs to tack on top of a SELECT
1608 # To store the query to print - Dict[cte, text_query]
1609 ctes: MutableMapping[CTE, str] = util.OrderedDict()
1610 self.ctes = ctes
1611
1612 # Detect same CTE references - Dict[(level, name), cte]
1613 # Level is required for supporting nesting
1614 self.ctes_by_level_name = {}
1615
1616 # To retrieve key/level in ctes_by_level_name -
1617 # Dict[cte_reference, (level, cte_name, cte_opts)]
1618 self.level_name_by_cte = {}
1619
1620 self.ctes_recursive = False
1621
1622 return ctes
1623
1624 @contextlib.contextmanager
1625 def _nested_result(self):
1626 """special API to support the use case of 'nested result sets'"""
1627 result_columns, ordered_columns = (
1628 self._result_columns,
1629 self._ordered_columns,
1630 )
1631 self._result_columns, self._ordered_columns = [], False
1632
1633 try:
1634 if self.stack:
1635 entry = self.stack[-1]
1636 entry["need_result_map_for_nested"] = True
1637 else:
1638 entry = None
1639 yield self._result_columns, self._ordered_columns
1640 finally:
1641 if entry:
1642 entry.pop("need_result_map_for_nested")
1643 self._result_columns, self._ordered_columns = (
1644 result_columns,
1645 ordered_columns,
1646 )
1647
1648 def _process_positional(self):
1649 assert not self.positiontup
1650 assert self.state is CompilerState.STRING_APPLIED
1651 assert not self._numeric_binds
1652
1653 if self.dialect.paramstyle == "format":
1654 placeholder = "%s"
1655 else:
1656 assert self.dialect.paramstyle == "qmark"
1657 placeholder = "?"
1658
1659 positions = []
1660
1661 def find_position(m: re.Match[str]) -> str:
1662 normal_bind = m.group(1)
1663 if normal_bind:
1664 positions.append(normal_bind)
1665 return placeholder
1666 else:
1667 # this a post-compile bind
1668 positions.append(m.group(2))
1669 return m.group(0)
1670
1671 self.string = re.sub(
1672 self._positional_pattern, find_position, self.string
1673 )
1674
1675 if self.escaped_bind_names:
1676 reverse_escape = {v: k for k, v in self.escaped_bind_names.items()}
1677 assert len(self.escaped_bind_names) == len(reverse_escape)
1678 self.positiontup = [
1679 reverse_escape.get(name, name) for name in positions
1680 ]
1681 else:
1682 self.positiontup = positions
1683
1684 if self._insertmanyvalues:
1685 positions = []
1686
1687 single_values_expr = re.sub(
1688 self._positional_pattern,
1689 find_position,
1690 self._insertmanyvalues.single_values_expr,
1691 )
1692 insert_crud_params = [
1693 (
1694 v[0],
1695 v[1],
1696 re.sub(self._positional_pattern, find_position, v[2]),
1697 v[3],
1698 )
1699 for v in self._insertmanyvalues.insert_crud_params
1700 ]
1701
1702 self._insertmanyvalues = self._insertmanyvalues._replace(
1703 single_values_expr=single_values_expr,
1704 insert_crud_params=insert_crud_params,
1705 )
1706
1707 def _process_numeric(self):
1708 assert self._numeric_binds
1709 assert self.state is CompilerState.STRING_APPLIED
1710
1711 num = 1
1712 param_pos: Dict[str, str] = {}
1713 order: Iterable[str]
1714 if self._insertmanyvalues and self._values_bindparam is not None:
1715 # bindparams that are not in values are always placed first.
1716 # this avoids the need of changing them when using executemany
1717 # values () ()
1718 order = itertools.chain(
1719 (
1720 name
1721 for name in self.bind_names.values()
1722 if name not in self._values_bindparam
1723 ),
1724 self.bind_names.values(),
1725 )
1726 else:
1727 order = self.bind_names.values()
1728
1729 for bind_name in order:
1730 if bind_name in param_pos:
1731 continue
1732 bind = self.binds[bind_name]
1733 if (
1734 bind in self.post_compile_params
1735 or bind in self.literal_execute_params
1736 ):
1737 # set to None to just mark the in positiontup, it will not
1738 # be replaced below.
1739 param_pos[bind_name] = None # type: ignore
1740 else:
1741 ph = f"{self._numeric_binds_identifier_char}{num}"
1742 num += 1
1743 param_pos[bind_name] = ph
1744
1745 self.next_numeric_pos = num
1746
1747 self.positiontup = list(param_pos)
1748 if self.escaped_bind_names:
1749 len_before = len(param_pos)
1750 param_pos = {
1751 self.escaped_bind_names.get(name, name): pos
1752 for name, pos in param_pos.items()
1753 }
1754 assert len(param_pos) == len_before
1755
1756 # Can't use format here since % chars are not escaped.
1757 self.string = self._pyformat_pattern.sub(
1758 lambda m: param_pos[m.group(1)], self.string
1759 )
1760
1761 if self._insertmanyvalues:
1762 single_values_expr = (
1763 # format is ok here since single_values_expr includes only
1764 # place-holders
1765 self._insertmanyvalues.single_values_expr
1766 % param_pos
1767 )
1768 insert_crud_params = [
1769 (v[0], v[1], "%s", v[3])
1770 for v in self._insertmanyvalues.insert_crud_params
1771 ]
1772
1773 self._insertmanyvalues = self._insertmanyvalues._replace(
1774 # This has the numbers (:1, :2)
1775 single_values_expr=single_values_expr,
1776 # The single binds are instead %s so they can be formatted
1777 insert_crud_params=insert_crud_params,
1778 )
1779
1780 @util.memoized_property
1781 def _bind_processors(
1782 self,
1783 ) -> MutableMapping[
1784 str, Union[_BindProcessorType[Any], Sequence[_BindProcessorType[Any]]]
1785 ]:
1786 # mypy is not able to see the two value types as the above Union,
1787 # it just sees "object". don't know how to resolve
1788 return {
1789 key: value # type: ignore
1790 for key, value in (
1791 (
1792 self.bind_names[bindparam],
1793 (
1794 bindparam.type._cached_bind_processor(self.dialect)
1795 if not bindparam.type._is_tuple_type
1796 else tuple(
1797 elem_type._cached_bind_processor(self.dialect)
1798 for elem_type in cast(
1799 TupleType, bindparam.type
1800 ).types
1801 )
1802 ),
1803 )
1804 for bindparam in self.bind_names
1805 )
1806 if value is not None
1807 }
1808
1809 def is_subquery(self):
1810 return len(self.stack) > 1
1811
1812 @property
1813 def sql_compiler(self) -> Self:
1814 return self
1815
1816 def construct_expanded_state(
1817 self,
1818 params: Optional[_CoreSingleExecuteParams] = None,
1819 escape_names: bool = True,
1820 ) -> ExpandedState:
1821 """Return a new :class:`.ExpandedState` for a given parameter set.
1822
1823 For queries that use "expanding" or other late-rendered parameters,
1824 this method will provide for both the finalized SQL string as well
1825 as the parameters that would be used for a particular parameter set.
1826
1827 .. versionadded:: 2.0.0rc1
1828
1829 """
1830 parameters = self.construct_params(
1831 params,
1832 escape_names=escape_names,
1833 _no_postcompile=True,
1834 )
1835 return self._process_parameters_for_postcompile(
1836 parameters,
1837 )
1838
1839 def construct_params(
1840 self,
1841 params: Optional[_CoreSingleExecuteParams] = None,
1842 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
1843 escape_names: bool = True,
1844 _group_number: Optional[int] = None,
1845 _check: bool = True,
1846 _no_postcompile: bool = False,
1847 ) -> _MutableCoreSingleExecuteParams:
1848 """return a dictionary of bind parameter keys and values"""
1849
1850 if self._render_postcompile and not _no_postcompile:
1851 assert self._post_compile_expanded_state is not None
1852 if not params:
1853 return dict(self._post_compile_expanded_state.parameters)
1854 else:
1855 raise exc.InvalidRequestError(
1856 "can't construct new parameters when render_postcompile "
1857 "is used; the statement is hard-linked to the original "
1858 "parameters. Use construct_expanded_state to generate a "
1859 "new statement and parameters."
1860 )
1861
1862 has_escaped_names = escape_names and bool(self.escaped_bind_names)
1863
1864 if extracted_parameters:
1865 # related the bound parameters collected in the original cache key
1866 # to those collected in the incoming cache key. They will not have
1867 # matching names but they will line up positionally in the same
1868 # way. The parameters present in self.bind_names may be clones of
1869 # these original cache key params in the case of DML but the .key
1870 # will be guaranteed to match.
1871 if self.cache_key is None:
1872 raise exc.CompileError(
1873 "This compiled object has no original cache key; "
1874 "can't pass extracted_parameters to construct_params"
1875 )
1876 else:
1877 orig_extracted = self.cache_key[1]
1878
1879 ckbm_tuple = self._cache_key_bind_match
1880 assert ckbm_tuple is not None
1881 ckbm, _ = ckbm_tuple
1882 resolved_extracted = {
1883 bind: extracted
1884 for b, extracted in zip(orig_extracted, extracted_parameters)
1885 for bind in ckbm[b]
1886 }
1887 else:
1888 resolved_extracted = None
1889
1890 if params:
1891 pd = {}
1892 for bindparam, name in self.bind_names.items():
1893 escaped_name = (
1894 self.escaped_bind_names.get(name, name)
1895 if has_escaped_names
1896 else name
1897 )
1898
1899 if bindparam.key in params:
1900 pd[escaped_name] = params[bindparam.key]
1901 elif name in params:
1902 pd[escaped_name] = params[name]
1903
1904 elif _check and bindparam.required:
1905 if _group_number:
1906 raise exc.InvalidRequestError(
1907 "A value is required for bind parameter %r, "
1908 "in parameter group %d"
1909 % (bindparam.key, _group_number),
1910 code="cd3x",
1911 )
1912 else:
1913 raise exc.InvalidRequestError(
1914 "A value is required for bind parameter %r"
1915 % bindparam.key,
1916 code="cd3x",
1917 )
1918 else:
1919 if resolved_extracted:
1920 value_param = resolved_extracted.get(
1921 bindparam, bindparam
1922 )
1923 else:
1924 value_param = bindparam
1925
1926 if bindparam.callable:
1927 pd[escaped_name] = value_param.effective_value
1928 else:
1929 pd[escaped_name] = value_param.value
1930 return pd
1931 else:
1932 pd = {}
1933 for bindparam, name in self.bind_names.items():
1934 escaped_name = (
1935 self.escaped_bind_names.get(name, name)
1936 if has_escaped_names
1937 else name
1938 )
1939
1940 if _check and bindparam.required:
1941 if _group_number:
1942 raise exc.InvalidRequestError(
1943 "A value is required for bind parameter %r, "
1944 "in parameter group %d"
1945 % (bindparam.key, _group_number),
1946 code="cd3x",
1947 )
1948 else:
1949 raise exc.InvalidRequestError(
1950 "A value is required for bind parameter %r"
1951 % bindparam.key,
1952 code="cd3x",
1953 )
1954
1955 if resolved_extracted:
1956 value_param = resolved_extracted.get(bindparam, bindparam)
1957 else:
1958 value_param = bindparam
1959
1960 if bindparam.callable:
1961 pd[escaped_name] = value_param.effective_value
1962 else:
1963 pd[escaped_name] = value_param.value
1964
1965 return pd
1966
1967 @util.memoized_instancemethod
1968 def _get_set_input_sizes_lookup(self):
1969 dialect = self.dialect
1970
1971 include_types = dialect.include_set_input_sizes
1972 exclude_types = dialect.exclude_set_input_sizes
1973
1974 dbapi = dialect.dbapi
1975
1976 def lookup_type(typ):
1977 dbtype = typ._unwrapped_dialect_impl(dialect).get_dbapi_type(dbapi)
1978
1979 if (
1980 dbtype is not None
1981 and (exclude_types is None or dbtype not in exclude_types)
1982 and (include_types is None or dbtype in include_types)
1983 ):
1984 return dbtype
1985 else:
1986 return None
1987
1988 inputsizes = {}
1989
1990 literal_execute_params = self.literal_execute_params
1991
1992 for bindparam in self.bind_names:
1993 if bindparam in literal_execute_params:
1994 continue
1995
1996 if bindparam.type._is_tuple_type:
1997 inputsizes[bindparam] = [
1998 lookup_type(typ)
1999 for typ in cast(TupleType, bindparam.type).types
2000 ]
2001 else:
2002 inputsizes[bindparam] = lookup_type(bindparam.type)
2003
2004 return inputsizes
2005
2006 @property
2007 def params(self):
2008 """Return the bind param dictionary embedded into this
2009 compiled object, for those values that are present.
2010
2011 .. seealso::
2012
2013 :ref:`faq_sql_expression_string` - includes a usage example for
2014 debugging use cases.
2015
2016 """
2017 return self.construct_params(_check=False)
2018
2019 def _process_parameters_for_postcompile(
2020 self,
2021 parameters: _MutableCoreSingleExecuteParams,
2022 _populate_self: bool = False,
2023 ) -> ExpandedState:
2024 """handle special post compile parameters.
2025
2026 These include:
2027
2028 * "expanding" parameters -typically IN tuples that are rendered
2029 on a per-parameter basis for an otherwise fixed SQL statement string.
2030
2031 * literal_binds compiled with the literal_execute flag. Used for
2032 things like SQL Server "TOP N" where the driver does not accommodate
2033 N as a bound parameter.
2034
2035 """
2036
2037 expanded_parameters = {}
2038 new_positiontup: Optional[List[str]]
2039
2040 pre_expanded_string = self._pre_expanded_string
2041 if pre_expanded_string is None:
2042 pre_expanded_string = self.string
2043
2044 if self.positional:
2045 new_positiontup = []
2046
2047 pre_expanded_positiontup = self._pre_expanded_positiontup
2048 if pre_expanded_positiontup is None:
2049 pre_expanded_positiontup = self.positiontup
2050
2051 else:
2052 new_positiontup = pre_expanded_positiontup = None
2053
2054 processors = self._bind_processors
2055 single_processors = cast(
2056 "Mapping[str, _BindProcessorType[Any]]", processors
2057 )
2058 tuple_processors = cast(
2059 "Mapping[str, Sequence[_BindProcessorType[Any]]]", processors
2060 )
2061
2062 new_processors: Dict[str, _BindProcessorType[Any]] = {}
2063
2064 replacement_expressions: Dict[str, Any] = {}
2065 to_update_sets: Dict[str, Any] = {}
2066
2067 # notes:
2068 # *unescaped* parameter names in:
2069 # self.bind_names, self.binds, self._bind_processors, self.positiontup
2070 #
2071 # *escaped* parameter names in:
2072 # construct_params(), replacement_expressions
2073
2074 numeric_positiontup: Optional[List[str]] = None
2075
2076 if self.positional and pre_expanded_positiontup is not None:
2077 names: Iterable[str] = pre_expanded_positiontup
2078 if self._numeric_binds:
2079 numeric_positiontup = []
2080 else:
2081 names = self.bind_names.values()
2082
2083 ebn = self.escaped_bind_names
2084 for name in names:
2085 escaped_name = ebn.get(name, name) if ebn else name
2086 parameter = self.binds[name]
2087
2088 if parameter in self.literal_execute_params:
2089 if escaped_name not in replacement_expressions:
2090 replacement_expressions[escaped_name] = (
2091 self.render_literal_bindparam(
2092 parameter,
2093 render_literal_value=parameters.pop(escaped_name),
2094 )
2095 )
2096 continue
2097
2098 if parameter in self.post_compile_params:
2099 if escaped_name in replacement_expressions:
2100 to_update = to_update_sets[escaped_name]
2101 values = None
2102 else:
2103 # we are removing the parameter from parameters
2104 # because it is a list value, which is not expected by
2105 # TypeEngine objects that would otherwise be asked to
2106 # process it. the single name is being replaced with
2107 # individual numbered parameters for each value in the
2108 # param.
2109 #
2110 # note we are also inserting *escaped* parameter names
2111 # into the given dictionary. default dialect will
2112 # use these param names directly as they will not be
2113 # in the escaped_bind_names dictionary.
2114 values = parameters.pop(name)
2115
2116 leep_res = self._literal_execute_expanding_parameter(
2117 escaped_name, parameter, values
2118 )
2119 (to_update, replacement_expr) = leep_res
2120
2121 to_update_sets[escaped_name] = to_update
2122 replacement_expressions[escaped_name] = replacement_expr
2123
2124 if not parameter.literal_execute:
2125 parameters.update(to_update)
2126 if parameter.type._is_tuple_type:
2127 assert values is not None
2128 new_processors.update(
2129 (
2130 "%s_%s_%s" % (name, i, j),
2131 tuple_processors[name][j - 1],
2132 )
2133 for i, tuple_element in enumerate(values, 1)
2134 for j, _ in enumerate(tuple_element, 1)
2135 if name in tuple_processors
2136 and tuple_processors[name][j - 1] is not None
2137 )
2138 else:
2139 new_processors.update(
2140 (key, single_processors[name])
2141 for key, _ in to_update
2142 if name in single_processors
2143 )
2144 if numeric_positiontup is not None:
2145 numeric_positiontup.extend(
2146 name for name, _ in to_update
2147 )
2148 elif new_positiontup is not None:
2149 # to_update has escaped names, but that's ok since
2150 # these are new names, that aren't in the
2151 # escaped_bind_names dict.
2152 new_positiontup.extend(name for name, _ in to_update)
2153 expanded_parameters[name] = [
2154 expand_key for expand_key, _ in to_update
2155 ]
2156 elif new_positiontup is not None:
2157 new_positiontup.append(name)
2158
2159 def process_expanding(m):
2160 key = m.group(1)
2161 expr = replacement_expressions[key]
2162
2163 # if POSTCOMPILE included a bind_expression, render that
2164 # around each element
2165 if m.group(2):
2166 tok = m.group(2).split("~~")
2167 be_left, be_right = tok[1], tok[3]
2168 expr = ", ".join(
2169 "%s%s%s" % (be_left, exp, be_right)
2170 for exp in expr.split(", ")
2171 )
2172 return expr
2173
2174 statement = re.sub(
2175 self._post_compile_pattern, process_expanding, pre_expanded_string
2176 )
2177
2178 if numeric_positiontup is not None:
2179 assert new_positiontup is not None
2180 param_pos = {
2181 key: f"{self._numeric_binds_identifier_char}{num}"
2182 for num, key in enumerate(
2183 numeric_positiontup, self.next_numeric_pos
2184 )
2185 }
2186 # Can't use format here since % chars are not escaped.
2187 statement = self._pyformat_pattern.sub(
2188 lambda m: param_pos[m.group(1)], statement
2189 )
2190 new_positiontup.extend(numeric_positiontup)
2191
2192 expanded_state = ExpandedState(
2193 statement,
2194 parameters,
2195 new_processors,
2196 new_positiontup,
2197 expanded_parameters,
2198 )
2199
2200 if _populate_self:
2201 # this is for the "render_postcompile" flag, which is not
2202 # otherwise used internally and is for end-user debugging and
2203 # special use cases.
2204 self._pre_expanded_string = pre_expanded_string
2205 self._pre_expanded_positiontup = pre_expanded_positiontup
2206 self.string = expanded_state.statement
2207 self.positiontup = (
2208 list(expanded_state.positiontup or ())
2209 if self.positional
2210 else None
2211 )
2212 self._post_compile_expanded_state = expanded_state
2213
2214 return expanded_state
2215
2216 @util.preload_module("sqlalchemy.engine.cursor")
2217 def _create_result_map(self):
2218 """utility method used for unit tests only."""
2219 cursor = util.preloaded.engine_cursor
2220 return cursor.CursorResultMetaData._create_description_match_map(
2221 self._result_columns
2222 )
2223
2224 # assigned by crud.py for insert/update statements
2225 _get_bind_name_for_col: _BindNameForColProtocol
2226
2227 @util.memoized_property
2228 def _within_exec_param_key_getter(self) -> Callable[[Any], str]:
2229 getter = self._get_bind_name_for_col
2230 return getter
2231
2232 @util.memoized_property
2233 @util.preload_module("sqlalchemy.engine.result")
2234 def _inserted_primary_key_from_lastrowid_getter(self):
2235 result = util.preloaded.engine_result
2236
2237 param_key_getter = self._within_exec_param_key_getter
2238
2239 assert self.compile_state is not None
2240 statement = self.compile_state.statement
2241
2242 if TYPE_CHECKING:
2243 assert isinstance(statement, Insert)
2244
2245 table = statement.table
2246
2247 getters = [
2248 (operator.methodcaller("get", param_key_getter(col), None), col)
2249 for col in table.primary_key
2250 ]
2251
2252 autoinc_getter = None
2253 autoinc_col = table._autoincrement_column
2254 if autoinc_col is not None:
2255 # apply type post processors to the lastrowid
2256 lastrowid_processor = autoinc_col.type._cached_result_processor(
2257 self.dialect, None
2258 )
2259 autoinc_key = param_key_getter(autoinc_col)
2260
2261 # if a bind value is present for the autoincrement column
2262 # in the parameters, we need to do the logic dictated by
2263 # #7998; honor a non-None user-passed parameter over lastrowid.
2264 # previously in the 1.4 series we weren't fetching lastrowid
2265 # at all if the key were present in the parameters
2266 if autoinc_key in self.binds:
2267
2268 def _autoinc_getter(lastrowid, parameters):
2269 param_value = parameters.get(autoinc_key, lastrowid)
2270 if param_value is not None:
2271 # they supplied non-None parameter, use that.
2272 # SQLite at least is observed to return the wrong
2273 # cursor.lastrowid for INSERT..ON CONFLICT so it
2274 # can't be used in all cases
2275 return param_value
2276 else:
2277 # use lastrowid
2278 return lastrowid
2279
2280 # work around mypy https://github.com/python/mypy/issues/14027
2281 autoinc_getter = _autoinc_getter
2282
2283 else:
2284 lastrowid_processor = None
2285
2286 row_fn = result.result_tuple([col.key for col in table.primary_key])
2287
2288 def get(lastrowid, parameters):
2289 """given cursor.lastrowid value and the parameters used for INSERT,
2290 return a "row" that represents the primary key, either by
2291 using the "lastrowid" or by extracting values from the parameters
2292 that were sent along with the INSERT.
2293
2294 """
2295 if lastrowid_processor is not None:
2296 lastrowid = lastrowid_processor(lastrowid)
2297
2298 if lastrowid is None:
2299 return row_fn(getter(parameters) for getter, col in getters)
2300 else:
2301 return row_fn(
2302 (
2303 (
2304 autoinc_getter(lastrowid, parameters)
2305 if autoinc_getter is not None
2306 else lastrowid
2307 )
2308 if col is autoinc_col
2309 else getter(parameters)
2310 )
2311 for getter, col in getters
2312 )
2313
2314 return get
2315
2316 @util.memoized_property
2317 @util.preload_module("sqlalchemy.engine.result")
2318 def _inserted_primary_key_from_returning_getter(self):
2319 result = util.preloaded.engine_result
2320
2321 assert self.compile_state is not None
2322 statement = self.compile_state.statement
2323
2324 if TYPE_CHECKING:
2325 assert isinstance(statement, Insert)
2326
2327 param_key_getter = self._within_exec_param_key_getter
2328 table = statement.table
2329
2330 returning = self.implicit_returning
2331 assert returning is not None
2332 ret = {col: idx for idx, col in enumerate(returning)}
2333
2334 getters = cast(
2335 "List[Tuple[Callable[[Any], Any], bool]]",
2336 [
2337 (
2338 (operator.itemgetter(ret[col]), True)
2339 if col in ret
2340 else (
2341 operator.methodcaller(
2342 "get", param_key_getter(col), None
2343 ),
2344 False,
2345 )
2346 )
2347 for col in table.primary_key
2348 ],
2349 )
2350
2351 row_fn = result.result_tuple([col.key for col in table.primary_key])
2352
2353 def get(row, parameters):
2354 return row_fn(
2355 getter(row) if use_row else getter(parameters)
2356 for getter, use_row in getters
2357 )
2358
2359 return get
2360
2361 def default_from(self) -> str:
2362 """Called when a SELECT statement has no froms, and no FROM clause is
2363 to be appended.
2364
2365 Gives Oracle Database a chance to tack on a ``FROM DUAL`` to the string
2366 output.
2367
2368 """
2369 return ""
2370
2371 def visit_override_binds(self, override_binds, **kw):
2372 """SQL compile the nested element of an _OverrideBinds with
2373 bindparams swapped out.
2374
2375 The _OverrideBinds is not normally expected to be compiled; it
2376 is meant to be used when an already cached statement is to be used,
2377 the compilation was already performed, and only the bound params should
2378 be swapped in at execution time.
2379
2380 However, there are test cases that exericise this object, and
2381 additionally the ORM subquery loader is known to feed in expressions
2382 which include this construct into new queries (discovered in #11173),
2383 so it has to do the right thing at compile time as well.
2384
2385 """
2386
2387 # get SQL text first
2388 sqltext = override_binds.element._compiler_dispatch(self, **kw)
2389
2390 # for a test compile that is not for caching, change binds after the
2391 # fact. note that we don't try to
2392 # swap the bindparam as we compile, because our element may be
2393 # elsewhere in the statement already (e.g. a subquery or perhaps a
2394 # CTE) and was already visited / compiled. See
2395 # test_relationship_criteria.py ->
2396 # test_selectinload_local_criteria_subquery
2397 for k in override_binds.translate:
2398 if k not in self.binds:
2399 continue
2400 bp = self.binds[k]
2401
2402 # so this would work, just change the value of bp in place.
2403 # but we dont want to mutate things outside.
2404 # bp.value = override_binds.translate[bp.key]
2405 # continue
2406
2407 # instead, need to replace bp with new_bp or otherwise accommodate
2408 # in all internal collections
2409 new_bp = bp._with_value(
2410 override_binds.translate[bp.key],
2411 maintain_key=True,
2412 required=False,
2413 )
2414
2415 name = self.bind_names[bp]
2416 self.binds[k] = self.binds[name] = new_bp
2417 self.bind_names[new_bp] = name
2418 self.bind_names.pop(bp, None)
2419
2420 if bp in self.post_compile_params:
2421 self.post_compile_params |= {new_bp}
2422 if bp in self.literal_execute_params:
2423 self.literal_execute_params |= {new_bp}
2424
2425 ckbm_tuple = self._cache_key_bind_match
2426 if ckbm_tuple:
2427 ckbm, cksm = ckbm_tuple
2428 for bp in bp._cloned_set:
2429 if bp.key in cksm:
2430 cb = cksm[bp.key]
2431 ckbm[cb].append(new_bp)
2432
2433 return sqltext
2434
2435 def visit_grouping(self, grouping, asfrom=False, **kwargs):
2436 return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")"
2437
2438 def visit_select_statement_grouping(self, grouping, **kwargs):
2439 return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")"
2440
2441 def visit_label_reference(
2442 self, element, within_columns_clause=False, **kwargs
2443 ):
2444 if self.stack and self.dialect.supports_simple_order_by_label:
2445 try:
2446 compile_state = cast(
2447 "Union[SelectState, CompoundSelectState]",
2448 self.stack[-1]["compile_state"],
2449 )
2450 except KeyError as ke:
2451 raise exc.CompileError(
2452 "Can't resolve label reference for ORDER BY / "
2453 "GROUP BY / DISTINCT etc."
2454 ) from ke
2455
2456 (
2457 with_cols,
2458 only_froms,
2459 only_cols,
2460 ) = compile_state._label_resolve_dict
2461 if within_columns_clause:
2462 resolve_dict = only_froms
2463 else:
2464 resolve_dict = only_cols
2465
2466 # this can be None in the case that a _label_reference()
2467 # were subject to a replacement operation, in which case
2468 # the replacement of the Label element may have changed
2469 # to something else like a ColumnClause expression.
2470 order_by_elem = element.element._order_by_label_element
2471
2472 if (
2473 order_by_elem is not None
2474 and order_by_elem.name in resolve_dict
2475 and order_by_elem.shares_lineage(
2476 resolve_dict[order_by_elem.name]
2477 )
2478 ):
2479 kwargs["render_label_as_label"] = (
2480 element.element._order_by_label_element
2481 )
2482 return self.process(
2483 element.element,
2484 within_columns_clause=within_columns_clause,
2485 **kwargs,
2486 )
2487
2488 def visit_textual_label_reference(
2489 self, element, within_columns_clause=False, **kwargs
2490 ):
2491 if not self.stack:
2492 # compiling the element outside of the context of a SELECT
2493 return self.process(element._text_clause)
2494
2495 try:
2496 compile_state = cast(
2497 "Union[SelectState, CompoundSelectState]",
2498 self.stack[-1]["compile_state"],
2499 )
2500 except KeyError as ke:
2501 coercions._no_text_coercion(
2502 element.element,
2503 extra=(
2504 "Can't resolve label reference for ORDER BY / "
2505 "GROUP BY / DISTINCT etc."
2506 ),
2507 exc_cls=exc.CompileError,
2508 err=ke,
2509 )
2510
2511 with_cols, only_froms, only_cols = compile_state._label_resolve_dict
2512 try:
2513 if within_columns_clause:
2514 col = only_froms[element.element]
2515 else:
2516 col = with_cols[element.element]
2517 except KeyError as err:
2518 coercions._no_text_coercion(
2519 element.element,
2520 extra=(
2521 "Can't resolve label reference for ORDER BY / "
2522 "GROUP BY / DISTINCT etc."
2523 ),
2524 exc_cls=exc.CompileError,
2525 err=err,
2526 )
2527 else:
2528 kwargs["render_label_as_label"] = col
2529 return self.process(
2530 col, within_columns_clause=within_columns_clause, **kwargs
2531 )
2532
2533 def visit_label(
2534 self,
2535 label,
2536 add_to_result_map=None,
2537 within_label_clause=False,
2538 within_columns_clause=False,
2539 render_label_as_label=None,
2540 result_map_targets=(),
2541 **kw,
2542 ):
2543 # only render labels within the columns clause
2544 # or ORDER BY clause of a select. dialect-specific compilers
2545 # can modify this behavior.
2546 render_label_with_as = (
2547 within_columns_clause and not within_label_clause
2548 )
2549 render_label_only = render_label_as_label is label
2550
2551 if render_label_only or render_label_with_as:
2552 if isinstance(label.name, elements._truncated_label):
2553 labelname = self._truncated_identifier("colident", label.name)
2554 else:
2555 labelname = label.name
2556
2557 if render_label_with_as:
2558 if add_to_result_map is not None:
2559 add_to_result_map(
2560 labelname,
2561 label.name,
2562 (label, labelname) + label._alt_names + result_map_targets,
2563 label.type,
2564 )
2565 return (
2566 label.element._compiler_dispatch(
2567 self,
2568 within_columns_clause=True,
2569 within_label_clause=True,
2570 **kw,
2571 )
2572 + OPERATORS[operators.as_]
2573 + self.preparer.format_label(label, labelname)
2574 )
2575 elif render_label_only:
2576 return self.preparer.format_label(label, labelname)
2577 else:
2578 return label.element._compiler_dispatch(
2579 self, within_columns_clause=False, **kw
2580 )
2581
2582 def _fallback_column_name(self, column):
2583 raise exc.CompileError(
2584 "Cannot compile Column object until its 'name' is assigned."
2585 )
2586
2587 def visit_lambda_element(self, element, **kw):
2588 sql_element = element._resolved
2589 return self.process(sql_element, **kw)
2590
2591 def visit_column(
2592 self,
2593 column: ColumnClause[Any],
2594 add_to_result_map: Optional[_ResultMapAppender] = None,
2595 include_table: bool = True,
2596 result_map_targets: Tuple[Any, ...] = (),
2597 ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] = None,
2598 **kwargs: Any,
2599 ) -> str:
2600 name = orig_name = column.name
2601 if name is None:
2602 name = self._fallback_column_name(column)
2603
2604 is_literal = column.is_literal
2605 if not is_literal and isinstance(name, elements._truncated_label):
2606 name = self._truncated_identifier("colident", name)
2607
2608 if add_to_result_map is not None:
2609 targets = (column, name, column.key) + result_map_targets
2610 if column._tq_label:
2611 targets += (column._tq_label,)
2612
2613 add_to_result_map(name, orig_name, targets, column.type)
2614
2615 if is_literal:
2616 # note we are not currently accommodating for
2617 # literal_column(quoted_name('ident', True)) here
2618 name = self.escape_literal_column(name)
2619 else:
2620 name = self.preparer.quote(name)
2621 table = column.table
2622 if table is None or not include_table or not table.named_with_column:
2623 return name
2624 else:
2625 effective_schema = self.preparer.schema_for_object(table)
2626
2627 if effective_schema:
2628 schema_prefix = (
2629 self.preparer.quote_schema(effective_schema) + "."
2630 )
2631 else:
2632 schema_prefix = ""
2633
2634 if TYPE_CHECKING:
2635 assert isinstance(table, NamedFromClause)
2636 tablename = table.name
2637
2638 if (
2639 not effective_schema
2640 and ambiguous_table_name_map
2641 and tablename in ambiguous_table_name_map
2642 ):
2643 tablename = ambiguous_table_name_map[tablename]
2644
2645 if isinstance(tablename, elements._truncated_label):
2646 tablename = self._truncated_identifier("alias", tablename)
2647
2648 return schema_prefix + self.preparer.quote(tablename) + "." + name
2649
2650 def visit_collation(self, element, **kw):
2651 return self.preparer.format_collation(element.collation)
2652
2653 def visit_fromclause(self, fromclause, **kwargs):
2654 return fromclause.name
2655
2656 def visit_index(self, index, **kwargs):
2657 return index.name
2658
2659 def visit_typeclause(self, typeclause, **kw):
2660 kw["type_expression"] = typeclause
2661 kw["identifier_preparer"] = self.preparer
2662 return self.dialect.type_compiler_instance.process(
2663 typeclause.type, **kw
2664 )
2665
2666 def post_process_text(self, text):
2667 if self.preparer._double_percents:
2668 text = text.replace("%", "%%")
2669 return text
2670
2671 def escape_literal_column(self, text):
2672 if self.preparer._double_percents:
2673 text = text.replace("%", "%%")
2674 return text
2675
2676 def visit_textclause(self, textclause, add_to_result_map=None, **kw):
2677 def do_bindparam(m):
2678 name = m.group(1)
2679 if name in textclause._bindparams:
2680 return self.process(textclause._bindparams[name], **kw)
2681 else:
2682 return self.bindparam_string(name, **kw)
2683
2684 if not self.stack:
2685 self.isplaintext = True
2686
2687 if add_to_result_map:
2688 # text() object is present in the columns clause of a
2689 # select(). Add a no-name entry to the result map so that
2690 # row[text()] produces a result
2691 add_to_result_map(None, None, (textclause,), sqltypes.NULLTYPE)
2692
2693 # un-escape any \:params
2694 return BIND_PARAMS_ESC.sub(
2695 lambda m: m.group(1),
2696 BIND_PARAMS.sub(
2697 do_bindparam, self.post_process_text(textclause.text)
2698 ),
2699 )
2700
2701 def visit_textual_select(
2702 self, taf, compound_index=None, asfrom=False, **kw
2703 ):
2704 toplevel = not self.stack
2705 entry = self._default_stack_entry if toplevel else self.stack[-1]
2706
2707 new_entry: _CompilerStackEntry = {
2708 "correlate_froms": set(),
2709 "asfrom_froms": set(),
2710 "selectable": taf,
2711 }
2712 self.stack.append(new_entry)
2713
2714 if taf._independent_ctes:
2715 self._dispatch_independent_ctes(taf, kw)
2716
2717 populate_result_map = (
2718 toplevel
2719 or (
2720 compound_index == 0
2721 and entry.get("need_result_map_for_compound", False)
2722 )
2723 or entry.get("need_result_map_for_nested", False)
2724 )
2725
2726 if populate_result_map:
2727 self._ordered_columns = self._textual_ordered_columns = (
2728 taf.positional
2729 )
2730
2731 # enable looser result column matching when the SQL text links to
2732 # Column objects by name only
2733 self._loose_column_name_matching = not taf.positional and bool(
2734 taf.column_args
2735 )
2736
2737 for c in taf.column_args:
2738 self.process(
2739 c,
2740 within_columns_clause=True,
2741 add_to_result_map=self._add_to_result_map,
2742 )
2743
2744 text = self.process(taf.element, **kw)
2745 if self.ctes:
2746 nesting_level = len(self.stack) if not toplevel else None
2747 text = self._render_cte_clause(nesting_level=nesting_level) + text
2748
2749 self.stack.pop(-1)
2750
2751 return text
2752
2753 def visit_null(self, expr: Null, **kw: Any) -> str:
2754 return "NULL"
2755
2756 def visit_true(self, expr: True_, **kw: Any) -> str:
2757 if self.dialect.supports_native_boolean:
2758 return "true"
2759 else:
2760 return "1"
2761
2762 def visit_false(self, expr: False_, **kw: Any) -> str:
2763 if self.dialect.supports_native_boolean:
2764 return "false"
2765 else:
2766 return "0"
2767
2768 def _generate_delimited_list(self, elements, separator, **kw):
2769 return separator.join(
2770 s
2771 for s in (c._compiler_dispatch(self, **kw) for c in elements)
2772 if s
2773 )
2774
2775 def _generate_delimited_and_list(self, clauses, **kw):
2776 lcc, clauses = elements.BooleanClauseList._process_clauses_for_boolean(
2777 operators.and_,
2778 elements.True_._singleton,
2779 elements.False_._singleton,
2780 clauses,
2781 )
2782 if lcc == 1:
2783 return clauses[0]._compiler_dispatch(self, **kw)
2784 else:
2785 separator = OPERATORS[operators.and_]
2786 return separator.join(
2787 s
2788 for s in (c._compiler_dispatch(self, **kw) for c in clauses)
2789 if s
2790 )
2791
2792 def visit_tuple(self, clauselist, **kw):
2793 return "(%s)" % self.visit_clauselist(clauselist, **kw)
2794
2795 def visit_clauselist(self, clauselist, **kw):
2796 sep = clauselist.operator
2797 if sep is None:
2798 sep = " "
2799 else:
2800 sep = OPERATORS[clauselist.operator]
2801
2802 return self._generate_delimited_list(clauselist.clauses, sep, **kw)
2803
2804 def visit_expression_clauselist(self, clauselist, **kw):
2805 operator_ = clauselist.operator
2806
2807 disp = self._get_operator_dispatch(
2808 operator_, "expression_clauselist", None
2809 )
2810 if disp:
2811 return disp(clauselist, operator_, **kw)
2812
2813 try:
2814 opstring = OPERATORS[operator_]
2815 except KeyError as err:
2816 raise exc.UnsupportedCompilationError(self, operator_) from err
2817 else:
2818 kw["_in_operator_expression"] = True
2819 return self._generate_delimited_list(
2820 clauselist.clauses, opstring, **kw
2821 )
2822
2823 def visit_case(self, clause, **kwargs):
2824 x = "CASE "
2825 if clause.value is not None:
2826 x += clause.value._compiler_dispatch(self, **kwargs) + " "
2827 for cond, result in clause.whens:
2828 x += (
2829 "WHEN "
2830 + cond._compiler_dispatch(self, **kwargs)
2831 + " THEN "
2832 + result._compiler_dispatch(self, **kwargs)
2833 + " "
2834 )
2835 if clause.else_ is not None:
2836 x += (
2837 "ELSE " + clause.else_._compiler_dispatch(self, **kwargs) + " "
2838 )
2839 x += "END"
2840 return x
2841
2842 def visit_type_coerce(self, type_coerce, **kw):
2843 return type_coerce.typed_expression._compiler_dispatch(self, **kw)
2844
2845 def visit_cast(self, cast, **kwargs):
2846 type_clause = cast.typeclause._compiler_dispatch(self, **kwargs)
2847 match = re.match("(.*)( COLLATE .*)", type_clause)
2848 return "CAST(%s AS %s)%s" % (
2849 cast.clause._compiler_dispatch(self, **kwargs),
2850 match.group(1) if match else type_clause,
2851 match.group(2) if match else "",
2852 )
2853
2854 def _format_frame_clause(self, range_, **kw):
2855 return "%s AND %s" % (
2856 (
2857 "UNBOUNDED PRECEDING"
2858 if range_[0] is elements.RANGE_UNBOUNDED
2859 else (
2860 "CURRENT ROW"
2861 if range_[0] is elements.RANGE_CURRENT
2862 else (
2863 "%s PRECEDING"
2864 % (
2865 self.process(
2866 elements.literal(abs(range_[0])), **kw
2867 ),
2868 )
2869 if range_[0] < 0
2870 else "%s FOLLOWING"
2871 % (self.process(elements.literal(range_[0]), **kw),)
2872 )
2873 )
2874 ),
2875 (
2876 "UNBOUNDED FOLLOWING"
2877 if range_[1] is elements.RANGE_UNBOUNDED
2878 else (
2879 "CURRENT ROW"
2880 if range_[1] is elements.RANGE_CURRENT
2881 else (
2882 "%s PRECEDING"
2883 % (
2884 self.process(
2885 elements.literal(abs(range_[1])), **kw
2886 ),
2887 )
2888 if range_[1] < 0
2889 else "%s FOLLOWING"
2890 % (self.process(elements.literal(range_[1]), **kw),)
2891 )
2892 )
2893 ),
2894 )
2895
2896 def visit_over(self, over, **kwargs):
2897 text = over.element._compiler_dispatch(self, **kwargs)
2898 if over.range_ is not None:
2899 range_ = "RANGE BETWEEN %s" % self._format_frame_clause(
2900 over.range_, **kwargs
2901 )
2902 elif over.rows is not None:
2903 range_ = "ROWS BETWEEN %s" % self._format_frame_clause(
2904 over.rows, **kwargs
2905 )
2906 elif over.groups is not None:
2907 range_ = "GROUPS BETWEEN %s" % self._format_frame_clause(
2908 over.groups, **kwargs
2909 )
2910 else:
2911 range_ = None
2912
2913 return "%s OVER (%s)" % (
2914 text,
2915 " ".join(
2916 [
2917 "%s BY %s"
2918 % (word, clause._compiler_dispatch(self, **kwargs))
2919 for word, clause in (
2920 ("PARTITION", over.partition_by),
2921 ("ORDER", over.order_by),
2922 )
2923 if clause is not None and len(clause)
2924 ]
2925 + ([range_] if range_ else [])
2926 ),
2927 )
2928
2929 def visit_withingroup(self, withingroup, **kwargs):
2930 return "%s WITHIN GROUP (ORDER BY %s)" % (
2931 withingroup.element._compiler_dispatch(self, **kwargs),
2932 withingroup.order_by._compiler_dispatch(self, **kwargs),
2933 )
2934
2935 def visit_funcfilter(self, funcfilter, **kwargs):
2936 return "%s FILTER (WHERE %s)" % (
2937 funcfilter.func._compiler_dispatch(self, **kwargs),
2938 funcfilter.criterion._compiler_dispatch(self, **kwargs),
2939 )
2940
2941 def visit_extract(self, extract, **kwargs):
2942 field = self.extract_map.get(extract.field, extract.field)
2943 return "EXTRACT(%s FROM %s)" % (
2944 field,
2945 extract.expr._compiler_dispatch(self, **kwargs),
2946 )
2947
2948 def visit_scalar_function_column(self, element, **kw):
2949 compiled_fn = self.visit_function(element.fn, **kw)
2950 compiled_col = self.visit_column(element, **kw)
2951 return "(%s).%s" % (compiled_fn, compiled_col)
2952
2953 def visit_function(
2954 self,
2955 func: Function[Any],
2956 add_to_result_map: Optional[_ResultMapAppender] = None,
2957 **kwargs: Any,
2958 ) -> str:
2959 if add_to_result_map is not None:
2960 add_to_result_map(func.name, func.name, (func.name,), func.type)
2961
2962 disp = getattr(self, "visit_%s_func" % func.name.lower(), None)
2963
2964 text: str
2965
2966 if disp:
2967 text = disp(func, **kwargs)
2968 else:
2969 name = FUNCTIONS.get(func._deannotate().__class__, None)
2970 if name:
2971 if func._has_args:
2972 name += "%(expr)s"
2973 else:
2974 name = func.name
2975 name = (
2976 self.preparer.quote(name)
2977 if self.preparer._requires_quotes_illegal_chars(name)
2978 or isinstance(name, elements.quoted_name)
2979 else name
2980 )
2981 name = name + "%(expr)s"
2982 text = ".".join(
2983 [
2984 (
2985 self.preparer.quote(tok)
2986 if self.preparer._requires_quotes_illegal_chars(tok)
2987 or isinstance(name, elements.quoted_name)
2988 else tok
2989 )
2990 for tok in func.packagenames
2991 ]
2992 + [name]
2993 ) % {"expr": self.function_argspec(func, **kwargs)}
2994
2995 if func._with_ordinality:
2996 text += " WITH ORDINALITY"
2997 return text
2998
2999 def visit_next_value_func(self, next_value, **kw):
3000 return self.visit_sequence(next_value.sequence)
3001
3002 def visit_sequence(self, sequence, **kw):
3003 raise NotImplementedError(
3004 "Dialect '%s' does not support sequence increments."
3005 % self.dialect.name
3006 )
3007
3008 def function_argspec(self, func: Function[Any], **kwargs: Any) -> str:
3009 return func.clause_expr._compiler_dispatch(self, **kwargs)
3010
3011 def visit_compound_select(
3012 self, cs, asfrom=False, compound_index=None, **kwargs
3013 ):
3014 toplevel = not self.stack
3015
3016 compile_state = cs._compile_state_factory(cs, self, **kwargs)
3017
3018 if toplevel and not self.compile_state:
3019 self.compile_state = compile_state
3020
3021 compound_stmt = compile_state.statement
3022
3023 entry = self._default_stack_entry if toplevel else self.stack[-1]
3024 need_result_map = toplevel or (
3025 not compound_index
3026 and entry.get("need_result_map_for_compound", False)
3027 )
3028
3029 # indicates there is already a CompoundSelect in play
3030 if compound_index == 0:
3031 entry["select_0"] = cs
3032
3033 self.stack.append(
3034 {
3035 "correlate_froms": entry["correlate_froms"],
3036 "asfrom_froms": entry["asfrom_froms"],
3037 "selectable": cs,
3038 "compile_state": compile_state,
3039 "need_result_map_for_compound": need_result_map,
3040 }
3041 )
3042
3043 if compound_stmt._independent_ctes:
3044 self._dispatch_independent_ctes(compound_stmt, kwargs)
3045
3046 keyword = self.compound_keywords[cs.keyword]
3047
3048 text = (" " + keyword + " ").join(
3049 (
3050 c._compiler_dispatch(
3051 self, asfrom=asfrom, compound_index=i, **kwargs
3052 )
3053 for i, c in enumerate(cs.selects)
3054 )
3055 )
3056
3057 kwargs["include_table"] = False
3058 text += self.group_by_clause(cs, **dict(asfrom=asfrom, **kwargs))
3059 text += self.order_by_clause(cs, **kwargs)
3060 if cs._has_row_limiting_clause:
3061 text += self._row_limit_clause(cs, **kwargs)
3062
3063 if self.ctes:
3064 nesting_level = len(self.stack) if not toplevel else None
3065 text = (
3066 self._render_cte_clause(
3067 nesting_level=nesting_level,
3068 include_following_stack=True,
3069 )
3070 + text
3071 )
3072
3073 self.stack.pop(-1)
3074 return text
3075
3076 def _row_limit_clause(self, cs, **kwargs):
3077 if cs._fetch_clause is not None:
3078 return self.fetch_clause(cs, **kwargs)
3079 else:
3080 return self.limit_clause(cs, **kwargs)
3081
3082 def _get_operator_dispatch(self, operator_, qualifier1, qualifier2):
3083 attrname = "visit_%s_%s%s" % (
3084 operator_.__name__,
3085 qualifier1,
3086 "_" + qualifier2 if qualifier2 else "",
3087 )
3088 return getattr(self, attrname, None)
3089
3090 def visit_unary(
3091 self, unary, add_to_result_map=None, result_map_targets=(), **kw
3092 ):
3093 if add_to_result_map is not None:
3094 result_map_targets += (unary,)
3095 kw["add_to_result_map"] = add_to_result_map
3096 kw["result_map_targets"] = result_map_targets
3097
3098 if unary.operator:
3099 if unary.modifier:
3100 raise exc.CompileError(
3101 "Unary expression does not support operator "
3102 "and modifier simultaneously"
3103 )
3104 disp = self._get_operator_dispatch(
3105 unary.operator, "unary", "operator"
3106 )
3107 if disp:
3108 return disp(unary, unary.operator, **kw)
3109 else:
3110 return self._generate_generic_unary_operator(
3111 unary, OPERATORS[unary.operator], **kw
3112 )
3113 elif unary.modifier:
3114 disp = self._get_operator_dispatch(
3115 unary.modifier, "unary", "modifier"
3116 )
3117 if disp:
3118 return disp(unary, unary.modifier, **kw)
3119 else:
3120 return self._generate_generic_unary_modifier(
3121 unary, OPERATORS[unary.modifier], **kw
3122 )
3123 else:
3124 raise exc.CompileError(
3125 "Unary expression has no operator or modifier"
3126 )
3127
3128 def visit_truediv_binary(self, binary, operator, **kw):
3129 if self.dialect.div_is_floordiv:
3130 return (
3131 self.process(binary.left, **kw)
3132 + " / "
3133 # TODO: would need a fast cast again here,
3134 # unless we want to use an implicit cast like "+ 0.0"
3135 + self.process(
3136 elements.Cast(
3137 binary.right,
3138 (
3139 binary.right.type
3140 if binary.right.type._type_affinity
3141 is sqltypes.Numeric
3142 else sqltypes.Numeric()
3143 ),
3144 ),
3145 **kw,
3146 )
3147 )
3148 else:
3149 return (
3150 self.process(binary.left, **kw)
3151 + " / "
3152 + self.process(binary.right, **kw)
3153 )
3154
3155 def visit_floordiv_binary(self, binary, operator, **kw):
3156 if (
3157 self.dialect.div_is_floordiv
3158 and binary.right.type._type_affinity is sqltypes.Integer
3159 ):
3160 return (
3161 self.process(binary.left, **kw)
3162 + " / "
3163 + self.process(binary.right, **kw)
3164 )
3165 else:
3166 return "FLOOR(%s)" % (
3167 self.process(binary.left, **kw)
3168 + " / "
3169 + self.process(binary.right, **kw)
3170 )
3171
3172 def visit_is_true_unary_operator(self, element, operator, **kw):
3173 if (
3174 element._is_implicitly_boolean
3175 or self.dialect.supports_native_boolean
3176 ):
3177 return self.process(element.element, **kw)
3178 else:
3179 return "%s = 1" % self.process(element.element, **kw)
3180
3181 def visit_is_false_unary_operator(self, element, operator, **kw):
3182 if (
3183 element._is_implicitly_boolean
3184 or self.dialect.supports_native_boolean
3185 ):
3186 return "NOT %s" % self.process(element.element, **kw)
3187 else:
3188 return "%s = 0" % self.process(element.element, **kw)
3189
3190 def visit_not_match_op_binary(self, binary, operator, **kw):
3191 return "NOT %s" % self.visit_binary(
3192 binary, override_operator=operators.match_op
3193 )
3194
3195 def visit_not_in_op_binary(self, binary, operator, **kw):
3196 # The brackets are required in the NOT IN operation because the empty
3197 # case is handled using the form "(col NOT IN (null) OR 1 = 1)".
3198 # The presence of the OR makes the brackets required.
3199 return "(%s)" % self._generate_generic_binary(
3200 binary, OPERATORS[operator], **kw
3201 )
3202
3203 def visit_empty_set_op_expr(self, type_, expand_op, **kw):
3204 if expand_op is operators.not_in_op:
3205 if len(type_) > 1:
3206 return "(%s)) OR (1 = 1" % (
3207 ", ".join("NULL" for element in type_)
3208 )
3209 else:
3210 return "NULL) OR (1 = 1"
3211 elif expand_op is operators.in_op:
3212 if len(type_) > 1:
3213 return "(%s)) AND (1 != 1" % (
3214 ", ".join("NULL" for element in type_)
3215 )
3216 else:
3217 return "NULL) AND (1 != 1"
3218 else:
3219 return self.visit_empty_set_expr(type_)
3220
3221 def visit_empty_set_expr(self, element_types, **kw):
3222 raise NotImplementedError(
3223 "Dialect '%s' does not support empty set expression."
3224 % self.dialect.name
3225 )
3226
3227 def _literal_execute_expanding_parameter_literal_binds(
3228 self, parameter, values, bind_expression_template=None
3229 ):
3230 typ_dialect_impl = parameter.type._unwrapped_dialect_impl(self.dialect)
3231
3232 if not values:
3233 # empty IN expression. note we don't need to use
3234 # bind_expression_template here because there are no
3235 # expressions to render.
3236
3237 if typ_dialect_impl._is_tuple_type:
3238 replacement_expression = (
3239 "VALUES " if self.dialect.tuple_in_values else ""
3240 ) + self.visit_empty_set_op_expr(
3241 parameter.type.types, parameter.expand_op
3242 )
3243
3244 else:
3245 replacement_expression = self.visit_empty_set_op_expr(
3246 [parameter.type], parameter.expand_op
3247 )
3248
3249 elif typ_dialect_impl._is_tuple_type or (
3250 typ_dialect_impl._isnull
3251 and isinstance(values[0], collections_abc.Sequence)
3252 and not isinstance(values[0], (str, bytes))
3253 ):
3254 if typ_dialect_impl._has_bind_expression:
3255 raise NotImplementedError(
3256 "bind_expression() on TupleType not supported with "
3257 "literal_binds"
3258 )
3259
3260 replacement_expression = (
3261 "VALUES " if self.dialect.tuple_in_values else ""
3262 ) + ", ".join(
3263 "(%s)"
3264 % (
3265 ", ".join(
3266 self.render_literal_value(value, param_type)
3267 for value, param_type in zip(
3268 tuple_element, parameter.type.types
3269 )
3270 )
3271 )
3272 for i, tuple_element in enumerate(values)
3273 )
3274 else:
3275 if bind_expression_template:
3276 post_compile_pattern = self._post_compile_pattern
3277 m = post_compile_pattern.search(bind_expression_template)
3278 assert m and m.group(
3279 2
3280 ), "unexpected format for expanding parameter"
3281
3282 tok = m.group(2).split("~~")
3283 be_left, be_right = tok[1], tok[3]
3284 replacement_expression = ", ".join(
3285 "%s%s%s"
3286 % (
3287 be_left,
3288 self.render_literal_value(value, parameter.type),
3289 be_right,
3290 )
3291 for value in values
3292 )
3293 else:
3294 replacement_expression = ", ".join(
3295 self.render_literal_value(value, parameter.type)
3296 for value in values
3297 )
3298
3299 return (), replacement_expression
3300
3301 def _literal_execute_expanding_parameter(self, name, parameter, values):
3302 if parameter.literal_execute:
3303 return self._literal_execute_expanding_parameter_literal_binds(
3304 parameter, values
3305 )
3306
3307 dialect = self.dialect
3308 typ_dialect_impl = parameter.type._unwrapped_dialect_impl(dialect)
3309
3310 if self._numeric_binds:
3311 bind_template = self.compilation_bindtemplate
3312 else:
3313 bind_template = self.bindtemplate
3314
3315 if (
3316 self.dialect._bind_typing_render_casts
3317 and typ_dialect_impl.render_bind_cast
3318 ):
3319
3320 def _render_bindtemplate(name):
3321 return self.render_bind_cast(
3322 parameter.type,
3323 typ_dialect_impl,
3324 bind_template % {"name": name},
3325 )
3326
3327 else:
3328
3329 def _render_bindtemplate(name):
3330 return bind_template % {"name": name}
3331
3332 if not values:
3333 to_update = []
3334 if typ_dialect_impl._is_tuple_type:
3335 replacement_expression = self.visit_empty_set_op_expr(
3336 parameter.type.types, parameter.expand_op
3337 )
3338 else:
3339 replacement_expression = self.visit_empty_set_op_expr(
3340 [parameter.type], parameter.expand_op
3341 )
3342
3343 elif typ_dialect_impl._is_tuple_type or (
3344 typ_dialect_impl._isnull
3345 and isinstance(values[0], collections_abc.Sequence)
3346 and not isinstance(values[0], (str, bytes))
3347 ):
3348 assert not typ_dialect_impl._is_array
3349 to_update = [
3350 ("%s_%s_%s" % (name, i, j), value)
3351 for i, tuple_element in enumerate(values, 1)
3352 for j, value in enumerate(tuple_element, 1)
3353 ]
3354
3355 replacement_expression = (
3356 "VALUES " if dialect.tuple_in_values else ""
3357 ) + ", ".join(
3358 "(%s)"
3359 % (
3360 ", ".join(
3361 _render_bindtemplate(
3362 to_update[i * len(tuple_element) + j][0]
3363 )
3364 for j, value in enumerate(tuple_element)
3365 )
3366 )
3367 for i, tuple_element in enumerate(values)
3368 )
3369 else:
3370 to_update = [
3371 ("%s_%s" % (name, i), value)
3372 for i, value in enumerate(values, 1)
3373 ]
3374 replacement_expression = ", ".join(
3375 _render_bindtemplate(key) for key, value in to_update
3376 )
3377
3378 return to_update, replacement_expression
3379
3380 def visit_binary(
3381 self,
3382 binary,
3383 override_operator=None,
3384 eager_grouping=False,
3385 from_linter=None,
3386 lateral_from_linter=None,
3387 **kw,
3388 ):
3389 if from_linter and operators.is_comparison(binary.operator):
3390 if lateral_from_linter is not None:
3391 enclosing_lateral = kw["enclosing_lateral"]
3392 lateral_from_linter.edges.update(
3393 itertools.product(
3394 _de_clone(
3395 binary.left._from_objects + [enclosing_lateral]
3396 ),
3397 _de_clone(
3398 binary.right._from_objects + [enclosing_lateral]
3399 ),
3400 )
3401 )
3402 else:
3403 from_linter.edges.update(
3404 itertools.product(
3405 _de_clone(binary.left._from_objects),
3406 _de_clone(binary.right._from_objects),
3407 )
3408 )
3409
3410 # don't allow "? = ?" to render
3411 if (
3412 self.ansi_bind_rules
3413 and isinstance(binary.left, elements.BindParameter)
3414 and isinstance(binary.right, elements.BindParameter)
3415 ):
3416 kw["literal_execute"] = True
3417
3418 operator_ = override_operator or binary.operator
3419 disp = self._get_operator_dispatch(operator_, "binary", None)
3420 if disp:
3421 return disp(binary, operator_, **kw)
3422 else:
3423 try:
3424 opstring = OPERATORS[operator_]
3425 except KeyError as err:
3426 raise exc.UnsupportedCompilationError(self, operator_) from err
3427 else:
3428 return self._generate_generic_binary(
3429 binary,
3430 opstring,
3431 from_linter=from_linter,
3432 lateral_from_linter=lateral_from_linter,
3433 **kw,
3434 )
3435
3436 def visit_function_as_comparison_op_binary(self, element, operator, **kw):
3437 return self.process(element.sql_function, **kw)
3438
3439 def visit_mod_binary(self, binary, operator, **kw):
3440 if self.preparer._double_percents:
3441 return (
3442 self.process(binary.left, **kw)
3443 + " %% "
3444 + self.process(binary.right, **kw)
3445 )
3446 else:
3447 return (
3448 self.process(binary.left, **kw)
3449 + " % "
3450 + self.process(binary.right, **kw)
3451 )
3452
3453 def visit_custom_op_binary(self, element, operator, **kw):
3454 kw["eager_grouping"] = operator.eager_grouping
3455 return self._generate_generic_binary(
3456 element,
3457 " " + self.escape_literal_column(operator.opstring) + " ",
3458 **kw,
3459 )
3460
3461 def visit_custom_op_unary_operator(self, element, operator, **kw):
3462 return self._generate_generic_unary_operator(
3463 element, self.escape_literal_column(operator.opstring) + " ", **kw
3464 )
3465
3466 def visit_custom_op_unary_modifier(self, element, operator, **kw):
3467 return self._generate_generic_unary_modifier(
3468 element, " " + self.escape_literal_column(operator.opstring), **kw
3469 )
3470
3471 def _generate_generic_binary(
3472 self,
3473 binary: BinaryExpression[Any],
3474 opstring: str,
3475 eager_grouping: bool = False,
3476 **kw: Any,
3477 ) -> str:
3478 _in_operator_expression = kw.get("_in_operator_expression", False)
3479
3480 kw["_in_operator_expression"] = True
3481 kw["_binary_op"] = binary.operator
3482 text = (
3483 binary.left._compiler_dispatch(
3484 self, eager_grouping=eager_grouping, **kw
3485 )
3486 + opstring
3487 + binary.right._compiler_dispatch(
3488 self, eager_grouping=eager_grouping, **kw
3489 )
3490 )
3491
3492 if _in_operator_expression and eager_grouping:
3493 text = "(%s)" % text
3494 return text
3495
3496 def _generate_generic_unary_operator(self, unary, opstring, **kw):
3497 return opstring + unary.element._compiler_dispatch(self, **kw)
3498
3499 def _generate_generic_unary_modifier(self, unary, opstring, **kw):
3500 return unary.element._compiler_dispatch(self, **kw) + opstring
3501
3502 @util.memoized_property
3503 def _like_percent_literal(self):
3504 return elements.literal_column("'%'", type_=sqltypes.STRINGTYPE)
3505
3506 def visit_ilike_case_insensitive_operand(self, element, **kw):
3507 return f"lower({element.element._compiler_dispatch(self, **kw)})"
3508
3509 def visit_contains_op_binary(self, binary, operator, **kw):
3510 binary = binary._clone()
3511 percent = self._like_percent_literal
3512 binary.right = percent.concat(binary.right).concat(percent)
3513 return self.visit_like_op_binary(binary, operator, **kw)
3514
3515 def visit_not_contains_op_binary(self, binary, operator, **kw):
3516 binary = binary._clone()
3517 percent = self._like_percent_literal
3518 binary.right = percent.concat(binary.right).concat(percent)
3519 return self.visit_not_like_op_binary(binary, operator, **kw)
3520
3521 def visit_icontains_op_binary(self, binary, operator, **kw):
3522 binary = binary._clone()
3523 percent = self._like_percent_literal
3524 binary.left = ilike_case_insensitive(binary.left)
3525 binary.right = percent.concat(
3526 ilike_case_insensitive(binary.right)
3527 ).concat(percent)
3528 return self.visit_ilike_op_binary(binary, operator, **kw)
3529
3530 def visit_not_icontains_op_binary(self, binary, operator, **kw):
3531 binary = binary._clone()
3532 percent = self._like_percent_literal
3533 binary.left = ilike_case_insensitive(binary.left)
3534 binary.right = percent.concat(
3535 ilike_case_insensitive(binary.right)
3536 ).concat(percent)
3537 return self.visit_not_ilike_op_binary(binary, operator, **kw)
3538
3539 def visit_startswith_op_binary(self, binary, operator, **kw):
3540 binary = binary._clone()
3541 percent = self._like_percent_literal
3542 binary.right = percent._rconcat(binary.right)
3543 return self.visit_like_op_binary(binary, operator, **kw)
3544
3545 def visit_not_startswith_op_binary(self, binary, operator, **kw):
3546 binary = binary._clone()
3547 percent = self._like_percent_literal
3548 binary.right = percent._rconcat(binary.right)
3549 return self.visit_not_like_op_binary(binary, operator, **kw)
3550
3551 def visit_istartswith_op_binary(self, binary, operator, **kw):
3552 binary = binary._clone()
3553 percent = self._like_percent_literal
3554 binary.left = ilike_case_insensitive(binary.left)
3555 binary.right = percent._rconcat(ilike_case_insensitive(binary.right))
3556 return self.visit_ilike_op_binary(binary, operator, **kw)
3557
3558 def visit_not_istartswith_op_binary(self, binary, operator, **kw):
3559 binary = binary._clone()
3560 percent = self._like_percent_literal
3561 binary.left = ilike_case_insensitive(binary.left)
3562 binary.right = percent._rconcat(ilike_case_insensitive(binary.right))
3563 return self.visit_not_ilike_op_binary(binary, operator, **kw)
3564
3565 def visit_endswith_op_binary(self, binary, operator, **kw):
3566 binary = binary._clone()
3567 percent = self._like_percent_literal
3568 binary.right = percent.concat(binary.right)
3569 return self.visit_like_op_binary(binary, operator, **kw)
3570
3571 def visit_not_endswith_op_binary(self, binary, operator, **kw):
3572 binary = binary._clone()
3573 percent = self._like_percent_literal
3574 binary.right = percent.concat(binary.right)
3575 return self.visit_not_like_op_binary(binary, operator, **kw)
3576
3577 def visit_iendswith_op_binary(self, binary, operator, **kw):
3578 binary = binary._clone()
3579 percent = self._like_percent_literal
3580 binary.left = ilike_case_insensitive(binary.left)
3581 binary.right = percent.concat(ilike_case_insensitive(binary.right))
3582 return self.visit_ilike_op_binary(binary, operator, **kw)
3583
3584 def visit_not_iendswith_op_binary(self, binary, operator, **kw):
3585 binary = binary._clone()
3586 percent = self._like_percent_literal
3587 binary.left = ilike_case_insensitive(binary.left)
3588 binary.right = percent.concat(ilike_case_insensitive(binary.right))
3589 return self.visit_not_ilike_op_binary(binary, operator, **kw)
3590
3591 def visit_like_op_binary(self, binary, operator, **kw):
3592 escape = binary.modifiers.get("escape", None)
3593
3594 return "%s LIKE %s" % (
3595 binary.left._compiler_dispatch(self, **kw),
3596 binary.right._compiler_dispatch(self, **kw),
3597 ) + (
3598 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
3599 if escape is not None
3600 else ""
3601 )
3602
3603 def visit_not_like_op_binary(self, binary, operator, **kw):
3604 escape = binary.modifiers.get("escape", None)
3605 return "%s NOT LIKE %s" % (
3606 binary.left._compiler_dispatch(self, **kw),
3607 binary.right._compiler_dispatch(self, **kw),
3608 ) + (
3609 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
3610 if escape is not None
3611 else ""
3612 )
3613
3614 def visit_ilike_op_binary(self, binary, operator, **kw):
3615 if operator is operators.ilike_op:
3616 binary = binary._clone()
3617 binary.left = ilike_case_insensitive(binary.left)
3618 binary.right = ilike_case_insensitive(binary.right)
3619 # else we assume ilower() has been applied
3620
3621 return self.visit_like_op_binary(binary, operator, **kw)
3622
3623 def visit_not_ilike_op_binary(self, binary, operator, **kw):
3624 if operator is operators.not_ilike_op:
3625 binary = binary._clone()
3626 binary.left = ilike_case_insensitive(binary.left)
3627 binary.right = ilike_case_insensitive(binary.right)
3628 # else we assume ilower() has been applied
3629
3630 return self.visit_not_like_op_binary(binary, operator, **kw)
3631
3632 def visit_between_op_binary(self, binary, operator, **kw):
3633 symmetric = binary.modifiers.get("symmetric", False)
3634 return self._generate_generic_binary(
3635 binary, " BETWEEN SYMMETRIC " if symmetric else " BETWEEN ", **kw
3636 )
3637
3638 def visit_not_between_op_binary(self, binary, operator, **kw):
3639 symmetric = binary.modifiers.get("symmetric", False)
3640 return self._generate_generic_binary(
3641 binary,
3642 " NOT BETWEEN SYMMETRIC " if symmetric else " NOT BETWEEN ",
3643 **kw,
3644 )
3645
3646 def visit_regexp_match_op_binary(
3647 self, binary: BinaryExpression[Any], operator: Any, **kw: Any
3648 ) -> str:
3649 raise exc.CompileError(
3650 "%s dialect does not support regular expressions"
3651 % self.dialect.name
3652 )
3653
3654 def visit_not_regexp_match_op_binary(
3655 self, binary: BinaryExpression[Any], operator: Any, **kw: Any
3656 ) -> str:
3657 raise exc.CompileError(
3658 "%s dialect does not support regular expressions"
3659 % self.dialect.name
3660 )
3661
3662 def visit_regexp_replace_op_binary(
3663 self, binary: BinaryExpression[Any], operator: Any, **kw: Any
3664 ) -> str:
3665 raise exc.CompileError(
3666 "%s dialect does not support regular expression replacements"
3667 % self.dialect.name
3668 )
3669
3670 def visit_bindparam(
3671 self,
3672 bindparam,
3673 within_columns_clause=False,
3674 literal_binds=False,
3675 skip_bind_expression=False,
3676 literal_execute=False,
3677 render_postcompile=False,
3678 **kwargs,
3679 ):
3680
3681 if not skip_bind_expression:
3682 impl = bindparam.type.dialect_impl(self.dialect)
3683 if impl._has_bind_expression:
3684 bind_expression = impl.bind_expression(bindparam)
3685 wrapped = self.process(
3686 bind_expression,
3687 skip_bind_expression=True,
3688 within_columns_clause=within_columns_clause,
3689 literal_binds=literal_binds and not bindparam.expanding,
3690 literal_execute=literal_execute,
3691 render_postcompile=render_postcompile,
3692 **kwargs,
3693 )
3694 if bindparam.expanding:
3695 # for postcompile w/ expanding, move the "wrapped" part
3696 # of this into the inside
3697
3698 m = re.match(
3699 r"^(.*)\(__\[POSTCOMPILE_(\S+?)\]\)(.*)$", wrapped
3700 )
3701 assert m, "unexpected format for expanding parameter"
3702 wrapped = "(__[POSTCOMPILE_%s~~%s~~REPL~~%s~~])" % (
3703 m.group(2),
3704 m.group(1),
3705 m.group(3),
3706 )
3707
3708 if literal_binds:
3709 ret = self.render_literal_bindparam(
3710 bindparam,
3711 within_columns_clause=True,
3712 bind_expression_template=wrapped,
3713 **kwargs,
3714 )
3715 return "(%s)" % ret
3716
3717 return wrapped
3718
3719 if not literal_binds:
3720 literal_execute = (
3721 literal_execute
3722 or bindparam.literal_execute
3723 or (within_columns_clause and self.ansi_bind_rules)
3724 )
3725 post_compile = literal_execute or bindparam.expanding
3726 else:
3727 post_compile = False
3728
3729 if literal_binds:
3730 ret = self.render_literal_bindparam(
3731 bindparam, within_columns_clause=True, **kwargs
3732 )
3733 if bindparam.expanding:
3734 ret = "(%s)" % ret
3735 return ret
3736
3737 name = self._truncate_bindparam(bindparam)
3738
3739 if name in self.binds:
3740 existing = self.binds[name]
3741 if existing is not bindparam:
3742 if (
3743 (existing.unique or bindparam.unique)
3744 and not existing.proxy_set.intersection(
3745 bindparam.proxy_set
3746 )
3747 and not existing._cloned_set.intersection(
3748 bindparam._cloned_set
3749 )
3750 ):
3751 raise exc.CompileError(
3752 "Bind parameter '%s' conflicts with "
3753 "unique bind parameter of the same name" % name
3754 )
3755 elif existing.expanding != bindparam.expanding:
3756 raise exc.CompileError(
3757 "Can't reuse bound parameter name '%s' in both "
3758 "'expanding' (e.g. within an IN expression) and "
3759 "non-expanding contexts. If this parameter is to "
3760 "receive a list/array value, set 'expanding=True' on "
3761 "it for expressions that aren't IN, otherwise use "
3762 "a different parameter name." % (name,)
3763 )
3764 elif existing._is_crud or bindparam._is_crud:
3765 if existing._is_crud and bindparam._is_crud:
3766 # TODO: this condition is not well understood.
3767 # see tests in test/sql/test_update.py
3768 raise exc.CompileError(
3769 "Encountered unsupported case when compiling an "
3770 "INSERT or UPDATE statement. If this is a "
3771 "multi-table "
3772 "UPDATE statement, please provide string-named "
3773 "arguments to the "
3774 "values() method with distinct names; support for "
3775 "multi-table UPDATE statements that "
3776 "target multiple tables for UPDATE is very "
3777 "limited",
3778 )
3779 else:
3780 raise exc.CompileError(
3781 f"bindparam() name '{bindparam.key}' is reserved "
3782 "for automatic usage in the VALUES or SET "
3783 "clause of this "
3784 "insert/update statement. Please use a "
3785 "name other than column name when using "
3786 "bindparam() "
3787 "with insert() or update() (for example, "
3788 f"'b_{bindparam.key}')."
3789 )
3790
3791 self.binds[bindparam.key] = self.binds[name] = bindparam
3792
3793 # if we are given a cache key that we're going to match against,
3794 # relate the bindparam here to one that is most likely present
3795 # in the "extracted params" portion of the cache key. this is used
3796 # to set up a positional mapping that is used to determine the
3797 # correct parameters for a subsequent use of this compiled with
3798 # a different set of parameter values. here, we accommodate for
3799 # parameters that may have been cloned both before and after the cache
3800 # key was been generated.
3801 ckbm_tuple = self._cache_key_bind_match
3802
3803 if ckbm_tuple:
3804 ckbm, cksm = ckbm_tuple
3805 for bp in bindparam._cloned_set:
3806 if bp.key in cksm:
3807 cb = cksm[bp.key]
3808 ckbm[cb].append(bindparam)
3809
3810 if bindparam.isoutparam:
3811 self.has_out_parameters = True
3812
3813 if post_compile:
3814 if render_postcompile:
3815 self._render_postcompile = True
3816
3817 if literal_execute:
3818 self.literal_execute_params |= {bindparam}
3819 else:
3820 self.post_compile_params |= {bindparam}
3821
3822 ret = self.bindparam_string(
3823 name,
3824 post_compile=post_compile,
3825 expanding=bindparam.expanding,
3826 bindparam_type=bindparam.type,
3827 **kwargs,
3828 )
3829
3830 if bindparam.expanding:
3831 ret = "(%s)" % ret
3832
3833 return ret
3834
3835 def render_bind_cast(self, type_, dbapi_type, sqltext):
3836 raise NotImplementedError()
3837
3838 def render_literal_bindparam(
3839 self,
3840 bindparam,
3841 render_literal_value=NO_ARG,
3842 bind_expression_template=None,
3843 **kw,
3844 ):
3845 if render_literal_value is not NO_ARG:
3846 value = render_literal_value
3847 else:
3848 if bindparam.value is None and bindparam.callable is None:
3849 op = kw.get("_binary_op", None)
3850 if op and op not in (operators.is_, operators.is_not):
3851 util.warn_limited(
3852 "Bound parameter '%s' rendering literal NULL in a SQL "
3853 "expression; comparisons to NULL should not use "
3854 "operators outside of 'is' or 'is not'",
3855 (bindparam.key,),
3856 )
3857 return self.process(sqltypes.NULLTYPE, **kw)
3858 value = bindparam.effective_value
3859
3860 if bindparam.expanding:
3861 leep = self._literal_execute_expanding_parameter_literal_binds
3862 to_update, replacement_expr = leep(
3863 bindparam,
3864 value,
3865 bind_expression_template=bind_expression_template,
3866 )
3867 return replacement_expr
3868 else:
3869 return self.render_literal_value(value, bindparam.type)
3870
3871 def render_literal_value(
3872 self, value: Any, type_: sqltypes.TypeEngine[Any]
3873 ) -> str:
3874 """Render the value of a bind parameter as a quoted literal.
3875
3876 This is used for statement sections that do not accept bind parameters
3877 on the target driver/database.
3878
3879 This should be implemented by subclasses using the quoting services
3880 of the DBAPI.
3881
3882 """
3883
3884 if value is None and not type_.should_evaluate_none:
3885 # issue #10535 - handle NULL in the compiler without placing
3886 # this onto each type, except for "evaluate None" types
3887 # (e.g. JSON)
3888 return self.process(elements.Null._instance())
3889
3890 processor = type_._cached_literal_processor(self.dialect)
3891 if processor:
3892 try:
3893 return processor(value)
3894 except Exception as e:
3895 raise exc.CompileError(
3896 f"Could not render literal value "
3897 f'"{sql_util._repr_single_value(value)}" '
3898 f"with datatype "
3899 f"{type_}; see parent stack trace for "
3900 "more detail."
3901 ) from e
3902
3903 else:
3904 raise exc.CompileError(
3905 f"No literal value renderer is available for literal value "
3906 f'"{sql_util._repr_single_value(value)}" '
3907 f"with datatype {type_}"
3908 )
3909
3910 def _truncate_bindparam(self, bindparam):
3911 if bindparam in self.bind_names:
3912 return self.bind_names[bindparam]
3913
3914 bind_name = bindparam.key
3915 if isinstance(bind_name, elements._truncated_label):
3916 bind_name = self._truncated_identifier("bindparam", bind_name)
3917
3918 # add to bind_names for translation
3919 self.bind_names[bindparam] = bind_name
3920
3921 return bind_name
3922
3923 def _truncated_identifier(
3924 self, ident_class: str, name: _truncated_label
3925 ) -> str:
3926 if (ident_class, name) in self.truncated_names:
3927 return self.truncated_names[(ident_class, name)]
3928
3929 anonname = name.apply_map(self.anon_map)
3930
3931 if len(anonname) > self.label_length - 6:
3932 counter = self._truncated_counters.get(ident_class, 1)
3933 truncname = (
3934 anonname[0 : max(self.label_length - 6, 0)]
3935 + "_"
3936 + hex(counter)[2:]
3937 )
3938 self._truncated_counters[ident_class] = counter + 1
3939 else:
3940 truncname = anonname
3941 self.truncated_names[(ident_class, name)] = truncname
3942 return truncname
3943
3944 def _anonymize(self, name: str) -> str:
3945 return name % self.anon_map
3946
3947 def bindparam_string(
3948 self,
3949 name: str,
3950 post_compile: bool = False,
3951 expanding: bool = False,
3952 escaped_from: Optional[str] = None,
3953 bindparam_type: Optional[TypeEngine[Any]] = None,
3954 accumulate_bind_names: Optional[Set[str]] = None,
3955 visited_bindparam: Optional[List[str]] = None,
3956 **kw: Any,
3957 ) -> str:
3958 # TODO: accumulate_bind_names is passed by crud.py to gather
3959 # names on a per-value basis, visited_bindparam is passed by
3960 # visit_insert() to collect all parameters in the statement.
3961 # see if this gathering can be simplified somehow
3962 if accumulate_bind_names is not None:
3963 accumulate_bind_names.add(name)
3964 if visited_bindparam is not None:
3965 visited_bindparam.append(name)
3966
3967 if not escaped_from:
3968 if self._bind_translate_re.search(name):
3969 # not quite the translate use case as we want to
3970 # also get a quick boolean if we even found
3971 # unusual characters in the name
3972 new_name = self._bind_translate_re.sub(
3973 lambda m: self._bind_translate_chars[m.group(0)],
3974 name,
3975 )
3976 escaped_from = name
3977 name = new_name
3978
3979 if escaped_from:
3980 self.escaped_bind_names = self.escaped_bind_names.union(
3981 {escaped_from: name}
3982 )
3983 if post_compile:
3984 ret = "__[POSTCOMPILE_%s]" % name
3985 if expanding:
3986 # for expanding, bound parameters or literal values will be
3987 # rendered per item
3988 return ret
3989
3990 # otherwise, for non-expanding "literal execute", apply
3991 # bind casts as determined by the datatype
3992 if bindparam_type is not None:
3993 type_impl = bindparam_type._unwrapped_dialect_impl(
3994 self.dialect
3995 )
3996 if type_impl.render_literal_cast:
3997 ret = self.render_bind_cast(bindparam_type, type_impl, ret)
3998 return ret
3999 elif self.state is CompilerState.COMPILING:
4000 ret = self.compilation_bindtemplate % {"name": name}
4001 else:
4002 ret = self.bindtemplate % {"name": name}
4003
4004 if (
4005 bindparam_type is not None
4006 and self.dialect._bind_typing_render_casts
4007 ):
4008 type_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
4009 if type_impl.render_bind_cast:
4010 ret = self.render_bind_cast(bindparam_type, type_impl, ret)
4011
4012 return ret
4013
4014 def _dispatch_independent_ctes(self, stmt, kw):
4015 local_kw = kw.copy()
4016 local_kw.pop("cte_opts", None)
4017 for cte, opt in zip(
4018 stmt._independent_ctes, stmt._independent_ctes_opts
4019 ):
4020 cte._compiler_dispatch(self, cte_opts=opt, **local_kw)
4021
4022 def visit_cte(
4023 self,
4024 cte: CTE,
4025 asfrom: bool = False,
4026 ashint: bool = False,
4027 fromhints: Optional[_FromHintsType] = None,
4028 visiting_cte: Optional[CTE] = None,
4029 from_linter: Optional[FromLinter] = None,
4030 cte_opts: selectable._CTEOpts = selectable._CTEOpts(False),
4031 **kwargs: Any,
4032 ) -> Optional[str]:
4033 self_ctes = self._init_cte_state()
4034 assert self_ctes is self.ctes
4035
4036 kwargs["visiting_cte"] = cte
4037
4038 cte_name = cte.name
4039
4040 if isinstance(cte_name, elements._truncated_label):
4041 cte_name = self._truncated_identifier("alias", cte_name)
4042
4043 is_new_cte = True
4044 embedded_in_current_named_cte = False
4045
4046 _reference_cte = cte._get_reference_cte()
4047
4048 nesting = cte.nesting or cte_opts.nesting
4049
4050 # check for CTE already encountered
4051 if _reference_cte in self.level_name_by_cte:
4052 cte_level, _, existing_cte_opts = self.level_name_by_cte[
4053 _reference_cte
4054 ]
4055 assert _ == cte_name
4056
4057 cte_level_name = (cte_level, cte_name)
4058 existing_cte = self.ctes_by_level_name[cte_level_name]
4059
4060 # check if we are receiving it here with a specific
4061 # "nest_here" location; if so, move it to this location
4062
4063 if cte_opts.nesting:
4064 if existing_cte_opts.nesting:
4065 raise exc.CompileError(
4066 "CTE is stated as 'nest_here' in "
4067 "more than one location"
4068 )
4069
4070 old_level_name = (cte_level, cte_name)
4071 cte_level = len(self.stack) if nesting else 1
4072 cte_level_name = new_level_name = (cte_level, cte_name)
4073
4074 del self.ctes_by_level_name[old_level_name]
4075 self.ctes_by_level_name[new_level_name] = existing_cte
4076 self.level_name_by_cte[_reference_cte] = new_level_name + (
4077 cte_opts,
4078 )
4079
4080 else:
4081 cte_level = len(self.stack) if nesting else 1
4082 cte_level_name = (cte_level, cte_name)
4083
4084 if cte_level_name in self.ctes_by_level_name:
4085 existing_cte = self.ctes_by_level_name[cte_level_name]
4086 else:
4087 existing_cte = None
4088
4089 if existing_cte is not None:
4090 embedded_in_current_named_cte = visiting_cte is existing_cte
4091
4092 # we've generated a same-named CTE that we are enclosed in,
4093 # or this is the same CTE. just return the name.
4094 if cte is existing_cte._restates or cte is existing_cte:
4095 is_new_cte = False
4096 elif existing_cte is cte._restates:
4097 # we've generated a same-named CTE that is
4098 # enclosed in us - we take precedence, so
4099 # discard the text for the "inner".
4100 del self_ctes[existing_cte]
4101
4102 existing_cte_reference_cte = existing_cte._get_reference_cte()
4103
4104 assert existing_cte_reference_cte is _reference_cte
4105 assert existing_cte_reference_cte is existing_cte
4106
4107 del self.level_name_by_cte[existing_cte_reference_cte]
4108 else:
4109 if (
4110 # if the two CTEs have the same hash, which we expect
4111 # here means that one/both is an annotated of the other
4112 (hash(cte) == hash(existing_cte))
4113 # or...
4114 or (
4115 (
4116 # if they are clones, i.e. they came from the ORM
4117 # or some other visit method
4118 cte._is_clone_of is not None
4119 or existing_cte._is_clone_of is not None
4120 )
4121 # and are deep-copy identical
4122 and cte.compare(existing_cte)
4123 )
4124 ):
4125 # then consider these two CTEs the same
4126 is_new_cte = False
4127 else:
4128 # otherwise these are two CTEs that either will render
4129 # differently, or were indicated separately by the user,
4130 # with the same name
4131 raise exc.CompileError(
4132 "Multiple, unrelated CTEs found with "
4133 "the same name: %r" % cte_name
4134 )
4135
4136 if not asfrom and not is_new_cte:
4137 return None
4138
4139 if cte._cte_alias is not None:
4140 pre_alias_cte = cte._cte_alias
4141 cte_pre_alias_name = cte._cte_alias.name
4142 if isinstance(cte_pre_alias_name, elements._truncated_label):
4143 cte_pre_alias_name = self._truncated_identifier(
4144 "alias", cte_pre_alias_name
4145 )
4146 else:
4147 pre_alias_cte = cte
4148 cte_pre_alias_name = None
4149
4150 if is_new_cte:
4151 self.ctes_by_level_name[cte_level_name] = cte
4152 self.level_name_by_cte[_reference_cte] = cte_level_name + (
4153 cte_opts,
4154 )
4155
4156 if pre_alias_cte not in self.ctes:
4157 self.visit_cte(pre_alias_cte, **kwargs)
4158
4159 if not cte_pre_alias_name and cte not in self_ctes:
4160 if cte.recursive:
4161 self.ctes_recursive = True
4162 text = self.preparer.format_alias(cte, cte_name)
4163 if cte.recursive or cte.element.name_cte_columns:
4164 col_source = cte.element
4165
4166 # TODO: can we get at the .columns_plus_names collection
4167 # that is already (or will be?) generated for the SELECT
4168 # rather than calling twice?
4169 recur_cols = [
4170 # TODO: proxy_name is not technically safe,
4171 # see test_cte->
4172 # test_with_recursive_no_name_currently_buggy. not
4173 # clear what should be done with such a case
4174 fallback_label_name or proxy_name
4175 for (
4176 _,
4177 proxy_name,
4178 fallback_label_name,
4179 c,
4180 repeated,
4181 ) in (col_source._generate_columns_plus_names(True))
4182 if not repeated
4183 ]
4184
4185 text += "(%s)" % (
4186 ", ".join(
4187 self.preparer.format_label_name(
4188 ident, anon_map=self.anon_map
4189 )
4190 for ident in recur_cols
4191 )
4192 )
4193
4194 assert kwargs.get("subquery", False) is False
4195
4196 if not self.stack:
4197 # toplevel, this is a stringify of the
4198 # cte directly. just compile the inner
4199 # the way alias() does.
4200 return cte.element._compiler_dispatch(
4201 self, asfrom=asfrom, **kwargs
4202 )
4203 else:
4204 prefixes = self._generate_prefixes(
4205 cte, cte._prefixes, **kwargs
4206 )
4207 inner = cte.element._compiler_dispatch(
4208 self, asfrom=True, **kwargs
4209 )
4210
4211 text += " AS %s\n(%s)" % (prefixes, inner)
4212
4213 if cte._suffixes:
4214 text += " " + self._generate_prefixes(
4215 cte, cte._suffixes, **kwargs
4216 )
4217
4218 self_ctes[cte] = text
4219
4220 if asfrom:
4221 if from_linter:
4222 from_linter.froms[cte._de_clone()] = cte_name
4223
4224 if not is_new_cte and embedded_in_current_named_cte:
4225 return self.preparer.format_alias(cte, cte_name)
4226
4227 if cte_pre_alias_name:
4228 text = self.preparer.format_alias(cte, cte_pre_alias_name)
4229 if self.preparer._requires_quotes(cte_name):
4230 cte_name = self.preparer.quote(cte_name)
4231 text += self.get_render_as_alias_suffix(cte_name)
4232 return text # type: ignore[no-any-return]
4233 else:
4234 return self.preparer.format_alias(cte, cte_name)
4235
4236 return None
4237
4238 def visit_table_valued_alias(self, element, **kw):
4239 if element.joins_implicitly:
4240 kw["from_linter"] = None
4241 if element._is_lateral:
4242 return self.visit_lateral(element, **kw)
4243 else:
4244 return self.visit_alias(element, **kw)
4245
4246 def visit_table_valued_column(self, element, **kw):
4247 return self.visit_column(element, **kw)
4248
4249 def visit_alias(
4250 self,
4251 alias,
4252 asfrom=False,
4253 ashint=False,
4254 iscrud=False,
4255 fromhints=None,
4256 subquery=False,
4257 lateral=False,
4258 enclosing_alias=None,
4259 from_linter=None,
4260 **kwargs,
4261 ):
4262 if lateral:
4263 if "enclosing_lateral" not in kwargs:
4264 # if lateral is set and enclosing_lateral is not
4265 # present, we assume we are being called directly
4266 # from visit_lateral() and we need to set enclosing_lateral.
4267 assert alias._is_lateral
4268 kwargs["enclosing_lateral"] = alias
4269
4270 # for lateral objects, we track a second from_linter that is...
4271 # lateral! to the level above us.
4272 if (
4273 from_linter
4274 and "lateral_from_linter" not in kwargs
4275 and "enclosing_lateral" in kwargs
4276 ):
4277 kwargs["lateral_from_linter"] = from_linter
4278
4279 if enclosing_alias is not None and enclosing_alias.element is alias:
4280 inner = alias.element._compiler_dispatch(
4281 self,
4282 asfrom=asfrom,
4283 ashint=ashint,
4284 iscrud=iscrud,
4285 fromhints=fromhints,
4286 lateral=lateral,
4287 enclosing_alias=alias,
4288 **kwargs,
4289 )
4290 if subquery and (asfrom or lateral):
4291 inner = "(%s)" % (inner,)
4292 return inner
4293 else:
4294 kwargs["enclosing_alias"] = alias
4295
4296 if asfrom or ashint:
4297 if isinstance(alias.name, elements._truncated_label):
4298 alias_name = self._truncated_identifier("alias", alias.name)
4299 else:
4300 alias_name = alias.name
4301
4302 if ashint:
4303 return self.preparer.format_alias(alias, alias_name)
4304 elif asfrom:
4305 if from_linter:
4306 from_linter.froms[alias._de_clone()] = alias_name
4307
4308 inner = alias.element._compiler_dispatch(
4309 self, asfrom=True, lateral=lateral, **kwargs
4310 )
4311 if subquery:
4312 inner = "(%s)" % (inner,)
4313
4314 ret = inner + self.get_render_as_alias_suffix(
4315 self.preparer.format_alias(alias, alias_name)
4316 )
4317
4318 if alias._supports_derived_columns and alias._render_derived:
4319 ret += "(%s)" % (
4320 ", ".join(
4321 "%s%s"
4322 % (
4323 self.preparer.quote(col.name),
4324 (
4325 " %s"
4326 % self.dialect.type_compiler_instance.process(
4327 col.type, **kwargs
4328 )
4329 if alias._render_derived_w_types
4330 else ""
4331 ),
4332 )
4333 for col in alias.c
4334 )
4335 )
4336
4337 if fromhints and alias in fromhints:
4338 ret = self.format_from_hint_text(
4339 ret, alias, fromhints[alias], iscrud
4340 )
4341
4342 return ret
4343 else:
4344 # note we cancel the "subquery" flag here as well
4345 return alias.element._compiler_dispatch(
4346 self, lateral=lateral, **kwargs
4347 )
4348
4349 def visit_subquery(self, subquery, **kw):
4350 kw["subquery"] = True
4351 return self.visit_alias(subquery, **kw)
4352
4353 def visit_lateral(self, lateral_, **kw):
4354 kw["lateral"] = True
4355 return "LATERAL %s" % self.visit_alias(lateral_, **kw)
4356
4357 def visit_tablesample(self, tablesample, asfrom=False, **kw):
4358 text = "%s TABLESAMPLE %s" % (
4359 self.visit_alias(tablesample, asfrom=True, **kw),
4360 tablesample._get_method()._compiler_dispatch(self, **kw),
4361 )
4362
4363 if tablesample.seed is not None:
4364 text += " REPEATABLE (%s)" % (
4365 tablesample.seed._compiler_dispatch(self, **kw)
4366 )
4367
4368 return text
4369
4370 def _render_values(self, element, **kw):
4371 kw.setdefault("literal_binds", element.literal_binds)
4372 tuples = ", ".join(
4373 self.process(
4374 elements.Tuple(
4375 types=element._column_types, *elem
4376 ).self_group(),
4377 **kw,
4378 )
4379 for chunk in element._data
4380 for elem in chunk
4381 )
4382 return f"VALUES {tuples}"
4383
4384 def visit_values(
4385 self, element, asfrom=False, from_linter=None, visiting_cte=None, **kw
4386 ):
4387
4388 if element._independent_ctes:
4389 self._dispatch_independent_ctes(element, kw)
4390
4391 v = self._render_values(element, **kw)
4392
4393 if element._unnamed:
4394 name = None
4395 elif isinstance(element.name, elements._truncated_label):
4396 name = self._truncated_identifier("values", element.name)
4397 else:
4398 name = element.name
4399
4400 if element._is_lateral:
4401 lateral = "LATERAL "
4402 else:
4403 lateral = ""
4404
4405 if asfrom:
4406 if from_linter:
4407 from_linter.froms[element._de_clone()] = (
4408 name if name is not None else "(unnamed VALUES element)"
4409 )
4410
4411 if visiting_cte is not None and visiting_cte.element is element:
4412 if element._is_lateral:
4413 raise exc.CompileError(
4414 "Can't use a LATERAL VALUES expression inside of a CTE"
4415 )
4416 elif name:
4417 kw["include_table"] = False
4418 v = "%s(%s)%s (%s)" % (
4419 lateral,
4420 v,
4421 self.get_render_as_alias_suffix(self.preparer.quote(name)),
4422 (
4423 ", ".join(
4424 c._compiler_dispatch(self, **kw)
4425 for c in element.columns
4426 )
4427 ),
4428 )
4429 else:
4430 v = "%s(%s)" % (lateral, v)
4431 return v
4432
4433 def visit_scalar_values(self, element, **kw):
4434 return f"({self._render_values(element, **kw)})"
4435
4436 def get_render_as_alias_suffix(self, alias_name_text):
4437 return " AS " + alias_name_text
4438
4439 def _add_to_result_map(
4440 self,
4441 keyname: str,
4442 name: str,
4443 objects: Tuple[Any, ...],
4444 type_: TypeEngine[Any],
4445 ) -> None:
4446
4447 # note objects must be non-empty for cursor.py to handle the
4448 # collection properly
4449 assert objects
4450
4451 if keyname is None or keyname == "*":
4452 self._ordered_columns = False
4453 self._ad_hoc_textual = True
4454 if type_._is_tuple_type:
4455 raise exc.CompileError(
4456 "Most backends don't support SELECTing "
4457 "from a tuple() object. If this is an ORM query, "
4458 "consider using the Bundle object."
4459 )
4460 self._result_columns.append(
4461 ResultColumnsEntry(keyname, name, objects, type_)
4462 )
4463
4464 def _label_returning_column(
4465 self, stmt, column, populate_result_map, column_clause_args=None, **kw
4466 ):
4467 """Render a column with necessary labels inside of a RETURNING clause.
4468
4469 This method is provided for individual dialects in place of calling
4470 the _label_select_column method directly, so that the two use cases
4471 of RETURNING vs. SELECT can be disambiguated going forward.
4472
4473 .. versionadded:: 1.4.21
4474
4475 """
4476 return self._label_select_column(
4477 None,
4478 column,
4479 populate_result_map,
4480 False,
4481 {} if column_clause_args is None else column_clause_args,
4482 **kw,
4483 )
4484
4485 def _label_select_column(
4486 self,
4487 select,
4488 column,
4489 populate_result_map,
4490 asfrom,
4491 column_clause_args,
4492 name=None,
4493 proxy_name=None,
4494 fallback_label_name=None,
4495 within_columns_clause=True,
4496 column_is_repeated=False,
4497 need_column_expressions=False,
4498 include_table=True,
4499 ):
4500 """produce labeled columns present in a select()."""
4501 impl = column.type.dialect_impl(self.dialect)
4502
4503 if impl._has_column_expression and (
4504 need_column_expressions or populate_result_map
4505 ):
4506 col_expr = impl.column_expression(column)
4507 else:
4508 col_expr = column
4509
4510 if populate_result_map:
4511 # pass an "add_to_result_map" callable into the compilation
4512 # of embedded columns. this collects information about the
4513 # column as it will be fetched in the result and is coordinated
4514 # with cursor.description when the query is executed.
4515 add_to_result_map = self._add_to_result_map
4516
4517 # if the SELECT statement told us this column is a repeat,
4518 # wrap the callable with one that prevents the addition of the
4519 # targets
4520 if column_is_repeated:
4521 _add_to_result_map = add_to_result_map
4522
4523 def add_to_result_map(keyname, name, objects, type_):
4524 _add_to_result_map(keyname, name, (keyname,), type_)
4525
4526 # if we redefined col_expr for type expressions, wrap the
4527 # callable with one that adds the original column to the targets
4528 elif col_expr is not column:
4529 _add_to_result_map = add_to_result_map
4530
4531 def add_to_result_map(keyname, name, objects, type_):
4532 _add_to_result_map(
4533 keyname, name, (column,) + objects, type_
4534 )
4535
4536 else:
4537 add_to_result_map = None
4538
4539 # this method is used by some of the dialects for RETURNING,
4540 # which has different inputs. _label_returning_column was added
4541 # as the better target for this now however for 1.4 we will keep
4542 # _label_select_column directly compatible with this use case.
4543 # these assertions right now set up the current expected inputs
4544 assert within_columns_clause, (
4545 "_label_select_column is only relevant within "
4546 "the columns clause of a SELECT or RETURNING"
4547 )
4548 if isinstance(column, elements.Label):
4549 if col_expr is not column:
4550 result_expr = _CompileLabel(
4551 col_expr, column.name, alt_names=(column.element,)
4552 )
4553 else:
4554 result_expr = col_expr
4555
4556 elif name:
4557 # here, _columns_plus_names has determined there's an explicit
4558 # label name we need to use. this is the default for
4559 # tablenames_plus_columnnames as well as when columns are being
4560 # deduplicated on name
4561
4562 assert (
4563 proxy_name is not None
4564 ), "proxy_name is required if 'name' is passed"
4565
4566 result_expr = _CompileLabel(
4567 col_expr,
4568 name,
4569 alt_names=(
4570 proxy_name,
4571 # this is a hack to allow legacy result column lookups
4572 # to work as they did before; this goes away in 2.0.
4573 # TODO: this only seems to be tested indirectly
4574 # via test/orm/test_deprecations.py. should be a
4575 # resultset test for this
4576 column._tq_label,
4577 ),
4578 )
4579 else:
4580 # determine here whether this column should be rendered in
4581 # a labelled context or not, as we were given no required label
4582 # name from the caller. Here we apply heuristics based on the kind
4583 # of SQL expression involved.
4584
4585 if col_expr is not column:
4586 # type-specific expression wrapping the given column,
4587 # so we render a label
4588 render_with_label = True
4589 elif isinstance(column, elements.ColumnClause):
4590 # table-bound column, we render its name as a label if we are
4591 # inside of a subquery only
4592 render_with_label = (
4593 asfrom
4594 and not column.is_literal
4595 and column.table is not None
4596 )
4597 elif isinstance(column, elements.TextClause):
4598 render_with_label = False
4599 elif isinstance(column, elements.UnaryExpression):
4600 # unary expression. notes added as of #12681
4601 #
4602 # By convention, the visit_unary() method
4603 # itself does not add an entry to the result map, and relies
4604 # upon either the inner expression creating a result map
4605 # entry, or if not, by creating a label here that produces
4606 # the result map entry. Where that happens is based on whether
4607 # or not the element immediately inside the unary is a
4608 # NamedColumn subclass or not.
4609 #
4610 # Now, this also impacts how the SELECT is written; if
4611 # we decide to generate a label here, we get the usual
4612 # "~(x+y) AS anon_1" thing in the columns clause. If we
4613 # don't, we don't get an AS at all, we get like
4614 # "~table.column".
4615 #
4616 # But here is the important thing as of modernish (like 1.4)
4617 # versions of SQLAlchemy - **whether or not the AS <label>
4618 # is present in the statement is not actually important**.
4619 # We target result columns **positionally** for a fully
4620 # compiled ``Select()`` object; before 1.4 we needed those
4621 # labels to match in cursor.description etc etc but now it
4622 # really doesn't matter.
4623 # So really, we could set render_with_label True in all cases.
4624 # Or we could just have visit_unary() populate the result map
4625 # in all cases.
4626 #
4627 # What we're doing here is strictly trying to not rock the
4628 # boat too much with when we do/don't render "AS label";
4629 # labels being present helps in the edge cases that we
4630 # "fall back" to named cursor.description matching, labels
4631 # not being present for columns keeps us from having awkward
4632 # phrases like "SELECT DISTINCT table.x AS x".
4633 render_with_label = (
4634 (
4635 # exception case to detect if we render "not boolean"
4636 # as "not <col>" for native boolean or "<col> = 1"
4637 # for non-native boolean. this is controlled by
4638 # visit_is_<true|false>_unary_operator
4639 column.operator
4640 in (operators.is_false, operators.is_true)
4641 and not self.dialect.supports_native_boolean
4642 )
4643 or column._wraps_unnamed_column()
4644 or asfrom
4645 )
4646 elif (
4647 # general class of expressions that don't have a SQL-column
4648 # addressable name. includes scalar selects, bind parameters,
4649 # SQL functions, others
4650 not isinstance(column, elements.NamedColumn)
4651 # deeper check that indicates there's no natural "name" to
4652 # this element, which accommodates for custom SQL constructs
4653 # that might have a ".name" attribute (but aren't SQL
4654 # functions) but are not implementing this more recently added
4655 # base class. in theory the "NamedColumn" check should be
4656 # enough, however here we seek to maintain legacy behaviors
4657 # as well.
4658 and column._non_anon_label is None
4659 ):
4660 render_with_label = True
4661 else:
4662 render_with_label = False
4663
4664 if render_with_label:
4665 if not fallback_label_name:
4666 # used by the RETURNING case right now. we generate it
4667 # here as 3rd party dialects may be referring to
4668 # _label_select_column method directly instead of the
4669 # just-added _label_returning_column method
4670 assert not column_is_repeated
4671 fallback_label_name = column._anon_name_label
4672
4673 fallback_label_name = (
4674 elements._truncated_label(fallback_label_name)
4675 if not isinstance(
4676 fallback_label_name, elements._truncated_label
4677 )
4678 else fallback_label_name
4679 )
4680
4681 result_expr = _CompileLabel(
4682 col_expr, fallback_label_name, alt_names=(proxy_name,)
4683 )
4684 else:
4685 result_expr = col_expr
4686
4687 column_clause_args.update(
4688 within_columns_clause=within_columns_clause,
4689 add_to_result_map=add_to_result_map,
4690 include_table=include_table,
4691 )
4692 return result_expr._compiler_dispatch(self, **column_clause_args)
4693
4694 def format_from_hint_text(self, sqltext, table, hint, iscrud):
4695 hinttext = self.get_from_hint_text(table, hint)
4696 if hinttext:
4697 sqltext += " " + hinttext
4698 return sqltext
4699
4700 def get_select_hint_text(self, byfroms):
4701 return None
4702
4703 def get_from_hint_text(
4704 self, table: FromClause, text: Optional[str]
4705 ) -> Optional[str]:
4706 return None
4707
4708 def get_crud_hint_text(self, table, text):
4709 return None
4710
4711 def get_statement_hint_text(self, hint_texts):
4712 return " ".join(hint_texts)
4713
4714 _default_stack_entry: _CompilerStackEntry
4715
4716 if not typing.TYPE_CHECKING:
4717 _default_stack_entry = util.immutabledict(
4718 [("correlate_froms", frozenset()), ("asfrom_froms", frozenset())]
4719 )
4720
4721 def _display_froms_for_select(
4722 self, select_stmt, asfrom, lateral=False, **kw
4723 ):
4724 # utility method to help external dialects
4725 # get the correct from list for a select.
4726 # specifically the oracle dialect needs this feature
4727 # right now.
4728 toplevel = not self.stack
4729 entry = self._default_stack_entry if toplevel else self.stack[-1]
4730
4731 compile_state = select_stmt._compile_state_factory(select_stmt, self)
4732
4733 correlate_froms = entry["correlate_froms"]
4734 asfrom_froms = entry["asfrom_froms"]
4735
4736 if asfrom and not lateral:
4737 froms = compile_state._get_display_froms(
4738 explicit_correlate_froms=correlate_froms.difference(
4739 asfrom_froms
4740 ),
4741 implicit_correlate_froms=(),
4742 )
4743 else:
4744 froms = compile_state._get_display_froms(
4745 explicit_correlate_froms=correlate_froms,
4746 implicit_correlate_froms=asfrom_froms,
4747 )
4748 return froms
4749
4750 translate_select_structure: Any = None
4751 """if not ``None``, should be a callable which accepts ``(select_stmt,
4752 **kw)`` and returns a select object. this is used for structural changes
4753 mostly to accommodate for LIMIT/OFFSET schemes
4754
4755 """
4756
4757 def visit_select(
4758 self,
4759 select_stmt,
4760 asfrom=False,
4761 insert_into=False,
4762 fromhints=None,
4763 compound_index=None,
4764 select_wraps_for=None,
4765 lateral=False,
4766 from_linter=None,
4767 **kwargs,
4768 ):
4769 assert select_wraps_for is None, (
4770 "SQLAlchemy 1.4 requires use of "
4771 "the translate_select_structure hook for structural "
4772 "translations of SELECT objects"
4773 )
4774
4775 # initial setup of SELECT. the compile_state_factory may now
4776 # be creating a totally different SELECT from the one that was
4777 # passed in. for ORM use this will convert from an ORM-state
4778 # SELECT to a regular "Core" SELECT. other composed operations
4779 # such as computation of joins will be performed.
4780
4781 kwargs["within_columns_clause"] = False
4782
4783 compile_state = select_stmt._compile_state_factory(
4784 select_stmt, self, **kwargs
4785 )
4786 kwargs["ambiguous_table_name_map"] = (
4787 compile_state._ambiguous_table_name_map
4788 )
4789
4790 select_stmt = compile_state.statement
4791
4792 toplevel = not self.stack
4793
4794 if toplevel and not self.compile_state:
4795 self.compile_state = compile_state
4796
4797 is_embedded_select = compound_index is not None or insert_into
4798
4799 # translate step for Oracle, SQL Server which often need to
4800 # restructure the SELECT to allow for LIMIT/OFFSET and possibly
4801 # other conditions
4802 if self.translate_select_structure:
4803 new_select_stmt = self.translate_select_structure(
4804 select_stmt, asfrom=asfrom, **kwargs
4805 )
4806
4807 # if SELECT was restructured, maintain a link to the originals
4808 # and assemble a new compile state
4809 if new_select_stmt is not select_stmt:
4810 compile_state_wraps_for = compile_state
4811 select_wraps_for = select_stmt
4812 select_stmt = new_select_stmt
4813
4814 compile_state = select_stmt._compile_state_factory(
4815 select_stmt, self, **kwargs
4816 )
4817 select_stmt = compile_state.statement
4818
4819 entry = self._default_stack_entry if toplevel else self.stack[-1]
4820
4821 populate_result_map = need_column_expressions = (
4822 toplevel
4823 or entry.get("need_result_map_for_compound", False)
4824 or entry.get("need_result_map_for_nested", False)
4825 )
4826
4827 # indicates there is a CompoundSelect in play and we are not the
4828 # first select
4829 if compound_index:
4830 populate_result_map = False
4831
4832 # this was first proposed as part of #3372; however, it is not
4833 # reached in current tests and could possibly be an assertion
4834 # instead.
4835 if not populate_result_map and "add_to_result_map" in kwargs:
4836 del kwargs["add_to_result_map"]
4837
4838 froms = self._setup_select_stack(
4839 select_stmt, compile_state, entry, asfrom, lateral, compound_index
4840 )
4841
4842 column_clause_args = kwargs.copy()
4843 column_clause_args.update(
4844 {"within_label_clause": False, "within_columns_clause": False}
4845 )
4846
4847 text = "SELECT " # we're off to a good start !
4848
4849 if select_stmt._hints:
4850 hint_text, byfrom = self._setup_select_hints(select_stmt)
4851 if hint_text:
4852 text += hint_text + " "
4853 else:
4854 byfrom = None
4855
4856 if select_stmt._independent_ctes:
4857 self._dispatch_independent_ctes(select_stmt, kwargs)
4858
4859 if select_stmt._prefixes:
4860 text += self._generate_prefixes(
4861 select_stmt, select_stmt._prefixes, **kwargs
4862 )
4863
4864 text += self.get_select_precolumns(select_stmt, **kwargs)
4865 # the actual list of columns to print in the SELECT column list.
4866 inner_columns = [
4867 c
4868 for c in [
4869 self._label_select_column(
4870 select_stmt,
4871 column,
4872 populate_result_map,
4873 asfrom,
4874 column_clause_args,
4875 name=name,
4876 proxy_name=proxy_name,
4877 fallback_label_name=fallback_label_name,
4878 column_is_repeated=repeated,
4879 need_column_expressions=need_column_expressions,
4880 )
4881 for (
4882 name,
4883 proxy_name,
4884 fallback_label_name,
4885 column,
4886 repeated,
4887 ) in compile_state.columns_plus_names
4888 ]
4889 if c is not None
4890 ]
4891
4892 if populate_result_map and select_wraps_for is not None:
4893 # if this select was generated from translate_select,
4894 # rewrite the targeted columns in the result map
4895
4896 translate = dict(
4897 zip(
4898 [
4899 name
4900 for (
4901 key,
4902 proxy_name,
4903 fallback_label_name,
4904 name,
4905 repeated,
4906 ) in compile_state.columns_plus_names
4907 ],
4908 [
4909 name
4910 for (
4911 key,
4912 proxy_name,
4913 fallback_label_name,
4914 name,
4915 repeated,
4916 ) in compile_state_wraps_for.columns_plus_names
4917 ],
4918 )
4919 )
4920
4921 self._result_columns = [
4922 ResultColumnsEntry(
4923 key, name, tuple(translate.get(o, o) for o in obj), type_
4924 )
4925 for key, name, obj, type_ in self._result_columns
4926 ]
4927
4928 text = self._compose_select_body(
4929 text,
4930 select_stmt,
4931 compile_state,
4932 inner_columns,
4933 froms,
4934 byfrom,
4935 toplevel,
4936 kwargs,
4937 )
4938
4939 if select_stmt._statement_hints:
4940 per_dialect = [
4941 ht
4942 for (dialect_name, ht) in select_stmt._statement_hints
4943 if dialect_name in ("*", self.dialect.name)
4944 ]
4945 if per_dialect:
4946 text += " " + self.get_statement_hint_text(per_dialect)
4947
4948 # In compound query, CTEs are shared at the compound level
4949 if self.ctes and (not is_embedded_select or toplevel):
4950 nesting_level = len(self.stack) if not toplevel else None
4951 text = self._render_cte_clause(nesting_level=nesting_level) + text
4952
4953 if select_stmt._suffixes:
4954 text += " " + self._generate_prefixes(
4955 select_stmt, select_stmt._suffixes, **kwargs
4956 )
4957
4958 self.stack.pop(-1)
4959
4960 return text
4961
4962 def _setup_select_hints(
4963 self, select: Select[Any]
4964 ) -> Tuple[str, _FromHintsType]:
4965 byfrom = {
4966 from_: hinttext
4967 % {"name": from_._compiler_dispatch(self, ashint=True)}
4968 for (from_, dialect), hinttext in select._hints.items()
4969 if dialect in ("*", self.dialect.name)
4970 }
4971 hint_text = self.get_select_hint_text(byfrom)
4972 return hint_text, byfrom
4973
4974 def _setup_select_stack(
4975 self, select, compile_state, entry, asfrom, lateral, compound_index
4976 ):
4977 correlate_froms = entry["correlate_froms"]
4978 asfrom_froms = entry["asfrom_froms"]
4979
4980 if compound_index == 0:
4981 entry["select_0"] = select
4982 elif compound_index:
4983 select_0 = entry["select_0"]
4984 numcols = len(select_0._all_selected_columns)
4985
4986 if len(compile_state.columns_plus_names) != numcols:
4987 raise exc.CompileError(
4988 "All selectables passed to "
4989 "CompoundSelect must have identical numbers of "
4990 "columns; select #%d has %d columns, select "
4991 "#%d has %d"
4992 % (
4993 1,
4994 numcols,
4995 compound_index + 1,
4996 len(select._all_selected_columns),
4997 )
4998 )
4999
5000 if asfrom and not lateral:
5001 froms = compile_state._get_display_froms(
5002 explicit_correlate_froms=correlate_froms.difference(
5003 asfrom_froms
5004 ),
5005 implicit_correlate_froms=(),
5006 )
5007 else:
5008 froms = compile_state._get_display_froms(
5009 explicit_correlate_froms=correlate_froms,
5010 implicit_correlate_froms=asfrom_froms,
5011 )
5012
5013 new_correlate_froms = set(_from_objects(*froms))
5014 all_correlate_froms = new_correlate_froms.union(correlate_froms)
5015
5016 new_entry: _CompilerStackEntry = {
5017 "asfrom_froms": new_correlate_froms,
5018 "correlate_froms": all_correlate_froms,
5019 "selectable": select,
5020 "compile_state": compile_state,
5021 }
5022 self.stack.append(new_entry)
5023
5024 return froms
5025
5026 def _compose_select_body(
5027 self,
5028 text,
5029 select,
5030 compile_state,
5031 inner_columns,
5032 froms,
5033 byfrom,
5034 toplevel,
5035 kwargs,
5036 ):
5037 text += ", ".join(inner_columns)
5038
5039 if self.linting & COLLECT_CARTESIAN_PRODUCTS:
5040 from_linter = FromLinter({}, set())
5041 warn_linting = self.linting & WARN_LINTING
5042 if toplevel:
5043 self.from_linter = from_linter
5044 else:
5045 from_linter = None
5046 warn_linting = False
5047
5048 # adjust the whitespace for no inner columns, part of #9440,
5049 # so that a no-col SELECT comes out as "SELECT WHERE..." or
5050 # "SELECT FROM ...".
5051 # while it would be better to have built the SELECT starting string
5052 # without trailing whitespace first, then add whitespace only if inner
5053 # cols were present, this breaks compatibility with various custom
5054 # compilation schemes that are currently being tested.
5055 if not inner_columns:
5056 text = text.rstrip()
5057
5058 if froms:
5059 text += " \nFROM "
5060
5061 if select._hints:
5062 text += ", ".join(
5063 [
5064 f._compiler_dispatch(
5065 self,
5066 asfrom=True,
5067 fromhints=byfrom,
5068 from_linter=from_linter,
5069 **kwargs,
5070 )
5071 for f in froms
5072 ]
5073 )
5074 else:
5075 text += ", ".join(
5076 [
5077 f._compiler_dispatch(
5078 self,
5079 asfrom=True,
5080 from_linter=from_linter,
5081 **kwargs,
5082 )
5083 for f in froms
5084 ]
5085 )
5086 else:
5087 text += self.default_from()
5088
5089 if select._where_criteria:
5090 t = self._generate_delimited_and_list(
5091 select._where_criteria, from_linter=from_linter, **kwargs
5092 )
5093 if t:
5094 text += " \nWHERE " + t
5095
5096 if warn_linting:
5097 assert from_linter is not None
5098 from_linter.warn()
5099
5100 if select._group_by_clauses:
5101 text += self.group_by_clause(select, **kwargs)
5102
5103 if select._having_criteria:
5104 t = self._generate_delimited_and_list(
5105 select._having_criteria, **kwargs
5106 )
5107 if t:
5108 text += " \nHAVING " + t
5109
5110 if select._order_by_clauses:
5111 text += self.order_by_clause(select, **kwargs)
5112
5113 if select._has_row_limiting_clause:
5114 text += self._row_limit_clause(select, **kwargs)
5115
5116 if select._for_update_arg is not None:
5117 text += self.for_update_clause(select, **kwargs)
5118
5119 return text
5120
5121 def _generate_prefixes(self, stmt, prefixes, **kw):
5122 clause = " ".join(
5123 prefix._compiler_dispatch(self, **kw)
5124 for prefix, dialect_name in prefixes
5125 if dialect_name in (None, "*") or dialect_name == self.dialect.name
5126 )
5127 if clause:
5128 clause += " "
5129 return clause
5130
5131 def _render_cte_clause(
5132 self,
5133 nesting_level=None,
5134 include_following_stack=False,
5135 ):
5136 """
5137 include_following_stack
5138 Also render the nesting CTEs on the next stack. Useful for
5139 SQL structures like UNION or INSERT that can wrap SELECT
5140 statements containing nesting CTEs.
5141 """
5142 if not self.ctes:
5143 return ""
5144
5145 ctes: MutableMapping[CTE, str]
5146
5147 if nesting_level and nesting_level > 1:
5148 ctes = util.OrderedDict()
5149 for cte in list(self.ctes.keys()):
5150 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5151 cte._get_reference_cte()
5152 ]
5153 nesting = cte.nesting or cte_opts.nesting
5154 is_rendered_level = cte_level == nesting_level or (
5155 include_following_stack and cte_level == nesting_level + 1
5156 )
5157 if not (nesting and is_rendered_level):
5158 continue
5159
5160 ctes[cte] = self.ctes[cte]
5161
5162 else:
5163 ctes = self.ctes
5164
5165 if not ctes:
5166 return ""
5167 ctes_recursive = any([cte.recursive for cte in ctes])
5168
5169 cte_text = self.get_cte_preamble(ctes_recursive) + " "
5170 cte_text += ", \n".join([txt for txt in ctes.values()])
5171 cte_text += "\n "
5172
5173 if nesting_level and nesting_level > 1:
5174 for cte in list(ctes.keys()):
5175 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5176 cte._get_reference_cte()
5177 ]
5178 del self.ctes[cte]
5179 del self.ctes_by_level_name[(cte_level, cte_name)]
5180 del self.level_name_by_cte[cte._get_reference_cte()]
5181
5182 return cte_text
5183
5184 def get_cte_preamble(self, recursive):
5185 if recursive:
5186 return "WITH RECURSIVE"
5187 else:
5188 return "WITH"
5189
5190 def get_select_precolumns(self, select: Select[Any], **kw: Any) -> str:
5191 """Called when building a ``SELECT`` statement, position is just
5192 before column list.
5193
5194 """
5195 if select._distinct_on:
5196 util.warn_deprecated(
5197 "DISTINCT ON is currently supported only by the PostgreSQL "
5198 "dialect. Use of DISTINCT ON for other backends is currently "
5199 "silently ignored, however this usage is deprecated, and will "
5200 "raise CompileError in a future release for all backends "
5201 "that do not support this syntax.",
5202 version="1.4",
5203 )
5204 return "DISTINCT " if select._distinct else ""
5205
5206 def group_by_clause(self, select, **kw):
5207 """allow dialects to customize how GROUP BY is rendered."""
5208
5209 group_by = self._generate_delimited_list(
5210 select._group_by_clauses, OPERATORS[operators.comma_op], **kw
5211 )
5212 if group_by:
5213 return " GROUP BY " + group_by
5214 else:
5215 return ""
5216
5217 def order_by_clause(self, select, **kw):
5218 """allow dialects to customize how ORDER BY is rendered."""
5219
5220 order_by = self._generate_delimited_list(
5221 select._order_by_clauses, OPERATORS[operators.comma_op], **kw
5222 )
5223
5224 if order_by:
5225 return " ORDER BY " + order_by
5226 else:
5227 return ""
5228
5229 def for_update_clause(self, select, **kw):
5230 return " FOR UPDATE"
5231
5232 def returning_clause(
5233 self,
5234 stmt: UpdateBase,
5235 returning_cols: Sequence[_ColumnsClauseElement],
5236 *,
5237 populate_result_map: bool,
5238 **kw: Any,
5239 ) -> str:
5240 columns = [
5241 self._label_returning_column(
5242 stmt,
5243 column,
5244 populate_result_map,
5245 fallback_label_name=fallback_label_name,
5246 column_is_repeated=repeated,
5247 name=name,
5248 proxy_name=proxy_name,
5249 **kw,
5250 )
5251 for (
5252 name,
5253 proxy_name,
5254 fallback_label_name,
5255 column,
5256 repeated,
5257 ) in stmt._generate_columns_plus_names(
5258 True, cols=base._select_iterables(returning_cols)
5259 )
5260 ]
5261
5262 return "RETURNING " + ", ".join(columns)
5263
5264 def limit_clause(self, select, **kw):
5265 text = ""
5266 if select._limit_clause is not None:
5267 text += "\n LIMIT " + self.process(select._limit_clause, **kw)
5268 if select._offset_clause is not None:
5269 if select._limit_clause is None:
5270 text += "\n LIMIT -1"
5271 text += " OFFSET " + self.process(select._offset_clause, **kw)
5272 return text
5273
5274 def fetch_clause(
5275 self,
5276 select,
5277 fetch_clause=None,
5278 require_offset=False,
5279 use_literal_execute_for_simple_int=False,
5280 **kw,
5281 ):
5282 if fetch_clause is None:
5283 fetch_clause = select._fetch_clause
5284 fetch_clause_options = select._fetch_clause_options
5285 else:
5286 fetch_clause_options = {"percent": False, "with_ties": False}
5287
5288 text = ""
5289
5290 if select._offset_clause is not None:
5291 offset_clause = select._offset_clause
5292 if (
5293 use_literal_execute_for_simple_int
5294 and select._simple_int_clause(offset_clause)
5295 ):
5296 offset_clause = offset_clause.render_literal_execute()
5297 offset_str = self.process(offset_clause, **kw)
5298 text += "\n OFFSET %s ROWS" % offset_str
5299 elif require_offset:
5300 text += "\n OFFSET 0 ROWS"
5301
5302 if fetch_clause is not None:
5303 if (
5304 use_literal_execute_for_simple_int
5305 and select._simple_int_clause(fetch_clause)
5306 ):
5307 fetch_clause = fetch_clause.render_literal_execute()
5308 text += "\n FETCH FIRST %s%s ROWS %s" % (
5309 self.process(fetch_clause, **kw),
5310 " PERCENT" if fetch_clause_options["percent"] else "",
5311 "WITH TIES" if fetch_clause_options["with_ties"] else "ONLY",
5312 )
5313 return text
5314
5315 def visit_table(
5316 self,
5317 table,
5318 asfrom=False,
5319 iscrud=False,
5320 ashint=False,
5321 fromhints=None,
5322 use_schema=True,
5323 from_linter=None,
5324 ambiguous_table_name_map=None,
5325 enclosing_alias=None,
5326 **kwargs,
5327 ):
5328 if from_linter:
5329 from_linter.froms[table] = table.fullname
5330
5331 if asfrom or ashint:
5332 effective_schema = self.preparer.schema_for_object(table)
5333
5334 if use_schema and effective_schema:
5335 ret = (
5336 self.preparer.quote_schema(effective_schema)
5337 + "."
5338 + self.preparer.quote(table.name)
5339 )
5340 else:
5341 ret = self.preparer.quote(table.name)
5342
5343 if (
5344 (
5345 enclosing_alias is None
5346 or enclosing_alias.element is not table
5347 )
5348 and not effective_schema
5349 and ambiguous_table_name_map
5350 and table.name in ambiguous_table_name_map
5351 ):
5352 anon_name = self._truncated_identifier(
5353 "alias", ambiguous_table_name_map[table.name]
5354 )
5355
5356 ret = ret + self.get_render_as_alias_suffix(
5357 self.preparer.format_alias(None, anon_name)
5358 )
5359
5360 if fromhints and table in fromhints:
5361 ret = self.format_from_hint_text(
5362 ret, table, fromhints[table], iscrud
5363 )
5364 return ret
5365 else:
5366 return ""
5367
5368 def visit_join(self, join, asfrom=False, from_linter=None, **kwargs):
5369 if from_linter:
5370 from_linter.edges.update(
5371 itertools.product(
5372 _de_clone(join.left._from_objects),
5373 _de_clone(join.right._from_objects),
5374 )
5375 )
5376
5377 if join.full:
5378 join_type = " FULL OUTER JOIN "
5379 elif join.isouter:
5380 join_type = " LEFT OUTER JOIN "
5381 else:
5382 join_type = " JOIN "
5383 return (
5384 join.left._compiler_dispatch(
5385 self, asfrom=True, from_linter=from_linter, **kwargs
5386 )
5387 + join_type
5388 + join.right._compiler_dispatch(
5389 self, asfrom=True, from_linter=from_linter, **kwargs
5390 )
5391 + " ON "
5392 # TODO: likely need asfrom=True here?
5393 + join.onclause._compiler_dispatch(
5394 self, from_linter=from_linter, **kwargs
5395 )
5396 )
5397
5398 def _setup_crud_hints(self, stmt, table_text):
5399 dialect_hints = {
5400 table: hint_text
5401 for (table, dialect), hint_text in stmt._hints.items()
5402 if dialect in ("*", self.dialect.name)
5403 }
5404 if stmt.table in dialect_hints:
5405 table_text = self.format_from_hint_text(
5406 table_text, stmt.table, dialect_hints[stmt.table], True
5407 )
5408 return dialect_hints, table_text
5409
5410 # within the realm of "insertmanyvalues sentinel columns",
5411 # these lookups match different kinds of Column() configurations
5412 # to specific backend capabilities. they are broken into two
5413 # lookups, one for autoincrement columns and the other for non
5414 # autoincrement columns
5415 _sentinel_col_non_autoinc_lookup = util.immutabledict(
5416 {
5417 _SentinelDefaultCharacterization.CLIENTSIDE: (
5418 InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
5419 ),
5420 _SentinelDefaultCharacterization.SENTINEL_DEFAULT: (
5421 InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
5422 ),
5423 _SentinelDefaultCharacterization.NONE: (
5424 InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
5425 ),
5426 _SentinelDefaultCharacterization.IDENTITY: (
5427 InsertmanyvaluesSentinelOpts.IDENTITY
5428 ),
5429 _SentinelDefaultCharacterization.SEQUENCE: (
5430 InsertmanyvaluesSentinelOpts.SEQUENCE
5431 ),
5432 }
5433 )
5434 _sentinel_col_autoinc_lookup = _sentinel_col_non_autoinc_lookup.union(
5435 {
5436 _SentinelDefaultCharacterization.NONE: (
5437 InsertmanyvaluesSentinelOpts.AUTOINCREMENT
5438 ),
5439 }
5440 )
5441
5442 def _get_sentinel_column_for_table(
5443 self, table: Table
5444 ) -> Optional[Sequence[Column[Any]]]:
5445 """given a :class:`.Table`, return a usable sentinel column or
5446 columns for this dialect if any.
5447
5448 Return None if no sentinel columns could be identified, or raise an
5449 error if a column was marked as a sentinel explicitly but isn't
5450 compatible with this dialect.
5451
5452 """
5453
5454 sentinel_opts = self.dialect.insertmanyvalues_implicit_sentinel
5455 sentinel_characteristics = table._sentinel_column_characteristics
5456
5457 sent_cols = sentinel_characteristics.columns
5458
5459 if sent_cols is None:
5460 return None
5461
5462 if sentinel_characteristics.is_autoinc:
5463 bitmask = self._sentinel_col_autoinc_lookup.get(
5464 sentinel_characteristics.default_characterization, 0
5465 )
5466 else:
5467 bitmask = self._sentinel_col_non_autoinc_lookup.get(
5468 sentinel_characteristics.default_characterization, 0
5469 )
5470
5471 if sentinel_opts & bitmask:
5472 return sent_cols
5473
5474 if sentinel_characteristics.is_explicit:
5475 # a column was explicitly marked as insert_sentinel=True,
5476 # however it is not compatible with this dialect. they should
5477 # not indicate this column as a sentinel if they need to include
5478 # this dialect.
5479
5480 # TODO: do we want non-primary key explicit sentinel cols
5481 # that can gracefully degrade for some backends?
5482 # insert_sentinel="degrade" perhaps. not for the initial release.
5483 # I am hoping people are generally not dealing with this sentinel
5484 # business at all.
5485
5486 # if is_explicit is True, there will be only one sentinel column.
5487
5488 raise exc.InvalidRequestError(
5489 f"Column {sent_cols[0]} can't be explicitly "
5490 "marked as a sentinel column when using the "
5491 f"{self.dialect.name} dialect, as the "
5492 "particular type of default generation on this column is "
5493 "not currently compatible with this dialect's specific "
5494 f"INSERT..RETURNING syntax which can receive the "
5495 "server-generated value in "
5496 "a deterministic way. To remove this error, remove "
5497 "insert_sentinel=True from primary key autoincrement "
5498 "columns; these columns are automatically used as "
5499 "sentinels for supported dialects in any case."
5500 )
5501
5502 return None
5503
5504 def _deliver_insertmanyvalues_batches(
5505 self,
5506 statement: str,
5507 parameters: _DBAPIMultiExecuteParams,
5508 compiled_parameters: List[_MutableCoreSingleExecuteParams],
5509 generic_setinputsizes: Optional[_GenericSetInputSizesType],
5510 batch_size: int,
5511 sort_by_parameter_order: bool,
5512 schema_translate_map: Optional[SchemaTranslateMapType],
5513 ) -> Iterator[_InsertManyValuesBatch]:
5514 imv = self._insertmanyvalues
5515 assert imv is not None
5516
5517 if not imv.sentinel_param_keys:
5518 _sentinel_from_params = None
5519 else:
5520 _sentinel_from_params = operator.itemgetter(
5521 *imv.sentinel_param_keys
5522 )
5523
5524 lenparams = len(parameters)
5525 if imv.is_default_expr and not self.dialect.supports_default_metavalue:
5526 # backend doesn't support
5527 # INSERT INTO table (pk_col) VALUES (DEFAULT), (DEFAULT), ...
5528 # at the moment this is basically SQL Server due to
5529 # not being able to use DEFAULT for identity column
5530 # just yield out that many single statements! still
5531 # faster than a whole connection.execute() call ;)
5532 #
5533 # note we still are taking advantage of the fact that we know
5534 # we are using RETURNING. The generalized approach of fetching
5535 # cursor.lastrowid etc. still goes through the more heavyweight
5536 # "ExecutionContext per statement" system as it isn't usable
5537 # as a generic "RETURNING" approach
5538 use_row_at_a_time = True
5539 downgraded = False
5540 elif not self.dialect.supports_multivalues_insert or (
5541 sort_by_parameter_order
5542 and self._result_columns
5543 and (imv.sentinel_columns is None or imv.includes_upsert_behaviors)
5544 ):
5545 # deterministic order was requested and the compiler could
5546 # not organize sentinel columns for this dialect/statement.
5547 # use row at a time
5548 use_row_at_a_time = True
5549 downgraded = True
5550 else:
5551 use_row_at_a_time = False
5552 downgraded = False
5553
5554 if use_row_at_a_time:
5555 for batchnum, (param, compiled_param) in enumerate(
5556 cast(
5557 "Sequence[Tuple[_DBAPISingleExecuteParams, _MutableCoreSingleExecuteParams]]", # noqa: E501
5558 zip(parameters, compiled_parameters),
5559 ),
5560 1,
5561 ):
5562 yield _InsertManyValuesBatch(
5563 statement,
5564 param,
5565 generic_setinputsizes,
5566 [param],
5567 (
5568 [_sentinel_from_params(compiled_param)]
5569 if _sentinel_from_params
5570 else []
5571 ),
5572 1,
5573 batchnum,
5574 lenparams,
5575 sort_by_parameter_order,
5576 downgraded,
5577 )
5578 return
5579
5580 if schema_translate_map:
5581 rst = functools.partial(
5582 self.preparer._render_schema_translates,
5583 schema_translate_map=schema_translate_map,
5584 )
5585 else:
5586 rst = None
5587
5588 imv_single_values_expr = imv.single_values_expr
5589 if rst:
5590 imv_single_values_expr = rst(imv_single_values_expr)
5591
5592 executemany_values = f"({imv_single_values_expr})"
5593 statement = statement.replace(executemany_values, "__EXECMANY_TOKEN__")
5594
5595 # Use optional insertmanyvalues_max_parameters
5596 # to further shrink the batch size so that there are no more than
5597 # insertmanyvalues_max_parameters params.
5598 # Currently used by SQL Server, which limits statements to 2100 bound
5599 # parameters (actually 2099).
5600 max_params = self.dialect.insertmanyvalues_max_parameters
5601 if max_params:
5602 total_num_of_params = len(self.bind_names)
5603 num_params_per_batch = len(imv.insert_crud_params)
5604 num_params_outside_of_batch = (
5605 total_num_of_params - num_params_per_batch
5606 )
5607 batch_size = min(
5608 batch_size,
5609 (
5610 (max_params - num_params_outside_of_batch)
5611 // num_params_per_batch
5612 ),
5613 )
5614
5615 batches = cast("List[Sequence[Any]]", list(parameters))
5616 compiled_batches = cast(
5617 "List[Sequence[Any]]", list(compiled_parameters)
5618 )
5619
5620 processed_setinputsizes: Optional[_GenericSetInputSizesType] = None
5621 batchnum = 1
5622 total_batches = lenparams // batch_size + (
5623 1 if lenparams % batch_size else 0
5624 )
5625
5626 insert_crud_params = imv.insert_crud_params
5627 assert insert_crud_params is not None
5628
5629 if rst:
5630 insert_crud_params = [
5631 (col, key, rst(expr), st)
5632 for col, key, expr, st in insert_crud_params
5633 ]
5634
5635 escaped_bind_names: Mapping[str, str]
5636 expand_pos_lower_index = expand_pos_upper_index = 0
5637
5638 if not self.positional:
5639 if self.escaped_bind_names:
5640 escaped_bind_names = self.escaped_bind_names
5641 else:
5642 escaped_bind_names = {}
5643
5644 all_keys = set(parameters[0])
5645
5646 def apply_placeholders(keys, formatted):
5647 for key in keys:
5648 key = escaped_bind_names.get(key, key)
5649 formatted = formatted.replace(
5650 self.bindtemplate % {"name": key},
5651 self.bindtemplate
5652 % {"name": f"{key}__EXECMANY_INDEX__"},
5653 )
5654 return formatted
5655
5656 if imv.embed_values_counter:
5657 imv_values_counter = ", _IMV_VALUES_COUNTER"
5658 else:
5659 imv_values_counter = ""
5660 formatted_values_clause = f"""({', '.join(
5661 apply_placeholders(bind_keys, formatted)
5662 for _, _, formatted, bind_keys in insert_crud_params
5663 )}{imv_values_counter})"""
5664
5665 keys_to_replace = all_keys.intersection(
5666 escaped_bind_names.get(key, key)
5667 for _, _, _, bind_keys in insert_crud_params
5668 for key in bind_keys
5669 )
5670 base_parameters = {
5671 key: parameters[0][key]
5672 for key in all_keys.difference(keys_to_replace)
5673 }
5674 executemany_values_w_comma = ""
5675 else:
5676 formatted_values_clause = ""
5677 keys_to_replace = set()
5678 base_parameters = {}
5679
5680 if imv.embed_values_counter:
5681 executemany_values_w_comma = (
5682 f"({imv_single_values_expr}, _IMV_VALUES_COUNTER), "
5683 )
5684 else:
5685 executemany_values_w_comma = f"({imv_single_values_expr}), "
5686
5687 all_names_we_will_expand: Set[str] = set()
5688 for elem in imv.insert_crud_params:
5689 all_names_we_will_expand.update(elem[3])
5690
5691 # get the start and end position in a particular list
5692 # of parameters where we will be doing the "expanding".
5693 # statements can have params on either side or both sides,
5694 # given RETURNING and CTEs
5695 if all_names_we_will_expand:
5696 positiontup = self.positiontup
5697 assert positiontup is not None
5698
5699 all_expand_positions = {
5700 idx
5701 for idx, name in enumerate(positiontup)
5702 if name in all_names_we_will_expand
5703 }
5704 expand_pos_lower_index = min(all_expand_positions)
5705 expand_pos_upper_index = max(all_expand_positions) + 1
5706 assert (
5707 len(all_expand_positions)
5708 == expand_pos_upper_index - expand_pos_lower_index
5709 )
5710
5711 if self._numeric_binds:
5712 escaped = re.escape(self._numeric_binds_identifier_char)
5713 executemany_values_w_comma = re.sub(
5714 rf"{escaped}\d+", "%s", executemany_values_w_comma
5715 )
5716
5717 while batches:
5718 batch = batches[0:batch_size]
5719 compiled_batch = compiled_batches[0:batch_size]
5720
5721 batches[0:batch_size] = []
5722 compiled_batches[0:batch_size] = []
5723
5724 if batches:
5725 current_batch_size = batch_size
5726 else:
5727 current_batch_size = len(batch)
5728
5729 if generic_setinputsizes:
5730 # if setinputsizes is present, expand this collection to
5731 # suit the batch length as well
5732 # currently this will be mssql+pyodbc for internal dialects
5733 processed_setinputsizes = [
5734 (new_key, len_, typ)
5735 for new_key, len_, typ in (
5736 (f"{key}_{index}", len_, typ)
5737 for index in range(current_batch_size)
5738 for key, len_, typ in generic_setinputsizes
5739 )
5740 ]
5741
5742 replaced_parameters: Any
5743 if self.positional:
5744 num_ins_params = imv.num_positional_params_counted
5745
5746 batch_iterator: Iterable[Sequence[Any]]
5747 extra_params_left: Sequence[Any]
5748 extra_params_right: Sequence[Any]
5749
5750 if num_ins_params == len(batch[0]):
5751 extra_params_left = extra_params_right = ()
5752 batch_iterator = batch
5753 else:
5754 extra_params_left = batch[0][:expand_pos_lower_index]
5755 extra_params_right = batch[0][expand_pos_upper_index:]
5756 batch_iterator = (
5757 b[expand_pos_lower_index:expand_pos_upper_index]
5758 for b in batch
5759 )
5760
5761 if imv.embed_values_counter:
5762 expanded_values_string = (
5763 "".join(
5764 executemany_values_w_comma.replace(
5765 "_IMV_VALUES_COUNTER", str(i)
5766 )
5767 for i, _ in enumerate(batch)
5768 )
5769 )[:-2]
5770 else:
5771 expanded_values_string = (
5772 (executemany_values_w_comma * current_batch_size)
5773 )[:-2]
5774
5775 if self._numeric_binds and num_ins_params > 0:
5776 # numeric will always number the parameters inside of
5777 # VALUES (and thus order self.positiontup) to be higher
5778 # than non-VALUES parameters, no matter where in the
5779 # statement those non-VALUES parameters appear (this is
5780 # ensured in _process_numeric by numbering first all
5781 # params that are not in _values_bindparam)
5782 # therefore all extra params are always
5783 # on the left side and numbered lower than the VALUES
5784 # parameters
5785 assert not extra_params_right
5786
5787 start = expand_pos_lower_index + 1
5788 end = num_ins_params * (current_batch_size) + start
5789
5790 # need to format here, since statement may contain
5791 # unescaped %, while values_string contains just (%s, %s)
5792 positions = tuple(
5793 f"{self._numeric_binds_identifier_char}{i}"
5794 for i in range(start, end)
5795 )
5796 expanded_values_string = expanded_values_string % positions
5797
5798 replaced_statement = statement.replace(
5799 "__EXECMANY_TOKEN__", expanded_values_string
5800 )
5801
5802 replaced_parameters = tuple(
5803 itertools.chain.from_iterable(batch_iterator)
5804 )
5805
5806 replaced_parameters = (
5807 extra_params_left
5808 + replaced_parameters
5809 + extra_params_right
5810 )
5811
5812 else:
5813 replaced_values_clauses = []
5814 replaced_parameters = base_parameters.copy()
5815
5816 for i, param in enumerate(batch):
5817 fmv = formatted_values_clause.replace(
5818 "EXECMANY_INDEX__", str(i)
5819 )
5820 if imv.embed_values_counter:
5821 fmv = fmv.replace("_IMV_VALUES_COUNTER", str(i))
5822
5823 replaced_values_clauses.append(fmv)
5824 replaced_parameters.update(
5825 {f"{key}__{i}": param[key] for key in keys_to_replace}
5826 )
5827
5828 replaced_statement = statement.replace(
5829 "__EXECMANY_TOKEN__",
5830 ", ".join(replaced_values_clauses),
5831 )
5832
5833 yield _InsertManyValuesBatch(
5834 replaced_statement,
5835 replaced_parameters,
5836 processed_setinputsizes,
5837 batch,
5838 (
5839 [_sentinel_from_params(cb) for cb in compiled_batch]
5840 if _sentinel_from_params
5841 else []
5842 ),
5843 current_batch_size,
5844 batchnum,
5845 total_batches,
5846 sort_by_parameter_order,
5847 False,
5848 )
5849 batchnum += 1
5850
5851 def visit_insert(
5852 self, insert_stmt, visited_bindparam=None, visiting_cte=None, **kw
5853 ):
5854 compile_state = insert_stmt._compile_state_factory(
5855 insert_stmt, self, **kw
5856 )
5857 insert_stmt = compile_state.statement
5858
5859 if visiting_cte is not None:
5860 kw["visiting_cte"] = visiting_cte
5861 toplevel = False
5862 else:
5863 toplevel = not self.stack
5864
5865 if toplevel:
5866 self.isinsert = True
5867 if not self.dml_compile_state:
5868 self.dml_compile_state = compile_state
5869 if not self.compile_state:
5870 self.compile_state = compile_state
5871
5872 self.stack.append(
5873 {
5874 "correlate_froms": set(),
5875 "asfrom_froms": set(),
5876 "selectable": insert_stmt,
5877 }
5878 )
5879
5880 counted_bindparam = 0
5881
5882 # reset any incoming "visited_bindparam" collection
5883 visited_bindparam = None
5884
5885 # for positional, insertmanyvalues needs to know how many
5886 # bound parameters are in the VALUES sequence; there's no simple
5887 # rule because default expressions etc. can have zero or more
5888 # params inside them. After multiple attempts to figure this out,
5889 # this very simplistic "count after" works and is
5890 # likely the least amount of callcounts, though looks clumsy
5891 if self.positional and visiting_cte is None:
5892 # if we are inside a CTE, don't count parameters
5893 # here since they won't be for insertmanyvalues. keep
5894 # visited_bindparam at None so no counting happens.
5895 # see #9173
5896 visited_bindparam = []
5897
5898 crud_params_struct = crud._get_crud_params(
5899 self,
5900 insert_stmt,
5901 compile_state,
5902 toplevel,
5903 visited_bindparam=visited_bindparam,
5904 **kw,
5905 )
5906
5907 if self.positional and visited_bindparam is not None:
5908 counted_bindparam = len(visited_bindparam)
5909 if self._numeric_binds:
5910 if self._values_bindparam is not None:
5911 self._values_bindparam += visited_bindparam
5912 else:
5913 self._values_bindparam = visited_bindparam
5914
5915 crud_params_single = crud_params_struct.single_params
5916
5917 if (
5918 not crud_params_single
5919 and not self.dialect.supports_default_values
5920 and not self.dialect.supports_default_metavalue
5921 and not self.dialect.supports_empty_insert
5922 ):
5923 raise exc.CompileError(
5924 "The '%s' dialect with current database "
5925 "version settings does not support empty "
5926 "inserts." % self.dialect.name
5927 )
5928
5929 if compile_state._has_multi_parameters:
5930 if not self.dialect.supports_multivalues_insert:
5931 raise exc.CompileError(
5932 "The '%s' dialect with current database "
5933 "version settings does not support "
5934 "in-place multirow inserts." % self.dialect.name
5935 )
5936 elif (
5937 self.implicit_returning or insert_stmt._returning
5938 ) and insert_stmt._sort_by_parameter_order:
5939 raise exc.CompileError(
5940 "RETURNING cannot be deterministically sorted when "
5941 "using an INSERT which includes multi-row values()."
5942 )
5943 crud_params_single = crud_params_struct.single_params
5944 else:
5945 crud_params_single = crud_params_struct.single_params
5946
5947 preparer = self.preparer
5948 supports_default_values = self.dialect.supports_default_values
5949
5950 text = "INSERT "
5951
5952 if insert_stmt._prefixes:
5953 text += self._generate_prefixes(
5954 insert_stmt, insert_stmt._prefixes, **kw
5955 )
5956
5957 text += "INTO "
5958 table_text = preparer.format_table(insert_stmt.table)
5959
5960 if insert_stmt._hints:
5961 _, table_text = self._setup_crud_hints(insert_stmt, table_text)
5962
5963 if insert_stmt._independent_ctes:
5964 self._dispatch_independent_ctes(insert_stmt, kw)
5965
5966 text += table_text
5967
5968 if crud_params_single or not supports_default_values:
5969 text += " (%s)" % ", ".join(
5970 [expr for _, expr, _, _ in crud_params_single]
5971 )
5972
5973 # look for insertmanyvalues attributes that would have been configured
5974 # by crud.py as it scanned through the columns to be part of the
5975 # INSERT
5976 use_insertmanyvalues = crud_params_struct.use_insertmanyvalues
5977 named_sentinel_params: Optional[Sequence[str]] = None
5978 add_sentinel_cols = None
5979 implicit_sentinel = False
5980
5981 returning_cols = self.implicit_returning or insert_stmt._returning
5982 if returning_cols:
5983 add_sentinel_cols = crud_params_struct.use_sentinel_columns
5984 if add_sentinel_cols is not None:
5985 assert use_insertmanyvalues
5986
5987 # search for the sentinel column explicitly present
5988 # in the INSERT columns list, and additionally check that
5989 # this column has a bound parameter name set up that's in the
5990 # parameter list. If both of these cases are present, it means
5991 # we will have a client side value for the sentinel in each
5992 # parameter set.
5993
5994 _params_by_col = {
5995 col: param_names
5996 for col, _, _, param_names in crud_params_single
5997 }
5998 named_sentinel_params = []
5999 for _add_sentinel_col in add_sentinel_cols:
6000 if _add_sentinel_col not in _params_by_col:
6001 named_sentinel_params = None
6002 break
6003 param_name = self._within_exec_param_key_getter(
6004 _add_sentinel_col
6005 )
6006 if param_name not in _params_by_col[_add_sentinel_col]:
6007 named_sentinel_params = None
6008 break
6009 named_sentinel_params.append(param_name)
6010
6011 if named_sentinel_params is None:
6012 # if we are not going to have a client side value for
6013 # the sentinel in the parameter set, that means it's
6014 # an autoincrement, an IDENTITY, or a server-side SQL
6015 # expression like nextval('seqname'). So this is
6016 # an "implicit" sentinel; we will look for it in
6017 # RETURNING
6018 # only, and then sort on it. For this case on PG,
6019 # SQL Server we have to use a special INSERT form
6020 # that guarantees the server side function lines up with
6021 # the entries in the VALUES.
6022 if (
6023 self.dialect.insertmanyvalues_implicit_sentinel
6024 & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
6025 ):
6026 implicit_sentinel = True
6027 else:
6028 # here, we are not using a sentinel at all
6029 # and we are likely the SQLite dialect.
6030 # The first add_sentinel_col that we have should not
6031 # be marked as "insert_sentinel=True". if it was,
6032 # an error should have been raised in
6033 # _get_sentinel_column_for_table.
6034 assert not add_sentinel_cols[0]._insert_sentinel, (
6035 "sentinel selection rules should have prevented "
6036 "us from getting here for this dialect"
6037 )
6038
6039 # always put the sentinel columns last. even if they are
6040 # in the returning list already, they will be there twice
6041 # then.
6042 returning_cols = list(returning_cols) + list(add_sentinel_cols)
6043
6044 returning_clause = self.returning_clause(
6045 insert_stmt,
6046 returning_cols,
6047 populate_result_map=toplevel,
6048 )
6049
6050 if self.returning_precedes_values:
6051 text += " " + returning_clause
6052
6053 else:
6054 returning_clause = None
6055
6056 if insert_stmt.select is not None:
6057 # placed here by crud.py
6058 select_text = self.process(
6059 self.stack[-1]["insert_from_select"], insert_into=True, **kw
6060 )
6061
6062 if self.ctes and self.dialect.cte_follows_insert:
6063 nesting_level = len(self.stack) if not toplevel else None
6064 text += " %s%s" % (
6065 self._render_cte_clause(
6066 nesting_level=nesting_level,
6067 include_following_stack=True,
6068 ),
6069 select_text,
6070 )
6071 else:
6072 text += " %s" % select_text
6073 elif not crud_params_single and supports_default_values:
6074 text += " DEFAULT VALUES"
6075 if use_insertmanyvalues:
6076 self._insertmanyvalues = _InsertManyValues(
6077 True,
6078 self.dialect.default_metavalue_token,
6079 crud_params_single,
6080 counted_bindparam,
6081 sort_by_parameter_order=(
6082 insert_stmt._sort_by_parameter_order
6083 ),
6084 includes_upsert_behaviors=(
6085 insert_stmt._post_values_clause is not None
6086 ),
6087 sentinel_columns=add_sentinel_cols,
6088 num_sentinel_columns=(
6089 len(add_sentinel_cols) if add_sentinel_cols else 0
6090 ),
6091 implicit_sentinel=implicit_sentinel,
6092 )
6093 elif compile_state._has_multi_parameters:
6094 text += " VALUES %s" % (
6095 ", ".join(
6096 "(%s)"
6097 % (", ".join(value for _, _, value, _ in crud_param_set))
6098 for crud_param_set in crud_params_struct.all_multi_params
6099 ),
6100 )
6101 elif use_insertmanyvalues:
6102 if (
6103 implicit_sentinel
6104 and (
6105 self.dialect.insertmanyvalues_implicit_sentinel
6106 & InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
6107 )
6108 # this is checking if we have
6109 # INSERT INTO table (id) VALUES (DEFAULT).
6110 and not (crud_params_struct.is_default_metavalue_only)
6111 ):
6112 # if we have a sentinel column that is server generated,
6113 # then for selected backends render the VALUES list as a
6114 # subquery. This is the orderable form supported by
6115 # PostgreSQL and in fewer cases SQL Server
6116 embed_sentinel_value = True
6117
6118 render_bind_casts = (
6119 self.dialect.insertmanyvalues_implicit_sentinel
6120 & InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
6121 )
6122
6123 add_sentinel_set = add_sentinel_cols or ()
6124
6125 insert_single_values_expr = ", ".join(
6126 [
6127 value
6128 for col, _, value, _ in crud_params_single
6129 if col not in add_sentinel_set
6130 ]
6131 )
6132
6133 colnames = ", ".join(
6134 f"p{i}"
6135 for i, cp in enumerate(crud_params_single)
6136 if cp[0] not in add_sentinel_set
6137 )
6138
6139 if render_bind_casts:
6140 # render casts for the SELECT list. For PG, we are
6141 # already rendering bind casts in the parameter list,
6142 # selectively for the more "tricky" types like ARRAY.
6143 # however, even for the "easy" types, if the parameter
6144 # is NULL for every entry, PG gives up and says
6145 # "it must be TEXT", which fails for other easy types
6146 # like ints. So we cast on this side too.
6147 colnames_w_cast = ", ".join(
6148 (
6149 self.render_bind_cast(
6150 col.type,
6151 col.type._unwrapped_dialect_impl(self.dialect),
6152 f"p{i}",
6153 )
6154 if col not in add_sentinel_set
6155 else expr
6156 )
6157 for i, (col, _, expr, _) in enumerate(
6158 crud_params_single
6159 )
6160 )
6161 else:
6162 colnames_w_cast = ", ".join(
6163 (f"p{i}" if col not in add_sentinel_set else expr)
6164 for i, (col, _, expr, _) in enumerate(
6165 crud_params_single
6166 )
6167 )
6168
6169 insert_crud_params = [
6170 elem
6171 for elem in crud_params_single
6172 if elem[0] not in add_sentinel_set
6173 ]
6174
6175 text += (
6176 f" SELECT {colnames_w_cast} FROM "
6177 f"(VALUES ({insert_single_values_expr})) "
6178 f"AS imp_sen({colnames}, sen_counter) "
6179 "ORDER BY sen_counter"
6180 )
6181
6182 else:
6183 # otherwise, if no sentinel or backend doesn't support
6184 # orderable subquery form, use a plain VALUES list
6185 embed_sentinel_value = False
6186 insert_crud_params = crud_params_single
6187 insert_single_values_expr = ", ".join(
6188 [value for _, _, value, _ in crud_params_single]
6189 )
6190
6191 text += f" VALUES ({insert_single_values_expr})"
6192
6193 self._insertmanyvalues = _InsertManyValues(
6194 is_default_expr=False,
6195 single_values_expr=insert_single_values_expr,
6196 insert_crud_params=insert_crud_params,
6197 num_positional_params_counted=counted_bindparam,
6198 sort_by_parameter_order=(insert_stmt._sort_by_parameter_order),
6199 includes_upsert_behaviors=(
6200 insert_stmt._post_values_clause is not None
6201 ),
6202 sentinel_columns=add_sentinel_cols,
6203 num_sentinel_columns=(
6204 len(add_sentinel_cols) if add_sentinel_cols else 0
6205 ),
6206 sentinel_param_keys=named_sentinel_params,
6207 implicit_sentinel=implicit_sentinel,
6208 embed_values_counter=embed_sentinel_value,
6209 )
6210
6211 else:
6212 insert_single_values_expr = ", ".join(
6213 [value for _, _, value, _ in crud_params_single]
6214 )
6215
6216 text += f" VALUES ({insert_single_values_expr})"
6217
6218 if insert_stmt._post_values_clause is not None:
6219 post_values_clause = self.process(
6220 insert_stmt._post_values_clause, **kw
6221 )
6222 if post_values_clause:
6223 text += " " + post_values_clause
6224
6225 if returning_clause and not self.returning_precedes_values:
6226 text += " " + returning_clause
6227
6228 if self.ctes and not self.dialect.cte_follows_insert:
6229 nesting_level = len(self.stack) if not toplevel else None
6230 text = (
6231 self._render_cte_clause(
6232 nesting_level=nesting_level,
6233 include_following_stack=True,
6234 )
6235 + text
6236 )
6237
6238 self.stack.pop(-1)
6239
6240 return text
6241
6242 def update_limit_clause(self, update_stmt):
6243 """Provide a hook for MySQL to add LIMIT to the UPDATE"""
6244 return None
6245
6246 def delete_limit_clause(self, delete_stmt):
6247 """Provide a hook for MySQL to add LIMIT to the DELETE"""
6248 return None
6249
6250 def update_tables_clause(self, update_stmt, from_table, extra_froms, **kw):
6251 """Provide a hook to override the initial table clause
6252 in an UPDATE statement.
6253
6254 MySQL overrides this.
6255
6256 """
6257 kw["asfrom"] = True
6258 return from_table._compiler_dispatch(self, iscrud=True, **kw)
6259
6260 def update_from_clause(
6261 self, update_stmt, from_table, extra_froms, from_hints, **kw
6262 ):
6263 """Provide a hook to override the generation of an
6264 UPDATE..FROM clause.
6265
6266 MySQL and MSSQL override this.
6267
6268 """
6269 raise NotImplementedError(
6270 "This backend does not support multiple-table "
6271 "criteria within UPDATE"
6272 )
6273
6274 def visit_update(
6275 self,
6276 update_stmt: Update,
6277 visiting_cte: Optional[CTE] = None,
6278 **kw: Any,
6279 ) -> str:
6280 compile_state = update_stmt._compile_state_factory(
6281 update_stmt, self, **kw
6282 )
6283 if TYPE_CHECKING:
6284 assert isinstance(compile_state, UpdateDMLState)
6285 update_stmt = compile_state.statement # type: ignore[assignment]
6286
6287 if visiting_cte is not None:
6288 kw["visiting_cte"] = visiting_cte
6289 toplevel = False
6290 else:
6291 toplevel = not self.stack
6292
6293 if toplevel:
6294 self.isupdate = True
6295 if not self.dml_compile_state:
6296 self.dml_compile_state = compile_state
6297 if not self.compile_state:
6298 self.compile_state = compile_state
6299
6300 if self.linting & COLLECT_CARTESIAN_PRODUCTS:
6301 from_linter = FromLinter({}, set())
6302 warn_linting = self.linting & WARN_LINTING
6303 if toplevel:
6304 self.from_linter = from_linter
6305 else:
6306 from_linter = None
6307 warn_linting = False
6308
6309 extra_froms = compile_state._extra_froms
6310 is_multitable = bool(extra_froms)
6311
6312 if is_multitable:
6313 # main table might be a JOIN
6314 main_froms = set(_from_objects(update_stmt.table))
6315 render_extra_froms = [
6316 f for f in extra_froms if f not in main_froms
6317 ]
6318 correlate_froms = main_froms.union(extra_froms)
6319 else:
6320 render_extra_froms = []
6321 correlate_froms = {update_stmt.table}
6322
6323 self.stack.append(
6324 {
6325 "correlate_froms": correlate_froms,
6326 "asfrom_froms": correlate_froms,
6327 "selectable": update_stmt,
6328 }
6329 )
6330
6331 text = "UPDATE "
6332
6333 if update_stmt._prefixes:
6334 text += self._generate_prefixes(
6335 update_stmt, update_stmt._prefixes, **kw
6336 )
6337
6338 table_text = self.update_tables_clause(
6339 update_stmt,
6340 update_stmt.table,
6341 render_extra_froms,
6342 from_linter=from_linter,
6343 **kw,
6344 )
6345 crud_params_struct = crud._get_crud_params(
6346 self, update_stmt, compile_state, toplevel, **kw
6347 )
6348 crud_params = crud_params_struct.single_params
6349
6350 if update_stmt._hints:
6351 dialect_hints, table_text = self._setup_crud_hints(
6352 update_stmt, table_text
6353 )
6354 else:
6355 dialect_hints = None
6356
6357 if update_stmt._independent_ctes:
6358 self._dispatch_independent_ctes(update_stmt, kw)
6359
6360 text += table_text
6361
6362 text += " SET "
6363 text += ", ".join(
6364 expr + "=" + value
6365 for _, expr, value, _ in cast(
6366 "List[Tuple[Any, str, str, Any]]", crud_params
6367 )
6368 )
6369
6370 if self.implicit_returning or update_stmt._returning:
6371 if self.returning_precedes_values:
6372 text += " " + self.returning_clause(
6373 update_stmt,
6374 self.implicit_returning or update_stmt._returning,
6375 populate_result_map=toplevel,
6376 )
6377
6378 if extra_froms:
6379 extra_from_text = self.update_from_clause(
6380 update_stmt,
6381 update_stmt.table,
6382 render_extra_froms,
6383 dialect_hints,
6384 from_linter=from_linter,
6385 **kw,
6386 )
6387 if extra_from_text:
6388 text += " " + extra_from_text
6389
6390 if update_stmt._where_criteria:
6391 t = self._generate_delimited_and_list(
6392 update_stmt._where_criteria, from_linter=from_linter, **kw
6393 )
6394 if t:
6395 text += " WHERE " + t
6396
6397 limit_clause = self.update_limit_clause(update_stmt)
6398 if limit_clause:
6399 text += " " + limit_clause
6400
6401 if (
6402 self.implicit_returning or update_stmt._returning
6403 ) and not self.returning_precedes_values:
6404 text += " " + self.returning_clause(
6405 update_stmt,
6406 self.implicit_returning or update_stmt._returning,
6407 populate_result_map=toplevel,
6408 )
6409
6410 if self.ctes:
6411 nesting_level = len(self.stack) if not toplevel else None
6412 text = self._render_cte_clause(nesting_level=nesting_level) + text
6413
6414 if warn_linting:
6415 assert from_linter is not None
6416 from_linter.warn(stmt_type="UPDATE")
6417
6418 self.stack.pop(-1)
6419
6420 return text # type: ignore[no-any-return]
6421
6422 def delete_extra_from_clause(
6423 self, delete_stmt, from_table, extra_froms, from_hints, **kw
6424 ):
6425 """Provide a hook to override the generation of an
6426 DELETE..FROM clause.
6427
6428 This can be used to implement DELETE..USING for example.
6429
6430 MySQL and MSSQL override this.
6431
6432 """
6433 raise NotImplementedError(
6434 "This backend does not support multiple-table "
6435 "criteria within DELETE"
6436 )
6437
6438 def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw):
6439 return from_table._compiler_dispatch(
6440 self, asfrom=True, iscrud=True, **kw
6441 )
6442
6443 def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
6444 compile_state = delete_stmt._compile_state_factory(
6445 delete_stmt, self, **kw
6446 )
6447 delete_stmt = compile_state.statement
6448
6449 if visiting_cte is not None:
6450 kw["visiting_cte"] = visiting_cte
6451 toplevel = False
6452 else:
6453 toplevel = not self.stack
6454
6455 if toplevel:
6456 self.isdelete = True
6457 if not self.dml_compile_state:
6458 self.dml_compile_state = compile_state
6459 if not self.compile_state:
6460 self.compile_state = compile_state
6461
6462 if self.linting & COLLECT_CARTESIAN_PRODUCTS:
6463 from_linter = FromLinter({}, set())
6464 warn_linting = self.linting & WARN_LINTING
6465 if toplevel:
6466 self.from_linter = from_linter
6467 else:
6468 from_linter = None
6469 warn_linting = False
6470
6471 extra_froms = compile_state._extra_froms
6472
6473 correlate_froms = {delete_stmt.table}.union(extra_froms)
6474 self.stack.append(
6475 {
6476 "correlate_froms": correlate_froms,
6477 "asfrom_froms": correlate_froms,
6478 "selectable": delete_stmt,
6479 }
6480 )
6481
6482 text = "DELETE "
6483
6484 if delete_stmt._prefixes:
6485 text += self._generate_prefixes(
6486 delete_stmt, delete_stmt._prefixes, **kw
6487 )
6488
6489 text += "FROM "
6490
6491 try:
6492 table_text = self.delete_table_clause(
6493 delete_stmt,
6494 delete_stmt.table,
6495 extra_froms,
6496 from_linter=from_linter,
6497 )
6498 except TypeError:
6499 # anticipate 3rd party dialects that don't include **kw
6500 # TODO: remove in 2.1
6501 table_text = self.delete_table_clause(
6502 delete_stmt, delete_stmt.table, extra_froms
6503 )
6504 if from_linter:
6505 _ = self.process(delete_stmt.table, from_linter=from_linter)
6506
6507 crud._get_crud_params(self, delete_stmt, compile_state, toplevel, **kw)
6508
6509 if delete_stmt._hints:
6510 dialect_hints, table_text = self._setup_crud_hints(
6511 delete_stmt, table_text
6512 )
6513 else:
6514 dialect_hints = None
6515
6516 if delete_stmt._independent_ctes:
6517 self._dispatch_independent_ctes(delete_stmt, kw)
6518
6519 text += table_text
6520
6521 if (
6522 self.implicit_returning or delete_stmt._returning
6523 ) and self.returning_precedes_values:
6524 text += " " + self.returning_clause(
6525 delete_stmt,
6526 self.implicit_returning or delete_stmt._returning,
6527 populate_result_map=toplevel,
6528 )
6529
6530 if extra_froms:
6531 extra_from_text = self.delete_extra_from_clause(
6532 delete_stmt,
6533 delete_stmt.table,
6534 extra_froms,
6535 dialect_hints,
6536 from_linter=from_linter,
6537 **kw,
6538 )
6539 if extra_from_text:
6540 text += " " + extra_from_text
6541
6542 if delete_stmt._where_criteria:
6543 t = self._generate_delimited_and_list(
6544 delete_stmt._where_criteria, from_linter=from_linter, **kw
6545 )
6546 if t:
6547 text += " WHERE " + t
6548
6549 limit_clause = self.delete_limit_clause(delete_stmt)
6550 if limit_clause:
6551 text += " " + limit_clause
6552
6553 if (
6554 self.implicit_returning or delete_stmt._returning
6555 ) and not self.returning_precedes_values:
6556 text += " " + self.returning_clause(
6557 delete_stmt,
6558 self.implicit_returning or delete_stmt._returning,
6559 populate_result_map=toplevel,
6560 )
6561
6562 if self.ctes:
6563 nesting_level = len(self.stack) if not toplevel else None
6564 text = self._render_cte_clause(nesting_level=nesting_level) + text
6565
6566 if warn_linting:
6567 assert from_linter is not None
6568 from_linter.warn(stmt_type="DELETE")
6569
6570 self.stack.pop(-1)
6571
6572 return text
6573
6574 def visit_savepoint(self, savepoint_stmt, **kw):
6575 return "SAVEPOINT %s" % self.preparer.format_savepoint(savepoint_stmt)
6576
6577 def visit_rollback_to_savepoint(self, savepoint_stmt, **kw):
6578 return "ROLLBACK TO SAVEPOINT %s" % self.preparer.format_savepoint(
6579 savepoint_stmt
6580 )
6581
6582 def visit_release_savepoint(self, savepoint_stmt, **kw):
6583 return "RELEASE SAVEPOINT %s" % self.preparer.format_savepoint(
6584 savepoint_stmt
6585 )
6586
6587
6588class StrSQLCompiler(SQLCompiler):
6589 """A :class:`.SQLCompiler` subclass which allows a small selection
6590 of non-standard SQL features to render into a string value.
6591
6592 The :class:`.StrSQLCompiler` is invoked whenever a Core expression
6593 element is directly stringified without calling upon the
6594 :meth:`_expression.ClauseElement.compile` method.
6595 It can render a limited set
6596 of non-standard SQL constructs to assist in basic stringification,
6597 however for more substantial custom or dialect-specific SQL constructs,
6598 it will be necessary to make use of
6599 :meth:`_expression.ClauseElement.compile`
6600 directly.
6601
6602 .. seealso::
6603
6604 :ref:`faq_sql_expression_string`
6605
6606 """
6607
6608 def _fallback_column_name(self, column):
6609 return "<name unknown>"
6610
6611 @util.preload_module("sqlalchemy.engine.url")
6612 def visit_unsupported_compilation(self, element, err, **kw):
6613 if element.stringify_dialect != "default":
6614 url = util.preloaded.engine_url
6615 dialect = url.URL.create(element.stringify_dialect).get_dialect()()
6616
6617 compiler = dialect.statement_compiler(
6618 dialect, None, _supporting_against=self
6619 )
6620 if not isinstance(compiler, StrSQLCompiler):
6621 return compiler.process(element, **kw)
6622
6623 return super().visit_unsupported_compilation(element, err)
6624
6625 def visit_getitem_binary(self, binary, operator, **kw):
6626 return "%s[%s]" % (
6627 self.process(binary.left, **kw),
6628 self.process(binary.right, **kw),
6629 )
6630
6631 def visit_json_getitem_op_binary(self, binary, operator, **kw):
6632 return self.visit_getitem_binary(binary, operator, **kw)
6633
6634 def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
6635 return self.visit_getitem_binary(binary, operator, **kw)
6636
6637 def visit_sequence(self, sequence, **kw):
6638 return (
6639 f"<next sequence value: {self.preparer.format_sequence(sequence)}>"
6640 )
6641
6642 def returning_clause(
6643 self,
6644 stmt: UpdateBase,
6645 returning_cols: Sequence[_ColumnsClauseElement],
6646 *,
6647 populate_result_map: bool,
6648 **kw: Any,
6649 ) -> str:
6650 columns = [
6651 self._label_select_column(None, c, True, False, {})
6652 for c in base._select_iterables(returning_cols)
6653 ]
6654 return "RETURNING " + ", ".join(columns)
6655
6656 def update_from_clause(
6657 self, update_stmt, from_table, extra_froms, from_hints, **kw
6658 ):
6659 kw["asfrom"] = True
6660 return "FROM " + ", ".join(
6661 t._compiler_dispatch(self, fromhints=from_hints, **kw)
6662 for t in extra_froms
6663 )
6664
6665 def delete_extra_from_clause(
6666 self, delete_stmt, from_table, extra_froms, from_hints, **kw
6667 ):
6668 kw["asfrom"] = True
6669 return ", " + ", ".join(
6670 t._compiler_dispatch(self, fromhints=from_hints, **kw)
6671 for t in extra_froms
6672 )
6673
6674 def visit_empty_set_expr(self, element_types, **kw):
6675 return "SELECT 1 WHERE 1!=1"
6676
6677 def get_from_hint_text(self, table, text):
6678 return "[%s]" % text
6679
6680 def visit_regexp_match_op_binary(self, binary, operator, **kw):
6681 return self._generate_generic_binary(binary, " <regexp> ", **kw)
6682
6683 def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
6684 return self._generate_generic_binary(binary, " <not regexp> ", **kw)
6685
6686 def visit_regexp_replace_op_binary(self, binary, operator, **kw):
6687 return "<regexp replace>(%s, %s)" % (
6688 binary.left._compiler_dispatch(self, **kw),
6689 binary.right._compiler_dispatch(self, **kw),
6690 )
6691
6692 def visit_try_cast(self, cast, **kwargs):
6693 return "TRY_CAST(%s AS %s)" % (
6694 cast.clause._compiler_dispatch(self, **kwargs),
6695 cast.typeclause._compiler_dispatch(self, **kwargs),
6696 )
6697
6698
6699class DDLCompiler(Compiled):
6700 is_ddl = True
6701
6702 if TYPE_CHECKING:
6703
6704 def __init__(
6705 self,
6706 dialect: Dialect,
6707 statement: ExecutableDDLElement,
6708 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
6709 render_schema_translate: bool = ...,
6710 compile_kwargs: Mapping[str, Any] = ...,
6711 ): ...
6712
6713 @util.ro_memoized_property
6714 def sql_compiler(self) -> SQLCompiler:
6715 return self.dialect.statement_compiler(
6716 self.dialect, None, schema_translate_map=self.schema_translate_map
6717 )
6718
6719 @util.memoized_property
6720 def type_compiler(self):
6721 return self.dialect.type_compiler_instance
6722
6723 def construct_params(
6724 self,
6725 params: Optional[_CoreSingleExecuteParams] = None,
6726 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
6727 escape_names: bool = True,
6728 ) -> Optional[_MutableCoreSingleExecuteParams]:
6729 return None
6730
6731 def visit_ddl(self, ddl, **kwargs):
6732 # table events can substitute table and schema name
6733 context = ddl.context
6734 if isinstance(ddl.target, schema.Table):
6735 context = context.copy()
6736
6737 preparer = self.preparer
6738 path = preparer.format_table_seq(ddl.target)
6739 if len(path) == 1:
6740 table, sch = path[0], ""
6741 else:
6742 table, sch = path[-1], path[0]
6743
6744 context.setdefault("table", table)
6745 context.setdefault("schema", sch)
6746 context.setdefault("fullname", preparer.format_table(ddl.target))
6747
6748 return self.sql_compiler.post_process_text(ddl.statement % context)
6749
6750 def visit_create_schema(self, create, **kw):
6751 text = "CREATE SCHEMA "
6752 if create.if_not_exists:
6753 text += "IF NOT EXISTS "
6754 return text + self.preparer.format_schema(create.element)
6755
6756 def visit_drop_schema(self, drop, **kw):
6757 text = "DROP SCHEMA "
6758 if drop.if_exists:
6759 text += "IF EXISTS "
6760 text += self.preparer.format_schema(drop.element)
6761 if drop.cascade:
6762 text += " CASCADE"
6763 return text
6764
6765 def visit_create_table(self, create, **kw):
6766 table = create.element
6767 preparer = self.preparer
6768
6769 text = "\nCREATE "
6770 if table._prefixes:
6771 text += " ".join(table._prefixes) + " "
6772
6773 text += "TABLE "
6774 if create.if_not_exists:
6775 text += "IF NOT EXISTS "
6776
6777 text += preparer.format_table(table) + " "
6778
6779 create_table_suffix = self.create_table_suffix(table)
6780 if create_table_suffix:
6781 text += create_table_suffix + " "
6782
6783 text += "("
6784
6785 separator = "\n"
6786
6787 # if only one primary key, specify it along with the column
6788 first_pk = False
6789 for create_column in create.columns:
6790 column = create_column.element
6791 try:
6792 processed = self.process(
6793 create_column, first_pk=column.primary_key and not first_pk
6794 )
6795 if processed is not None:
6796 text += separator
6797 separator = ", \n"
6798 text += "\t" + processed
6799 if column.primary_key:
6800 first_pk = True
6801 except exc.CompileError as ce:
6802 raise exc.CompileError(
6803 "(in table '%s', column '%s'): %s"
6804 % (table.description, column.name, ce.args[0])
6805 ) from ce
6806
6807 const = self.create_table_constraints(
6808 table,
6809 _include_foreign_key_constraints=create.include_foreign_key_constraints, # noqa
6810 )
6811 if const:
6812 text += separator + "\t" + const
6813
6814 text += "\n)%s\n\n" % self.post_create_table(table)
6815 return text
6816
6817 def visit_create_column(self, create, first_pk=False, **kw):
6818 column = create.element
6819
6820 if column.system:
6821 return None
6822
6823 text = self.get_column_specification(column, first_pk=first_pk)
6824 const = " ".join(
6825 self.process(constraint) for constraint in column.constraints
6826 )
6827 if const:
6828 text += " " + const
6829
6830 return text
6831
6832 def create_table_constraints(
6833 self, table, _include_foreign_key_constraints=None, **kw
6834 ):
6835 # On some DB order is significant: visit PK first, then the
6836 # other constraints (engine.ReflectionTest.testbasic failed on FB2)
6837 constraints = []
6838 if table.primary_key:
6839 constraints.append(table.primary_key)
6840
6841 all_fkcs = table.foreign_key_constraints
6842 if _include_foreign_key_constraints is not None:
6843 omit_fkcs = all_fkcs.difference(_include_foreign_key_constraints)
6844 else:
6845 omit_fkcs = set()
6846
6847 constraints.extend(
6848 [
6849 c
6850 for c in table._sorted_constraints
6851 if c is not table.primary_key and c not in omit_fkcs
6852 ]
6853 )
6854
6855 return ", \n\t".join(
6856 p
6857 for p in (
6858 self.process(constraint)
6859 for constraint in constraints
6860 if (constraint._should_create_for_compiler(self))
6861 and (
6862 not self.dialect.supports_alter
6863 or not getattr(constraint, "use_alter", False)
6864 )
6865 )
6866 if p is not None
6867 )
6868
6869 def visit_drop_table(self, drop, **kw):
6870 text = "\nDROP TABLE "
6871 if drop.if_exists:
6872 text += "IF EXISTS "
6873 return text + self.preparer.format_table(drop.element)
6874
6875 def visit_drop_view(self, drop, **kw):
6876 return "\nDROP VIEW " + self.preparer.format_table(drop.element)
6877
6878 def _verify_index_table(self, index: Index) -> None:
6879 if index.table is None:
6880 raise exc.CompileError(
6881 "Index '%s' is not associated with any table." % index.name
6882 )
6883
6884 def visit_create_index(
6885 self, create, include_schema=False, include_table_schema=True, **kw
6886 ):
6887 index = create.element
6888 self._verify_index_table(index)
6889 preparer = self.preparer
6890 text = "CREATE "
6891 if index.unique:
6892 text += "UNIQUE "
6893 if index.name is None:
6894 raise exc.CompileError(
6895 "CREATE INDEX requires that the index have a name"
6896 )
6897
6898 text += "INDEX "
6899 if create.if_not_exists:
6900 text += "IF NOT EXISTS "
6901
6902 text += "%s ON %s (%s)" % (
6903 self._prepared_index_name(index, include_schema=include_schema),
6904 preparer.format_table(
6905 index.table, use_schema=include_table_schema
6906 ),
6907 ", ".join(
6908 self.sql_compiler.process(
6909 expr, include_table=False, literal_binds=True
6910 )
6911 for expr in index.expressions
6912 ),
6913 )
6914 return text
6915
6916 def visit_drop_index(self, drop, **kw):
6917 index = drop.element
6918
6919 if index.name is None:
6920 raise exc.CompileError(
6921 "DROP INDEX requires that the index have a name"
6922 )
6923 text = "\nDROP INDEX "
6924 if drop.if_exists:
6925 text += "IF EXISTS "
6926
6927 return text + self._prepared_index_name(index, include_schema=True)
6928
6929 def _prepared_index_name(
6930 self, index: Index, include_schema: bool = False
6931 ) -> str:
6932 if index.table is not None:
6933 effective_schema = self.preparer.schema_for_object(index.table)
6934 else:
6935 effective_schema = None
6936 if include_schema and effective_schema:
6937 schema_name = self.preparer.quote_schema(effective_schema)
6938 else:
6939 schema_name = None
6940
6941 index_name: str = self.preparer.format_index(index)
6942
6943 if schema_name:
6944 index_name = schema_name + "." + index_name
6945 return index_name
6946
6947 def visit_add_constraint(self, create, **kw):
6948 return "ALTER TABLE %s ADD %s" % (
6949 self.preparer.format_table(create.element.table),
6950 self.process(create.element),
6951 )
6952
6953 def visit_set_table_comment(self, create, **kw):
6954 return "COMMENT ON TABLE %s IS %s" % (
6955 self.preparer.format_table(create.element),
6956 self.sql_compiler.render_literal_value(
6957 create.element.comment, sqltypes.String()
6958 ),
6959 )
6960
6961 def visit_drop_table_comment(self, drop, **kw):
6962 return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table(
6963 drop.element
6964 )
6965
6966 def visit_set_column_comment(self, create, **kw):
6967 return "COMMENT ON COLUMN %s IS %s" % (
6968 self.preparer.format_column(
6969 create.element, use_table=True, use_schema=True
6970 ),
6971 self.sql_compiler.render_literal_value(
6972 create.element.comment, sqltypes.String()
6973 ),
6974 )
6975
6976 def visit_drop_column_comment(self, drop, **kw):
6977 return "COMMENT ON COLUMN %s IS NULL" % self.preparer.format_column(
6978 drop.element, use_table=True
6979 )
6980
6981 def visit_set_constraint_comment(self, create, **kw):
6982 raise exc.UnsupportedCompilationError(self, type(create))
6983
6984 def visit_drop_constraint_comment(self, drop, **kw):
6985 raise exc.UnsupportedCompilationError(self, type(drop))
6986
6987 def get_identity_options(self, identity_options: IdentityOptions) -> str:
6988 text = []
6989 if identity_options.increment is not None:
6990 text.append("INCREMENT BY %d" % identity_options.increment)
6991 if identity_options.start is not None:
6992 text.append("START WITH %d" % identity_options.start)
6993 if identity_options.minvalue is not None:
6994 text.append("MINVALUE %d" % identity_options.minvalue)
6995 if identity_options.maxvalue is not None:
6996 text.append("MAXVALUE %d" % identity_options.maxvalue)
6997 if identity_options.nominvalue is not None:
6998 text.append("NO MINVALUE")
6999 if identity_options.nomaxvalue is not None:
7000 text.append("NO MAXVALUE")
7001 if identity_options.cache is not None:
7002 text.append("CACHE %d" % identity_options.cache)
7003 if identity_options.cycle is not None:
7004 text.append("CYCLE" if identity_options.cycle else "NO CYCLE")
7005 return " ".join(text)
7006
7007 def visit_create_sequence(self, create, prefix=None, **kw):
7008 text = "CREATE SEQUENCE "
7009 if create.if_not_exists:
7010 text += "IF NOT EXISTS "
7011 text += self.preparer.format_sequence(create.element)
7012
7013 if prefix:
7014 text += prefix
7015 options = self.get_identity_options(create.element)
7016 if options:
7017 text += " " + options
7018 return text
7019
7020 def visit_drop_sequence(self, drop, **kw):
7021 text = "DROP SEQUENCE "
7022 if drop.if_exists:
7023 text += "IF EXISTS "
7024 return text + self.preparer.format_sequence(drop.element)
7025
7026 def visit_drop_constraint(self, drop, **kw):
7027 constraint = drop.element
7028 if constraint.name is not None:
7029 formatted_name = self.preparer.format_constraint(constraint)
7030 else:
7031 formatted_name = None
7032
7033 if formatted_name is None:
7034 raise exc.CompileError(
7035 "Can't emit DROP CONSTRAINT for constraint %r; "
7036 "it has no name" % drop.element
7037 )
7038 return "ALTER TABLE %s DROP CONSTRAINT %s%s%s" % (
7039 self.preparer.format_table(drop.element.table),
7040 "IF EXISTS " if drop.if_exists else "",
7041 formatted_name,
7042 " CASCADE" if drop.cascade else "",
7043 )
7044
7045 def get_column_specification(self, column, **kwargs):
7046 colspec = (
7047 self.preparer.format_column(column)
7048 + " "
7049 + self.dialect.type_compiler_instance.process(
7050 column.type, type_expression=column
7051 )
7052 )
7053 default = self.get_column_default_string(column)
7054 if default is not None:
7055 colspec += " DEFAULT " + default
7056
7057 if column.computed is not None:
7058 colspec += " " + self.process(column.computed)
7059
7060 if (
7061 column.identity is not None
7062 and self.dialect.supports_identity_columns
7063 ):
7064 colspec += " " + self.process(column.identity)
7065
7066 if not column.nullable and (
7067 not column.identity or not self.dialect.supports_identity_columns
7068 ):
7069 colspec += " NOT NULL"
7070 return colspec
7071
7072 def create_table_suffix(self, table):
7073 return ""
7074
7075 def post_create_table(self, table):
7076 return ""
7077
7078 def get_column_default_string(self, column: Column[Any]) -> Optional[str]:
7079 if isinstance(column.server_default, schema.DefaultClause):
7080 return self.render_default_string(column.server_default.arg)
7081 else:
7082 return None
7083
7084 def render_default_string(self, default: Union[Visitable, str]) -> str:
7085 if isinstance(default, str):
7086 return self.sql_compiler.render_literal_value(
7087 default, sqltypes.STRINGTYPE
7088 )
7089 else:
7090 return self.sql_compiler.process(default, literal_binds=True)
7091
7092 def visit_table_or_column_check_constraint(self, constraint, **kw):
7093 if constraint.is_column_level:
7094 return self.visit_column_check_constraint(constraint)
7095 else:
7096 return self.visit_check_constraint(constraint)
7097
7098 def visit_check_constraint(self, constraint, **kw):
7099 text = self.define_constraint_preamble(constraint, **kw)
7100 text += self.define_check_body(constraint, **kw)
7101 text += self.define_constraint_deferrability(constraint)
7102 return text
7103
7104 def visit_column_check_constraint(self, constraint, **kw):
7105 text = self.define_constraint_preamble(constraint, **kw)
7106 text += self.define_check_body(constraint, **kw)
7107 text += self.define_constraint_deferrability(constraint)
7108 return text
7109
7110 def visit_primary_key_constraint(
7111 self, constraint: PrimaryKeyConstraint, **kw: Any
7112 ) -> str:
7113 if len(constraint) == 0:
7114 return ""
7115 text = self.define_constraint_preamble(constraint, **kw)
7116 text += self.define_primary_key_body(constraint, **kw)
7117 text += self.define_constraint_deferrability(constraint)
7118 return text
7119
7120 def visit_foreign_key_constraint(
7121 self, constraint: ForeignKeyConstraint, **kw: Any
7122 ) -> str:
7123 text = self.define_constraint_preamble(constraint, **kw)
7124 text += self.define_foreign_key_body(constraint, **kw)
7125 text += self.define_constraint_match(constraint)
7126 text += self.define_constraint_cascades(constraint)
7127 text += self.define_constraint_deferrability(constraint)
7128 return text
7129
7130 def define_constraint_remote_table(self, constraint, table, preparer):
7131 """Format the remote table clause of a CREATE CONSTRAINT clause."""
7132
7133 return preparer.format_table(table)
7134
7135 def visit_unique_constraint(
7136 self, constraint: UniqueConstraint, **kw: Any
7137 ) -> str:
7138 if len(constraint) == 0:
7139 return ""
7140 text = self.define_constraint_preamble(constraint, **kw)
7141 text += self.define_unique_body(constraint, **kw)
7142 text += self.define_constraint_deferrability(constraint)
7143 return text
7144
7145 def define_constraint_preamble(
7146 self, constraint: Constraint, **kw: Any
7147 ) -> str:
7148 text = ""
7149 if constraint.name is not None:
7150 formatted_name = self.preparer.format_constraint(constraint)
7151 if formatted_name is not None:
7152 text += "CONSTRAINT %s " % formatted_name
7153 return text
7154
7155 def define_primary_key_body(
7156 self, constraint: PrimaryKeyConstraint, **kw: Any
7157 ) -> str:
7158 text = ""
7159 text += "PRIMARY KEY "
7160 text += "(%s)" % ", ".join(
7161 self.preparer.quote(c.name)
7162 for c in (
7163 constraint.columns_autoinc_first
7164 if constraint._implicit_generated
7165 else constraint.columns
7166 )
7167 )
7168 return text
7169
7170 def define_foreign_key_body(
7171 self, constraint: ForeignKeyConstraint, **kw: Any
7172 ) -> str:
7173 preparer = self.preparer
7174 remote_table = list(constraint.elements)[0].column.table
7175 text = "FOREIGN KEY(%s) REFERENCES %s (%s)" % (
7176 ", ".join(
7177 preparer.quote(f.parent.name) for f in constraint.elements
7178 ),
7179 self.define_constraint_remote_table(
7180 constraint, remote_table, preparer
7181 ),
7182 ", ".join(
7183 preparer.quote(f.column.name) for f in constraint.elements
7184 ),
7185 )
7186 return text
7187
7188 def define_unique_body(
7189 self, constraint: UniqueConstraint, **kw: Any
7190 ) -> str:
7191 text = "UNIQUE %s(%s)" % (
7192 self.define_unique_constraint_distinct(constraint, **kw),
7193 ", ".join(self.preparer.quote(c.name) for c in constraint),
7194 )
7195 return text
7196
7197 def define_check_body(self, constraint: CheckConstraint, **kw: Any) -> str:
7198 text = "CHECK (%s)" % self.sql_compiler.process(
7199 constraint.sqltext, include_table=False, literal_binds=True
7200 )
7201 return text
7202
7203 def define_unique_constraint_distinct(
7204 self, constraint: UniqueConstraint, **kw: Any
7205 ) -> str:
7206 return ""
7207
7208 def define_constraint_cascades(
7209 self, constraint: ForeignKeyConstraint
7210 ) -> str:
7211 text = ""
7212 if constraint.ondelete is not None:
7213 text += self.define_constraint_ondelete_cascade(constraint)
7214
7215 if constraint.onupdate is not None:
7216 text += self.define_constraint_onupdate_cascade(constraint)
7217 return text
7218
7219 def define_constraint_ondelete_cascade(
7220 self, constraint: ForeignKeyConstraint
7221 ) -> str:
7222 return " ON DELETE %s" % self.preparer.validate_sql_phrase(
7223 constraint.ondelete, FK_ON_DELETE
7224 )
7225
7226 def define_constraint_onupdate_cascade(
7227 self, constraint: ForeignKeyConstraint
7228 ) -> str:
7229 return " ON UPDATE %s" % self.preparer.validate_sql_phrase(
7230 constraint.onupdate, FK_ON_UPDATE
7231 )
7232
7233 def define_constraint_deferrability(self, constraint: Constraint) -> str:
7234 text = ""
7235 if constraint.deferrable is not None:
7236 if constraint.deferrable:
7237 text += " DEFERRABLE"
7238 else:
7239 text += " NOT DEFERRABLE"
7240 if constraint.initially is not None:
7241 text += " INITIALLY %s" % self.preparer.validate_sql_phrase(
7242 constraint.initially, FK_INITIALLY
7243 )
7244 return text
7245
7246 def define_constraint_match(self, constraint: ForeignKeyConstraint) -> str:
7247 text = ""
7248 if constraint.match is not None:
7249 text += " MATCH %s" % constraint.match
7250 return text
7251
7252 def visit_computed_column(self, generated, **kw):
7253 text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
7254 generated.sqltext, include_table=False, literal_binds=True
7255 )
7256 if generated.persisted is True:
7257 text += " STORED"
7258 elif generated.persisted is False:
7259 text += " VIRTUAL"
7260 return text
7261
7262 def visit_identity_column(self, identity, **kw):
7263 text = "GENERATED %s AS IDENTITY" % (
7264 "ALWAYS" if identity.always else "BY DEFAULT",
7265 )
7266 options = self.get_identity_options(identity)
7267 if options:
7268 text += " (%s)" % options
7269 return text
7270
7271
7272class GenericTypeCompiler(TypeCompiler):
7273 def visit_FLOAT(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
7274 return "FLOAT"
7275
7276 def visit_DOUBLE(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
7277 return "DOUBLE"
7278
7279 def visit_DOUBLE_PRECISION(
7280 self, type_: sqltypes.DOUBLE_PRECISION[Any], **kw: Any
7281 ) -> str:
7282 return "DOUBLE PRECISION"
7283
7284 def visit_REAL(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
7285 return "REAL"
7286
7287 def visit_NUMERIC(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
7288 if type_.precision is None:
7289 return "NUMERIC"
7290 elif type_.scale is None:
7291 return "NUMERIC(%(precision)s)" % {"precision": type_.precision}
7292 else:
7293 return "NUMERIC(%(precision)s, %(scale)s)" % {
7294 "precision": type_.precision,
7295 "scale": type_.scale,
7296 }
7297
7298 def visit_DECIMAL(self, type_: sqltypes.DECIMAL[Any], **kw: Any) -> str:
7299 if type_.precision is None:
7300 return "DECIMAL"
7301 elif type_.scale is None:
7302 return "DECIMAL(%(precision)s)" % {"precision": type_.precision}
7303 else:
7304 return "DECIMAL(%(precision)s, %(scale)s)" % {
7305 "precision": type_.precision,
7306 "scale": type_.scale,
7307 }
7308
7309 def visit_INTEGER(self, type_: sqltypes.Integer, **kw: Any) -> str:
7310 return "INTEGER"
7311
7312 def visit_SMALLINT(self, type_: sqltypes.SmallInteger, **kw: Any) -> str:
7313 return "SMALLINT"
7314
7315 def visit_BIGINT(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
7316 return "BIGINT"
7317
7318 def visit_TIMESTAMP(self, type_: sqltypes.TIMESTAMP, **kw: Any) -> str:
7319 return "TIMESTAMP"
7320
7321 def visit_DATETIME(self, type_: sqltypes.DateTime, **kw: Any) -> str:
7322 return "DATETIME"
7323
7324 def visit_DATE(self, type_: sqltypes.Date, **kw: Any) -> str:
7325 return "DATE"
7326
7327 def visit_TIME(self, type_: sqltypes.Time, **kw: Any) -> str:
7328 return "TIME"
7329
7330 def visit_CLOB(self, type_: sqltypes.CLOB, **kw: Any) -> str:
7331 return "CLOB"
7332
7333 def visit_NCLOB(self, type_: sqltypes.Text, **kw: Any) -> str:
7334 return "NCLOB"
7335
7336 def _render_string_type(
7337 self, name: str, length: Optional[int], collation: Optional[str]
7338 ) -> str:
7339 text = name
7340 if length:
7341 text += f"({length})"
7342 if collation:
7343 text += f' COLLATE "{collation}"'
7344 return text
7345
7346 def visit_CHAR(self, type_: sqltypes.CHAR, **kw: Any) -> str:
7347 return self._render_string_type("CHAR", type_.length, type_.collation)
7348
7349 def visit_NCHAR(self, type_: sqltypes.NCHAR, **kw: Any) -> str:
7350 return self._render_string_type("NCHAR", type_.length, type_.collation)
7351
7352 def visit_VARCHAR(self, type_: sqltypes.String, **kw: Any) -> str:
7353 return self._render_string_type(
7354 "VARCHAR", type_.length, type_.collation
7355 )
7356
7357 def visit_NVARCHAR(self, type_: sqltypes.NVARCHAR, **kw: Any) -> str:
7358 return self._render_string_type(
7359 "NVARCHAR", type_.length, type_.collation
7360 )
7361
7362 def visit_TEXT(self, type_: sqltypes.Text, **kw: Any) -> str:
7363 return self._render_string_type("TEXT", type_.length, type_.collation)
7364
7365 def visit_UUID(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
7366 return "UUID"
7367
7368 def visit_BLOB(self, type_: sqltypes.LargeBinary, **kw: Any) -> str:
7369 return "BLOB"
7370
7371 def visit_BINARY(self, type_: sqltypes.BINARY, **kw: Any) -> str:
7372 return "BINARY" + (type_.length and "(%d)" % type_.length or "")
7373
7374 def visit_VARBINARY(self, type_: sqltypes.VARBINARY, **kw: Any) -> str:
7375 return "VARBINARY" + (type_.length and "(%d)" % type_.length or "")
7376
7377 def visit_BOOLEAN(self, type_: sqltypes.Boolean, **kw: Any) -> str:
7378 return "BOOLEAN"
7379
7380 def visit_uuid(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
7381 if not type_.native_uuid or not self.dialect.supports_native_uuid:
7382 return self._render_string_type("CHAR", length=32, collation=None)
7383 else:
7384 return self.visit_UUID(type_, **kw)
7385
7386 def visit_large_binary(
7387 self, type_: sqltypes.LargeBinary, **kw: Any
7388 ) -> str:
7389 return self.visit_BLOB(type_, **kw)
7390
7391 def visit_boolean(self, type_: sqltypes.Boolean, **kw: Any) -> str:
7392 return self.visit_BOOLEAN(type_, **kw)
7393
7394 def visit_time(self, type_: sqltypes.Time, **kw: Any) -> str:
7395 return self.visit_TIME(type_, **kw)
7396
7397 def visit_datetime(self, type_: sqltypes.DateTime, **kw: Any) -> str:
7398 return self.visit_DATETIME(type_, **kw)
7399
7400 def visit_date(self, type_: sqltypes.Date, **kw: Any) -> str:
7401 return self.visit_DATE(type_, **kw)
7402
7403 def visit_big_integer(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
7404 return self.visit_BIGINT(type_, **kw)
7405
7406 def visit_small_integer(
7407 self, type_: sqltypes.SmallInteger, **kw: Any
7408 ) -> str:
7409 return self.visit_SMALLINT(type_, **kw)
7410
7411 def visit_integer(self, type_: sqltypes.Integer, **kw: Any) -> str:
7412 return self.visit_INTEGER(type_, **kw)
7413
7414 def visit_real(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
7415 return self.visit_REAL(type_, **kw)
7416
7417 def visit_float(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
7418 return self.visit_FLOAT(type_, **kw)
7419
7420 def visit_double(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
7421 return self.visit_DOUBLE(type_, **kw)
7422
7423 def visit_numeric(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
7424 return self.visit_NUMERIC(type_, **kw)
7425
7426 def visit_string(self, type_: sqltypes.String, **kw: Any) -> str:
7427 return self.visit_VARCHAR(type_, **kw)
7428
7429 def visit_unicode(self, type_: sqltypes.Unicode, **kw: Any) -> str:
7430 return self.visit_VARCHAR(type_, **kw)
7431
7432 def visit_text(self, type_: sqltypes.Text, **kw: Any) -> str:
7433 return self.visit_TEXT(type_, **kw)
7434
7435 def visit_unicode_text(
7436 self, type_: sqltypes.UnicodeText, **kw: Any
7437 ) -> str:
7438 return self.visit_TEXT(type_, **kw)
7439
7440 def visit_enum(self, type_: sqltypes.Enum, **kw: Any) -> str:
7441 return self.visit_VARCHAR(type_, **kw)
7442
7443 def visit_null(self, type_, **kw):
7444 raise exc.CompileError(
7445 "Can't generate DDL for %r; "
7446 "did you forget to specify a "
7447 "type on this Column?" % type_
7448 )
7449
7450 def visit_type_decorator(
7451 self, type_: TypeDecorator[Any], **kw: Any
7452 ) -> str:
7453 return self.process(type_.type_engine(self.dialect), **kw)
7454
7455 def visit_user_defined(
7456 self, type_: UserDefinedType[Any], **kw: Any
7457 ) -> str:
7458 return type_.get_col_spec(**kw)
7459
7460
7461class StrSQLTypeCompiler(GenericTypeCompiler):
7462 def process(self, type_, **kw):
7463 try:
7464 _compiler_dispatch = type_._compiler_dispatch
7465 except AttributeError:
7466 return self._visit_unknown(type_, **kw)
7467 else:
7468 return _compiler_dispatch(self, **kw)
7469
7470 def __getattr__(self, key):
7471 if key.startswith("visit_"):
7472 return self._visit_unknown
7473 else:
7474 raise AttributeError(key)
7475
7476 def _visit_unknown(self, type_, **kw):
7477 if type_.__class__.__name__ == type_.__class__.__name__.upper():
7478 return type_.__class__.__name__
7479 else:
7480 return repr(type_)
7481
7482 def visit_null(self, type_, **kw):
7483 return "NULL"
7484
7485 def visit_user_defined(self, type_, **kw):
7486 try:
7487 get_col_spec = type_.get_col_spec
7488 except AttributeError:
7489 return repr(type_)
7490 else:
7491 return get_col_spec(**kw)
7492
7493
7494class _SchemaForObjectCallable(Protocol):
7495 def __call__(self, __obj: Any) -> str: ...
7496
7497
7498class _BindNameForColProtocol(Protocol):
7499 def __call__(self, col: ColumnClause[Any]) -> str: ...
7500
7501
7502class IdentifierPreparer:
7503 """Handle quoting and case-folding of identifiers based on options."""
7504
7505 reserved_words = RESERVED_WORDS
7506
7507 legal_characters = LEGAL_CHARACTERS
7508
7509 illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS
7510
7511 initial_quote: str
7512
7513 final_quote: str
7514
7515 _strings: MutableMapping[str, str]
7516
7517 schema_for_object: _SchemaForObjectCallable = operator.attrgetter("schema")
7518 """Return the .schema attribute for an object.
7519
7520 For the default IdentifierPreparer, the schema for an object is always
7521 the value of the ".schema" attribute. if the preparer is replaced
7522 with one that has a non-empty schema_translate_map, the value of the
7523 ".schema" attribute is rendered a symbol that will be converted to a
7524 real schema name from the mapping post-compile.
7525
7526 """
7527
7528 _includes_none_schema_translate: bool = False
7529
7530 def __init__(
7531 self,
7532 dialect: Dialect,
7533 initial_quote: str = '"',
7534 final_quote: Optional[str] = None,
7535 escape_quote: str = '"',
7536 quote_case_sensitive_collations: bool = True,
7537 omit_schema: bool = False,
7538 ):
7539 """Construct a new ``IdentifierPreparer`` object.
7540
7541 initial_quote
7542 Character that begins a delimited identifier.
7543
7544 final_quote
7545 Character that ends a delimited identifier. Defaults to
7546 `initial_quote`.
7547
7548 omit_schema
7549 Prevent prepending schema name. Useful for databases that do
7550 not support schemae.
7551 """
7552
7553 self.dialect = dialect
7554 self.initial_quote = initial_quote
7555 self.final_quote = final_quote or self.initial_quote
7556 self.escape_quote = escape_quote
7557 self.escape_to_quote = self.escape_quote * 2
7558 self.omit_schema = omit_schema
7559 self.quote_case_sensitive_collations = quote_case_sensitive_collations
7560 self._strings = {}
7561 self._double_percents = self.dialect.paramstyle in (
7562 "format",
7563 "pyformat",
7564 )
7565
7566 def _with_schema_translate(self, schema_translate_map):
7567 prep = self.__class__.__new__(self.__class__)
7568 prep.__dict__.update(self.__dict__)
7569
7570 includes_none = None in schema_translate_map
7571
7572 def symbol_getter(obj):
7573 name = obj.schema
7574 if obj._use_schema_map and (name is not None or includes_none):
7575 if name is not None and ("[" in name or "]" in name):
7576 raise exc.CompileError(
7577 "Square bracket characters ([]) not supported "
7578 "in schema translate name '%s'" % name
7579 )
7580 return quoted_name(
7581 "__[SCHEMA_%s]" % (name or "_none"), quote=False
7582 )
7583 else:
7584 return obj.schema
7585
7586 prep.schema_for_object = symbol_getter
7587 prep._includes_none_schema_translate = includes_none
7588 return prep
7589
7590 def _render_schema_translates(
7591 self, statement: str, schema_translate_map: SchemaTranslateMapType
7592 ) -> str:
7593 d = schema_translate_map
7594 if None in d:
7595 if not self._includes_none_schema_translate:
7596 raise exc.InvalidRequestError(
7597 "schema translate map which previously did not have "
7598 "`None` present as a key now has `None` present; compiled "
7599 "statement may lack adequate placeholders. Please use "
7600 "consistent keys in successive "
7601 "schema_translate_map dictionaries."
7602 )
7603
7604 d["_none"] = d[None] # type: ignore[index]
7605
7606 def replace(m):
7607 name = m.group(2)
7608 if name in d:
7609 effective_schema = d[name]
7610 else:
7611 if name in (None, "_none"):
7612 raise exc.InvalidRequestError(
7613 "schema translate map which previously had `None` "
7614 "present as a key now no longer has it present; don't "
7615 "know how to apply schema for compiled statement. "
7616 "Please use consistent keys in successive "
7617 "schema_translate_map dictionaries."
7618 )
7619 effective_schema = name
7620
7621 if not effective_schema:
7622 effective_schema = self.dialect.default_schema_name
7623 if not effective_schema:
7624 # TODO: no coverage here
7625 raise exc.CompileError(
7626 "Dialect has no default schema name; can't "
7627 "use None as dynamic schema target."
7628 )
7629 return self.quote_schema(effective_schema)
7630
7631 return re.sub(r"(__\[SCHEMA_([^\]]+)\])", replace, statement)
7632
7633 def _escape_identifier(self, value: str) -> str:
7634 """Escape an identifier.
7635
7636 Subclasses should override this to provide database-dependent
7637 escaping behavior.
7638 """
7639
7640 value = value.replace(self.escape_quote, self.escape_to_quote)
7641 if self._double_percents:
7642 value = value.replace("%", "%%")
7643 return value
7644
7645 def _unescape_identifier(self, value: str) -> str:
7646 """Canonicalize an escaped identifier.
7647
7648 Subclasses should override this to provide database-dependent
7649 unescaping behavior that reverses _escape_identifier.
7650 """
7651
7652 return value.replace(self.escape_to_quote, self.escape_quote)
7653
7654 def validate_sql_phrase(self, element, reg):
7655 """keyword sequence filter.
7656
7657 a filter for elements that are intended to represent keyword sequences,
7658 such as "INITIALLY", "INITIALLY DEFERRED", etc. no special characters
7659 should be present.
7660
7661 .. versionadded:: 1.3
7662
7663 """
7664
7665 if element is not None and not reg.match(element):
7666 raise exc.CompileError(
7667 "Unexpected SQL phrase: %r (matching against %r)"
7668 % (element, reg.pattern)
7669 )
7670 return element
7671
7672 def quote_identifier(self, value: str) -> str:
7673 """Quote an identifier.
7674
7675 Subclasses should override this to provide database-dependent
7676 quoting behavior.
7677 """
7678
7679 return (
7680 self.initial_quote
7681 + self._escape_identifier(value)
7682 + self.final_quote
7683 )
7684
7685 def _requires_quotes(self, value: str) -> bool:
7686 """Return True if the given identifier requires quoting."""
7687 lc_value = value.lower()
7688 return (
7689 lc_value in self.reserved_words
7690 or value[0] in self.illegal_initial_characters
7691 or not self.legal_characters.match(str(value))
7692 or (lc_value != value)
7693 )
7694
7695 def _requires_quotes_illegal_chars(self, value):
7696 """Return True if the given identifier requires quoting, but
7697 not taking case convention into account."""
7698 return not self.legal_characters.match(str(value))
7699
7700 def quote_schema(self, schema: str, force: Any = None) -> str:
7701 """Conditionally quote a schema name.
7702
7703
7704 The name is quoted if it is a reserved word, contains quote-necessary
7705 characters, or is an instance of :class:`.quoted_name` which includes
7706 ``quote`` set to ``True``.
7707
7708 Subclasses can override this to provide database-dependent
7709 quoting behavior for schema names.
7710
7711 :param schema: string schema name
7712 :param force: unused
7713
7714 .. deprecated:: 0.9
7715
7716 The :paramref:`.IdentifierPreparer.quote_schema.force`
7717 parameter is deprecated and will be removed in a future
7718 release. This flag has no effect on the behavior of the
7719 :meth:`.IdentifierPreparer.quote` method; please refer to
7720 :class:`.quoted_name`.
7721
7722 """
7723 if force is not None:
7724 # not using the util.deprecated_params() decorator in this
7725 # case because of the additional function call overhead on this
7726 # very performance-critical spot.
7727 util.warn_deprecated(
7728 "The IdentifierPreparer.quote_schema.force parameter is "
7729 "deprecated and will be removed in a future release. This "
7730 "flag has no effect on the behavior of the "
7731 "IdentifierPreparer.quote method; please refer to "
7732 "quoted_name().",
7733 # deprecated 0.9. warning from 1.3
7734 version="0.9",
7735 )
7736
7737 return self.quote(schema)
7738
7739 def quote(self, ident: str, force: Any = None) -> str:
7740 """Conditionally quote an identifier.
7741
7742 The identifier is quoted if it is a reserved word, contains
7743 quote-necessary characters, or is an instance of
7744 :class:`.quoted_name` which includes ``quote`` set to ``True``.
7745
7746 Subclasses can override this to provide database-dependent
7747 quoting behavior for identifier names.
7748
7749 :param ident: string identifier
7750 :param force: unused
7751
7752 .. deprecated:: 0.9
7753
7754 The :paramref:`.IdentifierPreparer.quote.force`
7755 parameter is deprecated and will be removed in a future
7756 release. This flag has no effect on the behavior of the
7757 :meth:`.IdentifierPreparer.quote` method; please refer to
7758 :class:`.quoted_name`.
7759
7760 """
7761 if force is not None:
7762 # not using the util.deprecated_params() decorator in this
7763 # case because of the additional function call overhead on this
7764 # very performance-critical spot.
7765 util.warn_deprecated(
7766 "The IdentifierPreparer.quote.force parameter is "
7767 "deprecated and will be removed in a future release. This "
7768 "flag has no effect on the behavior of the "
7769 "IdentifierPreparer.quote method; please refer to "
7770 "quoted_name().",
7771 # deprecated 0.9. warning from 1.3
7772 version="0.9",
7773 )
7774
7775 force = getattr(ident, "quote", None)
7776
7777 if force is None:
7778 if ident in self._strings:
7779 return self._strings[ident]
7780 else:
7781 if self._requires_quotes(ident):
7782 self._strings[ident] = self.quote_identifier(ident)
7783 else:
7784 self._strings[ident] = ident
7785 return self._strings[ident]
7786 elif force:
7787 return self.quote_identifier(ident)
7788 else:
7789 return ident
7790
7791 def format_collation(self, collation_name):
7792 if self.quote_case_sensitive_collations:
7793 return self.quote(collation_name)
7794 else:
7795 return collation_name
7796
7797 def format_sequence(
7798 self, sequence: schema.Sequence, use_schema: bool = True
7799 ) -> str:
7800 name = self.quote(sequence.name)
7801
7802 effective_schema = self.schema_for_object(sequence)
7803
7804 if (
7805 not self.omit_schema
7806 and use_schema
7807 and effective_schema is not None
7808 ):
7809 name = self.quote_schema(effective_schema) + "." + name
7810 return name
7811
7812 def format_label(
7813 self, label: Label[Any], name: Optional[str] = None
7814 ) -> str:
7815 return self.quote(name or label.name)
7816
7817 def format_alias(
7818 self, alias: Optional[AliasedReturnsRows], name: Optional[str] = None
7819 ) -> str:
7820 if name is None:
7821 assert alias is not None
7822 return self.quote(alias.name)
7823 else:
7824 return self.quote(name)
7825
7826 def format_savepoint(self, savepoint, name=None):
7827 # Running the savepoint name through quoting is unnecessary
7828 # for all known dialects. This is here to support potential
7829 # third party use cases
7830 ident = name or savepoint.ident
7831 if self._requires_quotes(ident):
7832 ident = self.quote_identifier(ident)
7833 return ident
7834
7835 @util.preload_module("sqlalchemy.sql.naming")
7836 def format_constraint(
7837 self, constraint: Union[Constraint, Index], _alembic_quote: bool = True
7838 ) -> Optional[str]:
7839 naming = util.preloaded.sql_naming
7840
7841 if constraint.name is _NONE_NAME:
7842 name = naming._constraint_name_for_table(
7843 constraint, constraint.table
7844 )
7845
7846 if name is None:
7847 return None
7848 else:
7849 name = constraint.name
7850
7851 assert name is not None
7852 if constraint.__visit_name__ == "index":
7853 return self.truncate_and_render_index_name(
7854 name, _alembic_quote=_alembic_quote
7855 )
7856 else:
7857 return self.truncate_and_render_constraint_name(
7858 name, _alembic_quote=_alembic_quote
7859 )
7860
7861 def truncate_and_render_index_name(
7862 self, name: str, _alembic_quote: bool = True
7863 ) -> str:
7864 # calculate these at format time so that ad-hoc changes
7865 # to dialect.max_identifier_length etc. can be reflected
7866 # as IdentifierPreparer is long lived
7867 max_ = (
7868 self.dialect.max_index_name_length
7869 or self.dialect.max_identifier_length
7870 )
7871 return self._truncate_and_render_maxlen_name(
7872 name, max_, _alembic_quote
7873 )
7874
7875 def truncate_and_render_constraint_name(
7876 self, name: str, _alembic_quote: bool = True
7877 ) -> str:
7878 # calculate these at format time so that ad-hoc changes
7879 # to dialect.max_identifier_length etc. can be reflected
7880 # as IdentifierPreparer is long lived
7881 max_ = (
7882 self.dialect.max_constraint_name_length
7883 or self.dialect.max_identifier_length
7884 )
7885 return self._truncate_and_render_maxlen_name(
7886 name, max_, _alembic_quote
7887 )
7888
7889 def _truncate_and_render_maxlen_name(
7890 self, name: str, max_: int, _alembic_quote: bool
7891 ) -> str:
7892 if isinstance(name, elements._truncated_label):
7893 if len(name) > max_:
7894 name = name[0 : max_ - 8] + "_" + util.md5_hex(name)[-4:]
7895 else:
7896 self.dialect.validate_identifier(name)
7897
7898 if not _alembic_quote:
7899 return name
7900 else:
7901 return self.quote(name)
7902
7903 def format_index(self, index: Index) -> str:
7904 name = self.format_constraint(index)
7905 assert name is not None
7906 return name
7907
7908 def format_table(
7909 self,
7910 table: FromClause,
7911 use_schema: bool = True,
7912 name: Optional[str] = None,
7913 ) -> str:
7914 """Prepare a quoted table and schema name."""
7915 if name is None:
7916 if TYPE_CHECKING:
7917 assert isinstance(table, NamedFromClause)
7918 name = table.name
7919
7920 result = self.quote(name)
7921
7922 effective_schema = self.schema_for_object(table)
7923
7924 if not self.omit_schema and use_schema and effective_schema:
7925 result = self.quote_schema(effective_schema) + "." + result
7926 return result
7927
7928 def format_schema(self, name):
7929 """Prepare a quoted schema name."""
7930
7931 return self.quote(name)
7932
7933 def format_label_name(
7934 self,
7935 name,
7936 anon_map=None,
7937 ):
7938 """Prepare a quoted column name."""
7939
7940 if anon_map is not None and isinstance(
7941 name, elements._truncated_label
7942 ):
7943 name = name.apply_map(anon_map)
7944
7945 return self.quote(name)
7946
7947 def format_column(
7948 self,
7949 column: ColumnElement[Any],
7950 use_table: bool = False,
7951 name: Optional[str] = None,
7952 table_name: Optional[str] = None,
7953 use_schema: bool = False,
7954 anon_map: Optional[Mapping[str, Any]] = None,
7955 ) -> str:
7956 """Prepare a quoted column name."""
7957
7958 if name is None:
7959 name = column.name
7960 assert name is not None
7961
7962 if anon_map is not None and isinstance(
7963 name, elements._truncated_label
7964 ):
7965 name = name.apply_map(anon_map)
7966
7967 if not getattr(column, "is_literal", False):
7968 if use_table:
7969 return (
7970 self.format_table(
7971 column.table, use_schema=use_schema, name=table_name
7972 )
7973 + "."
7974 + self.quote(name)
7975 )
7976 else:
7977 return self.quote(name)
7978 else:
7979 # literal textual elements get stuck into ColumnClause a lot,
7980 # which shouldn't get quoted
7981
7982 if use_table:
7983 return (
7984 self.format_table(
7985 column.table, use_schema=use_schema, name=table_name
7986 )
7987 + "."
7988 + name
7989 )
7990 else:
7991 return name
7992
7993 def format_table_seq(self, table, use_schema=True):
7994 """Format table name and schema as a tuple."""
7995
7996 # Dialects with more levels in their fully qualified references
7997 # ('database', 'owner', etc.) could override this and return
7998 # a longer sequence.
7999
8000 effective_schema = self.schema_for_object(table)
8001
8002 if not self.omit_schema and use_schema and effective_schema:
8003 return (
8004 self.quote_schema(effective_schema),
8005 self.format_table(table, use_schema=False),
8006 )
8007 else:
8008 return (self.format_table(table, use_schema=False),)
8009
8010 @util.memoized_property
8011 def _r_identifiers(self):
8012 initial, final, escaped_final = (
8013 re.escape(s)
8014 for s in (
8015 self.initial_quote,
8016 self.final_quote,
8017 self._escape_identifier(self.final_quote),
8018 )
8019 )
8020 r = re.compile(
8021 r"(?:"
8022 r"(?:%(initial)s((?:%(escaped)s|[^%(final)s])+)%(final)s"
8023 r"|([^\.]+))(?=\.|$))+"
8024 % {"initial": initial, "final": final, "escaped": escaped_final}
8025 )
8026 return r
8027
8028 def unformat_identifiers(self, identifiers: str) -> Sequence[str]:
8029 """Unpack 'schema.table.column'-like strings into components."""
8030
8031 r = self._r_identifiers
8032 return [
8033 self._unescape_identifier(i)
8034 for i in [a or b for a, b in r.findall(identifiers)]
8035 ]