1# sql/compiler.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Base SQL and DDL compiler implementations.
10
11Classes provided include:
12
13:class:`.compiler.SQLCompiler` - renders SQL
14strings
15
16:class:`.compiler.DDLCompiler` - renders DDL
17(data definition language) strings
18
19:class:`.compiler.GenericTypeCompiler` - renders
20type specification strings.
21
22To generate user-defined SQL strings, see
23:doc:`/ext/compiler`.
24
25"""
26from __future__ import annotations
27
28import collections
29import collections.abc as collections_abc
30import contextlib
31from enum import IntEnum
32import functools
33import itertools
34import operator
35import re
36from time import perf_counter
37import typing
38from typing import Any
39from typing import Callable
40from typing import cast
41from typing import ClassVar
42from typing import Dict
43from typing import FrozenSet
44from typing import Iterable
45from typing import Iterator
46from typing import List
47from typing import Mapping
48from typing import MutableMapping
49from typing import NamedTuple
50from typing import NoReturn
51from typing import Optional
52from typing import Pattern
53from typing import Sequence
54from typing import Set
55from typing import Tuple
56from typing import Type
57from typing import TYPE_CHECKING
58from typing import Union
59
60from . import base
61from . import coercions
62from . import crud
63from . import elements
64from . import functions
65from . import operators
66from . import roles
67from . import schema
68from . import selectable
69from . import sqltypes
70from . import util as sql_util
71from ._typing import is_column_element
72from ._typing import is_dml
73from .base import _de_clone
74from .base import _from_objects
75from .base import _NONE_NAME
76from .base import _SentinelDefaultCharacterization
77from .base import NO_ARG
78from .elements import quoted_name
79from .sqltypes import TupleType
80from .visitors import prefix_anon_map
81from .. import exc
82from .. import util
83from ..util import FastIntFlag
84from ..util.typing import Literal
85from ..util.typing import Protocol
86from ..util.typing import Self
87from ..util.typing import TypedDict
88
89if typing.TYPE_CHECKING:
90 from .annotation import _AnnotationDict
91 from .base import _AmbiguousTableNameMap
92 from .base import CompileState
93 from .base import Executable
94 from .cache_key import CacheKey
95 from .ddl import ExecutableDDLElement
96 from .dml import Insert
97 from .dml import Update
98 from .dml import UpdateBase
99 from .dml import UpdateDMLState
100 from .dml import ValuesBase
101 from .elements import _truncated_label
102 from .elements import BinaryExpression
103 from .elements import BindParameter
104 from .elements import ClauseElement
105 from .elements import ColumnClause
106 from .elements import ColumnElement
107 from .elements import False_
108 from .elements import Label
109 from .elements import Null
110 from .elements import True_
111 from .functions import Function
112 from .schema import CheckConstraint
113 from .schema import Column
114 from .schema import Constraint
115 from .schema import ForeignKeyConstraint
116 from .schema import IdentityOptions
117 from .schema import Index
118 from .schema import PrimaryKeyConstraint
119 from .schema import Table
120 from .schema import UniqueConstraint
121 from .selectable import _ColumnsClauseElement
122 from .selectable import AliasedReturnsRows
123 from .selectable import CompoundSelectState
124 from .selectable import CTE
125 from .selectable import FromClause
126 from .selectable import NamedFromClause
127 from .selectable import ReturnsRows
128 from .selectable import Select
129 from .selectable import SelectState
130 from .type_api import _BindProcessorType
131 from .type_api import TypeDecorator
132 from .type_api import TypeEngine
133 from .type_api import UserDefinedType
134 from .visitors import Visitable
135 from ..engine.cursor import CursorResultMetaData
136 from ..engine.interfaces import _CoreSingleExecuteParams
137 from ..engine.interfaces import _DBAPIAnyExecuteParams
138 from ..engine.interfaces import _DBAPIMultiExecuteParams
139 from ..engine.interfaces import _DBAPISingleExecuteParams
140 from ..engine.interfaces import _ExecuteOptions
141 from ..engine.interfaces import _GenericSetInputSizesType
142 from ..engine.interfaces import _MutableCoreSingleExecuteParams
143 from ..engine.interfaces import Dialect
144 from ..engine.interfaces import SchemaTranslateMapType
145
146
# Type alias: a dictionary keyed by FromClause objects with string values.
# NOTE(review): the name suggests per-FROM hint text; confirm against users.
_FromHintsType = Dict["FromClause", str]
148
# Default set of reserved words; every entry is spelled in lower case.
# NOTE(review): presumably consulted when deciding whether an identifier
# needs quoting -- the consumer is not visible in this part of the file.
RESERVED_WORDS = {
    "all",
    "analyse",
    "analyze",
    "and",
    "any",
    "array",
    "as",
    "asc",
    "asymmetric",
    "authorization",
    "between",
    "binary",
    "both",
    "case",
    "cast",
    "check",
    "collate",
    "column",
    "constraint",
    "create",
    "cross",
    "current_date",
    "current_role",
    "current_time",
    "current_timestamp",
    "current_user",
    "default",
    "deferrable",
    "desc",
    "distinct",
    "do",
    "else",
    "end",
    "except",
    "false",
    "for",
    "foreign",
    "freeze",
    "from",
    "full",
    "grant",
    "group",
    "having",
    "ilike",
    "in",
    "initially",
    "inner",
    "intersect",
    "into",
    "is",
    "isnull",
    "join",
    "leading",
    "left",
    "like",
    "limit",
    "localtime",
    "localtimestamp",
    "natural",
    "new",
    "not",
    "notnull",
    "null",
    "off",
    "offset",
    "old",
    "on",
    "only",
    "or",
    "order",
    "outer",
    "overlaps",
    "placing",
    "primary",
    "references",
    "right",
    "select",
    "session_user",
    "set",
    "similar",
    "some",
    "symmetric",
    "table",
    "then",
    "to",
    "trailing",
    "true",
    "union",
    "unique",
    "user",
    "using",
    "verbose",
    "when",
    "where",
}
245
# Identifier character checks: a whole-string, case-insensitive match of
# letters, digits, underscore and "$" (the second form also allows spaces).
LEGAL_CHARACTERS = re.compile(r"^[A-Z0-9_$]+$", re.IGNORECASE)
LEGAL_CHARACTERS_PLUS_SPACE = re.compile(r"^[A-Z0-9_ $]+$", re.IGNORECASE)

# The ten decimal digits plus "$".
ILLEGAL_INITIAL_CHARACTERS = {*(str(digit) for digit in range(0, 10)), "$"}

# Whole-string, case-insensitive matchers for foreign key option keywords.
FK_ON_DELETE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$",
    re.IGNORECASE,
)
FK_ON_UPDATE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$",
    re.IGNORECASE,
)
FK_INITIALLY = re.compile(r"^(?:DEFERRED|IMMEDIATE)$", re.IGNORECASE)

# ":name" tokens that are neither preceded by ":", a word character, "$" or
# a backslash, nor followed by ":", a word character or "$".
BIND_PARAMS = re.compile(r"(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])", re.UNICODE)

# backslash-escaped ":name" tokens; group 1 is the text after the backslash.
BIND_PARAMS_ESC = re.compile(r"\x5c(:[\w\$]*)(?![:\w\$])", re.UNICODE)
259
# Bound parameter templates keyed by paramstyle name.  The templates carry
# ``%(name)s`` / ``%%`` sequences, i.e. they are written for %-interpolation
# with a ``name`` key; ``%%`` leaves a single literal percent sign behind.
# NOTE(review): "[_POSITION]" looks like a marker substituted later with the
# parameter's numeric position -- confirm against the code that consumes it.
_pyformat_template = "%%(%(name)s)s"
BIND_TEMPLATES = {
    "pyformat": _pyformat_template,
    "qmark": "?",
    "format": "%%s",
    "numeric": ":[_POSITION]",
    "numeric_dollar": "$[_POSITION]",
    "named": ":%(name)s",
}
269
270
# Lookup of operator callables from the ``operators`` module to the SQL text
# rendered for them.  Surrounding whitespace is part of each string: binary
# operators are padded on both sides, prefix operators carry a trailing
# space where one is wanted, and modifiers carry a leading space.
OPERATORS = {
    # binary
    operators.and_: " AND ",
    operators.or_: " OR ",
    operators.add: " + ",
    operators.mul: " * ",
    operators.sub: " - ",
    operators.mod: " % ",
    operators.neg: "-",
    operators.lt: " < ",
    operators.le: " <= ",
    operators.ne: " != ",
    operators.gt: " > ",
    operators.ge: " >= ",
    operators.eq: " = ",
    operators.is_distinct_from: " IS DISTINCT FROM ",
    operators.is_not_distinct_from: " IS NOT DISTINCT FROM ",
    operators.concat_op: " || ",
    operators.match_op: " MATCH ",
    operators.not_match_op: " NOT MATCH ",
    operators.in_op: " IN ",
    operators.not_in_op: " NOT IN ",
    operators.comma_op: ", ",
    operators.from_: " FROM ",
    operators.as_: " AS ",
    operators.is_: " IS ",
    operators.is_not: " IS NOT ",
    operators.collate: " COLLATE ",
    # unary
    operators.exists: "EXISTS ",
    operators.distinct_op: "DISTINCT ",
    operators.inv: "NOT ",
    operators.any_op: "ANY ",
    operators.all_op: "ALL ",
    # modifiers
    operators.desc_op: " DESC",
    operators.asc_op: " ASC",
    operators.nulls_first_op: " NULLS FIRST",
    operators.nulls_last_op: " NULLS LAST",
    # bitwise
    operators.bitwise_xor_op: " ^ ",
    operators.bitwise_or_op: " | ",
    operators.bitwise_and_op: " & ",
    operators.bitwise_not_op: "~",
    operators.bitwise_lshift_op: " << ",
    operators.bitwise_rshift_op: " >> ",
}
318
# Lookup of function classes from the ``functions`` module to the name
# string associated with each.  Several values are upper-case keywords
# (e.g. CURRENT_DATE); others are ordinary lower-case function names.
FUNCTIONS: Dict[Type[Function[Any]], str] = {
    functions.coalesce: "coalesce",
    functions.current_date: "CURRENT_DATE",
    functions.current_time: "CURRENT_TIME",
    functions.current_timestamp: "CURRENT_TIMESTAMP",
    functions.current_user: "CURRENT_USER",
    functions.localtime: "LOCALTIME",
    functions.localtimestamp: "LOCALTIMESTAMP",
    functions.random: "random",
    functions.sysdate: "sysdate",
    functions.session_user: "SESSION_USER",
    functions.user: "USER",
    functions.cube: "CUBE",
    functions.rollup: "ROLLUP",
    functions.grouping_sets: "GROUPING SETS",
}
335
336
# Default field-name lookup used for EXTRACT; in this base table every
# field name maps to itself.
EXTRACT_MAP = {
    field: field
    for field in (
        "month",
        "day",
        "year",
        "second",
        "hour",
        "doy",
        "minute",
        "quarter",
        "dow",
        "week",
        "epoch",
        "milliseconds",
        "microseconds",
        "timezone_hour",
        "timezone_minute",
    )
}
354
# Lookup of ``selectable._CompoundSelectKeyword`` members to the SQL keyword
# text for each compound (set) operation.
COMPOUND_KEYWORDS = {
    selectable._CompoundSelectKeyword.UNION: "UNION",
    selectable._CompoundSelectKeyword.UNION_ALL: "UNION ALL",
    selectable._CompoundSelectKeyword.EXCEPT: "EXCEPT",
    selectable._CompoundSelectKeyword.EXCEPT_ALL: "EXCEPT ALL",
    selectable._CompoundSelectKeyword.INTERSECT: "INTERSECT",
    selectable._CompoundSelectKeyword.INTERSECT_ALL: "INTERSECT ALL",
}
363
364
class ResultColumnsEntry(NamedTuple):
    """Record of one column expression expected to appear in the result
    rows of a statement.

    Entries usually describe the columns clause of a SELECT statement, but
    they may equally describe a RETURNING clause or a dialect-specific
    emulation.

    """

    keyname: str
    """the string name expected to be found in cursor.description"""

    name: str
    """the column name, which may be a label"""

    objects: Tuple[Any, ...]
    """the objects that should be able to locate this column in a
    RowMapping; typically string names and aliases together with Column
    objects.

    """

    type: TypeEngine[Any]
    """the datatype associated with this column.  This is the point at
    which "result processing" logic ties the compiled statement to the rows
    delivered by the cursor.

    """
394
395
class _ResultMapAppender(Protocol):
    """Structural type for a callable accepting ``keyname``, ``name``,
    ``objects`` and ``type_`` and returning ``None``.

    NOTE(review): the parameters line up with the four fields of
    ``ResultColumnsEntry``; presumably implementations record one such
    entry per call -- confirm against the implementing callables.

    """

    def __call__(
        self,
        keyname: str,
        name: str,
        objects: Sequence[Any],
        type_: TypeEngine[Any],
    ) -> None: ...
404
405
# integer indexes into ResultColumnsEntry used by cursor.py.
# some profiling showed integer access faster than named tuple.
# In ResultColumnsEntry field order these are: 0 -> keyname, 1 -> name,
# 2 -> objects, 3 -> type.
RM_RENDERED_NAME: Literal[0] = 0
RM_NAME: Literal[1] = 1
RM_OBJECTS: Literal[2] = 2
RM_TYPE: Literal[3] = 3
412
413
class _BaseCompilerStackEntry(TypedDict):
    """Keys that every compiler stack entry dictionary must provide.

    NOTE(review): the stack holding these dictionaries is not visible in
    this part of the file; the meaning of each key should be confirmed
    against the code that pushes and reads the entries.

    """

    asfrom_froms: Set[FromClause]
    correlate_froms: Set[FromClause]
    selectable: ReturnsRows
418
419
class _CompilerStackEntry(_BaseCompilerStackEntry, total=False):
    """Complete compiler stack entry dictionary type.

    Inherits the required keys of ``_BaseCompilerStackEntry``; since the
    class is declared with ``total=False``, the keys listed here are
    optional.

    """

    compile_state: CompileState
    need_result_map_for_nested: bool
    need_result_map_for_compound: bool
    select_0: ReturnsRows
    insert_from_select: Select[Any]
426
427
class ExpandedState(NamedTuple):
    """State used when producing "expanded" and "post compile" bound
    parameters for a statement.

    "expanded" parameters are those generated at statement execution time
    to suit the number of values passed; the most prominent example is the
    set of individual elements inside of an IN expression.

    "post compile" parameters are those whose SQL literal value is rendered
    into the SQL statement at execution time, instead of being handed to
    the driver as a separate parameter.

    An :class:`.ExpandedState` instance is created by calling the
    :meth:`.SQLCompiler.construct_expanded_state` method of any
    :class:`.SQLCompiler` instance.

    """

    statement: str
    """The SQL statement string, with parameters fully expanded"""

    parameters: _CoreSingleExecuteParams
    """The parameter dictionary, with parameters fully expanded.

    When the statement uses named parameters, this dictionary maps exactly
    to the names in the statement.  When the statement uses positional
    parameters, :attr:`.ExpandedState.positional_parameters` yields a tuple
    with the positional parameter set.

    """

    processors: Mapping[str, _BindProcessorType[Any]]
    """Mapping of bound value processors"""

    positiontup: Optional[Sequence[str]]
    """Sequence of string names giving the order of positional
    parameters"""

    parameter_expansion: Mapping[str, List[str]]
    """Mapping that links each original parameter name to the list of
    "expanded" parameter names generated for it, for those parameters that
    were expanded."""

    @property
    def positional_parameters(self) -> Tuple[Any, ...]:
        """Tuple of positional parameters, available for statements that
        were compiled with a positional paramstyle.

        Raises :class:`.InvalidRequestError` when ``positiontup`` is None.

        """
        ordered_names = self.positiontup
        if ordered_names is None:
            raise exc.InvalidRequestError(
                "statement does not use a positional paramstyle"
            )
        return tuple(self.parameters[name] for name in ordered_names)

    @property
    def additional_parameters(self) -> _CoreSingleExecuteParams:
        """Alias of :attr:`.ExpandedState.parameters`."""
        return self.parameters
487
488
class _InsertManyValues(NamedTuple):
    """State used when executing an "insertmanyvalues" statement.

    This object is consumed primarily by the
    :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
    :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods.

    .. versionadded:: 2.0

    """

    is_default_expr: bool
    """True when the statement has the form
    ``INSERT INTO TABLE DEFAULT VALUES``, which can't be rewritten as a
    "batch"

    """

    single_values_expr: str
    """The rendered "values" clause of the INSERT statement.

    Typically the parenthesized section, e.g. "(?, ?, ?)" or similar.
    The insertmanyvalues logic uses this string as its search and replace
    target.

    """

    insert_crud_params: List[crud._CrudParamElementStr]
    """List of Column / bind names etc. used while rewriting the statement"""

    num_positional_params_counted: int
    """The number of bound parameters in a single-row statement.

    The count may be larger or smaller than the number of columns actually
    targeted by the INSERT, because SQL expressions in the values list may
    embed zero or more parameters of their own.

    The count is part of what's used to organize rewritten parameter lists
    when batching.

    """

    sort_by_parameter_order: bool = False
    """Whether the ``sort_by_parameter_order`` parameter was used on the
    insert.

    All of the attributes following this one are only used when this is
    True.

    """

    includes_upsert_behaviors: bool = False
    """True when upsert behaviors have to be accommodated.

    In some cases this downgrades an "insertmanyvalues" that requests
    deterministic ordering.

    """

    sentinel_columns: Optional[Sequence[Column[Any]]] = None
    """List of the sentinel columns that were located.

    The list is only present when the INSERT asked for
    sort_by_parameter_order=True and dialect-appropriate sentinel columns
    were located.

    .. versionadded:: 2.0.10

    """

    num_sentinel_columns: int = 0
    """How many sentinel columns are in the above list, if any.

    Equivalent to
    ``len(sentinel_columns) if sentinel_columns is not None else 0``

    """

    sentinel_param_keys: Optional[Sequence[str]] = None
    """Parameter str keys in each param dictionary / tuple that link to the
    client side "sentinel" values for that row, which can be used to match
    parameter sets up to result rows.

    Only present when sentinel_columns is present and the INSERT statement
    actually refers to client side values for these sentinel columns.

    .. versionadded:: 2.0.10

    .. versionchanged:: 2.0.29 - the sequence is now string dictionary keys
       only, used against the "compiled parameters" collection before
       the parameters were converted by bound parameter processors

    """

    implicit_sentinel: bool = False
    """True when there is exactly one sentinel column and it uses a server
    side value, which currently has to generate an incrementing integer
    value.

    The dialect in question would have asserted that it supports receiving
    these values back and sorting on that value as a means of guaranteeing
    correlation with the incoming parameter list.

    .. versionadded:: 2.0.10

    """

    has_upsert_bound_parameters: bool = False
    """True when the upsert SET clause contains bound parameters that
    receive their values from the parameters dict (i.e., parametrized
    bindparams where value is None and callable is None).

    In that case multiple rows can't be batched into a single statement,
    because each row would need different values in the SET clause while a
    statement has only one SET clause.  See issue #13130.

    .. versionadded:: 2.0.37

    """

    embed_values_counter: bool = False
    """Whether an incrementing integer counter is embedded in each parameter
    set within the VALUES clause as parameters are batched over.

    Only used for a specific INSERT..SELECT..VALUES..RETURNING syntax in
    which a subquery produces the value tuples.  Current support includes
    PostgreSQL, Microsoft SQL Server.

    .. versionadded:: 2.0.10

    """
619
620
class _InsertManyValuesBatch(NamedTuple):
    """represents an individual batch SQL statement for insertmanyvalues.

    This is passed through the
    :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
    :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods out
    to the :class:`.Connection` within the
    :meth:`.Connection._exec_insertmany_context` method.

    All ten fields are required; none of them declares a default.

    .. versionadded:: 2.0.10

    """

    # NOTE(review): the producer of these tuples is outside this part of the
    # file; confirm the meaning of each field there.
    replaced_statement: str
    replaced_parameters: _DBAPIAnyExecuteParams
    processed_setinputsizes: Optional[_GenericSetInputSizesType]
    batch: Sequence[_DBAPISingleExecuteParams]
    sentinel_values: Sequence[Tuple[Any, ...]]
    current_batch_size: int
    batchnum: int
    total_batches: int
    rows_sorted: bool
    is_downgraded: bool
644
645
class InsertmanyvaluesSentinelOpts(FastIntFlag):
    """bitflag enum naming the styles of PK defaults that can serve as
    implicit sentinel columns

    """

    NOT_SUPPORTED = 1 << 0
    AUTOINCREMENT = 1 << 1
    IDENTITY = 1 << 2
    SEQUENCE = 1 << 3

    # combined masks built from the single-bit members above
    ANY_AUTOINCREMENT = AUTOINCREMENT | IDENTITY | SEQUENCE
    _SUPPORTED_OR_NOT = NOT_SUPPORTED | ANY_AUTOINCREMENT

    USE_INSERT_FROM_SELECT = 1 << 4
    # bit 5 (value 32) is not assigned in this enum
    RENDER_SELECT_COL_CASTS = 1 << 6
662
663
class CompilerState(IntEnum):
    """Phases that a :class:`.Compiled` object reports through its
    ``state`` attribute."""

    COMPILING = 0
    """a statement is present and its compilation is in progress"""

    STRING_APPLIED = 1
    """a statement is present and its string form has been applied.

    Additional processors run by subclasses may still be pending.

    """

    NO_STATEMENT = 2
    """the compiler has no statement to compile and is used for method
    access only"""
678
679
class Linting(IntEnum):
    """preferences for the 'SQL linting' feature.

    At present the feature supports flagging cartesian products in SQL
    statements.

    """

    NO_LINTING = 0
    "Disable all linting."

    COLLECT_CARTESIAN_PRODUCTS = 1
    """Collect data on FROMs and cartesian products, gathering it into
    'self.from_linter'"""

    WARN_LINTING = 2
    "Emit warnings for linters that find problems"

    FROM_LINTING = WARN_LINTING | COLLECT_CARTESIAN_PRODUCTS
    """Warn for cartesian products; the combination of
    COLLECT_CARTESIAN_PRODUCTS and WARN_LINTING"""


# module-level names for each member of Linting
NO_LINTING = Linting.NO_LINTING
COLLECT_CARTESIAN_PRODUCTS = Linting.COLLECT_CARTESIAN_PRODUCTS
WARN_LINTING = Linting.WARN_LINTING
FROM_LINTING = Linting.FROM_LINTING
706
707
class FromLinter(collections.namedtuple("FromLinter", ["froms", "edges"])):
    """current state of the "cartesian product" detection feature.

    ``froms`` is indexed by FROM element and supplies the text shown for it
    in warnings; ``edges`` is a collection of two-element sequences, each
    linking a pair of FROM elements.

    """

    def lint(self, start=None):
        """Search for FROM elements not reachable from a starting element.

        :param start: element of ``froms`` to begin from; when omitted an
         arbitrary element is chosen.

        :return: ``(unreached_froms, start_element)`` when some elements
         can't be reached by following edges, else ``(None, None)``.

        """
        if not self.froms:
            return None, None

        pending_edges = set(self.edges)
        unreached = set(self.froms)

        if start is None:
            origin = unreached.pop()
        else:
            origin = start
            unreached.remove(origin)

        to_visit = collections.deque([origin])

        while to_visit and unreached:
            current = to_visit.popleft()
            unreached.discard(current)

            # nodes are compared against edges by hash equality, since
            # "annotated" elements match their non-annotated counterparts.
            # native containment routines ("current in edge",
            # "edge.index(current)") avoid in-python hash() calls.
            touching = {edge for edge in pending_edges if current in edge}

            # queue up the far end of every touching edge: index 0 selects
            # element 1 and index 1 selects element 0.
            to_visit.extendleft(
                edge[not edge.index(current)] for edge in touching
            )
            pending_edges -= touching

        # anything still unreached is disconnected from the origin
        if not unreached:
            return None, None
        return unreached, origin

    def warn(self, stmt_type="SELECT"):
        """Emit a warning if :meth:`lint` finds unreachable FROM elements.

        :param stmt_type: statement kind named in the warning text.

        """
        unreached, origin = self.lint()

        # nothing left over means there is nothing to report
        if not unreached:
            return

        template = (
            "{stmt_type} statement has a cartesian product between "
            "FROM element(s) {froms} and "
            'FROM element "{start}". Apply join condition(s) '
            "between each element to resolve."
        )
        froms_str = ", ".join(f'"{self.froms[from_]}"' for from_ in unreached)
        message = template.format(
            stmt_type=stmt_type,
            froms=froms_str,
            start=self.froms[origin],
        )

        util.warn(message)
772
773
class Compiled:
    """Represent a compiled SQL or DDL expression.

    Calling ``str()`` on a ``Compiled`` object should yield the actual text
    of the statement.  A ``Compiled`` object is specific to its underlying
    database dialect, and may or may not also be specific to the columns
    referenced within a particular set of bind parameters.  A ``Compiled``
    object must never depend on the actual values of those bind parameters,
    even though it may reference those values as defaults.
    """

    statement: Optional[ClauseElement] = None
    "The statement to compile."
    string: str = ""
    "The string representation of the ``statement``"

    state: CompilerState
    """description of the compiler's state"""

    is_sql = False
    is_ddl = False

    _cached_metadata: Optional[CursorResultMetaData] = None

    _result_columns: Optional[List[ResultColumnsEntry]] = None

    schema_translate_map: Optional[SchemaTranslateMapType] = None

    execution_options: _ExecuteOptions = util.EMPTY_DICT
    """
    Execution options propagated from the statement.   Sub-elements of the
    statement can modify these in some cases.
    """

    preparer: IdentifierPreparer

    _annotations: _AnnotationDict = util.EMPTY_DICT

    compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` object holding additional state used
    by the compiler.

    Major executable objects such as :class:`_expression.Insert`,
    :class:`_expression.Update`, :class:`_expression.Delete`,
    :class:`_expression.Select` generate this state when compiled, in order
    to calculate additional information about the object.  For the top
    level object that is to be executed, the state can be stored here,
    where it may also apply to result set processing.

    .. versionadded:: 1.4

    """

    dml_compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` assigned at the same point that
    .isinsert, .isupdate, or .isdelete is assigned.

    Normally this is the same object as .compile_state, except in cases
    like the :class:`.ORMFromStatementCompileState` object.

    .. versionadded:: 1.4.40

    """

    cache_key: Optional[CacheKey] = None
    """The :class:`.CacheKey` that was generated ahead of creating this
    :class:`.Compiled` object.

    Routines that need the original :class:`.CacheKey` instance generated
    when the :class:`.Compiled` instance was first cached use this,
    typically to reconcile the original list of :class:`.BindParameter`
    objects with a per-statement list that's generated on each call.

    """

    _gen_time: float
    """Generation time of this :class:`.Compiled`, used for reporting
    cache stats."""

    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        schema_translate_map: Optional[SchemaTranslateMapType] = None,
        render_schema_translate: bool = False,
        compile_kwargs: Mapping[str, Any] = util.immutabledict(),
    ):
        """Construct a new :class:`.Compiled` object.

        :param dialect: :class:`.Dialect` to compile against.

        :param statement: :class:`_expression.ClauseElement` to be compiled.
         When ``None``, nothing is compiled and the state is set to
         :attr:`.CompilerState.NO_STATEMENT`.

        :param schema_translate_map: dictionary of schema names to be
         translated when forming the resultant SQL

         .. seealso::

            :ref:`schema_translating`

        :param render_schema_translate: when True, the compiled string is
         passed through the preparer's ``_render_schema_translates()``
         together with ``schema_translate_map``, which must then be given.

        :param compile_kwargs: additional kwargs that will be
         passed to the initial call to :meth:`.Compiled.process`.


        """
        self.dialect = dialect
        self.preparer = self.dialect.identifier_preparer
        if schema_translate_map:
            self.schema_translate_map = schema_translate_map
            self.preparer = self.preparer._with_schema_translate(
                schema_translate_map
            )

        if statement is None:
            self.state = CompilerState.NO_STATEMENT
        else:
            # the state reads COMPILING for the whole of process() and only
            # becomes STRING_APPLIED once the final string is in place
            self.state = CompilerState.COMPILING
            self.statement = statement
            self.can_execute = statement.supports_execution
            self._annotations = statement._annotations
            if self.can_execute:
                if TYPE_CHECKING:
                    assert isinstance(statement, Executable)
                self.execution_options = statement._execution_options
            self.string = self.process(self.statement, **compile_kwargs)

            if render_schema_translate:
                assert schema_translate_map is not None
                self.string = self.preparer._render_schema_translates(
                    self.string, schema_translate_map
                )

            self.state = CompilerState.STRING_APPLIED

        self._gen_time = perf_counter()

    def __init_subclass__(cls) -> None:
        # let each new subclass run its class-level setup hook first
        cls._init_compiler_cls()
        return super().__init_subclass__()

    @classmethod
    def _init_compiler_cls(cls):
        """Class-level hook called for every subclass; does nothing here."""

    def _execute_on_connection(
        self, connection, distilled_params, execution_options
    ):
        """Execute this object via ``connection._execute_compiled()``.

        Raises :class:`.ObjectNotExecutableError` when ``can_execute`` is
        false.

        """
        if not self.can_execute:
            raise exc.ObjectNotExecutableError(self.statement)

        return connection._execute_compiled(
            self, distilled_params, execution_options
        )

    def visit_unsupported_compilation(self, element, err, **kw):
        """Raise :class:`.UnsupportedCompilationError` for the type of
        ``element``, chained from ``err``."""
        raise exc.UnsupportedCompilationError(self, type(element)) from err

    @property
    def sql_compiler(self) -> SQLCompiler:
        """Return a Compiled that is capable of processing SQL expressions.

        A compiler that already is one would likely just return 'self'.

        """

        raise NotImplementedError()

    def process(self, obj: Visitable, **kwargs: Any) -> str:
        """Dispatch ``obj`` to this compiler, returning the result of its
        ``_compiler_dispatch()`` method."""
        return obj._compiler_dispatch(self, **kwargs)

    def __str__(self) -> str:
        """Return the string text of the generated SQL or DDL.

        An empty string is returned unless the state is
        :attr:`.CompilerState.STRING_APPLIED`.

        """

        applied = self.state is CompilerState.STRING_APPLIED
        return self.string if applied else ""

    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        """Return the bind params for this compiled object.

        :param params: a dict of string/object pairs whose values will
                       override bind values compiled in to the
                       statement.
        """

        raise NotImplementedError()

    @property
    def params(self):
        """Return the bind params for this compiled object."""
        return self.construct_params()
976
977
class TypeCompiler(util.EnsureKWArg):
    """Produces DDL specification for TypeEngine objects."""

    ensure_kwarg = r"visit_\w+"

    def __init__(self, dialect: Dialect):
        self.dialect = dialect

    def process(self, type_: TypeEngine[Any], **kw: Any) -> str:
        """Return the string produced by dispatching ``type_`` to this
        compiler.

        If ``type_`` has a variant mapping containing the name of this
        compiler's dialect, the mapped type is dispatched instead.

        """
        variants = type_._variant_mapping
        if variants and self.dialect.name in variants:
            type_ = variants[self.dialect.name]
        return type_._compiler_dispatch(self, **kw)

    def visit_unsupported_compilation(
        self, element: Any, err: Exception, **kw: Any
    ) -> NoReturn:
        """Raise :class:`.UnsupportedCompilationError` for ``element``,
        chained from ``err``."""
        raise exc.UnsupportedCompilationError(self, element) from err
998
999
1000# this was a Visitable, but to allow accurate detection of
1001# column elements this is actually a column element
class _CompileLabel(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """lightweight label object which acts as an expression.Label.

    ``proxy_set`` and ``type`` are read from the wrapped element.

    """

    __visit_name__ = "label"
    __slots__ = ("element", "name", "_alt_names")

    def __init__(self, col, name, alt_names=()):
        self.name = name
        self.element = col
        # the wrapped column always comes first among the alternate names
        self._alt_names = (col,) + alt_names

    @property
    def proxy_set(self):
        wrapped = self.element
        return wrapped.proxy_set

    @property
    def type(self):
        wrapped = self.element
        return wrapped.type

    def self_group(self, **kw):
        # returned as-is, whatever keyword arguments are passed
        return self
1025
1026
class ilike_case_insensitive(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """wrapping element for the case-insensitive portion of an ILIKE
    construct.

    The construct usually renders the ``lower()`` function; on PostgreSQL
    it passes through silently, on the assumption that "ILIKE" is in use.

    .. versionadded:: 2.0

    """

    __visit_name__ = "ilike_case_insensitive_operand"
    __slots__ = ("element", "comparator")

    def __init__(self, element):
        self.element = element
        # the wrapper shares the comparator of the element it wraps
        self.comparator = element.comparator

    @property
    def proxy_set(self):
        wrapped = self.element
        return wrapped.proxy_set

    @property
    def type(self):
        wrapped = self.element
        return wrapped.type

    def self_group(self, **kw):
        # returned as-is, whatever keyword arguments are passed
        return self

    def _with_binary_element_type(self, type_):
        # re-type the wrapped element, then wrap the result once more
        retyped = self.element._with_binary_element_type(type_)
        return ilike_case_insensitive(retyped)
1063
1064
1065class SQLCompiler(Compiled):
1066 """Default implementation of :class:`.Compiled`.
1067
1068 Compiles :class:`_expression.ClauseElement` objects into SQL strings.
1069
1070 """
1071
1072 extract_map = EXTRACT_MAP
1073
1074 bindname_escape_characters: ClassVar[Mapping[str, str]] = (
1075 util.immutabledict(
1076 {
1077 "%": "P",
1078 "(": "A",
1079 ")": "Z",
1080 ":": "C",
1081 ".": "_",
1082 "[": "_",
1083 "]": "_",
1084 " ": "_",
1085 }
1086 )
1087 )
1088 """A mapping (e.g. dict or similar) containing a lookup of
1089 characters keyed to replacement characters which will be applied to all
1090 'bind names' used in SQL statements as a form of 'escaping'; the given
1091 characters are replaced entirely with the 'replacement' character when
1092 rendered in the SQL statement, and a similar translation is performed
1093 on the incoming names used in parameter dictionaries passed to methods
1094 like :meth:`_engine.Connection.execute`.
1095
1096 This allows bound parameter names used in :func:`_sql.bindparam` and
1097 other constructs to have any arbitrary characters present without any
1098 concern for characters that aren't allowed at all on the target database.
1099
1100 Third party dialects can establish their own dictionary here to replace the
1101 default mapping, which will ensure that the particular characters in the
1102 mapping will never appear in a bound parameter name.
1103
1104 The dictionary is evaluated at **class creation time**, so cannot be
1105 modified at runtime; it must be present on the class when the class
1106 is first declared.
1107
1108 Note that for dialects that have additional bound parameter rules such
1109 as additional restrictions on leading characters, the
1110 :meth:`_sql.SQLCompiler.bindparam_string` method may need to be augmented.
1111 See the cx_Oracle compiler for an example of this.
1112
1113 .. versionadded:: 2.0.0rc1
1114
1115 """
1116
1117 _bind_translate_re: ClassVar[Pattern[str]]
1118 _bind_translate_chars: ClassVar[Mapping[str, str]]
1119
1120 is_sql = True
1121
1122 compound_keywords = COMPOUND_KEYWORDS
1123
1124 isdelete: bool = False
1125 isinsert: bool = False
1126 isupdate: bool = False
1127 """class-level defaults which can be set at the instance
1128 level to define if this Compiled instance represents
1129 INSERT/UPDATE/DELETE
1130 """
1131
1132 postfetch: Optional[List[Column[Any]]]
1133 """list of columns that can be post-fetched after INSERT or UPDATE to
1134 receive server-updated values"""
1135
1136 insert_prefetch: Sequence[Column[Any]] = ()
1137 """list of columns for which default values should be evaluated before
1138 an INSERT takes place"""
1139
1140 update_prefetch: Sequence[Column[Any]] = ()
1141 """list of columns for which onupdate default values should be evaluated
1142 before an UPDATE takes place"""
1143
1144 implicit_returning: Optional[Sequence[ColumnElement[Any]]] = None
1145 """list of "implicit" returning columns for a toplevel INSERT or UPDATE
1146 statement, used to receive newly generated values of columns.
1147
1148 .. versionadded:: 2.0 ``implicit_returning`` replaces the previous
1149 ``returning`` collection, which was not a generalized RETURNING
1150 collection and instead was in fact specific to the "implicit returning"
1151 feature.
1152
1153 """
1154
1155 isplaintext: bool = False
1156
1157 binds: Dict[str, BindParameter[Any]]
1158 """a dictionary of bind parameter keys to BindParameter instances."""
1159
1160 bind_names: Dict[BindParameter[Any], str]
1161 """a dictionary of BindParameter instances to "compiled" names
1162 that are actually present in the generated SQL"""
1163
1164 stack: List[_CompilerStackEntry]
1165 """major statements such as SELECT, INSERT, UPDATE, DELETE are
1166 tracked in this stack using an entry format."""
1167
1168 returning_precedes_values: bool = False
1169 """set to True classwide to generate RETURNING
1170 clauses before the VALUES or WHERE clause (i.e. MSSQL)
1171 """
1172
1173 render_table_with_column_in_update_from: bool = False
1174 """set to True classwide to indicate the SET clause
1175 in a multi-table UPDATE statement should qualify
1176 columns with the table name (i.e. MySQL only)
1177 """
1178
1179 ansi_bind_rules: bool = False
1180 """SQL 92 doesn't allow bind parameters to be used
1181 in the columns clause of a SELECT, nor does it allow
1182 ambiguous expressions like "? = ?". A compiler
    subclass can set this flag to True if the target
