1# sql/compiler.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Base SQL and DDL compiler implementations.
10
11Classes provided include:
12
13:class:`.compiler.SQLCompiler` - renders SQL
14strings
15
16:class:`.compiler.DDLCompiler` - renders DDL
17(data definition language) strings
18
19:class:`.compiler.GenericTypeCompiler` - renders
20type specification strings.
21
22To generate user-defined SQL strings, see
23:doc:`/ext/compiler`.
24
25"""
26from __future__ import annotations
27
28import collections
29import collections.abc as collections_abc
30import contextlib
31from enum import IntEnum
32import functools
33import itertools
34import operator
35import re
36from time import perf_counter
37import typing
38from typing import Any
39from typing import Callable
40from typing import cast
41from typing import ClassVar
42from typing import Dict
43from typing import FrozenSet
44from typing import Iterable
45from typing import Iterator
46from typing import List
47from typing import Mapping
48from typing import MutableMapping
49from typing import NamedTuple
50from typing import NoReturn
51from typing import Optional
52from typing import Pattern
53from typing import Sequence
54from typing import Set
55from typing import Tuple
56from typing import Type
57from typing import TYPE_CHECKING
58from typing import Union
59
60from . import base
61from . import coercions
62from . import crud
63from . import elements
64from . import functions
65from . import operators
66from . import roles
67from . import schema
68from . import selectable
69from . import sqltypes
70from . import util as sql_util
71from ._typing import is_column_element
72from ._typing import is_dml
73from .base import _de_clone
74from .base import _from_objects
75from .base import _NONE_NAME
76from .base import _SentinelDefaultCharacterization
77from .base import NO_ARG
78from .elements import quoted_name
79from .sqltypes import TupleType
80from .visitors import prefix_anon_map
81from .. import exc
82from .. import util
83from ..util import FastIntFlag
84from ..util.typing import Literal
85from ..util.typing import Protocol
86from ..util.typing import Self
87from ..util.typing import TypedDict
88
89if typing.TYPE_CHECKING:
90 from .annotation import _AnnotationDict
91 from .base import _AmbiguousTableNameMap
92 from .base import CompileState
93 from .base import Executable
94 from .cache_key import CacheKey
95 from .ddl import ExecutableDDLElement
96 from .dml import Insert
97 from .dml import Update
98 from .dml import UpdateBase
99 from .dml import UpdateDMLState
100 from .dml import ValuesBase
101 from .elements import _truncated_label
102 from .elements import BinaryExpression
103 from .elements import BindParameter
104 from .elements import ClauseElement
105 from .elements import ColumnClause
106 from .elements import ColumnElement
107 from .elements import False_
108 from .elements import Label
109 from .elements import Null
110 from .elements import True_
111 from .functions import Function
112 from .schema import CheckConstraint
113 from .schema import Column
114 from .schema import Constraint
115 from .schema import ForeignKeyConstraint
116 from .schema import Index
117 from .schema import PrimaryKeyConstraint
118 from .schema import Table
119 from .schema import UniqueConstraint
120 from .selectable import _ColumnsClauseElement
121 from .selectable import AliasedReturnsRows
122 from .selectable import CompoundSelectState
123 from .selectable import CTE
124 from .selectable import FromClause
125 from .selectable import NamedFromClause
126 from .selectable import ReturnsRows
127 from .selectable import Select
128 from .selectable import SelectState
129 from .type_api import _BindProcessorType
130 from .type_api import TypeDecorator
131 from .type_api import TypeEngine
132 from .type_api import UserDefinedType
133 from .visitors import Visitable
134 from ..engine.cursor import CursorResultMetaData
135 from ..engine.interfaces import _CoreSingleExecuteParams
136 from ..engine.interfaces import _DBAPIAnyExecuteParams
137 from ..engine.interfaces import _DBAPIMultiExecuteParams
138 from ..engine.interfaces import _DBAPISingleExecuteParams
139 from ..engine.interfaces import _ExecuteOptions
140 from ..engine.interfaces import _GenericSetInputSizesType
141 from ..engine.interfaces import _MutableCoreSingleExecuteParams
142 from ..engine.interfaces import Dialect
143 from ..engine.interfaces import SchemaTranslateMapType
144
145
# Type alias: a dictionary keyed by FromClause objects with string values.
# NOTE(review): presumably the hint text to render for each FROM element --
# confirm against the call sites that accept this type.
_FromHintsType = Dict["FromClause", str]
147
# Default collection of reserved words, all spelled in lower case.  The
# words are listed alphabetically and split on whitespace into a plain,
# mutable ``set`` of strings.
RESERVED_WORDS = set(
    """
    all analyse analyze and any array as asc asymmetric authorization
    between binary both
    case cast check collate column constraint create cross
    current_date current_role current_time current_timestamp current_user
    default deferrable desc distinct do
    else end except
    false for foreign freeze from full
    grant group
    having
    ilike in initially inner intersect into is isnull
    join
    leading left like limit localtime localtimestamp
    natural new not notnull null
    off offset old on only or order outer overlaps
    placing primary
    references right
    select session_user set similar some symmetric
    table then to trailing true
    union unique user using
    verbose
    when where
    """.split()
)
244
# Identifier checks.  The two LEGAL_* patterns match a whole string made up
# only of letters, digits, "_" and "$" (optionally also spaces), ignoring
# case; ILLEGAL_INITIAL_CHARACTERS is the set of the ten digits plus "$".
LEGAL_CHARACTERS = re.compile(r"^[A-Z0-9_$]+$", re.IGNORECASE)
LEGAL_CHARACTERS_PLUS_SPACE = re.compile(r"^[A-Z0-9_ $]+$", re.IGNORECASE)
ILLEGAL_INITIAL_CHARACTERS = set("0123456789$")

# Whole-string, case-insensitive patterns for foreign key ON DELETE /
# ON UPDATE actions and for the INITIALLY keyword's argument.
FK_ON_DELETE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$",
    re.IGNORECASE,
)
FK_ON_UPDATE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$",
    re.IGNORECASE,
)
FK_INITIALLY = re.compile(r"^(?:DEFERRED|IMMEDIATE)$", re.IGNORECASE)

# BIND_PARAMS captures the name in a ":name" token that is not preceded by
# a colon, word character, "$" or backslash and not followed by a colon,
# word character or "$".  BIND_PARAMS_ESC captures the ":name" part of a
# backslash-escaped token.  (\x5c is the backslash character.)
BIND_PARAMS = re.compile(
    r"(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])",
    re.UNICODE,
)
BIND_PARAMS_ESC = re.compile(r"\x5c(:[\w\$]*)(?![:\w\$])", re.UNICODE)
258
259_pyformat_template = "%%(%(name)s)s"
260BIND_TEMPLATES = {
261 "pyformat": _pyformat_template,
262 "qmark": "?",
263 "format": "%%s",
264 "numeric": ":[_POSITION]",
265 "numeric_dollar": "$[_POSITION]",
266 "named": ":%(name)s",
267}
268
269
# Lookup from operator callables in the ``operators`` module to the SQL
# text rendered for them.  The whitespace in each value is part of the
# rendered text: infix operators carry a space on both sides, prefix
# keywords carry a trailing space, trailing modifiers carry a leading space,
# and "-" / "~" carry none.
OPERATORS = {
    # binary
    operators.and_: " AND ",
    operators.or_: " OR ",
    operators.add: " + ",
    operators.mul: " * ",
    operators.sub: " - ",
    operators.mod: " % ",
    operators.neg: "-",
    operators.lt: " < ",
    operators.le: " <= ",
    operators.ne: " != ",
    operators.gt: " > ",
    operators.ge: " >= ",
    operators.eq: " = ",
    operators.is_distinct_from: " IS DISTINCT FROM ",
    operators.is_not_distinct_from: " IS NOT DISTINCT FROM ",
    operators.concat_op: " || ",
    operators.match_op: " MATCH ",
    operators.not_match_op: " NOT MATCH ",
    operators.in_op: " IN ",
    operators.not_in_op: " NOT IN ",
    operators.comma_op: ", ",
    operators.from_: " FROM ",
    operators.as_: " AS ",
    operators.is_: " IS ",
    operators.is_not: " IS NOT ",
    operators.collate: " COLLATE ",
    # unary
    operators.exists: "EXISTS ",
    operators.distinct_op: "DISTINCT ",
    operators.inv: "NOT ",
    operators.any_op: "ANY ",
    operators.all_op: "ALL ",
    # modifiers
    operators.desc_op: " DESC",
    operators.asc_op: " ASC",
    operators.nulls_first_op: " NULLS FIRST",
    operators.nulls_last_op: " NULLS LAST",
    # bitwise
    operators.bitwise_xor_op: " ^ ",
    operators.bitwise_or_op: " | ",
    operators.bitwise_and_op: " & ",
    operators.bitwise_not_op: "~",
    operators.bitwise_lshift_op: " << ",
    operators.bitwise_rshift_op: " >> ",
}
317
# Lookup from function classes in the ``functions`` module to the fixed
# string associated with each one.  Note the casing differs between
# entries ("coalesce", "random" and "sysdate" are lower case, the others
# upper case).
FUNCTIONS: Dict[Type[Function[Any]], str] = {
    functions.coalesce: "coalesce",
    functions.current_date: "CURRENT_DATE",
    functions.current_time: "CURRENT_TIME",
    functions.current_timestamp: "CURRENT_TIMESTAMP",
    functions.current_user: "CURRENT_USER",
    functions.localtime: "LOCALTIME",
    functions.localtimestamp: "LOCALTIMESTAMP",
    functions.random: "random",
    functions.sysdate: "sysdate",
    functions.session_user: "SESSION_USER",
    functions.user: "USER",
    functions.cube: "CUBE",
    functions.rollup: "ROLLUP",
    functions.grouping_sets: "GROUPING SETS",
}
334
335
# Field names for EXTRACT, each mapped to itself.  SQLCompiler publishes
# this dictionary as its ``extract_map`` class attribute.
EXTRACT_MAP = {
    field: field
    for field in (
        "month",
        "day",
        "year",
        "second",
        "hour",
        "doy",
        "minute",
        "quarter",
        "dow",
        "week",
        "epoch",
        "milliseconds",
        "microseconds",
        "timezone_hour",
        "timezone_minute",
    )
}
353
# SQL keyword text for each ``selectable._CompoundSelectKeyword`` member
# listed here.  SQLCompiler publishes this dictionary as its
# ``compound_keywords`` class attribute.
COMPOUND_KEYWORDS = {
    selectable._CompoundSelectKeyword.UNION: "UNION",
    selectable._CompoundSelectKeyword.UNION_ALL: "UNION ALL",
    selectable._CompoundSelectKeyword.EXCEPT: "EXCEPT",
    selectable._CompoundSelectKeyword.EXCEPT_ALL: "EXCEPT ALL",
    selectable._CompoundSelectKeyword.INTERSECT: "INTERSECT",
    selectable._CompoundSelectKeyword.INTERSECT_ALL: "INTERSECT ALL",
}
362
363
class ResultColumnsEntry(NamedTuple):
    """Describes one column expression that the statement expects to see
    represented in its result rows.

    Most often this corresponds to an entry in the columns clause of a
    SELECT statement, though it can equally describe a column in a RETURNING
    clause or one produced by a dialect-specific emulation.

    """

    keyname: str
    """the string name expected to appear in cursor.description"""

    name: str
    """name of the column; this may be a label"""

    objects: Tuple[Any, ...]
    """objects that should each be usable to locate this column in a
    RowMapping; typically string names and aliases, along with Column
    objects.

    """

    type: TypeEngine[Any]
    """the datatype associated with this column. This is the point at which
    the "result processing" logic links the compiled statement directly to
    the rows returned by the cursor.

    """
393
394
395class _ResultMapAppender(Protocol):
396 def __call__(
397 self,
398 keyname: str,
399 name: str,
400 objects: Sequence[Any],
401 type_: TypeEngine[Any],
402 ) -> None: ...
403
404
# Integer positions of the fields of ResultColumnsEntry, in declaration
# order (keyname, name, objects, type).  These are used by cursor.py;
# some profiling showed integer access to be faster than access by
# named tuple attribute.
RM_RENDERED_NAME: Literal[0] = 0
RM_NAME: Literal[1] = 1
RM_OBJECTS: Literal[2] = 2
RM_TYPE: Literal[3] = 3
411
412
class _BaseCompilerStackEntry(TypedDict):
    """Keys that are always present in a compiler stack entry.

    NOTE(review): how each key is populated and consumed is not visible in
    this part of the file; see the compiler methods that build these
    dictionaries.

    """

    # two sets of FromClause objects
    asfrom_froms: Set[FromClause]
    correlate_froms: Set[FromClause]
    # the ReturnsRows construct this entry belongs to (presumably the one
    # currently being compiled -- confirm against callers)
    selectable: ReturnsRows
417
418
class _CompilerStackEntry(_BaseCompilerStackEntry, total=False):
    """Compiler stack entry including its optional keys.

    The keys inherited from :class:`._BaseCompilerStackEntry` are required;
    because of ``total=False``, each key declared here may be absent.

    """

    compile_state: CompileState
    need_result_map_for_nested: bool
    need_result_map_for_compound: bool
    select_0: ReturnsRows
    insert_from_select: Select[Any]
425
426
class ExpandedState(NamedTuple):
    """State used when producing "expanded" and "post compile" bound
    parameters for a statement.

    "expanded" parameters are generated at statement execution time so that
    they suit the number of parameters passed; the most prominent example is
    the set of individual elements inside of an IN expression.

    "post compile" parameters are those whose SQL literal value is rendered
    into the SQL statement at execution time, instead of being passed to the
    driver as separate parameters.

    An :class:`.ExpandedState` instance is created by calling the
    :meth:`.SQLCompiler.construct_expanded_state` method on any
    :class:`.SQLCompiler` instance.

    """

    statement: str
    """The string SQL statement, with parameters fully expanded"""

    parameters: _CoreSingleExecuteParams
    """The parameter dictionary, with parameters fully expanded.

    When the statement uses named parameters, this dictionary maps exactly
    to the names in the statement. When the statement uses positional
    parameters, :attr:`.ExpandedState.positional_parameters` yields a tuple
    with the positional parameter set.

    """

    processors: Mapping[str, _BindProcessorType[Any]]
    """mapping of bound value processors"""

    positiontup: Optional[Sequence[str]]
    """Sequence of string names giving the order of positional
    parameters"""

    parameter_expansion: Mapping[str, List[str]]
    """Mapping that provides the intermediary link from an original
    parameter name to its list of "expanded" parameter names, for those
    parameters that were expanded."""

    @property
    def positional_parameters(self) -> Tuple[Any, ...]:
        """Tuple of positional parameters, for statements that were compiled
        using a positional paramstyle.

        The values are taken from :attr:`.ExpandedState.parameters` in the
        order given by :attr:`.ExpandedState.positiontup`.

        :raises InvalidRequestError: if ``positiontup`` is None, i.e. the
         statement does not use a positional paramstyle.

        """
        ordered_names = self.positiontup
        if ordered_names is None:
            raise exc.InvalidRequestError(
                "statement does not use a positional paramstyle"
            )

        values = self.parameters
        return tuple([values[param_name] for param_name in ordered_names])

    @property
    def additional_parameters(self) -> _CoreSingleExecuteParams:
        """synonym for :attr:`.ExpandedState.parameters`."""
        return self.parameters
486
487
488class _InsertManyValues(NamedTuple):
489 """represents state to use for executing an "insertmanyvalues" statement.
490
491 The primary consumers of this object are the
492 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
493 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods.
494
495 .. versionadded:: 2.0
496
497 """
498
499 is_default_expr: bool
500 """if True, the statement is of the form
501 ``INSERT INTO TABLE DEFAULT VALUES``, and can't be rewritten as a "batch"
502
503 """
504
505 single_values_expr: str
506 """The rendered "values" clause of the INSERT statement.
507
508 This is typically the parenthesized section e.g. "(?, ?, ?)" or similar.
509 The insertmanyvalues logic uses this string as a search and replace
510 target.
511
512 """
513
514 insert_crud_params: List[crud._CrudParamElementStr]
515 """List of Column / bind names etc. used while rewriting the statement"""
516
517 num_positional_params_counted: int
518 """the number of bound parameters in a single-row statement.
519
520 This count may be larger or smaller than the actual number of columns
521 targeted in the INSERT, as it accommodates for SQL expressions
522 in the values list that may have zero or more parameters embedded
523 within them.
524
525 This count is part of what's used to organize rewritten parameter lists
526 when batching.
527
528 """
529
530 sort_by_parameter_order: bool = False
531 """if the deterministic_returnined_order parameter were used on the
532 insert.
533
534 All of the attributes following this will only be used if this is True.
535
536 """
537
538 includes_upsert_behaviors: bool = False
539 """if True, we have to accommodate for upsert behaviors.
540
541 This will in some cases downgrade "insertmanyvalues" that requests
542 deterministic ordering.
543
544 """
545
546 sentinel_columns: Optional[Sequence[Column[Any]]] = None
547 """List of sentinel columns that were located.
548
549 This list is only here if the INSERT asked for
550 sort_by_parameter_order=True,
551 and dialect-appropriate sentinel columns were located.
552
553 .. versionadded:: 2.0.10
554
555 """
556
557 num_sentinel_columns: int = 0
558 """how many sentinel columns are in the above list, if any.
559
560 This is the same as
561 ``len(sentinel_columns) if sentinel_columns is not None else 0``
562
563 """
564
565 sentinel_param_keys: Optional[Sequence[str]] = None
566 """parameter str keys in each param dictionary / tuple
567 that would link to the client side "sentinel" values for that row, which
568 we can use to match up parameter sets to result rows.
569
570 This is only present if sentinel_columns is present and the INSERT
571 statement actually refers to client side values for these sentinel
572 columns.
573
574 .. versionadded:: 2.0.10
575
576 .. versionchanged:: 2.0.29 - the sequence is now string dictionary keys
577 only, used against the "compiled parameteters" collection before
578 the parameters were converted by bound parameter processors
579
580 """
581
582 implicit_sentinel: bool = False
583 """if True, we have exactly one sentinel column and it uses a server side
584 value, currently has to generate an incrementing integer value.
585
586 The dialect in question would have asserted that it supports receiving
587 these values back and sorting on that value as a means of guaranteeing
588 correlation with the incoming parameter list.
589
590 .. versionadded:: 2.0.10
591
592 """
593
594 embed_values_counter: bool = False
595 """Whether to embed an incrementing integer counter in each parameter
596 set within the VALUES clause as parameters are batched over.
597
598 This is only used for a specific INSERT..SELECT..VALUES..RETURNING syntax
599 where a subquery is used to produce value tuples. Current support
600 includes PostgreSQL, Microsoft SQL Server.
601
602 .. versionadded:: 2.0.10
603
604 """
605
606
607class _InsertManyValuesBatch(NamedTuple):
608 """represents an individual batch SQL statement for insertmanyvalues.
609
610 This is passed through the
611 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
612 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods out
613 to the :class:`.Connection` within the
614 :meth:`.Connection._exec_insertmany_context` method.
615
616 .. versionadded:: 2.0.10
617
618 """
619
620 replaced_statement: str
621 replaced_parameters: _DBAPIAnyExecuteParams
622 processed_setinputsizes: Optional[_GenericSetInputSizesType]
623 batch: Sequence[_DBAPISingleExecuteParams]
624 sentinel_values: Sequence[Tuple[Any, ...]]
625 current_batch_size: int
626 batchnum: int
627 total_batches: int
628 rows_sorted: bool
629 is_downgraded: bool
630
631
class InsertmanyvaluesSentinelOpts(FastIntFlag):
    """Bitflag enum describing the styles of PK defaults that are able to
    act as implicit sentinel columns.

    """

    # individual support flags, one bit each
    NOT_SUPPORTED = 1 << 0
    AUTOINCREMENT = 1 << 1
    IDENTITY = 1 << 2
    SEQUENCE = 1 << 3

    # combined masks built from the flags above
    ANY_AUTOINCREMENT = AUTOINCREMENT | IDENTITY | SEQUENCE
    _SUPPORTED_OR_NOT = NOT_SUPPORTED | ANY_AUTOINCREMENT

    # additional option bits; note that the value 32 (1 << 5) is not
    # assigned to any name
    USE_INSERT_FROM_SELECT = 1 << 4
    RENDER_SELECT_COL_CASTS = 1 << 6
648
649
class CompilerState(IntEnum):
    """Phases that a compiler object can be in."""

    COMPILING = 0
    """a statement is present and the compilation phase is in progress"""

    STRING_APPLIED = 1
    """a statement is present and its string form has been applied.

    Subclasses may still have additional processors pending.

    """

    NO_STATEMENT = 2
    """the compiler has no statement to compile and is being used
    for method access"""
664
665
class Linting(IntEnum):
    """Preferences for the 'SQL linting' feature.

    At present the feature includes support for flagging cartesian products
    in SQL statements.

    """

    NO_LINTING = 0
    "All linting is disabled."

    COLLECT_CARTESIAN_PRODUCTS = 1
    """Data on FROMs and cartesian products is collected and gathered into
    'self.from_linter'"""

    WARN_LINTING = 2
    "Warnings are emitted for linters that find problems"

    FROM_LINTING = COLLECT_CARTESIAN_PRODUCTS | WARN_LINTING
    """Warn for cartesian products; this is the combination of
    COLLECT_CARTESIAN_PRODUCTS and WARN_LINTING"""
687
688
# Module-level names for each member of Linting.
NO_LINTING = Linting.NO_LINTING
COLLECT_CARTESIAN_PRODUCTS = Linting.COLLECT_CARTESIAN_PRODUCTS
WARN_LINTING = Linting.WARN_LINTING
FROM_LINTING = Linting.FROM_LINTING
692
693
class FromLinter(collections.namedtuple("FromLinter", ["froms", "edges"])):
    """Holds the current state for the "cartesian product" detection
    feature.

    ``froms`` is a mapping keyed by FROM element; its values are the text
    placed into the warning message for that element.  ``edges`` is a
    collection of two-element sequences, each one linking two FROM
    elements.

    """

    def lint(self, start=None):
        """Look for FROM elements that can't be reached from a starting
        element by following the edges.

        :param start: optional FROM element to begin from; it must be one
         of the keys of ``froms``.  When omitted, an arbitrary element is
         taken.

        :return: a tuple ``(unreached, start_element)`` where ``unreached``
         is the set of elements that were never reached, or ``(None, None)``
         if there are no froms at all or every element was reached.

        """
        all_froms = self.froms
        if not all_froms:
            return None, None

        remaining_edges = set(self.edges)
        unvisited = set(all_froms)

        if start is None:
            origin = unvisited.pop()
        else:
            origin = start
            unvisited.remove(origin)

        pending = collections.deque([origin])

        while pending and unvisited:
            current = pending.popleft()
            unvisited.discard(current)

            # nodes are compared within edges using hash equality, since
            # "annotated" elements match their non-annotated counterparts.
            # native containment routines ("current in edge",
            # "edge.index(current)") are used so that no in-python hash()
            # calls are needed.
            touching = {edge for edge in remaining_edges if current in edge}

            # push onto the left side the member of each edge that is not
            # the one which matched; index() of 0 selects edge[1], anything
            # else selects edge[0].
            for edge in touching:
                pending.appendleft(edge[not edge.index(current)])
            remaining_edges.difference_update(touching)

        # any FROMs left over were never reached
        if unvisited:
            return unvisited, origin
        return None, None

    def warn(self, stmt_type="SELECT"):
        """Emit a warning via ``util.warn()`` if :meth:`.FromLinter.lint`
        reports unreached FROM elements.

        :param stmt_type: statement keyword used at the start of the
         message.

        """
        unreached, origin = self.lint()

        # nothing left over means there is nothing to report
        if not unreached:
            return

        template = (
            "{stmt_type} statement has a cartesian product between "
            "FROM element(s) {froms} and "
            'FROM element "{start}". Apply join condition(s) '
            "between each element to resolve."
        )
        quoted_froms = ", ".join(
            f'"{self.froms[from_]}"' for from_ in unreached
        )
        util.warn(
            template.format(
                stmt_type=stmt_type,
                froms=quoted_froms,
                start=self.froms[origin],
            )
        )
758
759
class Compiled:
    """Compiled form of a SQL or DDL expression.

    Calling ``__str__`` on a ``Compiled`` object should yield the actual
    text of the statement. A ``Compiled`` object is specific to its
    underlying database dialect, and may or may not also be specific to the
    columns referenced within a particular set of bind parameters. Under no
    circumstances should a ``Compiled`` object depend on the actual values
    of those bind parameters, although it may refer to those values as
    defaults.
    """

    statement: Optional[ClauseElement] = None
    "The statement being compiled."
    string: str = ""
    "String representation of the ``statement``"

    state: CompilerState
    """the :class:`.CompilerState` describing where this compiler is in
    its lifecycle"""

    is_sql = False
    is_ddl = False

    _cached_metadata: Optional[CursorResultMetaData] = None

    _result_columns: Optional[List[ResultColumnsEntry]] = None

    schema_translate_map: Optional[SchemaTranslateMapType] = None

    execution_options: _ExecuteOptions = util.EMPTY_DICT
    """
    Execution options that were propagated from the statement. Sub-elements
    of the statement are able to modify these in some cases.
    """

    preparer: IdentifierPreparer

    _annotations: _AnnotationDict = util.EMPTY_DICT

    compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` object which maintains additional
    state used by the compiler.

    Major executable objects such as :class:`_expression.Insert`,
    :class:`_expression.Update`, :class:`_expression.Delete`,
    :class:`_expression.Select` generate this state when compiled, in order
    to calculate additional information about the object. For the top level
    object that is to be executed, the state can be stored here, where it
    may also be applicable to result set processing.

    .. versionadded:: 1.4

    """

    dml_compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` that is assigned at the same point
    that .isinsert, .isupdate, or .isdelete is assigned.

    Normally this is the same object as .compile_state; the exception is
    cases like the :class:`.ORMFromStatementCompileState` object.

    .. versionadded:: 1.4.40

    """

    cache_key: Optional[CacheKey] = None
    """The :class:`.CacheKey` which was generated ahead of creating this
    :class:`.Compiled` object.

    Routines that need the original :class:`.CacheKey` instance generated
    when the :class:`.Compiled` instance was first cached make use of this,
    typically in order to reconcile the original list of
    :class:`.BindParameter` objects with a per-statement list that's
    generated on each call.

    """

    _gen_time: float
    """Time at which this :class:`.Compiled` was generated; used when
    reporting cache stats."""

    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        schema_translate_map: Optional[SchemaTranslateMapType] = None,
        render_schema_translate: bool = False,
        compile_kwargs: Mapping[str, Any] = util.immutabledict(),
    ):
        """Construct a new :class:`.Compiled` object.

        When a statement is given it is compiled immediately and the
        resulting text is stored in ``.string``; otherwise the object is
        placed in the ``NO_STATEMENT`` state.

        :param dialect: :class:`.Dialect` to compile against.

        :param statement: :class:`_expression.ClauseElement` to be compiled,
         or None.

        :param schema_translate_map: dictionary of schema names to be
         translated when forming the resultant SQL

         .. seealso::

            :ref:`schema_translating`

        :param render_schema_translate: if True, the compiled string is
         additionally passed through the preparer's
         ``_render_schema_translates()`` along with
         ``schema_translate_map``, which must not be None in that case.

        :param compile_kwargs: additional kwargs that will be
         passed to the initial call to :meth:`.Compiled.process`.


        """
        self.dialect = dialect
        self.preparer = dialect.identifier_preparer
        if schema_translate_map:
            self.schema_translate_map = schema_translate_map
            self.preparer = self.preparer._with_schema_translate(
                schema_translate_map
            )

        if statement is None:
            self.state = CompilerState.NO_STATEMENT
        else:
            self.state = CompilerState.COMPILING
            self.statement = statement

            executable = statement.supports_execution
            self.can_execute = executable
            self._annotations = statement._annotations
            if executable:
                if TYPE_CHECKING:
                    assert isinstance(statement, Executable)
                self.execution_options = statement._execution_options

            self.string = self.process(self.statement, **compile_kwargs)

            if render_schema_translate:
                assert schema_translate_map is not None
                self.string = self.preparer._render_schema_translates(
                    self.string, schema_translate_map
                )

            # only now is the string form considered final
            self.state = CompilerState.STRING_APPLIED

        self._gen_time = perf_counter()

    def __init_subclass__(cls) -> None:
        # give every subclass the chance to run class-level setup
        cls._init_compiler_cls()
        return super().__init_subclass__()

    @classmethod
    def _init_compiler_cls(cls):
        """Class-level setup hook invoked for each subclass; does nothing
        here."""
        pass

    def _execute_on_connection(
        self, connection, distilled_params, execution_options
    ):
        """Execute this compiled object on the given connection.

        :raises ObjectNotExecutableError: if the statement does not support
         execution.

        """
        if not self.can_execute:
            raise exc.ObjectNotExecutableError(self.statement)

        return connection._execute_compiled(
            self, distilled_params, execution_options
        )

    def visit_unsupported_compilation(self, element, err, **kw):
        """Raise :class:`.UnsupportedCompilationError` for the type of the
        given element, chained from ``err``."""
        raise exc.UnsupportedCompilationError(self, type(element)) from err

    @property
    def sql_compiler(self) -> SQLCompiler:
        """Return a Compiled that is capable of processing SQL expressions.

        A compiler which is itself one would likely just return 'self'.
        Not implemented at this level.

        """

        raise NotImplementedError()

    def process(self, obj: Visitable, **kwargs: Any) -> str:
        """Dispatch the given object to this compiler and return the
        resulting string."""
        return obj._compiler_dispatch(self, **kwargs)

    def __str__(self) -> str:
        """Return the string text of the generated SQL or DDL.

        An empty string is returned unless the state is ``STRING_APPLIED``.

        """

        applied = self.state is CompilerState.STRING_APPLIED
        return self.string if applied else ""

    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        """Return the bind params for this compiled object.

        Not implemented at this level.

        :param params: a dict of string/object pairs whose values will
         override bind values compiled in to the
         statement.
        """

        raise NotImplementedError()

    @property
    def params(self):
        """Return the bind params for this compiled object, as produced by
        :meth:`.Compiled.construct_params` called with no arguments."""
        return self.construct_params()
962
963
class TypeCompiler(util.EnsureKWArg):
    """Renders the DDL specification for TypeEngine objects."""

    ensure_kwarg = r"visit_\w+"

    def __init__(self, dialect: Dialect):
        self.dialect = dialect

    def process(self, type_: TypeEngine[Any], **kw: Any) -> str:
        """Return the string for the given type.

        If the type has a variant mapping that contains an entry for this
        compiler's dialect name, that variant is dispatched in place of the
        type itself.

        """
        variants = type_._variant_mapping
        dialect_name = self.dialect.name
        if variants and dialect_name in variants:
            type_ = variants[dialect_name]
        return type_._compiler_dispatch(self, **kw)

    def visit_unsupported_compilation(
        self, element: Any, err: Exception, **kw: Any
    ) -> NoReturn:
        """Raise :class:`.UnsupportedCompilationError` for the given
        element, chained from ``err``."""
        raise exc.UnsupportedCompilationError(self, element) from err
984
985
986# this was a Visitable, but to allow accurate detection of
987# column elements this is actually a column element
class _CompileLabel(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """Lightweight label object that acts as an expression.Label.

    ``type`` and ``proxy_set`` are delegated to the wrapped element.

    """

    __visit_name__ = "label"
    __slots__ = ("element", "name", "_alt_names")

    def __init__(self, col, name, alt_names=()):
        self.element = col
        self.name = name
        # the wrapped column always comes first among the alternate names
        self._alt_names = (col,) + alt_names

    @property
    def type(self):
        return self.element.type

    @property
    def proxy_set(self):
        return self.element.proxy_set

    def self_group(self, **kw):
        # no grouping is applied; the label is returned as is
        return self
1011
1012
class ilike_case_insensitive(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """Wrapping element for a case-insensitive portion of an ILIKE
    construct.

    Usually the construct renders the ``lower()`` function; on PostgreSQL
    it passes silently, on the assumption that "ILIKE" is being used.

    ``type`` and ``proxy_set`` are delegated to the wrapped element.

    .. versionadded:: 2.0

    """

    __visit_name__ = "ilike_case_insensitive_operand"
    __slots__ = ("element", "comparator")

    def __init__(self, element):
        self.element = element
        self.comparator = element.comparator

    @property
    def type(self):
        return self.element.type

    @property
    def proxy_set(self):
        return self.element.proxy_set

    def self_group(self, **kw):
        # no grouping is applied; the wrapper is returned as is
        return self

    def _with_binary_element_type(self, type_):
        # re-wrap the element once it has been given the new type
        retyped = self.element._with_binary_element_type(type_)
        return ilike_case_insensitive(retyped)
1049
1050
1051class SQLCompiler(Compiled):
1052 """Default implementation of :class:`.Compiled`.
1053
1054 Compiles :class:`_expression.ClauseElement` objects into SQL strings.
1055
1056 """
1057
1058 extract_map = EXTRACT_MAP
1059
1060 bindname_escape_characters: ClassVar[Mapping[str, str]] = (
1061 util.immutabledict(
1062 {
1063 "%": "P",
1064 "(": "A",
1065 ")": "Z",
1066 ":": "C",
1067 ".": "_",
1068 "[": "_",
1069 "]": "_",
1070 " ": "_",
1071 }
1072 )
1073 )
1074 """A mapping (e.g. dict or similar) containing a lookup of
1075 characters keyed to replacement characters which will be applied to all
1076 'bind names' used in SQL statements as a form of 'escaping'; the given
1077 characters are replaced entirely with the 'replacement' character when
1078 rendered in the SQL statement, and a similar translation is performed
1079 on the incoming names used in parameter dictionaries passed to methods
1080 like :meth:`_engine.Connection.execute`.
1081
1082 This allows bound parameter names used in :func:`_sql.bindparam` and
1083 other constructs to have any arbitrary characters present without any
1084 concern for characters that aren't allowed at all on the target database.
1085
1086 Third party dialects can establish their own dictionary here to replace the
1087 default mapping, which will ensure that the particular characters in the
1088 mapping will never appear in a bound parameter name.
1089
1090 The dictionary is evaluated at **class creation time**, so cannot be
1091 modified at runtime; it must be present on the class when the class
1092 is first declared.
1093
1094 Note that for dialects that have additional bound parameter rules such
1095 as additional restrictions on leading characters, the
1096 :meth:`_sql.SQLCompiler.bindparam_string` method may need to be augmented.
1097 See the cx_Oracle compiler for an example of this.
1098
1099 .. versionadded:: 2.0.0rc1
1100
1101 """
1102
1103 _bind_translate_re: ClassVar[Pattern[str]]
1104 _bind_translate_chars: ClassVar[Mapping[str, str]]
1105
1106 is_sql = True
1107
1108 compound_keywords = COMPOUND_KEYWORDS
1109
1110 isdelete: bool = False
1111 isinsert: bool = False
1112 isupdate: bool = False
1113 """class-level defaults which can be set at the instance
1114 level to define if this Compiled instance represents
1115 INSERT/UPDATE/DELETE
1116 """
1117
1118 postfetch: Optional[List[Column[Any]]]
1119 """list of columns that can be post-fetched after INSERT or UPDATE to
1120 receive server-updated values"""
1121
1122 insert_prefetch: Sequence[Column[Any]] = ()
1123 """list of columns for which default values should be evaluated before
1124 an INSERT takes place"""
1125
1126 update_prefetch: Sequence[Column[Any]] = ()
1127 """list of columns for which onupdate default values should be evaluated
1128 before an UPDATE takes place"""
1129
1130 implicit_returning: Optional[Sequence[ColumnElement[Any]]] = None
1131 """list of "implicit" returning columns for a toplevel INSERT or UPDATE
1132 statement, used to receive newly generated values of columns.
1133
1134 .. versionadded:: 2.0 ``implicit_returning`` replaces the previous
1135 ``returning`` collection, which was not a generalized RETURNING
1136 collection and instead was in fact specific to the "implicit returning"
1137 feature.
1138
1139 """
1140
1141 isplaintext: bool = False
1142
1143 binds: Dict[str, BindParameter[Any]]
1144 """a dictionary of bind parameter keys to BindParameter instances."""
1145
1146 bind_names: Dict[BindParameter[Any], str]
1147 """a dictionary of BindParameter instances to "compiled" names
1148 that are actually present in the generated SQL"""
1149
1150 stack: List[_CompilerStackEntry]
1151 """major statements such as SELECT, INSERT, UPDATE, DELETE are
1152 tracked in this stack using an entry format."""
1153
1154 returning_precedes_values: bool = False
1155 """set to True classwide to generate RETURNING
1156 clauses before the VALUES or WHERE clause (i.e. MSSQL)
1157 """
1158
1159 render_table_with_column_in_update_from: bool = False
1160 """set to True classwide to indicate the SET clause
1161 in a multi-table UPDATE statement should qualify
1162 columns with the table name (i.e. MySQL only)
1163 """
1164
1165 ansi_bind_rules: bool = False
1166 """SQL 92 doesn't allow bind parameters to be used
1167 in the columns clause of a SELECT, nor does it allow
1168 ambiguous expressions like "? = ?". A compiler
    subclass can set this flag to True if the target
    driver/DB enforces this
1171 """
1172
1173 bindtemplate: str
1174 """template to render bound parameters based on paramstyle."""
1175
1176 compilation_bindtemplate: str
1177 """template used by compiler to render parameters before positional
1178 paramstyle application"""
1179
1180 _numeric_binds_identifier_char: str
    """Character that's used as the identifier of a numerical bind param.
    For example if this char is set to ``$``, numerical binds will be rendered
    in the form ``$1, $2, $3``.
    """
1185
1186 _result_columns: List[ResultColumnsEntry]
1187 """relates label names in the final SQL to a tuple of local
1188 column/label name, ColumnElement object (if any) and
1189 TypeEngine. CursorResult uses this for type processing and
1190 column targeting"""
1191
1192 _textual_ordered_columns: bool = False
1193 """tell the result object that the column names as rendered are important,
1194 but they are also "ordered" vs. what is in the compiled object here.
1195
1196 As of 1.4.42 this condition is only present when the statement is a
1197 TextualSelect, e.g. text("....").columns(...), where it is required
1198 that the columns are considered positionally and not by name.
1199
1200 """
1201
1202 _ad_hoc_textual: bool = False
1203 """tell the result that we encountered text() or '*' constructs in the
1204 middle of the result columns, but we also have compiled columns, so
1205 if the number of columns in cursor.description does not match how many
1206 expressions we have, that means we can't rely on positional at all and
1207 should match on name.
1208
1209 """
1210
1211 _ordered_columns: bool = True
1212 """
1213 if False, means we can't be sure the list of entries
1214 in _result_columns is actually the rendered order. Usually
1215 True unless using an unordered TextualSelect.
1216 """
1217
1218 _loose_column_name_matching: bool = False
1219 """tell the result object that the SQL statement is textual, wants to match
1220 up to Column objects, and may be using the ._tq_label in the SELECT rather
1221 than the base name.
1222
1223 """
1224
1225 _numeric_binds: bool = False
1226 """
1227 True if paramstyle is "numeric". This paramstyle is trickier than
1228 all the others.
1229
1230 """
1231
1232 _render_postcompile: bool = False
1233 """
1234 whether to render out POSTCOMPILE params during the compile phase.
1235
1236 This attribute is used only for end-user invocation of stmt.compile();
1237 it's never used for actual statement execution, where instead the
1238 dialect internals access and render the internal postcompile structure
1239 directly.
1240
1241 """
1242
1243 _post_compile_expanded_state: Optional[ExpandedState] = None
1244 """When render_postcompile is used, the ``ExpandedState`` used to create
1245 the "expanded" SQL is assigned here, and then used by the ``.params``
1246 accessor and ``.construct_params()`` methods for their return values.
1247
1248 .. versionadded:: 2.0.0rc1
1249
1250 """
1251
1252 _pre_expanded_string: Optional[str] = None
1253 """Stores the original string SQL before 'post_compile' is applied,
    for cases where 'post_compile' was used.
1255
1256 """
1257
1258 _pre_expanded_positiontup: Optional[List[str]] = None
1259
1260 _insertmanyvalues: Optional[_InsertManyValues] = None
1261
1262 _insert_crud_params: Optional[crud._CrudParamSequence] = None
1263
1264 literal_execute_params: FrozenSet[BindParameter[Any]] = frozenset()
1265 """bindparameter objects that are rendered as literal values at statement
1266 execution time.
1267
1268 """
1269
1270 post_compile_params: FrozenSet[BindParameter[Any]] = frozenset()
1271 """bindparameter objects that are rendered as bound parameter placeholders
1272 at statement execution time.
1273
1274 """
1275
1276 escaped_bind_names: util.immutabledict[str, str] = util.EMPTY_DICT
1277 """Late escaping of bound parameter names that has to be converted
1278 to the original name when looking in the parameter dictionary.
1279
1280 """
1281
1282 has_out_parameters = False
1283 """if True, there are bindparam() objects that have the isoutparam
1284 flag set."""
1285
1286 postfetch_lastrowid = False
    """if True, and this is an insert, use cursor.lastrowid to populate
    result.inserted_primary_key. """
1289
1290 _cache_key_bind_match: Optional[
1291 Tuple[
1292 Dict[
1293 BindParameter[Any],
1294 List[BindParameter[Any]],
1295 ],
1296 Dict[
1297 str,
1298 BindParameter[Any],
1299 ],
1300 ]
1301 ] = None
1302 """a mapping that will relate the BindParameter object we compile
1303 to those that are part of the extracted collection of parameters
1304 in the cache key, if we were given a cache key.
1305
1306 """
1307
1308 positiontup: Optional[List[str]] = None
1309 """for a compiled construct that uses a positional paramstyle, will be
1310 a sequence of strings, indicating the names of bound parameters in order.
1311
1312 This is used in order to render bound parameters in their correct order,
1313 and is combined with the :attr:`_sql.Compiled.params` dictionary to
1314 render parameters.
1315
1316 This sequence always contains the unescaped name of the parameters.
1317
1318 .. seealso::
1319
1320 :ref:`faq_sql_expression_string` - includes a usage example for
1321 debugging use cases.
1322
1323 """
1324 _values_bindparam: Optional[List[str]] = None
1325
1326 _visited_bindparam: Optional[List[str]] = None
1327
1328 inline: bool = False
1329
1330 ctes: Optional[MutableMapping[CTE, str]]
1331
1332 # Detect same CTE references - Dict[(level, name), cte]
1333 # Level is required for supporting nesting
1334 ctes_by_level_name: Dict[Tuple[int, str], CTE]
1335
1336 # To retrieve key/level in ctes_by_level_name -
1337 # Dict[cte_reference, (level, cte_name, cte_opts)]
1338 level_name_by_cte: Dict[CTE, Tuple[int, str, selectable._CTEOpts]]
1339
1340 ctes_recursive: bool
1341
1342 _post_compile_pattern = re.compile(r"__\[POSTCOMPILE_(\S+?)(~~.+?~~)?\]")
1343 _pyformat_pattern = re.compile(r"%\(([^)]+?)\)s")
1344 _positional_pattern = re.compile(
1345 f"{_pyformat_pattern.pattern}|{_post_compile_pattern.pattern}"
1346 )
1347
1348 @classmethod
1349 def _init_compiler_cls(cls):
1350 cls._init_bind_translate()
1351
1352 @classmethod
1353 def _init_bind_translate(cls):
1354 reg = re.escape("".join(cls.bindname_escape_characters))
1355 cls._bind_translate_re = re.compile(f"[{reg}]")
1356 cls._bind_translate_chars = cls.bindname_escape_characters
1357
    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        cache_key: Optional[CacheKey] = None,
        column_keys: Optional[Sequence[str]] = None,
        for_executemany: bool = False,
        linting: Linting = NO_LINTING,
        _supporting_against: Optional[SQLCompiler] = None,
        **kwargs: Any,
    ):
        """Construct a new :class:`.SQLCompiler` object.

        :param dialect: :class:`.Dialect` to be used

        :param statement: :class:`_expression.ClauseElement` to be compiled

        :param cache_key: optional cache key for the statement; when given,
         the bound parameters found in ``cache_key[1]`` are indexed into
         ``self._cache_key_bind_match``.

        :param column_keys: a list of column names to be compiled into an
         INSERT or UPDATE statement.

        :param for_executemany: whether INSERT / UPDATE statements should
         expect that they are to be invoked in an "executemany" style,
         which may impact how the statement will be expected to return the
         values of defaults and autoincrement / sequences and similar.
         Depending on the backend and driver in use, support for retrieving
         these values may be disabled which means SQL expressions may
         be rendered inline, RETURNING may not be rendered, etc.

        :param linting: linting setting; stored as ``self.linting``.

        :param _supporting_against: optional other :class:`.SQLCompiler`;
         when given, the contents of its instance ``__dict__`` are copied
         onto this compiler, except for a fixed set of dialect- and
         paramstyle-specific attributes.

        :param kwargs: additional keyword arguments to be consumed by the
         superclass.

        """
        self.column_keys = column_keys

        self.cache_key = cache_key

        if cache_key:
            # index the cache key's bound parameters both by key and by
            # object; construct_params() reads the by-object mapping
            cksm = {b.key: b for b in cache_key[1]}
            ckbm = {b: [b] for b in cache_key[1]}
            self._cache_key_bind_match = (ckbm, cksm)

        # compile INSERT/UPDATE defaults/sequences to expect executemany
        # style execution, which may mean no pre-execute of defaults,
        # or no RETURNING
        self.for_executemany = for_executemany

        self.linting = linting

        # a dictionary of bind parameter keys to BindParameter
        # instances.
        self.binds = {}

        # a dictionary of BindParameter instances to "compiled" names
        # that are actually present in the generated SQL
        self.bind_names = util.column_dict()

        # stack which keeps track of nested SELECT statements
        self.stack = []

        self._result_columns = []

        # true if the paramstyle is positional
        self.positional = dialect.positional
        if self.positional:
            self._numeric_binds = nb = dialect.paramstyle.startswith("numeric")
            if nb:
                self._numeric_binds_identifier_char = (
                    "$" if dialect.paramstyle == "numeric_dollar" else ":"
                )

            # positional styles are compiled with the pyformat template
            # first; _process_positional() / _process_numeric() rewrite
            # the string afterwards
            self.compilation_bindtemplate = _pyformat_template
        else:
            self.compilation_bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

        self.ctes = None

        self.label_length = (
            dialect.label_length or dialect.max_identifier_length
        )

        # a map which tracks "anonymous" identifiers that are created on
        # the fly here
        self.anon_map = prefix_anon_map()

        # a map which tracks "truncated" names based on
        # dialect.label_length or dialect.max_identifier_length
        self.truncated_names: Dict[Tuple[str, str], str] = {}
        self._truncated_counters: Dict[str, int] = {}

        # NOTE(review): the state check further below suggests this call
        # is where the statement is compiled into self.string -- confirm
        # against Compiled.__init__
        Compiled.__init__(self, dialect, statement, **kwargs)

        if self.isinsert or self.isupdate or self.isdelete:
            if TYPE_CHECKING:
                assert isinstance(statement, UpdateBase)

            if self.isinsert or self.isupdate:
                if TYPE_CHECKING:
                    assert isinstance(statement, ValuesBase)
                if statement._inline:
                    self.inline = True
                # executemany: always inline for UPDATE; for INSERT only
                # when the dialect has insert_executemany_returning and
                # the statement has _return_defaults set
                elif self.for_executemany and (
                    not self.isinsert
                    or (
                        self.dialect.insert_executemany_returning
                        and statement._return_defaults
                    )
                ):
                    self.inline = True

        self.bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

        if _supporting_against:
            # copy the other compiler's state, leaving out the attributes
            # named in the set below
            self.__dict__.update(
                {
                    k: v
                    for k, v in _supporting_against.__dict__.items()
                    if k
                    not in {
                        "state",
                        "dialect",
                        "preparer",
                        "positional",
                        "_numeric_binds",
                        "compilation_bindtemplate",
                        "bindtemplate",
                    }
                }
            )

        if self.state is CompilerState.STRING_APPLIED:
            if self.positional:
                if self._numeric_binds:
                    self._process_numeric()
                else:
                    self._process_positional()

            if self._render_postcompile:
                # _no_postcompile=True makes construct_params() skip its
                # render_postcompile early-return branch
                parameters = self.construct_params(
                    escape_names=False,
                    _no_postcompile=True,
                )

                self._process_parameters_for_postcompile(
                    parameters, _populate_self=True
                )
1503
1504 @property
1505 def insert_single_values_expr(self) -> Optional[str]:
1506 """When an INSERT is compiled with a single set of parameters inside
1507 a VALUES expression, the string is assigned here, where it can be
1508 used for insert batching schemes to rewrite the VALUES expression.
1509
1510 .. versionadded:: 1.3.8
1511
1512 .. versionchanged:: 2.0 This collection is no longer used by
1513 SQLAlchemy's built-in dialects, in favor of the currently
1514 internal ``_insertmanyvalues`` collection that is used only by
1515 :class:`.SQLCompiler`.
1516
1517 """
1518 if self._insertmanyvalues is None:
1519 return None
1520 else:
1521 return self._insertmanyvalues.single_values_expr
1522
1523 @util.ro_memoized_property
1524 def effective_returning(self) -> Optional[Sequence[ColumnElement[Any]]]:
1525 """The effective "returning" columns for INSERT, UPDATE or DELETE.
1526
1527 This is either the so-called "implicit returning" columns which are
1528 calculated by the compiler on the fly, or those present based on what's
1529 present in ``self.statement._returning`` (expanded into individual
1530 columns using the ``._all_selected_columns`` attribute) i.e. those set
1531 explicitly using the :meth:`.UpdateBase.returning` method.
1532
1533 .. versionadded:: 2.0
1534
1535 """
1536 if self.implicit_returning:
1537 return self.implicit_returning
1538 elif self.statement is not None and is_dml(self.statement):
1539 return [
1540 c
1541 for c in self.statement._all_selected_columns
1542 if is_column_element(c)
1543 ]
1544
1545 else:
1546 return None
1547
1548 @property
1549 def returning(self):
1550 """backwards compatibility; returns the
1551 effective_returning collection.
1552
1553 """
1554 return self.effective_returning
1555
1556 @property
1557 def current_executable(self):
1558 """Return the current 'executable' that is being compiled.
1559
1560 This is currently the :class:`_sql.Select`, :class:`_sql.Insert`,
1561 :class:`_sql.Update`, :class:`_sql.Delete`,
1562 :class:`_sql.CompoundSelect` object that is being compiled.
1563 Specifically it's assigned to the ``self.stack`` list of elements.
1564
1565 When a statement like the above is being compiled, it normally
1566 is also assigned to the ``.statement`` attribute of the
1567 :class:`_sql.Compiler` object. However, all SQL constructs are
1568 ultimately nestable, and this attribute should never be consulted
1569 by a ``visit_`` method, as it is not guaranteed to be assigned
1570 nor guaranteed to correspond to the current statement being compiled.
1571
1572 .. versionadded:: 1.3.21
1573
1574 For compatibility with previous versions, use the following
1575 recipe::
1576
1577 statement = getattr(self, "current_executable", False)
1578 if statement is False:
1579 statement = self.stack[-1]["selectable"]
1580
1581 For versions 1.4 and above, ensure only .current_executable
1582 is used; the format of "self.stack" may change.
1583
1584
1585 """
1586 try:
1587 return self.stack[-1]["selectable"]
1588 except IndexError as ie:
1589 raise IndexError("Compiler does not have a stack entry") from ie
1590
1591 @property
1592 def prefetch(self):
1593 return list(self.insert_prefetch) + list(self.update_prefetch)
1594
1595 @util.memoized_property
1596 def _global_attributes(self) -> Dict[Any, Any]:
1597 return {}
1598
1599 @util.memoized_instancemethod
1600 def _init_cte_state(self) -> MutableMapping[CTE, str]:
1601 """Initialize collections related to CTEs only if
1602 a CTE is located, to save on the overhead of
1603 these collections otherwise.
1604
1605 """
1606 # collect CTEs to tack on top of a SELECT
1607 # To store the query to print - Dict[cte, text_query]
1608 ctes: MutableMapping[CTE, str] = util.OrderedDict()
1609 self.ctes = ctes
1610
1611 # Detect same CTE references - Dict[(level, name), cte]
1612 # Level is required for supporting nesting
1613 self.ctes_by_level_name = {}
1614
1615 # To retrieve key/level in ctes_by_level_name -
1616 # Dict[cte_reference, (level, cte_name, cte_opts)]
1617 self.level_name_by_cte = {}
1618
1619 self.ctes_recursive = False
1620
1621 return ctes
1622
1623 @contextlib.contextmanager
1624 def _nested_result(self):
1625 """special API to support the use case of 'nested result sets'"""
1626 result_columns, ordered_columns = (
1627 self._result_columns,
1628 self._ordered_columns,
1629 )
1630 self._result_columns, self._ordered_columns = [], False
1631
1632 try:
1633 if self.stack:
1634 entry = self.stack[-1]
1635 entry["need_result_map_for_nested"] = True
1636 else:
1637 entry = None
1638 yield self._result_columns, self._ordered_columns
1639 finally:
1640 if entry:
1641 entry.pop("need_result_map_for_nested")
1642 self._result_columns, self._ordered_columns = (
1643 result_columns,
1644 ordered_columns,
1645 )
1646
1647 def _process_positional(self):
1648 assert not self.positiontup
1649 assert self.state is CompilerState.STRING_APPLIED
1650 assert not self._numeric_binds
1651
1652 if self.dialect.paramstyle == "format":
1653 placeholder = "%s"
1654 else:
1655 assert self.dialect.paramstyle == "qmark"
1656 placeholder = "?"
1657
1658 positions = []
1659
1660 def find_position(m: re.Match[str]) -> str:
1661 normal_bind = m.group(1)
1662 if normal_bind:
1663 positions.append(normal_bind)
1664 return placeholder
1665 else:
1666 # this a post-compile bind
1667 positions.append(m.group(2))
1668 return m.group(0)
1669
1670 self.string = re.sub(
1671 self._positional_pattern, find_position, self.string
1672 )
1673
1674 if self.escaped_bind_names:
1675 reverse_escape = {v: k for k, v in self.escaped_bind_names.items()}
1676 assert len(self.escaped_bind_names) == len(reverse_escape)
1677 self.positiontup = [
1678 reverse_escape.get(name, name) for name in positions
1679 ]
1680 else:
1681 self.positiontup = positions
1682
1683 if self._insertmanyvalues:
1684 positions = []
1685
1686 single_values_expr = re.sub(
1687 self._positional_pattern,
1688 find_position,
1689 self._insertmanyvalues.single_values_expr,
1690 )
1691 insert_crud_params = [
1692 (
1693 v[0],
1694 v[1],
1695 re.sub(self._positional_pattern, find_position, v[2]),
1696 v[3],
1697 )
1698 for v in self._insertmanyvalues.insert_crud_params
1699 ]
1700
1701 self._insertmanyvalues = self._insertmanyvalues._replace(
1702 single_values_expr=single_values_expr,
1703 insert_crud_params=insert_crud_params,
1704 )
1705
    def _process_numeric(self):
        """Rewrite the compiled string for a "numeric" paramstyle.

        Every bind name is given a placeholder made of
        ``self._numeric_binds_identifier_char`` followed by a counter that
        starts at 1.  Post-compile and literal-execute parameters are
        recorded without a placeholder and do not consume a number.  The
        method then:

        * stores the first unused number as ``self.next_numeric_pos``
        * stores the bind names, in assignment order, as
          ``self.positiontup``
        * replaces the pyformat ``%(name)s`` tokens in ``self.string``
        * when ``self._insertmanyvalues`` is set, formats its
          ``single_values_expr`` with the placeholders and sets the third
          element of each ``insert_crud_params`` entry to ``"%s"``

        """
        assert self._numeric_binds
        assert self.state is CompilerState.STRING_APPLIED

        num = 1
        # bind name -> placeholder; insertion order doubles as the
        # positional order used for positiontup below
        param_pos: Dict[str, str] = {}
        order: Iterable[str]
        if self._insertmanyvalues and self._values_bindparam is not None:
            # bindparams that are not in values are always placed first.
            # this avoids the need of changing them when using executemany
            # values () ()
            order = itertools.chain(
                (
                    name
                    for name in self.bind_names.values()
                    if name not in self._values_bindparam
                ),
                self.bind_names.values(),
            )
        else:
            order = self.bind_names.values()

        for bind_name in order:
            # names can repeat in ``order``; the first occurrence wins
            if bind_name in param_pos:
                continue
            bind = self.binds[bind_name]
            if (
                bind in self.post_compile_params
                or bind in self.literal_execute_params
            ):
                # set to None to just mark it in positiontup, it will not
                # be replaced below.
                param_pos[bind_name] = None  # type: ignore
            else:
                ph = f"{self._numeric_binds_identifier_char}{num}"
                num += 1
                param_pos[bind_name] = ph

        self.next_numeric_pos = num

        # positiontup keeps the unescaped names; the escaped names are
        # only swapped in afterwards for the string substitution
        self.positiontup = list(param_pos)
        if self.escaped_bind_names:
            len_before = len(param_pos)
            param_pos = {
                self.escaped_bind_names.get(name, name): pos
                for name, pos in param_pos.items()
            }
            assert len(param_pos) == len_before

        # Can't use format here since % chars are not escaped.
        self.string = self._pyformat_pattern.sub(
            lambda m: param_pos[m.group(1)], self.string
        )

        if self._insertmanyvalues:
            single_values_expr = (
                # format is ok here since single_values_expr includes only
                # place-holders
                self._insertmanyvalues.single_values_expr
                % param_pos
            )
            insert_crud_params = [
                (v[0], v[1], "%s", v[3])
                for v in self._insertmanyvalues.insert_crud_params
            ]

            self._insertmanyvalues = self._insertmanyvalues._replace(
                # This has the numbers (:1, :2)
                single_values_expr=single_values_expr,
                # The single binds are instead %s so they can be formatted
                insert_crud_params=insert_crud_params,
            )
1778
1779 @util.memoized_property
1780 def _bind_processors(
1781 self,
1782 ) -> MutableMapping[
1783 str, Union[_BindProcessorType[Any], Sequence[_BindProcessorType[Any]]]
1784 ]:
1785 # mypy is not able to see the two value types as the above Union,
1786 # it just sees "object". don't know how to resolve
1787 return {
1788 key: value # type: ignore
1789 for key, value in (
1790 (
1791 self.bind_names[bindparam],
1792 (
1793 bindparam.type._cached_bind_processor(self.dialect)
1794 if not bindparam.type._is_tuple_type
1795 else tuple(
1796 elem_type._cached_bind_processor(self.dialect)
1797 for elem_type in cast(
1798 TupleType, bindparam.type
1799 ).types
1800 )
1801 ),
1802 )
1803 for bindparam in self.bind_names
1804 )
1805 if value is not None
1806 }
1807
1808 def is_subquery(self):
1809 return len(self.stack) > 1
1810
1811 @property
1812 def sql_compiler(self) -> Self:
1813 return self
1814
1815 def construct_expanded_state(
1816 self,
1817 params: Optional[_CoreSingleExecuteParams] = None,
1818 escape_names: bool = True,
1819 ) -> ExpandedState:
1820 """Return a new :class:`.ExpandedState` for a given parameter set.
1821
1822 For queries that use "expanding" or other late-rendered parameters,
1823 this method will provide for both the finalized SQL string as well
1824 as the parameters that would be used for a particular parameter set.
1825
1826 .. versionadded:: 2.0.0rc1
1827
1828 """
1829 parameters = self.construct_params(
1830 params,
1831 escape_names=escape_names,
1832 _no_postcompile=True,
1833 )
1834 return self._process_parameters_for_postcompile(
1835 parameters,
1836 )
1837
1838 def construct_params(
1839 self,
1840 params: Optional[_CoreSingleExecuteParams] = None,
1841 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
1842 escape_names: bool = True,
1843 _group_number: Optional[int] = None,
1844 _check: bool = True,
1845 _no_postcompile: bool = False,
1846 ) -> _MutableCoreSingleExecuteParams:
1847 """return a dictionary of bind parameter keys and values"""
1848
1849 if self._render_postcompile and not _no_postcompile:
1850 assert self._post_compile_expanded_state is not None
1851 if not params:
1852 return dict(self._post_compile_expanded_state.parameters)
1853 else:
1854 raise exc.InvalidRequestError(
1855 "can't construct new parameters when render_postcompile "
1856 "is used; the statement is hard-linked to the original "
1857 "parameters. Use construct_expanded_state to generate a "
1858 "new statement and parameters."
1859 )
1860
1861 has_escaped_names = escape_names and bool(self.escaped_bind_names)
1862
1863 if extracted_parameters:
1864 # related the bound parameters collected in the original cache key
1865 # to those collected in the incoming cache key. They will not have
1866 # matching names but they will line up positionally in the same
1867 # way. The parameters present in self.bind_names may be clones of
1868 # these original cache key params in the case of DML but the .key
1869 # will be guaranteed to match.
1870 if self.cache_key is None:
1871 raise exc.CompileError(
1872 "This compiled object has no original cache key; "
1873 "can't pass extracted_parameters to construct_params"
1874 )
1875 else:
1876 orig_extracted = self.cache_key[1]
1877
1878 ckbm_tuple = self._cache_key_bind_match
1879 assert ckbm_tuple is not None
1880 ckbm, _ = ckbm_tuple
1881 resolved_extracted = {
1882 bind: extracted
1883 for b, extracted in zip(orig_extracted, extracted_parameters)
1884 for bind in ckbm[b]
1885 }
1886 else:
1887 resolved_extracted = None
1888
1889 if params:
1890 pd = {}
1891 for bindparam, name in self.bind_names.items():
1892 escaped_name = (
1893 self.escaped_bind_names.get(name, name)
1894 if has_escaped_names
1895 else name
1896 )
1897
1898 if bindparam.key in params:
1899 pd[escaped_name] = params[bindparam.key]
1900 elif name in params:
1901 pd[escaped_name] = params[name]
1902
1903 elif _check and bindparam.required:
1904 if _group_number:
1905 raise exc.InvalidRequestError(
1906 "A value is required for bind parameter %r, "
1907 "in parameter group %d"
1908 % (bindparam.key, _group_number),
1909 code="cd3x",
1910 )
1911 else:
1912 raise exc.InvalidRequestError(
1913 "A value is required for bind parameter %r"
1914 % bindparam.key,
1915 code="cd3x",
1916 )
1917 else:
1918 if resolved_extracted:
1919 value_param = resolved_extracted.get(
1920 bindparam, bindparam
1921 )
1922 else:
1923 value_param = bindparam
1924
1925 if bindparam.callable:
1926 pd[escaped_name] = value_param.effective_value
1927 else:
1928 pd[escaped_name] = value_param.value
1929 return pd
1930 else:
1931 pd = {}
1932 for bindparam, name in self.bind_names.items():
1933 escaped_name = (
1934 self.escaped_bind_names.get(name, name)
1935 if has_escaped_names
1936 else name
1937 )
1938
1939 if _check and bindparam.required:
1940 if _group_number:
1941 raise exc.InvalidRequestError(
1942 "A value is required for bind parameter %r, "
1943 "in parameter group %d"
1944 % (bindparam.key, _group_number),
1945 code="cd3x",
1946 )
1947 else:
1948 raise exc.InvalidRequestError(
1949 "A value is required for bind parameter %r"
1950 % bindparam.key,
1951 code="cd3x",
1952 )
1953
1954 if resolved_extracted:
1955 value_param = resolved_extracted.get(bindparam, bindparam)
1956 else:
1957 value_param = bindparam
1958
1959 if bindparam.callable:
1960 pd[escaped_name] = value_param.effective_value
1961 else:
1962 pd[escaped_name] = value_param.value
1963
1964 return pd
1965
1966 @util.memoized_instancemethod
1967 def _get_set_input_sizes_lookup(self):
1968 dialect = self.dialect
1969
1970 include_types = dialect.include_set_input_sizes
1971 exclude_types = dialect.exclude_set_input_sizes
1972
1973 dbapi = dialect.dbapi
1974
1975 def lookup_type(typ):
1976 dbtype = typ._unwrapped_dialect_impl(dialect).get_dbapi_type(dbapi)
1977
1978 if (
1979 dbtype is not None
1980 and (exclude_types is None or dbtype not in exclude_types)
1981 and (include_types is None or dbtype in include_types)
1982 ):
1983 return dbtype
1984 else:
1985 return None
1986
1987 inputsizes = {}
1988
1989 literal_execute_params = self.literal_execute_params
1990
1991 for bindparam in self.bind_names:
1992 if bindparam in literal_execute_params:
1993 continue
1994
1995 if bindparam.type._is_tuple_type:
1996 inputsizes[bindparam] = [
1997 lookup_type(typ)
1998 for typ in cast(TupleType, bindparam.type).types
1999 ]
2000 else:
2001 inputsizes[bindparam] = lookup_type(bindparam.type)
2002
2003 return inputsizes
2004
2005 @property
2006 def params(self):
2007 """Return the bind param dictionary embedded into this
2008 compiled object, for those values that are present.
2009
2010 .. seealso::
2011
2012 :ref:`faq_sql_expression_string` - includes a usage example for
2013 debugging use cases.
2014
2015 """
2016 return self.construct_params(_check=False)
2017
2018 def _process_parameters_for_postcompile(
2019 self,
2020 parameters: _MutableCoreSingleExecuteParams,
2021 _populate_self: bool = False,
2022 ) -> ExpandedState:
2023 """handle special post compile parameters.
2024
2025 These include:
2026
2027 * "expanding" parameters -typically IN tuples that are rendered
2028 on a per-parameter basis for an otherwise fixed SQL statement string.
2029
2030 * literal_binds compiled with the literal_execute flag. Used for
2031 things like SQL Server "TOP N" where the driver does not accommodate
2032 N as a bound parameter.
2033
2034 """
2035
2036 expanded_parameters = {}
2037 new_positiontup: Optional[List[str]]
2038
2039 pre_expanded_string = self._pre_expanded_string
2040 if pre_expanded_string is None:
2041 pre_expanded_string = self.string
2042
2043 if self.positional:
2044 new_positiontup = []
2045
2046 pre_expanded_positiontup = self._pre_expanded_positiontup
2047 if pre_expanded_positiontup is None:
2048 pre_expanded_positiontup = self.positiontup
2049
2050 else:
2051 new_positiontup = pre_expanded_positiontup = None
2052
2053 processors = self._bind_processors
2054 single_processors = cast(
2055 "Mapping[str, _BindProcessorType[Any]]", processors
2056 )
2057 tuple_processors = cast(
2058 "Mapping[str, Sequence[_BindProcessorType[Any]]]", processors
2059 )
2060
2061 new_processors: Dict[str, _BindProcessorType[Any]] = {}
2062
2063 replacement_expressions: Dict[str, Any] = {}
2064 to_update_sets: Dict[str, Any] = {}
2065
2066 # notes:
2067 # *unescaped* parameter names in:
2068 # self.bind_names, self.binds, self._bind_processors, self.positiontup
2069 #
2070 # *escaped* parameter names in:
2071 # construct_params(), replacement_expressions
2072
2073 numeric_positiontup: Optional[List[str]] = None
2074
2075 if self.positional and pre_expanded_positiontup is not None:
2076 names: Iterable[str] = pre_expanded_positiontup
2077 if self._numeric_binds:
2078 numeric_positiontup = []
2079 else:
2080 names = self.bind_names.values()
2081
2082 ebn = self.escaped_bind_names
2083 for name in names:
2084 escaped_name = ebn.get(name, name) if ebn else name
2085 parameter = self.binds[name]
2086
2087 if parameter in self.literal_execute_params:
2088 if escaped_name not in replacement_expressions:
2089 replacement_expressions[escaped_name] = (
2090 self.render_literal_bindparam(
2091 parameter,
2092 render_literal_value=parameters.pop(escaped_name),
2093 )
2094 )
2095 continue
2096
2097 if parameter in self.post_compile_params:
2098 if escaped_name in replacement_expressions:
2099 to_update = to_update_sets[escaped_name]
2100 values = None
2101 else:
2102 # we are removing the parameter from parameters
2103 # because it is a list value, which is not expected by
2104 # TypeEngine objects that would otherwise be asked to
2105 # process it. the single name is being replaced with
2106 # individual numbered parameters for each value in the
2107 # param.
2108 #
2109 # note we are also inserting *escaped* parameter names
2110 # into the given dictionary. default dialect will
2111 # use these param names directly as they will not be
2112 # in the escaped_bind_names dictionary.
2113 values = parameters.pop(name)
2114
2115 leep_res = self._literal_execute_expanding_parameter(
2116 escaped_name, parameter, values
2117 )
2118 (to_update, replacement_expr) = leep_res
2119
2120 to_update_sets[escaped_name] = to_update
2121 replacement_expressions[escaped_name] = replacement_expr
2122
2123 if not parameter.literal_execute:
2124 parameters.update(to_update)
2125 if parameter.type._is_tuple_type:
2126 assert values is not None
2127 new_processors.update(
2128 (
2129 "%s_%s_%s" % (name, i, j),
2130 tuple_processors[name][j - 1],
2131 )
2132 for i, tuple_element in enumerate(values, 1)
2133 for j, _ in enumerate(tuple_element, 1)
2134 if name in tuple_processors
2135 and tuple_processors[name][j - 1] is not None
2136 )
2137 else:
2138 new_processors.update(
2139 (key, single_processors[name])
2140 for key, _ in to_update
2141 if name in single_processors
2142 )
2143 if numeric_positiontup is not None:
2144 numeric_positiontup.extend(
2145 name for name, _ in to_update
2146 )
2147 elif new_positiontup is not None:
2148 # to_update has escaped names, but that's ok since
2149 # these are new names, that aren't in the
2150 # escaped_bind_names dict.
2151 new_positiontup.extend(name for name, _ in to_update)
2152 expanded_parameters[name] = [
2153 expand_key for expand_key, _ in to_update
2154 ]
2155 elif new_positiontup is not None:
2156 new_positiontup.append(name)
2157
2158 def process_expanding(m):
2159 key = m.group(1)
2160 expr = replacement_expressions[key]
2161
2162 # if POSTCOMPILE included a bind_expression, render that
2163 # around each element
2164 if m.group(2):
2165 tok = m.group(2).split("~~")
2166 be_left, be_right = tok[1], tok[3]
2167 expr = ", ".join(
2168 "%s%s%s" % (be_left, exp, be_right)
2169 for exp in expr.split(", ")
2170 )
2171 return expr
2172
2173 statement = re.sub(
2174 self._post_compile_pattern, process_expanding, pre_expanded_string
2175 )
2176
2177 if numeric_positiontup is not None:
2178 assert new_positiontup is not None
2179 param_pos = {
2180 key: f"{self._numeric_binds_identifier_char}{num}"
2181 for num, key in enumerate(
2182 numeric_positiontup, self.next_numeric_pos
2183 )
2184 }
2185 # Can't use format here since % chars are not escaped.
2186 statement = self._pyformat_pattern.sub(
2187 lambda m: param_pos[m.group(1)], statement
2188 )
2189 new_positiontup.extend(numeric_positiontup)
2190
2191 expanded_state = ExpandedState(
2192 statement,
2193 parameters,
2194 new_processors,
2195 new_positiontup,
2196 expanded_parameters,
2197 )
2198
2199 if _populate_self:
2200 # this is for the "render_postcompile" flag, which is not
2201 # otherwise used internally and is for end-user debugging and
2202 # special use cases.
2203 self._pre_expanded_string = pre_expanded_string
2204 self._pre_expanded_positiontup = pre_expanded_positiontup
2205 self.string = expanded_state.statement
2206 self.positiontup = (
2207 list(expanded_state.positiontup or ())
2208 if self.positional
2209 else None
2210 )
2211 self._post_compile_expanded_state = expanded_state
2212
2213 return expanded_state
2214
@util.preload_module("sqlalchemy.engine.cursor")
def _create_result_map(self):
    """Return the description match map built from ``_result_columns``.

    Utility method used for unit tests only.
    """
    metadata_cls = util.preloaded.engine_cursor.CursorResultMetaData
    return metadata_cls._create_description_match_map(self._result_columns)
2222
# assigned by crud.py for insert/update statements.  it is read by the
# _within_exec_param_key_getter property, which hands it out as the
# callable used to look up the bind parameter key for a column.
_get_bind_name_for_col: _BindNameForColProtocol
2225
@util.memoized_property
def _within_exec_param_key_getter(self) -> Callable[[Any], str]:
    """Return the ``_get_bind_name_for_col`` callable (memoized)."""
    return self._get_bind_name_for_col
2230
@util.memoized_property
@util.preload_module("sqlalchemy.engine.result")
def _inserted_primary_key_from_lastrowid_getter(self):
    """Build the callable that assembles an inserted primary key row
    from ``cursor.lastrowid`` plus the INSERT parameters.

    The returned ``get(lastrowid, parameters)`` function yields a
    result-tuple with one entry per column of the statement table's
    primary key.

    """
    result = util.preloaded.engine_result

    param_key_getter = self._within_exec_param_key_getter

    assert self.compile_state is not None
    statement = self.compile_state.statement

    if TYPE_CHECKING:
        assert isinstance(statement, Insert)

    table = statement.table

    # one (getter, column) pair per primary key column; each getter pulls
    # the column's bind value out of a parameter dictionary, or None if
    # the key is absent
    getters = [
        (operator.methodcaller("get", param_key_getter(col), None), col)
        for col in table.primary_key
    ]

    autoinc_getter = None
    autoinc_col = table._autoincrement_column
    if autoinc_col is not None:
        # apply type post processors to the lastrowid
        lastrowid_processor = autoinc_col.type._cached_result_processor(
            self.dialect, None
        )
        autoinc_key = param_key_getter(autoinc_col)

        # if a bind value is present for the autoincrement column
        # in the parameters, we need to do the logic dictated by
        # #7998; honor a non-None user-passed parameter over lastrowid.
        # previously in the 1.4 series we weren't fetching lastrowid
        # at all if the key were present in the parameters
        if autoinc_key in self.binds:

            def _autoinc_getter(lastrowid, parameters):
                # falls back to lastrowid when the key is missing from
                # the parameters entirely
                param_value = parameters.get(autoinc_key, lastrowid)
                if param_value is not None:
                    # they supplied non-None parameter, use that.
                    # SQLite at least is observed to return the wrong
                    # cursor.lastrowid for INSERT..ON CONFLICT so it
                    # can't be used in all cases
                    return param_value
                else:
                    # use lastrowid
                    return lastrowid

            # work around mypy https://github.com/python/mypy/issues/14027
            autoinc_getter = _autoinc_getter

    else:
        # no autoincrement column; lastrowid is used as given
        lastrowid_processor = None

    row_fn = result.result_tuple([col.key for col in table.primary_key])

    def get(lastrowid, parameters):
        """given cursor.lastrowid value and the parameters used for INSERT,
        return a "row" that represents the primary key, either by
        using the "lastrowid" or by extracting values from the parameters
        that were sent along with the INSERT.

        """
        if lastrowid_processor is not None:
            lastrowid = lastrowid_processor(lastrowid)

        if lastrowid is None:
            # nothing usable from the cursor; every primary key value
            # comes from the parameters
            return row_fn(getter(parameters) for getter, col in getters)
        else:
            # the autoincrement column takes lastrowid (possibly overridden
            # by a non-None parameter, see _autoinc_getter); all other
            # primary key columns come from the parameters
            return row_fn(
                (
                    (
                        autoinc_getter(lastrowid, parameters)
                        if autoinc_getter is not None
                        else lastrowid
                    )
                    if col is autoinc_col
                    else getter(parameters)
                )
                for getter, col in getters
            )

    return get
2314
@util.memoized_property
@util.preload_module("sqlalchemy.engine.result")
def _inserted_primary_key_from_returning_getter(self):
    """Build the callable that assembles an inserted primary key row
    from a RETURNING row plus the INSERT parameters.

    The returned ``get(row, parameters)`` function yields a result-tuple
    with one entry per primary key column of the statement's table.
    Columns present in ``implicit_returning`` are read from the row by
    position; all others are looked up in the parameters.

    """
    result = util.preloaded.engine_result

    assert self.compile_state is not None
    statement = self.compile_state.statement

    if TYPE_CHECKING:
        assert isinstance(statement, Insert)

    param_key_getter = self._within_exec_param_key_getter
    table = statement.table

    returning = self.implicit_returning
    assert returning is not None
    position_of = {col: idx for idx, col in enumerate(returning)}

    def _getter_for(col):
        # (callable, read_from_row) for a single primary key column
        if col in position_of:
            return operator.itemgetter(position_of[col]), True
        return (
            operator.methodcaller("get", param_key_getter(col), None),
            False,
        )

    getters = cast(
        "List[Tuple[Callable[[Any], Any], bool]]",
        [_getter_for(col) for col in table.primary_key],
    )

    row_fn = result.result_tuple([col.key for col in table.primary_key])

    def get(row, parameters):
        return row_fn(
            getter(row if use_row else parameters)
            for getter, use_row in getters
        )

    return get
2359
def default_from(self) -> str:
    """Return the text used when a SELECT statement has no froms and no
    FROM clause is to be appended.

    This hook gives Oracle Database a chance to tack on a ``FROM DUAL`` to
    the string output; the base implementation renders nothing.

    """
    no_from_text = ""
    return no_from_text
2369
def visit_override_binds(self, override_binds, **kw):
    """SQL compile the nested element of an _OverrideBinds with
    bindparams swapped out.

    The _OverrideBinds is not normally expected to be compiled; it
    is meant to be used when an already cached statement is to be used,
    the compilation was already performed, and only the bound params should
    be swapped in at execution time.

    However, there are test cases that exercise this object, and
    additionally the ORM subquery loader is known to feed in expressions
    which include this construct into new queries (discovered in #11173),
    so it has to do the right thing at compile time as well.

    Returns the SQL text of the nested element; the compiler's bind
    collections are updated in place for every translated key that is
    present in ``self.binds``.

    """

    # get SQL text first
    sqltext = override_binds.element._compiler_dispatch(self, **kw)

    # for a test compile that is not for caching, change binds after the
    # fact.  note that we don't try to
    # swap the bindparam as we compile, because our element may be
    # elsewhere in the statement already (e.g. a subquery or perhaps a
    # CTE) and was already visited / compiled. See
    # test_relationship_criteria.py ->
    # test_selectinload_local_criteria_subquery
    for k in override_binds.translate:
        if k not in self.binds:
            # the key did not produce a bind in this compilation; skip
            continue
        bp = self.binds[k]

        # so this would work, just change the value of bp in place.
        # but we dont want to mutate things outside.
        # bp.value = override_binds.translate[bp.key]
        # continue

        # instead, need to replace bp with new_bp or otherwise accommodate
        # in all internal collections
        new_bp = bp._with_value(
            override_binds.translate[bp.key],
            maintain_key=True,
            required=False,
        )

        # register new_bp under both the translate key and the rendered
        # bind name, and drop the old parameter from bind_names
        name = self.bind_names[bp]
        self.binds[k] = self.binds[name] = new_bp
        self.bind_names[new_bp] = name
        self.bind_names.pop(bp, None)

        # carry over membership in the post-compile / literal-execute sets
        if bp in self.post_compile_params:
            self.post_compile_params |= {new_bp}
        if bp in self.literal_execute_params:
            self.literal_execute_params |= {new_bp}

        ckbm_tuple = self._cache_key_bind_match
        if ckbm_tuple:
            ckbm, cksm = ckbm_tuple
            # note: this loop rebinds ``bp``; the outer ``bp`` is not
            # used again in this iteration
            for bp in bp._cloned_set:
                if bp.key in cksm:
                    cb = cksm[bp.key]
                    ckbm[cb].append(new_bp)

    return sqltext
2433
def visit_grouping(self, grouping, asfrom=False, **kwargs):
    """Render the grouped element enclosed in parenthesis."""
    inner = grouping.element._compiler_dispatch(self, **kwargs)
    return "(" + inner + ")"
2436
def visit_select_statement_grouping(self, grouping, **kwargs):
    """Render the grouped SELECT statement enclosed in parenthesis."""
    inner = grouping.element._compiler_dispatch(self, **kwargs)
    return "(" + inner + ")"
2439
def visit_label_reference(
    self, element, within_columns_clause=False, **kwargs
):
    """Compile a label reference, rendering it as the bare label name
    when it resolves against the enclosing statement.

    Resolution is only attempted inside a statement (non-empty stack) on
    dialects that support simple ORDER BY labels.  A stack entry without
    a ``compile_state`` raises ``CompileError``.

    """
    if self.stack and self.dialect.supports_simple_order_by_label:
        try:
            compile_state = cast(
                "Union[SelectState, CompoundSelectState]",
                self.stack[-1]["compile_state"],
            )
        except KeyError as ke:
            raise exc.CompileError(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ) from ke

        _, only_froms, only_cols = compile_state._label_resolve_dict
        resolve_dict = only_froms if within_columns_clause else only_cols

        # this can be None in the case that a _label_reference()
        # were subject to a replacement operation, in which case
        # the replacement of the Label element may have changed
        # to something else like a ColumnClause expression.
        order_by_elem = element.element._order_by_label_element

        resolvable = (
            order_by_elem is not None
            and order_by_elem.name in resolve_dict
            and order_by_elem.shares_lineage(
                resolve_dict[order_by_elem.name]
            )
        )
        if resolvable:
            kwargs["render_label_as_label"] = (
                element.element._order_by_label_element
            )

    return self.process(
        element.element,
        within_columns_clause=within_columns_clause,
        **kwargs,
    )
2486
def visit_textual_label_reference(
    self, element, within_columns_clause=False, **kwargs
):
    """Compile a string label reference by resolving it to a column of
    the enclosing statement.

    Outside of any statement the element's text clause is compiled
    directly.  When the name cannot be resolved, the failure is reported
    through ``coercions._no_text_coercion`` with ``CompileError``.

    """
    if not self.stack:
        # compiling the element outside of the context of a SELECT
        return self.process(element._text_clause)

    try:
        compile_state = cast(
            "Union[SelectState, CompoundSelectState]",
            self.stack[-1]["compile_state"],
        )
    except KeyError as ke:
        coercions._no_text_coercion(
            element.element,
            extra=(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ),
            exc_cls=exc.CompileError,
            err=ke,
        )

    with_cols, only_froms, only_cols = compile_state._label_resolve_dict
    lookup = only_froms if within_columns_clause else with_cols
    try:
        col = lookup[element.element]
    except KeyError as err:
        coercions._no_text_coercion(
            element.element,
            extra=(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ),
            exc_cls=exc.CompileError,
            err=err,
        )
    else:
        kwargs["render_label_as_label"] = col
        return self.process(
            col, within_columns_clause=within_columns_clause, **kwargs
        )
2531
def visit_label(
    self,
    label,
    add_to_result_map=None,
    within_label_clause=False,
    within_columns_clause=False,
    render_label_as_label=None,
    result_map_targets=(),
    **kw,
):
    """Compile a label as ``<expr> AS <name>``, as the bare name, or as
    the bare expression, depending on where it is being rendered.

    """
    # only render labels within the columns clause
    # or ORDER BY clause of a select.  dialect-specific compilers
    # can modify this behavior.
    render_label_with_as = (
        within_columns_clause and not within_label_clause
    )
    render_label_only = render_label_as_label is label

    if not (render_label_only or render_label_with_as):
        # no label rendering at all; just the labeled expression
        return label.element._compiler_dispatch(
            self, within_columns_clause=False, **kw
        )

    labelname = label.name
    if isinstance(labelname, elements._truncated_label):
        labelname = self._truncated_identifier("colident", labelname)

    if not render_label_with_as:
        # render_label_only: the name alone
        return self.preparer.format_label(label, labelname)

    if add_to_result_map is not None:
        add_to_result_map(
            labelname,
            label.name,
            (label, labelname) + label._alt_names + result_map_targets,
            label.type,
        )

    inner_sql = label.element._compiler_dispatch(
        self,
        within_columns_clause=True,
        within_label_clause=True,
        **kw,
    )
    return (
        inner_sql
        + OPERATORS[operators.as_]
        + self.preparer.format_label(label, labelname)
    )
2580
def _fallback_column_name(self, column):
    """Raise ``CompileError``; this base hook always rejects a column
    whose name is ``None``."""
    message = "Cannot compile Column object until its 'name' is assigned."
    raise exc.CompileError(message)
2585
def visit_lambda_element(self, element, **kw):
    """Compile the resolved SQL element of a lambda element."""
    return self.process(element._resolved, **kw)
2589
def visit_column(
    self,
    column: ColumnClause[Any],
    add_to_result_map: Optional[_ResultMapAppender] = None,
    include_table: bool = True,
    result_map_targets: Tuple[Any, ...] = (),
    ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] = None,
    **kwargs: Any,
) -> str:
    """Compile a column into its (optionally table / schema qualified)
    name.

    :param column: the column being rendered
    :param add_to_result_map: optional callable which receives the
     rendered name, original name, lookup targets and type
    :param include_table: when False the table qualifier is omitted
    :param result_map_targets: extra targets appended to the result map
     entry
    :param ambiguous_table_name_map: optional replacement names for
     table names, consulted only when no schema is in effect
    :return: the rendered name

    """
    orig_name = column.name
    name = (
        orig_name
        if orig_name is not None
        else self._fallback_column_name(column)
    )

    is_literal = column.is_literal
    if not is_literal and isinstance(name, elements._truncated_label):
        name = self._truncated_identifier("colident", name)

    if add_to_result_map is not None:
        targets = (column, name, column.key) + result_map_targets
        tq_label = column._tq_label
        if tq_label:
            targets += (tq_label,)

        add_to_result_map(name, orig_name, targets, column.type)

    # note we are not currently accommodating for
    # literal_column(quoted_name('ident', True)) in the literal case
    name = (
        self.escape_literal_column(name)
        if is_literal
        else self.preparer.quote(name)
    )

    table = column.table
    if table is None or not include_table or not table.named_with_column:
        return name

    effective_schema = self.preparer.schema_for_object(table)
    schema_prefix = (
        self.preparer.quote_schema(effective_schema) + "."
        if effective_schema
        else ""
    )

    if TYPE_CHECKING:
        assert isinstance(table, NamedFromClause)
    tablename = table.name

    use_ambiguous_name = (
        not effective_schema
        and ambiguous_table_name_map
        and tablename in ambiguous_table_name_map
    )
    if use_ambiguous_name:
        tablename = ambiguous_table_name_map[tablename]

    if isinstance(tablename, elements._truncated_label):
        tablename = self._truncated_identifier("alias", tablename)

    quoted_table = self.preparer.quote(tablename)
    return schema_prefix + quoted_table + "." + name
2648
def visit_collation(self, element, **kw):
    """Render a collation name via the identifier preparer."""
    collation_name = element.collation
    return self.preparer.format_collation(collation_name)
2651
def visit_fromclause(self, fromclause, **kwargs):
    """Render a FROM clause element as its name, unmodified."""
    rendered = fromclause.name
    return rendered
2654
def visit_index(self, index, **kwargs):
    """Render an index as its name, unmodified."""
    rendered = index.name
    return rendered
2657
def visit_typeclause(self, typeclause, **kw):
    """Render the type of a type clause using the dialect's type
    compiler.

    The type clause itself and this compiler's identifier preparer are
    passed along as ``type_expression`` and ``identifier_preparer``.

    """
    kw.update(
        type_expression=typeclause,
        identifier_preparer=self.preparer,
    )
    type_compiler = self.dialect.type_compiler_instance
    return type_compiler.process(typeclause.type, **kw)
2664
def post_process_text(self, text):
    """Return ``text`` with percent signs doubled when the identifier
    preparer has ``_double_percents`` set; otherwise unchanged."""
    return (
        text.replace("%", "%%") if self.preparer._double_percents else text
    )
2669
def escape_literal_column(self, text):
    """Return literal column ``text`` with percent signs doubled when the
    identifier preparer has ``_double_percents`` set; otherwise
    unchanged."""
    return (
        text.replace("%", "%%") if self.preparer._double_percents else text
    )
2674
def visit_textclause(self, textclause, add_to_result_map=None, **kw):
    """Compile a text clause, substituting its bind parameter tokens.

    Tokens naming one of the clause's own bindparams are compiled from
    that bindparam; any other token is rendered by ``bindparam_string``.

    """

    def _render_bind(match):
        bind_name = match.group(1)
        if bind_name in textclause._bindparams:
            return self.process(textclause._bindparams[bind_name], **kw)
        return self.bindparam_string(bind_name, **kw)

    if not self.stack:
        self.isplaintext = True

    if add_to_result_map:
        # text() object is present in the columns clause of a
        # select().   Add a no-name entry to the result map so that
        # row[text()] produces a result
        add_to_result_map(None, None, (textclause,), sqltypes.NULLTYPE)

    processed_text = self.post_process_text(textclause.text)
    with_binds = BIND_PARAMS.sub(_render_bind, processed_text)

    # un-escape any \:params
    return BIND_PARAMS_ESC.sub(lambda m: m.group(1), with_binds)
2699
def visit_textual_select(
    self, taf, compound_index=None, asfrom=False, **kw
):
    """Compile a textual SELECT construct.

    A new stack entry is pushed for the duration of the compilation.
    When result map population applies, each of the construct's column
    arguments is processed into the result map before the text element
    itself is rendered.  Any collected CTEs are prepended to the text.

    """
    toplevel = not self.stack
    entry = self._default_stack_entry if toplevel else self.stack[-1]

    new_entry: _CompilerStackEntry = {
        "correlate_froms": set(),
        "asfrom_froms": set(),
        "selectable": taf,
    }
    self.stack.append(new_entry)

    if taf._independent_ctes:
        self._dispatch_independent_ctes(taf, kw)

    # the result map is populated at the top level, for the first member
    # of a compound that asked for it, or when a nested result map was
    # requested by the enclosing entry
    populate_result_map = (
        toplevel
        or (
            compound_index == 0
            and entry.get("need_result_map_for_compound", False)
        )
        or entry.get("need_result_map_for_nested", False)
    )

    if populate_result_map:
        self._ordered_columns = self._textual_ordered_columns = (
            taf.positional
        )

        # enable looser result column matching when the SQL text links to
        # Column objects by name only
        self._loose_column_name_matching = not taf.positional and bool(
            taf.column_args
        )

        # the return value is discarded; processing is done only for its
        # effect on the result map
        for c in taf.column_args:
            self.process(
                c,
                within_columns_clause=True,
                add_to_result_map=self._add_to_result_map,
            )

    text = self.process(taf.element, **kw)
    if self.ctes:
        nesting_level = len(self.stack) if not toplevel else None
        text = self._render_cte_clause(nesting_level=nesting_level) + text

    # remove the entry pushed above
    self.stack.pop(-1)

    return text
2751
def visit_null(self, expr: Null, **kw: Any) -> str:
    """Render the NULL keyword."""
    null_keyword = "NULL"
    return null_keyword
2754
def visit_true(self, expr: True_, **kw: Any) -> str:
    """Render boolean true: ``true`` on dialects with native boolean
    support, ``1`` otherwise."""
    return "true" if self.dialect.supports_native_boolean else "1"
2760
def visit_false(self, expr: False_, **kw: Any) -> str:
    """Render boolean false: ``false`` on dialects with native boolean
    support, ``0`` otherwise."""
    return "false" if self.dialect.supports_native_boolean else "0"
2766
def _generate_delimited_list(self, elements, separator, **kw):
    """Compile each element and join the non-empty results with
    ``separator``."""
    rendered = (c._compiler_dispatch(self, **kw) for c in elements)
    return separator.join(filter(None, rendered))
2773
def _generate_delimited_and_list(self, clauses, **kw):
    """Compile ``clauses`` as a conjunction.

    The clauses are first passed through the boolean clause list's AND
    processing; a single resulting clause is compiled on its own,
    otherwise the non-empty compiled clauses are joined with the AND
    operator string.

    """
    lcc, clauses = elements.BooleanClauseList._process_clauses_for_boolean(
        operators.and_,
        elements.True_._singleton,
        elements.False_._singleton,
        clauses,
    )
    if lcc == 1:
        return clauses[0]._compiler_dispatch(self, **kw)

    and_separator = OPERATORS[operators.and_]
    rendered = (c._compiler_dispatch(self, **kw) for c in clauses)
    return and_separator.join(filter(None, rendered))
2790
def visit_tuple(self, clauselist, **kw):
    """Render a tuple as its clause list enclosed in parenthesis."""
    members = self.visit_clauselist(clauselist, **kw)
    return "(%s)" % members
2793
def visit_clauselist(self, clauselist, **kw):
    """Render the clauses of a clause list joined by its operator, or by
    a single space when it has no operator."""
    op = clauselist.operator
    sep = " " if op is None else OPERATORS[op]
    return self._generate_delimited_list(clauselist.clauses, sep, **kw)
2802
def visit_expression_clauselist(self, clauselist, **kw):
    """Render an expression clause list.

    An operator-specific ``expression_clauselist`` visit method is used
    when one exists; otherwise the clauses are joined with the operator's
    string.  An operator with neither raises
    ``UnsupportedCompilationError``.

    """
    operator_ = clauselist.operator

    disp = self._get_operator_dispatch(
        operator_, "expression_clauselist", None
    )
    if disp:
        return disp(clauselist, operator_, **kw)

    try:
        opstring = OPERATORS[operator_]
    except KeyError as err:
        raise exc.UnsupportedCompilationError(self, operator_) from err

    kw["_in_operator_expression"] = True
    return self._generate_delimited_list(clauselist.clauses, opstring, **kw)
2821
def visit_case(self, clause, **kwargs):
    """Render a CASE expression, with the optional value, each
    WHEN / THEN pair and the optional ELSE."""
    parts = ["CASE "]
    if clause.value is not None:
        parts.append(clause.value._compiler_dispatch(self, **kwargs) + " ")
    for cond, result in clause.whens:
        when_sql = cond._compiler_dispatch(self, **kwargs)
        then_sql = result._compiler_dispatch(self, **kwargs)
        parts.append("WHEN " + when_sql + " THEN " + then_sql + " ")
    if clause.else_ is not None:
        parts.append(
            "ELSE " + clause.else_._compiler_dispatch(self, **kwargs) + " "
        )
    parts.append("END")
    return "".join(parts)
2840
def visit_type_coerce(self, type_coerce, **kw):
    """Render a type coercion as its typed expression only."""
    typed = type_coerce.typed_expression
    return typed._compiler_dispatch(self, **kw)
2843
def visit_cast(self, cast, **kwargs):
    """Render ``CAST(<expr> AS <type>)``.

    When the compiled type contains a `` COLLATE ...`` suffix, that
    suffix is moved outside of the CAST parenthesis.

    """
    type_clause = cast.typeclause._compiler_dispatch(self, **kwargs)
    collate_match = re.match("(.*)( COLLATE .*)", type_clause)
    expr_sql = cast.clause._compiler_dispatch(self, **kwargs)

    if collate_match:
        type_sql = collate_match.group(1)
        collate_sql = collate_match.group(2)
    else:
        type_sql = type_clause
        collate_sql = ""

    return "CAST(%s AS %s)%s" % (expr_sql, type_sql, collate_sql)
2852
def _format_frame_clause(self, range_, **kw):
    """Render the ``<lower> AND <upper>`` portion of a window frame.

    Each bound is either the unbounded marker, the current-row marker, or
    a number; negative numbers render as PRECEDING (using the absolute
    value), all others as FOLLOWING.

    """

    def _render_bound(bound, unbounded_text):
        if bound is elements.RANGE_UNBOUNDED:
            return unbounded_text
        if bound is elements.RANGE_CURRENT:
            return "CURRENT ROW"
        if bound < 0:
            return "%s PRECEDING" % (
                self.process(elements.literal(abs(bound)), **kw),
            )
        return "%s FOLLOWING" % (
            self.process(elements.literal(bound), **kw),
        )

    lower = _render_bound(range_[0], "UNBOUNDED PRECEDING")
    upper = _render_bound(range_[1], "UNBOUNDED FOLLOWING")
    return "%s AND %s" % (lower, upper)
2894
def visit_over(self, over, **kwargs):
    """Render ``<element> OVER (...)`` with the optional PARTITION BY,
    ORDER BY and frame (RANGE / ROWS / GROUPS) portions."""
    text = over.element._compiler_dispatch(self, **kwargs)

    if over.range_ is not None:
        frame = "RANGE BETWEEN %s" % self._format_frame_clause(
            over.range_, **kwargs
        )
    elif over.rows is not None:
        frame = "ROWS BETWEEN %s" % self._format_frame_clause(
            over.rows, **kwargs
        )
    elif over.groups is not None:
        frame = "GROUPS BETWEEN %s" % self._format_frame_clause(
            over.groups, **kwargs
        )
    else:
        frame = None

    window_parts = []
    for word, clause in (
        ("PARTITION", over.partition_by),
        ("ORDER", over.order_by),
    ):
        # empty or absent clause lists are left out entirely
        if clause is not None and len(clause):
            window_parts.append(
                "%s BY %s" % (word, clause._compiler_dispatch(self, **kwargs))
            )
    if frame:
        window_parts.append(frame)

    return "%s OVER (%s)" % (text, " ".join(window_parts))
2927
def visit_withingroup(self, withingroup, **kwargs):
    """Render ``<element> WITHIN GROUP (ORDER BY <order_by>)``."""
    element_sql = withingroup.element._compiler_dispatch(self, **kwargs)
    order_by_sql = withingroup.order_by._compiler_dispatch(self, **kwargs)
    return "%s WITHIN GROUP (ORDER BY %s)" % (element_sql, order_by_sql)
2933
def visit_funcfilter(self, funcfilter, **kwargs):
    """Render ``<func> FILTER (WHERE <criterion>)``."""
    func_sql = funcfilter.func._compiler_dispatch(self, **kwargs)
    criterion_sql = funcfilter.criterion._compiler_dispatch(self, **kwargs)
    return "%s FILTER (WHERE %s)" % (func_sql, criterion_sql)
2939
def visit_extract(self, extract, **kwargs):
    """Render ``EXTRACT(<field> FROM <expr>)``.

    The field name is translated through ``extract_map`` when it has an
    entry there, and used as given otherwise.

    """
    requested = extract.field
    field = self.extract_map.get(requested, requested)
    expr_sql = extract.expr._compiler_dispatch(self, **kwargs)
    return "EXTRACT(%s FROM %s)" % (field, expr_sql)
2946
def visit_scalar_function_column(self, element, **kw):
    """Render a column of a scalar function as ``(<function>).<column>``."""
    fn_sql = self.visit_function(element.fn, **kw)
    col_sql = self.visit_column(element, **kw)
    return "(%s).%s" % (fn_sql, col_sql)
2951
def visit_function(
    self,
    func: Function[Any],
    add_to_result_map: Optional[_ResultMapAppender] = None,
    **kwargs: Any,
) -> str:
    """Compile a SQL function call.

    :param func: the function element being rendered
    :param add_to_result_map: optional callable which receives the
     function's name (as both rendered and original name), a one-element
     targets tuple containing the name, and the function's type
    :return: the rendered function text, followed by
     ``WITH ORDINALITY`` when the function requests it

    """
    if add_to_result_map is not None:
        add_to_result_map(func.name, func.name, (func.name,), func.type)

    # a compiler method named visit_<lowercased name>_func, when present,
    # renders the function entirely on its own
    disp = getattr(self, "visit_%s_func" % func.name.lower(), None)

    text: str

    if disp:
        text = disp(func, **kwargs)
    else:
        # ``name`` is built up as a %-format template; the "%(expr)s"
        # placeholder receives the argument specification below
        name = FUNCTIONS.get(func._deannotate().__class__, None)
        if name:
            # name registered for this function class; the argument
            # placeholder is only added when the function has arguments
            if func._has_args:
                name += "%(expr)s"
        else:
            name = func.name
            name = (
                self.preparer.quote(name)
                if self.preparer._requires_quotes_illegal_chars(name)
                or isinstance(name, elements.quoted_name)
                else name
            )
            name = name + "%(expr)s"
        # NOTE(review): the isinstance() check below tests ``name`` rather
        # than ``tok`` - confirm whether the package token was meant to be
        # checked here
        text = ".".join(
            [
                (
                    self.preparer.quote(tok)
                    if self.preparer._requires_quotes_illegal_chars(tok)
                    or isinstance(name, elements.quoted_name)
                    else tok
                )
                for tok in func.packagenames
            ]
            + [name]
        ) % {"expr": self.function_argspec(func, **kwargs)}

    if func._with_ordinality:
        text += " WITH ORDINALITY"
    return text
2997
def visit_next_value_func(self, next_value, **kw):
    """Render a next-value function by compiling its sequence."""
    sequence = next_value.sequence
    return self.visit_sequence(sequence)
3000
def visit_sequence(self, sequence, **kw):
    """Raise ``NotImplementedError``; the base compiler does not render
    sequence increments."""
    dialect_name = self.dialect.name
    raise NotImplementedError(
        "Dialect '%s' does not support sequence increments." % dialect_name
    )
3006
def function_argspec(self, func: Function[Any], **kwargs: Any) -> str:
    """Render the argument specification of a function by compiling its
    ``clause_expr``."""
    argument_clause = func.clause_expr
    return argument_clause._compiler_dispatch(self, **kwargs)
3009
def visit_compound_select(
    self, cs, asfrom=False, compound_index=None, **kwargs
):
    """Compile a compound SELECT (its member selects joined by the
    compound keyword), followed by GROUP BY, ORDER BY and row limiting
    clauses, with any collected CTEs prepended.

    A new stack entry is pushed while the members are compiled and
    removed again before returning.

    """
    toplevel = not self.stack

    compile_state = cs._compile_state_factory(cs, self, **kwargs)

    # the first top-level statement establishes the compiler's
    # compile_state
    if toplevel and not self.compile_state:
        self.compile_state = compile_state

    compound_stmt = compile_state.statement

    entry = self._default_stack_entry if toplevel else self.stack[-1]
    # ``not compound_index`` is true for both None and 0
    need_result_map = toplevel or (
        not compound_index
        and entry.get("need_result_map_for_compound", False)
    )

    # indicates there is already a CompoundSelect in play
    if compound_index == 0:
        entry["select_0"] = cs

    # the new entry shares the correlation collections of the enclosing
    # entry
    self.stack.append(
        {
            "correlate_froms": entry["correlate_froms"],
            "asfrom_froms": entry["asfrom_froms"],
            "selectable": cs,
            "compile_state": compile_state,
            "need_result_map_for_compound": need_result_map,
        }
    )

    if compound_stmt._independent_ctes:
        self._dispatch_independent_ctes(compound_stmt, kwargs)

    keyword = self.compound_keywords[cs.keyword]

    # each member receives its position as compound_index
    text = (" " + keyword + " ").join(
        (
            c._compiler_dispatch(
                self, asfrom=asfrom, compound_index=i, **kwargs
            )
            for i, c in enumerate(cs.selects)
        )
    )

    # the trailing clauses are compiled with include_table=False
    kwargs["include_table"] = False
    text += self.group_by_clause(cs, **dict(asfrom=asfrom, **kwargs))
    text += self.order_by_clause(cs, **kwargs)
    if cs._has_row_limiting_clause:
        text += self._row_limit_clause(cs, **kwargs)

    if self.ctes:
        nesting_level = len(self.stack) if not toplevel else None
        text = (
            self._render_cte_clause(
                nesting_level=nesting_level,
                include_following_stack=True,
            )
            + text
        )

    # remove the entry pushed above
    self.stack.pop(-1)
    return text
3074
def _row_limit_clause(self, cs, **kwargs):
    """Render the row limiting clause: FETCH when the statement has a
    fetch clause, LIMIT otherwise."""
    render = (
        self.fetch_clause if cs._fetch_clause is not None else self.limit_clause
    )
    return render(cs, **kwargs)
3080
def _get_operator_dispatch(self, operator_, qualifier1, qualifier2):
    """Return the ``visit_<operator>_<qualifier1>[_<qualifier2>]`` method
    of this compiler, or ``None`` when there is no such attribute."""
    operator_name = operator_.__name__
    suffix = "_" + qualifier2 if qualifier2 else ""
    attrname = "visit_%s_%s%s" % (operator_name, qualifier1, suffix)
    return getattr(self, attrname, None)
3088
def visit_unary(
    self, unary, add_to_result_map=None, result_map_targets=(), **kw
):
    """Compile a unary expression via its operator or its modifier.

    An operator / modifier specific visit method is used when present,
    otherwise the generic unary rendering.  ``CompileError`` is raised
    when the expression has both an operator and a modifier, or neither.

    """
    if add_to_result_map is not None:
        result_map_targets += (unary,)
        kw["add_to_result_map"] = add_to_result_map
        kw["result_map_targets"] = result_map_targets

    if unary.operator:
        if unary.modifier:
            raise exc.CompileError(
                "Unary expression does not support operator "
                "and modifier simultaneously"
            )
        disp = self._get_operator_dispatch(
            unary.operator, "unary", "operator"
        )
        if disp:
            return disp(unary, unary.operator, **kw)
        return self._generate_generic_unary_operator(
            unary, OPERATORS[unary.operator], **kw
        )

    if unary.modifier:
        disp = self._get_operator_dispatch(
            unary.modifier, "unary", "modifier"
        )
        if disp:
            return disp(unary, unary.modifier, **kw)
        return self._generate_generic_unary_modifier(
            unary, OPERATORS[unary.modifier], **kw
        )

    raise exc.CompileError("Unary expression has no operator or modifier")
3126
def visit_truediv_binary(self, binary, operator, **kw):
    """Render true division.

    On dialects where ``/`` performs floor division, the right operand
    is wrapped in a CAST to a numeric type (its own type when that
    already has Numeric affinity).

    """
    if not self.dialect.div_is_floordiv:
        return (
            self.process(binary.left, **kw)
            + " / "
            + self.process(binary.right, **kw)
        )

    left_sql = self.process(binary.left, **kw)

    # TODO: would need a fast cast again here,
    # unless we want to use an implicit cast like "+ 0.0"
    divisor_type = (
        binary.right.type
        if binary.right.type._type_affinity is sqltypes.Numeric
        else sqltypes.Numeric()
    )
    divisor_sql = self.process(
        elements.Cast(binary.right, divisor_type), **kw
    )
    return left_sql + " / " + divisor_sql
3153
def visit_floordiv_binary(self, binary, operator, **kw):
    """Render floor division.

    A plain ``/`` is used when the dialect's division already floors and
    the right operand has Integer affinity; otherwise the division is
    wrapped in ``FLOOR()``.

    """
    plain_division = (
        self.dialect.div_is_floordiv
        and binary.right.type._type_affinity is sqltypes.Integer
    )
    quotient = (
        self.process(binary.left, **kw)
        + " / "
        + self.process(binary.right, **kw)
    )
    return quotient if plain_division else "FLOOR(%s)" % quotient
3170
def visit_is_true_unary_operator(self, element, operator, **kw):
    """Render an "is true" test: the bare expression when it is
    implicitly boolean or the dialect has native booleans, otherwise a
    comparison to 1."""
    renders_as_boolean = (
        element._is_implicitly_boolean
        or self.dialect.supports_native_boolean
    )
    inner = self.process(element.element, **kw)
    return inner if renders_as_boolean else "%s = 1" % inner
3179
def visit_is_false_unary_operator(self, element, operator, **kw):
    """Render an "is false" test of ``element.element``.

    ``NOT <operand>`` is used when the element is implicitly boolean or
    the dialect supports native booleans; otherwise the operand is
    compared with ``= 0``.
    """
    can_negate = (
        element._is_implicitly_boolean
        or self.dialect.supports_native_boolean
    )
    operand_sql = self.process(element.element, **kw)
    template = "NOT %s" if can_negate else "%s = 0"
    return template % operand_sql
3188
def visit_not_match_op_binary(self, binary, operator, **kw):
    """Render a negated MATCH as ``NOT <match expression>``.

    The inner expression is rendered through ``visit_binary`` with the
    operator overridden to ``match_op``.  The compilation keyword
    arguments are forwarded, so that flags handled further down (for
    example ``literal_binds`` in ``visit_bindparam``) also reach the
    operands of the negated form instead of being silently dropped.
    """
    return "NOT %s" % self.visit_binary(
        binary, override_operator=operators.match_op, **kw
    )
3193
def visit_not_in_op_binary(self, binary, operator, **kw):
    """Render a NOT IN expression, always wrapped in parentheses."""
    # The brackets are required in the NOT IN operation because the empty
    # case is handled using the form "(col NOT IN (null) OR 1 = 1)".
    # The presence of the OR makes the brackets required.
    unwrapped = self._generate_generic_binary(
        binary, OPERATORS[operator], **kw
    )
    return "(%s)" % unwrapped
3201
def visit_empty_set_op_expr(self, type_, expand_op, **kw):
    """Return the SQL fragment substituted for an empty expanding set.

    For ``not_in_op`` / ``in_op`` the fragment is a NULL (or a
    parenthesized list of one NULL per entry of ``type_`` when there is
    more than one entry) followed by a closing parenthesis and an
    always-true / always-false condition.  Any other operator is handed
    to ``visit_empty_set_expr``.
    """
    if expand_op is operators.not_in_op:
        multi_template = "(%s)) OR (1 = 1"
        single = "NULL) OR (1 = 1"
    elif expand_op is operators.in_op:
        multi_template = "(%s)) AND (1 != 1"
        single = "NULL) AND (1 != 1"
    else:
        return self.visit_empty_set_expr(type_)

    if len(type_) > 1:
        return multi_template % ", ".join("NULL" for _ in type_)
    return single
3219
def visit_empty_set_expr(self, element_types, **kw):
    """Base hook for rendering an empty set; always raises here.

    :raises NotImplementedError: unconditionally, naming the dialect.
    """
    dialect_name = self.dialect.name
    raise NotImplementedError(
        "Dialect '%s' does not support empty set expression."
        % dialect_name
    )
3225
def _literal_execute_expanding_parameter_literal_binds(
    self, parameter, values, bind_expression_template=None
):
    """Render the values of an expanding parameter as inline literals.

    :param parameter: the expanding bind parameter; its ``type`` and
     ``expand_op`` are consulted.
    :param values: the sequence of values to render; may be empty.
    :param bind_expression_template: optional string containing a
     post-compile token; when given, text extracted from it is rendered
     around each individual literal value.
    :return: a 2-tuple ``((), replacement_expression)``.  The first
     element is always an empty tuple here; the non-literal counterpart
     ``_literal_execute_expanding_parameter`` returns its list of new
     parameter names/values in that position.
    :raises NotImplementedError: for tuple values whose type has a bind
     expression.
    """
    typ_dialect_impl = parameter.type._unwrapped_dialect_impl(self.dialect)

    if not values:
        # empty IN expression. note we don't need to use
        # bind_expression_template here because there are no
        # expressions to render.

        if typ_dialect_impl._is_tuple_type:
            # one NULL per tuple member, optionally prefixed by VALUES
            replacement_expression = (
                "VALUES " if self.dialect.tuple_in_values else ""
            ) + self.visit_empty_set_op_expr(
                parameter.type.types, parameter.expand_op
            )

        else:
            replacement_expression = self.visit_empty_set_op_expr(
                [parameter.type], parameter.expand_op
            )

    elif typ_dialect_impl._is_tuple_type or (
        # a null-typed parameter whose first value is a non-string
        # sequence is treated like a tuple as well
        typ_dialect_impl._isnull
        and isinstance(values[0], collections_abc.Sequence)
        and not isinstance(values[0], (str, bytes))
    ):
        if typ_dialect_impl._has_bind_expression:
            raise NotImplementedError(
                "bind_expression() on TupleType not supported with "
                "literal_binds"
            )

        # each tuple becomes "(v1, v2, ...)"; every member is rendered
        # with the type at the same position in parameter.type.types.
        # NOTE(review): this reads parameter.type.types in the _isnull
        # case too; presumably only tuple types provide it - confirm.
        replacement_expression = (
            "VALUES " if self.dialect.tuple_in_values else ""
        ) + ", ".join(
            "(%s)"
            % (
                ", ".join(
                    self.render_literal_value(value, param_type)
                    for value, param_type in zip(
                        tuple_element, parameter.type.types
                    )
                )
            )
            for i, tuple_element in enumerate(values)
        )
    else:
        if bind_expression_template:
            post_compile_pattern = self._post_compile_pattern
            m = post_compile_pattern.search(bind_expression_template)
            assert m and m.group(
                2
            ), "unexpected format for expanding parameter"

            # group 2 is split on "~~"; items 1 and 3 are the text
            # placed to the left and right of every rendered value.
            # presumably this is the "~~<left>~~REPL~~<right>~~" form
            # built by visit_bindparam - verify against
            # _post_compile_pattern, which is defined elsewhere.
            tok = m.group(2).split("~~")
            be_left, be_right = tok[1], tok[3]
            replacement_expression = ", ".join(
                "%s%s%s"
                % (
                    be_left,
                    self.render_literal_value(value, parameter.type),
                    be_right,
                )
                for value in values
            )
        else:
            # plain scalar values: comma-separated literals
            replacement_expression = ", ".join(
                self.render_literal_value(value, parameter.type)
                for value in values
            )

    return (), replacement_expression
3299
def _literal_execute_expanding_parameter(self, name, parameter, values):
    """Expand an "expanding" parameter into individual bind placeholders.

    :param name: base name from which the per-value parameter names
     are derived.
    :param parameter: the expanding bind parameter.
    :param values: the sequence of values being expanded; may be empty.
    :return: a 2-tuple ``(to_update, replacement_expression)`` where
     ``to_update`` is a list of ``(new_name, value)`` pairs and
     ``replacement_expression`` is the SQL text holding the matching
     placeholders.  When ``parameter.literal_execute`` is set, the
     result of ``_literal_execute_expanding_parameter_literal_binds``
     is returned instead.
    """
    if parameter.literal_execute:
        return self._literal_execute_expanding_parameter_literal_binds(
            parameter, values
        )

    dialect = self.dialect
    typ_dialect_impl = parameter.type._unwrapped_dialect_impl(dialect)

    # choose which bind template is used to format each placeholder
    if self._numeric_binds:
        bind_template = self.compilation_bindtemplate
    else:
        bind_template = self.bindtemplate

    # _render_bindtemplate(name) produces the placeholder text for one
    # generated name, passed through render_bind_cast() when both the
    # dialect and the type ask for bind casts.
    if (
        self.dialect._bind_typing_render_casts
        and typ_dialect_impl.render_bind_cast
    ):

        def _render_bindtemplate(name):
            return self.render_bind_cast(
                parameter.type,
                typ_dialect_impl,
                bind_template % {"name": name},
            )

    else:

        def _render_bindtemplate(name):
            return bind_template % {"name": name}

    if not values:
        # empty set: no new parameters, only the "empty set" SQL
        to_update = []
        if typ_dialect_impl._is_tuple_type:
            replacement_expression = self.visit_empty_set_op_expr(
                parameter.type.types, parameter.expand_op
            )
        else:
            replacement_expression = self.visit_empty_set_op_expr(
                [parameter.type], parameter.expand_op
            )

    elif typ_dialect_impl._is_tuple_type or (
        typ_dialect_impl._isnull
        and isinstance(values[0], collections_abc.Sequence)
        and not isinstance(values[0], (str, bytes))
    ):
        assert not typ_dialect_impl._is_array
        # flat list of "<name>_<i>_<j>" names, both indexes 1-based
        to_update = [
            ("%s_%s_%s" % (name, i, j), value)
            for i, tuple_element in enumerate(values, 1)
            for j, value in enumerate(tuple_element, 1)
        ]

        # i and j are 0-based below; i * len(tuple_element) + j indexes
        # back into the flat to_update list.
        # NOTE(review): this lookup lines up only if every tuple in
        # ``values`` has the same length - confirm callers ensure that.
        replacement_expression = (
            "VALUES " if dialect.tuple_in_values else ""
        ) + ", ".join(
            "(%s)"
            % (
                ", ".join(
                    _render_bindtemplate(
                        to_update[i * len(tuple_element) + j][0]
                    )
                    for j, value in enumerate(tuple_element)
                )
            )
            for i, tuple_element in enumerate(values)
        )
    else:
        # scalar values: "<name>_<i>" names, 1-based
        to_update = [
            ("%s_%s" % (name, i), value)
            for i, value in enumerate(values, 1)
        ]
        replacement_expression = ", ".join(
            _render_bindtemplate(key) for key, value in to_update
        )

    return to_update, replacement_expression
3378
def visit_binary(
    self,
    binary,
    override_operator=None,
    eager_grouping=False,
    from_linter=None,
    lateral_from_linter=None,
    **kw,
):
    """Render a binary expression.

    Comparison operators first record FROM-object edges on the active
    linter.  Rendering is then delegated to an operator-specific
    dispatch method when one exists, otherwise to
    ``_generate_generic_binary`` using the ``OPERATORS`` lookup.

    :param override_operator: operator to render instead of
     ``binary.operator``.
    :raises UnsupportedCompilationError: if no dispatch method exists
     and the operator is not present in ``OPERATORS``.
    """
    if from_linter and operators.is_comparison(binary.operator):
        if lateral_from_linter is not None:
            lateral = [kw["enclosing_lateral"]]
            linter = lateral_from_linter
            left_froms = binary.left._from_objects + lateral
            right_froms = binary.right._from_objects + lateral
        else:
            linter = from_linter
            left_froms = binary.left._from_objects
            right_froms = binary.right._from_objects

        linter.edges.update(
            itertools.product(
                _de_clone(left_froms), _de_clone(right_froms)
            )
        )

    # don't allow "? = ?" to render
    both_sides_bound = (
        self.ansi_bind_rules
        and isinstance(binary.left, elements.BindParameter)
        and isinstance(binary.right, elements.BindParameter)
    )
    if both_sides_bound:
        kw["literal_execute"] = True

    operator_ = override_operator or binary.operator
    handler = self._get_operator_dispatch(operator_, "binary", None)
    if handler:
        return handler(binary, operator_, **kw)

    try:
        opstring = OPERATORS[operator_]
    except KeyError as err:
        raise exc.UnsupportedCompilationError(self, operator_) from err

    return self._generate_generic_binary(
        binary,
        opstring,
        from_linter=from_linter,
        lateral_from_linter=lateral_from_linter,
        **kw,
    )
3434
def visit_function_as_comparison_op_binary(self, element, operator, **kw):
    """Render the element by processing its ``sql_function``."""
    sql_function = element.sql_function
    return self.process(sql_function, **kw)
3437
def visit_mod_binary(self, binary, operator, **kw):
    """Render a modulo expression.

    The percent sign is doubled when the identifier preparer has
    ``_double_percents`` set.
    """
    modulo = " %% " if self.preparer._double_percents else " % "
    left_sql = self.process(binary.left, **kw)
    right_sql = self.process(binary.right, **kw)
    return left_sql + modulo + right_sql
3451
def visit_custom_op_binary(self, element, operator, **kw):
    """Render a binary expression that uses a custom operator.

    The operator string is escaped with ``escape_literal_column`` and
    padded with a space on each side; the operator's ``eager_grouping``
    setting is passed on to ``_generate_generic_binary``.
    """
    kw["eager_grouping"] = operator.eager_grouping
    escaped = self.escape_literal_column(operator.opstring)
    opstring = " " + escaped + " "
    return self._generate_generic_binary(element, opstring, **kw)
3459
def visit_custom_op_unary_operator(self, element, operator, **kw):
    """Render a custom prefix operator: escaped opstring, then a space."""
    prefix = self.escape_literal_column(operator.opstring) + " "
    return self._generate_generic_unary_operator(element, prefix, **kw)
3464
def visit_custom_op_unary_modifier(self, element, operator, **kw):
    """Render a custom postfix operator: a space, then escaped opstring."""
    suffix = " " + self.escape_literal_column(operator.opstring)
    return self._generate_generic_unary_modifier(element, suffix, **kw)
3469
def _generate_generic_binary(
    self,
    binary: BinaryExpression[Any],
    opstring: str,
    eager_grouping: bool = False,
    **kw: Any,
) -> str:
    """Render ``<left><opstring><right>`` for a binary expression.

    Both operands are dispatched with ``_in_operator_expression=True``
    and ``_binary_op`` set to the expression's operator.  The result is
    parenthesized only when this call was itself made inside an operator
    expression and ``eager_grouping`` is set.
    """
    already_nested = kw.get("_in_operator_expression", False)

    kw["_in_operator_expression"] = True
    kw["_binary_op"] = binary.operator

    left_sql = binary.left._compiler_dispatch(
        self, eager_grouping=eager_grouping, **kw
    )
    right_sql = binary.right._compiler_dispatch(
        self, eager_grouping=eager_grouping, **kw
    )
    text = left_sql + opstring + right_sql

    if already_nested and eager_grouping:
        return "(%s)" % text
    return text
3494
def _generate_generic_unary_operator(self, unary, opstring, **kw):
    """Render ``opstring`` immediately followed by the operand."""
    operand_sql = unary.element._compiler_dispatch(self, **kw)
    return opstring + operand_sql
3497
def _generate_generic_unary_modifier(self, unary, opstring, **kw):
    """Render the operand immediately followed by ``opstring``."""
    operand_sql = unary.element._compiler_dispatch(self, **kw)
    return operand_sql + opstring
3500
@util.memoized_property
def _like_percent_literal(self):
    """Literal column ``'%'`` of string type, memoized per instance."""
    percent = elements.literal_column("'%'", type_=sqltypes.STRINGTYPE)
    return percent
3504
def visit_ilike_case_insensitive_operand(self, element, **kw):
    """Render the wrapped element inside ``lower(...)``."""
    inner_sql = element.element._compiler_dispatch(self, **kw)
    return f"lower({inner_sql})"
3507
def visit_contains_op_binary(self, binary, operator, **kw):
    """Render "contains" as LIKE with ``'%'`` on both sides of the right
    operand.

    Works on a clone, so the given ``binary`` is not modified.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    prefixed = wildcard.concat(like_expr.right)
    like_expr.right = prefixed.concat(wildcard)
    return self.visit_like_op_binary(like_expr, operator, **kw)
3513
def visit_not_contains_op_binary(self, binary, operator, **kw):
    """Render "not contains" as NOT LIKE with ``'%'`` on both sides of
    the right operand.

    Works on a clone, so the given ``binary`` is not modified.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    prefixed = wildcard.concat(like_expr.right)
    like_expr.right = prefixed.concat(wildcard)
    return self.visit_not_like_op_binary(like_expr, operator, **kw)
3519
def visit_icontains_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "contains".

    On a clone of ``binary``, both operands are wrapped with
    ``ilike_case_insensitive()`` and the right operand is surrounded by
    the ``'%'`` literal; rendering is then delegated to
    ``visit_ilike_op_binary``.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_right).concat(wildcard)
    return self.visit_ilike_op_binary(like_expr, operator, **kw)
3528
def visit_not_icontains_op_binary(self, binary, operator, **kw):
    """Render negated case-insensitive "contains".

    On a clone of ``binary``, both operands are wrapped with
    ``ilike_case_insensitive()`` and the right operand is surrounded by
    the ``'%'`` literal; rendering is then delegated to
    ``visit_not_ilike_op_binary``.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_right).concat(wildcard)
    return self.visit_not_ilike_op_binary(like_expr, operator, **kw)
3537
def visit_startswith_op_binary(self, binary, operator, **kw):
    """Render "startswith" as LIKE, combining the right operand with the
    ``'%'`` literal through ``_rconcat``.

    Works on a clone, so the given ``binary`` is not modified.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard._rconcat(like_expr.right)
    return self.visit_like_op_binary(like_expr, operator, **kw)
3543
def visit_not_startswith_op_binary(self, binary, operator, **kw):
    """Render "not startswith" as NOT LIKE, combining the right operand
    with the ``'%'`` literal through ``_rconcat``.

    Works on a clone, so the given ``binary`` is not modified.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard._rconcat(like_expr.right)
    return self.visit_not_like_op_binary(like_expr, operator, **kw)
3549
def visit_istartswith_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "startswith".

    On a clone of ``binary``, both operands are wrapped with
    ``ilike_case_insensitive()`` and the right operand is combined with
    the ``'%'`` literal through ``_rconcat``; rendering is delegated to
    ``visit_ilike_op_binary``.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard._rconcat(folded_right)
    return self.visit_ilike_op_binary(like_expr, operator, **kw)
3556
def visit_not_istartswith_op_binary(self, binary, operator, **kw):
    """Render negated case-insensitive "startswith".

    On a clone of ``binary``, both operands are wrapped with
    ``ilike_case_insensitive()`` and the right operand is combined with
    the ``'%'`` literal through ``_rconcat``; rendering is delegated to
    ``visit_not_ilike_op_binary``.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard._rconcat(folded_right)
    return self.visit_not_ilike_op_binary(like_expr, operator, **kw)
3563
def visit_endswith_op_binary(self, binary, operator, **kw):
    """Render "endswith" as LIKE with the ``'%'`` literal concatenated in
    front of the right operand.

    Works on a clone, so the given ``binary`` is not modified.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard.concat(like_expr.right)
    return self.visit_like_op_binary(like_expr, operator, **kw)
3569
def visit_not_endswith_op_binary(self, binary, operator, **kw):
    """Render "not endswith" as NOT LIKE with the ``'%'`` literal
    concatenated in front of the right operand.

    Works on a clone, so the given ``binary`` is not modified.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard.concat(like_expr.right)
    return self.visit_not_like_op_binary(like_expr, operator, **kw)
3575
def visit_iendswith_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "endswith".

    On a clone of ``binary``, both operands are wrapped with
    ``ilike_case_insensitive()`` and the ``'%'`` literal is concatenated
    in front of the right operand; rendering is delegated to
    ``visit_ilike_op_binary``.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_right)
    return self.visit_ilike_op_binary(like_expr, operator, **kw)
3582
def visit_not_iendswith_op_binary(self, binary, operator, **kw):
    """Render negated case-insensitive "endswith".

    On a clone of ``binary``, both operands are wrapped with
    ``ilike_case_insensitive()`` and the ``'%'`` literal is concatenated
    in front of the right operand; rendering is delegated to
    ``visit_not_ilike_op_binary``.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_right)
    return self.visit_not_ilike_op_binary(like_expr, operator, **kw)
3589
def visit_like_op_binary(self, binary, operator, **kw):
    """Render ``<left> LIKE <right>``.

    An ``ESCAPE <literal>`` clause is appended when the expression's
    modifiers carry a non-None ``"escape"`` entry.
    """
    escape = binary.modifiers.get("escape", None)

    left_sql = binary.left._compiler_dispatch(self, **kw)
    right_sql = binary.right._compiler_dispatch(self, **kw)
    text = "%s LIKE %s" % (left_sql, right_sql)

    if escape is not None:
        text += " ESCAPE " + self.render_literal_value(
            escape, sqltypes.STRINGTYPE
        )
    return text
3601
def visit_not_like_op_binary(self, binary, operator, **kw):
    """Render ``<left> NOT LIKE <right>``.

    An ``ESCAPE <literal>`` clause is appended when the expression's
    modifiers carry a non-None ``"escape"`` entry.
    """
    escape = binary.modifiers.get("escape", None)

    left_sql = binary.left._compiler_dispatch(self, **kw)
    right_sql = binary.right._compiler_dispatch(self, **kw)
    text = "%s NOT LIKE %s" % (left_sql, right_sql)

    if escape is not None:
        text += " ESCAPE " + self.render_literal_value(
            escape, sqltypes.STRINGTYPE
        )
    return text
3612
def visit_ilike_op_binary(self, binary, operator, **kw):
    """Render ILIKE by way of ``visit_like_op_binary``.

    When ``operator`` is exactly ``ilike_op``, both operands of a clone
    are first wrapped with ``ilike_case_insensitive()``.
    """
    if operator is not operators.ilike_op:
        # else we assume ilower() has been applied
        return self.visit_like_op_binary(binary, operator, **kw)

    folded = binary._clone()
    folded.left = ilike_case_insensitive(folded.left)
    folded.right = ilike_case_insensitive(folded.right)
    return self.visit_like_op_binary(folded, operator, **kw)
3621
def visit_not_ilike_op_binary(self, binary, operator, **kw):
    """Render NOT ILIKE by way of ``visit_not_like_op_binary``.

    When ``operator`` is exactly ``not_ilike_op``, both operands of a
    clone are first wrapped with ``ilike_case_insensitive()``.
    """
    if operator is not operators.not_ilike_op:
        # else we assume ilower() has been applied
        return self.visit_not_like_op_binary(binary, operator, **kw)

    folded = binary._clone()
    folded.left = ilike_case_insensitive(folded.left)
    folded.right = ilike_case_insensitive(folded.right)
    return self.visit_not_like_op_binary(folded, operator, **kw)
3630
def visit_between_op_binary(self, binary, operator, **kw):
    """Render BETWEEN, or BETWEEN SYMMETRIC when the ``"symmetric"``
    modifier is truthy."""
    if binary.modifiers.get("symmetric", False):
        keyword = " BETWEEN SYMMETRIC "
    else:
        keyword = " BETWEEN "
    return self._generate_generic_binary(binary, keyword, **kw)
3636
def visit_not_between_op_binary(self, binary, operator, **kw):
    """Render NOT BETWEEN, or NOT BETWEEN SYMMETRIC when the
    ``"symmetric"`` modifier is truthy."""
    if binary.modifiers.get("symmetric", False):
        keyword = " NOT BETWEEN SYMMETRIC "
    else:
        keyword = " NOT BETWEEN "
    return self._generate_generic_binary(binary, keyword, **kw)
3644
def visit_regexp_match_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Base implementation of regular expression match; always raises.

    :raises CompileError: unconditionally, naming the dialect.
    """
    dialect_name = self.dialect.name
    raise exc.CompileError(
        "%s dialect does not support regular expressions" % dialect_name
    )
3652
def visit_not_regexp_match_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Base implementation of negated regular expression match; always
    raises.

    :raises CompileError: unconditionally, naming the dialect.
    """
    dialect_name = self.dialect.name
    raise exc.CompileError(
        "%s dialect does not support regular expressions" % dialect_name
    )
3660
def visit_regexp_replace_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Base implementation of regular expression replace; always raises.

    :raises CompileError: unconditionally, naming the dialect.
    """
    dialect_name = self.dialect.name
    raise exc.CompileError(
        "%s dialect does not support regular expression replacements"
        % dialect_name
    )
3668
def visit_bindparam(
    self,
    bindparam,
    within_columns_clause=False,
    literal_binds=False,
    skip_bind_expression=False,
    literal_execute=False,
    render_postcompile=False,
    **kwargs,
):
    """Render a bound parameter and register it with this compiler.

    Depending on the flags and on the parameter's type this returns one
    of: the processed type-level bind expression, an inline literal
    (``literal_binds``), or the placeholder string produced by
    ``bindparam_string()``.  Expanding parameters are wrapped in
    parentheses.

    :param bindparam: the bind parameter element being compiled.
    :param within_columns_clause: together with ``ansi_bind_rules``,
     forces the parameter to be handled as "literal execute".
    :param literal_binds: render the value inline instead of a
     placeholder.
    :param skip_bind_expression: do not apply the type's
     ``bind_expression()``; set by this method when it processes that
     expression, so it is not applied a second time.
    :param literal_execute: treat the parameter as "literal execute".
    :param render_postcompile: when a post-compile parameter is seen,
     set ``self._render_postcompile``.
    :raises CompileError: when the name collides with an incompatible
     parameter that was already registered under the same name.
    """

    if not skip_bind_expression:
        impl = bindparam.type.dialect_impl(self.dialect)
        if impl._has_bind_expression:
            bind_expression = impl.bind_expression(bindparam)
            # render the SQL expression supplied by the type; the
            # parameter inside it comes back through this method with
            # skip_bind_expression=True
            wrapped = self.process(
                bind_expression,
                skip_bind_expression=True,
                within_columns_clause=within_columns_clause,
                literal_binds=literal_binds and not bindparam.expanding,
                literal_execute=literal_execute,
                render_postcompile=render_postcompile,
                **kwargs,
            )
            if bindparam.expanding:
                # for postcompile w/ expanding, move the "wrapped" part
                # of this into the inside

                m = re.match(
                    r"^(.*)\(__\[POSTCOMPILE_(\S+?)\]\)(.*)$", wrapped
                )
                assert m, "unexpected format for expanding parameter"
                # token layout: name ~~ text before ~~ REPL ~~ text after
                wrapped = "(__[POSTCOMPILE_%s~~%s~~REPL~~%s~~])" % (
                    m.group(2),
                    m.group(1),
                    m.group(3),
                )

                if literal_binds:
                    ret = self.render_literal_bindparam(
                        bindparam,
                        within_columns_clause=True,
                        bind_expression_template=wrapped,
                        **kwargs,
                    )
                    return "(%s)" % ret

            return wrapped

    if not literal_binds:
        literal_execute = (
            literal_execute
            or bindparam.literal_execute
            or (within_columns_clause and self.ansi_bind_rules)
        )
        post_compile = literal_execute or bindparam.expanding
    else:
        post_compile = False

    if literal_binds:
        # inline rendering: nothing is registered in self.binds
        ret = self.render_literal_bindparam(
            bindparam, within_columns_clause=True, **kwargs
        )
        if bindparam.expanding:
            ret = "(%s)" % ret
        return ret

    name = self._truncate_bindparam(bindparam)

    if name in self.binds:
        # the name is already taken; check the two parameters are
        # allowed to share it
        existing = self.binds[name]
        if existing is not bindparam:
            if (
                (existing.unique or bindparam.unique)
                and not existing.proxy_set.intersection(
                    bindparam.proxy_set
                )
                and not existing._cloned_set.intersection(
                    bindparam._cloned_set
                )
            ):
                raise exc.CompileError(
                    "Bind parameter '%s' conflicts with "
                    "unique bind parameter of the same name" % name
                )
            elif existing.expanding != bindparam.expanding:
                raise exc.CompileError(
                    "Can't reuse bound parameter name '%s' in both "
                    "'expanding' (e.g. within an IN expression) and "
                    "non-expanding contexts. If this parameter is to "
                    "receive a list/array value, set 'expanding=True' on "
                    "it for expressions that aren't IN, otherwise use "
                    "a different parameter name." % (name,)
                )
            elif existing._is_crud or bindparam._is_crud:
                if existing._is_crud and bindparam._is_crud:
                    # TODO: this condition is not well understood.
                    # see tests in test/sql/test_update.py
                    raise exc.CompileError(
                        "Encountered unsupported case when compiling an "
                        "INSERT or UPDATE statement. If this is a "
                        "multi-table "
                        "UPDATE statement, please provide string-named "
                        "arguments to the "
                        "values() method with distinct names; support for "
                        "multi-table UPDATE statements that "
                        "target multiple tables for UPDATE is very "
                        "limited",
                    )
                else:
                    raise exc.CompileError(
                        f"bindparam() name '{bindparam.key}' is reserved "
                        "for automatic usage in the VALUES or SET "
                        "clause of this "
                        "insert/update statement. Please use a "
                        "name other than column name when using "
                        "bindparam() "
                        "with insert() or update() (for example, "
                        f"'b_{bindparam.key}')."
                    )

    # register under both the original key and the (possibly truncated)
    # rendered name
    self.binds[bindparam.key] = self.binds[name] = bindparam

    # if we are given a cache key that we're going to match against,
    # relate the bindparam here to one that is most likely present
    # in the "extracted params" portion of the cache key. this is used
    # to set up a positional mapping that is used to determine the
    # correct parameters for a subsequent use of this compiled with
    # a different set of parameter values. here, we accommodate for
    # parameters that may have been cloned both before and after the cache
    # key was generated.
    ckbm_tuple = self._cache_key_bind_match

    if ckbm_tuple:
        ckbm, cksm = ckbm_tuple
        for bp in bindparam._cloned_set:
            if bp.key in cksm:
                cb = cksm[bp.key]
                ckbm[cb].append(bindparam)

    if bindparam.isoutparam:
        self.has_out_parameters = True

    if post_compile:
        if render_postcompile:
            self._render_postcompile = True

        # record the parameter in the matching post-compile collection
        if literal_execute:
            self.literal_execute_params |= {bindparam}
        else:
            self.post_compile_params |= {bindparam}

    ret = self.bindparam_string(
        name,
        post_compile=post_compile,
        expanding=bindparam.expanding,
        bindparam_type=bindparam.type,
        **kwargs,
    )

    if bindparam.expanding:
        ret = "(%s)" % ret

    return ret
3833
def render_bind_cast(self, type_, dbapi_type, sqltext):
    """Hook for rendering a cast around bound-parameter text.

    Not implemented at this level.

    :raises NotImplementedError: unconditionally.
    """
    raise NotImplementedError
3836
def render_literal_bindparam(
    self,
    bindparam,
    render_literal_value=NO_ARG,
    bind_expression_template=None,
    **kw,
):
    """Render a bound parameter's value inline as a SQL literal.

    :param bindparam: the parameter whose value is rendered.
    :param render_literal_value: explicit value to render in place of
     the parameter's own effective value.
    :param bind_expression_template: passed along when rendering an
     expanding parameter.
    :return: the literal SQL text.  A parameter with neither a value
     nor a callable renders as the processed ``NULLTYPE``, with a
     warning when the enclosing binary operator is something other than
     ``is_`` / ``is_not``.
    """
    if render_literal_value is not NO_ARG:
        value = render_literal_value
    elif bindparam.value is None and bindparam.callable is None:
        binary_op = kw.get("_binary_op", None)
        if binary_op and binary_op not in (operators.is_, operators.is_not):
            util.warn_limited(
                "Bound parameter '%s' rendering literal NULL in a SQL "
                "expression; comparisons to NULL should not use "
                "operators outside of 'is' or 'is not'",
                (bindparam.key,),
            )
        return self.process(sqltypes.NULLTYPE, **kw)
    else:
        value = bindparam.effective_value

    if not bindparam.expanding:
        return self.render_literal_value(value, bindparam.type)

    render_expanded = self._literal_execute_expanding_parameter_literal_binds
    _, replacement_expr = render_expanded(
        bindparam,
        value,
        bind_expression_template=bind_expression_template,
    )
    return replacement_expr
3869
def render_literal_value(
    self, value: Any, type_: sqltypes.TypeEngine[Any]
) -> str:
    """Render the value of a bind parameter as a quoted literal.

    This is used for statement sections that do not accept bind
    parameters on the target driver/database.

    This should be implemented by subclasses using the quoting services
    of the DBAPI.

    :raises CompileError: if the type provides no literal processor, or
     if the processor raises for the given value.
    """
    if value is None and not type_.should_evaluate_none:
        # issue #10535 - handle NULL in the compiler without placing
        # this onto each type, except for "evaluate None" types
        # (e.g. JSON)
        return self.process(elements.Null._instance())

    literal_processor = type_._cached_literal_processor(self.dialect)
    if not literal_processor:
        raise exc.CompileError(
            f"No literal value renderer is available for literal value "
            f'"{sql_util._repr_single_value(value)}" '
            f"with datatype {type_}"
        )

    try:
        return literal_processor(value)
    except Exception as e:
        raise exc.CompileError(
            f"Could not render literal value "
            f'"{sql_util._repr_single_value(value)}" '
            f"with datatype "
            f"{type_}; see parent stack trace for "
            "more detail."
        ) from e
3908
def _truncate_bindparam(self, bindparam):
    """Return the rendered name for ``bindparam``, caching it.

    The name is the parameter's key, passed through
    ``_truncated_identifier`` when the key is a ``_truncated_label``.
    Results are stored in ``self.bind_names``.
    """
    if bindparam in self.bind_names:
        return self.bind_names[bindparam]

    resolved = bindparam.key
    if isinstance(resolved, elements._truncated_label):
        resolved = self._truncated_identifier("bindparam", resolved)

    # add to bind_names for translation
    self.bind_names[bindparam] = resolved
    return resolved
3921
def _truncated_identifier(
    self, ident_class: str, name: _truncated_label
) -> str:
    """Return the final identifier for a truncated label, caching it.

    The label is resolved through ``self.anon_map``.  If the result is
    longer than ``label_length - 6`` it is cut to that length and given
    a ``_<hex counter>`` suffix; the counter is kept per ``ident_class``
    and starts at 1.
    """
    cache_key = (ident_class, name)
    if cache_key in self.truncated_names:
        return self.truncated_names[cache_key]

    anonname = name.apply_map(self.anon_map)

    limit = self.label_length - 6
    if len(anonname) > limit:
        counter = self._truncated_counters.get(ident_class, 1)
        suffix = "_" + hex(counter)[2:]
        truncname = anonname[0 : max(limit, 0)] + suffix
        self._truncated_counters[ident_class] = counter + 1
    else:
        truncname = anonname

    self.truncated_names[cache_key] = truncname
    return truncname
3942
def _anonymize(self, name: str) -> str:
    """Apply ``self.anon_map`` to ``name`` using ``%`` formatting."""
    mapping = self.anon_map
    return name % mapping
3945
def bindparam_string(
    self,
    name: str,
    post_compile: bool = False,
    expanding: bool = False,
    escaped_from: Optional[str] = None,
    bindparam_type: Optional[TypeEngine[Any]] = None,
    accumulate_bind_names: Optional[Set[str]] = None,
    visited_bindparam: Optional[List[str]] = None,
    **kw: Any,
) -> str:
    """Return the placeholder text rendered for a bound parameter name.

    :param name: the parameter name; characters matched by
     ``_bind_translate_re`` are replaced before rendering.
    :param post_compile: render a ``__[POSTCOMPILE_<name>]`` token
     instead of a driver-level placeholder.
    :param expanding: with ``post_compile``, return the token as-is
     without applying any cast.
    :param escaped_from: the original name when the caller has already
     escaped ``name``; this skips the translation step.
    :param bindparam_type: the parameter's type, consulted to decide
     whether the result is wrapped by ``render_bind_cast()``.
    :param accumulate_bind_names: optional set that receives ``name``.
    :param visited_bindparam: optional list that receives ``name``.
    :return: the placeholder string.
    """
    # TODO: accumulate_bind_names is passed by crud.py to gather
    # names on a per-value basis, visited_bindparam is passed by
    # visit_insert() to collect all parameters in the statement.
    # see if this gathering can be simplified somehow
    # note both collections receive the name as passed in, before any
    # character translation below
    if accumulate_bind_names is not None:
        accumulate_bind_names.add(name)
    if visited_bindparam is not None:
        visited_bindparam.append(name)

    if not escaped_from:
        if self._bind_translate_re.search(name):
            # not quite the translate use case as we want to
            # also get a quick boolean if we even found
            # unusual characters in the name
            new_name = self._bind_translate_re.sub(
                lambda m: self._bind_translate_chars[m.group(0)],
                name,
            )
            escaped_from = name
            name = new_name

    if escaped_from:
        # remember original -> escaped; union() yields a new mapping
        # which is re-assigned rather than updating in place
        self.escaped_bind_names = self.escaped_bind_names.union(
            {escaped_from: name}
        )
    if post_compile:
        ret = "__[POSTCOMPILE_%s]" % name
        if expanding:
            # for expanding, bound parameters or literal values will be
            # rendered per item
            return ret

        # otherwise, for non-expanding "literal execute", apply
        # bind casts as determined by the datatype
        if bindparam_type is not None:
            type_impl = bindparam_type._unwrapped_dialect_impl(
                self.dialect
            )
            if type_impl.render_literal_cast:
                ret = self.render_bind_cast(bindparam_type, type_impl, ret)
        return ret
    elif self.state is CompilerState.COMPILING:
        ret = self.compilation_bindtemplate % {"name": name}
    else:
        ret = self.bindtemplate % {"name": name}

    # regular placeholders get a cast only when both the dialect and
    # the type ask for one
    if (
        bindparam_type is not None
        and self.dialect._bind_typing_render_casts
    ):
        type_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
        if type_impl.render_bind_cast:
            ret = self.render_bind_cast(bindparam_type, type_impl, ret)

    return ret
4012
def _dispatch_independent_ctes(self, stmt, kw):
    """Compile each of the statement's independent CTEs.

    Every CTE in ``stmt._independent_ctes`` is dispatched with its paired
    entry from ``stmt._independent_ctes_opts`` as ``cte_opts``; any
    ``cte_opts`` already present in ``kw`` is not forwarded.  ``kw``
    itself is left unmodified.
    """
    forwarded = {
        key: value for key, value in kw.items() if key != "cte_opts"
    }
    pairs = zip(stmt._independent_ctes, stmt._independent_ctes_opts)
    for independent_cte, opts in pairs:
        independent_cte._compiler_dispatch(
            self, cte_opts=opts, **forwarded
        )
4020
def visit_cte(
    self,
    cte: CTE,
    asfrom: bool = False,
    ashint: bool = False,
    fromhints: Optional[_FromHintsType] = None,
    visiting_cte: Optional[CTE] = None,
    from_linter: Optional[FromLinter] = None,
    cte_opts: selectable._CTEOpts = selectable._CTEOpts(False),
    **kwargs: Any,
) -> Optional[str]:
    """Register a CTE with this compiler and render a reference to it.

    The text of a newly seen CTE is stored in the collection returned by
    ``_init_cte_state()``, keyed by the CTE.  The return value is only
    the text that refers to the CTE from the enclosing statement.

    :param cte: the CTE being compiled.
    :param asfrom: the CTE is being rendered as a FROM element, so its
     name (with alias suffix where applicable) is returned.
    :param ashint: not referenced in this method body.
    :param fromhints: not referenced in this method body.
    :param visiting_cte: the CTE currently being compiled by an
     enclosing call, if any; replaced by ``cte`` for nested calls.
    :param from_linter: optional linter which records this CTE among
     its ``froms``.
    :param cte_opts: options for this use of the CTE; only ``nesting``
     is consulted here.
    :return: the name text when ``asfrom`` is set; the compiled inner
     element when a new CTE is compiled with an empty ``self.stack``;
     otherwise ``None``.
    :raises CompileError: for a CTE stated as nested in more than one
     location, or for two unrelated CTEs sharing a name.
    """
    self_ctes = self._init_cte_state()
    assert self_ctes is self.ctes

    kwargs["visiting_cte"] = cte

    cte_name = cte.name

    if isinstance(cte_name, elements._truncated_label):
        cte_name = self._truncated_identifier("alias", cte_name)

    is_new_cte = True
    embedded_in_current_named_cte = False

    _reference_cte = cte._get_reference_cte()

    nesting = cte.nesting or cte_opts.nesting

    # check for CTE already encountered
    if _reference_cte in self.level_name_by_cte:
        # stored entries are (level, name, opts) tuples
        cte_level, _, existing_cte_opts = self.level_name_by_cte[
            _reference_cte
        ]
        assert _ == cte_name

        cte_level_name = (cte_level, cte_name)
        existing_cte = self.ctes_by_level_name[cte_level_name]

        # check if we are receiving it here with a specific
        # "nest_here" location; if so, move it to this location

        if cte_opts.nesting:
            if existing_cte_opts.nesting:
                raise exc.CompileError(
                    "CTE is stated as 'nest_here' in "
                    "more than one location"
                )

            # re-key both lookup tables under the new level
            old_level_name = (cte_level, cte_name)
            cte_level = len(self.stack) if nesting else 1
            cte_level_name = new_level_name = (cte_level, cte_name)

            del self.ctes_by_level_name[old_level_name]
            self.ctes_by_level_name[new_level_name] = existing_cte
            self.level_name_by_cte[_reference_cte] = new_level_name + (
                cte_opts,
            )

    else:
        # nested CTEs use the current stack depth as level, all
        # others use level 1
        cte_level = len(self.stack) if nesting else 1
        cte_level_name = (cte_level, cte_name)

        if cte_level_name in self.ctes_by_level_name:
            existing_cte = self.ctes_by_level_name[cte_level_name]
        else:
            existing_cte = None

    if existing_cte is not None:
        embedded_in_current_named_cte = visiting_cte is existing_cte

        # we've generated a same-named CTE that we are enclosed in,
        # or this is the same CTE. just return the name.
        if cte is existing_cte._restates or cte is existing_cte:
            is_new_cte = False
        elif existing_cte is cte._restates:
            # we've generated a same-named CTE that is
            # enclosed in us - we take precedence, so
            # discard the text for the "inner".
            del self_ctes[existing_cte]

            existing_cte_reference_cte = existing_cte._get_reference_cte()

            assert existing_cte_reference_cte is _reference_cte
            assert existing_cte_reference_cte is existing_cte

            del self.level_name_by_cte[existing_cte_reference_cte]
        else:
            if (
                # if the two CTEs have the same hash, which we expect
                # here means that one/both is an annotated of the other
                (hash(cte) == hash(existing_cte))
                # or...
                or (
                    (
                        # if they are clones, i.e. they came from the ORM
                        # or some other visit method
                        cte._is_clone_of is not None
                        or existing_cte._is_clone_of is not None
                    )
                    # and are deep-copy identical
                    and cte.compare(existing_cte)
                )
            ):
                # then consider these two CTEs the same
                is_new_cte = False
            else:
                # otherwise these are two CTEs that either will render
                # differently, or were indicated separately by the user,
                # with the same name
                raise exc.CompileError(
                    "Multiple, unrelated CTEs found with "
                    "the same name: %r" % cte_name
                )

    # an already-known CTE that is not used as a FROM renders nothing
    if not asfrom and not is_new_cte:
        return None

    if cte._cte_alias is not None:
        # this CTE is an alias of another CTE; resolve that CTE's name
        pre_alias_cte = cte._cte_alias
        cte_pre_alias_name = cte._cte_alias.name
        if isinstance(cte_pre_alias_name, elements._truncated_label):
            cte_pre_alias_name = self._truncated_identifier(
                "alias", cte_pre_alias_name
            )
    else:
        pre_alias_cte = cte
        cte_pre_alias_name = None

    if is_new_cte:
        self.ctes_by_level_name[cte_level_name] = cte
        self.level_name_by_cte[_reference_cte] = cte_level_name + (
            cte_opts,
        )

        # make sure the aliased CTE has been compiled first
        if pre_alias_cte not in self.ctes:
            self.visit_cte(pre_alias_cte, **kwargs)

        if not cte_pre_alias_name and cte not in self_ctes:
            if cte.recursive:
                self.ctes_recursive = True
            text = self.preparer.format_alias(cte, cte_name)
            if cte.recursive or cte.element.name_cte_columns:
                col_source = cte.element

                # TODO: can we get at the .columns_plus_names collection
                # that is already (or will be?) generated for the SELECT
                # rather than calling twice?
                recur_cols = [
                    # TODO: proxy_name is not technically safe,
                    # see test_cte->
                    # test_with_recursive_no_name_currently_buggy. not
                    # clear what should be done with such a case
                    fallback_label_name or proxy_name
                    for (
                        _,
                        proxy_name,
                        fallback_label_name,
                        c,
                        repeated,
                    ) in (col_source._generate_columns_plus_names(True))
                    if not repeated
                ]

                # explicit column name list after the CTE name
                text += "(%s)" % (
                    ", ".join(
                        self.preparer.format_label_name(
                            ident, anon_map=self.anon_map
                        )
                        for ident in recur_cols
                    )
                )

            assert kwargs.get("subquery", False) is False

            if not self.stack:
                # toplevel, this is a stringify of the
                # cte directly. just compile the inner
                # the way alias() does.
                return cte.element._compiler_dispatch(
                    self, asfrom=asfrom, **kwargs
                )
            else:
                prefixes = self._generate_prefixes(
                    cte, cte._prefixes, **kwargs
                )
                inner = cte.element._compiler_dispatch(
                    self, asfrom=True, **kwargs
                )

                text += " AS %s\n(%s)" % (prefixes, inner)

            if cte._suffixes:
                text += " " + self._generate_prefixes(
                    cte, cte._suffixes, **kwargs
                )

            # store the finished text for this CTE
            self_ctes[cte] = text

    if asfrom:
        if from_linter:
            from_linter.froms[cte._de_clone()] = cte_name

        if not is_new_cte and embedded_in_current_named_cte:
            return self.preparer.format_alias(cte, cte_name)

        if cte_pre_alias_name:
            # "<aliased cte name><alias suffix for this name>"
            text = self.preparer.format_alias(cte, cte_pre_alias_name)
            if self.preparer._requires_quotes(cte_name):
                cte_name = self.preparer.quote(cte_name)
            text += self.get_render_as_alias_suffix(cte_name)
            return text  # type: ignore[no-any-return]
        else:
            return self.preparer.format_alias(cte, cte_name)

    return None
4236
def visit_table_valued_alias(self, element, **kw):
    """Render a table-valued alias, choosing the LATERAL or plain alias path.

    When ``element.joins_implicitly`` is set, the ``from_linter`` keyword is
    replaced with ``None`` before the element is handed on.  The element is
    then rendered by :meth:`visit_lateral` if ``element._is_lateral`` is
    true, otherwise by :meth:`visit_alias`.
    """
    if element.joins_implicitly:
        # hand the element on without a from linter
        kw["from_linter"] = None

    renderer = self.visit_lateral if element._is_lateral else self.visit_alias
    return renderer(element, **kw)
4244
def visit_table_valued_column(self, element, **kw):
    """Render a table-valued column by delegating to :meth:`visit_column`."""
    rendered = self.visit_column(element, **kw)
    return rendered
4247
def visit_alias(
    self,
    alias,
    asfrom=False,
    ashint=False,
    iscrud=False,
    fromhints=None,
    subquery=False,
    lateral=False,
    enclosing_alias=None,
    from_linter=None,
    **kwargs,
):
    """Render an alias construct, with or without its ``AS <name>`` suffix.

    The output depends on the flags passed in:

    * ``enclosing_alias`` wraps this very alias
      (``enclosing_alias.element is alias``): only the inner element is
      rendered, parenthesized when ``subquery`` is set together with
      ``asfrom`` or ``lateral``.
    * ``ashint``: only the formatted alias name is returned.
    * ``asfrom``: the inner element is rendered (parenthesized if
      ``subquery``), followed by the alias suffix, an optional derived
      column list and any applicable FROM hint text.  The alias is also
      recorded in ``from_linter.froms`` when a linter is given.
    * otherwise: the inner element is rendered with no alias text at all.

    ``kwargs`` is mutated along the way (``enclosing_lateral``,
    ``lateral_from_linter``, ``enclosing_alias``) and forwarded to the
    nested ``_compiler_dispatch`` calls.
    """
    if lateral:
        if "enclosing_lateral" not in kwargs:
            # if lateral is set and enclosing_lateral is not
            # present, we assume we are being called directly
            # from visit_lateral() and we need to set enclosing_lateral.
            assert alias._is_lateral
            kwargs["enclosing_lateral"] = alias

        # for lateral objects, we track a second from_linter that is...
        # lateral!  to the level above us.
        if (
            from_linter
            and "lateral_from_linter" not in kwargs
            and "enclosing_lateral" in kwargs
        ):
            kwargs["lateral_from_linter"] = from_linter

    if enclosing_alias is not None and enclosing_alias.element is alias:
        # the alias enclosing us has this alias as its element; render
        # just the inner element, passing every flag through and naming
        # ourselves as the new enclosing alias
        inner = alias.element._compiler_dispatch(
            self,
            asfrom=asfrom,
            ashint=ashint,
            iscrud=iscrud,
            fromhints=fromhints,
            lateral=lateral,
            enclosing_alias=alias,
            **kwargs,
        )
        if subquery and (asfrom or lateral):
            inner = "(%s)" % (inner,)
        return inner
    else:
        # nested elements dispatched below see this alias as the
        # enclosing one
        kwargs["enclosing_alias"] = alias

    # alias_name is only needed (and only assigned) for the two branches
    # below that render the name
    if asfrom or ashint:
        if isinstance(alias.name, elements._truncated_label):
            alias_name = self._truncated_identifier("alias", alias.name)
        else:
            alias_name = alias.name

    if ashint:
        # hint context: the alias name alone
        return self.preparer.format_alias(alias, alias_name)
    elif asfrom:
        if from_linter:
            from_linter.froms[alias._de_clone()] = alias_name

        inner = alias.element._compiler_dispatch(
            self, asfrom=True, lateral=lateral, **kwargs
        )
        if subquery:
            inner = "(%s)" % (inner,)

        ret = inner + self.get_render_as_alias_suffix(
            self.preparer.format_alias(alias, alias_name)
        )

        # optional "(col1 [type], col2 [type], ...)" list after the alias
        # name; the type is rendered only when _render_derived_w_types is set
        if alias._supports_derived_columns and alias._render_derived:
            ret += "(%s)" % (
                ", ".join(
                    "%s%s"
                    % (
                        self.preparer.quote(col.name),
                        (
                            " %s"
                            % self.dialect.type_compiler_instance.process(
                                col.type, **kwargs
                            )
                            if alias._render_derived_w_types
                            else ""
                        ),
                    )
                    for col in alias.c
                )
            )

        if fromhints and alias in fromhints:
            ret = self.format_from_hint_text(
                ret, alias, fromhints[alias], iscrud
            )

        return ret
    else:
        # note we cancel the "subquery" flag here as well
        return alias.element._compiler_dispatch(
            self, lateral=lateral, **kwargs
        )
4347
4348 def visit_subquery(self, subquery, **kw):
4349 kw["subquery"] = True
4350 return self.visit_alias(subquery, **kw)
4351
def visit_lateral(self, lateral_, **kw):
    """Render a LATERAL construct.

    The element is rendered by :meth:`visit_alias` with ``lateral=True``
    and the result is prefixed with the ``LATERAL`` keyword.
    """
    kw.update(lateral=True)
    aliased = self.visit_alias(lateral_, **kw)
    return "LATERAL %s" % aliased
4355
def visit_tablesample(self, tablesample, asfrom=False, **kw):
    """Render ``<alias> TABLESAMPLE <method> [REPEATABLE (<seed>)]``.

    The aliased selectable is always rendered with ``asfrom=True``; the
    ``asfrom`` argument received here is not forwarded.  The ``REPEATABLE``
    portion is added only when ``tablesample.seed`` is not ``None``.
    """
    aliased = self.visit_alias(tablesample, asfrom=True, **kw)
    sampling = tablesample._get_method()._compiler_dispatch(self, **kw)
    text = "%s TABLESAMPLE %s" % (aliased, sampling)

    if tablesample.seed is None:
        return text

    seed_text = tablesample.seed._compiler_dispatch(self, **kw)
    text += " REPEATABLE (%s)" % (seed_text)
    return text
4368
def _render_values(self, element, **kw):
    """Render the ``VALUES (...), (...)`` portion of a VALUES construct.

    ``literal_binds`` defaults to ``element.literal_binds`` unless the
    caller passed it explicitly.  Every row found in every chunk of
    ``element._data`` becomes a grouped tuple typed with
    ``element._column_types``; the rendered rows are comma-joined.
    """
    kw.setdefault("literal_binds", element.literal_binds)

    rendered_rows = []
    for chunk in element._data:
        for row in chunk:
            grouped = elements.Tuple(
                *row, types=element._column_types
            ).self_group()
            rendered_rows.append(self.process(grouped, **kw))

    tuples = ", ".join(rendered_rows)
    return f"VALUES {tuples}"
4382
def visit_values(
    self, element, asfrom=False, from_linter=None, visiting_cte=None, **kw
):
    """Render a VALUES construct.

    Outside of a FROM context the bare ``VALUES ...`` text is returned.
    With ``asfrom``:

    * if the element is the direct body of ``visiting_cte``, the bare text
      is returned (a LATERAL element raises ``CompileError`` here);
    * if the element has a name, ``(VALUES ...) AS name (col, ...)`` is
      rendered, with columns rendered using ``include_table=False``;
    * otherwise the text is just parenthesized.

    The last two forms are prefixed with ``LATERAL `` for lateral elements.
    """
    if element._independent_ctes:
        self._dispatch_independent_ctes(element, kw)

    v = self._render_values(element, **kw)

    if element._unnamed:
        name = None
    elif isinstance(element.name, elements._truncated_label):
        name = self._truncated_identifier("values", element.name)
    else:
        name = element.name

    lateral = "LATERAL " if element._is_lateral else ""

    if not asfrom:
        return v

    if from_linter:
        linter_label = "(unnamed VALUES element)" if name is None else name
        from_linter.froms[element._de_clone()] = linter_label

    if visiting_cte is not None and visiting_cte.element is element:
        if element._is_lateral:
            raise exc.CompileError(
                "Can't use a LATERAL VALUES expression inside of a CTE"
            )
        return v

    if not name:
        return "%s(%s)" % (lateral, v)

    kw["include_table"] = False
    alias_suffix = self.get_render_as_alias_suffix(self.preparer.quote(name))
    column_list = ", ".join(
        c._compiler_dispatch(self, **kw) for c in element.columns
    )
    return "%s(%s)%s (%s)" % (lateral, v, alias_suffix, column_list)
4431
def visit_scalar_values(self, element, **kw):
    """Render a scalar VALUES construct as parenthesized ``VALUES`` text."""
    values_text = self._render_values(element, **kw)
    return f"({values_text})"
4434
def get_render_as_alias_suffix(self, alias_name_text):
    """Return the text placed after an aliased element: `` AS <name>``."""
    keyword = " AS "
    return keyword + alias_name_text
4437
def _add_to_result_map(
    self,
    keyname: str,
    name: str,
    objects: Tuple[Any, ...],
    type_: TypeEngine[Any],
) -> None:
    """Append one entry to ``self._result_columns``.

    A ``keyname`` of ``None`` or ``"*"`` switches the compiler to
    unordered, ad-hoc textual column handling by setting
    ``_ordered_columns`` to ``False`` and ``_ad_hoc_textual`` to ``True``.

    :raises CompileError: if ``type_`` is a tuple type.
    """
    # note objects must be non-empty for cursor.py to handle the
    # collection properly
    assert objects

    is_wildcard = keyname is None or keyname == "*"
    if is_wildcard:
        self._ordered_columns = False
        self._ad_hoc_textual = True

    if type_._is_tuple_type:
        raise exc.CompileError(
            "Most backends don't support SELECTing "
            "from a tuple() object.  If this is an ORM query, "
            "consider using the Bundle object."
        )

    entry = ResultColumnsEntry(keyname, name, objects, type_)
    self._result_columns.append(entry)
4462
def _label_returning_column(
    self, stmt, column, populate_result_map, column_clause_args=None, **kw
):
    """Render a column, with any needed label, for a RETURNING clause.

    Dialects call this instead of ``_label_select_column`` so that the
    RETURNING and SELECT use cases can be told apart going forward.  The
    work is delegated to ``_label_select_column`` with no SELECT,
    ``asfrom=False`` and a fresh dict when ``column_clause_args`` is
    ``None``.  ``stmt`` is not forwarded.

    .. versionadded:: 1.4.21

    """
    if column_clause_args is None:
        column_clause_args = {}

    return self._label_select_column(
        None,
        column,
        populate_result_map,
        False,
        column_clause_args,
        **kw,
    )
4483
def _label_select_column(
    self,
    select,
    column,
    populate_result_map,
    asfrom,
    column_clause_args,
    name=None,
    proxy_name=None,
    fallback_label_name=None,
    within_columns_clause=True,
    column_is_repeated=False,
    need_column_expressions=False,
    include_table=True,
):
    """Produce the rendered text for one column of a SELECT or RETURNING.

    The column is first optionally wrapped in its type's column
    expression, then one of three labeling strategies is chosen:

    * ``column`` is already a ``Label``: it is used as is, or re-labeled
      under the same name if a type-level column expression was applied.
    * ``name`` is given: the expression is labeled with that name;
      ``proxy_name`` is then required.
    * otherwise: heuristics based on the kind of expression decide whether
      an ``AS <fallback_label_name>`` label is rendered.

    ``column_clause_args`` is updated in place with
    ``within_columns_clause``, ``add_to_result_map`` and ``include_table``
    and then used as the keyword arguments for the final dispatch.  The
    ``select`` argument is not referenced in this method body.

    :return: whatever ``_compiler_dispatch`` of the resulting expression
     returns.
    """
    impl = column.type.dialect_impl(self.dialect)

    # apply the type-level column expression only when the result will be
    # consumed (result map population) or the caller explicitly asks for it
    if impl._has_column_expression and (
        need_column_expressions or populate_result_map
    ):
        col_expr = impl.column_expression(column)
    else:
        col_expr = column

    if populate_result_map:
        # pass an "add_to_result_map" callable into the compilation
        # of embedded columns.  this collects information about the
        # column as it will be fetched in the result and is coordinated
        # with cursor.description when the query is executed.
        add_to_result_map = self._add_to_result_map

        # if the SELECT statement told us this column is a repeat,
        # wrap the callable with one that prevents the addition of the
        # targets
        if column_is_repeated:
            _add_to_result_map = add_to_result_map

            def add_to_result_map(keyname, name, objects, type_):
                # the incoming objects are discarded; only the key name
                # itself is recorded as a target
                _add_to_result_map(keyname, name, (keyname,), type_)

        # if we redefined col_expr for type expressions, wrap the
        # callable with one that adds the original column to the targets
        elif col_expr is not column:
            _add_to_result_map = add_to_result_map

            def add_to_result_map(keyname, name, objects, type_):
                _add_to_result_map(
                    keyname, name, (column,) + objects, type_
                )

    else:
        add_to_result_map = None

    # this method is used by some of the dialects for RETURNING,
    # which has different inputs.  _label_returning_column was added
    # as the better target for this now however for 1.4 we will keep
    # _label_select_column directly compatible with this use case.
    # these assertions right now set up the current expected inputs
    assert within_columns_clause, (
        "_label_select_column is only relevant within "
        "the columns clause of a SELECT or RETURNING"
    )
    if isinstance(column, elements.Label):
        if col_expr is not column:
            # re-wrap the type expression under the label's own name,
            # keeping the labeled element as an alternate name
            result_expr = _CompileLabel(
                col_expr, column.name, alt_names=(column.element,)
            )
        else:
            result_expr = col_expr

    elif name:
        # here, _columns_plus_names has determined there's an explicit
        # label name we need to use.  this is the default for
        # tablenames_plus_columnnames as well as when columns are being
        # deduplicated on name

        assert (
            proxy_name is not None
        ), "proxy_name is required if 'name' is passed"

        result_expr = _CompileLabel(
            col_expr,
            name,
            alt_names=(
                proxy_name,
                # this is a hack to allow legacy result column lookups
                # to work as they did before; this goes away in 2.0.
                # TODO: this only seems to be tested indirectly
                # via test/orm/test_deprecations.py.   should be a
                # resultset test for this
                column._tq_label,
            ),
        )
    else:
        # determine here whether this column should be rendered in
        # a labelled context or not, as we were given no required label
        # name from the caller. Here we apply heuristics based on the kind
        # of SQL expression involved.

        if col_expr is not column:
            # type-specific expression wrapping the given column,
            # so we render a label
            render_with_label = True
        elif isinstance(column, elements.ColumnClause):
            # table-bound column, we render its name as a label if we are
            # inside of a subquery only
            render_with_label = (
                asfrom
                and not column.is_literal
                and column.table is not None
            )
        elif isinstance(column, elements.TextClause):
            render_with_label = False
        elif isinstance(column, elements.UnaryExpression):
            # unary expression.  notes added as of #12681
            #
            # By convention, the visit_unary() method
            # itself does not add an entry to the result map, and relies
            # upon either the inner expression creating a result map
            # entry, or if not, by creating a label here that produces
            # the result map entry.  Where that happens is based on whether
            # or not the element immediately inside the unary is a
            # NamedColumn subclass or not.
            #
            # Now, this also impacts how the SELECT is written; if
            # we decide to generate a label here, we get the usual
            # "~(x+y) AS anon_1" thing in the columns clause.   If we
            # don't, we don't get an AS at all, we get like
            # "~table.column".
            #
            # But here is the important thing as of modernish (like 1.4)
            # versions of SQLAlchemy - **whether or not the AS <label>
            # is present in the statement is not actually important**.
            # We target result columns **positionally** for a fully
            # compiled ``Select()`` object; before 1.4 we needed those
            # labels to match in cursor.description etc etc but now it
            # really doesn't matter.
            # So really, we could set render_with_label True in all cases.
            # Or we could just have visit_unary() populate the result map
            # in all cases.
            #
            # What we're doing here is strictly trying to not rock the
            # boat too much with when we do/don't render "AS label";
            # labels being present helps in the edge cases that we
            # "fall back" to named cursor.description matching, labels
            # not being present for columns keeps us from having awkward
            # phrases like "SELECT DISTINCT table.x AS x".
            render_with_label = (
                (
                    # exception case to detect if we render "not boolean"
                    # as "not <col>" for native boolean or "<col> = 1"
                    # for non-native boolean.   this is controlled by
                    # visit_is_<true|false>_unary_operator
                    column.operator
                    in (operators.is_false, operators.is_true)
                    and not self.dialect.supports_native_boolean
                )
                or column._wraps_unnamed_column()
                or asfrom
            )
        elif (
            # general class of expressions that don't have a SQL-column
            # addressable name.  includes scalar selects, bind parameters,
            # SQL functions, others
            not isinstance(column, elements.NamedColumn)
            # deeper check that indicates there's no natural "name" to
            # this element, which accommodates for custom SQL constructs
            # that might have a ".name" attribute (but aren't SQL
            # functions) but are not implementing this more recently added
            # base class.  in theory the "NamedColumn" check should be
            # enough, however here we seek to maintain legacy behaviors
            # as well.
            and column._non_anon_label is None
        ):
            render_with_label = True
        else:
            render_with_label = False

        if render_with_label:
            if not fallback_label_name:
                # used by the RETURNING case right now.  we generate it
                # here as 3rd party dialects may be referring to
                # _label_select_column method directly instead of the
                # just-added _label_returning_column method
                assert not column_is_repeated
                fallback_label_name = column._anon_name_label

            # coerce the label name to a _truncated_label unless it
            # already is one
            fallback_label_name = (
                elements._truncated_label(fallback_label_name)
                if not isinstance(
                    fallback_label_name, elements._truncated_label
                )
                else fallback_label_name
            )

            result_expr = _CompileLabel(
                col_expr, fallback_label_name, alt_names=(proxy_name,)
            )
        else:
            result_expr = col_expr

    # note this mutates the dict handed in by the caller
    column_clause_args.update(
        within_columns_clause=within_columns_clause,
        add_to_result_map=add_to_result_map,
        include_table=include_table,
    )
    return result_expr._compiler_dispatch(self, **column_clause_args)
4692
def format_from_hint_text(self, sqltext, table, hint, iscrud):
    """Append the FROM hint text for ``table`` to ``sqltext``.

    The hint text comes from :meth:`get_from_hint_text`; when that returns
    a falsy value, ``sqltext`` is returned unchanged.  ``iscrud`` is not
    used by this implementation.
    """
    rendered_hint = self.get_from_hint_text(table, hint)
    if not rendered_hint:
        return sqltext

    suffix = " " + rendered_hint
    return sqltext + suffix
4698
def get_select_hint_text(self, byfroms):
    """Return statement-level hint text for a SELECT, given per-FROM hints.

    This implementation renders nothing and always returns ``None``.
    """
    return None
4701
def get_from_hint_text(
    self, table: FromClause, text: Optional[str]
) -> Optional[str]:
    """Return the hint text to render after ``table`` in a FROM clause.

    This implementation renders nothing and always returns ``None``.
    """
    return None
4706
def get_crud_hint_text(self, table, text):
    """Return hint text for ``table`` in a CRUD statement.

    This implementation renders nothing and always returns ``None``.
    """
    return None
4709
def get_statement_hint_text(self, hint_texts):
    """Combine statement-level hint strings into one space-separated string."""
    separator = " "
    return separator.join(hint_texts)
4712
# Annotation-only declaration: gives the attribute its
# ``_CompilerStackEntry`` type without assigning a value.
_default_stack_entry: _CompilerStackEntry

# The actual value is assigned only when not type checking: an
# ``immutabledict`` holding empty "correlate_froms" / "asfrom_froms" sets.
# The SELECT methods in this class use it in place of ``self.stack[-1]``
# when the compiler stack is empty.
if not typing.TYPE_CHECKING:
    _default_stack_entry = util.immutabledict(
        [("correlate_froms", frozenset()), ("asfrom_froms", frozenset())]
    )
4719
def _display_froms_for_select(
    self, select_stmt, asfrom, lateral=False, **kw
):
    """Return the FROM list that would be displayed for ``select_stmt``.

    Utility method to help external dialects get the correct FROM list
    for a SELECT; specifically the Oracle dialect needs this feature
    right now.

    The correlation context is taken from the top of the compiler stack,
    or from ``_default_stack_entry`` when the stack is empty.  For a
    non-lateral ``asfrom`` SELECT only the enclosing froms that are not
    "asfrom" froms are passed as explicit correlations; otherwise the
    "asfrom" froms are passed as implicit correlations.
    """
    entry = self.stack[-1] if self.stack else self._default_stack_entry

    compile_state = select_stmt._compile_state_factory(select_stmt, self)

    correlate_froms = entry["correlate_froms"]
    asfrom_froms = entry["asfrom_froms"]

    if asfrom and not lateral:
        explicit = correlate_froms.difference(asfrom_froms)
        implicit = ()
    else:
        explicit = correlate_froms
        implicit = asfrom_froms

    return compile_state._get_display_froms(
        explicit_correlate_froms=explicit,
        implicit_correlate_froms=implicit,
    )
4748
# Optional structural-rewrite hook.  visit_select() calls it, when it is
# set, as ``translate_select_structure(select_stmt, asfrom=asfrom, **kwargs)``
# and builds a new compile state if a different statement comes back.
translate_select_structure: Any = None
"""if not ``None``, should be a callable which accepts ``(select_stmt,
**kw)`` and returns a select object. this is used for structural changes
mostly to accommodate for LIMIT/OFFSET schemes

"""
4755
def visit_select(
    self,
    select_stmt,
    asfrom=False,
    insert_into=False,
    fromhints=None,
    compound_index=None,
    select_wraps_for=None,
    lateral=False,
    from_linter=None,
    **kwargs,
):
    """Render a SELECT statement and return its SQL text.

    Broad sequence, as implemented below:

    1. build a compile state for the statement (and a second one if the
       ``translate_select_structure`` hook returns a different statement);
    2. decide whether this SELECT populates the result map;
    3. push a new entry onto ``self.stack`` via ``_setup_select_stack``;
    4. render hints, prefixes, pre-column keywords and the labeled columns;
    5. render the rest of the statement via ``_compose_select_body``;
    6. append statement hints, prepend the CTE clause, append suffixes;
    7. pop the stack entry.

    :param compound_index: position of this SELECT within a compound
     select, or ``None``; any non-zero index disables result map
     population for this SELECT.
    :param insert_into: together with ``compound_index`` marks the SELECT
     as embedded, in which case the CTE clause is rendered here only when
     this is also the toplevel statement.
    :param select_wraps_for: must be ``None`` on entry; it is set
     internally when the translate hook restructures the statement.
    :param fromhints: accepted as a named parameter; not referenced in
     this method body.
    :param from_linter: accepted as a named parameter; not referenced in
     this method body.
    """
    assert select_wraps_for is None, (
        "SQLAlchemy 1.4 requires use of "
        "the translate_select_structure hook for structural "
        "translations of SELECT objects"
    )

    # initial setup of SELECT.  the compile_state_factory may now
    # be creating a totally different SELECT from the one that was
    # passed in.  for ORM use this will convert from an ORM-state
    # SELECT to a regular "Core" SELECT.  other composed operations
    # such as computation of joins will be performed.

    kwargs["within_columns_clause"] = False

    compile_state = select_stmt._compile_state_factory(
        select_stmt, self, **kwargs
    )
    kwargs["ambiguous_table_name_map"] = (
        compile_state._ambiguous_table_name_map
    )

    select_stmt = compile_state.statement

    # an empty stack means this is the outermost statement being compiled
    toplevel = not self.stack

    if toplevel and not self.compile_state:
        self.compile_state = compile_state

    is_embedded_select = compound_index is not None or insert_into

    # translate step for Oracle, SQL Server which often need to
    # restructure the SELECT to allow for LIMIT/OFFSET and possibly
    # other conditions
    if self.translate_select_structure:
        new_select_stmt = self.translate_select_structure(
            select_stmt, asfrom=asfrom, **kwargs
        )

        # if SELECT was restructured, maintain a link to the originals
        # and assemble a new compile state
        if new_select_stmt is not select_stmt:
            compile_state_wraps_for = compile_state
            select_wraps_for = select_stmt
            select_stmt = new_select_stmt

            compile_state = select_stmt._compile_state_factory(
                select_stmt, self, **kwargs
            )
            select_stmt = compile_state.statement

    entry = self._default_stack_entry if toplevel else self.stack[-1]

    populate_result_map = need_column_expressions = (
        toplevel
        or entry.get("need_result_map_for_compound", False)
        or entry.get("need_result_map_for_nested", False)
    )

    # indicates there is a CompoundSelect in play and we are not the
    # first select
    if compound_index:
        populate_result_map = False

    # this was first proposed as part of #3372; however, it is not
    # reached in current tests and could possibly be an assertion
    # instead.
    if not populate_result_map and "add_to_result_map" in kwargs:
        del kwargs["add_to_result_map"]

    # pushes the new stack entry; it is popped at the end of this method
    froms = self._setup_select_stack(
        select_stmt, compile_state, entry, asfrom, lateral, compound_index
    )

    # the columns get their own copy of the keyword arguments, so the
    # per-column updates made in _label_select_column do not leak into
    # kwargs
    column_clause_args = kwargs.copy()
    column_clause_args.update(
        {"within_label_clause": False, "within_columns_clause": False}
    )

    text = "SELECT "  # we're off to a good start !

    if select_stmt._hints:
        hint_text, byfrom = self._setup_select_hints(select_stmt)
        if hint_text:
            text += hint_text + " "
    else:
        byfrom = None

    if select_stmt._independent_ctes:
        self._dispatch_independent_ctes(select_stmt, kwargs)

    if select_stmt._prefixes:
        text += self._generate_prefixes(
            select_stmt, select_stmt._prefixes, **kwargs
        )

    text += self.get_select_precolumns(select_stmt, **kwargs)
    # the actual list of columns to print in the SELECT column list.
    # columns that render as None are left out.
    inner_columns = [
        c
        for c in [
            self._label_select_column(
                select_stmt,
                column,
                populate_result_map,
                asfrom,
                column_clause_args,
                name=name,
                proxy_name=proxy_name,
                fallback_label_name=fallback_label_name,
                column_is_repeated=repeated,
                need_column_expressions=need_column_expressions,
            )
            for (
                name,
                proxy_name,
                fallback_label_name,
                column,
                repeated,
            ) in compile_state.columns_plus_names
        ]
        if c is not None
    ]

    if populate_result_map and select_wraps_for is not None:
        # if this select was generated from translate_select,
        # rewrite the targeted columns in the result map

        # maps, by position, each column of the translated statement
        # to the column of the original statement.  note that "name" in
        # both comprehensions is bound to the fourth tuple member, i.e.
        # the column object.
        translate = dict(
            zip(
                [
                    name
                    for (
                        key,
                        proxy_name,
                        fallback_label_name,
                        name,
                        repeated,
                    ) in compile_state.columns_plus_names
                ],
                [
                    name
                    for (
                        key,
                        proxy_name,
                        fallback_label_name,
                        name,
                        repeated,
                    ) in compile_state_wraps_for.columns_plus_names
                ],
            )
        )

        self._result_columns = [
            ResultColumnsEntry(
                key, name, tuple(translate.get(o, o) for o in obj), type_
            )
            for key, name, obj, type_ in self._result_columns
        ]

    text = self._compose_select_body(
        text,
        select_stmt,
        compile_state,
        inner_columns,
        froms,
        byfrom,
        toplevel,
        kwargs,
    )

    if select_stmt._statement_hints:
        # keep only the hints aimed at every dialect or at this one
        per_dialect = [
            ht
            for (dialect_name, ht) in select_stmt._statement_hints
            if dialect_name in ("*", self.dialect.name)
        ]
        if per_dialect:
            text += " " + self.get_statement_hint_text(per_dialect)

    # In compound query, CTEs are shared at the compound level
    if self.ctes and (not is_embedded_select or toplevel):
        nesting_level = len(self.stack) if not toplevel else None
        text = self._render_cte_clause(nesting_level=nesting_level) + text

    if select_stmt._suffixes:
        text += " " + self._generate_prefixes(
            select_stmt, select_stmt._suffixes, **kwargs
        )

    self.stack.pop(-1)

    return text
4960
def _setup_select_hints(
    self, select: Select[Any]
) -> Tuple[str, _FromHintsType]:
    """Resolve the per-FROM hints of ``select`` for the current dialect.

    Only hints registered for ``"*"`` or for this dialect's name are kept.
    Each hint template is interpolated with a ``name`` value, which is the
    FROM element rendered with ``ashint=True``.

    :return: a tuple ``(hint_text, byfrom)``, where ``byfrom`` maps FROM
     elements to rendered hint strings and ``hint_text`` is the result of
     :meth:`get_select_hint_text` for that mapping.
    """
    byfrom = {}
    for (from_, dialect), template in select._hints.items():
        if dialect not in ("*", self.dialect.name):
            continue
        rendered_name = from_._compiler_dispatch(self, ashint=True)
        byfrom[from_] = template % {"name": rendered_name}

    return self.get_select_hint_text(byfrom), byfrom
4972
def _setup_select_stack(
    self, select, compile_state, entry, asfrom, lateral, compound_index
):
    """Push a compiler stack entry for ``select`` and return its FROM list.

    For the first member of a compound select (``compound_index == 0``)
    the statement is remembered on ``entry`` as ``"select_0"``; later
    members are checked to have the same number of columns as that first
    one.

    :raises CompileError: if a later compound member has a different
     number of columns than the first.
    :return: the display FROM objects computed by the compile state.
    """
    correlate_froms = entry["correlate_froms"]
    asfrom_froms = entry["asfrom_froms"]

    if compound_index == 0:
        entry["select_0"] = select
    elif compound_index:
        first_select = entry["select_0"]
        numcols = len(first_select._all_selected_columns)

        if len(compile_state.columns_plus_names) != numcols:
            message_args = (
                1,
                numcols,
                compound_index + 1,
                len(select._all_selected_columns),
            )
            raise exc.CompileError(
                "All selectables passed to "
                "CompoundSelect must have identical numbers of "
                "columns; select #%d has %d columns, select "
                "#%d has %d" % message_args
            )

    if asfrom and not lateral:
        explicit = correlate_froms.difference(asfrom_froms)
        implicit = ()
    else:
        explicit = correlate_froms
        implicit = asfrom_froms

    froms = compile_state._get_display_froms(
        explicit_correlate_froms=explicit,
        implicit_correlate_froms=implicit,
    )

    # FROM objects of this SELECT become correlatable for whatever is
    # compiled inside of it, in addition to those already in play
    own_froms = set(_from_objects(*froms))
    combined_froms = own_froms.union(correlate_froms)

    self.stack.append(
        {
            "asfrom_froms": own_froms,
            "correlate_froms": combined_froms,
            "selectable": select,
            "compile_state": compile_state,
        }
    )

    return froms
5024
def _compose_select_body(
    self,
    text,
    select,
    compile_state,
    inner_columns,
    froms,
    byfrom,
    toplevel,
    kwargs,
):
    """Append everything after the SELECT keyword/prefixes to ``text``.

    In order: the column list, the FROM clause (or ``default_from()``
    when there are no froms), WHERE, GROUP BY, HAVING, ORDER BY, the row
    limiting clause and FOR UPDATE.  When cartesian-product linting is
    enabled, a ``FromLinter`` collects FROM and WHERE information, is
    stored on the compiler for a toplevel statement, and emits its
    warning right after the WHERE clause has been processed.

    ``compile_state`` is not referenced in this implementation.
    """
    text += ", ".join(inner_columns)

    if self.linting & COLLECT_CARTESIAN_PRODUCTS:
        from_linter = FromLinter({}, set())
        warn_linting = self.linting & WARN_LINTING
        if toplevel:
            self.from_linter = from_linter
    else:
        from_linter, warn_linting = None, False

    # adjust the whitespace for no inner columns, part of #9440,
    # so that a no-col SELECT comes out as "SELECT WHERE..." or
    # "SELECT FROM ...".
    # while it would be better to have built the SELECT starting string
    # without trailing whitespace first, then add whitespace only if inner
    # cols were present, this breaks compatibility with various custom
    # compilation schemes that are currently being tested.
    if not inner_columns:
        text = text.rstrip()

    if not froms:
        text += self.default_from()
    else:
        text += " \nFROM "

        # "fromhints" is passed along only when the statement has hints
        if select._hints:
            from_kw = dict(
                asfrom=True, fromhints=byfrom, from_linter=from_linter
            )
        else:
            from_kw = dict(asfrom=True, from_linter=from_linter)

        rendered_froms = []
        for from_ in froms:
            rendered_froms.append(
                from_._compiler_dispatch(self, **from_kw, **kwargs)
            )
        text += ", ".join(rendered_froms)

    if select._where_criteria:
        where_text = self._generate_delimited_and_list(
            select._where_criteria, from_linter=from_linter, **kwargs
        )
        if where_text:
            text += " \nWHERE " + where_text

    if warn_linting:
        assert from_linter is not None
        from_linter.warn()

    if select._group_by_clauses:
        text += self.group_by_clause(select, **kwargs)

    if select._having_criteria:
        having_text = self._generate_delimited_and_list(
            select._having_criteria, **kwargs
        )
        if having_text:
            text += " \nHAVING " + having_text

    if select._order_by_clauses:
        text += self.order_by_clause(select, **kwargs)

    if select._has_row_limiting_clause:
        text += self._row_limit_clause(select, **kwargs)

    if select._for_update_arg is not None:
        text += self.for_update_clause(select, **kwargs)

    return text
5119
def _generate_prefixes(self, stmt, prefixes, **kw):
    """Render the prefixes (or suffixes) that apply to the current dialect.

    ``prefixes`` is a sequence of ``(element, dialect_name)`` pairs; a
    pair applies when its dialect name is ``None``, ``"*"`` or this
    dialect's name.  The rendered elements are space-joined and, if the
    result is non-empty, followed by one trailing space.  ``stmt`` is not
    used by this implementation.
    """
    rendered = []
    for prefix, dialect_name in prefixes:
        if dialect_name in (None, "*") or dialect_name == self.dialect.name:
            rendered.append(prefix._compiler_dispatch(self, **kw))

    clause = " ".join(rendered)
    return clause + " " if clause else clause
5129
def _render_cte_clause(
    self,
    nesting_level=None,
    include_following_stack=False,
):
    """Render the ``WITH ...`` clause for the collected CTEs.

    With no ``nesting_level`` (or a level of 1 or less), every CTE in
    ``self.ctes`` is rendered and the collections are left untouched.
    With a deeper ``nesting_level``, only nesting CTEs registered at that
    level are rendered, and they are removed from the compiler's CTE
    collections afterwards.

    include_following_stack
      Also render the nesting CTEs on the next stack.  Useful for
      SQL structures like UNION or INSERT that can wrap SELECT
      statements containing nesting CTEs.

    Returns an empty string when there is nothing to render.
    """
    if not self.ctes:
        return ""

    nested_render = bool(nesting_level and nesting_level > 1)

    if nested_render:
        ctes = util.OrderedDict()
        for cte in list(self.ctes.keys()):
            cte_level, cte_name, cte_opts = self.level_name_by_cte[
                cte._get_reference_cte()
            ]
            nesting = cte.nesting or cte_opts.nesting
            is_rendered_level = cte_level == nesting_level or (
                include_following_stack and cte_level == nesting_level + 1
            )
            if nesting and is_rendered_level:
                ctes[cte] = self.ctes[cte]
    else:
        ctes = self.ctes

    if not ctes:
        return ""

    recursive_flags = [cte.recursive for cte in ctes]
    preamble = self.get_cte_preamble(any(recursive_flags))

    cte_text = preamble + " "
    cte_text += ", \n".join(ctes.values())
    cte_text += "\n "

    if nested_render:
        # the CTEs rendered at this level are consumed
        for cte in list(ctes.keys()):
            reference_cte = cte._get_reference_cte()
            cte_level, cte_name, cte_opts = self.level_name_by_cte[
                reference_cte
            ]
            del self.ctes[cte]
            del self.ctes_by_level_name[(cte_level, cte_name)]
            del self.level_name_by_cte[cte._get_reference_cte()]

    return cte_text
5182
def get_cte_preamble(self, recursive):
    """Return the keyword(s) that open a CTE clause.

    ``WITH RECURSIVE`` when ``recursive`` is truthy, otherwise ``WITH``.
    """
    return "WITH RECURSIVE" if recursive else "WITH"
5188
def get_select_precolumns(self, select: Select[Any], **kw: Any) -> str:
    """Return the text rendered just before the column list of a SELECT.

    This implementation renders ``DISTINCT `` when the statement is
    distinct and an empty string otherwise.  If the statement carries
    DISTINCT ON expressions, a deprecation warning is emitted and they
    are otherwise ignored here.

    """
    if select._distinct_on:
        util.warn_deprecated(
            "DISTINCT ON is currently supported only by the PostgreSQL "
            "dialect.  Use of DISTINCT ON for other backends is currently "
            "silently ignored, however this usage is deprecated, and will "
            "raise CompileError in a future release for all backends "
            "that do not support this syntax.",
            version="1.4",
        )

    if select._distinct:
        return "DISTINCT "
    return ""
5204
def group_by_clause(self, select, **kw):
    """Render the GROUP BY clause; dialects may override to customize it.

    Returns an empty string when the comma-delimited list of GROUP BY
    expressions renders as empty.
    """
    rendered = self._generate_delimited_list(
        select._group_by_clauses, OPERATORS[operators.comma_op], **kw
    )
    if not rendered:
        return ""
    return " GROUP BY " + rendered
5215
def order_by_clause(self, select, **kw):
    """Render the ORDER BY clause; dialects may override to customize it.

    Returns an empty string when the comma-delimited list of ORDER BY
    expressions renders as empty.
    """
    rendered = self._generate_delimited_list(
        select._order_by_clauses, OPERATORS[operators.comma_op], **kw
    )
    if not rendered:
        return ""
    return " ORDER BY " + rendered
5227
def for_update_clause(self, select, **kw):
    """Return the FOR UPDATE text appended to a SELECT.

    This implementation always renders the plain `` FOR UPDATE`` form and
    does not inspect ``select``.
    """
    clause = " FOR UPDATE"
    return clause
5230
def returning_clause(
    self,
    stmt: UpdateBase,
    returning_cols: Sequence[_ColumnsClauseElement],
    *,
    populate_result_map: bool,
    **kw: Any,
) -> str:
    """Render ``RETURNING col1, col2, ...`` for a DML statement.

    The given columns are expanded via ``base._select_iterables`` and
    named by the statement's ``_generate_columns_plus_names``; each one is
    then rendered through :meth:`_label_returning_column`.
    """
    named_columns = stmt._generate_columns_plus_names(
        True, cols=base._select_iterables(returning_cols)
    )

    columns = []
    for (
        name,
        proxy_name,
        fallback_label_name,
        column,
        repeated,
    ) in named_columns:
        columns.append(
            self._label_returning_column(
                stmt,
                column,
                populate_result_map,
                fallback_label_name=fallback_label_name,
                column_is_repeated=repeated,
                name=name,
                proxy_name=proxy_name,
                **kw,
            )
        )

    return "RETURNING " + ", ".join(columns)
5262
def limit_clause(self, select, **kw):
    """Render ``LIMIT`` / ``OFFSET`` for a SELECT.

    When only an offset is present, ``LIMIT -1`` is rendered ahead of
    the ``OFFSET``.

    :return: the rendered text, or an empty string if the statement has
     neither a limit nor an offset.
    """
    limit = select._limit_clause
    offset = select._offset_clause

    fragments = []
    if limit is not None:
        fragments.append("\n LIMIT " + self.process(limit, **kw))
    if offset is not None:
        if limit is None:
            fragments.append("\n LIMIT -1")
        fragments.append(" OFFSET " + self.process(offset, **kw))
    return "".join(fragments)
5272
def fetch_clause(
    self,
    select,
    fetch_clause=None,
    require_offset=False,
    use_literal_execute_for_simple_int=False,
    **kw,
):
    """Render ``OFFSET .. ROWS`` / ``FETCH FIRST .. ROWS`` for a SELECT.

    :param select: the statement supplying the offset and, unless
     overridden, the fetch clause and its options.
    :param fetch_clause: optional explicit fetch expression; when given,
     the "percent" and "with_ties" options are both treated as False.
    :param require_offset: render ``OFFSET 0 ROWS`` when the statement
     has no offset of its own.
    :param use_literal_execute_for_simple_int: when true, clauses that
     ``select._simple_int_clause()`` accepts are converted with
     ``render_literal_execute()`` before being processed.
    :return: the rendered text, possibly empty.
    """
    if fetch_clause is not None:
        options = {"percent": False, "with_ties": False}
    else:
        fetch_clause = select._fetch_clause
        options = select._fetch_clause_options

    fragments = []

    offset = select._offset_clause
    if offset is not None:
        if use_literal_execute_for_simple_int and select._simple_int_clause(
            offset
        ):
            offset = offset.render_literal_execute()
        rendered_offset = self.process(offset, **kw)
        fragments.append("\n OFFSET %s ROWS" % rendered_offset)
    elif require_offset:
        fragments.append("\n OFFSET 0 ROWS")

    if fetch_clause is not None:
        if use_literal_execute_for_simple_int and select._simple_int_clause(
            fetch_clause
        ):
            fetch_clause = fetch_clause.render_literal_execute()
        rendered_fetch = self.process(fetch_clause, **kw)
        percent = " PERCENT" if options["percent"] else ""
        ties = "WITH TIES" if options["with_ties"] else "ONLY"
        fragments.append(
            "\n FETCH FIRST %s%s ROWS %s" % (rendered_fetch, percent, ties)
        )

    return "".join(fragments)
5313
def visit_table(
    self,
    table,
    asfrom=False,
    iscrud=False,
    ashint=False,
    fromhints=None,
    use_schema=True,
    from_linter=None,
    ambiguous_table_name_map=None,
    enclosing_alias=None,
    **kwargs,
):
    """Render a table reference.

    The table is rendered only when ``asfrom`` or ``ashint`` is set;
    otherwise an empty string is returned.  The rendered name is schema
    qualified when ``use_schema`` is true and the preparer reports a
    schema for the table.  An anonymous alias is appended for
    schema-less tables whose name appears in
    ``ambiguous_table_name_map``, unless ``enclosing_alias`` already
    wraps this same table.  Hint text from ``fromhints`` is applied
    last.
    """
    if from_linter:
        from_linter.froms[table] = table.fullname

    if not (asfrom or ashint):
        return ""

    preparer = self.preparer
    effective_schema = preparer.schema_for_object(table)

    if use_schema and effective_schema:
        schema_prefix = preparer.quote_schema(effective_schema) + "."
    else:
        schema_prefix = ""
    rendered = schema_prefix + preparer.quote(table.name)

    # an enclosing alias of this very table already disambiguates it
    aliased_by_encloser = (
        enclosing_alias is not None and enclosing_alias.element is table
    )
    if (
        not aliased_by_encloser
        and not effective_schema
        and ambiguous_table_name_map
        and table.name in ambiguous_table_name_map
    ):
        anon_name = self._truncated_identifier(
            "alias", ambiguous_table_name_map[table.name]
        )
        alias_text = self.preparer.format_alias(None, anon_name)
        rendered = rendered + self.get_render_as_alias_suffix(alias_text)

    if fromhints and table in fromhints:
        rendered = self.format_from_hint_text(
            rendered, table, fromhints[table], iscrud
        )
    return rendered
5366
def visit_join(self, join, asfrom=False, from_linter=None, **kwargs):
    """Render a JOIN as ``<left> <join keyword> <right> ON <onclause>``.

    When a ``from_linter`` is given, an edge is recorded between every
    FROM object on the left side and every FROM object on the right.
    """
    if from_linter:
        left_froms = _de_clone(join.left._from_objects)
        right_froms = _de_clone(join.right._from_objects)
        from_linter.edges.update(itertools.product(left_froms, right_froms))

    keyword = (
        " FULL OUTER JOIN "
        if join.full
        else (" LEFT OUTER JOIN " if join.isouter else " JOIN ")
    )

    left_text = join.left._compiler_dispatch(
        self, asfrom=True, from_linter=from_linter, **kwargs
    )
    right_text = join.right._compiler_dispatch(
        self, asfrom=True, from_linter=from_linter, **kwargs
    )
    # TODO: likely need asfrom=True here?
    on_text = join.onclause._compiler_dispatch(
        self, from_linter=from_linter, **kwargs
    )
    return left_text + keyword + right_text + " ON " + on_text
5396
5397 def _setup_crud_hints(self, stmt, table_text):
5398 dialect_hints = {
5399 table: hint_text
5400 for (table, dialect), hint_text in stmt._hints.items()
5401 if dialect in ("*", self.dialect.name)
5402 }
5403 if stmt.table in dialect_hints:
5404 table_text = self.format_from_hint_text(
5405 table_text, stmt.table, dialect_hints[stmt.table], True
5406 )
5407 return dialect_hints, table_text
5408
# within the realm of "insertmanyvalues sentinel columns",
# these lookups match different kinds of Column() configurations
# to specific backend capabilities.  they are broken into two
# lookups, one for autoincrement columns and the other for non
# autoincrement columns.
#
# keys are _SentinelDefaultCharacterization members; values are
# InsertmanyvaluesSentinelOpts flags.  _get_sentinel_column_for_table()
# bitwise-ANDs the looked-up value with the dialect's
# insertmanyvalues_implicit_sentinel setting to decide whether the
# column is usable as a sentinel.
_sentinel_col_non_autoinc_lookup = util.immutabledict(
    {
        _SentinelDefaultCharacterization.CLIENTSIDE: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.SENTINEL_DEFAULT: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.NONE: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.IDENTITY: (
            InsertmanyvaluesSentinelOpts.IDENTITY
        ),
        _SentinelDefaultCharacterization.SEQUENCE: (
            InsertmanyvaluesSentinelOpts.SEQUENCE
        ),
    }
)
# the autoincrement lookup is the same as the non-autoincrement one,
# except that a column with no default characterization (NONE) maps to
# the AUTOINCREMENT capability.
_sentinel_col_autoinc_lookup = _sentinel_col_non_autoinc_lookup.union(
    {
        _SentinelDefaultCharacterization.NONE: (
            InsertmanyvaluesSentinelOpts.AUTOINCREMENT
        ),
    }
)
5440
5441 def _get_sentinel_column_for_table(
5442 self, table: Table
5443 ) -> Optional[Sequence[Column[Any]]]:
5444 """given a :class:`.Table`, return a usable sentinel column or
5445 columns for this dialect if any.
5446
5447 Return None if no sentinel columns could be identified, or raise an
5448 error if a column was marked as a sentinel explicitly but isn't
5449 compatible with this dialect.
5450
5451 """
5452
5453 sentinel_opts = self.dialect.insertmanyvalues_implicit_sentinel
5454 sentinel_characteristics = table._sentinel_column_characteristics
5455
5456 sent_cols = sentinel_characteristics.columns
5457
5458 if sent_cols is None:
5459 return None
5460
5461 if sentinel_characteristics.is_autoinc:
5462 bitmask = self._sentinel_col_autoinc_lookup.get(
5463 sentinel_characteristics.default_characterization, 0
5464 )
5465 else:
5466 bitmask = self._sentinel_col_non_autoinc_lookup.get(
5467 sentinel_characteristics.default_characterization, 0
5468 )
5469
5470 if sentinel_opts & bitmask:
5471 return sent_cols
5472
5473 if sentinel_characteristics.is_explicit:
5474 # a column was explicitly marked as insert_sentinel=True,
5475 # however it is not compatible with this dialect. they should
5476 # not indicate this column as a sentinel if they need to include
5477 # this dialect.
5478
5479 # TODO: do we want non-primary key explicit sentinel cols
5480 # that can gracefully degrade for some backends?
5481 # insert_sentinel="degrade" perhaps. not for the initial release.
5482 # I am hoping people are generally not dealing with this sentinel
5483 # business at all.
5484
5485 # if is_explicit is True, there will be only one sentinel column.
5486
5487 raise exc.InvalidRequestError(
5488 f"Column {sent_cols[0]} can't be explicitly "
5489 "marked as a sentinel column when using the "
5490 f"{self.dialect.name} dialect, as the "
5491 "particular type of default generation on this column is "
5492 "not currently compatible with this dialect's specific "
5493 f"INSERT..RETURNING syntax which can receive the "
5494 "server-generated value in "
5495 "a deterministic way. To remove this error, remove "
5496 "insert_sentinel=True from primary key autoincrement "
5497 "columns; these columns are automatically used as "
5498 "sentinels for supported dialects in any case."
5499 )
5500
5501 return None
5502
def _deliver_insertmanyvalues_batches(
    self,
    statement: str,
    parameters: _DBAPIMultiExecuteParams,
    compiled_parameters: List[_MutableCoreSingleExecuteParams],
    generic_setinputsizes: Optional[_GenericSetInputSizesType],
    batch_size: int,
    sort_by_parameter_order: bool,
    schema_translate_map: Optional[SchemaTranslateMapType],
) -> Iterator[_InsertManyValuesBatch]:
    """Yield ``_InsertManyValuesBatch`` objects that deliver
    ``parameters`` to an INSERT statement in batches.

    Uses the ``_insertmanyvalues`` state established at compile time
    (it must not be ``None``).  Depending on dialect capabilities and
    the statement, either one batch per parameter set is yielded with
    the statement unchanged ("row at a time"), or the statement's
    single ``(<values expr>)`` is expanded into a multi-row form sized
    to each batch and the parameter sets are merged accordingly.

    :param statement: the string SQL statement; in the batched case it
     is searched for the parenthesized single-row values expression.
    :param parameters: the sequence of parameter sets to deliver.
    :param compiled_parameters: list parallel to ``parameters``; read
     only to extract sentinel values via ``imv.sentinel_param_keys``.
    :param generic_setinputsizes: optional ``(key, len, type)`` entries
     which are repeated for every row of a batch.
    :param batch_size: maximum number of parameter sets per batch; may
     be lowered to satisfy ``dialect.insertmanyvalues_max_parameters``.
    :param sort_by_parameter_order: passed through to each batch and
     takes part in the decision to fall back to row-at-a-time.
    :param schema_translate_map: when truthy, schema translates are
     rendered into the values expression and the per-column
     expressions before string replacement takes place.
    """
    imv = self._insertmanyvalues
    assert imv is not None

    # callable that pulls the sentinel value(s) out of one compiled
    # parameter set, or None when no sentinel parameter keys exist
    if not imv.sentinel_param_keys:
        _sentinel_from_params = None
    else:
        _sentinel_from_params = operator.itemgetter(
            *imv.sentinel_param_keys
        )

    lenparams = len(parameters)
    if imv.is_default_expr and not self.dialect.supports_default_metavalue:
        # backend doesn't support
        # INSERT INTO table (pk_col) VALUES (DEFAULT), (DEFAULT), ...
        # at the moment this is basically SQL Server due to
        # not being able to use DEFAULT for identity column
        # just yield out that many single statements!  still
        # faster than a whole connection.execute() call ;)
        #
        # note we still are taking advantage of the fact that we know
        # we are using RETURNING.   The generalized approach of fetching
        # cursor.lastrowid etc. still goes through the more heavyweight
        # "ExecutionContext per statement" system as it isn't usable
        # as a generic "RETURNING" approach
        use_row_at_a_time = True
        downgraded = False
    elif not self.dialect.supports_multivalues_insert or (
        sort_by_parameter_order
        and self._result_columns
        and (imv.sentinel_columns is None or imv.includes_upsert_behaviors)
    ):
        # deterministic order was requested and the compiler could
        # not organize sentinel columns for this dialect/statement.
        # use row at a time
        use_row_at_a_time = True
        downgraded = True
    else:
        use_row_at_a_time = False
        downgraded = False

    if use_row_at_a_time:
        # one batch per parameter set; the statement is not rewritten.
        # batch numbers start at 1.
        for batchnum, (param, compiled_param) in enumerate(
            cast(
                "Sequence[Tuple[_DBAPISingleExecuteParams, _MutableCoreSingleExecuteParams]]",  # noqa: E501
                zip(parameters, compiled_parameters),
            ),
            1,
        ):
            yield _InsertManyValuesBatch(
                statement,
                param,
                generic_setinputsizes,
                [param],
                (
                    [_sentinel_from_params(compiled_param)]
                    if _sentinel_from_params
                    else []
                ),
                1,
                batchnum,
                lenparams,
                sort_by_parameter_order,
                downgraded,
            )
        return

    # "rst" renders schema translates into a string, when a
    # schema_translate_map is in use
    if schema_translate_map:
        rst = functools.partial(
            self.preparer._render_schema_translates,
            schema_translate_map=schema_translate_map,
        )
    else:
        rst = None

    imv_single_values_expr = imv.single_values_expr
    if rst:
        imv_single_values_expr = rst(imv_single_values_expr)

    # swap the single-row "(...)" values expression for a token which
    # is replaced per batch with the expanded multi-row text
    executemany_values = f"({imv_single_values_expr})"
    statement = statement.replace(executemany_values, "__EXECMANY_TOKEN__")

    # Use optional insertmanyvalues_max_parameters
    # to further shrink the batch size so that there are no more than
    # insertmanyvalues_max_parameters params.
    # Currently used by SQL Server, which limits statements to 2100 bound
    # parameters (actually 2099).
    max_params = self.dialect.insertmanyvalues_max_parameters
    if max_params:
        total_num_of_params = len(self.bind_names)
        num_params_per_batch = len(imv.insert_crud_params)
        num_params_outside_of_batch = (
            total_num_of_params - num_params_per_batch
        )
        batch_size = min(
            batch_size,
            (
                (max_params - num_params_outside_of_batch)
                // num_params_per_batch
            ),
        )

    # mutable copies; the loop below consumes these from the front
    batches = cast("List[Sequence[Any]]", list(parameters))
    compiled_batches = cast(
        "List[Sequence[Any]]", list(compiled_parameters)
    )

    processed_setinputsizes: Optional[_GenericSetInputSizesType] = None
    batchnum = 1
    # ceiling division of lenparams by batch_size
    total_batches = lenparams // batch_size + (
        1 if lenparams % batch_size else 0
    )

    insert_crud_params = imv.insert_crud_params
    assert insert_crud_params is not None

    if rst:
        insert_crud_params = [
            (col, key, rst(expr), st)
            for col, key, expr, st in insert_crud_params
        ]

    escaped_bind_names: Mapping[str, str]
    expand_pos_lower_index = expand_pos_upper_index = 0

    if not self.positional:
        # named parameters: build a VALUES template in which every
        # bind name carries an "__EXECMANY_INDEX__" suffix that is
        # replaced with the row index for each row of a batch
        if self.escaped_bind_names:
            escaped_bind_names = self.escaped_bind_names
        else:
            escaped_bind_names = {}

        all_keys = set(parameters[0])

        def apply_placeholders(keys, formatted):
            for key in keys:
                key = escaped_bind_names.get(key, key)
                formatted = formatted.replace(
                    self.bindtemplate % {"name": key},
                    self.bindtemplate
                    % {"name": f"{key}__EXECMANY_INDEX__"},
                )
            return formatted

        if imv.embed_values_counter:
            imv_values_counter = ", _IMV_VALUES_COUNTER"
        else:
            imv_values_counter = ""
        formatted_values_clause = f"""({', '.join(
            apply_placeholders(bind_keys, formatted)
            for _, _, formatted, bind_keys in insert_crud_params
        )}{imv_values_counter})"""

        # keys that belong to the VALUES expressions get a per-row
        # copy; all remaining keys are taken once, from the first
        # parameter set
        keys_to_replace = all_keys.intersection(
            escaped_bind_names.get(key, key)
            for _, _, _, bind_keys in insert_crud_params
            for key in bind_keys
        )
        base_parameters = {
            key: parameters[0][key]
            for key in all_keys.difference(keys_to_replace)
        }
        executemany_values_w_comma = ""
    else:
        # positional parameters: the VALUES text is repeated once per
        # row and the parameter sequences are concatenated
        formatted_values_clause = ""
        keys_to_replace = set()
        base_parameters = {}

        if imv.embed_values_counter:
            executemany_values_w_comma = (
                f"({imv_single_values_expr}, _IMV_VALUES_COUNTER), "
            )
        else:
            executemany_values_w_comma = f"({imv_single_values_expr}), "

        all_names_we_will_expand: Set[str] = set()
        for elem in imv.insert_crud_params:
            all_names_we_will_expand.update(elem[3])

        # get the start and end position in a particular list
        # of parameters where we will be doing the "expanding".
        # statements can have params on either side or both sides,
        # given RETURNING and CTEs
        if all_names_we_will_expand:
            positiontup = self.positiontup
            assert positiontup is not None

            all_expand_positions = {
                idx
                for idx, name in enumerate(positiontup)
                if name in all_names_we_will_expand
            }
            expand_pos_lower_index = min(all_expand_positions)
            expand_pos_upper_index = max(all_expand_positions) + 1
            # the expanded positions must form one contiguous range
            assert (
                len(all_expand_positions)
                == expand_pos_upper_index - expand_pos_lower_index
            )

        if self._numeric_binds:
            # replace numbered placeholders with "%s" so that the
            # repeated text can be %-formatted with new numbers for
            # each batch below
            escaped = re.escape(self._numeric_binds_identifier_char)
            executemany_values_w_comma = re.sub(
                rf"{escaped}\d+", "%s", executemany_values_w_comma
            )

    while batches:
        # take the next batch_size entries off the front of both lists
        batch = batches[0:batch_size]
        compiled_batch = compiled_batches[0:batch_size]

        batches[0:batch_size] = []
        compiled_batches[0:batch_size] = []

        # if entries remain then this batch is full; otherwise it is
        # the final batch and may be shorter
        if batches:
            current_batch_size = batch_size
        else:
            current_batch_size = len(batch)

        if generic_setinputsizes:
            # if setinputsizes is present, expand this collection to
            # suit the batch length as well
            # currently this will be mssql+pyodbc for internal dialects
            # NOTE(review): keys here use a single underscore
            # ("key_index") while the named-parameter branch below
            # produces "key__index"; confirm this is intended
            processed_setinputsizes = [
                (new_key, len_, typ)
                for new_key, len_, typ in (
                    (f"{key}_{index}", len_, typ)
                    for index in range(current_batch_size)
                    for key, len_, typ in generic_setinputsizes
                )
            ]

        replaced_parameters: Any
        if self.positional:
            num_ins_params = imv.num_positional_params_counted

            batch_iterator: Iterable[Sequence[Any]]
            extra_params_left: Sequence[Any]
            extra_params_right: Sequence[Any]

            if num_ins_params == len(batch[0]):
                # every parameter belongs to VALUES
                extra_params_left = extra_params_right = ()
                batch_iterator = batch
            else:
                # parameters outside the expanded range are taken
                # from the first parameter set of the batch only
                extra_params_left = batch[0][:expand_pos_lower_index]
                extra_params_right = batch[0][expand_pos_upper_index:]
                batch_iterator = (
                    b[expand_pos_lower_index:expand_pos_upper_index]
                    for b in batch
                )

            # the [:-2] slices remove the trailing ", " left by the
            # last repetition
            if imv.embed_values_counter:
                expanded_values_string = (
                    "".join(
                        executemany_values_w_comma.replace(
                            "_IMV_VALUES_COUNTER", str(i)
                        )
                        for i, _ in enumerate(batch)
                    )
                )[:-2]
            else:
                expanded_values_string = (
                    (executemany_values_w_comma * current_batch_size)
                )[:-2]

            if self._numeric_binds and num_ins_params > 0:
                # numeric will always number the parameters inside of
                # VALUES (and thus order self.positiontup) to be higher
                # than non-VALUES parameters, no matter where in the
                # statement those non-VALUES parameters appear (this is
                # ensured in _process_numeric by numbering first all
                # params that are not in _values_bindparam)
                # therefore all extra params are always
                # on the left side and numbered lower than the VALUES
                # parameters
                assert not extra_params_right

                start = expand_pos_lower_index + 1
                end = num_ins_params * (current_batch_size) + start

                # need to format here, since statement may contain
                # unescaped %, while values_string contains just (%s, %s)
                positions = tuple(
                    f"{self._numeric_binds_identifier_char}{i}"
                    for i in range(start, end)
                )
                expanded_values_string = expanded_values_string % positions

            replaced_statement = statement.replace(
                "__EXECMANY_TOKEN__", expanded_values_string
            )

            # flatten the per-row VALUES parameters into one tuple
            replaced_parameters = tuple(
                itertools.chain.from_iterable(batch_iterator)
            )

            replaced_parameters = (
                extra_params_left
                + replaced_parameters
                + extra_params_right
            )

        else:
            replaced_values_clauses = []
            replaced_parameters = base_parameters.copy()

            for i, param in enumerate(batch):
                # "<key>__EXECMANY_INDEX__" becomes "<key>__<i>",
                # matching the parameter keys generated below
                fmv = formatted_values_clause.replace(
                    "EXECMANY_INDEX__", str(i)
                )
                if imv.embed_values_counter:
                    fmv = fmv.replace("_IMV_VALUES_COUNTER", str(i))

                replaced_values_clauses.append(fmv)
                replaced_parameters.update(
                    {f"{key}__{i}": param[key] for key in keys_to_replace}
                )

            replaced_statement = statement.replace(
                "__EXECMANY_TOKEN__",
                ", ".join(replaced_values_clauses),
            )

        yield _InsertManyValuesBatch(
            replaced_statement,
            replaced_parameters,
            processed_setinputsizes,
            batch,
            (
                [_sentinel_from_params(cb) for cb in compiled_batch]
                if _sentinel_from_params
                else []
            ),
            current_batch_size,
            batchnum,
            total_batches,
            sort_by_parameter_order,
            False,
        )
        batchnum += 1
5849
def visit_insert(
    self, insert_stmt, visited_bindparam=None, visiting_cte=None, **kw
):
    """Render an INSERT statement as a string.

    In addition to producing the SQL text, this method records
    ``self._insertmanyvalues`` when the crud parameter structure
    indicates that "insertmanyvalues" may be used, and appends sentinel
    columns to the RETURNING clause when those are in use.

    :param insert_stmt: the INSERT construct; replaced with the
     statement held by its compile state.
    :param visited_bindparam: any incoming value is discarded; a fresh
     collection is used internally to count positional parameters.
    :param visiting_cte: when not ``None``, the statement is treated as
     not top level and the value is forwarded in ``kw``.
    :return: the rendered INSERT statement.
    :raises CompileError: for empty or multi-row inserts the dialect
     does not support, or when sorted RETURNING is requested together
     with multi-row ``values()``.
    """
    compile_state = insert_stmt._compile_state_factory(
        insert_stmt, self, **kw
    )
    insert_stmt = compile_state.statement

    if visiting_cte is not None:
        kw["visiting_cte"] = visiting_cte
        toplevel = False
    else:
        # top level when nothing else is on the compiler stack
        toplevel = not self.stack

    if toplevel:
        self.isinsert = True
        if not self.dml_compile_state:
            self.dml_compile_state = compile_state
        if not self.compile_state:
            self.compile_state = compile_state

    # this entry is popped again just before returning
    self.stack.append(
        {
            "correlate_froms": set(),
            "asfrom_froms": set(),
            "selectable": insert_stmt,
        }
    )

    counted_bindparam = 0

    # reset any incoming "visited_bindparam" collection
    visited_bindparam = None

    # for positional, insertmanyvalues needs to know how many
    # bound parameters are in the VALUES sequence; there's no simple
    # rule because default expressions etc. can have zero or more
    # params inside them.   After multiple attempts to figure this out,
    # this very simplistic "count after" works and is
    # likely the least amount of callcounts, though looks clumsy
    if self.positional and visiting_cte is None:
        # if we are inside a CTE, don't count parameters
        # here since they wont be for insertmanyvalues. keep
        # visited_bindparam at None so no counting happens.
        # see #9173
        visited_bindparam = []

    crud_params_struct = crud._get_crud_params(
        self,
        insert_stmt,
        compile_state,
        toplevel,
        visited_bindparam=visited_bindparam,
        **kw,
    )

    if self.positional and visited_bindparam is not None:
        counted_bindparam = len(visited_bindparam)
        if self._numeric_binds:
            # accumulate the parameters that belong to VALUES
            if self._values_bindparam is not None:
                self._values_bindparam += visited_bindparam
            else:
                self._values_bindparam = visited_bindparam

    crud_params_single = crud_params_struct.single_params

    if (
        not crud_params_single
        and not self.dialect.supports_default_values
        and not self.dialect.supports_default_metavalue
        and not self.dialect.supports_empty_insert
    ):
        raise exc.CompileError(
            "The '%s' dialect with current database "
            "version settings does not support empty "
            "inserts." % self.dialect.name
        )

    if compile_state._has_multi_parameters:
        if not self.dialect.supports_multivalues_insert:
            raise exc.CompileError(
                "The '%s' dialect with current database "
                "version settings does not support "
                "in-place multirow inserts." % self.dialect.name
            )
        elif (
            self.implicit_returning or insert_stmt._returning
        ) and insert_stmt._sort_by_parameter_order:
            raise exc.CompileError(
                "RETURNING cannot be determinstically sorted when "
                "using an INSERT which includes multi-row values()."
            )
        # NOTE(review): both branches assign the same value that was
        # already assigned above; the reassignment looks redundant
        crud_params_single = crud_params_struct.single_params
    else:
        crud_params_single = crud_params_struct.single_params

    preparer = self.preparer
    supports_default_values = self.dialect.supports_default_values

    text = "INSERT "

    if insert_stmt._prefixes:
        text += self._generate_prefixes(
            insert_stmt, insert_stmt._prefixes, **kw
        )

    text += "INTO "
    table_text = preparer.format_table(insert_stmt.table)

    if insert_stmt._hints:
        _, table_text = self._setup_crud_hints(insert_stmt, table_text)

    if insert_stmt._independent_ctes:
        self._dispatch_independent_ctes(insert_stmt, kw)

    text += table_text

    # render the column list; an empty "()" is rendered when there
    # are no columns and DEFAULT VALUES is not supported
    if crud_params_single or not supports_default_values:
        text += " (%s)" % ", ".join(
            [expr for _, expr, _, _ in crud_params_single]
        )

    # look for insertmanyvalues attributes that would have been configured
    # by crud.py as it scanned through the columns to be part of the
    # INSERT
    use_insertmanyvalues = crud_params_struct.use_insertmanyvalues
    named_sentinel_params: Optional[Sequence[str]] = None
    add_sentinel_cols = None
    implicit_sentinel = False

    returning_cols = self.implicit_returning or insert_stmt._returning
    if returning_cols:
        add_sentinel_cols = crud_params_struct.use_sentinel_columns
        if add_sentinel_cols is not None:
            assert use_insertmanyvalues

            # search for the sentinel column explicitly present
            # in the INSERT columns list, and additionally check that
            # this column has a bound parameter name set up that's in the
            # parameter list.  If both of these cases are present, it means
            # we will have a client side value for the sentinel in each
            # parameter set.

            _params_by_col = {
                col: param_names
                for col, _, _, param_names in crud_params_single
            }
            named_sentinel_params = []
            for _add_sentinel_col in add_sentinel_cols:
                if _add_sentinel_col not in _params_by_col:
                    named_sentinel_params = None
                    break
                param_name = self._within_exec_param_key_getter(
                    _add_sentinel_col
                )
                if param_name not in _params_by_col[_add_sentinel_col]:
                    named_sentinel_params = None
                    break
                named_sentinel_params.append(param_name)

            if named_sentinel_params is None:
                # if we are not going to have a client side value for
                # the sentinel in the parameter set, that means it's
                # an autoincrement, an IDENTITY, or a server-side SQL
                # expression like nextval('seqname').  So this is
                # an "implicit" sentinel; we will look for it in
                # RETURNING
                # only, and then sort on it.  For this case on PG,
                # SQL Server we have to use a special INSERT form
                # that guarantees the server side function lines up with
                # the entries in the VALUES.
                if (
                    self.dialect.insertmanyvalues_implicit_sentinel
                    & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
                ):
                    implicit_sentinel = True
                else:
                    # here, we are not using a sentinel at all
                    # and we are likely the SQLite dialect.
                    # The first add_sentinel_col that we have should not
                    # be marked as "insert_sentinel=True".  if it was,
                    # an error should have been raised in
                    # _get_sentinel_column_for_table.
                    assert not add_sentinel_cols[0]._insert_sentinel, (
                        "sentinel selection rules should have prevented "
                        "us from getting here for this dialect"
                    )

            # always put the sentinel columns last.  even if they are
            # in the returning list already, they will be there twice
            # then.
            returning_cols = list(returning_cols) + list(add_sentinel_cols)

        returning_clause = self.returning_clause(
            insert_stmt,
            returning_cols,
            populate_result_map=toplevel,
        )

        # some dialects place RETURNING ahead of VALUES; otherwise it
        # is appended near the end of this method
        if self.returning_precedes_values:
            text += " " + returning_clause

    else:
        returning_clause = None

    # render the source of rows: INSERT..SELECT, DEFAULT VALUES,
    # multi-row VALUES, insertmanyvalues VALUES, or plain VALUES
    if insert_stmt.select is not None:
        # placed here by crud.py
        select_text = self.process(
            self.stack[-1]["insert_from_select"], insert_into=True, **kw
        )

        if self.ctes and self.dialect.cte_follows_insert:
            nesting_level = len(self.stack) if not toplevel else None
            text += " %s%s" % (
                self._render_cte_clause(
                    nesting_level=nesting_level,
                    include_following_stack=True,
                ),
                select_text,
            )
        else:
            text += " %s" % select_text
    elif not crud_params_single and supports_default_values:
        text += " DEFAULT VALUES"
        if use_insertmanyvalues:
            self._insertmanyvalues = _InsertManyValues(
                True,
                self.dialect.default_metavalue_token,
                crud_params_single,
                counted_bindparam,
                sort_by_parameter_order=(
                    insert_stmt._sort_by_parameter_order
                ),
                includes_upsert_behaviors=(
                    insert_stmt._post_values_clause is not None
                ),
                sentinel_columns=add_sentinel_cols,
                num_sentinel_columns=(
                    len(add_sentinel_cols) if add_sentinel_cols else 0
                ),
                implicit_sentinel=implicit_sentinel,
            )
    elif compile_state._has_multi_parameters:
        text += " VALUES %s" % (
            ", ".join(
                "(%s)"
                % (", ".join(value for _, _, value, _ in crud_param_set))
                for crud_param_set in crud_params_struct.all_multi_params
            ),
        )
    elif use_insertmanyvalues:
        if (
            implicit_sentinel
            and (
                self.dialect.insertmanyvalues_implicit_sentinel
                & InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
            )
            # this is checking if we have
            # INSERT INTO table (id) VALUES (DEFAULT).
            and not (crud_params_struct.is_default_metavalue_only)
        ):
            # if we have a sentinel column that is server generated,
            # then for selected backends render the VALUES list as a
            # subquery.  This is the orderable form supported by
            # PostgreSQL and in fewer cases SQL Server
            embed_sentinel_value = True

            render_bind_casts = (
                self.dialect.insertmanyvalues_implicit_sentinel
                & InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
            )

            add_sentinel_set = add_sentinel_cols or ()

            # the VALUES entries and their "p<i>" column aliases omit
            # the sentinel columns; "i" is the column's index within
            # crud_params_single
            insert_single_values_expr = ", ".join(
                [
                    value
                    for col, _, value, _ in crud_params_single
                    if col not in add_sentinel_set
                ]
            )

            colnames = ", ".join(
                f"p{i}"
                for i, cp in enumerate(crud_params_single)
                if cp[0] not in add_sentinel_set
            )

            if render_bind_casts:
                # render casts for the SELECT list.  For PG, we are
                # already rendering bind casts in the parameter list,
                # selectively for the more "tricky" types like ARRAY.
                # however, even for the "easy" types, if the parameter
                # is NULL for every entry, PG gives up and says
                # "it must be TEXT", which fails for other easy types
                # like ints.  So we cast on this side too.
                colnames_w_cast = ", ".join(
                    (
                        self.render_bind_cast(
                            col.type,
                            col.type._unwrapped_dialect_impl(self.dialect),
                            f"p{i}",
                        )
                        if col not in add_sentinel_set
                        else expr
                    )
                    for i, (col, _, expr, _) in enumerate(
                        crud_params_single
                    )
                )
            else:
                colnames_w_cast = ", ".join(
                    (f"p{i}" if col not in add_sentinel_set else expr)
                    for i, (col, _, expr, _) in enumerate(
                        crud_params_single
                    )
                )

            insert_crud_params = [
                elem
                for elem in crud_params_single
                if elem[0] not in add_sentinel_set
            ]

            # "sen_counter" names an extra VALUES column that is used
            # only for ordering
            text += (
                f" SELECT {colnames_w_cast} FROM "
                f"(VALUES ({insert_single_values_expr})) "
                f"AS imp_sen({colnames}, sen_counter) "
                "ORDER BY sen_counter"
            )

        else:
            # otherwise, if no sentinel or backend doesn't support
            # orderable subquery form, use a plain VALUES list
            embed_sentinel_value = False
            insert_crud_params = crud_params_single
            insert_single_values_expr = ", ".join(
                [value for _, _, value, _ in crud_params_single]
            )

            text += f" VALUES ({insert_single_values_expr})"

        self._insertmanyvalues = _InsertManyValues(
            is_default_expr=False,
            single_values_expr=insert_single_values_expr,
            insert_crud_params=insert_crud_params,
            num_positional_params_counted=counted_bindparam,
            sort_by_parameter_order=(insert_stmt._sort_by_parameter_order),
            includes_upsert_behaviors=(
                insert_stmt._post_values_clause is not None
            ),
            sentinel_columns=add_sentinel_cols,
            num_sentinel_columns=(
                len(add_sentinel_cols) if add_sentinel_cols else 0
            ),
            sentinel_param_keys=named_sentinel_params,
            implicit_sentinel=implicit_sentinel,
            embed_values_counter=embed_sentinel_value,
        )

    else:
        insert_single_values_expr = ", ".join(
            [value for _, _, value, _ in crud_params_single]
        )

        text += f" VALUES ({insert_single_values_expr})"

    if insert_stmt._post_values_clause is not None:
        post_values_clause = self.process(
            insert_stmt._post_values_clause, **kw
        )
        if post_values_clause:
            text += " " + post_values_clause

    if returning_clause and not self.returning_precedes_values:
        text += " " + returning_clause

    # CTEs are prepended here unless the dialect renders them after
    # the INSERT, which was handled in the INSERT..SELECT branch above
    if self.ctes and not self.dialect.cte_follows_insert:
        nesting_level = len(self.stack) if not toplevel else None
        text = (
            self._render_cte_clause(
                nesting_level=nesting_level,
                include_following_stack=True,
            )
            + text
        )

    self.stack.pop(-1)

    return text
6240
def update_limit_clause(self, update_stmt):
    """Hook allowing a dialect such as MySQL to add LIMIT to an UPDATE.

    This base implementation renders nothing and returns ``None``.
    """
    return None
6244
def delete_limit_clause(self, delete_stmt):
    """Hook allowing a dialect such as MySQL to add LIMIT to a DELETE.

    This base implementation renders nothing and returns ``None``.
    """
    return None
6248
def update_tables_clause(self, update_stmt, from_table, extra_froms, **kw):
    """Hook which renders the initial table clause of an UPDATE
    statement.

    The table is dispatched to this compiler with ``asfrom=True`` and
    ``iscrud=True``.

    MySQL overrides this.

    """
    dispatch_kw = {**kw, "asfrom": True}
    return from_table._compiler_dispatch(self, iscrud=True, **dispatch_kw)
6258
def update_from_clause(
    self, update_stmt, from_table, extra_froms, from_hints, **kw
):
    """Hook which renders the FROM portion of an UPDATE..FROM
    statement.

    MySQL and MSSQL override this.

    :raises NotImplementedError: always, in this base implementation.

    """
    message = (
        "This backend does not support multiple-table "
        "criteria within UPDATE"
    )
    raise NotImplementedError(message)
6272
    def visit_update(
        self,
        update_stmt: Update,
        visiting_cte: Optional[CTE] = None,
        **kw: Any,
    ) -> str:
        """Compile an UPDATE statement into its SQL string.

        :param update_stmt: the statement to render; it is replaced by
         ``compile_state.statement`` once the compile state is built.
        :param visiting_cte: when not ``None``, it is forwarded in ``kw``
         and the statement is treated as non-toplevel.
        :param kw: keyword arguments passed along to the compilation of
         sub-elements.
        :return: the rendered text, prefixed by the CTE clause when
         ``self.ctes`` is non-empty.

        """
        compile_state = update_stmt._compile_state_factory(
            update_stmt, self, **kw
        )
        if TYPE_CHECKING:
            assert isinstance(compile_state, UpdateDMLState)
        update_stmt = compile_state.statement  # type: ignore[assignment]

        # toplevel means nothing else is on the compiler stack and we are
        # not being rendered as the body of a CTE
        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            toplevel = not self.stack

        if toplevel:
            self.isupdate = True
            # don't overwrite compile state that is already established
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        extra_froms = compile_state._extra_froms
        is_multitable = bool(extra_froms)

        if is_multitable:
            # main table might be a JOIN
            main_froms = set(_from_objects(update_stmt.table))
            # only FROMs not already part of the main table are passed
            # to the table / FROM clause hooks
            render_extra_froms = [
                f for f in extra_froms if f not in main_froms
            ]
            correlate_froms = main_froms.union(extra_froms)
        else:
            render_extra_froms = []
            correlate_froms = {update_stmt.table}

        # stack entry for this statement; popped just before returning
        self.stack.append(
            {
                "correlate_froms": correlate_froms,
                "asfrom_froms": correlate_froms,
                "selectable": update_stmt,
            }
        )

        text = "UPDATE "

        if update_stmt._prefixes:
            text += self._generate_prefixes(
                update_stmt, update_stmt._prefixes, **kw
            )

        table_text = self.update_tables_clause(
            update_stmt,
            update_stmt.table,
            render_extra_froms,
            from_linter=from_linter,
            **kw,
        )
        crud_params_struct = crud._get_crud_params(
            self, update_stmt, compile_state, toplevel, **kw
        )
        crud_params = crud_params_struct.single_params

        if update_stmt._hints:
            dialect_hints, table_text = self._setup_crud_hints(
                update_stmt, table_text
            )
        else:
            dialect_hints = None

        if update_stmt._independent_ctes:
            self._dispatch_independent_ctes(update_stmt, kw)

        text += table_text

        # each crud param contributes one "<expr>=<value>" pair
        text += " SET "
        text += ", ".join(
            expr + "=" + value
            for _, expr, value, _ in cast(
                "List[Tuple[Any, str, str, Any]]", crud_params
            )
        )

        # RETURNING is rendered here, directly after SET, when
        # returning_precedes_values is set; otherwise at the end
        if self.implicit_returning or update_stmt._returning:
            if self.returning_precedes_values:
                text += " " + self.returning_clause(
                    update_stmt,
                    self.implicit_returning or update_stmt._returning,
                    populate_result_map=toplevel,
                )

        if extra_froms:
            extra_from_text = self.update_from_clause(
                update_stmt,
                update_stmt.table,
                render_extra_froms,
                dialect_hints,
                from_linter=from_linter,
                **kw,
            )
            if extra_from_text:
                text += " " + extra_from_text

        if update_stmt._where_criteria:
            t = self._generate_delimited_and_list(
                update_stmt._where_criteria, from_linter=from_linter, **kw
            )
            if t:
                text += " WHERE " + t

        limit_clause = self.update_limit_clause(update_stmt)
        if limit_clause:
            text += " " + limit_clause

        if (
            self.implicit_returning or update_stmt._returning
        ) and not self.returning_precedes_values:
            text += " " + self.returning_clause(
                update_stmt,
                self.implicit_returning or update_stmt._returning,
                populate_result_map=toplevel,
            )

        # the CTE clause is prepended to the statement text
        if self.ctes:
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if warn_linting:
            assert from_linter is not None
            from_linter.warn(stmt_type="UPDATE")

        self.stack.pop(-1)

        return text  # type: ignore[no-any-return]
6420
6421 def delete_extra_from_clause(
6422 self, delete_stmt, from_table, extra_froms, from_hints, **kw
6423 ):
6424 """Provide a hook to override the generation of an
6425 DELETE..FROM clause.
6426
6427 This can be used to implement DELETE..USING for example.
6428
6429 MySQL and MSSQL override this.
6430
6431 """
6432 raise NotImplementedError(
6433 "This backend does not support multiple-table "
6434 "criteria within DELETE"
6435 )
6436
6437 def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw):
6438 return from_table._compiler_dispatch(
6439 self, asfrom=True, iscrud=True, **kw
6440 )
6441
    def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
        """Compile a DELETE statement into its SQL string.

        :param delete_stmt: the statement to render; it is replaced by
         ``compile_state.statement`` once the compile state is built.
        :param visiting_cte: when not ``None``, it is forwarded in ``kw``
         and the statement is treated as non-toplevel.
        :param kw: keyword arguments passed along to the compilation of
         sub-elements.
        :return: the rendered text, prefixed by the CTE clause when
         ``self.ctes`` is non-empty.

        """
        compile_state = delete_stmt._compile_state_factory(
            delete_stmt, self, **kw
        )
        delete_stmt = compile_state.statement

        # toplevel means nothing else is on the compiler stack and we are
        # not being rendered as the body of a CTE
        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            toplevel = not self.stack

        if toplevel:
            self.isdelete = True
            # don't overwrite compile state that is already established
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        extra_froms = compile_state._extra_froms

        correlate_froms = {delete_stmt.table}.union(extra_froms)
        # stack entry for this statement; popped just before returning
        self.stack.append(
            {
                "correlate_froms": correlate_froms,
                "asfrom_froms": correlate_froms,
                "selectable": delete_stmt,
            }
        )

        text = "DELETE "

        if delete_stmt._prefixes:
            text += self._generate_prefixes(
                delete_stmt, delete_stmt._prefixes, **kw
            )

        text += "FROM "

        try:
            table_text = self.delete_table_clause(
                delete_stmt,
                delete_stmt.table,
                extra_froms,
                from_linter=from_linter,
            )
        except TypeError:
            # anticipate 3rd party dialects that don't include **kw
            # TODO: remove in 2.1
            table_text = self.delete_table_clause(
                delete_stmt, delete_stmt.table, extra_froms
            )
            # the fallback call could not receive the linter, so the
            # table is processed once more with it; the text is discarded
            if from_linter:
                _ = self.process(delete_stmt.table, from_linter=from_linter)

        # return value is not used for DELETE; called for its effects
        # NOTE(review): presumably sets up RETURNING / bind state - confirm
        crud._get_crud_params(self, delete_stmt, compile_state, toplevel, **kw)

        if delete_stmt._hints:
            dialect_hints, table_text = self._setup_crud_hints(
                delete_stmt, table_text
            )
        else:
            dialect_hints = None

        if delete_stmt._independent_ctes:
            self._dispatch_independent_ctes(delete_stmt, kw)

        text += table_text

        # RETURNING is rendered here, directly after the table, when
        # returning_precedes_values is set; otherwise at the end
        if (
            self.implicit_returning or delete_stmt._returning
        ) and self.returning_precedes_values:
            text += " " + self.returning_clause(
                delete_stmt,
                self.implicit_returning or delete_stmt._returning,
                populate_result_map=toplevel,
            )

        if extra_froms:
            extra_from_text = self.delete_extra_from_clause(
                delete_stmt,
                delete_stmt.table,
                extra_froms,
                dialect_hints,
                from_linter=from_linter,
                **kw,
            )
            if extra_from_text:
                text += " " + extra_from_text

        if delete_stmt._where_criteria:
            t = self._generate_delimited_and_list(
                delete_stmt._where_criteria, from_linter=from_linter, **kw
            )
            if t:
                text += " WHERE " + t

        limit_clause = self.delete_limit_clause(delete_stmt)
        if limit_clause:
            text += " " + limit_clause

        if (
            self.implicit_returning or delete_stmt._returning
        ) and not self.returning_precedes_values:
            text += " " + self.returning_clause(
                delete_stmt,
                self.implicit_returning or delete_stmt._returning,
                populate_result_map=toplevel,
            )

        # the CTE clause is prepended to the statement text
        if self.ctes:
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if warn_linting:
            assert from_linter is not None
            from_linter.warn(stmt_type="DELETE")

        self.stack.pop(-1)

        return text
6572
6573 def visit_savepoint(self, savepoint_stmt, **kw):
6574 return "SAVEPOINT %s" % self.preparer.format_savepoint(savepoint_stmt)
6575
6576 def visit_rollback_to_savepoint(self, savepoint_stmt, **kw):
6577 return "ROLLBACK TO SAVEPOINT %s" % self.preparer.format_savepoint(
6578 savepoint_stmt
6579 )
6580
6581 def visit_release_savepoint(self, savepoint_stmt, **kw):
6582 return "RELEASE SAVEPOINT %s" % self.preparer.format_savepoint(
6583 savepoint_stmt
6584 )
6585
6586
class StrSQLCompiler(SQLCompiler):
    """A :class:`.SQLCompiler` subclass that renders a small selection of
    non-standard SQL features as a string value.

    This compiler is the one invoked when a Core expression element is
    stringified directly, i.e. without calling the
    :meth:`_expression.ClauseElement.compile` method.  It renders a
    limited set of non-standard SQL constructs to assist in basic
    stringification; more substantial custom or dialect-specific SQL
    constructs require using
    :meth:`_expression.ClauseElement.compile` directly.

    .. seealso::

        :ref:`faq_sql_expression_string`

    """

    def _fallback_column_name(self, column):
        """Return the placeholder used for a column with no known name."""
        return "<name unknown>"

    @util.preload_module("sqlalchemy.engine.url")
    def visit_unsupported_compilation(self, element, err, **kw):
        """Attempt to stringify ``element`` using its own dialect.

        When the element names a ``stringify_dialect`` other than
        ``"default"``, a compiler for that dialect is created and used,
        unless that compiler is itself a :class:`.StrSQLCompiler`.
        In all other cases the superclass implementation is invoked.

        """
        dialect_name = element.stringify_dialect
        if dialect_name != "default":
            engine_url = util.preloaded.engine_url
            dialect_cls = engine_url.URL.create(dialect_name).get_dialect()
            dialect = dialect_cls()

            alt_compiler = dialect.statement_compiler(
                dialect, None, _supporting_against=self
            )
            if not isinstance(alt_compiler, StrSQLCompiler):
                return alt_compiler.process(element, **kw)

        return super().visit_unsupported_compilation(element, err)

    def visit_getitem_binary(self, binary, operator, **kw):
        """Render an index operation as ``left[right]``."""
        target = self.process(binary.left, **kw)
        index = self.process(binary.right, **kw)
        return "%s[%s]" % (target, index)

    def visit_json_getitem_op_binary(self, binary, operator, **kw):
        """Render a JSON index operation like a plain getitem."""
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
        """Render a JSON path index operation like a plain getitem."""
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_sequence(self, sequence, **kw):
        """Render a descriptive placeholder for a sequence's next value."""
        sequence_name = self.preparer.format_sequence(sequence)
        return f"<next sequence value: {sequence_name}>"

    def returning_clause(
        self,
        stmt: UpdateBase,
        returning_cols: Sequence[_ColumnsClauseElement],
        *,
        populate_result_map: bool,
        **kw: Any,
    ) -> str:
        """Render a generic ``RETURNING`` clause for the given columns.

        ``stmt``, ``populate_result_map`` and ``kw`` are accepted for
        signature compatibility and are not used here.

        """
        rendered = []
        for col in base._select_iterables(returning_cols):
            # a fresh dict is passed for every column
            rendered.append(
                self._label_select_column(None, col, True, False, {})
            )
        return "RETURNING " + ", ".join(rendered)

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render ``FROM <t1>, <t2>, ...`` for the extra UPDATE tables."""
        kw["asfrom"] = True
        rendered = [
            from_obj._compiler_dispatch(self, fromhints=from_hints, **kw)
            for from_obj in extra_froms
        ]
        return "FROM " + ", ".join(rendered)

    def delete_extra_from_clause(
        self, delete_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the extra DELETE tables as a comma-led, comma-separated
        list."""
        kw["asfrom"] = True
        rendered = [
            from_obj._compiler_dispatch(self, fromhints=from_hints, **kw)
            for from_obj in extra_froms
        ]
        return ", " + ", ".join(rendered)

    def visit_empty_set_expr(self, element_types, **kw):
        """Render a SELECT that returns no rows."""
        return "SELECT 1 WHERE 1!=1"

    def get_from_hint_text(self, table, text):
        """Render hint text wrapped in square brackets."""
        return "[%s]" % text

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        """Render a regexp match using the ``<regexp>`` placeholder."""
        placeholder = " <regexp> "
        return self._generate_generic_binary(binary, placeholder, **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        """Render a negated regexp match using ``<not regexp>``."""
        placeholder = " <not regexp> "
        return self._generate_generic_binary(binary, placeholder, **kw)

    def visit_regexp_replace_op_binary(self, binary, operator, **kw):
        """Render a regexp replace as a ``<regexp replace>(...)`` call."""
        target = binary.left._compiler_dispatch(self, **kw)
        pattern = binary.right._compiler_dispatch(self, **kw)
        return "<regexp replace>(%s, %s)" % (target, pattern)

    def visit_try_cast(self, cast, **kwargs):
        """Render ``TRY_CAST(<expr> AS <type>)``."""
        expr_text = cast.clause._compiler_dispatch(self, **kwargs)
        type_text = cast.typeclause._compiler_dispatch(self, **kwargs)
        return "TRY_CAST(%s AS %s)" % (expr_text, type_text)
6696
6697
class DDLCompiler(Compiled):
    """Render DDL constructs (CREATE / DROP / ALTER / COMMENT ON) as
    strings.

    SQL expressions embedded in DDL (server defaults, CHECK bodies,
    index expressions, computed columns) are rendered through
    :attr:`.sql_compiler`; column types are rendered through the
    dialect's type compiler.

    """

    is_ddl = True

    if TYPE_CHECKING:
        # typing-only constructor signature; not defined at runtime

        def __init__(
            self,
            dialect: Dialect,
            statement: ExecutableDDLElement,
            schema_translate_map: Optional[SchemaTranslateMapType] = ...,
            render_schema_translate: bool = ...,
            compile_kwargs: Mapping[str, Any] = ...,
        ): ...

    @util.ro_memoized_property
    def sql_compiler(self) -> SQLCompiler:
        """A statement compiler for this dialect, created with no
        statement and with this compiler's ``schema_translate_map``."""
        return self.dialect.statement_compiler(
            self.dialect, None, schema_translate_map=self.schema_translate_map
        )

    @util.memoized_property
    def type_compiler(self):
        """The dialect's type compiler instance."""
        return self.dialect.type_compiler_instance

    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        """Always return ``None``; no parameter dictionary is produced
        by this compiler."""
        return None

    def visit_ddl(self, ddl, **kwargs):
        """Render a DDL construct by %-interpolating ``ddl.context``
        into ``ddl.statement``.

        When the target is a :class:`.Table`, a copy of the context is
        given default ``table``, ``schema`` and ``fullname`` entries.

        """
        # table events can substitute table and schema name
        context = ddl.context
        if isinstance(ddl.target, schema.Table):
            # copy so that the DDL object's own context is not mutated
            context = context.copy()

            preparer = self.preparer
            path = preparer.format_table_seq(ddl.target)
            if len(path) == 1:
                table, sch = path[0], ""
            else:
                table, sch = path[-1], path[0]

            # setdefault: entries already present in the context win
            context.setdefault("table", table)
            context.setdefault("schema", sch)
            context.setdefault("fullname", preparer.format_table(ddl.target))

        return self.sql_compiler.post_process_text(ddl.statement % context)

    def visit_create_schema(self, create, **kw):
        """Render ``CREATE SCHEMA [IF NOT EXISTS] <name>``."""
        text = "CREATE SCHEMA "
        if create.if_not_exists:
            text += "IF NOT EXISTS "
        return text + self.preparer.format_schema(create.element)

    def visit_drop_schema(self, drop, **kw):
        """Render ``DROP SCHEMA [IF EXISTS] <name> [CASCADE]``."""
        text = "DROP SCHEMA "
        if drop.if_exists:
            text += "IF EXISTS "
        text += self.preparer.format_schema(drop.element)
        if drop.cascade:
            text += " CASCADE"
        return text

    def visit_create_table(self, create, **kw):
        """Render a complete CREATE TABLE statement.

        Columns are rendered first, one per line, followed by the
        table-level constraints.  A :class:`.CompileError` raised while
        rendering a column is re-raised with the table and column name
        prepended to its message.

        """
        table = create.element
        preparer = self.preparer

        text = "\nCREATE "
        if table._prefixes:
            text += " ".join(table._prefixes) + " "

        text += "TABLE "
        if create.if_not_exists:
            text += "IF NOT EXISTS "

        text += preparer.format_table(table) + " "

        create_table_suffix = self.create_table_suffix(table)
        if create_table_suffix:
            text += create_table_suffix + " "

        text += "("

        # the first element follows a bare newline; every later one
        # follows ", \n"
        separator = "\n"

        # if only one primary key, specify it along with the column
        first_pk = False
        for create_column in create.columns:
            column = create_column.element
            try:
                processed = self.process(
                    create_column, first_pk=column.primary_key and not first_pk
                )
                # None means the column renders nothing
                if processed is not None:
                    text += separator
                    separator = ", \n"
                    text += "\t" + processed
                if column.primary_key:
                    first_pk = True
            except exc.CompileError as ce:
                raise exc.CompileError(
                    "(in table '%s', column '%s'): %s"
                    % (table.description, column.name, ce.args[0])
                ) from ce

        const = self.create_table_constraints(
            table,
            _include_foreign_key_constraints=create.include_foreign_key_constraints,  # noqa
        )
        if const:
            text += separator + "\t" + const

        text += "\n)%s\n\n" % self.post_create_table(table)
        return text

    def visit_create_column(self, create, first_pk=False, **kw):
        """Render one column definition within CREATE TABLE.

        Returns ``None`` for a column flagged ``system``; otherwise the
        column specification followed by its column-level constraints.

        """
        column = create.element

        if column.system:
            return None

        text = self.get_column_specification(column, first_pk=first_pk)
        const = " ".join(
            self.process(constraint) for constraint in column.constraints
        )
        if const:
            text += " " + const

        return text

    def create_table_constraints(
        self, table, _include_foreign_key_constraints=None, **kw
    ):
        """Render the table-level constraints of CREATE TABLE.

        The primary key is rendered first.  When
        ``_include_foreign_key_constraints`` is given, foreign key
        constraints not in that collection are omitted.  Constraints
        flagged ``use_alter`` are skipped when the dialect supports
        ALTER, as are constraints whose rendering is ``None``.

        """
        # On some DB order is significant: visit PK first, then the
        # other constraints (engine.ReflectionTest.testbasic failed on FB2)
        constraints = []
        if table.primary_key:
            constraints.append(table.primary_key)

        all_fkcs = table.foreign_key_constraints
        if _include_foreign_key_constraints is not None:
            omit_fkcs = all_fkcs.difference(_include_foreign_key_constraints)
        else:
            omit_fkcs = set()

        constraints.extend(
            [
                c
                for c in table._sorted_constraints
                if c is not table.primary_key and c not in omit_fkcs
            ]
        )

        return ", \n\t".join(
            p
            for p in (
                self.process(constraint)
                for constraint in constraints
                if (constraint._should_create_for_compiler(self))
                and (
                    not self.dialect.supports_alter
                    or not getattr(constraint, "use_alter", False)
                )
            )
            if p is not None
        )

    def visit_drop_table(self, drop, **kw):
        """Render ``DROP TABLE [IF EXISTS] <name>``."""
        text = "\nDROP TABLE "
        if drop.if_exists:
            text += "IF EXISTS "
        return text + self.preparer.format_table(drop.element)

    def visit_drop_view(self, drop, **kw):
        """Render ``DROP VIEW <name>``."""
        return "\nDROP VIEW " + self.preparer.format_table(drop.element)

    def _verify_index_table(self, index: Index) -> None:
        """Raise :class:`.CompileError` if the index has no table."""
        if index.table is None:
            raise exc.CompileError(
                "Index '%s' is not associated with any table." % index.name
            )

    def visit_create_index(
        self, create, include_schema=False, include_table_schema=True, **kw
    ):
        """Render ``CREATE [UNIQUE] INDEX [IF NOT EXISTS] <name> ON
        <table> (<expressions>)``.

        :param include_schema: schema-qualify the index name.
        :param include_table_schema: schema-qualify the table name.
        :raises CompileError: if the index has no table or no name.

        """
        index = create.element
        self._verify_index_table(index)
        preparer = self.preparer
        text = "CREATE "
        if index.unique:
            text += "UNIQUE "
        if index.name is None:
            raise exc.CompileError(
                "CREATE INDEX requires that the index have a name"
            )

        text += "INDEX "
        if create.if_not_exists:
            text += "IF NOT EXISTS "

        text += "%s ON %s (%s)" % (
            self._prepared_index_name(index, include_schema=include_schema),
            preparer.format_table(
                index.table, use_schema=include_table_schema
            ),
            ", ".join(
                self.sql_compiler.process(
                    expr, include_table=False, literal_binds=True
                )
                for expr in index.expressions
            ),
        )
        return text

    def visit_drop_index(self, drop, **kw):
        """Render ``DROP INDEX [IF EXISTS] <schema-qualified name>``.

        :raises CompileError: if the index has no name.

        """
        index = drop.element

        if index.name is None:
            raise exc.CompileError(
                "DROP INDEX requires that the index have a name"
            )
        text = "\nDROP INDEX "
        if drop.if_exists:
            text += "IF EXISTS "

        return text + self._prepared_index_name(index, include_schema=True)

    def _prepared_index_name(
        self, index: Index, include_schema: bool = False
    ) -> str:
        """Return the formatted index name, prefixed with the quoted
        schema of its table when ``include_schema`` is set and the table
        has an effective schema."""
        if index.table is not None:
            effective_schema = self.preparer.schema_for_object(index.table)
        else:
            effective_schema = None
        if include_schema and effective_schema:
            schema_name = self.preparer.quote_schema(effective_schema)
        else:
            schema_name = None

        index_name: str = self.preparer.format_index(index)

        if schema_name:
            index_name = schema_name + "." + index_name
        return index_name

    def visit_add_constraint(self, create, **kw):
        """Render ``ALTER TABLE <table> ADD <constraint>``."""
        return "ALTER TABLE %s ADD %s" % (
            self.preparer.format_table(create.element.table),
            self.process(create.element),
        )

    def visit_set_table_comment(self, create, **kw):
        """Render ``COMMENT ON TABLE <table> IS <literal>``."""
        return "COMMENT ON TABLE %s IS %s" % (
            self.preparer.format_table(create.element),
            self.sql_compiler.render_literal_value(
                create.element.comment, sqltypes.String()
            ),
        )

    def visit_drop_table_comment(self, drop, **kw):
        """Render ``COMMENT ON TABLE <table> IS NULL``."""
        return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table(
            drop.element
        )

    def visit_set_column_comment(self, create, **kw):
        """Render ``COMMENT ON COLUMN <schema.table.column> IS
        <literal>``."""
        return "COMMENT ON COLUMN %s IS %s" % (
            self.preparer.format_column(
                create.element, use_table=True, use_schema=True
            ),
            self.sql_compiler.render_literal_value(
                create.element.comment, sqltypes.String()
            ),
        )

    def visit_drop_column_comment(self, drop, **kw):
        """Render ``COMMENT ON COLUMN <schema.table.column> IS NULL``."""
        # schema-qualify the table exactly as visit_set_column_comment
        # does, so that the comment is removed from the same column it
        # was set on when the table is in a non-default schema
        return "COMMENT ON COLUMN %s IS NULL" % self.preparer.format_column(
            drop.element, use_table=True, use_schema=True
        )

    def visit_set_constraint_comment(self, create, **kw):
        """Not supported by the base compiler; always raises
        :class:`.UnsupportedCompilationError`."""
        raise exc.UnsupportedCompilationError(self, type(create))

    def visit_drop_constraint_comment(self, drop, **kw):
        """Not supported by the base compiler; always raises
        :class:`.UnsupportedCompilationError`."""
        raise exc.UnsupportedCompilationError(self, type(drop))

    def get_identity_options(self, identity_options):
        """Render the sequence / identity options as a space-separated
        string.

        Options whose value is ``None`` are omitted.  ``nominvalue``
        and ``nomaxvalue`` are flags and render only when true;
        ``cycle`` renders ``CYCLE`` when true and ``NO CYCLE`` when
        false.  Returns an empty string when nothing is set.

        """
        text = []
        if identity_options.increment is not None:
            text.append("INCREMENT BY %d" % identity_options.increment)
        if identity_options.start is not None:
            text.append("START WITH %d" % identity_options.start)
        if identity_options.minvalue is not None:
            text.append("MINVALUE %d" % identity_options.minvalue)
        if identity_options.maxvalue is not None:
            text.append("MAXVALUE %d" % identity_options.maxvalue)
        # these are boolean flags; an explicit False must not render
        # the clause, so test truthiness rather than "is not None"
        if identity_options.nominvalue:
            text.append("NO MINVALUE")
        if identity_options.nomaxvalue:
            text.append("NO MAXVALUE")
        if identity_options.cache is not None:
            text.append("CACHE %d" % identity_options.cache)
        if identity_options.cycle is not None:
            text.append("CYCLE" if identity_options.cycle else "NO CYCLE")
        return " ".join(text)

    def visit_create_sequence(self, create, prefix=None, **kw):
        """Render ``CREATE SEQUENCE [IF NOT EXISTS] <name>`` followed by
        the optional ``prefix`` text and the sequence options."""
        text = "CREATE SEQUENCE "
        if create.if_not_exists:
            text += "IF NOT EXISTS "
        text += self.preparer.format_sequence(create.element)

        if prefix:
            text += prefix
        options = self.get_identity_options(create.element)
        if options:
            text += " " + options
        return text

    def visit_drop_sequence(self, drop, **kw):
        """Render ``DROP SEQUENCE [IF EXISTS] <name>``."""
        text = "DROP SEQUENCE "
        if drop.if_exists:
            text += "IF EXISTS "
        return text + self.preparer.format_sequence(drop.element)

    def visit_drop_constraint(self, drop, **kw):
        """Render ``ALTER TABLE <table> DROP CONSTRAINT [IF EXISTS]
        <name> [CASCADE]``.

        :raises CompileError: if the constraint has no name, or the
         preparer formats its name as ``None``.

        """
        constraint = drop.element
        if constraint.name is not None:
            formatted_name = self.preparer.format_constraint(constraint)
        else:
            formatted_name = None

        if formatted_name is None:
            raise exc.CompileError(
                "Can't emit DROP CONSTRAINT for constraint %r; "
                "it has no name" % drop.element
            )
        return "ALTER TABLE %s DROP CONSTRAINT %s%s%s" % (
            self.preparer.format_table(drop.element.table),
            "IF EXISTS " if drop.if_exists else "",
            formatted_name,
            " CASCADE" if drop.cascade else "",
        )

    def get_column_specification(self, column, **kwargs):
        """Render ``<name> <type>`` for a column, followed by DEFAULT,
        computed, identity and NOT NULL clauses as applicable.

        NOT NULL is omitted for an identity column when the dialect
        supports identity columns.

        """
        colspec = (
            self.preparer.format_column(column)
            + " "
            + self.dialect.type_compiler_instance.process(
                column.type, type_expression=column
            )
        )
        default = self.get_column_default_string(column)
        if default is not None:
            colspec += " DEFAULT " + default

        if column.computed is not None:
            colspec += " " + self.process(column.computed)

        if (
            column.identity is not None
            and self.dialect.supports_identity_columns
        ):
            colspec += " " + self.process(column.identity)

        if not column.nullable and (
            not column.identity or not self.dialect.supports_identity_columns
        ):
            colspec += " NOT NULL"
        return colspec

    def create_table_suffix(self, table):
        """Hook for text rendered between the table name and the opening
        parenthesis of CREATE TABLE; empty by default."""
        return ""

    def post_create_table(self, table):
        """Hook for text rendered after the closing parenthesis of
        CREATE TABLE; empty by default."""
        return ""

    def get_column_default_string(self, column: Column[Any]) -> Optional[str]:
        """Return the rendered server default of a column, or ``None``
        if its ``server_default`` is not a :class:`.DefaultClause`."""
        if isinstance(column.server_default, schema.DefaultClause):
            return self.render_default_string(column.server_default.arg)
        else:
            return None

    def render_default_string(self, default: Union[Visitable, str]) -> str:
        """Render a default value: a plain string as a string literal,
        anything else as a SQL expression with literal binds."""
        if isinstance(default, str):
            return self.sql_compiler.render_literal_value(
                default, sqltypes.STRINGTYPE
            )
        else:
            return self.sql_compiler.process(default, literal_binds=True)

    def visit_table_or_column_check_constraint(self, constraint, **kw):
        """Dispatch a CHECK constraint to the column-level or
        table-level visitor based on ``constraint.is_column_level``."""
        # forward **kw so that keyword arguments reach the visitor that
        # does the rendering instead of being silently dropped
        if constraint.is_column_level:
            return self.visit_column_check_constraint(constraint, **kw)
        else:
            return self.visit_check_constraint(constraint, **kw)

    def visit_check_constraint(self, constraint, **kw):
        """Render a table-level CHECK constraint."""
        text = self.define_constraint_preamble(constraint, **kw)
        text += self.define_check_body(constraint, **kw)
        text += self.define_constraint_deferrability(constraint)
        return text

    def visit_column_check_constraint(self, constraint, **kw):
        """Render a column-level CHECK constraint."""
        text = self.define_constraint_preamble(constraint, **kw)
        text += self.define_check_body(constraint, **kw)
        text += self.define_constraint_deferrability(constraint)
        return text

    def visit_primary_key_constraint(
        self, constraint: PrimaryKeyConstraint, **kw: Any
    ) -> str:
        """Render a PRIMARY KEY constraint; an empty string when the
        constraint has no columns."""
        if len(constraint) == 0:
            return ""
        text = self.define_constraint_preamble(constraint, **kw)
        text += self.define_primary_key_body(constraint, **kw)
        text += self.define_constraint_deferrability(constraint)
        return text

    def visit_foreign_key_constraint(
        self, constraint: ForeignKeyConstraint, **kw: Any
    ) -> str:
        """Render a FOREIGN KEY constraint, including its MATCH,
        ON DELETE / ON UPDATE and deferrability clauses."""
        text = self.define_constraint_preamble(constraint, **kw)
        text += self.define_foreign_key_body(constraint, **kw)
        text += self.define_constraint_match(constraint)
        text += self.define_constraint_cascades(constraint)
        text += self.define_constraint_deferrability(constraint)
        return text

    def define_constraint_remote_table(self, constraint, table, preparer):
        """Format the remote table clause of a CREATE CONSTRAINT clause."""

        return preparer.format_table(table)

    def visit_unique_constraint(
        self, constraint: UniqueConstraint, **kw: Any
    ) -> str:
        """Render a UNIQUE constraint; an empty string when the
        constraint has no columns."""
        if len(constraint) == 0:
            return ""
        text = self.define_constraint_preamble(constraint, **kw)
        text += self.define_unique_body(constraint, **kw)
        text += self.define_constraint_deferrability(constraint)
        return text

    def define_constraint_preamble(
        self, constraint: Constraint, **kw: Any
    ) -> str:
        """Return ``CONSTRAINT <name> `` for a named constraint, or an
        empty string when there is no name to render."""
        text = ""
        if constraint.name is not None:
            formatted_name = self.preparer.format_constraint(constraint)
            if formatted_name is not None:
                text += "CONSTRAINT %s " % formatted_name
        return text

    def define_primary_key_body(
        self, constraint: PrimaryKeyConstraint, **kw: Any
    ) -> str:
        """Return ``PRIMARY KEY (<cols>)``.

        For an implicitly generated constraint the
        ``columns_autoinc_first`` ordering is used.

        """
        text = ""
        text += "PRIMARY KEY "
        text += "(%s)" % ", ".join(
            self.preparer.quote(c.name)
            for c in (
                constraint.columns_autoinc_first
                if constraint._implicit_generated
                else constraint.columns
            )
        )
        return text

    def define_foreign_key_body(
        self, constraint: ForeignKeyConstraint, **kw: Any
    ) -> str:
        """Return ``FOREIGN KEY(<cols>) REFERENCES <table> (<cols>)``.

        The remote table is taken from the first element of the
        constraint.

        """
        preparer = self.preparer
        remote_table = list(constraint.elements)[0].column.table
        text = "FOREIGN KEY(%s) REFERENCES %s (%s)" % (
            ", ".join(
                preparer.quote(f.parent.name) for f in constraint.elements
            ),
            self.define_constraint_remote_table(
                constraint, remote_table, preparer
            ),
            ", ".join(
                preparer.quote(f.column.name) for f in constraint.elements
            ),
        )
        return text

    def define_unique_body(
        self, constraint: UniqueConstraint, **kw: Any
    ) -> str:
        """Return ``UNIQUE <distinct>(<cols>)``, where ``<distinct>``
        comes from :meth:`define_unique_constraint_distinct`."""
        text = "UNIQUE %s(%s)" % (
            self.define_unique_constraint_distinct(constraint, **kw),
            ", ".join(self.preparer.quote(c.name) for c in constraint),
        )
        return text

    def define_check_body(self, constraint: CheckConstraint, **kw: Any) -> str:
        """Return ``CHECK (<expr>)`` with the expression rendered without
        table names and with literal binds."""
        text = "CHECK (%s)" % self.sql_compiler.process(
            constraint.sqltext, include_table=False, literal_binds=True
        )
        return text

    def define_unique_constraint_distinct(
        self, constraint: UniqueConstraint, **kw: Any
    ) -> str:
        """Hook for text placed between ``UNIQUE`` and the column list;
        empty by default."""
        return ""

    def define_constraint_cascades(
        self, constraint: ForeignKeyConstraint
    ) -> str:
        """Return the ON DELETE and ON UPDATE clauses that are set on
        the constraint, or an empty string."""
        text = ""
        if constraint.ondelete is not None:
            text += self.define_constraint_ondelete_cascade(constraint)

        if constraint.onupdate is not None:
            text += self.define_constraint_onupdate_cascade(constraint)
        return text

    def define_constraint_ondelete_cascade(
        self, constraint: ForeignKeyConstraint
    ) -> str:
        """Return `` ON DELETE <phrase>`` with the phrase validated
        against ``FK_ON_DELETE``."""
        return " ON DELETE %s" % self.preparer.validate_sql_phrase(
            constraint.ondelete, FK_ON_DELETE
        )

    def define_constraint_onupdate_cascade(
        self, constraint: ForeignKeyConstraint
    ) -> str:
        """Return `` ON UPDATE <phrase>`` with the phrase validated
        against ``FK_ON_UPDATE``."""
        return " ON UPDATE %s" % self.preparer.validate_sql_phrase(
            constraint.onupdate, FK_ON_UPDATE
        )

    def define_constraint_deferrability(self, constraint: Constraint) -> str:
        """Return the ``[NOT] DEFERRABLE`` and ``INITIALLY <phrase>``
        clauses that are set on the constraint, or an empty string."""
        text = ""
        if constraint.deferrable is not None:
            if constraint.deferrable:
                text += " DEFERRABLE"
            else:
                text += " NOT DEFERRABLE"
        if constraint.initially is not None:
            text += " INITIALLY %s" % self.preparer.validate_sql_phrase(
                constraint.initially, FK_INITIALLY
            )
        return text

    def define_constraint_match(self, constraint: ForeignKeyConstraint) -> str:
        """Return `` MATCH <value>`` when ``constraint.match`` is set,
        else an empty string."""
        text = ""
        if constraint.match is not None:
            # NOTE(review): unlike ondelete / onupdate / initially, this
            # value is rendered without validate_sql_phrase() - confirm
            # whether that is intentional
            text += " MATCH %s" % constraint.match
        return text

    def visit_computed_column(self, generated, **kw):
        """Render ``GENERATED ALWAYS AS (<expr>)``, followed by
        ``STORED`` or ``VIRTUAL`` when ``persisted`` is ``True`` or
        ``False`` respectively."""
        text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
            generated.sqltext, include_table=False, literal_binds=True
        )
        if generated.persisted is True:
            text += " STORED"
        elif generated.persisted is False:
            text += " VIRTUAL"
        return text

    def visit_identity_column(self, identity, **kw):
        """Render ``GENERATED {ALWAYS | BY DEFAULT} AS IDENTITY``,
        followed by the parenthesized identity options, if any."""
        text = "GENERATED %s AS IDENTITY" % (
            "ALWAYS" if identity.always else "BY DEFAULT",
        )
        options = self.get_identity_options(identity)
        if options:
            text += " (%s)" % options
        return text
7269
7270
class GenericTypeCompiler(TypeCompiler):
    """Render type specification strings using generic SQL type names.

    The upper-case ``visit_NAME`` methods produce the literal text for one
    specific SQL type.  The lower-case ``visit_name`` methods each select
    one of the upper-case renderings and delegate to it.
    """

    # -- numeric types ----------------------------------------------------

    def visit_FLOAT(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return "FLOAT"

    def visit_DOUBLE(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return "DOUBLE"

    def visit_DOUBLE_PRECISION(
        self, type_: sqltypes.DOUBLE_PRECISION[Any], **kw: Any
    ) -> str:
        return "DOUBLE PRECISION"

    def visit_REAL(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return "REAL"

    def visit_NUMERIC(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        """Render ``NUMERIC``, adding precision and scale when set.

        The scale is only rendered when a precision is present as well.
        """
        precision = type_.precision
        if precision is None:
            return "NUMERIC"

        scale = type_.scale
        if scale is None:
            return f"NUMERIC({precision!s})"
        return f"NUMERIC({precision!s}, {scale!s})"

    def visit_DECIMAL(self, type_: sqltypes.DECIMAL[Any], **kw: Any) -> str:
        """Render ``DECIMAL``, adding precision and scale when set.

        The scale is only rendered when a precision is present as well.
        """
        precision = type_.precision
        if precision is None:
            return "DECIMAL"

        scale = type_.scale
        if scale is None:
            return f"DECIMAL({precision!s})"
        return f"DECIMAL({precision!s}, {scale!s})"

    def visit_INTEGER(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return "INTEGER"

    def visit_SMALLINT(self, type_: sqltypes.SmallInteger, **kw: Any) -> str:
        return "SMALLINT"

    def visit_BIGINT(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return "BIGINT"

    # -- date and time types ----------------------------------------------

    def visit_TIMESTAMP(self, type_: sqltypes.TIMESTAMP, **kw: Any) -> str:
        return "TIMESTAMP"

    def visit_DATETIME(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return "DATETIME"

    def visit_DATE(self, type_: sqltypes.Date, **kw: Any) -> str:
        return "DATE"

    def visit_TIME(self, type_: sqltypes.Time, **kw: Any) -> str:
        return "TIME"

    # -- character types --------------------------------------------------

    def visit_CLOB(self, type_: sqltypes.CLOB, **kw: Any) -> str:
        return "CLOB"

    def visit_NCLOB(self, type_: sqltypes.Text, **kw: Any) -> str:
        return "NCLOB"

    def _render_string_type(
        self, name: str, length: Optional[int], collation: Optional[str]
    ) -> str:
        """Assemble ``NAME(length) COLLATE "collation"``.

        The length and the collation portions are each left out when the
        corresponding argument is falsy.
        """
        pieces = [name]
        if length:
            pieces.append(f"({length})")
        if collation:
            pieces.append(f' COLLATE "{collation}"')
        return "".join(pieces)

    def visit_CHAR(self, type_: sqltypes.CHAR, **kw: Any) -> str:
        return self._render_string_type("CHAR", type_.length, type_.collation)

    def visit_NCHAR(self, type_: sqltypes.NCHAR, **kw: Any) -> str:
        return self._render_string_type("NCHAR", type_.length, type_.collation)

    def visit_VARCHAR(self, type_: sqltypes.String, **kw: Any) -> str:
        return self._render_string_type(
            "VARCHAR", type_.length, type_.collation
        )

    def visit_NVARCHAR(self, type_: sqltypes.NVARCHAR, **kw: Any) -> str:
        return self._render_string_type(
            "NVARCHAR", type_.length, type_.collation
        )

    def visit_TEXT(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self._render_string_type("TEXT", type_.length, type_.collation)

    # -- uuid, binary and boolean types -----------------------------------

    def visit_UUID(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        return "UUID"

    def visit_BLOB(self, type_: sqltypes.LargeBinary, **kw: Any) -> str:
        return "BLOB"

    def visit_BINARY(self, type_: sqltypes.BINARY, **kw: Any) -> str:
        """Render ``BINARY``, with ``(length)`` when a length is set."""
        suffix = "(%d)" % type_.length if type_.length else ""
        return "BINARY" + suffix

    def visit_VARBINARY(self, type_: sqltypes.VARBINARY, **kw: Any) -> str:
        """Render ``VARBINARY``, with ``(length)`` when a length is set."""
        suffix = "(%d)" % type_.length if type_.length else ""
        return "VARBINARY" + suffix

    def visit_BOOLEAN(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return "BOOLEAN"

    # -- lower-case visitors: delegate to an upper-case rendering ---------

    def visit_uuid(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        """Render ``UUID`` when native UUIDs apply, else ``CHAR(32)``.

        The native rendering is used only when both the type requests it
        (``native_uuid``) and the dialect reports
        ``supports_native_uuid``.
        """
        if type_.native_uuid and self.dialect.supports_native_uuid:
            return self.visit_UUID(type_, **kw)
        return self._render_string_type("CHAR", length=32, collation=None)

    def visit_large_binary(
        self, type_: sqltypes.LargeBinary, **kw: Any
    ) -> str:
        return self.visit_BLOB(type_, **kw)

    def visit_boolean(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return self.visit_BOOLEAN(type_, **kw)

    def visit_time(self, type_: sqltypes.Time, **kw: Any) -> str:
        return self.visit_TIME(type_, **kw)

    def visit_datetime(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return self.visit_DATETIME(type_, **kw)

    def visit_date(self, type_: sqltypes.Date, **kw: Any) -> str:
        return self.visit_DATE(type_, **kw)

    def visit_big_integer(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return self.visit_BIGINT(type_, **kw)

    def visit_small_integer(
        self, type_: sqltypes.SmallInteger, **kw: Any
    ) -> str:
        return self.visit_SMALLINT(type_, **kw)

    def visit_integer(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return self.visit_INTEGER(type_, **kw)

    def visit_real(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return self.visit_REAL(type_, **kw)

    def visit_float(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return self.visit_FLOAT(type_, **kw)

    def visit_double(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return self.visit_DOUBLE(type_, **kw)

    def visit_numeric(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        return self.visit_NUMERIC(type_, **kw)

    def visit_string(self, type_: sqltypes.String, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_unicode(self, type_: sqltypes.Unicode, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_text(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_unicode_text(
        self, type_: sqltypes.UnicodeText, **kw: Any
    ) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_enum(self, type_: sqltypes.Enum, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    # -- special cases ----------------------------------------------------

    def visit_null(self, type_, **kw):
        """Refuse to render; always raises ``CompileError``."""
        message = (
            "Can't generate DDL for %r; "
            "did you forget to specify a "
            "type on this Column?" % type_
        )
        raise exc.CompileError(message)

    def visit_type_decorator(
        self, type_: TypeDecorator[Any], **kw: Any
    ) -> str:
        """Render whatever ``type_.type_engine()`` gives for the dialect."""
        impl_type = type_.type_engine(self.dialect)
        return self.process(impl_type, **kw)

    def visit_user_defined(
        self, type_: UserDefinedType[Any], **kw: Any
    ) -> str:
        """Render the string returned by the type's ``get_col_spec()``."""
        return type_.get_col_spec(**kw)
7458
7459
class StrSQLTypeCompiler(GenericTypeCompiler):
    """Type compiler that produces a string for any object it is given.

    Objects lacking a ``_compiler_dispatch`` attribute, and any
    ``visit_*`` attribute that normal lookup does not find, are routed to
    :meth:`_visit_unknown`.
    """

    def process(self, type_, **kw):
        """Dispatch ``type_`` to its visitor, tolerating non-type objects."""
        try:
            dispatch = type_._compiler_dispatch
        except AttributeError:
            return self._visit_unknown(type_, **kw)
        return dispatch(self, **kw)

    def __getattr__(self, key):
        """Resolve missing ``visit_*`` names to :meth:`_visit_unknown`.

        Any other missing attribute raises ``AttributeError`` as usual.
        """
        if not key.startswith("visit_"):
            raise AttributeError(key)
        return self._visit_unknown

    def _visit_unknown(self, type_, **kw):
        """Return the class name if it equals its upper-cased form,
        otherwise ``repr(type_)``."""
        cls_name = type_.__class__.__name__
        return cls_name if cls_name == cls_name.upper() else repr(type_)

    def visit_null(self, type_, **kw):
        return "NULL"

    def visit_user_defined(self, type_, **kw):
        """Use ``get_col_spec()`` when the type has one, else its repr."""
        try:
            col_spec = type_.get_col_spec
        except AttributeError:
            return repr(type_)
        return col_spec(**kw)
7491
7492
class _SchemaForObjectCallable(Protocol):
    """Callable protocol: takes one positional object, returns ``str``."""

    def __call__(self, __obj: Any) -> str:
        ...
7495
7496
class _BindNameForColProtocol(Protocol):
    """Callable protocol: takes a column as ``col``, returns ``str``."""

    def __call__(self, col: ColumnClause[Any]) -> str:
        ...
7499
7500
7501class IdentifierPreparer:
7502 """Handle quoting and case-folding of identifiers based on options."""
7503
7504 reserved_words = RESERVED_WORDS
7505
7506 legal_characters = LEGAL_CHARACTERS
7507
7508 illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS
7509
7510 initial_quote: str
7511
7512 final_quote: str
7513
7514 _strings: MutableMapping[str, str]
7515
7516 schema_for_object: _SchemaForObjectCallable = operator.attrgetter("schema")
7517 """Return the .schema attribute for an object.
7518
7519 For the default IdentifierPreparer, the schema for an object is always
7520 the value of the ".schema" attribute. if the preparer is replaced
7521 with one that has a non-empty schema_translate_map, the value of the
7522 ".schema" attribute is rendered a symbol that will be converted to a
7523 real schema name from the mapping post-compile.
7524
7525 """
7526
7527 _includes_none_schema_translate: bool = False
7528
7529 def __init__(
7530 self,
7531 dialect: Dialect,
7532 initial_quote: str = '"',
7533 final_quote: Optional[str] = None,
7534 escape_quote: str = '"',
7535 quote_case_sensitive_collations: bool = True,
7536 omit_schema: bool = False,
7537 ):
7538 """Construct a new ``IdentifierPreparer`` object.
7539
7540 initial_quote
7541 Character that begins a delimited identifier.
7542
7543 final_quote
7544 Character that ends a delimited identifier. Defaults to
7545 `initial_quote`.
7546
7547 omit_schema
7548 Prevent prepending schema name. Useful for databases that do
7549 not support schemae.
7550 """
7551
7552 self.dialect = dialect
7553 self.initial_quote = initial_quote
7554 self.final_quote = final_quote or self.initial_quote
7555 self.escape_quote = escape_quote
7556 self.escape_to_quote = self.escape_quote * 2
7557 self.omit_schema = omit_schema
7558 self.quote_case_sensitive_collations = quote_case_sensitive_collations
7559 self._strings = {}
7560 self._double_percents = self.dialect.paramstyle in (
7561 "format",
7562 "pyformat",
7563 )
7564
7565 def _with_schema_translate(self, schema_translate_map):
7566 prep = self.__class__.__new__(self.__class__)
7567 prep.__dict__.update(self.__dict__)
7568
7569 includes_none = None in schema_translate_map
7570
7571 def symbol_getter(obj):
7572 name = obj.schema
7573 if obj._use_schema_map and (name is not None or includes_none):
7574 if name is not None and ("[" in name or "]" in name):
7575 raise exc.CompileError(
7576 "Square bracket characters ([]) not supported "
7577 "in schema translate name '%s'" % name
7578 )
7579 return quoted_name(
7580 "__[SCHEMA_%s]" % (name or "_none"), quote=False
7581 )
7582 else:
7583 return obj.schema
7584
7585 prep.schema_for_object = symbol_getter
7586 prep._includes_none_schema_translate = includes_none
7587 return prep
7588
7589 def _render_schema_translates(
7590 self, statement: str, schema_translate_map: SchemaTranslateMapType
7591 ) -> str:
7592 d = schema_translate_map
7593 if None in d:
7594 if not self._includes_none_schema_translate:
7595 raise exc.InvalidRequestError(
7596 "schema translate map which previously did not have "
7597 "`None` present as a key now has `None` present; compiled "
7598 "statement may lack adequate placeholders. Please use "
7599 "consistent keys in successive "
7600 "schema_translate_map dictionaries."
7601 )
7602
7603 d["_none"] = d[None] # type: ignore[index]
7604
7605 def replace(m):
7606 name = m.group(2)
7607 if name in d:
7608 effective_schema = d[name]
7609 else:
7610 if name in (None, "_none"):
7611 raise exc.InvalidRequestError(
7612 "schema translate map which previously had `None` "
7613 "present as a key now no longer has it present; don't "
7614 "know how to apply schema for compiled statement. "
7615 "Please use consistent keys in successive "
7616 "schema_translate_map dictionaries."
7617 )
7618 effective_schema = name
7619
7620 if not effective_schema:
7621 effective_schema = self.dialect.default_schema_name
7622 if not effective_schema:
7623 # TODO: no coverage here
7624 raise exc.CompileError(
7625 "Dialect has no default schema name; can't "
7626 "use None as dynamic schema target."
7627 )
7628 return self.quote_schema(effective_schema)
7629
7630 return re.sub(r"(__\[SCHEMA_([^\]]+)\])", replace, statement)
7631
7632 def _escape_identifier(self, value: str) -> str:
7633 """Escape an identifier.
7634
7635 Subclasses should override this to provide database-dependent
7636 escaping behavior.
7637 """
7638
7639 value = value.replace(self.escape_quote, self.escape_to_quote)
7640 if self._double_percents:
7641 value = value.replace("%", "%%")
7642 return value
7643
7644 def _unescape_identifier(self, value: str) -> str:
7645 """Canonicalize an escaped identifier.
7646
7647 Subclasses should override this to provide database-dependent
7648 unescaping behavior that reverses _escape_identifier.
7649 """
7650
7651 return value.replace(self.escape_to_quote, self.escape_quote)
7652
7653 def validate_sql_phrase(self, element, reg):
7654 """keyword sequence filter.
7655
7656 a filter for elements that are intended to represent keyword sequences,
7657 such as "INITIALLY", "INITIALLY DEFERRED", etc. no special characters
7658 should be present.
7659
7660 .. versionadded:: 1.3
7661
7662 """
7663
7664 if element is not None and not reg.match(element):
7665 raise exc.CompileError(
7666 "Unexpected SQL phrase: %r (matching against %r)"
7667 % (element, reg.pattern)
7668 )
7669 return element
7670
7671 def quote_identifier(self, value: str) -> str:
7672 """Quote an identifier.
7673
7674 Subclasses should override this to provide database-dependent
7675 quoting behavior.
7676 """
7677
7678 return (
7679 self.initial_quote
7680 + self._escape_identifier(value)
7681 + self.final_quote
7682 )
7683
7684 def _requires_quotes(self, value: str) -> bool:
7685 """Return True if the given identifier requires quoting."""
7686 lc_value = value.lower()
7687 return (
7688 lc_value in self.reserved_words
7689 or value[0] in self.illegal_initial_characters
7690 or not self.legal_characters.match(str(value))
7691 or (lc_value != value)
7692 )
7693
7694 def _requires_quotes_illegal_chars(self, value):
7695 """Return True if the given identifier requires quoting, but
7696 not taking case convention into account."""
7697 return not self.legal_characters.match(str(value))
7698
7699 def quote_schema(self, schema: str, force: Any = None) -> str:
7700 """Conditionally quote a schema name.
7701
7702
7703 The name is quoted if it is a reserved word, contains quote-necessary
7704 characters, or is an instance of :class:`.quoted_name` which includes
7705 ``quote`` set to ``True``.
7706
7707 Subclasses can override this to provide database-dependent
7708 quoting behavior for schema names.
7709
7710 :param schema: string schema name
7711 :param force: unused
7712
7713 .. deprecated:: 0.9
7714
7715 The :paramref:`.IdentifierPreparer.quote_schema.force`
7716 parameter is deprecated and will be removed in a future
7717 release. This flag has no effect on the behavior of the
7718 :meth:`.IdentifierPreparer.quote` method; please refer to
7719 :class:`.quoted_name`.
7720
7721 """
7722 if force is not None:
7723 # not using the util.deprecated_params() decorator in this
7724 # case because of the additional function call overhead on this
7725 # very performance-critical spot.
7726 util.warn_deprecated(
7727 "The IdentifierPreparer.quote_schema.force parameter is "
7728 "deprecated and will be removed in a future release. This "
7729 "flag has no effect on the behavior of the "
7730 "IdentifierPreparer.quote method; please refer to "
7731 "quoted_name().",
7732 # deprecated 0.9. warning from 1.3
7733 version="0.9",
7734 )
7735
7736 return self.quote(schema)
7737
7738 def quote(self, ident: str, force: Any = None) -> str:
7739 """Conditionally quote an identifier.
7740
7741 The identifier is quoted if it is a reserved word, contains
7742 quote-necessary characters, or is an instance of
7743 :class:`.quoted_name` which includes ``quote`` set to ``True``.
7744
7745 Subclasses can override this to provide database-dependent
7746 quoting behavior for identifier names.
7747
7748 :param ident: string identifier
7749 :param force: unused
7750
7751 .. deprecated:: 0.9
7752
7753 The :paramref:`.IdentifierPreparer.quote.force`
7754 parameter is deprecated and will be removed in a future
7755 release. This flag has no effect on the behavior of the
7756 :meth:`.IdentifierPreparer.quote` method; please refer to
7757 :class:`.quoted_name`.
7758
7759 """
7760 if force is not None:
7761 # not using the util.deprecated_params() decorator in this
7762 # case because of the additional function call overhead on this
7763 # very performance-critical spot.
7764 util.warn_deprecated(
7765 "The IdentifierPreparer.quote.force parameter is "
7766 "deprecated and will be removed in a future release. This "
7767 "flag has no effect on the behavior of the "
7768 "IdentifierPreparer.quote method; please refer to "
7769 "quoted_name().",
7770 # deprecated 0.9. warning from 1.3
7771 version="0.9",
7772 )
7773
7774 force = getattr(ident, "quote", None)
7775
7776 if force is None:
7777 if ident in self._strings:
7778 return self._strings[ident]
7779 else:
7780 if self._requires_quotes(ident):
7781 self._strings[ident] = self.quote_identifier(ident)
7782 else:
7783 self._strings[ident] = ident
7784 return self._strings[ident]
7785 elif force:
7786 return self.quote_identifier(ident)
7787 else:
7788 return ident
7789
7790 def format_collation(self, collation_name):
7791 if self.quote_case_sensitive_collations:
7792 return self.quote(collation_name)
7793 else:
7794 return collation_name
7795
7796 def format_sequence(
7797 self, sequence: schema.Sequence, use_schema: bool = True
7798 ) -> str:
7799 name = self.quote(sequence.name)
7800
7801 effective_schema = self.schema_for_object(sequence)
7802
7803 if (
7804 not self.omit_schema
7805 and use_schema
7806 and effective_schema is not None
7807 ):
7808 name = self.quote_schema(effective_schema) + "." + name
7809 return name
7810
7811 def format_label(
7812 self, label: Label[Any], name: Optional[str] = None
7813 ) -> str:
7814 return self.quote(name or label.name)
7815
7816 def format_alias(
7817 self, alias: Optional[AliasedReturnsRows], name: Optional[str] = None
7818 ) -> str:
7819 if name is None:
7820 assert alias is not None
7821 return self.quote(alias.name)
7822 else:
7823 return self.quote(name)
7824
7825 def format_savepoint(self, savepoint, name=None):
7826 # Running the savepoint name through quoting is unnecessary
7827 # for all known dialects. This is here to support potential
7828 # third party use cases
7829 ident = name or savepoint.ident
7830 if self._requires_quotes(ident):
7831 ident = self.quote_identifier(ident)
7832 return ident
7833
7834 @util.preload_module("sqlalchemy.sql.naming")
7835 def format_constraint(
7836 self, constraint: Union[Constraint, Index], _alembic_quote: bool = True
7837 ) -> Optional[str]:
7838 naming = util.preloaded.sql_naming
7839
7840 if constraint.name is _NONE_NAME:
7841 name = naming._constraint_name_for_table(
7842 constraint, constraint.table
7843 )
7844
7845 if name is None:
7846 return None
7847 else:
7848 name = constraint.name
7849
7850 assert name is not None
7851 if constraint.__visit_name__ == "index":
7852 return self.truncate_and_render_index_name(
7853 name, _alembic_quote=_alembic_quote
7854 )
7855 else:
7856 return self.truncate_and_render_constraint_name(
7857 name, _alembic_quote=_alembic_quote
7858 )
7859
7860 def truncate_and_render_index_name(
7861 self, name: str, _alembic_quote: bool = True
7862 ) -> str:
7863 # calculate these at format time so that ad-hoc changes
7864 # to dialect.max_identifier_length etc. can be reflected
7865 # as IdentifierPreparer is long lived
7866 max_ = (
7867 self.dialect.max_index_name_length
7868 or self.dialect.max_identifier_length
7869 )
7870 return self._truncate_and_render_maxlen_name(
7871 name, max_, _alembic_quote
7872 )
7873
7874 def truncate_and_render_constraint_name(
7875 self, name: str, _alembic_quote: bool = True
7876 ) -> str:
7877 # calculate these at format time so that ad-hoc changes
7878 # to dialect.max_identifier_length etc. can be reflected
7879 # as IdentifierPreparer is long lived
7880 max_ = (
7881 self.dialect.max_constraint_name_length
7882 or self.dialect.max_identifier_length
7883 )
7884 return self._truncate_and_render_maxlen_name(
7885 name, max_, _alembic_quote
7886 )
7887
7888 def _truncate_and_render_maxlen_name(
7889 self, name: str, max_: int, _alembic_quote: bool
7890 ) -> str:
7891 if isinstance(name, elements._truncated_label):
7892 if len(name) > max_:
7893 name = name[0 : max_ - 8] + "_" + util.md5_hex(name)[-4:]
7894 else:
7895 self.dialect.validate_identifier(name)
7896
7897 if not _alembic_quote:
7898 return name
7899 else:
7900 return self.quote(name)
7901
7902 def format_index(self, index: Index) -> str:
7903 name = self.format_constraint(index)
7904 assert name is not None
7905 return name
7906
7907 def format_table(
7908 self,
7909 table: FromClause,
7910 use_schema: bool = True,
7911 name: Optional[str] = None,
7912 ) -> str:
7913 """Prepare a quoted table and schema name."""
7914 if name is None:
7915 if TYPE_CHECKING:
7916 assert isinstance(table, NamedFromClause)
7917 name = table.name
7918
7919 result = self.quote(name)
7920
7921 effective_schema = self.schema_for_object(table)
7922
7923 if not self.omit_schema and use_schema and effective_schema:
7924 result = self.quote_schema(effective_schema) + "." + result
7925 return result
7926
7927 def format_schema(self, name):
7928 """Prepare a quoted schema name."""
7929
7930 return self.quote(name)
7931
7932 def format_label_name(
7933 self,
7934 name,
7935 anon_map=None,
7936 ):
7937 """Prepare a quoted column name."""
7938
7939 if anon_map is not None and isinstance(
7940 name, elements._truncated_label
7941 ):
7942 name = name.apply_map(anon_map)
7943
7944 return self.quote(name)
7945
7946 def format_column(
7947 self,
7948 column: ColumnElement[Any],
7949 use_table: bool = False,
7950 name: Optional[str] = None,
7951 table_name: Optional[str] = None,
7952 use_schema: bool = False,
7953 anon_map: Optional[Mapping[str, Any]] = None,
7954 ) -> str:
7955 """Prepare a quoted column name."""
7956
7957 if name is None:
7958 name = column.name
7959 assert name is not None
7960
7961 if anon_map is not None and isinstance(
7962 name, elements._truncated_label
7963 ):
7964 name = name.apply_map(anon_map)
7965
7966 if not getattr(column, "is_literal", False):
7967 if use_table:
7968 return (
7969 self.format_table(
7970 column.table, use_schema=use_schema, name=table_name
7971 )
7972 + "."
7973 + self.quote(name)
7974 )
7975 else:
7976 return self.quote(name)
7977 else:
7978 # literal textual elements get stuck into ColumnClause a lot,
7979 # which shouldn't get quoted
7980
7981 if use_table:
7982 return (
7983 self.format_table(
7984 column.table, use_schema=use_schema, name=table_name
7985 )
7986 + "."
7987 + name
7988 )
7989 else:
7990 return name
7991
7992 def format_table_seq(self, table, use_schema=True):
7993 """Format table name and schema as a tuple."""
7994
7995 # Dialects with more levels in their fully qualified references
7996 # ('database', 'owner', etc.) could override this and return
7997 # a longer sequence.
7998
7999 effective_schema = self.schema_for_object(table)
8000
8001 if not self.omit_schema and use_schema and effective_schema:
8002 return (
8003 self.quote_schema(effective_schema),
8004 self.format_table(table, use_schema=False),
8005 )
8006 else:
8007 return (self.format_table(table, use_schema=False),)
8008
8009 @util.memoized_property
8010 def _r_identifiers(self):
8011 initial, final, escaped_final = (
8012 re.escape(s)
8013 for s in (
8014 self.initial_quote,
8015 self.final_quote,
8016 self._escape_identifier(self.final_quote),
8017 )
8018 )
8019 r = re.compile(
8020 r"(?:"
8021 r"(?:%(initial)s((?:%(escaped)s|[^%(final)s])+)%(final)s"
8022 r"|([^\.]+))(?=\.|$))+"
8023 % {"initial": initial, "final": final, "escaped": escaped_final}
8024 )
8025 return r
8026
8027 def unformat_identifiers(self, identifiers: str) -> Sequence[str]:
8028 """Unpack 'schema.table.column'-like strings into components."""
8029
8030 r = self._r_identifiers
8031 return [
8032 self._unescape_identifier(i)
8033 for i in [a or b for a, b in r.findall(identifiers)]
8034 ]