1184 driver/DB enforces this
1185 """
1186
1187 bindtemplate: str
1188 """template to render bound parameters based on paramstyle."""
1189
1190 compilation_bindtemplate: str
1191 """template used by compiler to render parameters before positional
1192 paramstyle application"""
1193
1194 _numeric_binds_identifier_char: str
    """Character that's used as the identifier of a numerical bind param.
1196 For example if this char is set to ``$``, numerical binds will be rendered
1197 in the form ``$1, $2, $3``.
1198 """
1199
1200 _result_columns: List[ResultColumnsEntry]
1201 """relates label names in the final SQL to a tuple of local
1202 column/label name, ColumnElement object (if any) and
1203 TypeEngine. CursorResult uses this for type processing and
1204 column targeting"""
1205
1206 _textual_ordered_columns: bool = False
1207 """tell the result object that the column names as rendered are important,
1208 but they are also "ordered" vs. what is in the compiled object here.
1209
1210 As of 1.4.42 this condition is only present when the statement is a
1211 TextualSelect, e.g. text("....").columns(...), where it is required
1212 that the columns are considered positionally and not by name.
1213
1214 """
1215
1216 _ad_hoc_textual: bool = False
1217 """tell the result that we encountered text() or '*' constructs in the
1218 middle of the result columns, but we also have compiled columns, so
1219 if the number of columns in cursor.description does not match how many
1220 expressions we have, that means we can't rely on positional at all and
1221 should match on name.
1222
1223 """
1224
1225 _ordered_columns: bool = True
1226 """
1227 if False, means we can't be sure the list of entries
1228 in _result_columns is actually the rendered order. Usually
1229 True unless using an unordered TextualSelect.
1230 """
1231
1232 _loose_column_name_matching: bool = False
1233 """tell the result object that the SQL statement is textual, wants to match
1234 up to Column objects, and may be using the ._tq_label in the SELECT rather
1235 than the base name.
1236
1237 """
1238
1239 _numeric_binds: bool = False
1240 """
1241 True if paramstyle is "numeric". This paramstyle is trickier than
1242 all the others.
1243
1244 """
1245
1246 _render_postcompile: bool = False
1247 """
1248 whether to render out POSTCOMPILE params during the compile phase.
1249
1250 This attribute is used only for end-user invocation of stmt.compile();
1251 it's never used for actual statement execution, where instead the
1252 dialect internals access and render the internal postcompile structure
1253 directly.
1254
1255 """
1256
1257 _post_compile_expanded_state: Optional[ExpandedState] = None
1258 """When render_postcompile is used, the ``ExpandedState`` used to create
1259 the "expanded" SQL is assigned here, and then used by the ``.params``
1260 accessor and ``.construct_params()`` methods for their return values.
1261
1262 .. versionadded:: 2.0.0rc1
1263
1264 """
1265
1266 _pre_expanded_string: Optional[str] = None
1267 """Stores the original string SQL before 'post_compile' is applied,
1268 for cases where 'post_compile' were used.
1269
1270 """
1271
1272 _pre_expanded_positiontup: Optional[List[str]] = None
1273
1274 _insertmanyvalues: Optional[_InsertManyValues] = None
1275
1276 _insert_crud_params: Optional[crud._CrudParamSequence] = None
1277
1278 literal_execute_params: FrozenSet[BindParameter[Any]] = frozenset()
1279 """bindparameter objects that are rendered as literal values at statement
1280 execution time.
1281
1282 """
1283
1284 post_compile_params: FrozenSet[BindParameter[Any]] = frozenset()
1285 """bindparameter objects that are rendered as bound parameter placeholders
1286 at statement execution time.
1287
1288 """
1289
1290 escaped_bind_names: util.immutabledict[str, str] = util.EMPTY_DICT
1291 """Late escaping of bound parameter names that has to be converted
1292 to the original name when looking in the parameter dictionary.
1293
1294 """
1295
1296 has_out_parameters = False
1297 """if True, there are bindparam() objects that have the isoutparam
1298 flag set."""
1299
1300 postfetch_lastrowid = False
    """if True, and this is an insert, use cursor.lastrowid to populate
1302 result.inserted_primary_key. """
1303
1304 _cache_key_bind_match: Optional[
1305 Tuple[
1306 Dict[
1307 BindParameter[Any],
1308 List[BindParameter[Any]],
1309 ],
1310 Dict[
1311 str,
1312 BindParameter[Any],
1313 ],
1314 ]
1315 ] = None
1316 """a mapping that will relate the BindParameter object we compile
1317 to those that are part of the extracted collection of parameters
1318 in the cache key, if we were given a cache key.
1319
1320 """
1321
1322 positiontup: Optional[List[str]] = None
1323 """for a compiled construct that uses a positional paramstyle, will be
1324 a sequence of strings, indicating the names of bound parameters in order.
1325
1326 This is used in order to render bound parameters in their correct order,
1327 and is combined with the :attr:`_sql.Compiled.params` dictionary to
1328 render parameters.
1329
1330 This sequence always contains the unescaped name of the parameters.
1331
1332 .. seealso::
1333
1334 :ref:`faq_sql_expression_string` - includes a usage example for
1335 debugging use cases.
1336
1337 """
1338 _values_bindparam: Optional[List[str]] = None
1339
1340 _visited_bindparam: Optional[List[str]] = None
1341
1342 inline: bool = False
1343
1344 ctes: Optional[MutableMapping[CTE, str]]
1345
1346 # Detect same CTE references - Dict[(level, name), cte]
1347 # Level is required for supporting nesting
1348 ctes_by_level_name: Dict[Tuple[int, str], CTE]
1349
1350 # To retrieve key/level in ctes_by_level_name -
1351 # Dict[cte_reference, (level, cte_name, cte_opts)]
1352 level_name_by_cte: Dict[CTE, Tuple[int, str, selectable._CTEOpts]]
1353
1354 ctes_recursive: bool
1355
1356 _post_compile_pattern = re.compile(r"__\[POSTCOMPILE_(\S+?)(~~.+?~~)?\]")
1357 _pyformat_pattern = re.compile(r"%\(([^)]+?)\)s")
1358 _positional_pattern = re.compile(
1359 f"{_pyformat_pattern.pattern}|{_post_compile_pattern.pattern}"
1360 )
1361
1362 @classmethod
1363 def _init_compiler_cls(cls):
1364 cls._init_bind_translate()
1365
1366 @classmethod
1367 def _init_bind_translate(cls):
1368 reg = re.escape("".join(cls.bindname_escape_characters))
1369 cls._bind_translate_re = re.compile(f"[{reg}]")
1370 cls._bind_translate_chars = cls.bindname_escape_characters
1371
    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        cache_key: Optional[CacheKey] = None,
        column_keys: Optional[Sequence[str]] = None,
        for_executemany: bool = False,
        linting: Linting = NO_LINTING,
        _supporting_against: Optional[SQLCompiler] = None,
        **kwargs: Any,
    ):
        """Construct a new :class:`.SQLCompiler` object.

        :param dialect: :class:`.Dialect` to be used

        :param statement: :class:`_expression.ClauseElement` to be compiled

        :param cache_key: optional cache key; when given, the bound
         parameters found in its second element are indexed both by
         ``.key`` and by parameter object, and the pair of lookups is
         stored as ``_cache_key_bind_match``.

        :param column_keys: a list of column names to be compiled into an
         INSERT or UPDATE statement.

        :param for_executemany: whether INSERT / UPDATE statements should
         expect that they are to be invoked in an "executemany" style,
         which may impact how the statement will be expected to return the
         values of defaults and autoincrement / sequences and similar.
         Depending on the backend and driver in use, support for retrieving
         these values may be disabled which means SQL expressions may
         be rendered inline, RETURNING may not be rendered, etc.

        :param linting: linting setting; stored as ``self.linting``.

        :param _supporting_against: optional other :class:`.SQLCompiler`;
         when given, the contents of its ``__dict__`` are copied onto this
         compiler, except for ``state``, ``dialect``, ``preparer`` and the
         paramstyle-related attributes named in the body below.

        :param kwargs: additional keyword arguments to be consumed by the
         superclass.

        """
        self.column_keys = column_keys

        self.cache_key = cache_key

        if cache_key:
            # index the bound parameters carried by the cache key two ways:
            # by their .key (cksm) and by the parameter object itself (ckbm)
            cksm = {b.key: b for b in cache_key[1]}
            ckbm = {b: [b] for b in cache_key[1]}
            self._cache_key_bind_match = (ckbm, cksm)

        # compile INSERT/UPDATE defaults/sequences to expect executemany
        # style execution, which may mean no pre-execute of defaults,
        # or no RETURNING
        self.for_executemany = for_executemany

        self.linting = linting

        # a dictionary of bind parameter keys to BindParameter
        # instances.
        self.binds = {}

        # a dictionary of BindParameter instances to "compiled" names
        # that are actually present in the generated SQL
        self.bind_names = util.column_dict()

        # stack which keeps track of nested SELECT statements
        self.stack = []

        self._result_columns = []

        # true if the paramstyle is positional
        self.positional = dialect.positional
        if self.positional:
            self._numeric_binds = nb = dialect.paramstyle.startswith("numeric")
            if nb:
                # "numeric_dollar" renders $1, $2, ...; any other numeric
                # paramstyle renders :1, :2, ...
                self._numeric_binds_identifier_char = (
                    "$" if dialect.paramstyle == "numeric_dollar" else ":"
                )

            # positional paramstyles are first compiled with the pyformat
            # template; _process_positional() / _process_numeric() rewrite
            # the string afterwards
            self.compilation_bindtemplate = _pyformat_template
        else:
            self.compilation_bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

        # the CTE collections are only created by _init_cte_state()
        self.ctes = None

        self.label_length = (
            dialect.label_length or dialect.max_identifier_length
        )

        # a map which tracks "anonymous" identifiers that are created on
        # the fly here
        self.anon_map = prefix_anon_map()

        # a map which tracks "truncated" names based on
        # dialect.label_length or dialect.max_identifier_length
        self.truncated_names: Dict[Tuple[str, str], str] = {}
        self._truncated_counters: Dict[str, int] = {}

        # NOTE(review): presumably the base constructor performs the actual
        # compilation, since self.state / isinsert / isupdate / isdelete are
        # consulted right after this call - confirm against Compiled
        Compiled.__init__(self, dialect, statement, **kwargs)

        if self.isinsert or self.isupdate or self.isdelete:
            if TYPE_CHECKING:
                assert isinstance(statement, UpdateBase)

            if self.isinsert or self.isupdate:
                if TYPE_CHECKING:
                    assert isinstance(statement, ValuesBase)
                # "inline" is turned on either explicitly by the statement,
                # or for executemany when this is not an INSERT, or is an
                # INSERT that uses return_defaults on a dialect with
                # insert_executemany_returning
                if statement._inline:
                    self.inline = True
                elif self.for_executemany and (
                    not self.isinsert
                    or (
                        self.dialect.insert_executemany_returning
                        and statement._return_defaults
                    )
                ):
                    self.inline = True

        self.bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

        if _supporting_against:
            # take on the other compiler's instance state, leaving alone
            # the attributes that were established above for this
            # compiler's own dialect and paramstyle
            self.__dict__.update(
                {
                    k: v
                    for k, v in _supporting_against.__dict__.items()
                    if k
                    not in {
                        "state",
                        "dialect",
                        "preparer",
                        "positional",
                        "_numeric_binds",
                        "compilation_bindtemplate",
                        "bindtemplate",
                    }
                }
            )

        if self.state is CompilerState.STRING_APPLIED:
            # a string is present; convert the pyformat placeholders to
            # the positional form of the dialect's paramstyle
            if self.positional:
                if self._numeric_binds:
                    self._process_numeric()
                else:
                    self._process_positional()

            if self._render_postcompile:
                # expand post-compile parameters right away, storing the
                # outcome on this compiler (_populate_self=True)
                parameters = self.construct_params(
                    escape_names=False,
                    _no_postcompile=True,
                )

                self._process_parameters_for_postcompile(
                    parameters, _populate_self=True
                )
1517
1518 @property
1519 def insert_single_values_expr(self) -> Optional[str]:
1520 """When an INSERT is compiled with a single set of parameters inside
1521 a VALUES expression, the string is assigned here, where it can be
1522 used for insert batching schemes to rewrite the VALUES expression.
1523
1524 .. versionadded:: 1.3.8
1525
1526 .. versionchanged:: 2.0 This collection is no longer used by
1527 SQLAlchemy's built-in dialects, in favor of the currently
1528 internal ``_insertmanyvalues`` collection that is used only by
1529 :class:`.SQLCompiler`.
1530
1531 """
1532 if self._insertmanyvalues is None:
1533 return None
1534 else:
1535 return self._insertmanyvalues.single_values_expr
1536
1537 @util.ro_memoized_property
1538 def effective_returning(self) -> Optional[Sequence[ColumnElement[Any]]]:
1539 """The effective "returning" columns for INSERT, UPDATE or DELETE.
1540
1541 This is either the so-called "implicit returning" columns which are
1542 calculated by the compiler on the fly, or those present based on what's
1543 present in ``self.statement._returning`` (expanded into individual
1544 columns using the ``._all_selected_columns`` attribute) i.e. those set
1545 explicitly using the :meth:`.UpdateBase.returning` method.
1546
1547 .. versionadded:: 2.0
1548
1549 """
1550 if self.implicit_returning:
1551 return self.implicit_returning
1552 elif self.statement is not None and is_dml(self.statement):
1553 return [
1554 c
1555 for c in self.statement._all_selected_columns
1556 if is_column_element(c)
1557 ]
1558
1559 else:
1560 return None
1561
1562 @property
1563 def returning(self):
1564 """backwards compatibility; returns the
1565 effective_returning collection.
1566
1567 """
1568 return self.effective_returning
1569
1570 @property
1571 def current_executable(self):
1572 """Return the current 'executable' that is being compiled.
1573
1574 This is currently the :class:`_sql.Select`, :class:`_sql.Insert`,
1575 :class:`_sql.Update`, :class:`_sql.Delete`,
1576 :class:`_sql.CompoundSelect` object that is being compiled.
1577 Specifically it's assigned to the ``self.stack`` list of elements.
1578
1579 When a statement like the above is being compiled, it normally
1580 is also assigned to the ``.statement`` attribute of the
1581 :class:`_sql.Compiler` object. However, all SQL constructs are
1582 ultimately nestable, and this attribute should never be consulted
1583 by a ``visit_`` method, as it is not guaranteed to be assigned
1584 nor guaranteed to correspond to the current statement being compiled.
1585
1586 .. versionadded:: 1.3.21
1587
1588 For compatibility with previous versions, use the following
1589 recipe::
1590
1591 statement = getattr(self, "current_executable", False)
1592 if statement is False:
1593 statement = self.stack[-1]["selectable"]
1594
1595 For versions 1.4 and above, ensure only .current_executable
1596 is used; the format of "self.stack" may change.
1597
1598
1599 """
1600 try:
1601 return self.stack[-1]["selectable"]
1602 except IndexError as ie:
1603 raise IndexError("Compiler does not have a stack entry") from ie
1604
1605 @property
1606 def prefetch(self):
1607 return list(self.insert_prefetch) + list(self.update_prefetch)
1608
1609 @util.memoized_property
1610 def _global_attributes(self) -> Dict[Any, Any]:
1611 return {}
1612
1613 @util.memoized_instancemethod
1614 def _init_cte_state(self) -> MutableMapping[CTE, str]:
1615 """Initialize collections related to CTEs only if
1616 a CTE is located, to save on the overhead of
1617 these collections otherwise.
1618
1619 """
1620 # collect CTEs to tack on top of a SELECT
1621 # To store the query to print - Dict[cte, text_query]
1622 ctes: MutableMapping[CTE, str] = util.OrderedDict()
1623 self.ctes = ctes
1624
1625 # Detect same CTE references - Dict[(level, name), cte]
1626 # Level is required for supporting nesting
1627 self.ctes_by_level_name = {}
1628
1629 # To retrieve key/level in ctes_by_level_name -
1630 # Dict[cte_reference, (level, cte_name, cte_opts)]
1631 self.level_name_by_cte = {}
1632
1633 self.ctes_recursive = False
1634
1635 return ctes
1636
1637 @contextlib.contextmanager
1638 def _nested_result(self):
1639 """special API to support the use case of 'nested result sets'"""
1640 result_columns, ordered_columns = (
1641 self._result_columns,
1642 self._ordered_columns,
1643 )
1644 self._result_columns, self._ordered_columns = [], False
1645
1646 try:
1647 if self.stack:
1648 entry = self.stack[-1]
1649 entry["need_result_map_for_nested"] = True
1650 else:
1651 entry = None
1652 yield self._result_columns, self._ordered_columns
1653 finally:
1654 if entry:
1655 entry.pop("need_result_map_for_nested")
1656 self._result_columns, self._ordered_columns = (
1657 result_columns,
1658 ordered_columns,
1659 )
1660
1661 def _process_positional(self):
1662 assert not self.positiontup
1663 assert self.state is CompilerState.STRING_APPLIED
1664 assert not self._numeric_binds
1665
1666 if self.dialect.paramstyle == "format":
1667 placeholder = "%s"
1668 else:
1669 assert self.dialect.paramstyle == "qmark"
1670 placeholder = "?"
1671
1672 positions = []
1673
1674 def find_position(m: re.Match[str]) -> str:
1675 normal_bind = m.group(1)
1676 if normal_bind:
1677 positions.append(normal_bind)
1678 return placeholder
1679 else:
1680 # this a post-compile bind
1681 positions.append(m.group(2))
1682 return m.group(0)
1683
1684 self.string = re.sub(
1685 self._positional_pattern, find_position, self.string
1686 )
1687
1688 if self.escaped_bind_names:
1689 reverse_escape = {v: k for k, v in self.escaped_bind_names.items()}
1690 assert len(self.escaped_bind_names) == len(reverse_escape)
1691 self.positiontup = [
1692 reverse_escape.get(name, name) for name in positions
1693 ]
1694 else:
1695 self.positiontup = positions
1696
1697 if self._insertmanyvalues:
1698 positions = []
1699
1700 single_values_expr = re.sub(
1701 self._positional_pattern,
1702 find_position,
1703 self._insertmanyvalues.single_values_expr,
1704 )
1705 insert_crud_params = [
1706 (
1707 v[0],
1708 v[1],
1709 re.sub(self._positional_pattern, find_position, v[2]),
1710 v[3],
1711 )
1712 for v in self._insertmanyvalues.insert_crud_params
1713 ]
1714
1715 self._insertmanyvalues = self._insertmanyvalues._replace(
1716 single_values_expr=single_values_expr,
1717 insert_crud_params=insert_crud_params,
1718 )
1719
1720 def _process_numeric(self):
1721 assert self._numeric_binds
1722 assert self.state is CompilerState.STRING_APPLIED
1723
1724 num = 1
1725 param_pos: Dict[str, str] = {}
1726 order: Iterable[str]
1727 if self._insertmanyvalues and self._values_bindparam is not None:
1728 # bindparams that are not in values are always placed first.
1729 # this avoids the need of changing them when using executemany
1730 # values () ()
1731 order = itertools.chain(
1732 (
1733 name
1734 for name in self.bind_names.values()
1735 if name not in self._values_bindparam
1736 ),
1737 self.bind_names.values(),
1738 )
1739 else:
1740 order = self.bind_names.values()
1741
1742 for bind_name in order:
1743 if bind_name in param_pos:
1744 continue
1745 bind = self.binds[bind_name]
1746 if (
1747 bind in self.post_compile_params
1748 or bind in self.literal_execute_params
1749 ):
1750 # set to None to just mark the in positiontup, it will not
1751 # be replaced below.
1752 param_pos[bind_name] = None # type: ignore
1753 else:
1754 ph = f"{self._numeric_binds_identifier_char}{num}"
1755 num += 1
1756 param_pos[bind_name] = ph
1757
1758 self.next_numeric_pos = num
1759
1760 self.positiontup = list(param_pos)
1761 if self.escaped_bind_names:
1762 len_before = len(param_pos)
1763 param_pos = {
1764 self.escaped_bind_names.get(name, name): pos
1765 for name, pos in param_pos.items()
1766 }
1767 assert len(param_pos) == len_before
1768
1769 # Can't use format here since % chars are not escaped.
1770 self.string = self._pyformat_pattern.sub(
1771 lambda m: param_pos[m.group(1)], self.string
1772 )
1773
1774 if self._insertmanyvalues:
1775 single_values_expr = (
1776 # format is ok here since single_values_expr includes only
1777 # place-holders
1778 self._insertmanyvalues.single_values_expr
1779 % param_pos
1780 )
1781 insert_crud_params = [
1782 (v[0], v[1], "%s", v[3])
1783 for v in self._insertmanyvalues.insert_crud_params
1784 ]
1785
1786 self._insertmanyvalues = self._insertmanyvalues._replace(
1787 # This has the numbers (:1, :2)
1788 single_values_expr=single_values_expr,
1789 # The single binds are instead %s so they can be formatted
1790 insert_crud_params=insert_crud_params,
1791 )
1792
1793 @util.memoized_property
1794 def _bind_processors(
1795 self,
1796 ) -> MutableMapping[
1797 str, Union[_BindProcessorType[Any], Sequence[_BindProcessorType[Any]]]
1798 ]:
1799 # mypy is not able to see the two value types as the above Union,
1800 # it just sees "object". don't know how to resolve
1801 return {
1802 key: value # type: ignore
1803 for key, value in (
1804 (
1805 self.bind_names[bindparam],
1806 (
1807 bindparam.type._cached_bind_processor(self.dialect)
1808 if not bindparam.type._is_tuple_type
1809 else tuple(
1810 elem_type._cached_bind_processor(self.dialect)
1811 for elem_type in cast(
1812 TupleType, bindparam.type
1813 ).types
1814 )
1815 ),
1816 )
1817 for bindparam in self.bind_names
1818 )
1819 if value is not None
1820 }
1821
1822 def is_subquery(self):
1823 return len(self.stack) > 1
1824
1825 @property
1826 def sql_compiler(self) -> Self:
1827 return self
1828
1829 def construct_expanded_state(
1830 self,
1831 params: Optional[_CoreSingleExecuteParams] = None,
1832 escape_names: bool = True,
1833 ) -> ExpandedState:
1834 """Return a new :class:`.ExpandedState` for a given parameter set.
1835
1836 For queries that use "expanding" or other late-rendered parameters,
1837 this method will provide for both the finalized SQL string as well
1838 as the parameters that would be used for a particular parameter set.
1839
1840 .. versionadded:: 2.0.0rc1
1841
1842 """
1843 parameters = self.construct_params(
1844 params,
1845 escape_names=escape_names,
1846 _no_postcompile=True,
1847 )
1848 return self._process_parameters_for_postcompile(
1849 parameters,
1850 )
1851
1852 def construct_params(
1853 self,
1854 params: Optional[_CoreSingleExecuteParams] = None,
1855 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
1856 escape_names: bool = True,
1857 _group_number: Optional[int] = None,
1858 _check: bool = True,
1859 _no_postcompile: bool = False,
1860 ) -> _MutableCoreSingleExecuteParams:
1861 """return a dictionary of bind parameter keys and values"""
1862
1863 if self._render_postcompile and not _no_postcompile:
1864 assert self._post_compile_expanded_state is not None
1865 if not params:
1866 return dict(self._post_compile_expanded_state.parameters)
1867 else:
1868 raise exc.InvalidRequestError(
1869 "can't construct new parameters when render_postcompile "
1870 "is used; the statement is hard-linked to the original "
1871 "parameters. Use construct_expanded_state to generate a "
1872 "new statement and parameters."
1873 )
1874
1875 has_escaped_names = escape_names and bool(self.escaped_bind_names)
1876
1877 if extracted_parameters:
1878 # related the bound parameters collected in the original cache key
1879 # to those collected in the incoming cache key. They will not have
1880 # matching names but they will line up positionally in the same
1881 # way. The parameters present in self.bind_names may be clones of
1882 # these original cache key params in the case of DML but the .key
1883 # will be guaranteed to match.
1884 if self.cache_key is None:
1885 raise exc.CompileError(
1886 "This compiled object has no original cache key; "
1887 "can't pass extracted_parameters to construct_params"
1888 )
1889 else:
1890 orig_extracted = self.cache_key[1]
1891
1892 ckbm_tuple = self._cache_key_bind_match
1893 assert ckbm_tuple is not None
1894 ckbm, _ = ckbm_tuple
1895 resolved_extracted = {
1896 bind: extracted
1897 for b, extracted in zip(orig_extracted, extracted_parameters)
1898 for bind in ckbm[b]
1899 }
1900 else:
1901 resolved_extracted = None
1902
1903 if params:
1904 pd = {}
1905 for bindparam, name in self.bind_names.items():
1906 escaped_name = (
1907 self.escaped_bind_names.get(name, name)
1908 if has_escaped_names
1909 else name
1910 )
1911
1912 if bindparam.key in params:
1913 pd[escaped_name] = params[bindparam.key]
1914 elif name in params:
1915 pd[escaped_name] = params[name]
1916
1917 elif _check and bindparam.required:
1918 if _group_number:
1919 raise exc.InvalidRequestError(
1920 "A value is required for bind parameter %r, "
1921 "in parameter group %d"
1922 % (bindparam.key, _group_number),
1923 code="cd3x",
1924 )
1925 else:
1926 raise exc.InvalidRequestError(
1927 "A value is required for bind parameter %r"
1928 % bindparam.key,
1929 code="cd3x",
1930 )
1931 else:
1932 if resolved_extracted:
1933 value_param = resolved_extracted.get(
1934 bindparam, bindparam
1935 )
1936 else:
1937 value_param = bindparam
1938
1939 if bindparam.callable:
1940 pd[escaped_name] = value_param.effective_value
1941 else:
1942 pd[escaped_name] = value_param.value
1943 return pd
1944 else:
1945 pd = {}
1946 for bindparam, name in self.bind_names.items():
1947 escaped_name = (
1948 self.escaped_bind_names.get(name, name)
1949 if has_escaped_names
1950 else name
1951 )
1952
1953 if _check and bindparam.required:
1954 if _group_number:
1955 raise exc.InvalidRequestError(
1956 "A value is required for bind parameter %r, "
1957 "in parameter group %d"
1958 % (bindparam.key, _group_number),
1959 code="cd3x",
1960 )
1961 else:
1962 raise exc.InvalidRequestError(
1963 "A value is required for bind parameter %r"
1964 % bindparam.key,
1965 code="cd3x",
1966 )
1967
1968 if resolved_extracted:
1969 value_param = resolved_extracted.get(bindparam, bindparam)
1970 else:
1971 value_param = bindparam
1972
1973 if bindparam.callable:
1974 pd[escaped_name] = value_param.effective_value
1975 else:
1976 pd[escaped_name] = value_param.value
1977
1978 return pd
1979
1980 @util.memoized_instancemethod
1981 def _get_set_input_sizes_lookup(self):
1982 dialect = self.dialect
1983
1984 include_types = dialect.include_set_input_sizes
1985 exclude_types = dialect.exclude_set_input_sizes
1986
1987 dbapi = dialect.dbapi
1988
1989 def lookup_type(typ):
1990 dbtype = typ._unwrapped_dialect_impl(dialect).get_dbapi_type(dbapi)
1991
1992 if (
1993 dbtype is not None
1994 and (exclude_types is None or dbtype not in exclude_types)
1995 and (include_types is None or dbtype in include_types)
1996 ):
1997 return dbtype
1998 else:
1999 return None
2000
2001 inputsizes = {}
2002
2003 literal_execute_params = self.literal_execute_params
2004
2005 for bindparam in self.bind_names:
2006 if bindparam in literal_execute_params:
2007 continue
2008
2009 if bindparam.type._is_tuple_type:
2010 inputsizes[bindparam] = [
2011 lookup_type(typ)
2012 for typ in cast(TupleType, bindparam.type).types
2013 ]
2014 else:
2015 inputsizes[bindparam] = lookup_type(bindparam.type)
2016
2017 return inputsizes
2018
2019 @property
2020 def params(self):
2021 """Return the bind param dictionary embedded into this
2022 compiled object, for those values that are present.
2023
2024 .. seealso::
2025
2026 :ref:`faq_sql_expression_string` - includes a usage example for
2027 debugging use cases.
2028
2029 """
2030 return self.construct_params(_check=False)
2031
def _process_parameters_for_postcompile(
    self,
    parameters: _MutableCoreSingleExecuteParams,
    _populate_self: bool = False,
) -> ExpandedState:
    """handle special post compile parameters.

    These include:

    * "expanding" parameters -typically IN tuples that are rendered
      on a per-parameter basis for an otherwise fixed SQL statement string.

    * literal_binds compiled with the literal_execute flag.  Used for
      things like SQL Server "TOP N" where the driver does not accommodate
      N as a bound parameter.

    :param parameters: mutable parameter dictionary.  It is modified in
     place: literal-execute and expanding entries are popped from it, and
     the individual expanded entries are added to it when the parameter
     is not itself ``literal_execute``.
    :param _populate_self: when True, the expanded statement and
     positional tuple are also stored on this compiler (``self.string``,
     ``self.positiontup``, ``self._post_compile_expanded_state``), keeping
     the pre-expansion values in ``self._pre_expanded_string`` and
     ``self._pre_expanded_positiontup``.
    :return: an ``ExpandedState`` built from the rewritten statement, the
     given ``parameters``, the processors for newly created parameter
     names, the new positional tuple (``None`` when not positional) and
     a mapping of each expanded name to its generated keys.

    """

    expanded_parameters = {}
    new_positiontup: Optional[List[str]]

    # always expand from the original (pre-expansion) string if a
    # previous call with _populate_self stored one
    pre_expanded_string = self._pre_expanded_string
    if pre_expanded_string is None:
        pre_expanded_string = self.string

    if self.positional:
        new_positiontup = []

        pre_expanded_positiontup = self._pre_expanded_positiontup
        if pre_expanded_positiontup is None:
            pre_expanded_positiontup = self.positiontup

    else:
        new_positiontup = pre_expanded_positiontup = None

    # the same processors mapping is viewed with two different value
    # types, depending on whether the parameter has a tuple type
    processors = self._bind_processors
    single_processors = cast(
        "Mapping[str, _BindProcessorType[Any]]", processors
    )
    tuple_processors = cast(
        "Mapping[str, Sequence[_BindProcessorType[Any]]]", processors
    )

    new_processors: Dict[str, _BindProcessorType[Any]] = {}

    replacement_expressions: Dict[str, Any] = {}
    to_update_sets: Dict[str, Any] = {}

    # notes:
    # *unescaped* parameter names in:
    # self.bind_names, self.binds, self._bind_processors, self.positiontup
    #
    # *escaped* parameter names in:
    # construct_params(), replacement_expressions

    numeric_positiontup: Optional[List[str]] = None

    if self.positional and pre_expanded_positiontup is not None:
        names: Iterable[str] = pre_expanded_positiontup
        if self._numeric_binds:
            numeric_positiontup = []
    else:
        names = self.bind_names.values()

    ebn = self.escaped_bind_names
    for name in names:
        escaped_name = ebn.get(name, name) if ebn else name
        parameter = self.binds[name]

        if parameter in self.literal_execute_params:
            # rendered inline once per escaped name; the value is
            # removed from the parameters that will be sent
            if escaped_name not in replacement_expressions:
                replacement_expressions[escaped_name] = (
                    self.render_literal_bindparam(
                        parameter,
                        render_literal_value=parameters.pop(escaped_name),
                    )
                )
            continue

        if parameter in self.post_compile_params:
            if escaped_name in replacement_expressions:
                # name was already expanded earlier in this loop;
                # reuse the stored result
                to_update = to_update_sets[escaped_name]
                values = None
            else:
                # we are removing the parameter from parameters
                # because it is a list value, which is not expected by
                # TypeEngine objects that would otherwise be asked to
                # process it. the single name is being replaced with
                # individual numbered parameters for each value in the
                # param.
                #
                # note we are also inserting *escaped* parameter names
                # into the given dictionary.   default dialect will
                # use these param names directly as they will not be
                # in the escaped_bind_names dictionary.
                #
                # NOTE(review): the literal-execute branch above pops by
                # ``escaped_name`` while this pops by ``name`` -- confirm
                # which key form callers use in ``parameters``.
                values = parameters.pop(name)

                leep_res = self._literal_execute_expanding_parameter(
                    escaped_name, parameter, values
                )
                (to_update, replacement_expr) = leep_res

                to_update_sets[escaped_name] = to_update
                replacement_expressions[escaped_name] = replacement_expr

            if not parameter.literal_execute:
                parameters.update(to_update)
                if parameter.type._is_tuple_type:
                    assert values is not None
                    # keys follow "<name>_<i>_<j>", both 1-based, with j
                    # selecting the processor for that tuple position
                    new_processors.update(
                        (
                            "%s_%s_%s" % (name, i, j),
                            tuple_processors[name][j - 1],
                        )
                        for i, tuple_element in enumerate(values, 1)
                        for j, _ in enumerate(tuple_element, 1)
                        if name in tuple_processors
                        and tuple_processors[name][j - 1] is not None
                    )
                else:
                    new_processors.update(
                        (key, single_processors[name])
                        for key, _ in to_update
                        if name in single_processors
                    )
                if numeric_positiontup is not None:
                    numeric_positiontup.extend(
                        name for name, _ in to_update
                    )
                elif new_positiontup is not None:
                    # to_update has escaped names, but that's ok since
                    # these are new names, that aren't in the
                    # escaped_bind_names dict.
                    new_positiontup.extend(name for name, _ in to_update)
                expanded_parameters[name] = [
                    expand_key for expand_key, _ in to_update
                ]
        elif new_positiontup is not None:
            new_positiontup.append(name)

    def process_expanding(m):
        # group(1) is the key into replacement_expressions
        key = m.group(1)
        expr = replacement_expressions[key]

        # if POSTCOMPILE included a bind_expression, render that
        # around each element
        if m.group(2):
            tok = m.group(2).split("~~")
            be_left, be_right = tok[1], tok[3]
            expr = ", ".join(
                "%s%s%s" % (be_left, exp, be_right)
                for exp in expr.split(", ")
            )
        return expr

    statement = re.sub(
        self._post_compile_pattern, process_expanding, pre_expanded_string
    )

    if numeric_positiontup is not None:
        assert new_positiontup is not None
        # number the expanded parameters starting at next_numeric_pos
        param_pos = {
            key: f"{self._numeric_binds_identifier_char}{num}"
            for num, key in enumerate(
                numeric_positiontup, self.next_numeric_pos
            )
        }
        # Can't use format here since % chars are not escaped.
        statement = self._pyformat_pattern.sub(
            lambda m: param_pos[m.group(1)], statement
        )
        new_positiontup.extend(numeric_positiontup)

    expanded_state = ExpandedState(
        statement,
        parameters,
        new_processors,
        new_positiontup,
        expanded_parameters,
    )

    if _populate_self:
        # this is for the "render_postcompile" flag, which is not
        # otherwise used internally and is for end-user debugging and
        # special use cases.
        self._pre_expanded_string = pre_expanded_string
        self._pre_expanded_positiontup = pre_expanded_positiontup
        self.string = expanded_state.statement
        self.positiontup = (
            list(expanded_state.positiontup or ())
            if self.positional
            else None
        )
        self._post_compile_expanded_state = expanded_state

    return expanded_state
2228
@util.preload_module("sqlalchemy.engine.cursor")
def _create_result_map(self):
    """Build a description match map from ``self._result_columns``.

    Utility method used for unit tests only.
    """
    metadata_cls = util.preloaded.engine_cursor.CursorResultMetaData
    return metadata_cls._create_description_match_map(self._result_columns)
2236
# not set in this module; assigned by crud.py for insert/update
# statements.  Called with a column to obtain the key used for that
# column in the statement's parameters.
_get_bind_name_for_col: _BindNameForColProtocol
2239
@util.memoized_property
def _within_exec_param_key_getter(self) -> Callable[[Any], str]:
    """Return the callable stored in ``_get_bind_name_for_col``."""
    return self._get_bind_name_for_col
2244
@util.memoized_property
@util.preload_module("sqlalchemy.engine.result")
def _inserted_primary_key_from_lastrowid_getter(self):
    """Return a ``get(lastrowid, parameters)`` callable that builds the
    inserted primary key "row" for the compiled INSERT statement.

    The callable combines ``cursor.lastrowid`` for the table's
    autoincrement column with values pulled from the statement
    parameters for the remaining primary key columns.

    """
    result = util.preloaded.engine_result

    param_key_getter = self._within_exec_param_key_getter

    assert self.compile_state is not None
    statement = self.compile_state.statement

    if TYPE_CHECKING:
        assert isinstance(statement, Insert)

    table = statement.table

    # one (parameter getter, column) pair per primary key column; each
    # getter does parameters.get(<key for col>, None)
    getters = [
        (operator.methodcaller("get", param_key_getter(col), None), col)
        for col in table.primary_key
    ]

    autoinc_getter = None
    autoinc_col = table._autoincrement_column
    if autoinc_col is not None:
        # apply type post processors to the lastrowid
        lastrowid_processor = autoinc_col.type._cached_result_processor(
            self.dialect, None
        )
        autoinc_key = param_key_getter(autoinc_col)

        # if a bind value is present for the autoincrement column
        # in the parameters, we need to do the logic dictated by
        # #7998; honor a non-None user-passed parameter over lastrowid.
        # previously in the 1.4 series we weren't fetching lastrowid
        # at all if the key were present in the parameters
        if autoinc_key in self.binds:

            def _autoinc_getter(lastrowid, parameters):
                param_value = parameters.get(autoinc_key, lastrowid)
                if param_value is not None:
                    # they supplied non-None parameter, use that.
                    # SQLite at least is observed to return the wrong
                    # cursor.lastrowid for INSERT..ON CONFLICT so it
                    # can't be used in all cases
                    return param_value
                else:
                    # use lastrowid
                    return lastrowid

            # work around mypy https://github.com/python/mypy/issues/14027
            autoinc_getter = _autoinc_getter

    else:
        lastrowid_processor = None

    row_fn = result.result_tuple([col.key for col in table.primary_key])

    def get(lastrowid, parameters):
        """given cursor.lastrowid value and the parameters used for INSERT,
        return a "row" that represents the primary key, either by
        using the "lastrowid" or by extracting values from the parameters
        that were sent along with the INSERT.

        """
        if lastrowid_processor is not None:
            lastrowid = lastrowid_processor(lastrowid)

        if lastrowid is None:
            # no lastrowid available; every column comes from parameters
            return row_fn(getter(parameters) for getter, col in getters)
        else:
            # the autoincrement column uses lastrowid (or the non-None
            # parameter via autoinc_getter); other columns use parameters
            return row_fn(
                (
                    (
                        autoinc_getter(lastrowid, parameters)
                        if autoinc_getter is not None
                        else lastrowid
                    )
                    if col is autoinc_col
                    else getter(parameters)
                )
                for getter, col in getters
            )

    return get
2328
@util.memoized_property
@util.preload_module("sqlalchemy.engine.result")
def _inserted_primary_key_from_returning_getter(self):
    """Return a ``get(row, parameters)`` callable that builds the
    inserted primary key "row" for the compiled INSERT statement.

    Primary key columns present in ``self.implicit_returning`` are read
    from the returned row by position; the others are looked up in the
    statement parameters.

    """
    result = util.preloaded.engine_result

    assert self.compile_state is not None
    statement = self.compile_state.statement

    if TYPE_CHECKING:
        assert isinstance(statement, Insert)

    param_key_getter = self._within_exec_param_key_getter
    table = statement.table

    returning = self.implicit_returning
    assert returning is not None
    position_of = {col: idx for idx, col in enumerate(returning)}

    # (extractor, read_from_row) pairs, one per primary key column
    extractors: List[Tuple[Callable[[Any], Any], bool]] = []
    for pk_col in table.primary_key:
        if pk_col in position_of:
            extractors.append(
                (operator.itemgetter(position_of[pk_col]), True)
            )
        else:
            extractors.append(
                (
                    operator.methodcaller(
                        "get", param_key_getter(pk_col), None
                    ),
                    False,
                )
            )

    row_fn = result.result_tuple([col.key for col in table.primary_key])

    def get(row, parameters):
        return row_fn(
            extract(row) if from_row else extract(parameters)
            for extract, from_row in extractors
        )

    return get
2373
def default_from(self) -> str:
    """Return the text to use when a SELECT has no froms and no FROM
    clause is otherwise going to be rendered.

    The base implementation renders nothing; this hook gives Oracle
    Database a chance to tack on a ``FROM DUAL`` to the string output.

    """
    no_from_text = ""
    return no_from_text
2383
def visit_override_binds(self, override_binds, **kw):
    """SQL compile the nested element of an _OverrideBinds with
    bindparams swapped out.

    The _OverrideBinds is not normally expected to be compiled; it
    is meant to be used when an already cached statement is to be used,
    the compilation was already performed, and only the bound params should
    be swapped in at execution time.

    However, there are test cases that exercise this object, and
    additionally the ORM subquery loader is known to feed in expressions
    which include this construct into new queries (discovered in #11173),
    so it has to do the right thing at compile time as well.

    Returns the SQL text of ``override_binds.element``; the compiler's
    bind collections are updated as a side effect.

    """

    # get SQL text first
    sqltext = override_binds.element._compiler_dispatch(self, **kw)

    # for a test compile that is not for caching, change binds after the
    # fact.  note that we don't try to
    # swap the bindparam as we compile, because our element may be
    # elsewhere in the statement already (e.g. a subquery or perhaps a
    # CTE) and was already visited / compiled. See
    # test_relationship_criteria.py ->
    #       test_selectinload_local_criteria_subquery
    for k in override_binds.translate:
        if k not in self.binds:
            continue
        bp = self.binds[k]

        # so this would work, just change the value of bp in place.
        # but we dont want to mutate things outside.
        # bp.value = override_binds.translate[bp.key]
        # continue

        # instead, need to replace bp with new_bp or otherwise accommodate
        # in all internal collections
        new_bp = bp._with_value(
            override_binds.translate[bp.key],
            maintain_key=True,
            required=False,
        )

        # re-point both the translate key and the rendered name at the
        # replacement, and move the bind_names entry over to it
        name = self.bind_names[bp]
        self.binds[k] = self.binds[name] = new_bp
        self.bind_names[new_bp] = name
        self.bind_names.pop(bp, None)

        # carry over membership in the post-compile / literal-execute sets
        if bp in self.post_compile_params:
            self.post_compile_params |= {new_bp}
        if bp in self.literal_execute_params:
            self.literal_execute_params |= {new_bp}

        ckbm_tuple = self._cache_key_bind_match
        if ckbm_tuple:
            ckbm, cksm = ckbm_tuple
            # note: this loop rebinds ``bp``; it is not used again in
            # this iteration of the outer loop
            for bp in bp._cloned_set:
                if bp.key in cksm:
                    cb = cksm[bp.key]
                    ckbm[cb].append(new_bp)

    return sqltext
2447
def visit_grouping(self, grouping, asfrom=False, **kwargs):
    """Render the grouped element surrounded by parentheses."""
    inner = grouping.element._compiler_dispatch(self, **kwargs)
    return "(" + inner + ")"
2450
def visit_select_statement_grouping(self, grouping, **kwargs):
    """Render the grouped SELECT statement surrounded by parentheses."""
    inner = grouping.element._compiler_dispatch(self, **kwargs)
    return "(" + inner + ")"
2453
def visit_label_reference(
    self, element, within_columns_clause=False, **kwargs
):
    """Render a label reference, as the label name alone when the
    dialect supports simple ORDER BY labels and the label resolves
    against the enclosing statement; otherwise render the referenced
    element normally.

    Raises ``CompileError`` if the current stack entry has no
    ``"compile_state"``.

    """
    if self.stack and self.dialect.supports_simple_order_by_label:
        try:
            compile_state = cast(
                "Union[SelectState, CompoundSelectState]",
                self.stack[-1]["compile_state"],
            )
        except KeyError as ke:
            raise exc.CompileError(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ) from ke

        _, only_froms, only_cols = compile_state._label_resolve_dict
        resolve_dict = only_froms if within_columns_clause else only_cols

        # this can be None in the case that a _label_reference()
        # were subject to a replacement operation, in which case
        # the replacement of the Label element may have changed
        # to something else like a ColumnClause expression.
        order_by_elem = element.element._order_by_label_element

        resolves = (
            order_by_elem is not None
            and order_by_elem.name in resolve_dict
            and order_by_elem.shares_lineage(
                resolve_dict[order_by_elem.name]
            )
        )
        if resolves:
            kwargs["render_label_as_label"] = order_by_elem

    return self.process(
        element.element,
        within_columns_clause=within_columns_clause,
        **kwargs,
    )
2500
def visit_textual_label_reference(
    self, element, within_columns_clause=False, **kwargs
):
    """Render a string label reference by resolving it against the
    label dictionaries of the enclosing statement.

    Outside of any statement (empty stack) the element's text clause is
    rendered as is.  An unresolvable name is reported through
    ``coercions._no_text_coercion`` using ``CompileError``.

    """
    if not self.stack:
        # compiling the element outside of the context of a SELECT
        return self.process(element._text_clause)

    try:
        compile_state = cast(
            "Union[SelectState, CompoundSelectState]",
            self.stack[-1]["compile_state"],
        )
    except KeyError as ke:
        coercions._no_text_coercion(
            element.element,
            extra=(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ),
            exc_cls=exc.CompileError,
            err=ke,
        )

    with_cols, only_froms, only_cols = compile_state._label_resolve_dict
    lookup = only_froms if within_columns_clause else with_cols
    try:
        col = lookup[element.element]
    except KeyError as err:
        coercions._no_text_coercion(
            element.element,
            extra=(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ),
            exc_cls=exc.CompileError,
            err=err,
        )
    else:
        kwargs["render_label_as_label"] = col
        return self.process(
            col, within_columns_clause=within_columns_clause, **kwargs
        )
2545
def visit_label(
    self,
    label,
    add_to_result_map=None,
    within_label_clause=False,
    within_columns_clause=False,
    render_label_as_label=None,
    result_map_targets=(),
    **kw,
):
    """Render a label as ``<expr> AS <name>``, as the name alone, or as
    the bare inner expression, depending on context.

    """
    # only render labels within the columns clause
    # or ORDER BY clause of a select.  dialect-specific compilers
    # can modify this behavior.
    with_as = within_columns_clause and not within_label_clause
    name_only = render_label_as_label is label

    if not (name_only or with_as):
        return label.element._compiler_dispatch(
            self, within_columns_clause=False, **kw
        )

    if isinstance(label.name, elements._truncated_label):
        labelname = self._truncated_identifier("colident", label.name)
    else:
        labelname = label.name

    if not with_as:
        return self.preparer.format_label(label, labelname)

    if add_to_result_map is not None:
        add_to_result_map(
            labelname,
            label.name,
            (label, labelname) + label._alt_names + result_map_targets,
            label.type,
        )

    rendered_expr = label.element._compiler_dispatch(
        self,
        within_columns_clause=True,
        within_label_clause=True,
        **kw,
    )
    return (
        rendered_expr
        + OPERATORS[operators.as_]
        + self.preparer.format_label(label, labelname)
    )
2594
def _fallback_column_name(self, column):
    """Hook invoked for a column whose ``name`` is None.

    The base implementation always raises ``CompileError``.
    """
    message = "Cannot compile Column object until its 'name' is assigned."
    raise exc.CompileError(message)
2599
def visit_lambda_element(self, element, **kw):
    """Render the resolved SQL element of a lambda element."""
    return self.process(element._resolved, **kw)
2603
def visit_column(
    self,
    column: ColumnClause[Any],
    add_to_result_map: Optional[_ResultMapAppender] = None,
    include_table: bool = True,
    result_map_targets: Tuple[Any, ...] = (),
    ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] = None,
    **kwargs: Any,
) -> str:
    """Render a column name, qualified with its schema and table name
    where applicable.

    :param add_to_result_map: optional callable that receives the
     rendered name, original name, match targets and column type.
    :param include_table: when False, only the column name is rendered.
    :param ambiguous_table_name_map: optional replacement table names,
     consulted only when the table has no effective schema.

    """
    orig_name = column.name
    name = orig_name
    if name is None:
        name = self._fallback_column_name(column)

    is_literal = column.is_literal
    if not is_literal and isinstance(name, elements._truncated_label):
        name = self._truncated_identifier("colident", name)

    if add_to_result_map is not None:
        targets = (column, name, column.key) + result_map_targets
        tq_label = column._tq_label
        if tq_label:
            targets += (tq_label,)

        add_to_result_map(name, orig_name, targets, column.type)

    # note we are not currently accommodating for
    # literal_column(quoted_name('ident', True)) here
    name = (
        self.escape_literal_column(name)
        if is_literal
        else self.preparer.quote(name)
    )

    table = column.table
    if table is None or not include_table or not table.named_with_column:
        return name

    effective_schema = self.preparer.schema_for_object(table)
    schema_prefix = (
        self.preparer.quote_schema(effective_schema) + "."
        if effective_schema
        else ""
    )

    if TYPE_CHECKING:
        assert isinstance(table, NamedFromClause)
    tablename = table.name

    use_ambiguous_name = (
        not effective_schema
        and ambiguous_table_name_map
        and tablename in ambiguous_table_name_map
    )
    if use_ambiguous_name:
        tablename = ambiguous_table_name_map[tablename]

    if isinstance(tablename, elements._truncated_label):
        tablename = self._truncated_identifier("alias", tablename)

    return schema_prefix + self.preparer.quote(tablename) + "." + name
2662
def visit_collation(self, element, **kw):
    """Render a collation name using the identifier preparer."""
    collation_name = element.collation
    return self.preparer.format_collation(collation_name)
2665
def visit_fromclause(self, fromclause, **kwargs):
    """Render a FROM clause element as its ``name`` attribute."""
    rendered = fromclause.name
    return rendered
2668
def visit_index(self, index, **kwargs):
    """Render an index as its ``name`` attribute."""
    rendered = index.name
    return rendered
2671
def visit_typeclause(self, typeclause, **kw):
    """Render the type of a type clause through the dialect's type
    compiler, passing along the clause itself and the identifier
    preparer as keyword arguments.

    """
    kw.update(
        type_expression=typeclause,
        identifier_preparer=self.preparer,
    )
    type_compiler = self.dialect.type_compiler_instance
    return type_compiler.process(typeclause.type, **kw)
2678
def post_process_text(self, text):
    """Double any percent signs in ``text`` when the preparer requires
    it; otherwise return ``text`` unchanged.

    """
    if not self.preparer._double_percents:
        return text
    return text.replace("%", "%%")
2683
def escape_literal_column(self, text):
    """Double any percent signs in a literal column's text when the
    preparer requires it; otherwise return ``text`` unchanged.

    """
    if not self.preparer._double_percents:
        return text
    return text.replace("%", "%%")
2688
def visit_textclause(self, textclause, add_to_result_map=None, **kw):
    """Render a text clause, replacing each bind parameter token with a
    rendered bind parameter and un-escaping escaped tokens.

    """

    def render_bind(match):
        param_name = match.group(1)
        if param_name in textclause._bindparams:
            return self.process(textclause._bindparams[param_name], **kw)
        return self.bindparam_string(param_name, **kw)

    if not self.stack:
        self.isplaintext = True

    if add_to_result_map:
        # text() object is present in the columns clause of a
        # select().   Add a no-name entry to the result map so that
        # row[text()] produces a result
        add_to_result_map(None, None, (textclause,), sqltypes.NULLTYPE)

    with_binds = BIND_PARAMS.sub(
        render_bind, self.post_process_text(textclause.text)
    )

    # un-escape any \:params
    return BIND_PARAMS_ESC.sub(lambda m: m.group(1), with_binds)
2713
def visit_textual_select(
    self, taf, compound_index=None, asfrom=False, **kw
):
    """Render a textual SELECT construct.

    A new stack entry is pushed for the duration of the call.  When this
    construct is the one supplying result columns (top level, first
    member of a compound that needs a result map, or a nested entry that
    needs one), its ``column_args`` are processed so that they are added
    to the result map.  Any pending CTEs are rendered in front of the
    text.

    """
    toplevel = not self.stack
    entry = self._default_stack_entry if toplevel else self.stack[-1]

    new_entry: _CompilerStackEntry = {
        "correlate_froms": set(),
        "asfrom_froms": set(),
        "selectable": taf,
    }
    self.stack.append(new_entry)

    if taf._independent_ctes:
        self._dispatch_independent_ctes(taf, kw)

    populate_result_map = (
        toplevel
        or (
            compound_index == 0
            and entry.get("need_result_map_for_compound", False)
        )
        or entry.get("need_result_map_for_nested", False)
    )

    if populate_result_map:
        self._ordered_columns = self._textual_ordered_columns = (
            taf.positional
        )

        # enable looser result column matching when the SQL text links to
        # Column objects by name only
        self._loose_column_name_matching = not taf.positional and bool(
            taf.column_args
        )

        # the rendered text of each column is discarded; processing is
        # done for its effect on the result map
        for c in taf.column_args:
            self.process(
                c,
                within_columns_clause=True,
                add_to_result_map=self._add_to_result_map,
            )

    text = self.process(taf.element, **kw)
    if self.ctes:
        nesting_level = len(self.stack) if not toplevel else None
        text = self._render_cte_clause(nesting_level=nesting_level) + text

    self.stack.pop(-1)

    return text
2765
def visit_null(self, expr: Null, **kw: Any) -> str:
    """Render the SQL NULL keyword."""
    null_keyword = "NULL"
    return null_keyword
2768
def visit_true(self, expr: True_, **kw: Any) -> str:
    """Render boolean true: ``true`` when the dialect supports native
    booleans, otherwise ``1``.

    """
    return "true" if self.dialect.supports_native_boolean else "1"
2774
def visit_false(self, expr: False_, **kw: Any) -> str:
    """Render boolean false: ``false`` when the dialect supports native
    booleans, otherwise ``0``.

    """
    return "false" if self.dialect.supports_native_boolean else "0"
2780
def _generate_delimited_list(self, elements, separator, **kw):
    """Compile each element and join the non-empty results with
    ``separator``.

    """
    fragments = []
    for clause in elements:
        fragment = clause._compiler_dispatch(self, **kw)
        if fragment:
            fragments.append(fragment)
    return separator.join(fragments)
2787
def _generate_delimited_and_list(self, clauses, **kw):
    """Render ``clauses`` joined by the AND operator, after boolean
    pre-processing of the clause list.

    When pre-processing reports a single clause, that clause is rendered
    on its own.

    """
    lcc, clauses = elements.BooleanClauseList._process_clauses_for_boolean(
        operators.and_,
        elements.True_._singleton,
        elements.False_._singleton,
        clauses,
    )
    if lcc == 1:
        return clauses[0]._compiler_dispatch(self, **kw)

    and_text = OPERATORS[operators.and_]
    rendered = (clause._compiler_dispatch(self, **kw) for clause in clauses)
    return and_text.join(fragment for fragment in rendered if fragment)
2804
def visit_tuple(self, clauselist, **kw):
    """Render a tuple as its clause list wrapped in parentheses."""
    inner = self.visit_clauselist(clauselist, **kw)
    return "(%s)" % inner
2807
def visit_clauselist(self, clauselist, **kw):
    """Render the clauses of a clause list joined by the text of its
    operator, or by a single space when it has no operator.

    """
    if clauselist.operator is None:
        separator = " "
    else:
        separator = OPERATORS[clauselist.operator]
    return self._generate_delimited_list(clauselist.clauses, separator, **kw)
2816
def visit_expression_clauselist(self, clauselist, **kw):
    """Render an expression clause list.

    An operator-specific ``expression_clauselist`` visitor is used when
    one exists; otherwise the clauses are joined with the operator's
    text.  Raises ``UnsupportedCompilationError`` when the operator has
    no known text.

    """
    operator_ = clauselist.operator

    disp = self._get_operator_dispatch(
        operator_, "expression_clauselist", None
    )
    if disp:
        return disp(clauselist, operator_, **kw)

    try:
        opstring = OPERATORS[operator_]
    except KeyError as err:
        raise exc.UnsupportedCompilationError(self, operator_) from err

    kw["_in_operator_expression"] = True
    return self._generate_delimited_list(clauselist.clauses, opstring, **kw)
2835
def visit_case(self, clause, **kwargs):
    """Render a CASE expression with its optional value, its WHEN/THEN
    pairs and its optional ELSE.

    """
    parts = ["CASE "]
    if clause.value is not None:
        parts.append(clause.value._compiler_dispatch(self, **kwargs) + " ")
    for cond, result in clause.whens:
        when_text = cond._compiler_dispatch(self, **kwargs)
        then_text = result._compiler_dispatch(self, **kwargs)
        parts.append("WHEN " + when_text + " THEN " + then_text + " ")
    if clause.else_ is not None:
        parts.append(
            "ELSE " + clause.else_._compiler_dispatch(self, **kwargs) + " "
        )
    parts.append("END")
    return "".join(parts)
2854
def visit_type_coerce(self, type_coerce, **kw):
    """Render a type coerce construct as its typed expression."""
    target = type_coerce.typed_expression
    return target._compiler_dispatch(self, **kw)
2857
def visit_cast(self, cast, **kwargs):
    """Render ``CAST(<expr> AS <type>)``.

    If the rendered type contains a `` COLLATE ...`` portion, that
    portion is placed after the closing parenthesis of the CAST.

    """
    type_clause = cast.typeclause._compiler_dispatch(self, **kwargs)
    collate_match = re.match("(.*)( COLLATE .*)", type_clause)
    expression = cast.clause._compiler_dispatch(self, **kwargs)
    if collate_match:
        type_text = collate_match.group(1)
        collation = collate_match.group(2)
    else:
        type_text = type_clause
        collation = ""
    return "CAST(%s AS %s)%s" % (expression, type_text, collation)
2866
def _format_frame_clause(self, range_, **kw):
    """Render the ``<lower> AND <upper>`` portion of a window frame from
    a two-element range.

    Each bound renders as ``UNBOUNDED PRECEDING`` / ``UNBOUNDED
    FOLLOWING``, ``CURRENT ROW``, ``<n> PRECEDING`` for a negative value
    or ``<n> FOLLOWING`` otherwise.

    """

    def render_bound(value, unbounded_text):
        if value is elements.RANGE_UNBOUNDED:
            return unbounded_text
        if value is elements.RANGE_CURRENT:
            return "CURRENT ROW"
        if value < 0:
            return "%s PRECEDING" % (
                self.process(elements.literal(abs(value)), **kw),
            )
        return "%s FOLLOWING" % (
            self.process(elements.literal(value), **kw),
        )

    lower_text = render_bound(range_[0], "UNBOUNDED PRECEDING")
    upper_text = render_bound(range_[1], "UNBOUNDED FOLLOWING")
    return "%s AND %s" % (lower_text, upper_text)
2908
def visit_over(self, over, **kwargs):
    """Render ``<element> OVER (...)`` with optional PARTITION BY,
    ORDER BY and a RANGE / ROWS / GROUPS frame.

    """
    text = over.element._compiler_dispatch(self, **kwargs)

    if over.range_ is not None:
        frame = "RANGE BETWEEN %s" % self._format_frame_clause(
            over.range_, **kwargs
        )
    elif over.rows is not None:
        frame = "ROWS BETWEEN %s" % self._format_frame_clause(
            over.rows, **kwargs
        )
    elif over.groups is not None:
        frame = "GROUPS BETWEEN %s" % self._format_frame_clause(
            over.groups, **kwargs
        )
    else:
        frame = None

    window_parts = []
    for word, clause in (
        ("PARTITION", over.partition_by),
        ("ORDER", over.order_by),
    ):
        if clause is not None and len(clause):
            window_parts.append(
                "%s BY %s"
                % (word, clause._compiler_dispatch(self, **kwargs))
            )
    if frame:
        window_parts.append(frame)

    return "%s OVER (%s)" % (text, " ".join(window_parts))
2941
def visit_withingroup(self, withingroup, **kwargs):
    """Render ``<element> WITHIN GROUP (ORDER BY <order_by>)``."""
    element_text = withingroup.element._compiler_dispatch(self, **kwargs)
    order_text = withingroup.order_by._compiler_dispatch(self, **kwargs)
    return "%s WITHIN GROUP (ORDER BY %s)" % (element_text, order_text)
2947
def visit_funcfilter(self, funcfilter, **kwargs):
    """Render ``<func> FILTER (WHERE <criterion>)``."""
    func_text = funcfilter.func._compiler_dispatch(self, **kwargs)
    criterion_text = funcfilter.criterion._compiler_dispatch(self, **kwargs)
    return "%s FILTER (WHERE %s)" % (func_text, criterion_text)
2953
def visit_extract(self, extract, **kwargs):
    """Render ``EXTRACT(<field> FROM <expr>)``.

    The field is translated through ``self.extract_map`` when it has an
    entry there, otherwise it is rendered as given.

    """
    field_name = extract.field
    field = self.extract_map.get(field_name, field_name)
    expr_text = extract.expr._compiler_dispatch(self, **kwargs)
    return "EXTRACT(%s FROM %s)" % (field, expr_text)
2960
def visit_scalar_function_column(self, element, **kw):
    """Render ``(<function>).<column>`` for a column of a scalar
    function.

    """
    return "(%s).%s" % (
        self.visit_function(element.fn, **kw),
        self.visit_column(element, **kw),
    )
2965
def visit_function(
    self,
    func: Function[Any],
    add_to_result_map: Optional[_ResultMapAppender] = None,
    **kwargs: Any,
) -> str:
    """Render a SQL function call.

    A ``visit_<lowercased name>_func`` method on this compiler takes
    precedence when present.  Otherwise the name comes from the
    ``FUNCTIONS`` table for the function's class, falling back to
    ``func.name``, prefixed by ``func.packagenames`` joined with dots and
    followed by the rendered argument list.

    :param add_to_result_map: optional callable that is given the
     function name and type.
    :return: the rendered text, with `` WITH ORDINALITY`` appended when
     ``func._with_ordinality`` is set.

    """
    if add_to_result_map is not None:
        add_to_result_map(func.name, func.name, (func.name,), func.type)

    disp = getattr(self, "visit_%s_func" % func.name.lower(), None)

    text: str

    if disp:
        text = disp(func, **kwargs)
    else:
        name = FUNCTIONS.get(func._deannotate().__class__, None)
        if name:
            # known function: only gets an argument list placeholder
            # when the function has arguments
            if func._has_args:
                name += "%(expr)s"
        else:
            name = func.name
            name = (
                self.preparer.quote(name)
                if self.preparer._requires_quotes_illegal_chars(name)
                or isinstance(name, elements.quoted_name)
                else name
            )
            name = name + "%(expr)s"
        # NOTE(review): the quoted_name check below tests ``name`` rather
        # than ``tok`` -- confirm whether that is intended.
        text = ".".join(
            [
                (
                    self.preparer.quote(tok)
                    if self.preparer._requires_quotes_illegal_chars(tok)
                    or isinstance(name, elements.quoted_name)
                    else tok
                )
                for tok in func.packagenames
            ]
            + [name]
        ) % {"expr": self.function_argspec(func, **kwargs)}

    if func._with_ordinality:
        text += " WITH ORDINALITY"
    return text
3011
def visit_next_value_func(self, next_value, **kw):
    """Render a next-value function by rendering its sequence."""
    sequence = next_value.sequence
    return self.visit_sequence(sequence)
3014
def visit_sequence(self, sequence, **kw):
    """Render a sequence increment.

    The base implementation always raises ``NotImplementedError`` naming
    the dialect.

    """
    dialect_name = self.dialect.name
    raise NotImplementedError(
        "Dialect '%s' does not support sequence increments." % dialect_name
    )
3020
def function_argspec(self, func: Function[Any], **kwargs: Any) -> str:
    """Render the argument list of a function from its ``clause_expr``."""
    arguments = func.clause_expr
    return arguments._compiler_dispatch(self, **kwargs)
3023
def visit_compound_select(
    self, cs, asfrom=False, compound_index=None, **kwargs
):
    """Render a compound SELECT (its member selects joined by the
    compound keyword), followed by GROUP BY, ORDER BY and any row
    limiting clause, with pending CTEs rendered in front.

    A stack entry carrying the compile state is pushed for the duration
    of the call.

    """
    toplevel = not self.stack

    compile_state = cs._compile_state_factory(cs, self, **kwargs)

    if toplevel and not self.compile_state:
        self.compile_state = compile_state

    compound_stmt = compile_state.statement

    entry = self._default_stack_entry if toplevel else self.stack[-1]
    # note ``not compound_index`` is true for both None and 0
    need_result_map = toplevel or (
        not compound_index
        and entry.get("need_result_map_for_compound", False)
    )

    # indicates there is already a CompoundSelect in play
    if compound_index == 0:
        entry["select_0"] = cs

    self.stack.append(
        {
            "correlate_froms": entry["correlate_froms"],
            "asfrom_froms": entry["asfrom_froms"],
            "selectable": cs,
            "compile_state": compile_state,
            "need_result_map_for_compound": need_result_map,
        }
    )

    if compound_stmt._independent_ctes:
        self._dispatch_independent_ctes(compound_stmt, kwargs)

    keyword = self.compound_keywords[cs.keyword]

    # each member select is told its position via compound_index
    text = (" " + keyword + " ").join(
        (
            c._compiler_dispatch(
                self, asfrom=asfrom, compound_index=i, **kwargs
            )
            for i, c in enumerate(cs.selects)
        )
    )

    # the trailing clauses below are rendered with include_table=False
    kwargs["include_table"] = False
    text += self.group_by_clause(cs, **dict(asfrom=asfrom, **kwargs))
    text += self.order_by_clause(cs, **kwargs)
    if cs._has_row_limiting_clause:
        text += self._row_limit_clause(cs, **kwargs)

    if self.ctes:
        nesting_level = len(self.stack) if not toplevel else None
        text = (
            self._render_cte_clause(
                nesting_level=nesting_level,
                include_following_stack=True,
            )
            + text
        )

    self.stack.pop(-1)
    return text
3088
def _row_limit_clause(self, cs, **kwargs):
    """Render the row limiting clause of ``cs``: the FETCH form when it
    has a fetch clause, otherwise the LIMIT form.

    """
    if cs._fetch_clause is None:
        return self.limit_clause(cs, **kwargs)
    return self.fetch_clause(cs, **kwargs)
3094
def _get_operator_dispatch(self, operator_, qualifier1, qualifier2):
    """Look up the visitor method for an operator, or return None.

    The attribute name is ``visit_<operator name>_<qualifier1>``, with
    ``_<qualifier2>`` appended when ``qualifier2`` is given.

    """
    suffix = "_" + qualifier2 if qualifier2 else ""
    attrname = "visit_%s_%s%s" % (operator_.__name__, qualifier1, suffix)
    return getattr(self, attrname, None)
3102
def visit_unary(
    self, unary, add_to_result_map=None, result_map_targets=(), **kw
):
    """Render a unary expression that has either an operator or a
    modifier.

    An operator- or modifier-specific visitor is used when one exists,
    otherwise the generic unary rendering.  Raises ``CompileError`` if
    the expression has both or neither.

    """
    if add_to_result_map is not None:
        result_map_targets += (unary,)
        kw["add_to_result_map"] = add_to_result_map
        kw["result_map_targets"] = result_map_targets

    if unary.operator:
        if unary.modifier:
            raise exc.CompileError(
                "Unary expression does not support operator "
                "and modifier simultaneously"
            )
        qualifier = "operator"
        op = unary.operator
    elif unary.modifier:
        qualifier = "modifier"
        op = unary.modifier
    else:
        raise exc.CompileError(
            "Unary expression has no operator or modifier"
        )

    disp = self._get_operator_dispatch(op, "unary", qualifier)
    if disp:
        return disp(unary, op, **kw)

    if qualifier == "operator":
        return self._generate_generic_unary_operator(
            unary, OPERATORS[op], **kw
        )
    return self._generate_generic_unary_modifier(
        unary, OPERATORS[op], **kw
    )
3140
def visit_truediv_binary(self, binary, operator, **kw):
    """Render true division.

    When the dialect's ``/`` performs floor division, the right operand
    is wrapped in a CAST to a numeric type.

    """
    left_text = self.process(binary.left, **kw)

    if not self.dialect.div_is_floordiv:
        return left_text + " / " + self.process(binary.right, **kw)

    # TODO: would need a fast cast again here,
    # unless we want to use an implicit cast like "+ 0.0"
    right_type = binary.right.type
    if right_type._type_affinity is sqltypes.Numeric:
        cast_type = binary.right.type
    else:
        cast_type = sqltypes.Numeric()
    right_text = self.process(elements.Cast(binary.right, cast_type), **kw)
    return left_text + " / " + right_text
3167
def visit_floordiv_binary(self, binary, operator, **kw):
    """Render floor division.

    A plain ``/`` is used when the dialect's ``/`` already floors and
    the right operand has Integer affinity; otherwise the division is
    wrapped in ``FLOOR()``.

    """
    plain_division = (
        self.dialect.div_is_floordiv
        and binary.right.type._type_affinity is sqltypes.Integer
    )
    left_text = self.process(binary.left, **kw)
    right_text = self.process(binary.right, **kw)
    division = left_text + " / " + right_text
    if plain_division:
        return division
    return "FLOOR(%s)" % division
3184
def visit_is_true_unary_operator(self, element, operator, **kw):
    """Render an "is true" test of ``element.element``.

    The inner expression is rendered as-is when it is implicitly boolean
    or the dialect supports native booleans; otherwise it is compared
    with ``= 1``.
    """
    rendered = self.process(element.element, **kw)
    if (
        element._is_implicitly_boolean
        or self.dialect.supports_native_boolean
    ):
        return rendered
    return "%s = 1" % rendered
3193
def visit_is_false_unary_operator(self, element, operator, **kw):
    """Render an "is false" test of ``element.element``.

    ``NOT <expr>`` is used when the inner expression is implicitly
    boolean or the dialect supports native booleans; otherwise the
    expression is compared with ``= 0``.
    """
    rendered = self.process(element.element, **kw)
    if (
        element._is_implicitly_boolean
        or self.dialect.supports_native_boolean
    ):
        return "NOT %s" % rendered
    return "%s = 0" % rendered
3202
def visit_not_match_op_binary(self, binary, operator, **kw):
    """Render a negated MATCH as ``NOT <match expression>``.

    The expression is rendered through ``visit_binary()`` with the
    operator overridden to ``operators.match_op``.  The compile-time
    keyword arguments are forwarded so that options given by the caller
    (for example ``literal_binds`` or ``from_linter``) are honored for
    the operands as well; previously they were silently dropped.
    """
    return "NOT %s" % self.visit_binary(
        binary, override_operator=operators.match_op, **kw
    )
3207
def visit_not_in_op_binary(self, binary, operator, **kw):
    """Render a NOT IN expression, always enclosed in parenthesis."""
    # The empty-set case is rendered in the form
    # "(col NOT IN (null) OR 1 = 1)"; because of that OR, the whole
    # expression has to be bracketed.
    inner = self._generate_generic_binary(
        binary, OPERATORS[operator], **kw
    )
    return "(%s)" % inner
3215
def visit_empty_set_op_expr(self, type_, expand_op, **kw):
    """Return the SQL fragment standing in for an empty IN / NOT IN list.

    :param type_: sequence of element types; more than one entry
     produces a parenthesized tuple of NULLs.
    :param expand_op: operator the expanding parameter is used with.
     Operators other than ``in_op`` / ``not_in_op`` are delegated to
     ``visit_empty_set_expr()``.

    The returned text deliberately has unbalanced parenthesis; it is
    meant to be placed inside an already-open "(".
    """
    if expand_op is operators.not_in_op:
        suffix = ") OR (1 = 1"
    elif expand_op is operators.in_op:
        suffix = ") AND (1 != 1"
    else:
        return self.visit_empty_set_expr(type_)

    if len(type_) > 1:
        nulls = "(%s)" % ", ".join(["NULL"] * len(type_))
    else:
        nulls = "NULL"
    return nulls + suffix
3233
def visit_empty_set_expr(self, element_types, **kw):
    """Base implementation of the empty set expression hook.

    :raises NotImplementedError: always, naming the current dialect.
    """
    dialect_name = self.dialect.name
    raise NotImplementedError(
        "Dialect '%s' does not support empty set expression."
        % dialect_name
    )
3239
def _literal_execute_expanding_parameter_literal_binds(
    self, parameter, values, bind_expression_template=None
):
    """Render the values of an expanding parameter as inline literals.

    :param parameter: the expanding bound parameter.
    :param values: sequence of values (or of tuples of values).
    :param bind_expression_template: optional POSTCOMPILE token text
     carrying the left / right portions of a bind expression, which are
     then wrapped around each rendered literal.
    :return: a 2-tuple ``((), replacement_expression)``; the first
     element is always empty as no bound parameters are produced.
    """
    typ_dialect_impl = parameter.type._unwrapped_dialect_impl(self.dialect)

    if not values:
        # empty IN expression. bind_expression_template is not needed
        # here since there are no expressions to render.
        if typ_dialect_impl._is_tuple_type:
            values_kw = "VALUES " if self.dialect.tuple_in_values else ""
            return (), values_kw + self.visit_empty_set_op_expr(
                parameter.type.types, parameter.expand_op
            )
        return (), self.visit_empty_set_op_expr(
            [parameter.type], parameter.expand_op
        )

    values_are_tuples = typ_dialect_impl._is_tuple_type or (
        typ_dialect_impl._isnull
        and isinstance(values[0], collections_abc.Sequence)
        and not isinstance(values[0], (str, bytes))
    )

    if values_are_tuples:
        if typ_dialect_impl._has_bind_expression:
            raise NotImplementedError(
                "bind_expression() on TupleType not supported with "
                "literal_binds"
            )

        values_kw = "VALUES " if self.dialect.tuple_in_values else ""
        rendered_rows = []
        for row in values:
            cells = [
                self.render_literal_value(cell, cell_type)
                for cell, cell_type in zip(row, parameter.type.types)
            ]
            rendered_rows.append("(%s)" % ", ".join(cells))
        return (), values_kw + ", ".join(rendered_rows)

    if bind_expression_template:
        m = self._post_compile_pattern.search(bind_expression_template)
        assert m and m.group(
            2
        ), "unexpected format for expanding parameter"

        # group(2) has the form "~~<left>~~REPL~~<right>~~"
        tok = m.group(2).split("~~")
        be_left, be_right = tok[1], tok[3]
        literals = [
            "%s%s%s"
            % (
                be_left,
                self.render_literal_value(value, parameter.type),
                be_right,
            )
            for value in values
        ]
    else:
        literals = [
            self.render_literal_value(value, parameter.type)
            for value in values
        ]

    return (), ", ".join(literals)
3313
def _literal_execute_expanding_parameter(self, name, parameter, values):
    """Expand an "expanding" parameter into individual bound parameters.

    If the parameter is ``literal_execute``, rendering is delegated to
    ``_literal_execute_expanding_parameter_literal_binds()``.

    :param name: base name from which per-value parameter names are
     derived (``<name>_<i>`` or ``<name>_<i>_<j>`` for tuples, 1-based).
    :param parameter: the expanding bound parameter.
    :param values: sequence of values (or of tuples of values).
    :return: a 2-tuple ``(to_update, replacement_expression)`` where
     ``to_update`` is a list of ``(parameter name, value)`` pairs and
     ``replacement_expression`` is the SQL text for the new parameters.
    """
    if parameter.literal_execute:
        return self._literal_execute_expanding_parameter_literal_binds(
            parameter, values
        )

    dialect = self.dialect
    typ_dialect_impl = parameter.type._unwrapped_dialect_impl(dialect)

    bind_template = (
        self.compilation_bindtemplate
        if self._numeric_binds
        else self.bindtemplate
    )

    use_bind_cast = (
        self.dialect._bind_typing_render_casts
        and typ_dialect_impl.render_bind_cast
    )

    def _render_bindtemplate(name):
        placeholder = bind_template % {"name": name}
        if use_bind_cast:
            return self.render_bind_cast(
                parameter.type, typ_dialect_impl, placeholder
            )
        return placeholder

    if not values:
        to_update = []
        if typ_dialect_impl._is_tuple_type:
            empty_types = parameter.type.types
        else:
            empty_types = [parameter.type]
        replacement_expression = self.visit_empty_set_op_expr(
            empty_types, parameter.expand_op
        )

    elif typ_dialect_impl._is_tuple_type or (
        typ_dialect_impl._isnull
        and isinstance(values[0], collections_abc.Sequence)
        and not isinstance(values[0], (str, bytes))
    ):
        assert not typ_dialect_impl._is_array
        to_update = [
            ("%s_%s_%s" % (name, i, j), value)
            for i, tuple_element in enumerate(values, 1)
            for j, value in enumerate(tuple_element, 1)
        ]

        values_kw = "VALUES " if dialect.tuple_in_values else ""
        rendered_rows = []
        for i, tuple_element in enumerate(values):
            # locate this row's names within the flat to_update list
            width = len(tuple_element)
            offset = i * width
            rendered_rows.append(
                "(%s)"
                % ", ".join(
                    _render_bindtemplate(to_update[offset + j][0])
                    for j in range(width)
                )
            )
        replacement_expression = values_kw + ", ".join(rendered_rows)
    else:
        to_update = [
            ("%s_%s" % (name, i), value)
            for i, value in enumerate(values, 1)
        ]
        replacement_expression = ", ".join(
            _render_bindtemplate(key) for key, _ in to_update
        )

    return to_update, replacement_expression
3392
def visit_binary(
    self,
    binary,
    override_operator=None,
    eager_grouping=False,
    from_linter=None,
    lateral_from_linter=None,
    **kw,
):
    """Render a binary expression.

    When a FROM linter is present and the operator is a comparison, the
    product of the FROM objects of both sides is recorded as linter
    edges.  Rendering then goes to an operator-specific handler found
    via ``_get_operator_dispatch()``, or else to the generic rendering
    using the ``OPERATORS`` table.

    :param override_operator: operator to use in place of
     ``binary.operator``.
    :raises UnsupportedCompilationError: if no handler exists and the
     operator is not present in ``OPERATORS``.
    """
    if from_linter and operators.is_comparison(binary.operator):
        if lateral_from_linter is not None:
            enclosing = [kw["enclosing_lateral"]]
            linter = lateral_from_linter
            left_froms = binary.left._from_objects + enclosing
            right_froms = binary.right._from_objects + enclosing
        else:
            linter = from_linter
            left_froms = binary.left._from_objects
            right_froms = binary.right._from_objects

        linter.edges.update(
            itertools.product(
                _de_clone(left_froms), _de_clone(right_froms)
            )
        )

    # don't allow "? = ?" to render
    both_sides_are_binds = (
        self.ansi_bind_rules
        and isinstance(binary.left, elements.BindParameter)
        and isinstance(binary.right, elements.BindParameter)
    )
    if both_sides_are_binds:
        kw["literal_execute"] = True

    operator_ = override_operator or binary.operator
    handler = self._get_operator_dispatch(operator_, "binary", None)
    if handler:
        return handler(binary, operator_, **kw)

    try:
        opstring = OPERATORS[operator_]
    except KeyError as err:
        raise exc.UnsupportedCompilationError(self, operator_) from err

    return self._generate_generic_binary(
        binary,
        opstring,
        from_linter=from_linter,
        lateral_from_linter=lateral_from_linter,
        **kw,
    )
3448
def visit_function_as_comparison_op_binary(self, element, operator, **kw):
    """Render the element by compiling its ``sql_function``."""
    sql_function = element.sql_function
    return self.process(sql_function, **kw)
3451
def visit_mod_binary(self, binary, operator, **kw):
    """Render a modulus expression.

    The percent sign is doubled when the identifier preparer indicates
    ``_double_percents``.
    """
    opstring = " %% " if self.preparer._double_percents else " % "
    left = self.process(binary.left, **kw)
    right = self.process(binary.right, **kw)
    return left + opstring + right
3465
def visit_custom_op_binary(self, element, operator, **kw):
    """Render a binary expression that uses a custom operator.

    The operator's ``opstring`` is escaped via
    ``escape_literal_column()`` and surrounded by single spaces; the
    operator's ``eager_grouping`` setting is passed to the generic
    binary rendering.
    """
    kw["eager_grouping"] = operator.eager_grouping
    escaped = self.escape_literal_column(operator.opstring)
    return self._generate_generic_binary(
        element, " " + escaped + " ", **kw
    )
3473
def visit_custom_op_unary_operator(self, element, operator, **kw):
    """Render a custom operator placed before its operand.

    The escaped ``opstring`` is followed by a single space.
    """
    opstring = self.escape_literal_column(operator.opstring) + " "
    return self._generate_generic_unary_operator(element, opstring, **kw)
3478
def visit_custom_op_unary_modifier(self, element, operator, **kw):
    """Render a custom operator placed after its operand.

    The escaped ``opstring`` is preceded by a single space.
    """
    opstring = " " + self.escape_literal_column(operator.opstring)
    return self._generate_generic_unary_modifier(element, opstring, **kw)
3483
def _generate_generic_binary(
    self,
    binary: BinaryExpression[Any],
    opstring: str,
    eager_grouping: bool = False,
    **kw: Any,
) -> str:
    """Render ``<left><opstring><right>`` for a binary expression.

    Both operands are compiled with ``_in_operator_expression`` set to
    True and ``_binary_op`` set to the expression's operator.  The
    result is parenthesized only when ``eager_grouping`` is set and this
    call was itself made from within another operator expression.
    """
    nested_in_operator = kw.get("_in_operator_expression", False)

    kw["_in_operator_expression"] = True
    kw["_binary_op"] = binary.operator

    left_text = binary.left._compiler_dispatch(
        self, eager_grouping=eager_grouping, **kw
    )
    right_text = binary.right._compiler_dispatch(
        self, eager_grouping=eager_grouping, **kw
    )
    text = left_text + opstring + right_text

    if nested_in_operator and eager_grouping:
        return "(%s)" % text
    return text
3508
def _generate_generic_unary_operator(self, unary, opstring, **kw):
    """Render ``opstring`` immediately followed by the compiled operand."""
    operand = unary.element._compiler_dispatch(self, **kw)
    return opstring + operand
3511
def _generate_generic_unary_modifier(self, unary, opstring, **kw):
    """Render the compiled operand immediately followed by ``opstring``."""
    operand = unary.element._compiler_dispatch(self, **kw)
    return operand + opstring
3514
@util.memoized_property
def _like_percent_literal(self):
    """Literal column ``'%'`` of string type.

    Used by the contains / startswith / endswith visitors to build the
    LIKE pattern around the right-hand operand.
    """
    percent = elements.literal_column("'%'", type_=sqltypes.STRINGTYPE)
    return percent
3518
def visit_ilike_case_insensitive_operand(self, element, **kw):
    """Render the wrapped element inside a ``lower()`` call."""
    inner = element.element._compiler_dispatch(self, **kw)
    return "lower(%s)" % (inner,)
3521
def visit_contains_op_binary(self, binary, operator, **kw):
    """Render "contains" as a LIKE expression.

    Works on a clone of ``binary`` whose right side is concatenated
    with the ``'%'`` literal on both ends.
    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.right = wildcard.concat(like_expr.right).concat(wildcard)
    return self.visit_like_op_binary(like_expr, operator, **kw)
3527
def visit_not_contains_op_binary(self, binary, operator, **kw):
    """Render "not contains" as a NOT LIKE expression.

    Works on a clone of ``binary`` whose right side is concatenated
    with the ``'%'`` literal on both ends.
    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.right = wildcard.concat(like_expr.right).concat(wildcard)
    return self.visit_not_like_op_binary(like_expr, operator, **kw)
3533
def visit_icontains_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "contains" via the ILIKE visitor.

    Works on a clone of ``binary``: both sides are wrapped with
    ``ilike_case_insensitive()`` and the right side is additionally
    concatenated with the ``'%'`` literal on both ends.
    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.left = ilike_case_insensitive(like_expr.left)
    lowered_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(lowered_right).concat(wildcard)
    return self.visit_ilike_op_binary(like_expr, operator, **kw)
3542
def visit_not_icontains_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "not contains" via the NOT ILIKE visitor.

    Works on a clone of ``binary``: both sides are wrapped with
    ``ilike_case_insensitive()`` and the right side is additionally
    concatenated with the ``'%'`` literal on both ends.
    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.left = ilike_case_insensitive(like_expr.left)
    lowered_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(lowered_right).concat(wildcard)
    return self.visit_not_ilike_op_binary(like_expr, operator, **kw)
3551
def visit_startswith_op_binary(self, binary, operator, **kw):
    """Render "startswith" as a LIKE expression.

    Works on a clone of ``binary`` whose right side is combined with
    the ``'%'`` literal through ``_rconcat()``.
    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.right = wildcard._rconcat(like_expr.right)
    return self.visit_like_op_binary(like_expr, operator, **kw)
3557
def visit_not_startswith_op_binary(self, binary, operator, **kw):
    """Render "not startswith" as a NOT LIKE expression.

    Works on a clone of ``binary`` whose right side is combined with
    the ``'%'`` literal through ``_rconcat()``.
    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.right = wildcard._rconcat(like_expr.right)
    return self.visit_not_like_op_binary(like_expr, operator, **kw)
3563
def visit_istartswith_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "startswith" via the ILIKE visitor.

    Works on a clone of ``binary``: both sides are wrapped with
    ``ilike_case_insensitive()`` and the right side is combined with
    the ``'%'`` literal through ``_rconcat()``.
    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.left = ilike_case_insensitive(like_expr.left)
    lowered_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard._rconcat(lowered_right)
    return self.visit_ilike_op_binary(like_expr, operator, **kw)
3570
def visit_not_istartswith_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "not startswith" via the NOT ILIKE visitor.

    Works on a clone of ``binary``: both sides are wrapped with
    ``ilike_case_insensitive()`` and the right side is combined with
    the ``'%'`` literal through ``_rconcat()``.
    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.left = ilike_case_insensitive(like_expr.left)
    lowered_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard._rconcat(lowered_right)
    return self.visit_not_ilike_op_binary(like_expr, operator, **kw)
3577
def visit_endswith_op_binary(self, binary, operator, **kw):
    """Render "endswith" as a LIKE expression.

    Works on a clone of ``binary`` whose right side is the ``'%'``
    literal concatenated with the original right side.
    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.right = wildcard.concat(like_expr.right)
    return self.visit_like_op_binary(like_expr, operator, **kw)
3583
def visit_not_endswith_op_binary(self, binary, operator, **kw):
    """Render "not endswith" as a NOT LIKE expression.

    Works on a clone of ``binary`` whose right side is the ``'%'``
    literal concatenated with the original right side.
    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.right = wildcard.concat(like_expr.right)
    return self.visit_not_like_op_binary(like_expr, operator, **kw)
3589
def visit_iendswith_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "endswith" via the ILIKE visitor.

    Works on a clone of ``binary``: both sides are wrapped with
    ``ilike_case_insensitive()`` and the ``'%'`` literal is
    concatenated in front of the wrapped right side.
    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.left = ilike_case_insensitive(like_expr.left)
    lowered_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(lowered_right)
    return self.visit_ilike_op_binary(like_expr, operator, **kw)
3596
def visit_not_iendswith_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "not endswith" via the NOT ILIKE visitor.

    Works on a clone of ``binary``: both sides are wrapped with
    ``ilike_case_insensitive()`` and the ``'%'`` literal is
    concatenated in front of the wrapped right side.
    """
    wildcard = self._like_percent_literal
    like_expr = binary._clone()
    like_expr.left = ilike_case_insensitive(like_expr.left)
    lowered_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(lowered_right)
    return self.visit_not_ilike_op_binary(like_expr, operator, **kw)
3603
def visit_like_op_binary(self, binary, operator, **kw):
    """Render ``<left> LIKE <right>``.

    An ``ESCAPE`` clause with the escape value rendered as a string
    literal is appended when the ``escape`` modifier is present.
    """
    escape = binary.modifiers.get("escape", None)

    left_text = binary.left._compiler_dispatch(self, **kw)
    right_text = binary.right._compiler_dispatch(self, **kw)
    text = "%s LIKE %s" % (left_text, right_text)

    if escape is not None:
        text += " ESCAPE " + self.render_literal_value(
            escape, sqltypes.STRINGTYPE
        )
    return text
3615
def visit_not_like_op_binary(self, binary, operator, **kw):
    """Render ``<left> NOT LIKE <right>``.

    An ``ESCAPE`` clause with the escape value rendered as a string
    literal is appended when the ``escape`` modifier is present.
    """
    escape = binary.modifiers.get("escape", None)

    left_text = binary.left._compiler_dispatch(self, **kw)
    right_text = binary.right._compiler_dispatch(self, **kw)
    text = "%s NOT LIKE %s" % (left_text, right_text)

    if escape is not None:
        text += " ESCAPE " + self.render_literal_value(
            escape, sqltypes.STRINGTYPE
        )
    return text
3626
def visit_ilike_op_binary(self, binary, operator, **kw):
    """Render a case-insensitive LIKE through ``visit_like_op_binary()``.

    When invoked for ``operators.ilike_op`` itself, both sides of a
    clone of ``binary`` are wrapped with ``ilike_case_insensitive()``.
    For any other operator the operands are assumed to have been
    wrapped already by the calling visitor.
    """
    if operator is operators.ilike_op:
        lowered = binary._clone()
        lowered.left = ilike_case_insensitive(lowered.left)
        lowered.right = ilike_case_insensitive(lowered.right)
        binary = lowered

    return self.visit_like_op_binary(binary, operator, **kw)
3635
def visit_not_ilike_op_binary(self, binary, operator, **kw):
    """Render a case-insensitive NOT LIKE through
    ``visit_not_like_op_binary()``.

    When invoked for ``operators.not_ilike_op`` itself, both sides of a
    clone of ``binary`` are wrapped with ``ilike_case_insensitive()``.
    For any other operator the operands are assumed to have been
    wrapped already by the calling visitor.
    """
    if operator is operators.not_ilike_op:
        lowered = binary._clone()
        lowered.left = ilike_case_insensitive(lowered.left)
        lowered.right = ilike_case_insensitive(lowered.right)
        binary = lowered

    return self.visit_not_like_op_binary(binary, operator, **kw)
3644
def visit_between_op_binary(self, binary, operator, **kw):
    """Render BETWEEN, or BETWEEN SYMMETRIC when the ``symmetric``
    modifier is set.
    """
    if binary.modifiers.get("symmetric", False):
        opstring = " BETWEEN SYMMETRIC "
    else:
        opstring = " BETWEEN "
    return self._generate_generic_binary(binary, opstring, **kw)
3650
def visit_not_between_op_binary(self, binary, operator, **kw):
    """Render NOT BETWEEN, or NOT BETWEEN SYMMETRIC when the
    ``symmetric`` modifier is set.
    """
    if binary.modifiers.get("symmetric", False):
        opstring = " NOT BETWEEN SYMMETRIC "
    else:
        opstring = " NOT BETWEEN "
    return self._generate_generic_binary(binary, opstring, **kw)
3658
def visit_regexp_match_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Base implementation for regular expression match.

    :raises CompileError: always, naming the current dialect.
    """
    message = (
        "%s dialect does not support regular expressions"
        % self.dialect.name
    )
    raise exc.CompileError(message)
3666
def visit_not_regexp_match_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Base implementation for negated regular expression match.

    :raises CompileError: always, naming the current dialect.
    """
    message = (
        "%s dialect does not support regular expressions"
        % self.dialect.name
    )
    raise exc.CompileError(message)
3674
def visit_regexp_replace_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Base implementation for regular expression replacement.

    :raises CompileError: always, naming the current dialect.
    """
    message = (
        "%s dialect does not support regular expression replacements"
        % self.dialect.name
    )
    raise exc.CompileError(message)
3682
def visit_bindparam(
    self,
    bindparam,
    within_columns_clause=False,
    literal_binds=False,
    skip_bind_expression=False,
    literal_execute=False,
    render_postcompile=False,
    is_upsert_set=False,
    **kwargs,
):
    """Render a bound parameter and register it with this compiler.

    Depending on the flags, the parameter is rendered as:

    * the type's bind expression wrapped around the parameter, when the
      dialect-level type has one and ``skip_bind_expression`` is not set;
    * an inline literal, when ``literal_binds`` is set;
    * a ``__[POSTCOMPILE_<name>]`` token or a regular bind placeholder,
      as produced by ``bindparam_string()``.

    Expanding parameters are always enclosed in parenthesis.

    :raises CompileError: if the parameter name conflicts with a
     different parameter already registered under the same name.
    """
    # Detect parametrized bindparams in upsert SET clause for issue #13130
    if (
        is_upsert_set
        and bindparam.value is None
        and bindparam.callable is None
        and self._insertmanyvalues is not None
    ):
        self._insertmanyvalues = self._insertmanyvalues._replace(
            has_upsert_bound_parameters=True
        )

    if not skip_bind_expression:
        impl = bindparam.type.dialect_impl(self.dialect)
        if impl._has_bind_expression:
            bind_expression = impl.bind_expression(bindparam)
            # compile the bind expression; the nested visit of this same
            # parameter is made with skip_bind_expression=True so that
            # the expression is not applied a second time
            wrapped = self.process(
                bind_expression,
                skip_bind_expression=True,
                within_columns_clause=within_columns_clause,
                literal_binds=literal_binds and not bindparam.expanding,
                literal_execute=literal_execute,
                render_postcompile=render_postcompile,
                **kwargs,
            )
            if bindparam.expanding:
                # for postcompile w/ expanding, move the "wrapped" part
                # of this into the inside

                m = re.match(
                    r"^(.*)\(__\[POSTCOMPILE_(\S+?)\]\)(.*)$", wrapped
                )
                assert m, "unexpected format for expanding parameter"
                wrapped = "(__[POSTCOMPILE_%s~~%s~~REPL~~%s~~])" % (
                    m.group(2),
                    m.group(1),
                    m.group(3),
                )

                if literal_binds:
                    # literals were not rendered by the nested call
                    # above; render them now using the template
                    ret = self.render_literal_bindparam(
                        bindparam,
                        within_columns_clause=True,
                        bind_expression_template=wrapped,
                        **kwargs,
                    )
                    return "(%s)" % ret

            return wrapped

    if not literal_binds:
        literal_execute = (
            literal_execute
            or bindparam.literal_execute
            or (within_columns_clause and self.ansi_bind_rules)
        )
        post_compile = literal_execute or bindparam.expanding
    else:
        post_compile = False

    if literal_binds:
        ret = self.render_literal_bindparam(
            bindparam, within_columns_clause=True, **kwargs
        )
        if bindparam.expanding:
            ret = "(%s)" % ret
        return ret

    name = self._truncate_bindparam(bindparam)

    # a different parameter object already registered under this name
    # is only accepted if none of the conflict conditions below apply
    if name in self.binds:
        existing = self.binds[name]
        if existing is not bindparam:
            if (
                (existing.unique or bindparam.unique)
                and not existing.proxy_set.intersection(
                    bindparam.proxy_set
                )
                and not existing._cloned_set.intersection(
                    bindparam._cloned_set
                )
            ):
                raise exc.CompileError(
                    "Bind parameter '%s' conflicts with "
                    "unique bind parameter of the same name" % name
                )
            elif existing.expanding != bindparam.expanding:
                raise exc.CompileError(
                    "Can't reuse bound parameter name '%s' in both "
                    "'expanding' (e.g. within an IN expression) and "
                    "non-expanding contexts.  If this parameter is to "
                    "receive a list/array value, set 'expanding=True' on "
                    "it for expressions that aren't IN, otherwise use "
                    "a different parameter name." % (name,)
                )
            elif existing._is_crud or bindparam._is_crud:
                if existing._is_crud and bindparam._is_crud:
                    # TODO: this condition is not well understood.
                    # see tests in test/sql/test_update.py
                    raise exc.CompileError(
                        "Encountered unsupported case when compiling an "
                        "INSERT or UPDATE statement.  If this is a "
                        "multi-table "
                        "UPDATE statement, please provide string-named "
                        "arguments to the "
                        "values() method with distinct names; support for "
                        "multi-table UPDATE statements that "
                        "target multiple tables for UPDATE is very "
                        "limited",
                    )
                else:
                    raise exc.CompileError(
                        f"bindparam() name '{bindparam.key}' is reserved "
                        "for automatic usage in the VALUES or SET "
                        "clause of this "
                        "insert/update statement.   Please use a "
                        "name other than column name when using "
                        "bindparam() "
                        "with insert() or update() (for example, "
                        f"'b_{bindparam.key}')."
                    )

    # register under both the original key and the rendered name
    self.binds[bindparam.key] = self.binds[name] = bindparam

    # if we are given a cache key that we're going to match against,
    # relate the bindparam here to one that is most likely present
    # in the "extracted params" portion of the cache key.  this is used
    # to set up a positional mapping that is used to determine the
    # correct parameters for a subsequent use of this compiled with
    # a different set of parameter values.   here, we accommodate for
    # parameters that may have been cloned both before and after the cache
    # key was generated.
    ckbm_tuple = self._cache_key_bind_match

    if ckbm_tuple:
        ckbm, cksm = ckbm_tuple
        for bp in bindparam._cloned_set:
            if bp.key in cksm:
                cb = cksm[bp.key]
                ckbm[cb].append(bindparam)

    if bindparam.isoutparam:
        self.has_out_parameters = True

    if post_compile:
        if render_postcompile:
            self._render_postcompile = True

        # track the parameter in the collection matching its kind
        if literal_execute:
            self.literal_execute_params |= {bindparam}
        else:
            self.post_compile_params |= {bindparam}

    ret = self.bindparam_string(
        name,
        post_compile=post_compile,
        expanding=bindparam.expanding,
        bindparam_type=bindparam.type,
        **kwargs,
    )

    if bindparam.expanding:
        ret = "(%s)" % ret

    return ret
3858
def render_bind_cast(self, type_, dbapi_type, sqltext):
    """Hook for rendering a cast around bound parameter text.

    The base implementation is not usable.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError
3861
def render_literal_bindparam(
    self,
    bindparam,
    render_literal_value=NO_ARG,
    bind_expression_template=None,
    **kw,
):
    """Render the value of a bound parameter inline as a literal.

    :param render_literal_value: explicit value to render instead of the
     parameter's own effective value.
    :param bind_expression_template: passed along when rendering the
     members of an expanding parameter.

    A parameter with neither a value nor a callable renders as NULL; a
    warning is emitted if that occurs inside a binary operator other
    than ``is_`` / ``is_not``.
    """
    if render_literal_value is not NO_ARG:
        value = render_literal_value
    elif bindparam.value is None and bindparam.callable is None:
        op = kw.get("_binary_op", None)
        if op and op not in (operators.is_, operators.is_not):
            util.warn_limited(
                "Bound parameter '%s' rendering literal NULL in a SQL "
                "expression; comparisons to NULL should not use "
                "operators outside of 'is' or 'is not'",
                (bindparam.key,),
            )
        return self.process(sqltypes.NULLTYPE, **kw)
    else:
        value = bindparam.effective_value

    if not bindparam.expanding:
        return self.render_literal_value(value, bindparam.type)

    expand = self._literal_execute_expanding_parameter_literal_binds
    _, replacement_expr = expand(
        bindparam,
        value,
        bind_expression_template=bind_expression_template,
    )
    return replacement_expr
3894
def render_literal_value(
    self, value: Any, type_: sqltypes.TypeEngine[Any]
) -> str:
    """Render the value of a bind parameter as a quoted literal.

    This is used for statement sections that do not accept bind parameters
    on the target driver/database.

    This should be implemented by subclasses using the quoting services
    of the DBAPI.

    :raises CompileError: if the datatype has no literal processor, or
     if the processor fails for the given value.

    """

    # issue #10535 - NULL is handled here in the compiler rather than
    # by each type, except for "evaluate None" types (e.g. JSON)
    if value is None and not type_.should_evaluate_none:
        return self.process(elements.Null._instance())

    processor = type_._cached_literal_processor(self.dialect)
    if not processor:
        raise exc.CompileError(
            f"No literal value renderer is available for literal value "
            f'"{sql_util._repr_single_value(value)}" '
            f"with datatype {type_}"
        )

    try:
        return processor(value)
    except Exception as e:
        raise exc.CompileError(
            f"Could not render literal value "
            f'"{sql_util._repr_single_value(value)}" '
            f"with datatype "
            f"{type_}; see parent stack trace for "
            "more detail."
        ) from e
3933
def _truncate_bindparam(self, bindparam):
    """Return the name under which ``bindparam`` is rendered.

    The name is computed once per parameter and remembered in
    ``self.bind_names``; keys that are ``_truncated_label`` objects are
    passed through ``_truncated_identifier()``.
    """
    known_names = self.bind_names
    if bindparam in known_names:
        return known_names[bindparam]

    bind_name = bindparam.key
    if isinstance(bind_name, elements._truncated_label):
        bind_name = self._truncated_identifier("bindparam", bind_name)

    # add to bind_names for translation
    known_names[bindparam] = bind_name
    return bind_name
3946
def _truncated_identifier(
    self, ident_class: str, name: _truncated_label
) -> str:
    """Return the final identifier for a truncatable label.

    The anon map is applied to ``name``; if the result is longer than
    ``label_length - 6`` it is cut to that length and suffixed with
    ``_<hex counter>``, using a counter kept per ``ident_class``.
    Results are remembered per ``(ident_class, name)``.
    """
    cache_key = (ident_class, name)
    if cache_key in self.truncated_names:
        return self.truncated_names[cache_key]

    anonname = name.apply_map(self.anon_map)
    max_length = self.label_length - 6

    if len(anonname) > max_length:
        counter = self._truncated_counters.get(ident_class, 1)
        prefix = anonname[0 : max(max_length, 0)]
        truncname = prefix + "_" + hex(counter)[2:]
        self._truncated_counters[ident_class] = counter + 1
    else:
        truncname = anonname

    self.truncated_names[cache_key] = truncname
    return truncname
3967
def _anonymize(self, name: str) -> str:
    """Apply ``self.anon_map`` to ``name`` using ``%`` formatting."""
    anon_map = self.anon_map
    return name % anon_map
3970
def bindparam_string(
    self,
    name: str,
    post_compile: bool = False,
    expanding: bool = False,
    escaped_from: Optional[str] = None,
    bindparam_type: Optional[TypeEngine[Any]] = None,
    accumulate_bind_names: Optional[Set[str]] = None,
    visited_bindparam: Optional[List[str]] = None,
    **kw: Any,
) -> str:
    """Return the SQL text standing in for the bound parameter ``name``.

    :param post_compile: render a ``__[POSTCOMPILE_<name>]`` token
     instead of a bind placeholder.
    :param expanding: with ``post_compile``, return the bare token
     without any cast.
    :param escaped_from: original name, if ``name`` was already escaped
     by the caller.
    :param bindparam_type: datatype consulted for bind / literal casts.
    :param accumulate_bind_names: optional set that receives ``name``.
    :param visited_bindparam: optional list that receives ``name``.
    """
    # TODO: accumulate_bind_names is passed by crud.py to gather
    # names on a per-value basis, visited_bindparam is passed by
    # visit_insert() to collect all parameters in the statement.
    # see if this gathering can be simplified somehow
    if accumulate_bind_names is not None:
        accumulate_bind_names.add(name)
    if visited_bindparam is not None:
        visited_bindparam.append(name)

    if not escaped_from and self._bind_translate_re.search(name):
        # not quite the translate use case as we want to
        # also get a quick boolean if we even found
        # unusual characters in the name
        translated = self._bind_translate_re.sub(
            lambda m: self._bind_translate_chars[m.group(0)],
            name,
        )
        escaped_from, name = name, translated

    if escaped_from:
        self.escaped_bind_names = self.escaped_bind_names.union(
            {escaped_from: name}
        )

    if post_compile:
        token = "__[POSTCOMPILE_%s]" % name
        if expanding:
            # for expanding, bound parameters or literal values will be
            # rendered per item
            return token

        # otherwise, for non-expanding "literal execute", apply
        # bind casts as determined by the datatype
        if bindparam_type is not None:
            type_impl = bindparam_type._unwrapped_dialect_impl(
                self.dialect
            )
            if type_impl.render_literal_cast:
                token = self.render_bind_cast(
                    bindparam_type, type_impl, token
                )
        return token

    if self.state is CompilerState.COMPILING:
        template = self.compilation_bindtemplate
    else:
        template = self.bindtemplate
    ret = template % {"name": name}

    if (
        bindparam_type is not None
        and self.dialect._bind_typing_render_casts
    ):
        type_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
        if type_impl.render_bind_cast:
            ret = self.render_bind_cast(bindparam_type, type_impl, ret)

    return ret
4037
def _dispatch_independent_ctes(self, stmt, kw):
    """Compile each independent CTE of ``stmt`` with its own options.

    The given ``kw`` is not modified; a copy without any ``cte_opts``
    entry is used, and each CTE receives its own ``cte_opts`` taken
    from ``stmt._independent_ctes_opts``.
    """
    local_kw = {key: val for key, val in kw.items() if key != "cte_opts"}
    cte_and_opts = zip(stmt._independent_ctes, stmt._independent_ctes_opts)
    for cte, opt in cte_and_opts:
        cte._compiler_dispatch(self, cte_opts=opt, **local_kw)
4045
4046 def visit_cte(
4047 self,
4048 cte: CTE,
4049 asfrom: bool = False,
4050 ashint: bool = False,
4051 fromhints: Optional[_FromHintsType] = None,
4052 visiting_cte: Optional[CTE] = None,
4053 from_linter: Optional[FromLinter] = None,
4054 cte_opts: selectable._CTEOpts = selectable._CTEOpts(False),
4055 **kwargs: Any,
4056 ) -> Optional[str]:
4057 self_ctes = self._init_cte_state()
4058 assert self_ctes is self.ctes
4059
4060 kwargs["visiting_cte"] = cte
4061
4062 cte_name = cte.name
4063
4064 if isinstance(cte_name, elements._truncated_label):
4065 cte_name = self._truncated_identifier("alias", cte_name)
4066
4067 is_new_cte = True
4068 embedded_in_current_named_cte = False
4069
4070 _reference_cte = cte._get_reference_cte()
4071
4072 nesting = cte.nesting or cte_opts.nesting
4073
4074 # check for CTE already encountered
4075 if _reference_cte in self.level_name_by_cte:
4076 cte_level, _, existing_cte_opts = self.level_name_by_cte[
4077 _reference_cte
4078 ]
4079 assert _ == cte_name
4080
4081 cte_level_name = (cte_level, cte_name)
4082 existing_cte = self.ctes_by_level_name[cte_level_name]
4083
4084 # check if we are receiving it here with a specific
4085 # "nest_here" location; if so, move it to this location
4086
4087 if cte_opts.nesting:
4088 if existing_cte_opts.nesting:
4089 raise exc.CompileError(
4090 "CTE is stated as 'nest_here' in "
4091 "more than one location"
4092 )
4093
4094 old_level_name = (cte_level, cte_name)
4095 cte_level = len(self.stack) if nesting else 1
4096 cte_level_name = new_level_name = (cte_level, cte_name)
4097
4098 del self.ctes_by_level_name[old_level_name]
4099 self.ctes_by_level_name[new_level_name] = existing_cte
4100 self.level_name_by_cte[_reference_cte] = new_level_name + (
4101 cte_opts,
4102 )
4103
4104 else:
4105 cte_level = len(self.stack) if nesting else 1
4106 cte_level_name = (cte_level, cte_name)
4107
4108 if cte_level_name in self.ctes_by_level_name:
4109 existing_cte = self.ctes_by_level_name[cte_level_name]
4110 else:
4111 existing_cte = None
4112
4113 if existing_cte is not None:
4114 embedded_in_current_named_cte = visiting_cte is existing_cte
4115
4116 # we've generated a same-named CTE that we are enclosed in,
4117 # or this is the same CTE. just return the name.
4118 if cte is existing_cte._restates or cte is existing_cte:
4119 is_new_cte = False
4120 elif existing_cte is cte._restates:
4121 # we've generated a same-named CTE that is
4122 # enclosed in us - we take precedence, so
4123 # discard the text for the "inner".
4124 del self_ctes[existing_cte]
4125
4126 existing_cte_reference_cte = existing_cte._get_reference_cte()
4127
4128 assert existing_cte_reference_cte is _reference_cte
4129 assert existing_cte_reference_cte is existing_cte
4130
4131 del self.level_name_by_cte[existing_cte_reference_cte]
4132 else:
4133 if (
4134 # if the two CTEs have the same hash, which we expect
4135 # here means that one/both is an annotated of the other
4136 (hash(cte) == hash(existing_cte))
4137 # or...
4138 or (
4139 (
4140 # if they are clones, i.e. they came from the ORM
4141 # or some other visit method
4142 cte._is_clone_of is not None
4143 or existing_cte._is_clone_of is not None
4144 )
4145 # and are deep-copy identical
4146 and cte.compare(existing_cte)
4147 )
4148 ):
4149 # then consider these two CTEs the same
4150 is_new_cte = False
4151 else:
4152 # otherwise these are two CTEs that either will render
4153 # differently, or were indicated separately by the user,
4154 # with the same name
4155 raise exc.CompileError(
4156 "Multiple, unrelated CTEs found with "
4157 "the same name: %r" % cte_name
4158 )
4159
4160 if not asfrom and not is_new_cte:
4161 return None
4162
4163 if cte._cte_alias is not None:
4164 pre_alias_cte = cte._cte_alias
4165 cte_pre_alias_name = cte._cte_alias.name
4166 if isinstance(cte_pre_alias_name, elements._truncated_label):
4167 cte_pre_alias_name = self._truncated_identifier(
4168 "alias", cte_pre_alias_name
4169 )
4170 else:
4171 pre_alias_cte = cte
4172 cte_pre_alias_name = None
4173
4174 if is_new_cte:
4175 self.ctes_by_level_name[cte_level_name] = cte
4176 self.level_name_by_cte[_reference_cte] = cte_level_name + (
4177 cte_opts,
4178 )
4179
4180 if pre_alias_cte not in self.ctes:
4181 self.visit_cte(pre_alias_cte, **kwargs)
4182
4183 if not cte_pre_alias_name and cte not in self_ctes:
4184 if cte.recursive:
4185 self.ctes_recursive = True
4186 text = self.preparer.format_alias(cte, cte_name)
4187 if cte.recursive or cte.element.name_cte_columns:
4188 col_source = cte.element
4189
4190 # TODO: can we get at the .columns_plus_names collection
4191 # that is already (or will be?) generated for the SELECT
4192 # rather than calling twice?
4193 recur_cols = [
4194 # TODO: proxy_name is not technically safe,
4195 # see test_cte->
4196 # test_with_recursive_no_name_currently_buggy. not
4197 # clear what should be done with such a case
4198 fallback_label_name or proxy_name
4199 for (
4200 _,
4201 proxy_name,
4202 fallback_label_name,
4203 c,
4204 repeated,
4205 ) in (col_source._generate_columns_plus_names(True))
4206 if not repeated
4207 ]
4208
4209 text += "(%s)" % (
4210 ", ".join(
4211 self.preparer.format_label_name(
4212 ident, anon_map=self.anon_map
4213 )
4214 for ident in recur_cols
4215 )
4216 )
4217
4218 assert kwargs.get("subquery", False) is False
4219
4220 if not self.stack:
4221 # toplevel, this is a stringify of the
4222 # cte directly. just compile the inner
4223 # the way alias() does.
4224 return cte.element._compiler_dispatch(
4225 self, asfrom=asfrom, **kwargs
4226 )
4227 else:
4228 prefixes = self._generate_prefixes(
4229 cte, cte._prefixes, **kwargs
4230 )
4231 inner = cte.element._compiler_dispatch(
4232 self, asfrom=True, **kwargs
4233 )
4234
4235 text += " AS %s\n(%s)" % (prefixes, inner)
4236
4237 if cte._suffixes:
4238 text += " " + self._generate_prefixes(
4239 cte, cte._suffixes, **kwargs
4240 )
4241
4242 self_ctes[cte] = text
4243
4244 if asfrom:
4245 if from_linter:
4246 from_linter.froms[cte._de_clone()] = cte_name
4247
4248 if not is_new_cte and embedded_in_current_named_cte:
4249 return self.preparer.format_alias(cte, cte_name)
4250
4251 if cte_pre_alias_name:
4252 text = self.preparer.format_alias(cte, cte_pre_alias_name)
4253 if self.preparer._requires_quotes(cte_name):
4254 cte_name = self.preparer.quote(cte_name)
4255 text += self.get_render_as_alias_suffix(cte_name)
4256 return text # type: ignore[no-any-return]
4257 else:
4258 return self.preparer.format_alias(cte, cte_name)
4259
4260 return None
4261
def visit_table_valued_alias(self, element, **kw):
    """Render a table-valued alias via the LATERAL or plain alias visitor.

    When ``element.joins_implicitly`` is set, the ``from_linter`` keyword
    is replaced with ``None`` before delegating.

    """
    if element.joins_implicitly:
        kw["from_linter"] = None

    render = self.visit_lateral if element._is_lateral else self.visit_alias
    return render(element, **kw)
4269
def visit_table_valued_column(self, element, **kw):
    """Render a table-valued column exactly as ``visit_column()`` does."""
    rendered = self.visit_column(element, **kw)
    return rendered
4272
def visit_alias(
    self,
    alias,
    asfrom=False,
    ashint=False,
    iscrud=False,
    fromhints=None,
    subquery=False,
    lateral=False,
    enclosing_alias=None,
    from_linter=None,
    **kwargs,
):
    """Render an alias construct, as a FROM element, a hint name, or
    as its bare inner element.

    :param alias: the alias object; ``alias.element`` is the construct
     being aliased.
    :param asfrom: when true, the inner element is rendered followed by
     the suffix produced by ``get_render_as_alias_suffix()``.
    :param ashint: when true, only the formatted alias name is returned;
     this is checked before ``asfrom``.
    :param iscrud: passed through to ``format_from_hint_text()``.
    :param fromhints: optional mapping keyed by FROM object; when the
     alias is present in it, the hint is applied to the rendered text.
    :param subquery: when true, the rendered inner element is wrapped in
     parentheses.
    :param lateral: true when rendering on behalf of a LATERAL construct.
    :param enclosing_alias: the alias being rendered by an enclosing
     call, if any.
    :param from_linter: optional linter; when rendering as a FROM
     element the alias name is recorded in its ``froms`` mapping.
    :return: the rendered SQL string.

    """
    if lateral:
        if "enclosing_lateral" not in kwargs:
            # if lateral is set and enclosing_lateral is not
            # present, we assume we are being called directly
            # from visit_lateral() and we need to set enclosing_lateral.
            assert alias._is_lateral
            kwargs["enclosing_lateral"] = alias

        # for lateral objects, we track a second from_linter that is...
        # lateral! to the level above us.
        if (
            from_linter
            and "lateral_from_linter" not in kwargs
            and "enclosing_lateral" in kwargs
        ):
            kwargs["lateral_from_linter"] = from_linter

    if enclosing_alias is not None and enclosing_alias.element is alias:
        # the enclosing alias wraps this alias directly: render only the
        # inner element here, without an alias suffix of our own.
        inner = alias.element._compiler_dispatch(
            self,
            asfrom=asfrom,
            ashint=ashint,
            iscrud=iscrud,
            fromhints=fromhints,
            lateral=lateral,
            enclosing_alias=alias,
            **kwargs,
        )
        if subquery and (asfrom or lateral):
            inner = "(%s)" % (inner,)
        return inner
    else:
        # nested dispatches below see this alias as the enclosing one
        kwargs["enclosing_alias"] = alias

    if asfrom or ashint:
        # resolve the name to render, truncating generated labels
        if isinstance(alias.name, elements._truncated_label):
            alias_name = self._truncated_identifier("alias", alias.name)
        else:
            alias_name = alias.name

    if ashint:
        return self.preparer.format_alias(alias, alias_name)
    elif asfrom:
        if from_linter:
            from_linter.froms[alias._de_clone()] = alias_name

        inner = alias.element._compiler_dispatch(
            self, asfrom=True, lateral=lateral, **kwargs
        )
        if subquery:
            inner = "(%s)" % (inner,)

        ret = inner + self.get_render_as_alias_suffix(
            self.preparer.format_alias(alias, alias_name)
        )

        if alias._supports_derived_columns and alias._render_derived:
            # append a derived column list "(col [type], ...)"; the type
            # is included only when _render_derived_w_types is set
            ret += "(%s)" % (
                ", ".join(
                    "%s%s"
                    % (
                        self.preparer.quote(col.name),
                        (
                            " %s"
                            % self.dialect.type_compiler_instance.process(
                                col.type, **kwargs
                            )
                            if alias._render_derived_w_types
                            else ""
                        ),
                    )
                    for col in alias.c
                )
            )

        if fromhints and alias in fromhints:
            ret = self.format_from_hint_text(
                ret, alias, fromhints[alias], iscrud
            )

        return ret
    else:
        # note we cancel the "subquery" flag here as well
        return alias.element._compiler_dispatch(
            self, lateral=lateral, **kwargs
        )
4372
4373 def visit_subquery(self, subquery, **kw):
4374 kw["subquery"] = True
4375 return self.visit_alias(subquery, **kw)
4376
def visit_lateral(self, lateral_, **kw):
    """Render a LATERAL construct: the ``LATERAL`` keyword followed by
    the alias rendering, with the ``lateral`` flag forced on."""
    kw.update(lateral=True)
    aliased = self.visit_alias(lateral_, **kw)
    return "LATERAL %s" % aliased
4380
def visit_tablesample(self, tablesample, asfrom=False, **kw):
    """Render ``<alias> TABLESAMPLE <method>`` with an optional
    ``REPEATABLE (<seed>)`` suffix.

    The alias portion is always rendered with ``asfrom=True``; the
    ``asfrom`` argument itself is not consulted.

    """
    aliased = self.visit_alias(tablesample, asfrom=True, **kw)
    method = tablesample._get_method()._compiler_dispatch(self, **kw)
    pieces = ["%s TABLESAMPLE %s" % (aliased, method)]

    if tablesample.seed is not None:
        pieces.append(
            " REPEATABLE (%s)"
            % (tablesample.seed._compiler_dispatch(self, **kw))
        )

    return "".join(pieces)
4393
4394 def _render_values(self, element, **kw):
4395 kw.setdefault("literal_binds", element.literal_binds)
4396 tuples = ", ".join(
4397 self.process(
4398 elements.Tuple(
4399 types=element._column_types, *elem
4400 ).self_group(),
4401 **kw,
4402 )
4403 for chunk in element._data
4404 for elem in chunk
4405 )
4406 return f"VALUES {tuples}"
4407
def visit_values(
    self, element, asfrom=False, from_linter=None, visiting_cte=None, **kw
):
    """Render a VALUES construct.

    Outside of a FROM context the bare ``VALUES ...`` text is returned.
    As a FROM element it is parenthesized, prefixed with ``LATERAL``
    when applicable, and given an alias plus column list when named.
    When it is the direct element of the CTE being visited, it is
    returned without parentheses or alias; a LATERAL VALUES in that
    position raises ``CompileError``.

    """
    if element._independent_ctes:
        self._dispatch_independent_ctes(element, kw)

    rendered = self._render_values(element, **kw)

    if element._unnamed:
        name = None
    elif isinstance(element.name, elements._truncated_label):
        name = self._truncated_identifier("values", element.name)
    else:
        name = element.name

    lateral = "LATERAL " if element._is_lateral else ""

    if not asfrom:
        return rendered

    if from_linter:
        from_linter.froms[element._de_clone()] = (
            "(unnamed VALUES element)" if name is None else name
        )

    if visiting_cte is not None and visiting_cte.element is element:
        if element._is_lateral:
            raise exc.CompileError(
                "Can't use a LATERAL VALUES expression inside of a CTE"
            )
        return rendered

    if not name:
        return "%s(%s)" % (lateral, rendered)

    kw["include_table"] = False
    alias_suffix = self.get_render_as_alias_suffix(self.preparer.quote(name))
    column_list = ", ".join(
        c._compiler_dispatch(self, **kw) for c in element.columns
    )
    return "%s(%s)%s (%s)" % (lateral, rendered, alias_suffix, column_list)
4456
def visit_scalar_values(self, element, **kw):
    """Render a scalar VALUES construct: the VALUES text wrapped in
    parentheses."""
    values_sql = self._render_values(element, **kw)
    return "(%s)" % (values_sql,)
4459
def get_render_as_alias_suffix(self, alias_name_text):
    """Return the text appended after a FROM element to name its alias.

    The base implementation renders `` AS <alias_name_text>``.

    """
    suffix = " AS "
    suffix += alias_name_text
    return suffix
4462
def _add_to_result_map(
    self,
    keyname: str,
    name: str,
    objects: Tuple[Any, ...],
    type_: TypeEngine[Any],
) -> None:
    """Append one entry to the compiler's collection of result columns.

    A ``keyname`` of ``None`` or ``"*"`` turns off ordered-column
    matching and flags the statement as ad-hoc textual.  A tuple type
    is rejected with ``CompileError``.

    """
    # an empty collection of target objects is not handled properly
    # by cursor.py
    assert objects

    is_wildcard = keyname is None or keyname == "*"
    if is_wildcard:
        self._ordered_columns = False
        self._ad_hoc_textual = True

    if type_._is_tuple_type:
        raise exc.CompileError(
            "Most backends don't support SELECTing "
            "from a tuple() object. If this is an ORM query, "
            "consider using the Bundle object."
        )

    entry = ResultColumnsEntry(keyname, name, objects, type_)
    self._result_columns.append(entry)
4487
4488 def _label_returning_column(
4489 self, stmt, column, populate_result_map, column_clause_args=None, **kw
4490 ):
4491 """Render a column with necessary labels inside of a RETURNING clause.
4492
4493 This method is provided for individual dialects in place of calling
4494 the _label_select_column method directly, so that the two use cases
4495 of RETURNING vs. SELECT can be disambiguated going forward.
4496
4497 .. versionadded:: 1.4.21
4498
4499 """
4500 return self._label_select_column(
4501 None,
4502 column,
4503 populate_result_map,
4504 False,
4505 {} if column_clause_args is None else column_clause_args,
4506 **kw,
4507 )
4508
def _label_select_column(
    self,
    select,
    column,
    populate_result_map,
    asfrom,
    column_clause_args,
    name=None,
    proxy_name=None,
    fallback_label_name=None,
    within_columns_clause=True,
    column_is_repeated=False,
    need_column_expressions=False,
    include_table=True,
):
    """produce labeled columns present in a select().

    :param select: the enclosing statement; not referenced by this
     implementation (``_label_returning_column()`` passes ``None``).
    :param column: the column expression to render.
    :param populate_result_map: when true, an ``add_to_result_map``
     callable based on ``self._add_to_result_map`` is handed to the
     column's compilation; otherwise ``None`` is passed.
    :param asfrom: whether the enclosing SELECT is rendered as a FROM
     element; used by the labeling heuristics.
    :param column_clause_args: dictionary of keyword arguments for the
     column dispatch; it is updated in place.
    :param name: explicit label name; requires ``proxy_name``.
    :param proxy_name: alternate name recorded on the generated label.
    :param fallback_label_name: label name used when the heuristics
     decide a label is needed; when not given,
     ``column._anon_name_label`` is used.
    :param within_columns_clause: must be true; asserted.
    :param column_is_repeated: when true, the result map entry for this
     column receives only its key name as its target object.
    :param need_column_expressions: when true, the type-level column
     expression is applied even without populating the result map.
    :param include_table: forwarded to the column dispatch.
    :return: the rendered string for the column.

    """
    impl = column.type.dialect_impl(self.dialect)

    if impl._has_column_expression and (
        need_column_expressions or populate_result_map
    ):
        col_expr = impl.column_expression(column)
    else:
        col_expr = column

    if populate_result_map:
        # pass an "add_to_result_map" callable into the compilation
        # of embedded columns. this collects information about the
        # column as it will be fetched in the result and is coordinated
        # with cursor.description when the query is executed.
        add_to_result_map = self._add_to_result_map

        # if the SELECT statement told us this column is a repeat,
        # wrap the callable with one that prevents the addition of the
        # targets
        if column_is_repeated:
            _add_to_result_map = add_to_result_map

            def add_to_result_map(keyname, name, objects, type_):
                _add_to_result_map(keyname, name, (keyname,), type_)

        # if we redefined col_expr for type expressions, wrap the
        # callable with one that adds the original column to the targets
        elif col_expr is not column:
            _add_to_result_map = add_to_result_map

            def add_to_result_map(keyname, name, objects, type_):
                _add_to_result_map(
                    keyname, name, (column,) + objects, type_
                )

    else:
        add_to_result_map = None

    # this method is used by some of the dialects for RETURNING,
    # which has different inputs. _label_returning_column was added
    # as the better target for this now however for 1.4 we will keep
    # _label_select_column directly compatible with this use case.
    # these assertions right now set up the current expected inputs
    assert within_columns_clause, (
        "_label_select_column is only relevant within "
        "the columns clause of a SELECT or RETURNING"
    )
    result_expr: Union[elements.Label[Any], _CompileLabel]

    if isinstance(column, elements.Label):
        # the column is already a Label; re-wrap it only when the type
        # expression replaced the underlying expression
        if col_expr is not column:
            result_expr = _CompileLabel(
                col_expr, column.name, alt_names=(column.element,)
            )
        else:
            result_expr = col_expr

    elif name:
        # here, _columns_plus_names has determined there's an explicit
        # label name we need to use. this is the default for
        # tablenames_plus_columnnames as well as when columns are being
        # deduplicated on name

        assert (
            proxy_name is not None
        ), "proxy_name is required if 'name' is passed"

        result_expr = _CompileLabel(
            col_expr,
            name,
            alt_names=(
                proxy_name,
                # this is a hack to allow legacy result column lookups
                # to work as they did before; this goes away in 2.0.
                # TODO: this only seems to be tested indirectly
                # via test/orm/test_deprecations.py. should be a
                # resultset test for this
                column._tq_label,
            ),
        )
    else:
        # determine here whether this column should be rendered in
        # a labelled context or not, as we were given no required label
        # name from the caller. Here we apply heuristics based on the kind
        # of SQL expression involved.

        if col_expr is not column:
            # type-specific expression wrapping the given column,
            # so we render a label
            render_with_label = True
        elif isinstance(column, elements.ColumnClause):
            # table-bound column, we render its name as a label if we are
            # inside of a subquery only
            render_with_label = (
                asfrom
                and not column.is_literal
                and column.table is not None
            )
        elif isinstance(column, elements.TextClause):
            render_with_label = False
        elif isinstance(column, elements.UnaryExpression):
            # unary expression. notes added as of #12681
            #
            # By convention, the visit_unary() method
            # itself does not add an entry to the result map, and relies
            # upon either the inner expression creating a result map
            # entry, or if not, by creating a label here that produces
            # the result map entry. Where that happens is based on whether
            # or not the element immediately inside the unary is a
            # NamedColumn subclass or not.
            #
            # Now, this also impacts how the SELECT is written; if
            # we decide to generate a label here, we get the usual
            # "~(x+y) AS anon_1" thing in the columns clause. If we
            # don't, we don't get an AS at all, we get like
            # "~table.column".
            #
            # But here is the important thing as of modernish (like 1.4)
            # versions of SQLAlchemy - **whether or not the AS <label>
            # is present in the statement is not actually important**.
            # We target result columns **positionally** for a fully
            # compiled ``Select()`` object; before 1.4 we needed those
            # labels to match in cursor.description etc etc but now it
            # really doesn't matter.
            # So really, we could set render_with_label True in all cases.
            # Or we could just have visit_unary() populate the result map
            # in all cases.
            #
            # What we're doing here is strictly trying to not rock the
            # boat too much with when we do/don't render "AS label";
            # labels being present helps in the edge cases that we
            # "fall back" to named cursor.description matching, labels
            # not being present for columns keeps us from having awkward
            # phrases like "SELECT DISTINCT table.x AS x".
            render_with_label = (
                (
                    # exception case to detect if we render "not boolean"
                    # as "not <col>" for native boolean or "<col> = 1"
                    # for non-native boolean. this is controlled by
                    # visit_is_<true|false>_unary_operator
                    column.operator
                    in (operators.is_false, operators.is_true)
                    and not self.dialect.supports_native_boolean
                )
                or column._wraps_unnamed_column()
                or asfrom
            )
        elif (
            # general class of expressions that don't have a SQL-column
            # addressable name. includes scalar selects, bind parameters,
            # SQL functions, others
            not isinstance(column, elements.NamedColumn)
            # deeper check that indicates there's no natural "name" to
            # this element, which accommodates for custom SQL constructs
            # that might have a ".name" attribute (but aren't SQL
            # functions) but are not implementing this more recently added
            # base class. in theory the "NamedColumn" check should be
            # enough, however here we seek to maintain legacy behaviors
            # as well.
            and column._non_anon_label is None
        ):
            render_with_label = True
        else:
            render_with_label = False

        if render_with_label:
            if not fallback_label_name:
                # used by the RETURNING case right now. we generate it
                # here as 3rd party dialects may be referring to
                # _label_select_column method directly instead of the
                # just-added _label_returning_column method
                assert not column_is_repeated
                fallback_label_name = column._anon_name_label

            # make sure the label name is a _truncated_label instance
            fallback_label_name = (
                elements._truncated_label(fallback_label_name)
                if not isinstance(
                    fallback_label_name, elements._truncated_label
                )
                else fallback_label_name
            )

            result_expr = _CompileLabel(
                col_expr, fallback_label_name, alt_names=(proxy_name,)
            )
        else:
            result_expr = col_expr

    # note: the caller's dictionary is mutated here and then used as the
    # keyword arguments for the dispatch
    column_clause_args.update(
        within_columns_clause=within_columns_clause,
        add_to_result_map=add_to_result_map,
        include_table=include_table,
    )
    return result_expr._compiler_dispatch(self, **column_clause_args)
4719
def format_from_hint_text(self, sqltext, table, hint, iscrud):
    """Append the FROM hint text for ``table`` to ``sqltext``.

    The hint text comes from ``get_from_hint_text()``; when that is
    empty or ``None`` the SQL text is returned unchanged.  ``iscrud``
    is not consulted by this implementation.

    """
    hint_sql = self.get_from_hint_text(table, hint)
    if not hint_sql:
        return sqltext
    return sqltext + (" " + hint_sql)
4725
def get_select_hint_text(self, byfroms):
    """Return hint text to render right after the SELECT keyword.

    The base implementation produces no hint and returns ``None``.

    """
    return None
4728
def get_from_hint_text(
    self, table: FromClause, text: Optional[str]
) -> Optional[str]:
    """Return hint text to render after the given FROM element.

    The base implementation produces no hint and returns ``None``.

    """
    return None
4733
def get_crud_hint_text(self, table, text):
    """Return hint text for the target table of a DML statement.

    The base implementation produces no hint and returns ``None``.

    """
    return None
4736
def get_statement_hint_text(self, hint_texts):
    """Combine statement-level hint strings into one space-separated
    string."""
    separator = " "
    return separator.join(hint_texts)
4739
# stack entry used in place of ``self.stack[-1]`` when the compiler
# stack is empty (i.e. at the top level); both correlation collections
# are empty frozensets.
_default_stack_entry: _CompilerStackEntry

# assigned only at runtime; under TYPE_CHECKING just the annotation
# above is seen.
if not typing.TYPE_CHECKING:
    _default_stack_entry = util.immutabledict(
        [("correlate_froms", frozenset()), ("asfrom_froms", frozenset())]
    )
4746
4747 def _display_froms_for_select(
4748 self, select_stmt, asfrom, lateral=False, **kw
4749 ):
4750 # utility method to help external dialects
4751 # get the correct from list for a select.
4752 # specifically the oracle dialect needs this feature
4753 # right now.
4754 toplevel = not self.stack
4755 entry = self._default_stack_entry if toplevel else self.stack[-1]
4756
4757 compile_state = select_stmt._compile_state_factory(select_stmt, self)
4758
4759 correlate_froms = entry["correlate_froms"]
4760 asfrom_froms = entry["asfrom_froms"]
4761
4762 if asfrom and not lateral:
4763 froms = compile_state._get_display_froms(
4764 explicit_correlate_froms=correlate_froms.difference(
4765 asfrom_froms
4766 ),
4767 implicit_correlate_froms=(),
4768 )
4769 else:
4770 froms = compile_state._get_display_froms(
4771 explicit_correlate_froms=correlate_froms,
4772 implicit_correlate_froms=asfrom_froms,
4773 )
4774 return froms
4775
# optional hook consulted by visit_select(): when set, it is called as
# translate_select_structure(select_stmt, asfrom=asfrom, **kwargs) and
# may return a different SELECT object to compile instead.
translate_select_structure: Any = None
"""if not ``None``, should be a callable which accepts ``(select_stmt,
**kw)`` and returns a select object. this is used for structural changes
mostly to accommodate for LIMIT/OFFSET schemes

"""
4782
def visit_select(
    self,
    select_stmt,
    asfrom=False,
    insert_into=False,
    fromhints=None,
    compound_index=None,
    select_wraps_for=None,
    lateral=False,
    from_linter=None,
    **kwargs,
):
    """Render a SELECT statement.

    :param select_stmt: the SELECT construct to compile; it may be
     replaced by the statement of its compile state and by the
     ``translate_select_structure`` hook.
    :param asfrom: true when the SELECT is rendered as a FROM element.
    :param insert_into: true when the SELECT is embedded in another
     statement; together with ``compound_index`` this defers rendering
     of the CTE clause unless this is the top level.
    :param fromhints: accepted but not referenced in this method body.
    :param compound_index: position of this SELECT within a compound
     select, or ``None``.
    :param select_wraps_for: must be ``None``; asserted.
    :param lateral: true when rendering on behalf of a LATERAL construct.
    :param from_linter: accepted but not referenced in this method body.
    :return: the rendered SQL string.

    """
    assert select_wraps_for is None, (
        "SQLAlchemy 1.4 requires use of "
        "the translate_select_structure hook for structural "
        "translations of SELECT objects"
    )

    # initial setup of SELECT. the compile_state_factory may now
    # be creating a totally different SELECT from the one that was
    # passed in. for ORM use this will convert from an ORM-state
    # SELECT to a regular "Core" SELECT. other composed operations
    # such as computation of joins will be performed.

    kwargs["within_columns_clause"] = False

    compile_state = select_stmt._compile_state_factory(
        select_stmt, self, **kwargs
    )
    kwargs["ambiguous_table_name_map"] = (
        compile_state._ambiguous_table_name_map
    )

    select_stmt = compile_state.statement

    # an empty stack means this is the outermost statement
    toplevel = not self.stack

    if toplevel and not self.compile_state:
        self.compile_state = compile_state

    is_embedded_select = compound_index is not None or insert_into

    # translate step for Oracle, SQL Server which often need to
    # restructure the SELECT to allow for LIMIT/OFFSET and possibly
    # other conditions
    if self.translate_select_structure:
        new_select_stmt = self.translate_select_structure(
            select_stmt, asfrom=asfrom, **kwargs
        )

        # if SELECT was restructured, maintain a link to the originals
        # and assemble a new compile state
        if new_select_stmt is not select_stmt:
            compile_state_wraps_for = compile_state
            select_wraps_for = select_stmt
            select_stmt = new_select_stmt

            compile_state = select_stmt._compile_state_factory(
                select_stmt, self, **kwargs
            )
            select_stmt = compile_state.statement

    entry = self._default_stack_entry if toplevel else self.stack[-1]

    populate_result_map = need_column_expressions = (
        toplevel
        or entry.get("need_result_map_for_compound", False)
        or entry.get("need_result_map_for_nested", False)
    )

    # indicates there is a CompoundSelect in play and we are not the
    # first select
    if compound_index:
        populate_result_map = False

    # this was first proposed as part of #3372; however, it is not
    # reached in current tests and could possibly be an assertion
    # instead.
    if not populate_result_map and "add_to_result_map" in kwargs:
        del kwargs["add_to_result_map"]

    # pushes a new entry onto self.stack; popped at the end of this
    # method
    froms = self._setup_select_stack(
        select_stmt, compile_state, entry, asfrom, lateral, compound_index
    )

    column_clause_args = kwargs.copy()
    column_clause_args.update(
        {"within_label_clause": False, "within_columns_clause": False}
    )

    text = "SELECT "  # we're off to a good start !

    if select_stmt._hints:
        hint_text, byfrom = self._setup_select_hints(select_stmt)
        if hint_text:
            text += hint_text + " "
    else:
        byfrom = None

    if select_stmt._independent_ctes:
        self._dispatch_independent_ctes(select_stmt, kwargs)

    if select_stmt._prefixes:
        text += self._generate_prefixes(
            select_stmt, select_stmt._prefixes, **kwargs
        )

    text += self.get_select_precolumns(select_stmt, **kwargs)
    # the actual list of columns to print in the SELECT column list.
    inner_columns = [
        c
        for c in [
            self._label_select_column(
                select_stmt,
                column,
                populate_result_map,
                asfrom,
                column_clause_args,
                name=name,
                proxy_name=proxy_name,
                fallback_label_name=fallback_label_name,
                column_is_repeated=repeated,
                need_column_expressions=need_column_expressions,
            )
            for (
                name,
                proxy_name,
                fallback_label_name,
                column,
                repeated,
            ) in compile_state.columns_plus_names
        ]
        if c is not None
    ]

    if populate_result_map and select_wraps_for is not None:
        # if this select was generated from translate_select,
        # rewrite the targeted columns in the result map

        # maps each column object of the translated SELECT to the
        # column at the same position in the original SELECT (the
        # loop variable called "name" here holds the column object)
        translate = dict(
            zip(
                [
                    name
                    for (
                        key,
                        proxy_name,
                        fallback_label_name,
                        name,
                        repeated,
                    ) in compile_state.columns_plus_names
                ],
                [
                    name
                    for (
                        key,
                        proxy_name,
                        fallback_label_name,
                        name,
                        repeated,
                    ) in compile_state_wraps_for.columns_plus_names
                ],
            )
        )

        self._result_columns = [
            ResultColumnsEntry(
                key, name, tuple(translate.get(o, o) for o in obj), type_
            )
            for key, name, obj, type_ in self._result_columns
        ]

    text = self._compose_select_body(
        text,
        select_stmt,
        compile_state,
        inner_columns,
        froms,
        byfrom,
        toplevel,
        kwargs,
    )

    if select_stmt._statement_hints:
        # keep only hints registered for all dialects or for this one
        per_dialect = [
            ht
            for (dialect_name, ht) in select_stmt._statement_hints
            if dialect_name in ("*", self.dialect.name)
        ]
        if per_dialect:
            text += " " + self.get_statement_hint_text(per_dialect)

    # In compound query, CTEs are shared at the compound level
    if self.ctes and (not is_embedded_select or toplevel):
        nesting_level = len(self.stack) if not toplevel else None
        text = self._render_cte_clause(nesting_level=nesting_level) + text

    if select_stmt._suffixes:
        text += " " + self._generate_prefixes(
            select_stmt, select_stmt._suffixes, **kwargs
        )

    # remove the stack entry added by _setup_select_stack()
    self.stack.pop(-1)

    return text
4987
4988 def _setup_select_hints(
4989 self, select: Select[Any]
4990 ) -> Tuple[str, _FromHintsType]:
4991 byfrom = {
4992 from_: hinttext
4993 % {"name": from_._compiler_dispatch(self, ashint=True)}
4994 for (from_, dialect), hinttext in select._hints.items()
4995 if dialect in ("*", self.dialect.name)
4996 }
4997 hint_text = self.get_select_hint_text(byfrom)
4998 return hint_text, byfrom
4999
def _setup_select_stack(
    self, select, compile_state, entry, asfrom, lateral, compound_index
):
    """Compute the FROM list for a SELECT and push its stack entry.

    The first member of a compound select (``compound_index == 0``) is
    stored on the enclosing entry as ``select_0``; later members are
    checked against its column count and ``CompileError`` is raised on a
    mismatch.

    :return: the FROM objects to display for this SELECT.

    """
    correlate_froms = entry["correlate_froms"]
    asfrom_froms = entry["asfrom_froms"]

    if compound_index == 0:
        entry["select_0"] = select
    elif compound_index:
        expected_cols = len(entry["select_0"]._all_selected_columns)

        if len(compile_state.columns_plus_names) != expected_cols:
            message = (
                "All selectables passed to "
                "CompoundSelect must have identical numbers of "
                "columns; select #%d has %d columns, select "
                "#%d has %d"
            )
            raise exc.CompileError(
                message
                % (
                    1,
                    expected_cols,
                    compound_index + 1,
                    len(select._all_selected_columns),
                )
            )

    explicit = correlate_froms
    implicit = asfrom_froms
    if asfrom and not lateral:
        # rendered as a non-lateral FROM element: drop the enclosing
        # FROMs from the explicit set and correlate nothing implicitly
        explicit = correlate_froms.difference(asfrom_froms)
        implicit = ()

    froms = compile_state._get_display_froms(
        explicit_correlate_froms=explicit,
        implicit_correlate_froms=implicit,
    )

    new_correlate_froms = set(_from_objects(*froms))

    new_entry: _CompilerStackEntry = {
        "asfrom_froms": new_correlate_froms,
        "correlate_froms": new_correlate_froms.union(correlate_froms),
        "selectable": select,
        "compile_state": compile_state,
    }
    self.stack.append(new_entry)

    return froms
5051
def _compose_select_body(
    self,
    text,
    select,
    compile_state,
    inner_columns,
    froms,
    byfrom,
    toplevel,
    kwargs,
):
    """Append the column list and the remaining clauses of a SELECT to
    ``text``.

    Clauses are appended in this order: columns, FROM (or the default
    FROM text), WHERE, GROUP BY, HAVING, ORDER BY, row limiting, FOR
    UPDATE.

    :param text: the statement text built so far.
    :param select: the SELECT construct being rendered.
    :param compile_state: accepted but not referenced in this method
     body.
    :param inner_columns: list of rendered column strings.
    :param froms: FROM objects to render.
    :param byfrom: per-FROM hint mapping, passed as ``fromhints`` when
     the SELECT has hints.
    :param toplevel: whether this is the outermost statement.
    :param kwargs: dictionary of keyword arguments forwarded to nested
     compilation calls.
    :return: the extended statement text.

    """
    text += ", ".join(inner_columns)

    if self.linting & COLLECT_CARTESIAN_PRODUCTS:
        # collect FROM linting information while rendering FROM / WHERE
        from_linter = FromLinter({}, set())
        warn_linting = self.linting & WARN_LINTING
        if toplevel:
            self.from_linter = from_linter
    else:
        from_linter = None
        warn_linting = False

    # adjust the whitespace for no inner columns, part of #9440,
    # so that a no-col SELECT comes out as "SELECT WHERE..." or
    # "SELECT FROM ...".
    # while it would be better to have built the SELECT starting string
    # without trailing whitespace first, then add whitespace only if inner
    # cols were present, this breaks compatibility with various custom
    # compilation schemes that are currently being tested.
    if not inner_columns:
        text = text.rstrip()

    if froms:
        text += " \nFROM "

        if select._hints:
            # same as the branch below, but additionally passes the
            # per-FROM hint mapping
            text += ", ".join(
                [
                    f._compiler_dispatch(
                        self,
                        asfrom=True,
                        fromhints=byfrom,
                        from_linter=from_linter,
                        **kwargs,
                    )
                    for f in froms
                ]
            )
        else:
            text += ", ".join(
                [
                    f._compiler_dispatch(
                        self,
                        asfrom=True,
                        from_linter=from_linter,
                        **kwargs,
                    )
                    for f in froms
                ]
            )
    else:
        text += self.default_from()

    if select._where_criteria:
        t = self._generate_delimited_and_list(
            select._where_criteria, from_linter=from_linter, **kwargs
        )
        if t:
            text += " \nWHERE " + t

    # the linter warns after FROM and WHERE have been rendered
    if warn_linting:
        assert from_linter is not None
        from_linter.warn()

    if select._group_by_clauses:
        text += self.group_by_clause(select, **kwargs)

    if select._having_criteria:
        t = self._generate_delimited_and_list(
            select._having_criteria, **kwargs
        )
        if t:
            text += " \nHAVING " + t

    if select._order_by_clauses:
        text += self.order_by_clause(select, **kwargs)

    if select._has_row_limiting_clause:
        text += self._row_limit_clause(select, **kwargs)

    if select._for_update_arg is not None:
        text += self.for_update_clause(select, **kwargs)

    return text
5146
5147 def _generate_prefixes(self, stmt, prefixes, **kw):
5148 clause = " ".join(
5149 prefix._compiler_dispatch(self, **kw)
5150 for prefix, dialect_name in prefixes
5151 if dialect_name in (None, "*") or dialect_name == self.dialect.name
5152 )
5153 if clause:
5154 clause += " "
5155 return clause
5156
5157 def _render_cte_clause(
5158 self,
5159 nesting_level=None,
5160 include_following_stack=False,
5161 ):
5162 """
5163 include_following_stack
5164 Also render the nesting CTEs on the next stack. Useful for
5165 SQL structures like UNION or INSERT that can wrap SELECT
5166 statements containing nesting CTEs.
5167 """
5168 if not self.ctes:
5169 return ""
5170
5171 ctes: MutableMapping[CTE, str]
5172
5173 if nesting_level and nesting_level > 1:
5174 ctes = util.OrderedDict()
5175 for cte in list(self.ctes.keys()):
5176 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5177 cte._get_reference_cte()
5178 ]
5179 nesting = cte.nesting or cte_opts.nesting
5180 is_rendered_level = cte_level == nesting_level or (
5181 include_following_stack and cte_level == nesting_level + 1
5182 )
5183 if not (nesting and is_rendered_level):
5184 continue
5185
5186 ctes[cte] = self.ctes[cte]
5187
5188 else:
5189 ctes = self.ctes
5190
5191 if not ctes:
5192 return ""
5193 ctes_recursive = any([cte.recursive for cte in ctes])
5194
5195 cte_text = self.get_cte_preamble(ctes_recursive) + " "
5196 cte_text += ", \n".join([txt for txt in ctes.values()])
5197 cte_text += "\n "
5198
5199 if nesting_level and nesting_level > 1:
5200 for cte in list(ctes.keys()):
5201 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5202 cte._get_reference_cte()
5203 ]
5204 del self.ctes[cte]
5205 del self.ctes_by_level_name[(cte_level, cte_name)]
5206 del self.level_name_by_cte[cte._get_reference_cte()]
5207
5208 return cte_text
5209
def get_cte_preamble(self, recursive):
    """Return the keyword(s) that open a CTE clause: ``WITH RECURSIVE``
    when ``recursive`` is true, else ``WITH``."""
    return "WITH RECURSIVE" if recursive else "WITH"
5215
def get_select_precolumns(self, select: Select[Any], **kw: Any) -> str:
    """Called when building a ``SELECT`` statement, position is just
    before column list.

    The base implementation renders ``DISTINCT`` when requested and
    emits a deprecation warning if DISTINCT ON columns are present.

    """
    if select._distinct_on:
        util.warn_deprecated(
            "DISTINCT ON is currently supported only by the PostgreSQL "
            "dialect. Use of DISTINCT ON for other backends is currently "
            "silently ignored, however this usage is deprecated, and will "
            "raise CompileError in a future release for all backends "
            "that do not support this syntax.",
            version="1.4",
        )

    if select._distinct:
        return "DISTINCT "
    return ""
5231
def group_by_clause(self, select, **kw):
    """allow dialects to customize how GROUP BY is rendered.

    Returns the `` GROUP BY ...`` text, or an empty string when the
    rendered list of expressions is empty.

    """
    rendered = self._generate_delimited_list(
        select._group_by_clauses, OPERATORS[operators.comma_op], **kw
    )
    return " GROUP BY " + rendered if rendered else ""
5242
def order_by_clause(self, select, **kw):
    """Render the ``ORDER BY`` clause of a SELECT.

    Exists as a separate hook so that dialects can customize the
    rendering.

    :param select: the SELECT construct; its ``_order_by_clauses`` are
     rendered as a comma-delimited list.
    :return: ``" ORDER BY <list>"``, or an empty string when the
     delimited list renders as empty.

    """
    rendered = self._generate_delimited_list(
        select._order_by_clauses, OPERATORS[operators.comma_op], **kw
    )
    if not rendered:
        return ""
    return " ORDER BY " + rendered
5254
def for_update_clause(self, select, **kw):
    """Return the row-locking suffix for a SELECT.

    This implementation ignores its arguments and always returns
    ``" FOR UPDATE"``.

    """
    suffix = " FOR UPDATE"
    return suffix
5257
def returning_clause(
    self,
    stmt: UpdateBase,
    returning_cols: Sequence[_ColumnsClauseElement],
    *,
    populate_result_map: bool,
    **kw: Any,
) -> str:
    """Render the ``RETURNING`` clause for a DML statement.

    :param stmt: the DML statement being compiled; it supplies the
     name / proxy name / label information for each column.
    :param returning_cols: the column expressions to return.
    :param populate_result_map: passed through to
     ``_label_returning_column()`` for every column.
    :return: ``"RETURNING "`` followed by the comma-separated rendered
     columns.

    """
    column_records = stmt._generate_columns_plus_names(
        True, cols=base._select_iterables(returning_cols)
    )

    rendered_columns = []
    for (
        name,
        proxy_name,
        fallback_label_name,
        column,
        repeated,
    ) in column_records:
        rendered_columns.append(
            self._label_returning_column(
                stmt,
                column,
                populate_result_map,
                fallback_label_name=fallback_label_name,
                column_is_repeated=repeated,
                name=name,
                proxy_name=proxy_name,
                **kw,
            )
        )

    return "RETURNING " + ", ".join(rendered_columns)
5289
def limit_clause(self, select, **kw):
    """Render the ``LIMIT`` / ``OFFSET`` portion of a SELECT.

    When only an offset is present, ``LIMIT -1`` is rendered ahead of
    the ``OFFSET``.

    :param select: the SELECT construct supplying ``_limit_clause`` and
     ``_offset_clause``.
    :return: the rendered text, or an empty string when neither clause
     is set.

    """
    fragments = []
    has_limit = select._limit_clause is not None

    if has_limit:
        fragments.append(
            "\n LIMIT " + self.process(select._limit_clause, **kw)
        )

    if select._offset_clause is not None:
        if not has_limit:
            fragments.append("\n LIMIT -1")
        fragments.append(
            " OFFSET " + self.process(select._offset_clause, **kw)
        )

    return "".join(fragments)
5299
def fetch_clause(
    self,
    select,
    fetch_clause=None,
    require_offset=False,
    use_literal_execute_for_simple_int=False,
    **kw,
):
    """Render ``OFFSET .. ROWS`` / ``FETCH FIRST .. ROWS`` for a SELECT.

    :param select: the SELECT construct being compiled.
    :param fetch_clause: optional explicit fetch expression.  When
     given, it is rendered without ``PERCENT`` and with ``ONLY``;
     otherwise ``select._fetch_clause`` and
     ``select._fetch_clause_options`` are used.
    :param require_offset: when True and the SELECT has no offset,
     ``OFFSET 0 ROWS`` is rendered.
    :param use_literal_execute_for_simple_int: when True, offset / fetch
     expressions for which ``select._simple_int_clause()`` is true are
     replaced by their ``render_literal_execute()`` form before being
     rendered.
    :return: the rendered text, possibly empty.

    """
    if fetch_clause is not None:
        fetch_clause_options = {"percent": False, "with_ties": False}
    else:
        fetch_clause = select._fetch_clause
        fetch_clause_options = select._fetch_clause_options

    pieces = []

    if select._offset_clause is not None:
        offset_clause = select._offset_clause
        if use_literal_execute_for_simple_int and select._simple_int_clause(
            offset_clause
        ):
            offset_clause = offset_clause.render_literal_execute()
        offset_str = self.process(offset_clause, **kw)
        pieces.append("\n OFFSET %s ROWS" % offset_str)
    elif require_offset:
        pieces.append("\n OFFSET 0 ROWS")

    if fetch_clause is not None:
        if use_literal_execute_for_simple_int and select._simple_int_clause(
            fetch_clause
        ):
            fetch_clause = fetch_clause.render_literal_execute()

        rendered_fetch = self.process(fetch_clause, **kw)
        percent = " PERCENT" if fetch_clause_options["percent"] else ""
        ties = "WITH TIES" if fetch_clause_options["with_ties"] else "ONLY"
        pieces.append(
            "\n FETCH FIRST %s%s ROWS %s" % (rendered_fetch, percent, ties)
        )

    return "".join(pieces)
5340
def visit_table(
    self,
    table,
    asfrom=False,
    iscrud=False,
    ashint=False,
    fromhints=None,
    use_schema=True,
    from_linter=None,
    ambiguous_table_name_map=None,
    enclosing_alias=None,
    **kwargs,
):
    """Render a table for use in a FROM clause or hint.

    :param table: the table construct to render.
    :param asfrom: render the table as a FROM element.
    :param iscrud: passed along to ``format_from_hint_text()``.
    :param ashint: render the table for use within a hint.
    :param fromhints: optional mapping of table to hint text.
    :param use_schema: when False, the schema qualifier is omitted.
    :param from_linter: optional linter; when given, the table is
     recorded in its ``froms`` collection under ``table.fullname``.
    :param ambiguous_table_name_map: optional mapping of table name to
     a name used to build an anonymous alias for a schema-less table.
    :param enclosing_alias: alias currently being rendered, if any; no
     anonymous alias is added when its ``element`` is this table.
    :return: the rendered text, or an empty string when neither
     ``asfrom`` nor ``ashint`` is set.

    """
    if from_linter:
        from_linter.froms[table] = table.fullname

    if not (asfrom or ashint):
        return ""

    effective_schema = self.preparer.schema_for_object(table)

    if use_schema and effective_schema:
        schema_prefix = self.preparer.quote_schema(effective_schema) + "."
    else:
        schema_prefix = ""
    rendered = schema_prefix + self.preparer.quote(table.name)

    not_aliased_here = (
        enclosing_alias is None or enclosing_alias.element is not table
    )
    if (
        not_aliased_here
        and not effective_schema
        and ambiguous_table_name_map
        and table.name in ambiguous_table_name_map
    ):
        anon_name = self._truncated_identifier(
            "alias", ambiguous_table_name_map[table.name]
        )
        alias_text = self.preparer.format_alias(None, anon_name)
        rendered = rendered + self.get_render_as_alias_suffix(alias_text)

    if fromhints and table in fromhints:
        rendered = self.format_from_hint_text(
            rendered, table, fromhints[table], iscrud
        )
    return rendered
5393
def visit_join(self, join, asfrom=False, from_linter=None, **kwargs):
    """Render a JOIN construct as ``<left> <join type> <right> ON <on>``.

    :param join: the join construct; ``full`` and ``isouter`` select
     the join keyword.
    :param asfrom: accepted but not forwarded; both sides are always
     rendered with ``asfrom=True``.
    :param from_linter: optional linter; when given, every pairing of
     the left and right FROM objects is added to its ``edges``.
    :return: the rendered JOIN text.

    """
    if from_linter:
        from_linter.edges.update(
            itertools.product(
                _de_clone(join.left._from_objects),
                _de_clone(join.right._from_objects),
            )
        )

    join_type = (
        " FULL OUTER JOIN "
        if join.full
        else " LEFT OUTER JOIN " if join.isouter else " JOIN "
    )

    left_text = join.left._compiler_dispatch(
        self, asfrom=True, from_linter=from_linter, **kwargs
    )
    right_text = join.right._compiler_dispatch(
        self, asfrom=True, from_linter=from_linter, **kwargs
    )
    # TODO: likely need asfrom=True here?
    on_text = join.onclause._compiler_dispatch(
        self, from_linter=from_linter, **kwargs
    )
    return left_text + join_type + right_text + " ON " + on_text
5423
5424 def _setup_crud_hints(self, stmt, table_text):
5425 dialect_hints = {
5426 table: hint_text
5427 for (table, dialect), hint_text in stmt._hints.items()
5428 if dialect in ("*", self.dialect.name)
5429 }
5430 if stmt.table in dialect_hints:
5431 table_text = self.format_from_hint_text(
5432 table_text, stmt.table, dialect_hints[stmt.table], True
5433 )
5434 return dialect_hints, table_text
5435
# within the realm of "insertmanyvalues sentinel columns",
# these lookups match different kinds of Column() configurations
# to specific backend capabilities. they are broken into two
# lookups, one for autoincrement columns and the other for non
# autoincrement columns.
#
# keys are _SentinelDefaultCharacterization members; values are
# InsertmanyvaluesSentinelOpts members, which
# _get_sentinel_column_for_table() tests with a bitwise AND against the
# dialect's insertmanyvalues_implicit_sentinel setting.
_sentinel_col_non_autoinc_lookup = util.immutabledict(
    {
        _SentinelDefaultCharacterization.CLIENTSIDE: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.SENTINEL_DEFAULT: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.NONE: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.IDENTITY: (
            InsertmanyvaluesSentinelOpts.IDENTITY
        ),
        _SentinelDefaultCharacterization.SEQUENCE: (
            InsertmanyvaluesSentinelOpts.SEQUENCE
        ),
    }
)
# the autoincrement lookup is built from the non-autoincrement one via
# union(), supplying AUTOINCREMENT for the NONE characterization
_sentinel_col_autoinc_lookup = _sentinel_col_non_autoinc_lookup.union(
    {
        _SentinelDefaultCharacterization.NONE: (
            InsertmanyvaluesSentinelOpts.AUTOINCREMENT
        ),
    }
)
5467
5468 def _get_sentinel_column_for_table(
5469 self, table: Table
5470 ) -> Optional[Sequence[Column[Any]]]:
5471 """given a :class:`.Table`, return a usable sentinel column or
5472 columns for this dialect if any.
5473
5474 Return None if no sentinel columns could be identified, or raise an
5475 error if a column was marked as a sentinel explicitly but isn't
5476 compatible with this dialect.
5477
5478 """
5479
5480 sentinel_opts = self.dialect.insertmanyvalues_implicit_sentinel
5481 sentinel_characteristics = table._sentinel_column_characteristics
5482
5483 sent_cols = sentinel_characteristics.columns
5484
5485 if sent_cols is None:
5486 return None
5487
5488 if sentinel_characteristics.is_autoinc:
5489 bitmask = self._sentinel_col_autoinc_lookup.get(
5490 sentinel_characteristics.default_characterization, 0
5491 )
5492 else:
5493 bitmask = self._sentinel_col_non_autoinc_lookup.get(
5494 sentinel_characteristics.default_characterization, 0
5495 )
5496
5497 if sentinel_opts & bitmask:
5498 return sent_cols
5499
5500 if sentinel_characteristics.is_explicit:
5501 # a column was explicitly marked as insert_sentinel=True,
5502 # however it is not compatible with this dialect. they should
5503 # not indicate this column as a sentinel if they need to include
5504 # this dialect.
5505
5506 # TODO: do we want non-primary key explicit sentinel cols
5507 # that can gracefully degrade for some backends?
5508 # insert_sentinel="degrade" perhaps. not for the initial release.
5509 # I am hoping people are generally not dealing with this sentinel
5510 # business at all.
5511
5512 # if is_explicit is True, there will be only one sentinel column.
5513
5514 raise exc.InvalidRequestError(
5515 f"Column {sent_cols[0]} can't be explicitly "
5516 "marked as a sentinel column when using the "
5517 f"{self.dialect.name} dialect, as the "
5518 "particular type of default generation on this column is "
5519 "not currently compatible with this dialect's specific "
5520 f"INSERT..RETURNING syntax which can receive the "
5521 "server-generated value in "
5522 "a deterministic way. To remove this error, remove "
5523 "insert_sentinel=True from primary key autoincrement "
5524 "columns; these columns are automatically used as "
5525 "sentinels for supported dialects in any case."
5526 )
5527
5528 return None
5529
def _deliver_insertmanyvalues_batches(
    self,
    statement: str,
    parameters: _DBAPIMultiExecuteParams,
    compiled_parameters: List[_MutableCoreSingleExecuteParams],
    generic_setinputsizes: Optional[_GenericSetInputSizesType],
    batch_size: int,
    sort_by_parameter_order: bool,
    schema_translate_map: Optional[SchemaTranslateMapType],
) -> Iterator[_InsertManyValuesBatch]:
    """Generate the :class:`._InsertManyValuesBatch` objects used to
    execute an "insertmanyvalues" INSERT.

    Two modes are used:

    * "row at a time": the statement is left unmodified and one batch is
      yielded per parameter set.  This is chosen when the statement is a
      default-only INSERT and the dialect doesn't support the DEFAULT
      metavalue, when the dialect doesn't support multivalues INSERT,
      when deterministic ordering was requested but can't be provided
      for this statement, or when an upsert with bound parameters is
      used together with result columns and no embedded values counter.

    * batched: the rendered single VALUES group within ``statement`` is
      replaced by a token, and for each batch of up to ``batch_size``
      parameter sets that token is replaced by an expanded series of
      VALUES groups, with the parameters rewritten to match.

    :param statement: the compiled SQL string, containing the rendered
     single VALUES group ``(<single_values_expr>)``.
    :param parameters: the DBAPI-level parameter sets, one per row.
    :param compiled_parameters: parameter sets parallel to
     ``parameters``; used only to extract sentinel values via
     ``imv.sentinel_param_keys``.
    :param generic_setinputsizes: optional ``(key, len, type)`` entries;
     in batched mode they are repeated once per row of the batch with an
     index suffix added to each key.
    :param batch_size: maximum number of parameter sets per batch; may
     be reduced by the dialect's ``insertmanyvalues_max_parameters``.
    :param sort_by_parameter_order: whether deterministic ordering of
     results was requested; passed along in each batch.
    :param schema_translate_map: optional schema translate map, applied
     to the VALUES expressions before they are located in ``statement``.
    :return: an iterator of :class:`._InsertManyValuesBatch`.

    """
    imv = self._insertmanyvalues
    assert imv is not None

    # callable that pulls the sentinel value(s) out of a compiled
    # parameter set, or None if there are no sentinel parameter keys
    if not imv.sentinel_param_keys:
        _sentinel_from_params = None
    else:
        _sentinel_from_params = operator.itemgetter(
            *imv.sentinel_param_keys
        )

    lenparams = len(parameters)
    if imv.is_default_expr and not self.dialect.supports_default_metavalue:
        # backend doesn't support
        # INSERT INTO table (pk_col) VALUES (DEFAULT), (DEFAULT), ...
        # at the moment this is basically SQL Server due to
        # not being able to use DEFAULT for identity column
        # just yield out that many single statements! still
        # faster than a whole connection.execute() call ;)
        #
        # note we still are taking advantage of the fact that we know
        # we are using RETURNING. The generalized approach of fetching
        # cursor.lastrowid etc. still goes through the more heavyweight
        # "ExecutionContext per statement" system as it isn't usable
        # as a generic "RETURNING" approach
        use_row_at_a_time = True
        downgraded = False
    elif not self.dialect.supports_multivalues_insert or (
        sort_by_parameter_order
        and self._result_columns
        and (
            imv.sentinel_columns is None
            or (
                imv.includes_upsert_behaviors
                and not imv.embed_values_counter
            )
        )
    ):
        # deterministic order was requested and the compiler could
        # not organize sentinel columns for this dialect/statement.
        # use row at a time. Note: if embed_values_counter is True,
        # the counter itself provides the ordering capability we need,
        # so we can use batch mode even with upsert behaviors.
        use_row_at_a_time = True
        downgraded = True
    elif (
        imv.has_upsert_bound_parameters
        and not imv.embed_values_counter
        and self._result_columns
    ):
        # For upsert behaviors (ON CONFLICT DO UPDATE, etc.) with RETURNING
        # and parametrized bindparams in the SET clause, we must use
        # row-at-a-time. Batching multiple rows in a single statement
        # doesn't work when the SET clause contains bound parameters that
        # will receive different values per row, as there's only one SET
        # clause per statement. See issue #13130.
        use_row_at_a_time = True
        downgraded = True
    else:
        use_row_at_a_time = False
        downgraded = False

    if use_row_at_a_time:
        # one batch per parameter set, using the statement as given;
        # batch numbers start at 1 and the total is the number of
        # parameter sets
        for batchnum, (param, compiled_param) in enumerate(
            cast(
                "Sequence[Tuple[_DBAPISingleExecuteParams, _MutableCoreSingleExecuteParams]]",  # noqa: E501
                zip(parameters, compiled_parameters),
            ),
            1,
        ):
            yield _InsertManyValuesBatch(
                statement,
                param,
                generic_setinputsizes,
                [param],
                (
                    [_sentinel_from_params(compiled_param)]
                    if _sentinel_from_params
                    else []
                ),
                1,
                batchnum,
                lenparams,
                sort_by_parameter_order,
                downgraded,
            )
        return

    # "rst" applies the schema translate map to a SQL fragment, so that
    # fragments taken from imv match what's present in the statement
    if schema_translate_map:
        rst = functools.partial(
            self.preparer._render_schema_translates,
            schema_translate_map=schema_translate_map,
        )
    else:
        rst = None

    imv_single_values_expr = imv.single_values_expr
    if rst:
        imv_single_values_expr = rst(imv_single_values_expr)

    # swap the rendered single VALUES group for a token; each batch
    # replaces the token with its own expanded VALUES text
    executemany_values = f"({imv_single_values_expr})"
    statement = statement.replace(executemany_values, "__EXECMANY_TOKEN__")

    # Use optional insertmanyvalues_max_parameters
    # to further shrink the batch size so that there are no more than
    # insertmanyvalues_max_parameters params.
    # Currently used by SQL Server, which limits statements to 2100 bound
    # parameters (actually 2099).
    max_params = self.dialect.insertmanyvalues_max_parameters
    if max_params:
        total_num_of_params = len(self.bind_names)
        num_params_per_batch = len(imv.insert_crud_params)
        num_params_outside_of_batch = (
            total_num_of_params - num_params_per_batch
        )
        batch_size = min(
            batch_size,
            (
                (max_params - num_params_outside_of_batch)
                // num_params_per_batch
            ),
        )

    # mutable copies; these lists are consumed from the front as
    # batches are produced
    batches = cast("List[Sequence[Any]]", list(parameters))
    compiled_batches = cast(
        "List[Sequence[Any]]", list(compiled_parameters)
    )

    processed_setinputsizes: Optional[_GenericSetInputSizesType] = None
    batchnum = 1
    # ceiling division of lenparams by batch_size
    total_batches = lenparams // batch_size + (
        1 if lenparams % batch_size else 0
    )

    insert_crud_params = imv.insert_crud_params
    assert insert_crud_params is not None

    if rst:
        insert_crud_params = [
            (col, key, rst(expr), st)
            for col, key, expr, st in insert_crud_params
        ]

    escaped_bind_names: Mapping[str, str]
    expand_pos_lower_index = expand_pos_upper_index = 0

    if not self.positional:
        # named parameters: build a VALUES group template in which each
        # bound parameter name carries an "__EXECMANY_INDEX__" suffix,
        # to be replaced per row with the row's index in the batch
        if self.escaped_bind_names:
            escaped_bind_names = self.escaped_bind_names
        else:
            escaped_bind_names = {}

        all_keys = set(parameters[0])

        def apply_placeholders(keys, formatted):
            for key in keys:
                key = escaped_bind_names.get(key, key)
                formatted = formatted.replace(
                    self.bindtemplate % {"name": key},
                    self.bindtemplate
                    % {"name": f"{key}__EXECMANY_INDEX__"},
                )
            return formatted

        if imv.embed_values_counter:
            imv_values_counter = ", _IMV_VALUES_COUNTER"
        else:
            imv_values_counter = ""
        formatted_values_clause = f"""({', '.join(
            apply_placeholders(bind_keys, formatted)
            for _, _, formatted, bind_keys in insert_crud_params
        )}{imv_values_counter})"""

        # keys_to_replace are the per-row parameters that get an index
        # suffix; base_parameters are all remaining parameters, taken
        # from the first parameter set only
        keys_to_replace = all_keys.intersection(
            escaped_bind_names.get(key, key)
            for _, _, _, bind_keys in insert_crud_params
            for key in bind_keys
        )
        base_parameters = {
            key: parameters[0][key]
            for key in all_keys.difference(keys_to_replace)
        }

        executemany_values_w_comma = ""
    else:
        # positional parameters: the VALUES group is repeated as a
        # string, and the positional parameters that belong to it are
        # located by index within each parameter set
        formatted_values_clause = ""
        keys_to_replace = set()
        base_parameters = {}

        if imv.embed_values_counter:
            executemany_values_w_comma = (
                f"({imv_single_values_expr}, _IMV_VALUES_COUNTER), "
            )
        else:
            executemany_values_w_comma = f"({imv_single_values_expr}), "

        all_names_we_will_expand: Set[str] = set()
        for elem in imv.insert_crud_params:
            all_names_we_will_expand.update(elem[3])

        # get the start and end position in a particular list
        # of parameters where we will be doing the "expanding".
        # statements can have params on either side or both sides,
        # given RETURNING and CTEs
        if all_names_we_will_expand:
            positiontup = self.positiontup
            assert positiontup is not None

            all_expand_positions = {
                idx
                for idx, name in enumerate(positiontup)
                if name in all_names_we_will_expand
            }
            expand_pos_lower_index = min(all_expand_positions)
            expand_pos_upper_index = max(all_expand_positions) + 1
            # the expanded positions must form one contiguous range
            assert (
                len(all_expand_positions)
                == expand_pos_upper_index - expand_pos_lower_index
            )

        if self._numeric_binds:
            # replace the numbered placeholders with "%s" so that each
            # batch can fill in its own numbering via string formatting
            escaped = re.escape(self._numeric_binds_identifier_char)
            executemany_values_w_comma = re.sub(
                rf"{escaped}\d+", "%s", executemany_values_w_comma
            )

    while batches:
        # take up to batch_size parameter sets off the front of both
        # lists
        batch = batches[0:batch_size]
        compiled_batch = compiled_batches[0:batch_size]

        batches[0:batch_size] = []
        compiled_batches[0:batch_size] = []

        # only the final batch can be shorter than batch_size
        if batches:
            current_batch_size = batch_size
        else:
            current_batch_size = len(batch)

        if generic_setinputsizes:
            # if setinputsizes is present, expand this collection to
            # suit the batch length as well
            # currently this will be mssql+pyodbc for internal dialects
            processed_setinputsizes = [
                (new_key, len_, typ)
                for new_key, len_, typ in (
                    (f"{key}_{index}", len_, typ)
                    for index in range(current_batch_size)
                    for key, len_, typ in generic_setinputsizes
                )
            ]

        replaced_parameters: Any
        if self.positional:
            num_ins_params = imv.num_positional_params_counted

            batch_iterator: Iterable[Sequence[Any]]
            extra_params_left: Sequence[Any]
            extra_params_right: Sequence[Any]

            if num_ins_params == len(batch[0]):
                # every positional parameter belongs to the VALUES group
                extra_params_left = extra_params_right = ()
                batch_iterator = batch
            else:
                # parameters outside of the VALUES group are taken from
                # the first parameter set of the batch only
                extra_params_left = batch[0][:expand_pos_lower_index]
                extra_params_right = batch[0][expand_pos_upper_index:]
                batch_iterator = (
                    b[expand_pos_lower_index:expand_pos_upper_index]
                    for b in batch
                )

            # repeat the VALUES group once per row; [:-2] removes the
            # trailing ", " of the last repetition
            if imv.embed_values_counter:
                expanded_values_string = (
                    "".join(
                        executemany_values_w_comma.replace(
                            "_IMV_VALUES_COUNTER", str(i)
                        )
                        for i, _ in enumerate(batch)
                    )
                )[:-2]
            else:
                expanded_values_string = (
                    (executemany_values_w_comma * current_batch_size)
                )[:-2]

            if self._numeric_binds and num_ins_params > 0:
                # numeric will always number the parameters inside of
                # VALUES (and thus order self.positiontup) to be higher
                # than non-VALUES parameters, no matter where in the
                # statement those non-VALUES parameters appear (this is
                # ensured in _process_numeric by numbering first all
                # params that are not in _values_bindparam)
                # therefore all extra params are always
                # on the left side and numbered lower than the VALUES
                # parameters
                assert not extra_params_right

                start = expand_pos_lower_index + 1
                end = num_ins_params * (current_batch_size) + start

                # need to format here, since statement may contain
                # unescaped %, while values_string contains just (%s, %s)
                positions = tuple(
                    f"{self._numeric_binds_identifier_char}{i}"
                    for i in range(start, end)
                )
                expanded_values_string = expanded_values_string % positions

            replaced_statement = statement.replace(
                "__EXECMANY_TOKEN__", expanded_values_string
            )

            # flatten the per-row VALUES parameters into one tuple
            replaced_parameters = tuple(
                itertools.chain.from_iterable(batch_iterator)
            )

            replaced_parameters = (
                extra_params_left
                + replaced_parameters
                + extra_params_right
            )

        else:
            replaced_values_clauses = []
            replaced_parameters = base_parameters.copy()

            for i, param in enumerate(batch):
                # "key__EXECMANY_INDEX__" becomes "key__<i>", matching
                # the parameter keys generated below
                fmv = formatted_values_clause.replace(
                    "EXECMANY_INDEX__", str(i)
                )
                if imv.embed_values_counter:
                    fmv = fmv.replace("_IMV_VALUES_COUNTER", str(i))

                replaced_values_clauses.append(fmv)
                replaced_parameters.update(
                    {f"{key}__{i}": param[key] for key in keys_to_replace}
                )

            replaced_statement = statement.replace(
                "__EXECMANY_TOKEN__",
                ", ".join(replaced_values_clauses),
            )

        yield _InsertManyValuesBatch(
            replaced_statement,
            replaced_parameters,
            processed_setinputsizes,
            batch,
            (
                [_sentinel_from_params(cb) for cb in compiled_batch]
                if _sentinel_from_params
                else []
            ),
            current_batch_size,
            batchnum,
            total_batches,
            sort_by_parameter_order,
            False,
        )
        batchnum += 1
5898
def visit_insert(
    self, insert_stmt, visited_bindparam=None, visiting_cte=None, **kw
):
    """Render an INSERT statement as a SQL string.

    Besides producing the text, this method pushes and pops an entry on
    ``self.stack``, and for a toplevel statement sets ``self.isinsert``
    and, if not already set, ``self.dml_compile_state`` and
    ``self.compile_state``.  When the crud parameters indicate that
    "insertmanyvalues" is in use, ``self._insertmanyvalues`` is assigned
    an :class:`._InsertManyValues` describing the rendered VALUES.

    :param insert_stmt: the INSERT construct; it is replaced by the
     statement provided by its compile state.
    :param visited_bindparam: accepted, but any incoming value is
     discarded; a fresh list is used when positional parameters need to
     be counted.
    :param visiting_cte: when not None, the INSERT is being rendered
     inside of a CTE; it is not treated as toplevel and positional
     parameters are not counted.
    :return: the SQL text of the INSERT.
    :raises CompileError: for an empty INSERT when the dialect supports
     none of the forms for it, for a multi-row ``values()`` INSERT when
     the dialect doesn't support multivalues INSERT, and for a multi-row
     ``values()`` INSERT that combines RETURNING with
     sort-by-parameter-order.

    """
    compile_state = insert_stmt._compile_state_factory(
        insert_stmt, self, **kw
    )
    insert_stmt = compile_state.statement

    if visiting_cte is not None:
        kw["visiting_cte"] = visiting_cte
        toplevel = False
    else:
        # toplevel only if nothing else is currently being compiled
        toplevel = not self.stack

    if toplevel:
        self.isinsert = True
        if not self.dml_compile_state:
            self.dml_compile_state = compile_state
        if not self.compile_state:
            self.compile_state = compile_state

    self.stack.append(
        {
            "correlate_froms": set(),
            "asfrom_froms": set(),
            "selectable": insert_stmt,
        }
    )

    counted_bindparam = 0

    # reset any incoming "visited_bindparam" collection
    visited_bindparam = None

    # for positional, insertmanyvalues needs to know how many
    # bound parameters are in the VALUES sequence; there's no simple
    # rule because default expressions etc. can have zero or more
    # params inside them. After multiple attempts to figure this out,
    # this very simplistic "count after" works and is
    # likely the least amount of callcounts, though looks clumsy
    if self.positional and visiting_cte is None:
        # if we are inside a CTE, don't count parameters
        # here since they won't be for insertmanyvalues. keep
        # visited_bindparam at None so no counting happens.
        # see #9173
        visited_bindparam = []

    crud_params_struct = crud._get_crud_params(
        self,
        insert_stmt,
        compile_state,
        toplevel,
        visited_bindparam=visited_bindparam,
        **kw,
    )

    if self.positional and visited_bindparam is not None:
        counted_bindparam = len(visited_bindparam)
        if self._numeric_binds:
            # accumulate the parameters that were visited while
            # building the VALUES
            if self._values_bindparam is not None:
                self._values_bindparam += visited_bindparam
            else:
                self._values_bindparam = visited_bindparam

    crud_params_single = crud_params_struct.single_params

    if (
        not crud_params_single
        and not self.dialect.supports_default_values
        and not self.dialect.supports_default_metavalue
        and not self.dialect.supports_empty_insert
    ):
        raise exc.CompileError(
            "The '%s' dialect with current database "
            "version settings does not support empty "
            "inserts." % self.dialect.name
        )

    if compile_state._has_multi_parameters:
        if not self.dialect.supports_multivalues_insert:
            raise exc.CompileError(
                "The '%s' dialect with current database "
                "version settings does not support "
                "in-place multirow inserts." % self.dialect.name
            )
        elif (
            self.implicit_returning or insert_stmt._returning
        ) and insert_stmt._sort_by_parameter_order:
            raise exc.CompileError(
                "RETURNING cannot be deterministically sorted when "
                "using an INSERT which includes multi-row values()."
            )
        crud_params_single = crud_params_struct.single_params
    else:
        crud_params_single = crud_params_struct.single_params

    preparer = self.preparer
    supports_default_values = self.dialect.supports_default_values

    text = "INSERT "

    if insert_stmt._prefixes:
        text += self._generate_prefixes(
            insert_stmt, insert_stmt._prefixes, **kw
        )

    text += "INTO "
    table_text = preparer.format_table(insert_stmt.table)

    if insert_stmt._hints:
        _, table_text = self._setup_crud_hints(insert_stmt, table_text)

    if insert_stmt._independent_ctes:
        self._dispatch_independent_ctes(insert_stmt, kw)

    text += table_text

    # render the column list; this is also rendered, as an empty "()",
    # when there are no columns and DEFAULT VALUES isn't supported
    if crud_params_single or not supports_default_values:
        text += " (%s)" % ", ".join(
            [expr for _, expr, _, _ in crud_params_single]
        )

    # look for insertmanyvalues attributes that would have been configured
    # by crud.py as it scanned through the columns to be part of the
    # INSERT
    use_insertmanyvalues = crud_params_struct.use_insertmanyvalues
    named_sentinel_params: Optional[Sequence[str]] = None
    add_sentinel_cols = None
    implicit_sentinel = False

    returning_cols = self.implicit_returning or insert_stmt._returning
    if returning_cols:
        add_sentinel_cols = crud_params_struct.use_sentinel_columns
        if add_sentinel_cols is not None:
            assert use_insertmanyvalues

            # search for the sentinel column explicitly present
            # in the INSERT columns list, and additionally check that
            # this column has a bound parameter name set up that's in the
            # parameter list. If both of these cases are present, it means
            # we will have a client side value for the sentinel in each
            # parameter set.

            _params_by_col = {
                col: param_names
                for col, _, _, param_names in crud_params_single
            }
            named_sentinel_params = []
            for _add_sentinel_col in add_sentinel_cols:
                if _add_sentinel_col not in _params_by_col:
                    named_sentinel_params = None
                    break
                param_name = self._within_exec_param_key_getter(
                    _add_sentinel_col
                )
                if param_name not in _params_by_col[_add_sentinel_col]:
                    named_sentinel_params = None
                    break
                named_sentinel_params.append(param_name)

            if named_sentinel_params is None:
                # if we are not going to have a client side value for
                # the sentinel in the parameter set, that means it's
                # an autoincrement, an IDENTITY, or a server-side SQL
                # expression like nextval('seqname'). So this is
                # an "implicit" sentinel; we will look for it in
                # RETURNING
                # only, and then sort on it. For this case on PG,
                # SQL Server we have to use a special INSERT form
                # that guarantees the server side function lines up with
                # the entries in the VALUES.
                if (
                    self.dialect.insertmanyvalues_implicit_sentinel
                    & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
                ):
                    implicit_sentinel = True
                else:
                    # here, we are not using a sentinel at all
                    # and we are likely the SQLite dialect.
                    # The first add_sentinel_col that we have should not
                    # be marked as "insert_sentinel=True". if it was,
                    # an error should have been raised in
                    # _get_sentinel_column_for_table.
                    assert not add_sentinel_cols[0]._insert_sentinel, (
                        "sentinel selection rules should have prevented "
                        "us from getting here for this dialect"
                    )

            # always put the sentinel columns last. even if they are
            # in the returning list already, they will be there twice
            # then.
            returning_cols = list(returning_cols) + list(add_sentinel_cols)

        returning_clause = self.returning_clause(
            insert_stmt,
            returning_cols,
            populate_result_map=toplevel,
        )

        # some dialects render RETURNING ahead of VALUES; otherwise it
        # is appended after the VALUES / post-values clause below
        if self.returning_precedes_values:
            text += " " + returning_clause

    else:
        returning_clause = None

    if insert_stmt.select is not None:
        # placed here by crud.py
        select_text = self.process(
            self.stack[-1]["insert_from_select"], insert_into=True, **kw
        )

        if self.ctes and self.dialect.cte_follows_insert:
            nesting_level = len(self.stack) if not toplevel else None
            text += " %s%s" % (
                self._render_cte_clause(
                    nesting_level=nesting_level,
                    include_following_stack=True,
                ),
                select_text,
            )
        else:
            text += " %s" % select_text
    elif not crud_params_single and supports_default_values:
        text += " DEFAULT VALUES"
        if use_insertmanyvalues:
            self._insertmanyvalues = _InsertManyValues(
                True,
                self.dialect.default_metavalue_token,
                crud_params_single,
                counted_bindparam,
                sort_by_parameter_order=(
                    insert_stmt._sort_by_parameter_order
                ),
                includes_upsert_behaviors=(
                    insert_stmt._post_values_clause is not None
                ),
                sentinel_columns=add_sentinel_cols,
                num_sentinel_columns=(
                    len(add_sentinel_cols) if add_sentinel_cols else 0
                ),
                implicit_sentinel=implicit_sentinel,
            )
    elif compile_state._has_multi_parameters:
        # multi-row values(): render every VALUES group in place
        text += " VALUES %s" % (
            ", ".join(
                "(%s)"
                % (", ".join(value for _, _, value, _ in crud_param_set))
                for crud_param_set in crud_params_struct.all_multi_params
            ),
        )
    elif use_insertmanyvalues:
        if (
            implicit_sentinel
            and (
                self.dialect.insertmanyvalues_implicit_sentinel
                & InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
            )
            # this is checking if we have
            # INSERT INTO table (id) VALUES (DEFAULT).
            and not (crud_params_struct.is_default_metavalue_only)
        ):
            # if we have a sentinel column that is server generated,
            # then for selected backends render the VALUES list as a
            # subquery. This is the orderable form supported by
            # PostgreSQL and in fewer cases SQL Server
            embed_sentinel_value = True

            render_bind_casts = (
                self.dialect.insertmanyvalues_implicit_sentinel
                & InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
            )

            add_sentinel_set = add_sentinel_cols or ()

            # the VALUES group and the subquery column names omit the
            # sentinel columns; in the SELECT list the sentinel columns
            # are rendered using their own expression
            insert_single_values_expr = ", ".join(
                [
                    value
                    for col, _, value, _ in crud_params_single
                    if col not in add_sentinel_set
                ]
            )

            colnames = ", ".join(
                f"p{i}"
                for i, cp in enumerate(crud_params_single)
                if cp[0] not in add_sentinel_set
            )

            if render_bind_casts:
                # render casts for the SELECT list. For PG, we are
                # already rendering bind casts in the parameter list,
                # selectively for the more "tricky" types like ARRAY.
                # however, even for the "easy" types, if the parameter
                # is NULL for every entry, PG gives up and says
                # "it must be TEXT", which fails for other easy types
                # like ints. So we cast on this side too.
                colnames_w_cast = ", ".join(
                    (
                        self.render_bind_cast(
                            col.type,
                            col.type._unwrapped_dialect_impl(self.dialect),
                            f"p{i}",
                        )
                        if col not in add_sentinel_set
                        else expr
                    )
                    for i, (col, _, expr, _) in enumerate(
                        crud_params_single
                    )
                )
            else:
                colnames_w_cast = ", ".join(
                    (f"p{i}" if col not in add_sentinel_set else expr)
                    for i, (col, _, expr, _) in enumerate(
                        crud_params_single
                    )
                )

            insert_crud_params = [
                elem
                for elem in crud_params_single
                if elem[0] not in add_sentinel_set
            ]

            text += (
                f" SELECT {colnames_w_cast} FROM "
                f"(VALUES ({insert_single_values_expr})) "
                f"AS imp_sen({colnames}, sen_counter) "
                "ORDER BY sen_counter"
            )

        else:
            # otherwise, if no sentinel or backend doesn't support
            # orderable subquery form, use a plain VALUES list
            embed_sentinel_value = False
            insert_crud_params = crud_params_single
            insert_single_values_expr = ", ".join(
                [value for _, _, value, _ in crud_params_single]
            )

            text += f" VALUES ({insert_single_values_expr})"

        self._insertmanyvalues = _InsertManyValues(
            is_default_expr=False,
            single_values_expr=insert_single_values_expr,
            insert_crud_params=insert_crud_params,
            num_positional_params_counted=counted_bindparam,
            sort_by_parameter_order=(insert_stmt._sort_by_parameter_order),
            includes_upsert_behaviors=(
                insert_stmt._post_values_clause is not None
            ),
            sentinel_columns=add_sentinel_cols,
            num_sentinel_columns=(
                len(add_sentinel_cols) if add_sentinel_cols else 0
            ),
            sentinel_param_keys=named_sentinel_params,
            implicit_sentinel=implicit_sentinel,
            embed_values_counter=embed_sentinel_value,
        )

    else:
        # plain single-row INSERT .. VALUES
        insert_single_values_expr = ", ".join(
            [value for _, _, value, _ in crud_params_single]
        )

        text += f" VALUES ({insert_single_values_expr})"

    if insert_stmt._post_values_clause is not None:
        post_values_clause = self.process(
            insert_stmt._post_values_clause, **kw
        )
        if post_values_clause:
            text += " " + post_values_clause

    if returning_clause and not self.returning_precedes_values:
        text += " " + returning_clause

    # unless the dialect renders CTEs after the INSERT (handled in the
    # INSERT..SELECT branch above), the CTE clause is prepended to the
    # whole statement
    if self.ctes and not self.dialect.cte_follows_insert:
        nesting_level = len(self.stack) if not toplevel else None
        text = (
            self._render_cte_clause(
                nesting_level=nesting_level,
                include_following_stack=True,
            )
            + text
        )

    self.stack.pop(-1)

    return text
6289
def update_limit_clause(self, update_stmt):
    """Provide a hook for MySQL to add LIMIT to the UPDATE.

    The base implementation renders nothing: it returns ``None`` for
    every statement.  ``visit_update()`` appends the returned text only
    when it is truthy.

    """
    return None
6293
def delete_limit_clause(self, delete_stmt):
    """Provide a hook for MySQL to add LIMIT to the DELETE.

    The base implementation renders nothing: it returns ``None`` for
    every statement.  ``visit_delete()`` appends the returned text only
    when it is truthy.

    """
    return None
6297
def update_tables_clause(self, update_stmt, from_table, extra_froms, **kw):
    """Render the initial table clause of an UPDATE statement.

    This is an override hook; MySQL overrides it.  The default
    implementation dispatches ``from_table`` to this compiler with
    ``iscrud=True`` and with ``asfrom`` forced to ``True`` regardless of
    what the caller passed.  ``update_stmt`` and ``extra_froms`` are not
    consulted here.

    """
    dispatch_kw = dict(kw, asfrom=True)
    return from_table._compiler_dispatch(self, iscrud=True, **dispatch_kw)
6307
def update_from_clause(
    self, update_stmt, from_table, extra_froms, from_hints, **kw
):
    """Provide a hook to override the generation of an
    UPDATE..FROM clause.

    MySQL and MSSQL override this.  The base implementation does not
    render anything; it always raises.

    :raises NotImplementedError: unconditionally.

    """
    message = (
        "This backend does not support multiple-table "
        "criteria within UPDATE"
    )
    raise NotImplementedError(message)
6321
def visit_update(
    self,
    update_stmt: Update,
    visiting_cte: Optional[CTE] = None,
    **kw: Any,
) -> str:
    """Render an UPDATE statement as a string.

    The text is assembled in this order: ``UPDATE``, the statement
    prefixes, the table clause, ``SET``, a RETURNING clause when
    ``returning_precedes_values`` is set, the extra FROM clause,
    ``WHERE``, the ``update_limit_clause()`` hook, a trailing RETURNING
    clause when ``returning_precedes_values`` is not set, and finally any
    CTE clause, which is placed in front of everything else.

    :param update_stmt: the statement to render.  It is replaced by
     ``compile_state.statement`` before any rendering takes place.
    :param visiting_cte: when not ``None`` it is forwarded to nested
     calls as ``kw["visiting_cte"]`` and the statement is not treated as
     the top level statement.
    :param kw: keyword arguments forwarded to the nested compile calls.
    :return: the rendered statement text.

    """
    compile_state = update_stmt._compile_state_factory(
        update_stmt, self, **kw
    )
    if TYPE_CHECKING:
        assert isinstance(compile_state, UpdateDMLState)
    # from here on, render the statement held by the compile state
    update_stmt = compile_state.statement  # type: ignore[assignment]

    if visiting_cte is not None:
        kw["visiting_cte"] = visiting_cte
        toplevel = False
    else:
        # top level means nothing is on the compiler stack yet
        toplevel = not self.stack

    if toplevel:
        self.isupdate = True
        # only fill in the compile state slots that are still empty
        if not self.dml_compile_state:
            self.dml_compile_state = compile_state
        if not self.compile_state:
            self.compile_state = compile_state

    if self.linting & COLLECT_CARTESIAN_PRODUCTS:
        from_linter = FromLinter({}, set())
        warn_linting = self.linting & WARN_LINTING
        if toplevel:
            self.from_linter = from_linter
    else:
        from_linter = None
        warn_linting = False

    extra_froms = compile_state._extra_froms
    is_multitable = bool(extra_froms)

    if is_multitable:
        # main table might be a JOIN
        main_froms = set(_from_objects(update_stmt.table))
        # extra FROMs already present in the main table are not
        # rendered a second time
        render_extra_froms = [
            f for f in extra_froms if f not in main_froms
        ]
        correlate_froms = main_froms.union(extra_froms)
    else:
        render_extra_froms = []
        correlate_froms = {update_stmt.table}

    # stack entry for this statement; removed again just before returning
    self.stack.append(
        {
            "correlate_froms": correlate_froms,
            "asfrom_froms": correlate_froms,
            "selectable": update_stmt,
        }
    )

    text = "UPDATE "

    if update_stmt._prefixes:
        text += self._generate_prefixes(
            update_stmt, update_stmt._prefixes, **kw
        )

    # the table clause is computed here but appended to ``text`` only
    # after the hints below have had the chance to replace it
    table_text = self.update_tables_clause(
        update_stmt,
        update_stmt.table,
        render_extra_froms,
        from_linter=from_linter,
        **kw,
    )
    crud_params_struct = crud._get_crud_params(
        self, update_stmt, compile_state, toplevel, **kw
    )
    crud_params = crud_params_struct.single_params

    if update_stmt._hints:
        dialect_hints, table_text = self._setup_crud_hints(
            update_stmt, table_text
        )
    else:
        dialect_hints = None

    if update_stmt._independent_ctes:
        self._dispatch_independent_ctes(update_stmt, kw)

    text += table_text

    # each entry contributes one "<expr>=<value>" assignment
    text += " SET "
    text += ", ".join(
        expr + "=" + value
        for _, expr, value, _ in cast(
            "List[Tuple[Any, str, str, Any]]", crud_params
        )
    )

    # RETURNING rendered directly after SET when
    # ``returning_precedes_values`` is set
    if self.implicit_returning or update_stmt._returning:
        if self.returning_precedes_values:
            text += " " + self.returning_clause(
                update_stmt,
                self.implicit_returning or update_stmt._returning,
                populate_result_map=toplevel,
            )

    if extra_froms:
        extra_from_text = self.update_from_clause(
            update_stmt,
            update_stmt.table,
            render_extra_froms,
            dialect_hints,
            from_linter=from_linter,
            **kw,
        )
        if extra_from_text:
            text += " " + extra_from_text

    if update_stmt._where_criteria:
        t = self._generate_delimited_and_list(
            update_stmt._where_criteria, from_linter=from_linter, **kw
        )
        if t:
            text += " WHERE " + t

    limit_clause = self.update_limit_clause(update_stmt)
    if limit_clause:
        text += " " + limit_clause

    # otherwise RETURNING goes at the end of the statement
    if (
        self.implicit_returning or update_stmt._returning
    ) and not self.returning_precedes_values:
        text += " " + self.returning_clause(
            update_stmt,
            self.implicit_returning or update_stmt._returning,
            populate_result_map=toplevel,
        )

    if self.ctes:
        # the CTE clause is placed in front of the text built so far
        nesting_level = len(self.stack) if not toplevel else None
        text = self._render_cte_clause(nesting_level=nesting_level) + text

    if warn_linting:
        assert from_linter is not None
        from_linter.warn(stmt_type="UPDATE")

    self.stack.pop(-1)

    return text  # type: ignore[no-any-return]
6469
def delete_extra_from_clause(
    self, delete_stmt, from_table, extra_froms, from_hints, **kw
):
    """Provide a hook to override the generation of a
    DELETE..FROM clause.

    This can be used to implement DELETE..USING for example.

    MySQL and MSSQL override this.  The base implementation does not
    render anything; it always raises.

    :raises NotImplementedError: unconditionally.

    """
    message = (
        "This backend does not support multiple-table "
        "criteria within DELETE"
    )
    raise NotImplementedError(message)
6485
def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw):
    """Render the table that follows ``DELETE FROM``.

    The default implementation dispatches ``from_table`` to this compiler
    with ``asfrom=True`` and ``iscrud=True`` plus any extra keyword
    arguments.  ``delete_stmt`` and ``extra_froms`` are not consulted
    here.

    """
    dispatch = from_table._compiler_dispatch
    return dispatch(self, asfrom=True, iscrud=True, **kw)
6490
def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
    """Render a DELETE statement as a string.

    The text is assembled in this order: ``DELETE``, the statement
    prefixes, ``FROM`` and the table clause, a RETURNING clause when
    ``returning_precedes_values`` is set, the extra FROM clause,
    ``WHERE``, the ``delete_limit_clause()`` hook, a trailing RETURNING
    clause when ``returning_precedes_values`` is not set, and finally any
    CTE clause, which is placed in front of everything else.

    :param delete_stmt: the statement to render.  It is replaced by
     ``compile_state.statement`` before any rendering takes place.
    :param visiting_cte: when not ``None`` it is forwarded to nested
     calls as ``kw["visiting_cte"]`` and the statement is not treated as
     the top level statement.
    :param kw: keyword arguments forwarded to the nested compile calls.
    :return: the rendered statement text.

    """
    compile_state = delete_stmt._compile_state_factory(
        delete_stmt, self, **kw
    )
    # from here on, render the statement held by the compile state
    delete_stmt = compile_state.statement

    if visiting_cte is not None:
        kw["visiting_cte"] = visiting_cte
        toplevel = False
    else:
        # top level means nothing is on the compiler stack yet
        toplevel = not self.stack

    if toplevel:
        self.isdelete = True
        # only fill in the compile state slots that are still empty
        if not self.dml_compile_state:
            self.dml_compile_state = compile_state
        if not self.compile_state:
            self.compile_state = compile_state

    if self.linting & COLLECT_CARTESIAN_PRODUCTS:
        from_linter = FromLinter({}, set())
        warn_linting = self.linting & WARN_LINTING
        if toplevel:
            self.from_linter = from_linter
    else:
        from_linter = None
        warn_linting = False

    extra_froms = compile_state._extra_froms

    correlate_froms = {delete_stmt.table}.union(extra_froms)
    # stack entry for this statement; removed again just before returning
    self.stack.append(
        {
            "correlate_froms": correlate_froms,
            "asfrom_froms": correlate_froms,
            "selectable": delete_stmt,
        }
    )

    text = "DELETE "

    if delete_stmt._prefixes:
        text += self._generate_prefixes(
            delete_stmt, delete_stmt._prefixes, **kw
        )

    text += "FROM "

    try:
        table_text = self.delete_table_clause(
            delete_stmt,
            delete_stmt.table,
            extra_froms,
            from_linter=from_linter,
        )
    except TypeError:
        # anticipate 3rd party dialects that don't include **kw
        # TODO: remove in 2.1
        table_text = self.delete_table_clause(
            delete_stmt, delete_stmt.table, extra_froms
        )
        # the fallback call could not receive the linter, so process the
        # table once more with it; the rendered text is discarded
        if from_linter:
            _ = self.process(delete_stmt.table, from_linter=from_linter)

    # the return value is not used by this method
    crud._get_crud_params(self, delete_stmt, compile_state, toplevel, **kw)

    if delete_stmt._hints:
        dialect_hints, table_text = self._setup_crud_hints(
            delete_stmt, table_text
        )
    else:
        dialect_hints = None

    if delete_stmt._independent_ctes:
        self._dispatch_independent_ctes(delete_stmt, kw)

    text += table_text

    # RETURNING rendered directly after the table clause when
    # ``returning_precedes_values`` is set
    if (
        self.implicit_returning or delete_stmt._returning
    ) and self.returning_precedes_values:
        text += " " + self.returning_clause(
            delete_stmt,
            self.implicit_returning or delete_stmt._returning,
            populate_result_map=toplevel,
        )

    if extra_froms:
        extra_from_text = self.delete_extra_from_clause(
            delete_stmt,
            delete_stmt.table,
            extra_froms,
            dialect_hints,
            from_linter=from_linter,
            **kw,
        )
        if extra_from_text:
            text += " " + extra_from_text

    if delete_stmt._where_criteria:
        t = self._generate_delimited_and_list(
            delete_stmt._where_criteria, from_linter=from_linter, **kw
        )
        if t:
            text += " WHERE " + t

    limit_clause = self.delete_limit_clause(delete_stmt)
    if limit_clause:
        text += " " + limit_clause

    # otherwise RETURNING goes at the end of the statement
    if (
        self.implicit_returning or delete_stmt._returning
    ) and not self.returning_precedes_values:
        text += " " + self.returning_clause(
            delete_stmt,
            self.implicit_returning or delete_stmt._returning,
            populate_result_map=toplevel,
        )

    if self.ctes:
        # the CTE clause is placed in front of the text built so far
        nesting_level = len(self.stack) if not toplevel else None
        text = self._render_cte_clause(nesting_level=nesting_level) + text

    if warn_linting:
        assert from_linter is not None
        from_linter.warn(stmt_type="DELETE")

    self.stack.pop(-1)

    return text
6621
def visit_savepoint(self, savepoint_stmt, **kw):
    """Render ``SAVEPOINT <name>``, the name being formatted by the
    preparer's ``format_savepoint()``."""
    savepoint_name = self.preparer.format_savepoint(savepoint_stmt)
    return f"SAVEPOINT {savepoint_name}"
6624
def visit_rollback_to_savepoint(self, savepoint_stmt, **kw):
    """Render ``ROLLBACK TO SAVEPOINT <name>``, the name being formatted
    by the preparer's ``format_savepoint()``."""
    savepoint_name = self.preparer.format_savepoint(savepoint_stmt)
    return f"ROLLBACK TO SAVEPOINT {savepoint_name}"
6629
def visit_release_savepoint(self, savepoint_stmt, **kw):
    """Render ``RELEASE SAVEPOINT <name>``, the name being formatted by
    the preparer's ``format_savepoint()``."""
    savepoint_name = self.preparer.format_savepoint(savepoint_stmt)
    return f"RELEASE SAVEPOINT {savepoint_name}"
6634
6635
class StrSQLCompiler(SQLCompiler):
    """A :class:`.SQLCompiler` subclass which allows a small selection
    of non-standard SQL features to render into a string value.

    The :class:`.StrSQLCompiler` is the compiler in use when a Core
    expression element is stringified directly, that is, without going
    through the :meth:`_expression.ClauseElement.compile` method.
    It knows how to render a limited set of non-standard SQL constructs
    so that basic stringification works; more substantial custom or
    dialect-specific SQL constructs still require calling
    :meth:`_expression.ClauseElement.compile` directly.

    .. seealso::

        :ref:`faq_sql_expression_string`

    """

    def _fallback_column_name(self, column):
        """Return the fixed placeholder ``<name unknown>``."""
        return "<name unknown>"

    @util.preload_module("sqlalchemy.engine.url")
    def visit_unsupported_compilation(self, element, err, **kw):
        """Try the element's own dialect before giving up on it.

        When ``element.stringify_dialect`` names something other than
        ``"default"``, that dialect's statement compiler is created and,
        unless it is itself a :class:`.StrSQLCompiler`, asked to process
        the element.  In every other case the superclass implementation
        is invoked.

        """
        dialect_name = element.stringify_dialect
        if dialect_name == "default":
            return super().visit_unsupported_compilation(element, err)

        engine_url = util.preloaded.engine_url
        dialect_cls = engine_url.URL.create(dialect_name).get_dialect()
        dialect = dialect_cls()

        alt_compiler = dialect.statement_compiler(
            dialect, None, _supporting_against=self
        )
        if isinstance(alt_compiler, StrSQLCompiler):
            # the dialect has no compiler of its own to offer
            return super().visit_unsupported_compilation(element, err)

        return alt_compiler.process(element, **kw)

    def visit_getitem_binary(self, binary, operator, **kw):
        """Render an index operation as ``<left>[<right>]``."""
        target = self.process(binary.left, **kw)
        index = self.process(binary.right, **kw)
        return f"{target}[{index}]"

    def visit_json_getitem_op_binary(self, binary, operator, **kw):
        """Render exactly as :meth:`visit_getitem_binary` does."""
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
        """Render exactly as :meth:`visit_getitem_binary` does."""
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_sequence(self, sequence, **kw):
        """Render a descriptive placeholder naming the sequence."""
        sequence_name = self.preparer.format_sequence(sequence)
        return f"<next sequence value: {sequence_name}>"

    def returning_clause(
        self,
        stmt: UpdateBase,
        returning_cols: Sequence[_ColumnsClauseElement],
        *,
        populate_result_map: bool,
        **kw: Any,
    ) -> str:
        """Render ``RETURNING`` followed by the comma separated columns.

        ``stmt``, ``populate_result_map`` and ``kw`` are accepted but not
        consulted by this implementation.

        """
        rendered = (
            self._label_select_column(None, col, True, False, {})
            for col in base._select_iterables(returning_cols)
        )
        return "RETURNING " + ", ".join(rendered)

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render ``FROM`` followed by the comma separated extra FROMs.

        Each element is dispatched with ``asfrom=True`` and with
        ``from_hints`` passed as ``fromhints``.

        """
        kw["asfrom"] = True
        rendered = [
            from_obj._compiler_dispatch(self, fromhints=from_hints, **kw)
            for from_obj in extra_froms
        ]
        return "FROM " + ", ".join(rendered)

    def delete_extra_from_clause(
        self, delete_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the extra FROMs as a comma separated list which itself
        begins with ``", "``.

        Each element is dispatched with ``asfrom=True`` and with
        ``from_hints`` passed as ``fromhints``.

        """
        kw["asfrom"] = True
        rendered = [
            from_obj._compiler_dispatch(self, fromhints=from_hints, **kw)
            for from_obj in extra_froms
        ]
        return ", " + ", ".join(rendered)

    def visit_empty_set_expr(self, element_types, **kw):
        """Return a fixed SELECT whose WHERE criteria is never true."""
        return "SELECT 1 WHERE 1!=1"

    def get_from_hint_text(self, table, text):
        """Render the hint text enclosed in square brackets."""
        return "[%s]" % text

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        """Render a regexp match using the ``<regexp>`` placeholder."""
        placeholder = " <regexp> "
        return self._generate_generic_binary(binary, placeholder, **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        """Render a negated regexp match using the ``<not regexp>``
        placeholder."""
        placeholder = " <not regexp> "
        return self._generate_generic_binary(binary, placeholder, **kw)

    def visit_regexp_replace_op_binary(self, binary, operator, **kw):
        """Render a regexp replace as a ``<regexp replace>(..)`` call."""
        target = binary.left._compiler_dispatch(self, **kw)
        replacement = binary.right._compiler_dispatch(self, **kw)
        return f"<regexp replace>({target}, {replacement})"

    def visit_try_cast(self, cast, **kwargs):
        """Render ``TRY_CAST(<clause> AS <type>)``."""
        clause_text = cast.clause._compiler_dispatch(self, **kwargs)
        type_text = cast.typeclause._compiler_dispatch(self, **kwargs)
        return f"TRY_CAST({clause_text} AS {type_text})"
6745
6746
class DDLCompiler(Compiled):
    """Render DDL (data definition language) constructs as strings.

    Each ``visit_*`` method returns the text for one schema construct,
    such as ``CREATE TABLE``, ``DROP INDEX`` or ``ALTER TABLE .. ADD``.
    The ``define_*`` methods, along with ``get_column_specification()``,
    ``create_table_suffix()`` and ``post_create_table()``, are the
    smaller pieces the visitors are assembled from, so that a subclass
    can replace a single piece without rewriting a whole visitor.

    """

    is_ddl = True

    if TYPE_CHECKING:

        def __init__(
            self,
            dialect: Dialect,
            statement: ExecutableDDLElement,
            schema_translate_map: Optional[SchemaTranslateMapType] = ...,
            render_schema_translate: bool = ...,
            compile_kwargs: Mapping[str, Any] = ...,
        ): ...

    @util.ro_memoized_property
    def sql_compiler(self) -> SQLCompiler:
        """A statement compiler for SQL expressions embedded in DDL.

        It is created from the dialect's ``statement_compiler`` with no
        statement and with this compiler's ``schema_translate_map``.

        """
        return self.dialect.statement_compiler(
            self.dialect, None, schema_translate_map=self.schema_translate_map
        )

    @util.memoized_property
    def type_compiler(self):
        """The dialect's ``type_compiler_instance``."""
        return self.dialect.type_compiler_instance

    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        """Always return ``None``; every argument is ignored."""
        return None

    def visit_ddl(self, ddl, **kwargs):
        """Render a textual DDL construct.

        ``ddl.statement`` is %-interpolated with ``ddl.context``.  When
        ``ddl.target`` is a :class:`.Table`, a copy of the context is
        given ``table``, ``schema`` and ``fullname`` entries, unless the
        context already has them.

        """
        # table events can substitute table and schema name
        context = ddl.context
        if isinstance(ddl.target, schema.Table):
            # work on a copy so the construct's own context is untouched
            context = context.copy()

            preparer = self.preparer
            path = preparer.format_table_seq(ddl.target)
            if len(path) == 1:
                table, sch = path[0], ""
            else:
                table, sch = path[-1], path[0]

            context.setdefault("table", table)
            context.setdefault("schema", sch)
            context.setdefault("fullname", preparer.format_table(ddl.target))

        return self.sql_compiler.post_process_text(ddl.statement % context)

    def visit_create_schema(self, create, **kw):
        """Render ``CREATE SCHEMA [IF NOT EXISTS] <name>``."""
        text = "CREATE SCHEMA "
        if create.if_not_exists:
            text += "IF NOT EXISTS "
        return text + self.preparer.format_schema(create.element)

    def visit_drop_schema(self, drop, **kw):
        """Render ``DROP SCHEMA [IF EXISTS] <name> [CASCADE]``."""
        text = "DROP SCHEMA "
        if drop.if_exists:
            text += "IF EXISTS "
        text += self.preparer.format_schema(drop.element)
        if drop.cascade:
            text += " CASCADE"
        return text

    def visit_create_table(self, create, **kw):
        """Render a complete ``CREATE TABLE`` statement.

        Columns are rendered one per line, followed by the table level
        constraints.  Columns whose rendering is ``None`` are left out.

        :raises CompileError: when a column fails to compile; the
         original message is re-raised with the table and column name in
         front of it.

        """
        table = create.element
        preparer = self.preparer

        text = "\nCREATE "
        if table._prefixes:
            text += " ".join(table._prefixes) + " "

        text += "TABLE "
        if create.if_not_exists:
            text += "IF NOT EXISTS "

        text += preparer.format_table(table) + " "

        create_table_suffix = self.create_table_suffix(table)
        if create_table_suffix:
            text += create_table_suffix + " "

        text += "("

        # the first rendered element follows a bare newline; every later
        # one follows a comma and a newline
        separator = "\n"

        # if only one primary key, specify it along with the column
        first_pk = False
        for create_column in create.columns:
            column = create_column.element
            try:
                processed = self.process(
                    create_column, first_pk=column.primary_key and not first_pk
                )
                if processed is not None:
                    text += separator
                    separator = ", \n"
                    text += "\t" + processed
                if column.primary_key:
                    first_pk = True
            except exc.CompileError as ce:
                raise exc.CompileError(
                    "(in table '%s', column '%s'): %s"
                    % (table.description, column.name, ce.args[0])
                ) from ce

        const = self.create_table_constraints(
            table,
            _include_foreign_key_constraints=create.include_foreign_key_constraints,  # noqa
        )
        if const:
            text += separator + "\t" + const

        text += "\n)%s\n\n" % self.post_create_table(table)
        return text

    def visit_create_column(self, create, first_pk=False, **kw):
        """Render one column definition inside ``CREATE TABLE``.

        :return: the column specification followed by the column's own
         constraints, or ``None`` when ``column.system`` is set.

        """
        column = create.element

        if column.system:
            return None

        text = self.get_column_specification(column, first_pk=first_pk)
        const = " ".join(
            self.process(constraint) for constraint in column.constraints
        )
        if const:
            text += " " + const

        return text

    def create_table_constraints(
        self, table, _include_foreign_key_constraints=None, **kw
    ):
        """Render the table level constraints of a ``CREATE TABLE``.

        The primary key is rendered first, then the remaining constraints
        in ``table._sorted_constraints`` order.  When
        ``_include_foreign_key_constraints`` is given, foreign key
        constraints not contained in it are left out.  A constraint is
        also left out when ``_should_create_for_compiler()`` rejects it,
        when the dialect supports ALTER and the constraint has
        ``use_alter`` set, or when its rendering is ``None``.

        """
        # On some DB order is significant: visit PK first, then the
        # other constraints (engine.ReflectionTest.testbasic failed on FB2)
        constraints = []
        if table.primary_key:
            constraints.append(table.primary_key)

        all_fkcs = table.foreign_key_constraints
        if _include_foreign_key_constraints is not None:
            omit_fkcs = all_fkcs.difference(_include_foreign_key_constraints)
        else:
            omit_fkcs = set()

        constraints.extend(
            [
                c
                for c in table._sorted_constraints
                if c is not table.primary_key and c not in omit_fkcs
            ]
        )

        return ", \n\t".join(
            p
            for p in (
                self.process(constraint)
                for constraint in constraints
                if (constraint._should_create_for_compiler(self))
                and (
                    not self.dialect.supports_alter
                    or not getattr(constraint, "use_alter", False)
                )
            )
            if p is not None
        )

    def visit_drop_table(self, drop, **kw):
        """Render ``DROP TABLE [IF EXISTS] <name>``."""
        text = "\nDROP TABLE "
        if drop.if_exists:
            text += "IF EXISTS "
        return text + self.preparer.format_table(drop.element)

    def visit_drop_view(self, drop, **kw):
        """Render ``DROP VIEW <name>``."""
        return "\nDROP VIEW " + self.preparer.format_table(drop.element)

    def _verify_index_table(self, index: Index) -> None:
        """Raise :class:`.CompileError` if the index has no table."""
        if index.table is None:
            raise exc.CompileError(
                "Index '%s' is not associated with any table." % index.name
            )

    def visit_create_index(
        self, create, include_schema=False, include_table_schema=True, **kw
    ):
        """Render ``CREATE [UNIQUE] INDEX [IF NOT EXISTS] .. ON ..``.

        :param include_schema: passed on to ``_prepared_index_name()``.
        :param include_table_schema: passed to ``format_table()`` as
         ``use_schema`` for the table the index is on.
        :raises CompileError: when the index has no table or no name.

        """
        index = create.element
        self._verify_index_table(index)
        preparer = self.preparer
        text = "CREATE "
        if index.unique:
            text += "UNIQUE "
        if index.name is None:
            raise exc.CompileError(
                "CREATE INDEX requires that the index have a name"
            )

        text += "INDEX "
        if create.if_not_exists:
            text += "IF NOT EXISTS "

        text += "%s ON %s (%s)" % (
            self._prepared_index_name(index, include_schema=include_schema),
            preparer.format_table(
                index.table, use_schema=include_table_schema
            ),
            ", ".join(
                self.sql_compiler.process(
                    expr, include_table=False, literal_binds=True
                )
                for expr in index.expressions
            ),
        )
        return text

    def visit_drop_index(self, drop, **kw):
        """Render ``DROP INDEX [IF EXISTS] <name>``, the name being
        produced with ``include_schema=True``.

        :raises CompileError: when the index has no name.

        """
        index = drop.element

        if index.name is None:
            raise exc.CompileError(
                "DROP INDEX requires that the index have a name"
            )
        text = "\nDROP INDEX "
        if drop.if_exists:
            text += "IF EXISTS "

        return text + self._prepared_index_name(index, include_schema=True)

    def _prepared_index_name(
        self, index: Index, include_schema: bool = False
    ) -> str:
        """Return the formatted index name.

        The name is prefixed with the quoted schema of the index's table
        only when ``include_schema`` is true and that schema is non-empty.

        """
        if index.table is not None:
            effective_schema = self.preparer.schema_for_object(index.table)
        else:
            effective_schema = None
        if include_schema and effective_schema:
            schema_name = self.preparer.quote_schema(effective_schema)
        else:
            schema_name = None

        index_name: str = self.preparer.format_index(index)

        if schema_name:
            index_name = schema_name + "." + index_name
        return index_name

    def visit_add_constraint(self, create, **kw):
        """Render ``ALTER TABLE <table> ADD <constraint>``."""
        return "ALTER TABLE %s ADD %s" % (
            self.preparer.format_table(create.element.table),
            self.process(create.element),
        )

    def visit_set_table_comment(self, create, **kw):
        """Render ``COMMENT ON TABLE <table> IS <literal>``."""
        return "COMMENT ON TABLE %s IS %s" % (
            self.preparer.format_table(create.element),
            self.sql_compiler.render_literal_value(
                create.element.comment, sqltypes.String()
            ),
        )

    def visit_drop_table_comment(self, drop, **kw):
        """Render ``COMMENT ON TABLE <table> IS NULL``."""
        return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table(
            drop.element
        )

    def visit_set_column_comment(self, create, **kw):
        """Render ``COMMENT ON COLUMN <column> IS <literal>``."""
        return "COMMENT ON COLUMN %s IS %s" % (
            self.preparer.format_column(
                create.element, use_table=True, use_schema=True
            ),
            self.sql_compiler.render_literal_value(
                create.element.comment, sqltypes.String()
            ),
        )

    def visit_drop_column_comment(self, drop, **kw):
        """Render ``COMMENT ON COLUMN <column> IS NULL``."""
        return "COMMENT ON COLUMN %s IS NULL" % self.preparer.format_column(
            drop.element, use_table=True
        )

    def visit_set_constraint_comment(self, create, **kw):
        """Not supported here; always raises
        :class:`.UnsupportedCompilationError`."""
        raise exc.UnsupportedCompilationError(self, type(create))

    def visit_drop_constraint_comment(self, drop, **kw):
        """Not supported here; always raises
        :class:`.UnsupportedCompilationError`."""
        raise exc.UnsupportedCompilationError(self, type(drop))

    def get_identity_options(self, identity_options: IdentityOptions) -> str:
        """Render sequence / identity options as one space separated
        string.

        Options whose value is ``None`` are omitted.  ``nominvalue`` and
        ``nomaxvalue`` are flags, so ``NO MINVALUE`` / ``NO MAXVALUE`` are
        rendered only when the flag is true; an explicit ``False`` is
        treated the same as ``None``.  ``cycle`` renders ``CYCLE`` or
        ``NO CYCLE`` depending on its truth value.

        :return: the options text, or an empty string when nothing is
         set.

        """
        text = []
        if identity_options.increment is not None:
            text.append("INCREMENT BY %d" % identity_options.increment)
        if identity_options.start is not None:
            text.append("START WITH %d" % identity_options.start)
        if identity_options.minvalue is not None:
            text.append("MINVALUE %d" % identity_options.minvalue)
        if identity_options.maxvalue is not None:
            text.append("MAXVALUE %d" % identity_options.maxvalue)
        # truth test rather than "is not None": a False flag must not
        # render the clause
        if identity_options.nominvalue:
            text.append("NO MINVALUE")
        if identity_options.nomaxvalue:
            text.append("NO MAXVALUE")
        if identity_options.cache is not None:
            text.append("CACHE %d" % identity_options.cache)
        if identity_options.cycle is not None:
            text.append("CYCLE" if identity_options.cycle else "NO CYCLE")
        return " ".join(text)

    def visit_create_sequence(self, create, prefix=None, **kw):
        """Render ``CREATE SEQUENCE [IF NOT EXISTS] <name>`` followed by
        the optional ``prefix`` text and the sequence options."""
        text = "CREATE SEQUENCE "
        if create.if_not_exists:
            text += "IF NOT EXISTS "
        text += self.preparer.format_sequence(create.element)

        if prefix:
            text += prefix
        options = self.get_identity_options(create.element)
        if options:
            text += " " + options
        return text

    def visit_drop_sequence(self, drop, **kw):
        """Render ``DROP SEQUENCE [IF EXISTS] <name>``."""
        text = "DROP SEQUENCE "
        if drop.if_exists:
            text += "IF EXISTS "
        return text + self.preparer.format_sequence(drop.element)

    def visit_drop_constraint(self, drop, **kw):
        """Render ``ALTER TABLE .. DROP CONSTRAINT [IF EXISTS] <name>
        [CASCADE]``.

        :raises CompileError: when the constraint has no name, or when
         the preparer formats its name as ``None``.

        """
        constraint = drop.element
        if constraint.name is not None:
            formatted_name = self.preparer.format_constraint(constraint)
        else:
            formatted_name = None

        if formatted_name is None:
            raise exc.CompileError(
                "Can't emit DROP CONSTRAINT for constraint %r; "
                "it has no name" % drop.element
            )
        return "ALTER TABLE %s DROP CONSTRAINT %s%s%s" % (
            self.preparer.format_table(drop.element.table),
            "IF EXISTS " if drop.if_exists else "",
            formatted_name,
            " CASCADE" if drop.cascade else "",
        )

    def get_column_specification(self, column, **kwargs):
        """Render a column's name, type and trailing modifiers.

        The modifiers are, in order: ``DEFAULT``, the computed clause,
        the identity clause (only when the dialect supports identity
        columns) and ``NOT NULL``.  ``NOT NULL`` is left out for an
        identity column on a dialect that supports identity columns.

        """
        colspec = (
            self.preparer.format_column(column)
            + " "
            + self.dialect.type_compiler_instance.process(
                column.type, type_expression=column
            )
        )
        default = self.get_column_default_string(column)
        if default is not None:
            colspec += " DEFAULT " + default

        if column.computed is not None:
            colspec += " " + self.process(column.computed)

        if (
            column.identity is not None
            and self.dialect.supports_identity_columns
        ):
            colspec += " " + self.process(column.identity)

        if not column.nullable and (
            not column.identity or not self.dialect.supports_identity_columns
        ):
            colspec += " NOT NULL"
        return colspec

    def create_table_suffix(self, table):
        """Hook for text placed between the table name and the opening
        parenthesis of ``CREATE TABLE``; empty by default."""
        return ""

    def post_create_table(self, table):
        """Hook for text placed after the closing parenthesis of
        ``CREATE TABLE``; empty by default."""
        return ""

    def get_column_default_string(self, column: Column[Any]) -> Optional[str]:
        """Return the rendered server default of the column, or ``None``
        when ``server_default`` is not a :class:`.DefaultClause`."""
        if isinstance(column.server_default, schema.DefaultClause):
            return self.render_default_string(column.server_default.arg)
        else:
            return None

    def render_default_string(self, default: Union[Visitable, str]) -> str:
        """Render a default value.

        A plain string is rendered as a string literal; anything else is
        compiled as a SQL expression with ``literal_binds=True``.

        """
        if isinstance(default, str):
            return self.sql_compiler.render_literal_value(
                default, sqltypes.STRINGTYPE
            )
        else:
            return self.sql_compiler.process(default, literal_binds=True)

    def visit_table_or_column_check_constraint(self, constraint, **kw):
        """Dispatch to the column level or table level CHECK visitor
        according to ``constraint.is_column_level``."""
        if constraint.is_column_level:
            return self.visit_column_check_constraint(constraint)
        else:
            return self.visit_check_constraint(constraint)

    def visit_check_constraint(self, constraint, **kw):
        """Render a CHECK constraint: preamble, body, deferrability."""
        text = self.define_constraint_preamble(constraint, **kw)
        text += self.define_check_body(constraint, **kw)
        text += self.define_constraint_deferrability(constraint)
        return text

    def visit_column_check_constraint(self, constraint, **kw):
        """Render a column level CHECK constraint: preamble, body,
        deferrability."""
        text = self.define_constraint_preamble(constraint, **kw)
        text += self.define_check_body(constraint, **kw)
        text += self.define_constraint_deferrability(constraint)
        return text

    def visit_primary_key_constraint(
        self, constraint: PrimaryKeyConstraint, **kw: Any
    ) -> str:
        """Render a PRIMARY KEY constraint, or an empty string when the
        constraint has no columns."""
        if len(constraint) == 0:
            return ""
        text = self.define_constraint_preamble(constraint, **kw)
        text += self.define_primary_key_body(constraint, **kw)
        text += self.define_constraint_deferrability(constraint)
        return text

    def visit_foreign_key_constraint(
        self, constraint: ForeignKeyConstraint, **kw: Any
    ) -> str:
        """Render a FOREIGN KEY constraint: preamble, body, ``MATCH``,
        cascade rules and deferrability, in that order."""
        text = self.define_constraint_preamble(constraint, **kw)
        text += self.define_foreign_key_body(constraint, **kw)
        text += self.define_constraint_match(constraint)
        text += self.define_constraint_cascades(constraint)
        text += self.define_constraint_deferrability(constraint)
        return text

    def define_constraint_remote_table(self, constraint, table, preparer):
        """Format the remote table clause of a CREATE CONSTRAINT clause."""

        return preparer.format_table(table)

    def visit_unique_constraint(
        self, constraint: UniqueConstraint, **kw: Any
    ) -> str:
        """Render a UNIQUE constraint, or an empty string when the
        constraint has no columns."""
        if len(constraint) == 0:
            return ""
        text = self.define_constraint_preamble(constraint, **kw)
        text += self.define_unique_body(constraint, **kw)
        text += self.define_constraint_deferrability(constraint)
        return text

    def define_constraint_preamble(
        self, constraint: Constraint, **kw: Any
    ) -> str:
        """Render ``CONSTRAINT <name> ``, or an empty string when the
        constraint has no name or the preparer formats it as ``None``."""
        text = ""
        if constraint.name is not None:
            formatted_name = self.preparer.format_constraint(constraint)
            if formatted_name is not None:
                text += "CONSTRAINT %s " % formatted_name
        return text

    def define_primary_key_body(
        self, constraint: PrimaryKeyConstraint, **kw: Any
    ) -> str:
        """Render ``PRIMARY KEY (<columns>)``.

        ``columns_autoinc_first`` supplies the column order when the
        constraint is ``_implicit_generated``; otherwise ``columns`` does.

        """
        text = ""
        text += "PRIMARY KEY "
        text += "(%s)" % ", ".join(
            self.preparer.quote(c.name)
            for c in (
                constraint.columns_autoinc_first
                if constraint._implicit_generated
                else constraint.columns
            )
        )
        return text

    def define_foreign_key_body(
        self, constraint: ForeignKeyConstraint, **kw: Any
    ) -> str:
        """Render ``FOREIGN KEY(<cols>) REFERENCES <table> (<cols>)``.

        The referenced table is taken from the first element of the
        constraint.

        """
        preparer = self.preparer
        remote_table = list(constraint.elements)[0].column.table
        text = "FOREIGN KEY(%s) REFERENCES %s (%s)" % (
            ", ".join(
                preparer.quote(f.parent.name) for f in constraint.elements
            ),
            self.define_constraint_remote_table(
                constraint, remote_table, preparer
            ),
            ", ".join(
                preparer.quote(f.column.name) for f in constraint.elements
            ),
        )
        return text

    def define_unique_body(
        self, constraint: UniqueConstraint, **kw: Any
    ) -> str:
        """Render ``UNIQUE <distinct>(<columns>)``, where ``<distinct>``
        comes from ``define_unique_constraint_distinct()``."""
        text = "UNIQUE %s(%s)" % (
            self.define_unique_constraint_distinct(constraint, **kw),
            ", ".join(self.preparer.quote(c.name) for c in constraint),
        )
        return text

    def define_check_body(self, constraint: CheckConstraint, **kw: Any) -> str:
        """Render ``CHECK (<expression>)``; the expression is compiled
        with ``include_table=False`` and ``literal_binds=True``."""
        text = "CHECK (%s)" % self.sql_compiler.process(
            constraint.sqltext, include_table=False, literal_binds=True
        )
        return text

    def define_unique_constraint_distinct(
        self, constraint: UniqueConstraint, **kw: Any
    ) -> str:
        """Hook for text placed between ``UNIQUE`` and the column list;
        empty by default."""
        return ""

    def define_constraint_cascades(
        self, constraint: ForeignKeyConstraint
    ) -> str:
        """Render the ``ON DELETE`` and ``ON UPDATE`` rules that are set
        on the constraint, in that order."""
        text = ""
        if constraint.ondelete is not None:
            text += self.define_constraint_ondelete_cascade(constraint)

        if constraint.onupdate is not None:
            text += self.define_constraint_onupdate_cascade(constraint)
        return text

    def define_constraint_ondelete_cascade(
        self, constraint: ForeignKeyConstraint
    ) -> str:
        """Render `` ON DELETE <phrase>``; the phrase is passed through
        ``validate_sql_phrase()`` with ``FK_ON_DELETE``."""
        return " ON DELETE %s" % self.preparer.validate_sql_phrase(
            constraint.ondelete, FK_ON_DELETE
        )

    def define_constraint_onupdate_cascade(
        self, constraint: ForeignKeyConstraint
    ) -> str:
        """Render `` ON UPDATE <phrase>``; the phrase is passed through
        ``validate_sql_phrase()`` with ``FK_ON_UPDATE``."""
        return " ON UPDATE %s" % self.preparer.validate_sql_phrase(
            constraint.onupdate, FK_ON_UPDATE
        )

    def define_constraint_deferrability(self, constraint: Constraint) -> str:
        """Render ``[NOT] DEFERRABLE`` and ``INITIALLY <phrase>`` for the
        settings that are not ``None``; the ``INITIALLY`` phrase is
        passed through ``validate_sql_phrase()`` with ``FK_INITIALLY``."""
        text = ""
        if constraint.deferrable is not None:
            if constraint.deferrable:
                text += " DEFERRABLE"
            else:
                text += " NOT DEFERRABLE"
        if constraint.initially is not None:
            text += " INITIALLY %s" % self.preparer.validate_sql_phrase(
                constraint.initially, FK_INITIALLY
            )
        return text

    def define_constraint_match(self, constraint: ForeignKeyConstraint) -> str:
        """Render `` MATCH <value>`` when ``constraint.match`` is set.

        The value is interpolated as given, without validation.

        """
        text = ""
        if constraint.match is not None:
            text += " MATCH %s" % constraint.match
        return text

    def visit_computed_column(self, generated, **kw):
        """Render ``GENERATED ALWAYS AS (<expr>)``.

        ``STORED`` is appended when ``persisted`` is ``True`` and
        ``VIRTUAL`` when it is ``False``; any other value appends
        nothing.

        """
        text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
            generated.sqltext, include_table=False, literal_binds=True
        )
        if generated.persisted is True:
            text += " STORED"
        elif generated.persisted is False:
            text += " VIRTUAL"
        return text

    def visit_identity_column(self, identity, **kw):
        """Render ``GENERATED ALWAYS|BY DEFAULT AS IDENTITY`` followed by
        the parenthesized identity options when there are any."""
        text = "GENERATED %s AS IDENTITY" % (
            "ALWAYS" if identity.always else "BY DEFAULT",
        )
        options = self.get_identity_options(identity)
        if options:
            text += " (%s)" % options
        return text
7318
7319
class GenericTypeCompiler(TypeCompiler):
    """Render type specifications using generic type keywords.

    The upper-case ``visit_<NAME>`` methods return the literal keyword,
    adding length, precision, scale or collation where the type carries
    them.  Most lower-case ``visit_<name>`` methods simply forward to
    one of the upper-case methods.
    """

    # -- upper-case renderers: numeric types -------------------------------

    def visit_FLOAT(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return "FLOAT"

    def visit_DOUBLE(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return "DOUBLE"

    def visit_DOUBLE_PRECISION(
        self, type_: sqltypes.DOUBLE_PRECISION[Any], **kw: Any
    ) -> str:
        return "DOUBLE PRECISION"

    def visit_REAL(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return "REAL"

    def visit_NUMERIC(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        """Render ``NUMERIC``, ``NUMERIC(p)`` or ``NUMERIC(p, s)``."""
        precision = type_.precision
        if precision is None:
            return "NUMERIC"
        scale = type_.scale
        if scale is None:
            return "NUMERIC(%s)" % (precision,)
        return "NUMERIC(%s, %s)" % (precision, scale)

    def visit_DECIMAL(self, type_: sqltypes.DECIMAL[Any], **kw: Any) -> str:
        """Render ``DECIMAL``, ``DECIMAL(p)`` or ``DECIMAL(p, s)``."""
        precision = type_.precision
        if precision is None:
            return "DECIMAL"
        scale = type_.scale
        if scale is None:
            return "DECIMAL(%s)" % (precision,)
        return "DECIMAL(%s, %s)" % (precision, scale)

    def visit_INTEGER(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return "INTEGER"

    def visit_SMALLINT(self, type_: sqltypes.SmallInteger, **kw: Any) -> str:
        return "SMALLINT"

    def visit_BIGINT(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return "BIGINT"

    # -- upper-case renderers: date and time -------------------------------

    def visit_TIMESTAMP(self, type_: sqltypes.TIMESTAMP, **kw: Any) -> str:
        return "TIMESTAMP"

    def visit_DATETIME(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return "DATETIME"

    def visit_DATE(self, type_: sqltypes.Date, **kw: Any) -> str:
        return "DATE"

    def visit_TIME(self, type_: sqltypes.Time, **kw: Any) -> str:
        return "TIME"

    # -- upper-case renderers: character and binary ------------------------

    def visit_CLOB(self, type_: sqltypes.CLOB, **kw: Any) -> str:
        return "CLOB"

    def visit_NCLOB(self, type_: sqltypes.Text, **kw: Any) -> str:
        return "NCLOB"

    def _render_string_type(
        self, name: str, length: Optional[int], collation: Optional[str]
    ) -> str:
        """Render ``name`` with optional ``(length)`` and ``COLLATE``.

        A falsy ``length`` or ``collation`` is omitted.  The collation
        is wrapped in double quotes exactly as given.
        """
        pieces = [name]
        if length:
            pieces.append(f"({length})")
        if collation:
            pieces.append(f' COLLATE "{collation}"')
        return "".join(pieces)

    def visit_CHAR(self, type_: sqltypes.CHAR, **kw: Any) -> str:
        return self._render_string_type("CHAR", type_.length, type_.collation)

    def visit_NCHAR(self, type_: sqltypes.NCHAR, **kw: Any) -> str:
        return self._render_string_type("NCHAR", type_.length, type_.collation)

    def visit_VARCHAR(self, type_: sqltypes.String, **kw: Any) -> str:
        return self._render_string_type(
            "VARCHAR", type_.length, type_.collation
        )

    def visit_NVARCHAR(self, type_: sqltypes.NVARCHAR, **kw: Any) -> str:
        return self._render_string_type(
            "NVARCHAR", type_.length, type_.collation
        )

    def visit_TEXT(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self._render_string_type("TEXT", type_.length, type_.collation)

    def visit_UUID(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        return "UUID"

    def visit_BLOB(self, type_: sqltypes.LargeBinary, **kw: Any) -> str:
        return "BLOB"

    def visit_BINARY(self, type_: sqltypes.BINARY, **kw: Any) -> str:
        """Render ``BINARY`` with ``(length)`` when a length is set."""
        suffix = "(%d)" % type_.length if type_.length else ""
        return "BINARY" + suffix

    def visit_VARBINARY(self, type_: sqltypes.VARBINARY, **kw: Any) -> str:
        """Render ``VARBINARY`` with ``(length)`` when a length is set."""
        suffix = "(%d)" % type_.length if type_.length else ""
        return "VARBINARY" + suffix

    def visit_BOOLEAN(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return "BOOLEAN"

    # -- lower-case visitors -----------------------------------------------

    def visit_uuid(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        """Render ``UUID`` when native UUIDs apply, else ``CHAR(32)``.

        The native form is used only when both ``type_.native_uuid``
        and ``dialect.supports_native_uuid`` are true.
        """
        if type_.native_uuid and self.dialect.supports_native_uuid:
            return self.visit_UUID(type_, **kw)
        return self._render_string_type("CHAR", length=32, collation=None)

    def visit_large_binary(
        self, type_: sqltypes.LargeBinary, **kw: Any
    ) -> str:
        return self.visit_BLOB(type_, **kw)

    def visit_boolean(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return self.visit_BOOLEAN(type_, **kw)

    def visit_time(self, type_: sqltypes.Time, **kw: Any) -> str:
        return self.visit_TIME(type_, **kw)

    def visit_datetime(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return self.visit_DATETIME(type_, **kw)

    def visit_date(self, type_: sqltypes.Date, **kw: Any) -> str:
        return self.visit_DATE(type_, **kw)

    def visit_big_integer(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return self.visit_BIGINT(type_, **kw)

    def visit_small_integer(
        self, type_: sqltypes.SmallInteger, **kw: Any
    ) -> str:
        return self.visit_SMALLINT(type_, **kw)

    def visit_integer(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return self.visit_INTEGER(type_, **kw)

    def visit_real(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return self.visit_REAL(type_, **kw)

    def visit_float(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return self.visit_FLOAT(type_, **kw)

    def visit_double(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return self.visit_DOUBLE(type_, **kw)

    def visit_numeric(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        return self.visit_NUMERIC(type_, **kw)

    def visit_string(self, type_: sqltypes.String, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_unicode(self, type_: sqltypes.Unicode, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_text(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_unicode_text(
        self, type_: sqltypes.UnicodeText, **kw: Any
    ) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_enum(self, type_: sqltypes.Enum, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_null(self, type_, **kw):
        """Always raise ``CompileError``; this type cannot be rendered."""
        raise exc.CompileError(
            "Can't generate DDL for %r; "
            "did you forget to specify a "
            "type on this Column?" % type_
        )

    def visit_type_decorator(
        self, type_: TypeDecorator[Any], **kw: Any
    ) -> str:
        """Render the type returned by ``type_.type_engine(dialect)``."""
        underlying = type_.type_engine(self.dialect)
        return self.process(underlying, **kw)

    def visit_user_defined(
        self, type_: UserDefinedType[Any], **kw: Any
    ) -> str:
        """Return whatever the type's own ``get_col_spec()`` produces."""
        return type_.get_col_spec(**kw)
7507
7508
class StrSQLTypeCompiler(GenericTypeCompiler):
    """Type compiler that still renders something for unknown types.

    A type object lacking ``_compiler_dispatch``, and any ``visit_*``
    method that is not defined, both resolve to ``_visit_unknown()``.
    """

    def process(self, type_, **kw):
        """Dispatch ``type_`` to its visitor, or to ``_visit_unknown``."""
        try:
            dispatch = type_._compiler_dispatch
        except AttributeError:
            # not a compilable type object at all
            return self._visit_unknown(type_, **kw)
        return dispatch(self, **kw)

    def __getattr__(self, key):
        """Supply ``_visit_unknown`` for any missing ``visit_*`` name."""
        if not key.startswith("visit_"):
            raise AttributeError(key)
        return self._visit_unknown

    def _visit_unknown(self, type_, **kw):
        """Return the class name if it is all upper case, else ``repr``."""
        cls_name = type_.__class__.__name__
        if cls_name == cls_name.upper():
            return cls_name
        return repr(type_)

    def visit_null(self, type_, **kw):
        return "NULL"

    def visit_user_defined(self, type_, **kw):
        """Use the type's ``get_col_spec()`` when present, else ``repr``."""
        try:
            col_spec = type_.get_col_spec
        except AttributeError:
            return repr(type_)
        return col_spec(**kw)
7540
7541
class _SchemaForObjectCallable(Protocol):
    """Callable protocol: one positional object in, a ``str`` out.

    Used as the annotation of ``IdentifierPreparer.schema_for_object``.
    """

    def __call__(self, __obj: Any) -> str: ...
7544
7545
class _BindNameForColProtocol(Protocol):
    """Callable protocol: takes a column clause ``col``, returns ``str``.

    NOTE(review): presumably the returned string is a bind parameter
    name for the column; the call sites are outside this section, so
    confirm there.
    """

    def __call__(self, col: ColumnClause[Any]) -> str: ...
7548
7549
class IdentifierPreparer:
    """Handle quoting and case-folding of identifiers based on options."""

    # Identifiers whose lower-cased form is in this collection are quoted
    # (see ``_requires_quotes``).
    reserved_words = RESERVED_WORDS

    # Pattern an identifier must match in order to be left unquoted.
    legal_characters = LEGAL_CHARACTERS

    # An identifier starting with one of these characters is quoted.
    illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS

    # Delimiter placed before a quoted identifier; assigned in __init__.
    initial_quote: str

    # Delimiter placed after a quoted identifier; assigned in __init__.
    final_quote: str

    # Cache used by ``quote()``: identifier -> rendered (possibly quoted)
    # form.
    _strings: MutableMapping[str, str]

    schema_for_object: _SchemaForObjectCallable = operator.attrgetter("schema")
    """Return the .schema attribute for an object.

    For the default IdentifierPreparer, the schema for an object is always
    the value of the ".schema" attribute. if the preparer is replaced
    with one that has a non-empty schema_translate_map, the value of the
    ".schema" attribute is rendered a symbol that will be converted to a
    real schema name from the mapping post-compile.

    """

    # True only on copies made by ``_with_schema_translate`` whose map had
    # ``None`` as a key; checked again by ``_render_schema_translates``.
    _includes_none_schema_translate: bool = False
7577
7578 def __init__(
7579 self,
7580 dialect: Dialect,
7581 initial_quote: str = '"',
7582 final_quote: Optional[str] = None,
7583 escape_quote: str = '"',
7584 quote_case_sensitive_collations: bool = True,
7585 omit_schema: bool = False,
7586 ):
7587 """Construct a new ``IdentifierPreparer`` object.
7588
7589 initial_quote
7590 Character that begins a delimited identifier.
7591
7592 final_quote
7593 Character that ends a delimited identifier. Defaults to
7594 `initial_quote`.
7595
7596 omit_schema
7597 Prevent prepending schema name. Useful for databases that do
7598 not support schemae.
7599 """
7600
7601 self.dialect = dialect
7602 self.initial_quote = initial_quote
7603 self.final_quote = final_quote or self.initial_quote
7604 self.escape_quote = escape_quote
7605 self.escape_to_quote = self.escape_quote * 2
7606 self.omit_schema = omit_schema
7607 self.quote_case_sensitive_collations = quote_case_sensitive_collations
7608 self._strings = {}
7609 self._double_percents = self.dialect.paramstyle in (
7610 "format",
7611 "pyformat",
7612 )
7613
7614 def _with_schema_translate(self, schema_translate_map):
7615 prep = self.__class__.__new__(self.__class__)
7616 prep.__dict__.update(self.__dict__)
7617
7618 includes_none = None in schema_translate_map
7619
7620 def symbol_getter(obj):
7621 name = obj.schema
7622 if obj._use_schema_map and (name is not None or includes_none):
7623 if name is not None and ("[" in name or "]" in name):
7624 raise exc.CompileError(
7625 "Square bracket characters ([]) not supported "
7626 "in schema translate name '%s'" % name
7627 )
7628 return quoted_name(
7629 "__[SCHEMA_%s]" % (name or "_none"), quote=False
7630 )
7631 else:
7632 return obj.schema
7633
7634 prep.schema_for_object = symbol_getter
7635 prep._includes_none_schema_translate = includes_none
7636 return prep
7637
7638 def _render_schema_translates(
7639 self, statement: str, schema_translate_map: SchemaTranslateMapType
7640 ) -> str:
7641 d = schema_translate_map
7642 if None in d:
7643 if not self._includes_none_schema_translate:
7644 raise exc.InvalidRequestError(
7645 "schema translate map which previously did not have "
7646 "`None` present as a key now has `None` present; compiled "
7647 "statement may lack adequate placeholders. Please use "
7648 "consistent keys in successive "
7649 "schema_translate_map dictionaries."
7650 )
7651
7652 d["_none"] = d[None] # type: ignore[index]
7653
7654 def replace(m):
7655 name = m.group(2)
7656 if name in d:
7657 effective_schema = d[name]
7658 else:
7659 if name in (None, "_none"):
7660 raise exc.InvalidRequestError(
7661 "schema translate map which previously had `None` "
7662 "present as a key now no longer has it present; don't "
7663 "know how to apply schema for compiled statement. "
7664 "Please use consistent keys in successive "
7665 "schema_translate_map dictionaries."
7666 )
7667 effective_schema = name
7668
7669 if not effective_schema:
7670 effective_schema = self.dialect.default_schema_name
7671 if not effective_schema:
7672 # TODO: no coverage here
7673 raise exc.CompileError(
7674 "Dialect has no default schema name; can't "
7675 "use None as dynamic schema target."
7676 )
7677 return self.quote_schema(effective_schema)
7678
7679 return re.sub(r"(__\[SCHEMA_([^\]]+)\])", replace, statement)
7680
7681 def _escape_identifier(self, value: str) -> str:
7682 """Escape an identifier.
7683
7684 Subclasses should override this to provide database-dependent
7685 escaping behavior.
7686 """
7687
7688 value = value.replace(self.escape_quote, self.escape_to_quote)
7689 if self._double_percents:
7690 value = value.replace("%", "%%")
7691 return value
7692
7693 def _unescape_identifier(self, value: str) -> str:
7694 """Canonicalize an escaped identifier.
7695
7696 Subclasses should override this to provide database-dependent
7697 unescaping behavior that reverses _escape_identifier.
7698 """
7699
7700 return value.replace(self.escape_to_quote, self.escape_quote)
7701
7702 def validate_sql_phrase(self, element, reg):
7703 """keyword sequence filter.
7704
7705 a filter for elements that are intended to represent keyword sequences,
7706 such as "INITIALLY", "INITIALLY DEFERRED", etc. no special characters
7707 should be present.
7708
7709 .. versionadded:: 1.3
7710
7711 """
7712
7713 if element is not None and not reg.match(element):
7714 raise exc.CompileError(
7715 "Unexpected SQL phrase: %r (matching against %r)"
7716 % (element, reg.pattern)
7717 )
7718 return element
7719
7720 def quote_identifier(self, value: str) -> str:
7721 """Quote an identifier.
7722
7723 Subclasses should override this to provide database-dependent
7724 quoting behavior.
7725 """
7726
7727 return (
7728 self.initial_quote
7729 + self._escape_identifier(value)
7730 + self.final_quote
7731 )
7732
7733 def _requires_quotes(self, value: str) -> bool:
7734 """Return True if the given identifier requires quoting."""
7735 lc_value = value.lower()
7736 return (
7737 lc_value in self.reserved_words
7738 or value[0] in self.illegal_initial_characters
7739 or not self.legal_characters.match(str(value))
7740 or (lc_value != value)
7741 )
7742
7743 def _requires_quotes_illegal_chars(self, value):
7744 """Return True if the given identifier requires quoting, but
7745 not taking case convention into account."""
7746 return not self.legal_characters.match(str(value))
7747
7748 def quote_schema(self, schema: str, force: Any = None) -> str:
7749 """Conditionally quote a schema name.
7750
7751
7752 The name is quoted if it is a reserved word, contains quote-necessary
7753 characters, or is an instance of :class:`.quoted_name` which includes
7754 ``quote`` set to ``True``.
7755
7756 Subclasses can override this to provide database-dependent
7757 quoting behavior for schema names.
7758
7759 :param schema: string schema name
7760 :param force: unused
7761
7762 .. deprecated:: 0.9
7763
7764 The :paramref:`.IdentifierPreparer.quote_schema.force`
7765 parameter is deprecated and will be removed in a future
7766 release. This flag has no effect on the behavior of the
7767 :meth:`.IdentifierPreparer.quote` method; please refer to
7768 :class:`.quoted_name`.
7769
7770 """
7771 if force is not None:
7772 # not using the util.deprecated_params() decorator in this
7773 # case because of the additional function call overhead on this
7774 # very performance-critical spot.
7775 util.warn_deprecated(
7776 "The IdentifierPreparer.quote_schema.force parameter is "
7777 "deprecated and will be removed in a future release. This "
7778 "flag has no effect on the behavior of the "
7779 "IdentifierPreparer.quote method; please refer to "
7780 "quoted_name().",
7781 # deprecated 0.9. warning from 1.3
7782 version="0.9",
7783 )
7784
7785 return self.quote(schema)
7786
7787 def quote(self, ident: str, force: Any = None) -> str:
7788 """Conditionally quote an identifier.
7789
7790 The identifier is quoted if it is a reserved word, contains
7791 quote-necessary characters, or is an instance of
7792 :class:`.quoted_name` which includes ``quote`` set to ``True``.
7793
7794 Subclasses can override this to provide database-dependent
7795 quoting behavior for identifier names.
7796
7797 :param ident: string identifier
7798 :param force: unused
7799
7800 .. deprecated:: 0.9
7801
7802 The :paramref:`.IdentifierPreparer.quote.force`
7803 parameter is deprecated and will be removed in a future
7804 release. This flag has no effect on the behavior of the
7805 :meth:`.IdentifierPreparer.quote` method; please refer to
7806 :class:`.quoted_name`.
7807
7808 """
7809 if force is not None:
7810 # not using the util.deprecated_params() decorator in this
7811 # case because of the additional function call overhead on this
7812 # very performance-critical spot.
7813 util.warn_deprecated(
7814 "The IdentifierPreparer.quote.force parameter is "
7815 "deprecated and will be removed in a future release. This "
7816 "flag has no effect on the behavior of the "
7817 "IdentifierPreparer.quote method; please refer to "
7818 "quoted_name().",
7819 # deprecated 0.9. warning from 1.3
7820 version="0.9",
7821 )
7822
7823 force = getattr(ident, "quote", None)
7824
7825 if force is None:
7826 if ident in self._strings:
7827 return self._strings[ident]
7828 else:
7829 if self._requires_quotes(ident):
7830 self._strings[ident] = self.quote_identifier(ident)
7831 else:
7832 self._strings[ident] = ident
7833 return self._strings[ident]
7834 elif force:
7835 return self.quote_identifier(ident)
7836 else:
7837 return ident
7838
7839 def format_collation(self, collation_name):
7840 if self.quote_case_sensitive_collations:
7841 return self.quote(collation_name)
7842 else:
7843 return collation_name
7844
7845 def format_sequence(
7846 self, sequence: schema.Sequence, use_schema: bool = True
7847 ) -> str:
7848 name = self.quote(sequence.name)
7849
7850 effective_schema = self.schema_for_object(sequence)
7851
7852 if (
7853 not self.omit_schema
7854 and use_schema
7855 and effective_schema is not None
7856 ):
7857 name = self.quote_schema(effective_schema) + "." + name
7858 return name
7859
7860 def format_label(
7861 self, label: Label[Any], name: Optional[str] = None
7862 ) -> str:
7863 return self.quote(name or label.name)
7864
7865 def format_alias(
7866 self, alias: Optional[AliasedReturnsRows], name: Optional[str] = None
7867 ) -> str:
7868 if name is None:
7869 assert alias is not None
7870 return self.quote(alias.name)
7871 else:
7872 return self.quote(name)
7873
7874 def format_savepoint(self, savepoint, name=None):
7875 # Running the savepoint name through quoting is unnecessary
7876 # for all known dialects. This is here to support potential
7877 # third party use cases
7878 ident = name or savepoint.ident
7879 if self._requires_quotes(ident):
7880 ident = self.quote_identifier(ident)
7881 return ident
7882
7883 @util.preload_module("sqlalchemy.sql.naming")
7884 def format_constraint(
7885 self, constraint: Union[Constraint, Index], _alembic_quote: bool = True
7886 ) -> Optional[str]:
7887 naming = util.preloaded.sql_naming
7888
7889 if constraint.name is _NONE_NAME:
7890 name = naming._constraint_name_for_table(
7891 constraint, constraint.table
7892 )
7893
7894 if name is None:
7895 return None
7896 else:
7897 name = constraint.name
7898
7899 assert name is not None
7900 if constraint.__visit_name__ == "index":
7901 return self.truncate_and_render_index_name(
7902 name, _alembic_quote=_alembic_quote
7903 )
7904 else:
7905 return self.truncate_and_render_constraint_name(
7906 name, _alembic_quote=_alembic_quote
7907 )
7908
7909 def truncate_and_render_index_name(
7910 self, name: str, _alembic_quote: bool = True
7911 ) -> str:
7912 # calculate these at format time so that ad-hoc changes
7913 # to dialect.max_identifier_length etc. can be reflected
7914 # as IdentifierPreparer is long lived
7915 max_ = (
7916 self.dialect.max_index_name_length
7917 or self.dialect.max_identifier_length
7918 )
7919 return self._truncate_and_render_maxlen_name(
7920 name, max_, _alembic_quote
7921 )
7922
7923 def truncate_and_render_constraint_name(
7924 self, name: str, _alembic_quote: bool = True
7925 ) -> str:
7926 # calculate these at format time so that ad-hoc changes
7927 # to dialect.max_identifier_length etc. can be reflected
7928 # as IdentifierPreparer is long lived
7929 max_ = (
7930 self.dialect.max_constraint_name_length
7931 or self.dialect.max_identifier_length
7932 )
7933 return self._truncate_and_render_maxlen_name(
7934 name, max_, _alembic_quote
7935 )
7936
7937 def _truncate_and_render_maxlen_name(
7938 self, name: str, max_: int, _alembic_quote: bool
7939 ) -> str:
7940 if isinstance(name, elements._truncated_label):
7941 if len(name) > max_:
7942 name = name[0 : max_ - 8] + "_" + util.md5_hex(name)[-4:]
7943 else:
7944 self.dialect.validate_identifier(name)
7945
7946 if not _alembic_quote:
7947 return name
7948 else:
7949 return self.quote(name)
7950
7951 def format_index(self, index: Index) -> str:
7952 name = self.format_constraint(index)
7953 assert name is not None
7954 return name
7955
7956 def format_table(
7957 self,
7958 table: FromClause,
7959 use_schema: bool = True,
7960 name: Optional[str] = None,
7961 ) -> str:
7962 """Prepare a quoted table and schema name."""
7963 if name is None:
7964 if TYPE_CHECKING:
7965 assert isinstance(table, NamedFromClause)
7966 name = table.name
7967
7968 result = self.quote(name)
7969
7970 effective_schema = self.schema_for_object(table)
7971
7972 if not self.omit_schema and use_schema and effective_schema:
7973 result = self.quote_schema(effective_schema) + "." + result
7974 return result
7975
7976 def format_schema(self, name):
7977 """Prepare a quoted schema name."""
7978
7979 return self.quote(name)
7980
7981 def format_label_name(
7982 self,
7983 name,
7984 anon_map=None,
7985 ):
7986 """Prepare a quoted column name."""
7987
7988 if anon_map is not None and isinstance(
7989 name, elements._truncated_label
7990 ):
7991 name = name.apply_map(anon_map)
7992
7993 return self.quote(name)
7994
7995 def format_column(
7996 self,
7997 column: ColumnElement[Any],
7998 use_table: bool = False,
7999 name: Optional[str] = None,
8000 table_name: Optional[str] = None,
8001 use_schema: bool = False,
8002 anon_map: Optional[Mapping[str, Any]] = None,
8003 ) -> str:
8004 """Prepare a quoted column name."""
8005
8006 if name is None:
8007 name = column.name
8008 assert name is not None
8009
8010 if anon_map is not None and isinstance(
8011 name, elements._truncated_label
8012 ):
8013 name = name.apply_map(anon_map)
8014
8015 if not getattr(column, "is_literal", False):
8016 if use_table:
8017 return (
8018 self.format_table(
8019 column.table, use_schema=use_schema, name=table_name
8020 )
8021 + "."
8022 + self.quote(name)
8023 )
8024 else:
8025 return self.quote(name)
8026 else:
8027 # literal textual elements get stuck into ColumnClause a lot,
8028 # which shouldn't get quoted
8029
8030 if use_table:
8031 return (
8032 self.format_table(
8033 column.table, use_schema=use_schema, name=table_name
8034 )
8035 + "."
8036 + name
8037 )
8038 else:
8039 return name
8040
8041 def format_table_seq(self, table, use_schema=True):
8042 """Format table name and schema as a tuple."""
8043
8044 # Dialects with more levels in their fully qualified references
8045 # ('database', 'owner', etc.) could override this and return
8046 # a longer sequence.
8047
8048 effective_schema = self.schema_for_object(table)
8049
8050 if not self.omit_schema and use_schema and effective_schema:
8051 return (
8052 self.quote_schema(effective_schema),
8053 self.format_table(table, use_schema=False),
8054 )
8055 else:
8056 return (self.format_table(table, use_schema=False),)
8057
8058 @util.memoized_property
8059 def _r_identifiers(self):
8060 initial, final, escaped_final = (
8061 re.escape(s)
8062 for s in (
8063 self.initial_quote,
8064 self.final_quote,
8065 self._escape_identifier(self.final_quote),
8066 )
8067 )
8068 r = re.compile(
8069 r"(?:"
8070 r"(?:%(initial)s((?:%(escaped)s|[^%(final)s])+)%(final)s"
8071 r"|([^\.]+))(?=\.|$))+"
8072 % {"initial": initial, "final": final, "escaped": escaped_final}
8073 )
8074 return r
8075
8076 def unformat_identifiers(self, identifiers: str) -> Sequence[str]:
8077 """Unpack 'schema.table.column'-like strings into components."""
8078
8079 r = self._r_identifiers
8080 return [
8081 self._unescape_identifier(i)
8082 for i in [a or b for a, b in r.findall(identifiers)]
8083 ]