1# sql/compiler.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Base SQL and DDL compiler implementations.
10
11Classes provided include:
12
13:class:`.compiler.SQLCompiler` - renders SQL
14strings
15
16:class:`.compiler.DDLCompiler` - renders DDL
17(data definition language) strings
18
19:class:`.compiler.GenericTypeCompiler` - renders
20type specification strings.
21
22To generate user-defined SQL strings, see
23:doc:`/ext/compiler`.
24
25"""
26from __future__ import annotations
27
28import collections
29import collections.abc as collections_abc
30import contextlib
31from enum import IntEnum
32import functools
33import itertools
34import operator
35import re
36from time import perf_counter
37import typing
38from typing import Any
39from typing import Callable
40from typing import cast
41from typing import ClassVar
42from typing import Dict
43from typing import Final
44from typing import FrozenSet
45from typing import Iterable
46from typing import Iterator
47from typing import List
48from typing import Literal
49from typing import Mapping
50from typing import MutableMapping
51from typing import NamedTuple
52from typing import NoReturn
53from typing import Optional
54from typing import Pattern
55from typing import Protocol
56from typing import Sequence
57from typing import Set
58from typing import Tuple
59from typing import Type
60from typing import TYPE_CHECKING
61from typing import TypedDict
62from typing import Union
63
64from . import base
65from . import coercions
66from . import crud
67from . import elements
68from . import functions
69from . import operators
70from . import roles
71from . import schema
72from . import selectable
73from . import sqltypes
74from . import util as sql_util
75from ._typing import is_column_element
76from ._typing import is_dml
77from .base import _de_clone
78from .base import _from_objects
79from .base import _NONE_NAME
80from .base import _SentinelDefaultCharacterization
81from .base import NO_ARG
82from .elements import quoted_name
83from .sqltypes import TupleType
84from .visitors import prefix_anon_map
85from .. import exc
86from .. import util
87from ..util import FastIntFlag
88from ..util.typing import Self
89from ..util.typing import TupleAny
90from ..util.typing import Unpack
91
92if typing.TYPE_CHECKING:
93 from .annotation import _AnnotationDict
94 from .base import _AmbiguousTableNameMap
95 from .base import CompileState
96 from .base import Executable
97 from .base import ExecutableStatement
98 from .cache_key import CacheKey
99 from .ddl import _TableViaSelect
100 from .ddl import CreateTableAs
101 from .ddl import CreateView
102 from .ddl import ExecutableDDLElement
103 from .dml import Delete
104 from .dml import Insert
105 from .dml import Update
106 from .dml import UpdateBase
107 from .dml import UpdateDMLState
108 from .dml import ValuesBase
109 from .elements import _truncated_label
110 from .elements import BinaryExpression
111 from .elements import BindParameter
112 from .elements import ClauseElement
113 from .elements import ColumnClause
114 from .elements import ColumnElement
115 from .elements import False_
116 from .elements import Label
117 from .elements import Null
118 from .elements import True_
119 from .functions import Function
120 from .schema import Column
121 from .schema import Constraint
122 from .schema import ForeignKeyConstraint
123 from .schema import Index
124 from .schema import PrimaryKeyConstraint
125 from .schema import Table
126 from .schema import UniqueConstraint
127 from .selectable import _ColumnsClauseElement
128 from .selectable import AliasedReturnsRows
129 from .selectable import CompoundSelectState
130 from .selectable import CTE
131 from .selectable import FromClause
132 from .selectable import NamedFromClause
133 from .selectable import ReturnsRows
134 from .selectable import Select
135 from .selectable import SelectState
136 from .type_api import _BindProcessorType
137 from .type_api import TypeDecorator
138 from .type_api import TypeEngine
139 from .type_api import UserDefinedType
140 from .visitors import Visitable
141 from ..engine.cursor import CursorResultMetaData
142 from ..engine.interfaces import _CoreSingleExecuteParams
143 from ..engine.interfaces import _DBAPIAnyExecuteParams
144 from ..engine.interfaces import _DBAPIMultiExecuteParams
145 from ..engine.interfaces import _DBAPISingleExecuteParams
146 from ..engine.interfaces import _ExecuteOptions
147 from ..engine.interfaces import _GenericSetInputSizesType
148 from ..engine.interfaces import _MutableCoreSingleExecuteParams
149 from ..engine.interfaces import Dialect
150 from ..engine.interfaces import SchemaTranslateMapType
151
152
# type alias: mapping of FROM elements to dialect-specific hint text,
# as collected for hint rendering during statement compilation
_FromHintsType = Dict["FromClause", str]

# default set of words that require quoting when used as identifiers.
# NOTE(review): this list appears to be derived from PostgreSQL's reserved
# words — individual dialects supply their own lists; confirm intended scope.
RESERVED_WORDS = {
    "all",
    "analyse",
    "analyze",
    "and",
    "any",
    "array",
    "as",
    "asc",
    "asymmetric",
    "authorization",
    "between",
    "binary",
    "both",
    "case",
    "cast",
    "check",
    "collate",
    "column",
    "constraint",
    "create",
    "cross",
    "current_date",
    "current_role",
    "current_time",
    "current_timestamp",
    "current_user",
    "default",
    "deferrable",
    "desc",
    "distinct",
    "do",
    "else",
    "end",
    "except",
    "false",
    "for",
    "foreign",
    "freeze",
    "from",
    "full",
    "grant",
    "group",
    "having",
    "ilike",
    "in",
    "initially",
    "inner",
    "intersect",
    "into",
    "is",
    "isnull",
    "join",
    "leading",
    "left",
    "like",
    "limit",
    "localtime",
    "localtimestamp",
    "natural",
    "new",
    "not",
    "notnull",
    "null",
    "off",
    "offset",
    "old",
    "on",
    "only",
    "or",
    "order",
    "outer",
    "overlaps",
    "placing",
    "primary",
    "references",
    "right",
    "select",
    "session_user",
    "set",
    "similar",
    "some",
    "symmetric",
    "table",
    "then",
    "to",
    "trailing",
    "true",
    "union",
    "unique",
    "user",
    "using",
    "verbose",
    "when",
    "where",
}

# patterns matching identifiers composed only of "safe" characters
# (case-insensitive); the _PLUS_SPACE variant additionally permits spaces
LEGAL_CHARACTERS = re.compile(r"^[A-Z0-9_$]+$", re.I)
LEGAL_CHARACTERS_PLUS_SPACE = re.compile(r"^[A-Z0-9_ $]+$", re.I)
# characters an identifier may not begin with: digits and dollar sign
ILLEGAL_INITIAL_CHARACTERS = {str(x) for x in range(0, 10)}.union(["$"])

# accepted phrases for foreign key ON DELETE / ON UPDATE / INITIALLY
# clauses in DDL
FK_ON_DELETE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
)
FK_ON_UPDATE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
)
FK_INITIALLY = re.compile(r"^(?:DEFERRED|IMMEDIATE)$", re.I)
# matches ":name" style bound parameter markers in SQL text; markers
# preceded by a backslash (\x5c) are treated as escaped and matched by
# BIND_PARAMS_ESC instead
BIND_PARAMS = re.compile(r"(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])", re.UNICODE)
BIND_PARAMS_ESC = re.compile(r"\x5c(:[\w\$]*)(?![:\w\$])", re.UNICODE)

_pyformat_template = "%%(%(name)s)s"
# bound-parameter render templates keyed by DBAPI paramstyle name
BIND_TEMPLATES = {
    "pyformat": _pyformat_template,
    "qmark": "?",
    "format": "%%s",
    "numeric": ":[_POSITION]",
    "numeric_dollar": "$[_POSITION]",
    "named": ":%(name)s",
}
275
276
# default SQL text rendered for operator functions from the
# ``operators`` module; binary operators include surrounding spaces,
# unary operators / modifiers carry only the spacing they need
OPERATORS = {
    # binary
    operators.and_: " AND ",
    operators.or_: " OR ",
    operators.add: " + ",
    operators.mul: " * ",
    operators.sub: " - ",
    operators.mod: " % ",
    operators.neg: "-",
    operators.lt: " < ",
    operators.le: " <= ",
    operators.ne: " != ",
    operators.gt: " > ",
    operators.ge: " >= ",
    operators.eq: " = ",
    operators.is_distinct_from: " IS DISTINCT FROM ",
    operators.is_not_distinct_from: " IS NOT DISTINCT FROM ",
    operators.concat_op: " || ",
    operators.match_op: " MATCH ",
    operators.not_match_op: " NOT MATCH ",
    operators.in_op: " IN ",
    operators.not_in_op: " NOT IN ",
    operators.comma_op: ", ",
    operators.from_: " FROM ",
    operators.as_: " AS ",
    operators.is_: " IS ",
    operators.is_not: " IS NOT ",
    operators.collate: " COLLATE ",
    # unary
    operators.exists: "EXISTS ",
    operators.distinct_op: "DISTINCT ",
    operators.inv: "NOT ",
    operators.any_op: "ANY ",
    operators.all_op: "ALL ",
    # modifiers
    operators.desc_op: " DESC",
    operators.asc_op: " ASC",
    operators.nulls_first_op: " NULLS FIRST",
    operators.nulls_last_op: " NULLS LAST",
    # bitwise
    operators.bitwise_xor_op: " ^ ",
    operators.bitwise_or_op: " | ",
    operators.bitwise_and_op: " & ",
    operators.bitwise_not_op: "~",
    operators.bitwise_lshift_op: " << ",
    operators.bitwise_rshift_op: " >> ",
}

# default rendered names for specific function classes from the
# ``functions`` module; the upper-case entries are SQL keywords
FUNCTIONS: Dict[Type[Function[Any]], str] = {
    functions.coalesce: "coalesce",
    functions.current_date: "CURRENT_DATE",
    functions.current_time: "CURRENT_TIME",
    functions.current_timestamp: "CURRENT_TIMESTAMP",
    functions.current_user: "CURRENT_USER",
    functions.localtime: "LOCALTIME",
    functions.localtimestamp: "LOCALTIMESTAMP",
    functions.random: "random",
    functions.sysdate: "sysdate",
    functions.session_user: "SESSION_USER",
    functions.user: "USER",
    functions.cube: "CUBE",
    functions.rollup: "ROLLUP",
    functions.grouping_sets: "GROUPING SETS",
}
341
342
# mapping of datetime field names accepted by extract() to the field
# name rendered in SQL; an identity mapping by default (dialects may
# substitute their own via SQLCompiler.extract_map)
EXTRACT_MAP = {
    "month": "month",
    "day": "day",
    "year": "year",
    "second": "second",
    "hour": "hour",
    "doy": "doy",
    "minute": "minute",
    "quarter": "quarter",
    "dow": "dow",
    "week": "week",
    "epoch": "epoch",
    "milliseconds": "milliseconds",
    "microseconds": "microseconds",
    "timezone_hour": "timezone_hour",
    "timezone_minute": "timezone_minute",
}

# SQL keywords rendered for each compound SELECT (set operation) kind
COMPOUND_KEYWORDS = {
    selectable._CompoundSelectKeyword.UNION: "UNION",
    selectable._CompoundSelectKeyword.UNION_ALL: "UNION ALL",
    selectable._CompoundSelectKeyword.EXCEPT: "EXCEPT",
    selectable._CompoundSelectKeyword.EXCEPT_ALL: "EXCEPT ALL",
    selectable._CompoundSelectKeyword.INTERSECT: "INTERSECT",
    selectable._CompoundSelectKeyword.INTERSECT_ALL: "INTERSECT ALL",
}
369
370
class ResultColumnsEntry(NamedTuple):
    """Tracks a column expression that is expected to be represented
    in the result rows for this statement.

    This normally refers to the columns clause of a SELECT statement
    but may also refer to a RETURNING clause, as well as for dialect-specific
    emulations.

    Entries are also accessed positionally via the ``RM_*`` integer
    constants defined in this module.

    """

    keyname: str
    """string name that's expected in cursor.description"""

    name: str
    """column name, may be labeled"""

    objects: Tuple[Any, ...]
    """sequence of objects that should be able to locate this column
    in a RowMapping. This is typically string names and aliases
    as well as Column objects.

    """

    type: TypeEngine[Any]
    """Datatype to be associated with this column. This is where
    the "result processing" logic directly links the compiled statement
    to the rows that come back from the cursor.

    """
400
401
class _ResultMapAppender(Protocol):
    """Callable protocol for appending an entry to a compiler's result
    map; the arguments mirror the fields of :class:`.ResultColumnsEntry`.

    """

    def __call__(
        self,
        keyname: str,
        name: str,
        objects: Sequence[Any],
        type_: TypeEngine[Any],
    ) -> None: ...
410
411
# integer indexes into ResultColumnsEntry used by cursor.py.
# some profiling showed integer access faster than named tuple
RM_RENDERED_NAME: Literal[0] = 0  # ResultColumnsEntry.keyname
RM_NAME: Literal[1] = 1  # ResultColumnsEntry.name
RM_OBJECTS: Literal[2] = 2  # ResultColumnsEntry.objects
RM_TYPE: Literal[3] = 3  # ResultColumnsEntry.type
418
419
class _BaseCompilerStackEntry(TypedDict):
    """keys present in every entry of a compiler's statement stack."""

    asfrom_froms: Set[FromClause]
    correlate_froms: Set[FromClause]
    selectable: ReturnsRows
424
425
class _CompilerStackEntry(_BaseCompilerStackEntry, total=False):
    """full compiler stack entry; these keys are optional
    (``total=False``) and are present only for particular statement
    shapes."""

    compile_state: CompileState
    need_result_map_for_nested: bool
    need_result_map_for_compound: bool
    select_0: ReturnsRows
    insert_from_select: Select[Unpack[TupleAny]]
432
433
class ExpandedState(NamedTuple):
    """State used when producing "expanded" and "post compile" bound
    parameters for a statement.

    "expanded" parameters are generated at statement execution time to
    suit the number of parameter values actually passed; the most
    prominent example is the individual elements inside of an IN
    expression.

    "post compile" parameters are those whose SQL literal value is
    rendered into the statement string at execution time, rather than
    being delivered to the driver as separate parameters.

    To create an :class:`.ExpandedState` instance, use the
    :meth:`.SQLCompiler.construct_expanded_state` method on any
    :class:`.SQLCompiler` instance.

    """

    statement: str
    """String SQL statement with parameters fully expanded"""

    parameters: _CoreSingleExecuteParams
    """Parameter dictionary with parameters fully expanded.

    For a statement that uses named parameters, this dictionary will map
    exactly to the names in the statement. For a statement that uses
    positional parameters, the :attr:`.ExpandedState.positional_parameters`
    will yield a tuple with the positional parameter set.

    """

    processors: Mapping[str, _BindProcessorType[Any]]
    """mapping of bound value processors"""

    positiontup: Optional[Sequence[str]]
    """Sequence of string names indicating the order of positional
    parameters"""

    parameter_expansion: Mapping[str, List[str]]
    """Mapping representing the intermediary link from original parameter
    name to list of "expanded" parameter names, for those parameters that
    were expanded."""

    @property
    def positional_parameters(self) -> Tuple[Any, ...]:
        """Tuple of positional parameters, for statements that were compiled
        using a positional paramstyle.

        """
        ordering = self.positiontup
        if ordering is None:
            raise exc.InvalidRequestError(
                "statement does not use a positional paramstyle"
            )
        # pull values from the expanded parameter dictionary in
        # positional order
        return tuple(self.parameters[name] for name in ordering)

    @property
    def additional_parameters(self) -> _CoreSingleExecuteParams:
        """synonym for :attr:`.ExpandedState.parameters`."""
        return self.parameters
493
494
class _InsertManyValues(NamedTuple):
    """represents state to use for executing an "insertmanyvalues" statement.

    The primary consumers of this object are the
    :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
    :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods.

    .. versionadded:: 2.0

    """

    is_default_expr: bool
    """if True, the statement is of the form
    ``INSERT INTO TABLE DEFAULT VALUES``, and can't be rewritten as a "batch"

    """

    single_values_expr: str
    """The rendered "values" clause of the INSERT statement.

    This is typically the parenthesized section e.g. "(?, ?, ?)" or similar.
    The insertmanyvalues logic uses this string as a search and replace
    target.

    """

    insert_crud_params: List[crud._CrudParamElementStr]
    """List of Column / bind names etc. used while rewriting the statement"""

    num_positional_params_counted: int
    """the number of bound parameters in a single-row statement.

    This count may be larger or smaller than the actual number of columns
    targeted in the INSERT, as it accommodates for SQL expressions
    in the values list that may have zero or more parameters embedded
    within them.

    This count is part of what's used to organize rewritten parameter lists
    when batching.

    """

    sort_by_parameter_order: bool = False
    """if the ``sort_by_parameter_order`` parameter was used on the
    insert.

    All of the attributes following this will only be used if this is True.

    """

    includes_upsert_behaviors: bool = False
    """if True, we have to accommodate for upsert behaviors.

    This will in some cases downgrade "insertmanyvalues" that requests
    deterministic ordering.

    """

    sentinel_columns: Optional[Sequence[Column[Any]]] = None
    """List of sentinel columns that were located.

    This list is only here if the INSERT asked for
    sort_by_parameter_order=True,
    and dialect-appropriate sentinel columns were located.

    .. versionadded:: 2.0.10

    """

    num_sentinel_columns: int = 0
    """how many sentinel columns are in the above list, if any.

    This is the same as
    ``len(sentinel_columns) if sentinel_columns is not None else 0``

    """

    sentinel_param_keys: Optional[Sequence[str]] = None
    """parameter str keys in each param dictionary / tuple
    that would link to the client side "sentinel" values for that row, which
    we can use to match up parameter sets to result rows.

    This is only present if sentinel_columns is present and the INSERT
    statement actually refers to client side values for these sentinel
    columns.

    .. versionadded:: 2.0.10

    .. versionchanged:: 2.0.29 - the sequence is now string dictionary keys
    only, used against the "compiled parameters" collection before
    the parameters were converted by bound parameter processors

    """

    implicit_sentinel: bool = False
    """if True, we have exactly one sentinel column and it uses a server side
    value, currently has to generate an incrementing integer value.

    The dialect in question would have asserted that it supports receiving
    these values back and sorting on that value as a means of guaranteeing
    correlation with the incoming parameter list.

    .. versionadded:: 2.0.10

    """

    embed_values_counter: bool = False
    """Whether to embed an incrementing integer counter in each parameter
    set within the VALUES clause as parameters are batched over.

    This is only used for a specific INSERT..SELECT..VALUES..RETURNING syntax
    where a subquery is used to produce value tuples. Current support
    includes PostgreSQL, Microsoft SQL Server.

    .. versionadded:: 2.0.10

    """
612
613
class _InsertManyValuesBatch(NamedTuple):
    """represents an individual batch SQL statement for insertmanyvalues.

    This is passed through the
    :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
    :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods out
    to the :class:`.Connection` within the
    :meth:`.Connection._exec_insertmany_context` method.

    .. versionadded:: 2.0.10

    """

    replaced_statement: str  # rewritten SQL string for this batch
    replaced_parameters: _DBAPIAnyExecuteParams  # parameters for that string
    processed_setinputsizes: Optional[_GenericSetInputSizesType]
    batch: Sequence[_DBAPISingleExecuteParams]  # source parameter sets
    # per-row sentinel value tuples used to correlate rows
    sentinel_values: Sequence[Tuple[Any, ...]]
    current_batch_size: int  # number of parameter sets in this batch
    batchnum: int  # ordinal of this batch within the overall statement
    total_batches: int
    rows_sorted: bool  # True if rows can be correlated deterministically
    is_downgraded: bool  # True if deterministic ordering was downgraded
637
638
class InsertmanyvaluesSentinelOpts(FastIntFlag):
    """bitflag enum indicating styles of PK defaults
    which can work as implicit sentinel columns

    """

    NOT_SUPPORTED = 1
    AUTOINCREMENT = 2
    IDENTITY = 4
    SEQUENCE = 8

    # any of the server-generated integer primary key styles
    ANY_AUTOINCREMENT = AUTOINCREMENT | IDENTITY | SEQUENCE
    # mask covering all of the flags above
    _SUPPORTED_OR_NOT = NOT_SUPPORTED | ANY_AUTOINCREMENT

    USE_INSERT_FROM_SELECT = 16
    RENDER_SELECT_COL_CASTS = 64
655
656
# NOTE(review): consulted by compilers when rendering aggregate function
# ORDER BY — confirm exact consumer, which is outside this chunk
class AggregateOrderByStyle(IntEnum):
    """Describes backend database's capabilities with ORDER BY for aggregate
    functions

    .. versionadded:: 2.1

    """

    NONE = 0
    """database has no ORDER BY for aggregate functions"""

    INLINE = 1
    """ORDER BY is rendered inside the function's argument list, typically as
    the last element"""

    WITHIN_GROUP = 2
    """the WITHIN GROUP (ORDER BY ...) phrase is used for all aggregate
    functions (not just the ordered set ones)"""
675
676
class CompilerState(IntEnum):
    """enumerates the lifecycle states of a :class:`.Compiled` object."""

    COMPILING = 0
    """statement is present, compilation phase in progress"""

    STRING_APPLIED = 1
    """statement is present, string form of the statement has been applied.

    Additional processors by subclasses may still be pending.

    """

    NO_STATEMENT = 2
    """compiler does not have a statement to compile, is used
    for method access"""
691
692
class Linting(IntEnum):
    """represent preferences for the 'SQL linting' feature.

    this feature currently includes support for flagging cartesian products
    in SQL statements.

    Values are bit-like: FROM_LINTING combines the two flags below.

    """

    NO_LINTING = 0
    "Disable all linting."

    COLLECT_CARTESIAN_PRODUCTS = 1
    """Collect data on FROMs and cartesian products and gather into
    'self.from_linter'"""

    WARN_LINTING = 2
    "Emit warnings for linters that find problems"

    FROM_LINTING = COLLECT_CARTESIAN_PRODUCTS | WARN_LINTING
    """Warn for cartesian products; combines COLLECT_CARTESIAN_PRODUCTS
    and WARN_LINTING"""
714
715
# re-export the Linting members as module-level constants, in definition
# order, for brevity at call sites
NO_LINTING, COLLECT_CARTESIAN_PRODUCTS, WARN_LINTING, FROM_LINTING = tuple(
    Linting
)
719
720
class FromLinter(collections.namedtuple("FromLinter", ["froms", "edges"])):
    """Current state for the "cartesian product" detection feature.

    ``froms`` is a mapping of FROM elements to a display name, and
    ``edges`` is a set of 2-tuples of FROM elements which are joined to
    each other.

    """

    def lint(self, start=None):
        """Traverse the join graph and report disconnected FROM elements.

        Returns a tuple ``(leftover_froms, start_element)`` when one or
        more FROM elements cannot be reached from the starting element,
        otherwise ``(None, None)``.

        """
        all_froms = self.froms
        if not all_froms:
            return None, None

        remaining_edges = set(self.edges)
        unvisited = set(all_froms)

        if start is None:
            # arbitrary starting point
            origin = unvisited.pop()
        else:
            origin = start
            unvisited.remove(origin)

        queue = collections.deque([origin])

        while queue and unvisited:
            node = queue.popleft()
            unvisited.discard(node)

            # comparison of nodes in edges here is based on hash equality, as
            # there are "annotated" elements that match the non-annotated
            # ones.  to remove the need for in-python hash() calls, use
            # native containment routines (e.g. "node in edge",
            # "edge.index(node)")
            touching = {edge for edge in remaining_edges if node in edge}

            # queue the opposite endpoint of every edge that touched
            # this node, then retire those edges
            queue.extendleft(
                edge[1 - edge.index(node)] for edge in touching
            )
            remaining_edges.difference_update(touching)

        # anything still unvisited is disconnected from the start: boom
        if unvisited:
            return unvisited, origin
        return None, None

    def warn(self, stmt_type="SELECT"):
        """Run :meth:`.lint` and emit a warning for any cartesian product."""
        leftover, origin = self.lint()

        # FROMS left over? boom
        if leftover:
            template = (
                "{stmt_type} statement has a cartesian product between "
                "FROM element(s) {froms} and "
                'FROM element "{start}". Apply join condition(s) '
                "between each element to resolve."
            )
            names = ", ".join(
                f'"{self.froms[item]}"' for item in leftover
            )
            util.warn(
                template.format(
                    stmt_type=stmt_type,
                    froms=names,
                    start=self.froms[origin],
                )
            )
785
786
class Compiled:
    """Represent a compiled SQL or DDL expression.

    The ``__str__`` method of the ``Compiled`` object should produce
    the actual text of the statement. ``Compiled`` objects are
    specific to their underlying database dialect, and also may
    or may not be specific to the columns referenced within a
    particular set of bind parameters. In no case should the
    ``Compiled`` object be dependent on the actual values of those
    bind parameters, even though it may reference those values as
    defaults.
    """

    statement: Optional[ClauseElement] = None
    "The statement to compile."
    string: str = ""
    "The string representation of the ``statement``"

    state: CompilerState
    """description of the compiler's state"""

    # discriminator flags; presumably set True by the SQL / DDL
    # compiler subclasses respectively — confirm against subclasses
    is_sql = False
    is_ddl = False

    # memoized result metadata (populated elsewhere; see CursorResultMetaData)
    _cached_metadata: Optional[CursorResultMetaData] = None

    # result map entries accumulated during compilation, if any
    _result_columns: Optional[List[ResultColumnsEntry]] = None

    schema_translate_map: Optional[SchemaTranslateMapType] = None

    execution_options: _ExecuteOptions = util.EMPTY_DICT
    """
    Execution options propagated from the statement. In some cases,
    sub-elements of the statement can modify these.
    """

    preparer: IdentifierPreparer

    _annotations: _AnnotationDict = util.EMPTY_DICT

    compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` object that maintains additional
    state used by the compiler.

    Major executable objects such as :class:`_expression.Insert`,
    :class:`_expression.Update`, :class:`_expression.Delete`,
    :class:`_expression.Select` will generate this
    state when compiled in order to calculate additional information about the
    object. For the top level object that is to be executed, the state can be
    stored here where it can also have applicability towards result set
    processing.

    .. versionadded:: 1.4

    """

    dml_compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` assigned at the same point that
    .isinsert, .isupdate, or .isdelete is assigned.

    This will normally be the same object as .compile_state, with the
    exception of cases like the :class:`.ORMFromStatementCompileState`
    object.

    .. versionadded:: 1.4.40

    """

    cache_key: Optional[CacheKey] = None
    """The :class:`.CacheKey` that was generated ahead of creating this
    :class:`.Compiled` object.

    This is used for routines that need access to the original
    :class:`.CacheKey` instance generated when the :class:`.Compiled`
    instance was first cached, typically in order to reconcile
    the original list of :class:`.BindParameter` objects with a
    per-statement list that's generated on each call.

    """

    _gen_time: float
    """Generation time of this :class:`.Compiled`, used for reporting
    cache stats."""

    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        schema_translate_map: Optional[SchemaTranslateMapType] = None,
        render_schema_translate: bool = False,
        compile_kwargs: Mapping[str, Any] = util.immutabledict(),
    ):
        """Construct a new :class:`.Compiled` object.

        :param dialect: :class:`.Dialect` to compile against.

        :param statement: :class:`_expression.ClauseElement` to be compiled.

        :param schema_translate_map: dictionary of schema names to be
         translated when forming the resultant SQL

         .. seealso::

            :ref:`schema_translating`

        :param compile_kwargs: additional kwargs that will be
         passed to the initial call to :meth:`.Compiled.process`.


        """
        self.dialect = dialect
        self.preparer = self.dialect.identifier_preparer
        if schema_translate_map:
            self.schema_translate_map = schema_translate_map
            # swap in a preparer variant that carries the translate map
            self.preparer = self.preparer._with_schema_translate(
                schema_translate_map
            )

        if statement is not None:
            self.state = CompilerState.COMPILING
            self.statement = statement
            self.can_execute = statement.supports_execution
            self._annotations = statement._annotations
            if self.can_execute:
                if TYPE_CHECKING:
                    assert isinstance(statement, Executable)
                self.execution_options = statement._execution_options
            # produce the string form of the statement by dispatching
            # through the compiler visitor system
            self.string = self.process(self.statement, **compile_kwargs)

            if render_schema_translate:
                # render the schema translate symbols into the string
                # immediately, rather than leaving them for execution time
                assert schema_translate_map is not None
                self.string = self.preparer._render_schema_translates(
                    self.string, schema_translate_map
                )

            self.state = CompilerState.STRING_APPLIED
        else:
            # no statement given; object exists for method access only
            self.state = CompilerState.NO_STATEMENT

        # timestamp used when reporting cache statistics
        self._gen_time = perf_counter()

    def __init_subclass__(cls) -> None:
        # give each subclass a chance to set up class-level state
        cls._init_compiler_cls()
        return super().__init_subclass__()

    @classmethod
    def _init_compiler_cls(cls):
        # hook for subclasses; no-op by default
        pass

    def visit_unsupported_compilation(self, element, err, **kw):
        # terminal fallback when no visit method exists for an element
        raise exc.UnsupportedCompilationError(self, type(element)) from err

    @property
    def sql_compiler(self) -> SQLCompiler:
        """Return a Compiled that is capable of processing SQL expressions.

        If this compiler is one, it would likely just return 'self'.

        """

        raise NotImplementedError()

    def process(self, obj: Visitable, **kwargs: Any) -> str:
        # dispatch the element into this compiler's visit_* methods
        return obj._compiler_dispatch(self, **kwargs)

    def __str__(self) -> str:
        """Return the string text of the generated SQL or DDL."""

        # only a fully compiled statement has a string form; otherwise
        # return an empty string
        if self.state is CompilerState.STRING_APPLIED:
            return self.string
        else:
            return ""

    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        """Return the bind params for this compiled object.

        :param params: a dict of string/object pairs whose values will
         override bind values compiled in to the
         statement.
        """

        raise NotImplementedError()

    @property
    def params(self):
        """Return the bind params for this compiled object."""
        return self.construct_params()
979
980
class TypeCompiler(util.EnsureKWArg):
    """Produces DDL specification for TypeEngine objects."""

    ensure_kwarg = r"visit_\w+"

    def __init__(self, dialect: Dialect):
        self.dialect = dialect

    def process(self, type_: TypeEngine[Any], **kw: Any) -> str:
        """Render the DDL string for the given type.

        If the type declares a variant for this compiler's dialect,
        that variant is rendered in its place.

        """
        variants = type_._variant_mapping
        if variants and self.dialect.name in variants:
            type_ = variants[self.dialect.name]
        return type_._compiler_dispatch(self, **kw)

    def visit_unsupported_compilation(
        self, element: Any, err: Exception, **kw: Any
    ) -> NoReturn:
        # terminal fallback when no visit method exists for this type
        raise exc.UnsupportedCompilationError(self, element) from err
1001
1002
# this was a Visitable, but to allow accurate detection of
# column elements this is actually a column element
class _CompileLabel(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """Lightweight stand-in for an ``expression.Label``, used only
    during compilation."""

    __visit_name__ = "label"
    __slots__ = "element", "name", "_alt_names"

    def __init__(self, col, name, alt_names=()):
        self.element = col
        self.name = name
        # the wrapped column itself always participates as an
        # alternate name
        self._alt_names = (col, *alt_names)

    def self_group(self, **kw):
        # a label needs no additional grouping
        return self

    @property
    def proxy_set(self):
        # delegate to the wrapped element
        return self.element.proxy_set

    @property
    def type(self):
        # delegate to the wrapped element
        return self.element.type
1028
1029
class aggregate_orderby_inline(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """produce ORDER BY inside of function argument lists"""

    __visit_name__ = "aggregate_orderby_inline"
    __slots__ = "element", "aggregate_order_by"

    def __init__(self, element, orderby):
        self.element = element
        self.aggregate_order_by = orderby

    def __iter__(self):
        # iterate the wrapped element directly
        return iter(self.element)

    def self_group(self, **kw):
        # this wrapper needs no additional grouping
        return self

    def _with_binary_element_type(self, type_):
        # re-wrap with the adapted element, keeping the same ORDER BY
        adapted = self.element._with_binary_element_type(type_)
        return aggregate_orderby_inline(adapted, self.aggregate_order_by)

    @property
    def proxy_set(self):
        # delegate to the wrapped element
        return self.element.proxy_set

    @property
    def type(self):
        # delegate to the wrapped element
        return self.element.type
1061
1062
class ilike_case_insensitive(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """wrapping element marking the case-insensitive portion of an
    ILIKE construct.

    The construct usually renders the ``lower()`` function, but on
    PostgreSQL will pass silently with the assumption that "ILIKE"
    is being used.

    .. versionadded:: 2.0

    """

    __visit_name__ = "ilike_case_insensitive_operand"
    __slots__ = "element", "comparator"

    def __init__(self, element):
        self.element = element
        self.comparator = element.comparator

    def self_group(self, **kw):
        # this wrapper needs no additional grouping
        return self

    def _with_binary_element_type(self, type_):
        # re-wrap around the type-adapted element
        adapted = self.element._with_binary_element_type(type_)
        return ilike_case_insensitive(adapted)

    @property
    def proxy_set(self):
        # delegate to the wrapped element
        return self.element.proxy_set

    @property
    def type(self):
        # delegate to the wrapped element
        return self.element.type
1099
1100
1101class SQLCompiler(Compiled):
1102 """Default implementation of :class:`.Compiled`.
1103
1104 Compiles :class:`_expression.ClauseElement` objects into SQL strings.
1105
1106 """
1107
1108 extract_map = EXTRACT_MAP
1109
1110 bindname_escape_characters: ClassVar[Mapping[str, str]] = (
1111 util.immutabledict(
1112 {
1113 "%": "P",
1114 "(": "A",
1115 ")": "Z",
1116 ":": "C",
1117 ".": "_",
1118 "[": "_",
1119 "]": "_",
1120 " ": "_",
1121 }
1122 )
1123 )
1124 """A mapping (e.g. dict or similar) containing a lookup of
1125 characters keyed to replacement characters which will be applied to all
1126 'bind names' used in SQL statements as a form of 'escaping'; the given
1127 characters are replaced entirely with the 'replacement' character when
1128 rendered in the SQL statement, and a similar translation is performed
1129 on the incoming names used in parameter dictionaries passed to methods
1130 like :meth:`_engine.Connection.execute`.
1131
1132 This allows bound parameter names used in :func:`_sql.bindparam` and
1133 other constructs to have any arbitrary characters present without any
1134 concern for characters that aren't allowed at all on the target database.
1135
1136 Third party dialects can establish their own dictionary here to replace the
1137 default mapping, which will ensure that the particular characters in the
1138 mapping will never appear in a bound parameter name.
1139
1140 The dictionary is evaluated at **class creation time**, so cannot be
1141 modified at runtime; it must be present on the class when the class
1142 is first declared.
1143
1144 Note that for dialects that have additional bound parameter rules such
1145 as additional restrictions on leading characters, the
1146 :meth:`_sql.SQLCompiler.bindparam_string` method may need to be augmented.
1147 See the cx_Oracle compiler for an example of this.
1148
1149 .. versionadded:: 2.0.0rc1
1150
1151 """
1152
1153 _bind_translate_re: ClassVar[Pattern[str]]
1154 _bind_translate_chars: ClassVar[Mapping[str, str]]
1155
1156 is_sql = True
1157
1158 compound_keywords = COMPOUND_KEYWORDS
1159
1160 isdelete: bool = False
1161 isinsert: bool = False
1162 isupdate: bool = False
1163 """class-level defaults which can be set at the instance
1164 level to define if this Compiled instance represents
1165 INSERT/UPDATE/DELETE
1166 """
1167
1168 postfetch: Optional[List[Column[Any]]]
1169 """list of columns that can be post-fetched after INSERT or UPDATE to
1170 receive server-updated values"""
1171
1172 insert_prefetch: Sequence[Column[Any]] = ()
1173 """list of columns for which default values should be evaluated before
1174 an INSERT takes place"""
1175
1176 update_prefetch: Sequence[Column[Any]] = ()
1177 """list of columns for which onupdate default values should be evaluated
1178 before an UPDATE takes place"""
1179
1180 implicit_returning: Optional[Sequence[ColumnElement[Any]]] = None
1181 """list of "implicit" returning columns for a toplevel INSERT or UPDATE
1182 statement, used to receive newly generated values of columns.
1183
1184 .. versionadded:: 2.0 ``implicit_returning`` replaces the previous
1185 ``returning`` collection, which was not a generalized RETURNING
1186 collection and instead was in fact specific to the "implicit returning"
1187 feature.
1188
1189 """
1190
1191 isplaintext: bool = False
1192
1193 binds: Dict[str, BindParameter[Any]]
1194 """a dictionary of bind parameter keys to BindParameter instances."""
1195
1196 bind_names: Dict[BindParameter[Any], str]
1197 """a dictionary of BindParameter instances to "compiled" names
1198 that are actually present in the generated SQL"""
1199
1200 stack: List[_CompilerStackEntry]
1201 """major statements such as SELECT, INSERT, UPDATE, DELETE are
1202 tracked in this stack using an entry format."""
1203
1204 returning_precedes_values: bool = False
1205 """set to True classwide to generate RETURNING
1206 clauses before the VALUES or WHERE clause (i.e. MSSQL)
1207 """
1208
1209 render_table_with_column_in_update_from: bool = False
1210 """set to True classwide to indicate the SET clause
1211 in a multi-table UPDATE statement should qualify
1212 columns with the table name (i.e. MySQL only)
1213 """
1214
1215 ansi_bind_rules: bool = False
1216 """SQL 92 doesn't allow bind parameters to be used
1217 in the columns clause of a SELECT, nor does it allow
1218 ambiguous expressions like "? = ?". A compiler
1219 subclass can set this flag to False if the target
1220 driver/DB enforces this
1221 """
1222
1223 bindtemplate: str
1224 """template to render bound parameters based on paramstyle."""
1225
1226 compilation_bindtemplate: str
1227 """template used by compiler to render parameters before positional
1228 paramstyle application"""
1229
1230 _numeric_binds_identifier_char: str
1231 """Character that's used to as the identifier of a numerical bind param.
1232 For example if this char is set to ``$``, numerical binds will be rendered
1233 in the form ``$1, $2, $3``.
1234 """
1235
1236 _result_columns: List[ResultColumnsEntry]
1237 """relates label names in the final SQL to a tuple of local
1238 column/label name, ColumnElement object (if any) and
1239 TypeEngine. CursorResult uses this for type processing and
1240 column targeting"""
1241
1242 _textual_ordered_columns: bool = False
1243 """tell the result object that the column names as rendered are important,
1244 but they are also "ordered" vs. what is in the compiled object here.
1245
1246 As of 1.4.42 this condition is only present when the statement is a
1247 TextualSelect, e.g. text("....").columns(...), where it is required
1248 that the columns are considered positionally and not by name.
1249
1250 """
1251
1252 _ad_hoc_textual: bool = False
1253 """tell the result that we encountered text() or '*' constructs in the
1254 middle of the result columns, but we also have compiled columns, so
1255 if the number of columns in cursor.description does not match how many
1256 expressions we have, that means we can't rely on positional at all and
1257 should match on name.
1258
1259 """
1260
1261 _ordered_columns: bool = True
1262 """
1263 if False, means we can't be sure the list of entries
1264 in _result_columns is actually the rendered order. Usually
1265 True unless using an unordered TextualSelect.
1266 """
1267
1268 _loose_column_name_matching: bool = False
1269 """tell the result object that the SQL statement is textual, wants to match
1270 up to Column objects, and may be using the ._tq_label in the SELECT rather
1271 than the base name.
1272
1273 """
1274
1275 _numeric_binds: bool = False
1276 """
1277 True if paramstyle is "numeric". This paramstyle is trickier than
1278 all the others.
1279
1280 """
1281
1282 _render_postcompile: bool = False
1283 """
1284 whether to render out POSTCOMPILE params during the compile phase.
1285
1286 This attribute is used only for end-user invocation of stmt.compile();
1287 it's never used for actual statement execution, where instead the
1288 dialect internals access and render the internal postcompile structure
1289 directly.
1290
1291 """
1292
1293 _post_compile_expanded_state: Optional[ExpandedState] = None
1294 """When render_postcompile is used, the ``ExpandedState`` used to create
1295 the "expanded" SQL is assigned here, and then used by the ``.params``
1296 accessor and ``.construct_params()`` methods for their return values.
1297
1298 .. versionadded:: 2.0.0rc1
1299
1300 """
1301
1302 _pre_expanded_string: Optional[str] = None
1303 """Stores the original string SQL before 'post_compile' is applied,
1304 for cases where 'post_compile' were used.
1305
1306 """
1307
1308 _pre_expanded_positiontup: Optional[List[str]] = None
1309
1310 _insertmanyvalues: Optional[_InsertManyValues] = None
1311
1312 _insert_crud_params: Optional[crud._CrudParamSequence] = None
1313
1314 literal_execute_params: FrozenSet[BindParameter[Any]] = frozenset()
1315 """bindparameter objects that are rendered as literal values at statement
1316 execution time.
1317
1318 """
1319
1320 post_compile_params: FrozenSet[BindParameter[Any]] = frozenset()
1321 """bindparameter objects that are rendered as bound parameter placeholders
1322 at statement execution time.
1323
1324 """
1325
1326 escaped_bind_names: util.immutabledict[str, str] = util.EMPTY_DICT
1327 """Late escaping of bound parameter names that has to be converted
1328 to the original name when looking in the parameter dictionary.
1329
1330 """
1331
1332 has_out_parameters = False
1333 """if True, there are bindparam() objects that have the isoutparam
1334 flag set."""
1335
1336 postfetch_lastrowid = False
1337 """if True, and this in insert, use cursor.lastrowid to populate
1338 result.inserted_primary_key. """
1339
1340 _cache_key_bind_match: Optional[
1341 Tuple[
1342 Dict[
1343 BindParameter[Any],
1344 List[BindParameter[Any]],
1345 ],
1346 Dict[
1347 str,
1348 BindParameter[Any],
1349 ],
1350 ]
1351 ] = None
1352 """a mapping that will relate the BindParameter object we compile
1353 to those that are part of the extracted collection of parameters
1354 in the cache key, if we were given a cache key.
1355
1356 """
1357
1358 positiontup: Optional[List[str]] = None
1359 """for a compiled construct that uses a positional paramstyle, will be
1360 a sequence of strings, indicating the names of bound parameters in order.
1361
1362 This is used in order to render bound parameters in their correct order,
1363 and is combined with the :attr:`_sql.Compiled.params` dictionary to
1364 render parameters.
1365
1366 This sequence always contains the unescaped name of the parameters.
1367
1368 .. seealso::
1369
1370 :ref:`faq_sql_expression_string` - includes a usage example for
1371 debugging use cases.
1372
1373 """
1374 _values_bindparam: Optional[List[str]] = None
1375
1376 _visited_bindparam: Optional[List[str]] = None
1377
1378 inline: bool = False
1379
1380 ctes: Optional[MutableMapping[CTE, str]]
1381
1382 # Detect same CTE references - Dict[(level, name), cte]
1383 # Level is required for supporting nesting
1384 ctes_by_level_name: Dict[Tuple[int, str], CTE]
1385
1386 # To retrieve key/level in ctes_by_level_name -
1387 # Dict[cte_reference, (level, cte_name, cte_opts)]
1388 level_name_by_cte: Dict[CTE, Tuple[int, str, selectable._CTEOpts]]
1389
1390 ctes_recursive: bool
1391
1392 _post_compile_pattern = re.compile(r"__\[POSTCOMPILE_(\S+?)(~~.+?~~)?\]")
1393 _pyformat_pattern = re.compile(r"%\(([^)]+?)\)s")
1394 _positional_pattern = re.compile(
1395 f"{_pyformat_pattern.pattern}|{_post_compile_pattern.pattern}"
1396 )
1397 _collect_params: Final[bool]
1398 _collected_params: util.immutabledict[str, Any]
1399
1400 @classmethod
1401 def _init_compiler_cls(cls):
1402 cls._init_bind_translate()
1403
1404 @classmethod
1405 def _init_bind_translate(cls):
1406 reg = re.escape("".join(cls.bindname_escape_characters))
1407 cls._bind_translate_re = re.compile(f"[{reg}]")
1408 cls._bind_translate_chars = cls.bindname_escape_characters
1409
1410 def __init__(
1411 self,
1412 dialect: Dialect,
1413 statement: Optional[ClauseElement],
1414 cache_key: Optional[CacheKey] = None,
1415 column_keys: Optional[Sequence[str]] = None,
1416 for_executemany: bool = False,
1417 linting: Linting = NO_LINTING,
1418 _supporting_against: Optional[SQLCompiler] = None,
1419 **kwargs: Any,
1420 ):
1421 """Construct a new :class:`.SQLCompiler` object.
1422
1423 :param dialect: :class:`.Dialect` to be used
1424
1425 :param statement: :class:`_expression.ClauseElement` to be compiled
1426
1427 :param column_keys: a list of column names to be compiled into an
1428 INSERT or UPDATE statement.
1429
1430 :param for_executemany: whether INSERT / UPDATE statements should
1431 expect that they are to be invoked in an "executemany" style,
1432 which may impact how the statement will be expected to return the
1433 values of defaults and autoincrement / sequences and similar.
1434 Depending on the backend and driver in use, support for retrieving
1435 these values may be disabled which means SQL expressions may
1436 be rendered inline, RETURNING may not be rendered, etc.
1437
1438 :param kwargs: additional keyword arguments to be consumed by the
1439 superclass.
1440
1441 """
1442 self.column_keys = column_keys
1443
1444 self.cache_key = cache_key
1445
1446 if cache_key:
1447 cksm = {b.key: b for b in cache_key[1]}
1448 ckbm = {b: [b] for b in cache_key[1]}
1449 self._cache_key_bind_match = (ckbm, cksm)
1450
1451 # compile INSERT/UPDATE defaults/sequences to expect executemany
1452 # style execution, which may mean no pre-execute of defaults,
1453 # or no RETURNING
1454 self.for_executemany = for_executemany
1455
1456 self.linting = linting
1457
1458 # a dictionary of bind parameter keys to BindParameter
1459 # instances.
1460 self.binds = {}
1461
1462 # a dictionary of BindParameter instances to "compiled" names
1463 # that are actually present in the generated SQL
1464 self.bind_names = util.column_dict()
1465
1466 # stack which keeps track of nested SELECT statements
1467 self.stack = []
1468
1469 self._result_columns = []
1470
1471 # true if the paramstyle is positional
1472 self.positional = dialect.positional
1473 if self.positional:
1474 self._numeric_binds = nb = dialect.paramstyle.startswith("numeric")
1475 if nb:
1476 self._numeric_binds_identifier_char = (
1477 "$" if dialect.paramstyle == "numeric_dollar" else ":"
1478 )
1479
1480 self.compilation_bindtemplate = _pyformat_template
1481 else:
1482 self.compilation_bindtemplate = BIND_TEMPLATES[dialect.paramstyle]
1483
1484 self.ctes = None
1485
1486 self.label_length = (
1487 dialect.label_length or dialect.max_identifier_length
1488 )
1489
1490 # a map which tracks "anonymous" identifiers that are created on
1491 # the fly here
1492 self.anon_map = prefix_anon_map()
1493
1494 # a map which tracks "truncated" names based on
1495 # dialect.label_length or dialect.max_identifier_length
1496 self.truncated_names: Dict[Tuple[str, str], str] = {}
1497 self._truncated_counters: Dict[str, int] = {}
1498 if not cache_key:
1499 self._collect_params = True
1500 self._collected_params = util.EMPTY_DICT
1501 else:
1502 self._collect_params = False # type: ignore[misc]
1503
1504 Compiled.__init__(self, dialect, statement, **kwargs)
1505
1506 if self.isinsert or self.isupdate or self.isdelete:
1507 if TYPE_CHECKING:
1508 assert isinstance(statement, UpdateBase)
1509
1510 if self.isinsert or self.isupdate:
1511 if TYPE_CHECKING:
1512 assert isinstance(statement, ValuesBase)
1513 if statement._inline:
1514 self.inline = True
1515 elif self.for_executemany and (
1516 not self.isinsert
1517 or (
1518 self.dialect.insert_executemany_returning
1519 and statement._return_defaults
1520 )
1521 ):
1522 self.inline = True
1523
1524 self.bindtemplate = BIND_TEMPLATES[dialect.paramstyle]
1525
1526 if _supporting_against:
1527 self.__dict__.update(
1528 {
1529 k: v
1530 for k, v in _supporting_against.__dict__.items()
1531 if k
1532 not in {
1533 "state",
1534 "dialect",
1535 "preparer",
1536 "positional",
1537 "_numeric_binds",
1538 "compilation_bindtemplate",
1539 "bindtemplate",
1540 }
1541 }
1542 )
1543
1544 if self.state is CompilerState.STRING_APPLIED:
1545 if self.positional:
1546 if self._numeric_binds:
1547 self._process_numeric()
1548 else:
1549 self._process_positional()
1550
1551 if self._render_postcompile:
1552 parameters = self.construct_params(
1553 escape_names=False,
1554 _no_postcompile=True,
1555 )
1556
1557 self._process_parameters_for_postcompile(
1558 parameters, _populate_self=True
1559 )
1560
1561 @property
1562 def insert_single_values_expr(self) -> Optional[str]:
1563 """When an INSERT is compiled with a single set of parameters inside
1564 a VALUES expression, the string is assigned here, where it can be
1565 used for insert batching schemes to rewrite the VALUES expression.
1566
1567 .. versionchanged:: 2.0 This collection is no longer used by
1568 SQLAlchemy's built-in dialects, in favor of the currently
1569 internal ``_insertmanyvalues`` collection that is used only by
1570 :class:`.SQLCompiler`.
1571
1572 """
1573 if self._insertmanyvalues is None:
1574 return None
1575 else:
1576 return self._insertmanyvalues.single_values_expr
1577
1578 @util.ro_memoized_property
1579 def effective_returning(self) -> Optional[Sequence[ColumnElement[Any]]]:
1580 """The effective "returning" columns for INSERT, UPDATE or DELETE.
1581
1582 This is either the so-called "implicit returning" columns which are
1583 calculated by the compiler on the fly, or those present based on what's
1584 present in ``self.statement._returning`` (expanded into individual
1585 columns using the ``._all_selected_columns`` attribute) i.e. those set
1586 explicitly using the :meth:`.UpdateBase.returning` method.
1587
1588 .. versionadded:: 2.0
1589
1590 """
1591 if self.implicit_returning:
1592 return self.implicit_returning
1593 elif self.statement is not None and is_dml(self.statement):
1594 return [
1595 c
1596 for c in self.statement._all_selected_columns
1597 if is_column_element(c)
1598 ]
1599
1600 else:
1601 return None
1602
1603 @property
1604 def returning(self):
1605 """backwards compatibility; returns the
1606 effective_returning collection.
1607
1608 """
1609 return self.effective_returning
1610
1611 @property
1612 def current_executable(self):
1613 """Return the current 'executable' that is being compiled.
1614
1615 This is currently the :class:`_sql.Select`, :class:`_sql.Insert`,
1616 :class:`_sql.Update`, :class:`_sql.Delete`,
1617 :class:`_sql.CompoundSelect` object that is being compiled.
1618 Specifically it's assigned to the ``self.stack`` list of elements.
1619
1620 When a statement like the above is being compiled, it normally
1621 is also assigned to the ``.statement`` attribute of the
1622 :class:`_sql.Compiler` object. However, all SQL constructs are
1623 ultimately nestable, and this attribute should never be consulted
1624 by a ``visit_`` method, as it is not guaranteed to be assigned
1625 nor guaranteed to correspond to the current statement being compiled.
1626
1627 """
1628 try:
1629 return self.stack[-1]["selectable"]
1630 except IndexError as ie:
1631 raise IndexError("Compiler does not have a stack entry") from ie
1632
1633 @property
1634 def prefetch(self):
1635 return list(self.insert_prefetch) + list(self.update_prefetch)
1636
1637 @util.memoized_property
1638 def _global_attributes(self) -> Dict[Any, Any]:
1639 return {}
1640
1641 def _add_to_params(self, item: ExecutableStatement) -> None:
1642 # assumes that this is called before traversing the statement
1643 # so the call happens outer to inner, meaning that existing params
1644 # take precedence
1645 if item._params:
1646 self._collected_params = item._params | self._collected_params
1647
1648 @util.memoized_instancemethod
1649 def _init_cte_state(self) -> MutableMapping[CTE, str]:
1650 """Initialize collections related to CTEs only if
1651 a CTE is located, to save on the overhead of
1652 these collections otherwise.
1653
1654 """
1655 # collect CTEs to tack on top of a SELECT
1656 # To store the query to print - Dict[cte, text_query]
1657 ctes: MutableMapping[CTE, str] = util.OrderedDict()
1658 self.ctes = ctes
1659
1660 # Detect same CTE references - Dict[(level, name), cte]
1661 # Level is required for supporting nesting
1662 self.ctes_by_level_name = {}
1663
1664 # To retrieve key/level in ctes_by_level_name -
1665 # Dict[cte_reference, (level, cte_name, cte_opts)]
1666 self.level_name_by_cte = {}
1667
1668 self.ctes_recursive = False
1669
1670 return ctes
1671
1672 @contextlib.contextmanager
1673 def _nested_result(self):
1674 """special API to support the use case of 'nested result sets'"""
1675 result_columns, ordered_columns = (
1676 self._result_columns,
1677 self._ordered_columns,
1678 )
1679 self._result_columns, self._ordered_columns = [], False
1680
1681 try:
1682 if self.stack:
1683 entry = self.stack[-1]
1684 entry["need_result_map_for_nested"] = True
1685 else:
1686 entry = None
1687 yield self._result_columns, self._ordered_columns
1688 finally:
1689 if entry:
1690 entry.pop("need_result_map_for_nested")
1691 self._result_columns, self._ordered_columns = (
1692 result_columns,
1693 ordered_columns,
1694 )
1695
1696 def _process_positional(self):
1697 assert not self.positiontup
1698 assert self.state is CompilerState.STRING_APPLIED
1699 assert not self._numeric_binds
1700
1701 if self.dialect.paramstyle == "format":
1702 placeholder = "%s"
1703 else:
1704 assert self.dialect.paramstyle == "qmark"
1705 placeholder = "?"
1706
1707 positions = []
1708
1709 def find_position(m: re.Match[str]) -> str:
1710 normal_bind = m.group(1)
1711 if normal_bind:
1712 positions.append(normal_bind)
1713 return placeholder
1714 else:
1715 # this a post-compile bind
1716 positions.append(m.group(2))
1717 return m.group(0)
1718
1719 self.string = re.sub(
1720 self._positional_pattern, find_position, self.string
1721 )
1722
1723 if self.escaped_bind_names:
1724 reverse_escape = {v: k for k, v in self.escaped_bind_names.items()}
1725 assert len(self.escaped_bind_names) == len(reverse_escape)
1726 self.positiontup = [
1727 reverse_escape.get(name, name) for name in positions
1728 ]
1729 else:
1730 self.positiontup = positions
1731
1732 if self._insertmanyvalues:
1733 positions = []
1734
1735 single_values_expr = re.sub(
1736 self._positional_pattern,
1737 find_position,
1738 self._insertmanyvalues.single_values_expr,
1739 )
1740 insert_crud_params = [
1741 (
1742 v[0],
1743 v[1],
1744 re.sub(self._positional_pattern, find_position, v[2]),
1745 v[3],
1746 )
1747 for v in self._insertmanyvalues.insert_crud_params
1748 ]
1749
1750 self._insertmanyvalues = self._insertmanyvalues._replace(
1751 single_values_expr=single_values_expr,
1752 insert_crud_params=insert_crud_params,
1753 )
1754
1755 def _process_numeric(self):
1756 assert self._numeric_binds
1757 assert self.state is CompilerState.STRING_APPLIED
1758
1759 num = 1
1760 param_pos: Dict[str, str] = {}
1761 order: Iterable[str]
1762 if self._insertmanyvalues and self._values_bindparam is not None:
1763 # bindparams that are not in values are always placed first.
1764 # this avoids the need of changing them when using executemany
1765 # values () ()
1766 order = itertools.chain(
1767 (
1768 name
1769 for name in self.bind_names.values()
1770 if name not in self._values_bindparam
1771 ),
1772 self.bind_names.values(),
1773 )
1774 else:
1775 order = self.bind_names.values()
1776
1777 for bind_name in order:
1778 if bind_name in param_pos:
1779 continue
1780 bind = self.binds[bind_name]
1781 if (
1782 bind in self.post_compile_params
1783 or bind in self.literal_execute_params
1784 ):
1785 # set to None to just mark the in positiontup, it will not
1786 # be replaced below.
1787 param_pos[bind_name] = None # type: ignore
1788 else:
1789 ph = f"{self._numeric_binds_identifier_char}{num}"
1790 num += 1
1791 param_pos[bind_name] = ph
1792
1793 self.next_numeric_pos = num
1794
1795 self.positiontup = list(param_pos)
1796 if self.escaped_bind_names:
1797 len_before = len(param_pos)
1798 param_pos = {
1799 self.escaped_bind_names.get(name, name): pos
1800 for name, pos in param_pos.items()
1801 }
1802 assert len(param_pos) == len_before
1803
1804 # Can't use format here since % chars are not escaped.
1805 self.string = self._pyformat_pattern.sub(
1806 lambda m: param_pos[m.group(1)], self.string
1807 )
1808
1809 if self._insertmanyvalues:
1810 single_values_expr = (
1811 # format is ok here since single_values_expr includes only
1812 # place-holders
1813 self._insertmanyvalues.single_values_expr
1814 % param_pos
1815 )
1816 insert_crud_params = [
1817 (v[0], v[1], "%s", v[3])
1818 for v in self._insertmanyvalues.insert_crud_params
1819 ]
1820
1821 self._insertmanyvalues = self._insertmanyvalues._replace(
1822 # This has the numbers (:1, :2)
1823 single_values_expr=single_values_expr,
1824 # The single binds are instead %s so they can be formatted
1825 insert_crud_params=insert_crud_params,
1826 )
1827
1828 @util.memoized_property
1829 def _bind_processors(
1830 self,
1831 ) -> MutableMapping[
1832 str, Union[_BindProcessorType[Any], Sequence[_BindProcessorType[Any]]]
1833 ]:
1834 # mypy is not able to see the two value types as the above Union,
1835 # it just sees "object". don't know how to resolve
1836 return {
1837 key: value # type: ignore
1838 for key, value in (
1839 (
1840 self.bind_names[bindparam],
1841 (
1842 bindparam.type._cached_bind_processor(self.dialect)
1843 if not bindparam.type._is_tuple_type
1844 else tuple(
1845 elem_type._cached_bind_processor(self.dialect)
1846 for elem_type in cast(
1847 TupleType, bindparam.type
1848 ).types
1849 )
1850 ),
1851 )
1852 for bindparam in self.bind_names
1853 )
1854 if value is not None
1855 }
1856
1857 def is_subquery(self):
1858 return len(self.stack) > 1
1859
1860 @property
1861 def sql_compiler(self) -> Self:
1862 return self
1863
1864 def construct_expanded_state(
1865 self,
1866 params: Optional[_CoreSingleExecuteParams] = None,
1867 escape_names: bool = True,
1868 ) -> ExpandedState:
1869 """Return a new :class:`.ExpandedState` for a given parameter set.
1870
1871 For queries that use "expanding" or other late-rendered parameters,
1872 this method will provide for both the finalized SQL string as well
1873 as the parameters that would be used for a particular parameter set.
1874
1875 .. versionadded:: 2.0.0rc1
1876
1877 """
1878 parameters = self.construct_params(
1879 params,
1880 escape_names=escape_names,
1881 _no_postcompile=True,
1882 )
1883 return self._process_parameters_for_postcompile(
1884 parameters,
1885 )
1886
1887 def construct_params(
1888 self,
1889 params: Optional[_CoreSingleExecuteParams] = None,
1890 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
1891 escape_names: bool = True,
1892 _group_number: Optional[int] = None,
1893 _check: bool = True,
1894 _no_postcompile: bool = False,
1895 _collected_params: _CoreSingleExecuteParams | None = None,
1896 ) -> _MutableCoreSingleExecuteParams:
1897 """return a dictionary of bind parameter keys and values"""
1898 if _collected_params is not None:
1899 assert not self._collect_params
1900 elif self._collect_params:
1901 _collected_params = self._collected_params
1902
1903 if _collected_params:
1904 if not params:
1905 params = _collected_params
1906 else:
1907 params = {**_collected_params, **params}
1908
1909 if self._render_postcompile and not _no_postcompile:
1910 assert self._post_compile_expanded_state is not None
1911 if not params:
1912 return dict(self._post_compile_expanded_state.parameters)
1913 else:
1914 raise exc.InvalidRequestError(
1915 "can't construct new parameters when render_postcompile "
1916 "is used; the statement is hard-linked to the original "
1917 "parameters. Use construct_expanded_state to generate a "
1918 "new statement and parameters."
1919 )
1920
1921 has_escaped_names = escape_names and bool(self.escaped_bind_names)
1922
1923 if extracted_parameters:
1924 # related the bound parameters collected in the original cache key
1925 # to those collected in the incoming cache key. They will not have
1926 # matching names but they will line up positionally in the same
1927 # way. The parameters present in self.bind_names may be clones of
1928 # these original cache key params in the case of DML but the .key
1929 # will be guaranteed to match.
1930 if self.cache_key is None:
1931 raise exc.CompileError(
1932 "This compiled object has no original cache key; "
1933 "can't pass extracted_parameters to construct_params"
1934 )
1935 else:
1936 orig_extracted = self.cache_key[1]
1937
1938 ckbm_tuple = self._cache_key_bind_match
1939 assert ckbm_tuple is not None
1940 ckbm, _ = ckbm_tuple
1941 resolved_extracted = {
1942 bind: extracted
1943 for b, extracted in zip(orig_extracted, extracted_parameters)
1944 for bind in ckbm[b]
1945 }
1946 else:
1947 resolved_extracted = None
1948
1949 if params:
1950 pd = {}
1951 for bindparam, name in self.bind_names.items():
1952 escaped_name = (
1953 self.escaped_bind_names.get(name, name)
1954 if has_escaped_names
1955 else name
1956 )
1957
1958 if bindparam.key in params:
1959 pd[escaped_name] = params[bindparam.key]
1960 elif name in params:
1961 pd[escaped_name] = params[name]
1962
1963 elif _check and bindparam.required:
1964 if _group_number:
1965 raise exc.InvalidRequestError(
1966 "A value is required for bind parameter %r, "
1967 "in parameter group %d"
1968 % (bindparam.key, _group_number),
1969 code="cd3x",
1970 )
1971 else:
1972 raise exc.InvalidRequestError(
1973 "A value is required for bind parameter %r"
1974 % bindparam.key,
1975 code="cd3x",
1976 )
1977 else:
1978 if resolved_extracted:
1979 value_param = resolved_extracted.get(
1980 bindparam, bindparam
1981 )
1982 else:
1983 value_param = bindparam
1984
1985 if bindparam.callable:
1986 pd[escaped_name] = value_param.effective_value
1987 else:
1988 pd[escaped_name] = value_param.value
1989 return pd
1990 else:
1991 pd = {}
1992 for bindparam, name in self.bind_names.items():
1993 escaped_name = (
1994 self.escaped_bind_names.get(name, name)
1995 if has_escaped_names
1996 else name
1997 )
1998
1999 if _check and bindparam.required:
2000 if _group_number:
2001 raise exc.InvalidRequestError(
2002 "A value is required for bind parameter %r, "
2003 "in parameter group %d"
2004 % (bindparam.key, _group_number),
2005 code="cd3x",
2006 )
2007 else:
2008 raise exc.InvalidRequestError(
2009 "A value is required for bind parameter %r"
2010 % bindparam.key,
2011 code="cd3x",
2012 )
2013
2014 if resolved_extracted:
2015 value_param = resolved_extracted.get(bindparam, bindparam)
2016 else:
2017 value_param = bindparam
2018
2019 if bindparam.callable:
2020 pd[escaped_name] = value_param.effective_value
2021 else:
2022 pd[escaped_name] = value_param.value
2023
2024 return pd
2025
2026 @util.memoized_instancemethod
2027 def _get_set_input_sizes_lookup(self):
2028 dialect = self.dialect
2029
2030 include_types = dialect.include_set_input_sizes
2031 exclude_types = dialect.exclude_set_input_sizes
2032
2033 dbapi = dialect.dbapi
2034
2035 def lookup_type(typ):
2036 dbtype = typ._unwrapped_dialect_impl(dialect).get_dbapi_type(dbapi)
2037
2038 if (
2039 dbtype is not None
2040 and (exclude_types is None or dbtype not in exclude_types)
2041 and (include_types is None or dbtype in include_types)
2042 ):
2043 return dbtype
2044 else:
2045 return None
2046
2047 inputsizes = {}
2048
2049 literal_execute_params = self.literal_execute_params
2050
2051 for bindparam in self.bind_names:
2052 if bindparam in literal_execute_params:
2053 continue
2054
2055 if bindparam.type._is_tuple_type:
2056 inputsizes[bindparam] = [
2057 lookup_type(typ)
2058 for typ in cast(TupleType, bindparam.type).types
2059 ]
2060 else:
2061 inputsizes[bindparam] = lookup_type(bindparam.type)
2062
2063 return inputsizes
2064
2065 @property
2066 def params(self):
2067 """Return the bind param dictionary embedded into this
2068 compiled object, for those values that are present.
2069
2070 .. seealso::
2071
2072 :ref:`faq_sql_expression_string` - includes a usage example for
2073 debugging use cases.
2074
2075 """
2076 return self.construct_params(_check=False)
2077
    def _process_parameters_for_postcompile(
        self,
        parameters: _MutableCoreSingleExecuteParams,
        _populate_self: bool = False,
    ) -> ExpandedState:
        """handle special post compile parameters.

        These include:

        * "expanding" parameters -typically IN tuples that are rendered
        on a per-parameter basis for an otherwise fixed SQL statement string.

        * literal_binds compiled with the literal_execute flag. Used for
        things like SQL Server "TOP N" where the driver does not accommodate
        N as a bound parameter.

        :param parameters: mutable parameter dictionary for a single
         execution; expanding / literal-execute entries are popped and
         replaced in place.
        :param _populate_self: when True, store the expanded statement and
         positiontup back onto this compiled object (supports the
         "render_postcompile" debugging feature).

        """

        expanded_parameters = {}
        new_positiontup: Optional[List[str]]

        # start from the pre-expansion string if a previous expansion
        # already replaced self.string
        pre_expanded_string = self._pre_expanded_string
        if pre_expanded_string is None:
            pre_expanded_string = self.string

        if self.positional:
            new_positiontup = []

            pre_expanded_positiontup = self._pre_expanded_positiontup
            if pre_expanded_positiontup is None:
                pre_expanded_positiontup = self.positiontup

        else:
            new_positiontup = pre_expanded_positiontup = None

        # the same processors mapping is viewed as scalar-valued or
        # tuple-of-processors depending on the parameter's type
        processors = self._bind_processors
        single_processors = cast(
            "Mapping[str, _BindProcessorType[Any]]", processors
        )
        tuple_processors = cast(
            "Mapping[str, Sequence[_BindProcessorType[Any]]]", processors
        )

        new_processors: Dict[str, _BindProcessorType[Any]] = {}

        replacement_expressions: Dict[str, Any] = {}
        to_update_sets: Dict[str, Any] = {}

        # notes:
        # *unescaped* parameter names in:
        # self.bind_names, self.binds, self._bind_processors, self.positiontup
        #
        # *escaped* parameter names in:
        # construct_params(), replacement_expressions

        numeric_positiontup: Optional[List[str]] = None

        if self.positional and pre_expanded_positiontup is not None:
            names: Iterable[str] = pre_expanded_positiontup
            if self._numeric_binds:
                numeric_positiontup = []
        else:
            names = self.bind_names.values()

        ebn = self.escaped_bind_names
        for name in names:
            escaped_name = ebn.get(name, name) if ebn else name
            parameter = self.binds[name]

            if parameter in self.literal_execute_params:
                # render the value directly into the statement text,
                # consuming it from the parameter dictionary
                if escaped_name not in replacement_expressions:
                    replacement_expressions[escaped_name] = (
                        self.render_literal_bindparam(
                            parameter,
                            render_literal_value=parameters.pop(escaped_name),
                        )
                    )
                continue

            if parameter in self.post_compile_params:
                if escaped_name in replacement_expressions:
                    # same expanding parameter seen again (e.g. appears
                    # twice positionally); reuse the first expansion
                    to_update = to_update_sets[escaped_name]
                    values = None
                else:
                    # we are removing the parameter from parameters
                    # because it is a list value, which is not expected by
                    # TypeEngine objects that would otherwise be asked to
                    # process it. the single name is being replaced with
                    # individual numbered parameters for each value in the
                    # param.
                    #
                    # note we are also inserting *escaped* parameter names
                    # into the given dictionary. default dialect will
                    # use these param names directly as they will not be
                    # in the escaped_bind_names dictionary.
                    values = parameters.pop(name)

                    leep_res = self._literal_execute_expanding_parameter(
                        escaped_name, parameter, values
                    )
                    (to_update, replacement_expr) = leep_res

                    to_update_sets[escaped_name] = to_update
                    replacement_expressions[escaped_name] = replacement_expr

                if not parameter.literal_execute:
                    parameters.update(to_update)
                    if parameter.type._is_tuple_type:
                        assert values is not None
                        new_processors.update(
                            (
                                "%s_%s_%s" % (name, i, j),
                                tuple_processors[name][j - 1],
                            )
                            for i, tuple_element in enumerate(values, 1)
                            for j, _ in enumerate(tuple_element, 1)
                            if name in tuple_processors
                            and tuple_processors[name][j - 1] is not None
                        )
                    else:
                        new_processors.update(
                            (key, single_processors[name])
                            for key, _ in to_update
                            if name in single_processors
                        )
                    if numeric_positiontup is not None:
                        numeric_positiontup.extend(
                            name for name, _ in to_update
                        )
                    elif new_positiontup is not None:
                        # to_update has escaped names, but that's ok since
                        # these are new names, that aren't in the
                        # escaped_bind_names dict.
                        new_positiontup.extend(name for name, _ in to_update)
                    expanded_parameters[name] = [
                        expand_key for expand_key, _ in to_update
                    ]
            elif new_positiontup is not None:
                new_positiontup.append(name)

        def process_expanding(m):
            # substitute each post-compile token in the statement with
            # its rendered replacement expression
            key = m.group(1)
            expr = replacement_expressions[key]

            # if POSTCOMPILE included a bind_expression, render that
            # around each element
            if m.group(2):
                tok = m.group(2).split("~~")
                be_left, be_right = tok[1], tok[3]
                expr = ", ".join(
                    "%s%s%s" % (be_left, exp, be_right)
                    for exp in expr.split(", ")
                )
            return expr

        statement = re.sub(
            self._post_compile_pattern, process_expanding, pre_expanded_string
        )

        if numeric_positiontup is not None:
            assert new_positiontup is not None
            # convert the expanded placeholders into numbered parameters,
            # continuing from the statement's existing numbering
            param_pos = {
                key: f"{self._numeric_binds_identifier_char}{num}"
                for num, key in enumerate(
                    numeric_positiontup, self.next_numeric_pos
                )
            }
            # Can't use format here since % chars are not escaped.
            statement = self._pyformat_pattern.sub(
                lambda m: param_pos[m.group(1)], statement
            )
            new_positiontup.extend(numeric_positiontup)

        expanded_state = ExpandedState(
            statement,
            parameters,
            new_processors,
            new_positiontup,
            expanded_parameters,
        )

        if _populate_self:
            # this is for the "render_postcompile" flag, which is not
            # otherwise used internally and is for end-user debugging and
            # special use cases.
            self._pre_expanded_string = pre_expanded_string
            self._pre_expanded_positiontup = pre_expanded_positiontup
            self.string = expanded_state.statement
            self.positiontup = (
                list(expanded_state.positiontup or ())
                if self.positional
                else None
            )
            self._post_compile_expanded_state = expanded_state

        return expanded_state
2274
2275 @util.preload_module("sqlalchemy.engine.cursor")
2276 def _create_result_map(self):
2277 """utility method used for unit tests only."""
2278 cursor = util.preloaded.engine_cursor
2279 return cursor.CursorResultMetaData._create_description_match_map(
2280 self._result_columns
2281 )
2282
    # assigned by crud.py for insert/update statements; presumably maps a
    # Column to the bound parameter name used for it in the compiled
    # statement (see _BindNameForColProtocol) — used below as a dict key
    # getter against the execution parameters
    _get_bind_name_for_col: _BindNameForColProtocol

    @util.memoized_property
    def _within_exec_param_key_getter(self) -> Callable[[Any], str]:
        # memoized alias of the crud-assigned getter above
        getter = self._get_bind_name_for_col
        return getter
2290
    @util.memoized_property
    @util.preload_module("sqlalchemy.engine.result")
    def _inserted_primary_key_from_lastrowid_getter(self):
        """Return a callable ``get(lastrowid, parameters)`` that builds the
        inserted-primary-key row for an INSERT from the DBAPI
        ``cursor.lastrowid`` value combined with values present in the
        execution parameters.

        """
        result = util.preloaded.engine_result

        param_key_getter = self._within_exec_param_key_getter

        assert self.compile_state is not None
        statement = self.compile_state.statement

        if TYPE_CHECKING:
            assert isinstance(statement, Insert)

        table = statement.table

        # one (parameter-dict getter, column) pair per primary key column
        getters = [
            (operator.methodcaller("get", param_key_getter(col), None), col)
            for col in table.primary_key
        ]

        autoinc_getter = None
        autoinc_col = table._autoincrement_column
        if autoinc_col is not None:
            # apply type post processors to the lastrowid
            lastrowid_processor = autoinc_col.type._cached_result_processor(
                self.dialect, None
            )
            autoinc_key = param_key_getter(autoinc_col)

            # if a bind value is present for the autoincrement column
            # in the parameters, we need to do the logic dictated by
            # #7998; honor a non-None user-passed parameter over lastrowid.
            # previously in the 1.4 series we weren't fetching lastrowid
            # at all if the key were present in the parameters
            if autoinc_key in self.binds:

                def _autoinc_getter(lastrowid, parameters):
                    param_value = parameters.get(autoinc_key, lastrowid)
                    if param_value is not None:
                        # they supplied non-None parameter, use that.
                        # SQLite at least is observed to return the wrong
                        # cursor.lastrowid for INSERT..ON CONFLICT so it
                        # can't be used in all cases
                        return param_value
                    else:
                        # use lastrowid
                        return lastrowid

                # work around mypy https://github.com/python/mypy/issues/14027
                autoinc_getter = _autoinc_getter

        else:
            lastrowid_processor = None

        row_fn = result.result_tuple([col.key for col in table.primary_key])

        def get(lastrowid, parameters):
            """given cursor.lastrowid value and the parameters used for INSERT,
            return a "row" that represents the primary key, either by
            using the "lastrowid" or by extracting values from the parameters
            that were sent along with the INSERT.

            """
            if lastrowid_processor is not None:
                lastrowid = lastrowid_processor(lastrowid)

            if lastrowid is None:
                # no lastrowid available; the primary key comes entirely
                # from the parameter values
                return row_fn(getter(parameters) for getter, col in getters)
            else:
                return row_fn(
                    (
                        (
                            autoinc_getter(lastrowid, parameters)
                            if autoinc_getter is not None
                            else lastrowid
                        )
                        if col is autoinc_col
                        else getter(parameters)
                    )
                    for getter, col in getters
                )

        return get
2374
2375 @util.memoized_property
2376 @util.preload_module("sqlalchemy.engine.result")
2377 def _inserted_primary_key_from_returning_getter(self):
2378 result = util.preloaded.engine_result
2379
2380 assert self.compile_state is not None
2381 statement = self.compile_state.statement
2382
2383 if TYPE_CHECKING:
2384 assert isinstance(statement, Insert)
2385
2386 param_key_getter = self._within_exec_param_key_getter
2387 table = statement.table
2388
2389 returning = self.implicit_returning
2390 assert returning is not None
2391 ret = {col: idx for idx, col in enumerate(returning)}
2392
2393 getters = cast(
2394 "List[Tuple[Callable[[Any], Any], bool]]",
2395 [
2396 (
2397 (operator.itemgetter(ret[col]), True)
2398 if col in ret
2399 else (
2400 operator.methodcaller(
2401 "get", param_key_getter(col), None
2402 ),
2403 False,
2404 )
2405 )
2406 for col in table.primary_key
2407 ],
2408 )
2409
2410 row_fn = result.result_tuple([col.key for col in table.primary_key])
2411
2412 def get(row, parameters):
2413 return row_fn(
2414 getter(row) if use_row else getter(parameters)
2415 for getter, use_row in getters
2416 )
2417
2418 return get
2419
2420 def default_from(self) -> str:
2421 """Called when a SELECT statement has no froms, and no FROM clause is
2422 to be appended.
2423
2424 Gives Oracle Database a chance to tack on a ``FROM DUAL`` to the string
2425 output.
2426
2427 """
2428 return ""
2429
    def visit_override_binds(self, override_binds, **kw):
        """SQL compile the nested element of an _OverrideBinds with
        bindparams swapped out.

        The _OverrideBinds is not normally expected to be compiled; it
        is meant to be used when an already cached statement is to be used,
        the compilation was already performed, and only the bound params should
        be swapped in at execution time.

        However, there are test cases that exercise this object, and
        additionally the ORM subquery loader is known to feed in expressions
        which include this construct into new queries (discovered in #11173),
        so it has to do the right thing at compile time as well.

        """

        # get SQL text first
        sqltext = override_binds.element._compiler_dispatch(self, **kw)

        # for a test compile that is not for caching, change binds after the
        # fact. note that we don't try to
        # swap the bindparam as we compile, because our element may be
        # elsewhere in the statement already (e.g. a subquery or perhaps a
        # CTE) and was already visited / compiled. See
        # test_relationship_criteria.py ->
        # test_selectinload_local_criteria_subquery
        for k in override_binds.translate:
            if k not in self.binds:
                continue
            bp = self.binds[k]

            # so this would work, just change the value of bp in place.
            # but we dont want to mutate things outside.
            # bp.value = override_binds.translate[bp.key]
            # continue

            # instead, need to replace bp with new_bp or otherwise accommodate
            # in all internal collections
            new_bp = bp._with_value(
                override_binds.translate[bp.key],
                maintain_key=True,
                required=False,
            )

            # swap the replacement param into every internal collection
            # that referenced the original
            name = self.bind_names[bp]
            self.binds[k] = self.binds[name] = new_bp
            self.bind_names[new_bp] = name
            self.bind_names.pop(bp, None)

            if bp in self.post_compile_params:
                self.post_compile_params |= {new_bp}
            if bp in self.literal_execute_params:
                self.literal_execute_params |= {new_bp}

            # keep the cache-key-to-bindparam match collections pointing
            # at the replacement parameter as well
            ckbm_tuple = self._cache_key_bind_match
            if ckbm_tuple:
                ckbm, cksm = ckbm_tuple
                for bp in bp._cloned_set:
                    if bp.key in cksm:
                        cb = cksm[bp.key]
                        ckbm[cb].append(new_bp)

        return sqltext
2493
2494 def visit_grouping(self, grouping, asfrom=False, **kwargs):
2495 return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")"
2496
2497 def visit_select_statement_grouping(self, grouping, **kwargs):
2498 return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")"
2499
    def visit_label_reference(
        self, element, within_columns_clause=False, **kwargs
    ):
        """Render a reference to a SELECT's label used in ORDER BY /
        GROUP BY, rendering just the label name when the dialect supports
        simple order-by-label and the label resolves in the enclosing
        statement.

        """
        if self.stack and self.dialect.supports_simple_order_by_label:
            try:
                compile_state = cast(
                    "Union[SelectState, CompoundSelectState]",
                    self.stack[-1]["compile_state"],
                )
            except KeyError as ke:
                raise exc.CompileError(
                    "Can't resolve label reference for ORDER BY / "
                    "GROUP BY / DISTINCT etc."
                ) from ke

            (
                with_cols,
                only_froms,
                only_cols,
            ) = compile_state._label_resolve_dict
            if within_columns_clause:
                resolve_dict = only_froms
            else:
                resolve_dict = only_cols

            # this can be None in the case that a _label_reference()
            # were subject to a replacement operation, in which case
            # the replacement of the Label element may have changed
            # to something else like a ColumnClause expression.
            order_by_elem = element.element._order_by_label_element

            if (
                order_by_elem is not None
                and order_by_elem.name in resolve_dict
                and order_by_elem.shares_lineage(
                    resolve_dict[order_by_elem.name]
                )
            ):
                # resolvable in the enclosing statement; instruct
                # visit_label to render the bare label name
                kwargs["render_label_as_label"] = (
                    element.element._order_by_label_element
                )
        return self.process(
            element.element,
            within_columns_clause=within_columns_clause,
            **kwargs,
        )
2546
    def visit_textual_label_reference(
        self, element, within_columns_clause=False, **kwargs
    ):
        """Render a string label name used in ORDER BY / GROUP BY by
        resolving it against the enclosing SELECT's label dictionary.

        """
        if not self.stack:
            # compiling the element outside of the context of a SELECT
            return self.process(element._text_clause)

        try:
            compile_state = cast(
                "Union[SelectState, CompoundSelectState]",
                self.stack[-1]["compile_state"],
            )
        except KeyError as ke:
            # _no_text_coercion is expected to raise here
            # (exc_cls=CompileError)
            coercions._no_text_coercion(
                element.element,
                extra=(
                    "Can't resolve label reference for ORDER BY / "
                    "GROUP BY / DISTINCT etc."
                ),
                exc_cls=exc.CompileError,
                err=ke,
            )

        with_cols, only_froms, only_cols = compile_state._label_resolve_dict
        try:
            if within_columns_clause:
                col = only_froms[element.element]
            else:
                col = with_cols[element.element]
        except KeyError as err:
            # _no_text_coercion is expected to raise here
            # (exc_cls=CompileError)
            coercions._no_text_coercion(
                element.element,
                extra=(
                    "Can't resolve label reference for ORDER BY / "
                    "GROUP BY / DISTINCT etc."
                ),
                exc_cls=exc.CompileError,
                err=err,
            )
        else:
            kwargs["render_label_as_label"] = col
            return self.process(
                col, within_columns_clause=within_columns_clause, **kwargs
            )
2591
    def visit_label(
        self,
        label,
        add_to_result_map=None,
        within_label_clause=False,
        within_columns_clause=False,
        render_label_as_label=None,
        result_map_targets=(),
        **kw,
    ):
        """Render a Label element either as ``<expr> AS <name>``, as the
        bare label name, or as just the inner expression, depending on
        clause context.

        """
        # only render labels within the columns clause
        # or ORDER BY clause of a select. dialect-specific compilers
        # can modify this behavior.
        render_label_with_as = (
            within_columns_clause and not within_label_clause
        )
        render_label_only = render_label_as_label is label

        if render_label_only or render_label_with_as:
            # resolve truncated (anonymous) label names to their
            # per-statement identifier
            if isinstance(label.name, elements._truncated_label):
                labelname = self._truncated_identifier("colident", label.name)
            else:
                labelname = label.name

        if render_label_with_as:
            if add_to_result_map is not None:
                add_to_result_map(
                    labelname,
                    label.name,
                    (label, labelname) + label._alt_names + result_map_targets,
                    label.type,
                )
            return (
                label.element._compiler_dispatch(
                    self,
                    within_columns_clause=True,
                    within_label_clause=True,
                    **kw,
                )
                + OPERATORS[operators.as_]
                + self.preparer.format_label(label, labelname)
            )
        elif render_label_only:
            return self.preparer.format_label(label, labelname)
        else:
            return label.element._compiler_dispatch(
                self, within_columns_clause=False, **kw
            )
2640
2641 def _fallback_column_name(self, column):
2642 raise exc.CompileError(
2643 "Cannot compile Column object until its 'name' is assigned."
2644 )
2645
2646 def visit_lambda_element(self, element, **kw):
2647 sql_element = element._resolved
2648 return self.process(sql_element, **kw)
2649
    def visit_column(
        self,
        column: ColumnClause[Any],
        add_to_result_map: Optional[_ResultMapAppender] = None,
        include_table: bool = True,
        result_map_targets: Tuple[Any, ...] = (),
        ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] = None,
        **kwargs: Any,
    ) -> str:
        """Render a column expression, optionally qualified by its table
        (and schema) name.

        :param add_to_result_map: callback used to record this column in
         the compiled statement's result map.
        :param include_table: when False, render only the column name.
        :param ambiguous_table_name_map: optional mapping used to rename
         tables whose names would otherwise be ambiguous.

        """
        name = orig_name = column.name
        if name is None:
            # raises CompileError in the base implementation
            name = self._fallback_column_name(column)

        is_literal = column.is_literal
        if not is_literal and isinstance(name, elements._truncated_label):
            # resolve anonymous/truncated names to per-statement identifiers
            name = self._truncated_identifier("colident", name)

        if add_to_result_map is not None:
            targets = (column, name, column.key) + result_map_targets
            if column._tq_label:
                targets += (column._tq_label,)

            add_to_result_map(name, orig_name, targets, column.type)

        if is_literal:
            # note we are not currently accommodating for
            # literal_column(quoted_name('ident', True)) here
            name = self.escape_literal_column(name)
        else:
            name = self.preparer.quote(name)
        table = column.table
        if table is None or not include_table or not table.named_with_column:
            return name
        else:
            effective_schema = self.preparer.schema_for_object(table)

            if effective_schema:
                schema_prefix = (
                    self.preparer.quote_schema(effective_schema) + "."
                )
            else:
                schema_prefix = ""

            if TYPE_CHECKING:
                assert isinstance(table, NamedFromClause)
            tablename = table.name

            if (
                not effective_schema
                and ambiguous_table_name_map
                and tablename in ambiguous_table_name_map
            ):
                tablename = ambiguous_table_name_map[tablename]

            if isinstance(tablename, elements._truncated_label):
                tablename = self._truncated_identifier("alias", tablename)

            return schema_prefix + self.preparer.quote(tablename) + "." + name
2708
2709 def visit_collation(self, element, **kw):
2710 return self.preparer.format_collation(element.collation)
2711
2712 def visit_fromclause(self, fromclause, **kwargs):
2713 return fromclause.name
2714
2715 def visit_index(self, index, **kwargs):
2716 return index.name
2717
2718 def visit_typeclause(self, typeclause, **kw):
2719 kw["type_expression"] = typeclause
2720 kw["identifier_preparer"] = self.preparer
2721 return self.dialect.type_compiler_instance.process(
2722 typeclause.type, **kw
2723 )
2724
2725 def post_process_text(self, text):
2726 if self.preparer._double_percents:
2727 text = text.replace("%", "%%")
2728 return text
2729
2730 def escape_literal_column(self, text):
2731 if self.preparer._double_percents:
2732 text = text.replace("%", "%%")
2733 return text
2734
    def visit_textclause(self, textclause, add_to_result_map=None, **kw):
        """Render a text() construct, substituting each ``:name`` token
        with its bound parameter and un-escaping backslash-escaped
        colons.

        """
        if self._collect_params:
            self._add_to_params(textclause)

        def do_bindparam(m):
            # replace a :name token with either the explicitly
            # established bindparam or a plain parameter placeholder
            name = m.group(1)
            if name in textclause._bindparams:
                return self.process(textclause._bindparams[name], **kw)
            else:
                return self.bindparam_string(name, **kw)

        if not self.stack:
            self.isplaintext = True

        if add_to_result_map:
            # text() object is present in the columns clause of a
            # select().   Add a no-name entry to the result map so that
            # row[text()] produces a result
            add_to_result_map(None, None, (textclause,), sqltypes.NULLTYPE)

        # un-escape any \:params
        return BIND_PARAMS_ESC.sub(
            lambda m: m.group(1),
            BIND_PARAMS.sub(
                do_bindparam, self.post_process_text(textclause.text)
            ),
        )
2762
    def visit_textual_select(
        self, taf, compound_index=None, asfrom=False, **kw
    ):
        """Render a TextualSelect (text() associated with column
        information), populating the result map when this is the
        outermost statement or the leading element of a compound.

        """
        if self._collect_params:
            self._add_to_params(taf)
        toplevel = not self.stack
        entry = self._default_stack_entry if toplevel else self.stack[-1]

        new_entry: _CompilerStackEntry = {
            "correlate_froms": set(),
            "asfrom_froms": set(),
            "selectable": taf,
        }
        self.stack.append(new_entry)

        if taf._independent_ctes:
            self._dispatch_independent_ctes(taf, kw)

        # the result map is needed when outermost, when this is the first
        # element of a compound that requested one, or for a nested
        # context that requested one
        populate_result_map = (
            toplevel
            or (
                compound_index == 0
                and entry.get("need_result_map_for_compound", False)
            )
            or entry.get("need_result_map_for_nested", False)
        )

        if populate_result_map:
            self._ordered_columns = self._textual_ordered_columns = (
                taf.positional
            )

            # enable looser result column matching when the SQL text links to
            # Column objects by name only
            self._loose_column_name_matching = not taf.positional and bool(
                taf.column_args
            )

            for c in taf.column_args:
                self.process(
                    c,
                    within_columns_clause=True,
                    add_to_result_map=self._add_to_result_map,
                )

        text = self.process(taf.element, **kw)
        if self.ctes:
            # prepend the WITH clause for any CTEs gathered while compiling
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        self.stack.pop(-1)

        return text
2816
2817 def visit_null(self, expr: Null, **kw: Any) -> str:
2818 return "NULL"
2819
2820 def visit_true(self, expr: True_, **kw: Any) -> str:
2821 if self.dialect.supports_native_boolean:
2822 return "true"
2823 else:
2824 return "1"
2825
2826 def visit_false(self, expr: False_, **kw: Any) -> str:
2827 if self.dialect.supports_native_boolean:
2828 return "false"
2829 else:
2830 return "0"
2831
2832 def _generate_delimited_list(self, elements, separator, **kw):
2833 return separator.join(
2834 s
2835 for s in (c._compiler_dispatch(self, **kw) for c in elements)
2836 if s
2837 )
2838
2839 def _generate_delimited_and_list(self, clauses, **kw):
2840 lcc, clauses = elements.BooleanClauseList._process_clauses_for_boolean(
2841 operators.and_,
2842 elements.True_._singleton,
2843 elements.False_._singleton,
2844 clauses,
2845 )
2846 if lcc == 1:
2847 return clauses[0]._compiler_dispatch(self, **kw)
2848 else:
2849 separator = OPERATORS[operators.and_]
2850 return separator.join(
2851 s
2852 for s in (c._compiler_dispatch(self, **kw) for c in clauses)
2853 if s
2854 )
2855
2856 def visit_tuple(self, clauselist, **kw):
2857 return "(%s)" % self.visit_clauselist(clauselist, **kw)
2858
2859 def visit_element_list(self, element, **kw):
2860 return self._generate_delimited_list(element.clauses, " ", **kw)
2861
2862 def visit_order_by_list(self, element, **kw):
2863 return self._generate_delimited_list(element.clauses, ", ", **kw)
2864
2865 def visit_clauselist(self, clauselist, **kw):
2866 sep = clauselist.operator
2867 if sep is None:
2868 sep = " "
2869 else:
2870 sep = OPERATORS[clauselist.operator]
2871
2872 return self._generate_delimited_list(clauselist.clauses, sep, **kw)
2873
2874 def visit_expression_clauselist(self, clauselist, **kw):
2875 operator_ = clauselist.operator
2876
2877 disp = self._get_operator_dispatch(
2878 operator_, "expression_clauselist", None
2879 )
2880 if disp:
2881 return disp(clauselist, operator_, **kw)
2882
2883 try:
2884 opstring = OPERATORS[operator_]
2885 except KeyError as err:
2886 raise exc.UnsupportedCompilationError(self, operator_) from err
2887 else:
2888 kw["_in_operator_expression"] = True
2889 return self._generate_delimited_list(
2890 clauselist.clauses, opstring, **kw
2891 )
2892
2893 def visit_case(self, clause, **kwargs):
2894 x = "CASE "
2895 if clause.value is not None:
2896 x += clause.value._compiler_dispatch(self, **kwargs) + " "
2897 for cond, result in clause.whens:
2898 x += (
2899 "WHEN "
2900 + cond._compiler_dispatch(self, **kwargs)
2901 + " THEN "
2902 + result._compiler_dispatch(self, **kwargs)
2903 + " "
2904 )
2905 if clause.else_ is not None:
2906 x += (
2907 "ELSE " + clause.else_._compiler_dispatch(self, **kwargs) + " "
2908 )
2909 x += "END"
2910 return x
2911
2912 def visit_type_coerce(self, type_coerce, **kw):
2913 return type_coerce.typed_expression._compiler_dispatch(self, **kw)
2914
2915 def visit_cast(self, cast, **kwargs):
2916 type_clause = cast.typeclause._compiler_dispatch(self, **kwargs)
2917 match = re.match("(.*)( COLLATE .*)", type_clause)
2918 return "CAST(%s AS %s)%s" % (
2919 cast.clause._compiler_dispatch(self, **kwargs),
2920 match.group(1) if match else type_clause,
2921 match.group(2) if match else "",
2922 )
2923
2924 def visit_frame_clause(self, frameclause, **kw):
2925
2926 if frameclause.lower_type is elements.FrameClauseType.UNBOUNDED:
2927 left = "UNBOUNDED PRECEDING"
2928 elif frameclause.lower_type is elements.FrameClauseType.CURRENT:
2929 left = "CURRENT ROW"
2930 else:
2931 val = self.process(frameclause.lower_bind, **kw)
2932 if frameclause.lower_type is elements.FrameClauseType.PRECEDING:
2933 left = f"{val} PRECEDING"
2934 else:
2935 left = f"{val} FOLLOWING"
2936
2937 if frameclause.upper_type is elements.FrameClauseType.UNBOUNDED:
2938 right = "UNBOUNDED FOLLOWING"
2939 elif frameclause.upper_type is elements.FrameClauseType.CURRENT:
2940 right = "CURRENT ROW"
2941 else:
2942 val = self.process(frameclause.upper_bind, **kw)
2943 if frameclause.upper_type is elements.FrameClauseType.PRECEDING:
2944 right = f"{val} PRECEDING"
2945 else:
2946 right = f"{val} FOLLOWING"
2947
2948 return f"{left} AND {right}"
2949
2950 def visit_over(self, over, **kwargs):
2951 text = over.element._compiler_dispatch(self, **kwargs)
2952 if over.range_ is not None:
2953 range_ = f"RANGE BETWEEN {self.process(over.range_, **kwargs)}"
2954 elif over.rows is not None:
2955 range_ = f"ROWS BETWEEN {self.process(over.rows, **kwargs)}"
2956 elif over.groups is not None:
2957 range_ = f"GROUPS BETWEEN {self.process(over.groups, **kwargs)}"
2958 else:
2959 range_ = None
2960
2961 return "%s OVER (%s)" % (
2962 text,
2963 " ".join(
2964 [
2965 "%s BY %s"
2966 % (word, clause._compiler_dispatch(self, **kwargs))
2967 for word, clause in (
2968 ("PARTITION", over.partition_by),
2969 ("ORDER", over.order_by),
2970 )
2971 if clause is not None and len(clause)
2972 ]
2973 + ([range_] if range_ else [])
2974 ),
2975 )
2976
2977 def visit_withingroup(self, withingroup, **kwargs):
2978 return "%s WITHIN GROUP (ORDER BY %s)" % (
2979 withingroup.element._compiler_dispatch(self, **kwargs),
2980 withingroup.order_by._compiler_dispatch(self, **kwargs),
2981 )
2982
2983 def visit_funcfilter(self, funcfilter, **kwargs):
2984 return "%s FILTER (WHERE %s)" % (
2985 funcfilter.func._compiler_dispatch(self, **kwargs),
2986 funcfilter.criterion._compiler_dispatch(self, **kwargs),
2987 )
2988
    def visit_aggregateorderby(self, aggregateorderby, **kwargs):
        """Render an aggregate function with ORDER BY using whichever
        style the dialect declares: inline inside the argument list,
        WITHIN GROUP, or a CompileError when unsupported.

        """
        if self.dialect.aggregate_order_by_style is AggregateOrderByStyle.NONE:
            raise exc.CompileError(
                "this dialect does not support "
                "ORDER BY within an aggregate function"
            )
        elif (
            self.dialect.aggregate_order_by_style
            is AggregateOrderByStyle.INLINE
        ):
            # clone the function and wrap its argument list in an
            # aggregate_orderby_inline() so the ORDER BY renders inside
            # the function's parens
            new_fn = aggregateorderby.element._clone()
            new_fn.clause_expr = elements.Grouping(
                aggregate_orderby_inline(
                    new_fn.clause_expr.element, aggregateorderby.order_by
                )
            )

            return new_fn._compiler_dispatch(self, **kwargs)
        else:
            return self.visit_withingroup(aggregateorderby, **kwargs)
3009
3010 def visit_aggregate_orderby_inline(self, element, **kw):
3011 return "%s ORDER BY %s" % (
3012 self.process(element.element, **kw),
3013 self.process(element.aggregate_order_by, **kw),
3014 )
3015
3016 def visit_aggregate_strings_func(self, fn, *, use_function_name, **kw):
3017 # aggreagate_order_by attribute is present if visit_function
3018 # gave us a Function with aggregate_orderby_inline() as the inner
3019 # contents
3020 order_by = getattr(fn.clauses, "aggregate_order_by", None)
3021
3022 literal_exec = dict(kw)
3023 literal_exec["literal_execute"] = True
3024
3025 # break up the function into its components so we can apply
3026 # literal_execute to the second argument (the delimeter)
3027 cl = list(fn.clauses)
3028 expr, delimeter = cl[0:2]
3029 if (
3030 order_by is not None
3031 and self.dialect.aggregate_order_by_style
3032 is AggregateOrderByStyle.INLINE
3033 ):
3034 return (
3035 f"{use_function_name}({expr._compiler_dispatch(self, **kw)}, "
3036 f"{delimeter._compiler_dispatch(self, **literal_exec)} "
3037 f"ORDER BY {order_by._compiler_dispatch(self, **kw)})"
3038 )
3039 else:
3040 return (
3041 f"{use_function_name}({expr._compiler_dispatch(self, **kw)}, "
3042 f"{delimeter._compiler_dispatch(self, **literal_exec)})"
3043 )
3044
3045 def visit_extract(self, extract, **kwargs):
3046 field = self.extract_map.get(extract.field, extract.field)
3047 return "EXTRACT(%s FROM %s)" % (
3048 field,
3049 extract.expr._compiler_dispatch(self, **kwargs),
3050 )
3051
3052 def visit_scalar_function_column(self, element, **kw):
3053 compiled_fn = self.visit_function(element.fn, **kw)
3054 compiled_col = self.visit_column(element, **kw)
3055 return "(%s).%s" % (compiled_fn, compiled_col)
3056
3057 def visit_function(
3058 self,
3059 func: Function[Any],
3060 add_to_result_map: Optional[_ResultMapAppender] = None,
3061 **kwargs: Any,
3062 ) -> str:
3063 if self._collect_params:
3064 self._add_to_params(func)
3065 if add_to_result_map is not None:
3066 add_to_result_map(func.name, func.name, (func.name,), func.type)
3067
3068 disp = getattr(self, "visit_%s_func" % func.name.lower(), None)
3069
3070 text: str
3071
3072 if disp:
3073 text = disp(func, **kwargs)
3074 else:
3075 name = FUNCTIONS.get(func._deannotate().__class__, None)
3076 if name:
3077 if func._has_args:
3078 name += "%(expr)s"
3079 else:
3080 name = func.name
3081 name = (
3082 self.preparer.quote(name)
3083 if self.preparer._requires_quotes_illegal_chars(name)
3084 or isinstance(name, elements.quoted_name)
3085 else name
3086 )
3087 name = name + "%(expr)s"
3088 text = ".".join(
3089 [
3090 (
3091 self.preparer.quote(tok)
3092 if self.preparer._requires_quotes_illegal_chars(tok)
3093 or isinstance(name, elements.quoted_name)
3094 else tok
3095 )
3096 for tok in func.packagenames
3097 ]
3098 + [name]
3099 ) % {"expr": self.function_argspec(func, **kwargs)}
3100
3101 if func._with_ordinality:
3102 text += " WITH ORDINALITY"
3103 return text
3104
3105 def visit_next_value_func(self, next_value, **kw):
3106 return self.visit_sequence(next_value.sequence)
3107
3108 def visit_sequence(self, sequence, **kw):
3109 raise NotImplementedError(
3110 "Dialect '%s' does not support sequence increments."
3111 % self.dialect.name
3112 )
3113
3114 def function_argspec(self, func: Function[Any], **kwargs: Any) -> str:
3115 return func.clause_expr._compiler_dispatch(self, **kwargs)
3116
    def visit_compound_select(
        self, cs, asfrom=False, compound_index=None, **kwargs
    ):
        """Render a CompoundSelect (UNION / INTERSECT / EXCEPT etc.),
        joining its member SELECTs with the compound keyword and
        appending GROUP BY / ORDER BY / row-limiting clauses.

        """
        if self._collect_params:
            self._add_to_params(cs)
        toplevel = not self.stack

        compile_state = cs._compile_state_factory(cs, self, **kwargs)

        if toplevel and not self.compile_state:
            self.compile_state = compile_state

        compound_stmt = compile_state.statement

        entry = self._default_stack_entry if toplevel else self.stack[-1]
        # only the outermost statement, or the first element of a
        # compound that asked for one, populates the result map
        need_result_map = toplevel or (
            not compound_index
            and entry.get("need_result_map_for_compound", False)
        )

        # indicates there is already a CompoundSelect in play
        if compound_index == 0:
            entry["select_0"] = cs

        self.stack.append(
            {
                "correlate_froms": entry["correlate_froms"],
                "asfrom_froms": entry["asfrom_froms"],
                "selectable": cs,
                "compile_state": compile_state,
                "need_result_map_for_compound": need_result_map,
            }
        )

        if compound_stmt._independent_ctes:
            self._dispatch_independent_ctes(compound_stmt, kwargs)

        keyword = self.compound_keywords[cs.keyword]

        # join each member SELECT with the compound keyword, passing
        # along its index so only the first populates the result map
        text = (" " + keyword + " ").join(
            (
                c._compiler_dispatch(
                    self, asfrom=asfrom, compound_index=i, **kwargs
                )
                for i, c in enumerate(cs.selects)
            )
        )

        kwargs["include_table"] = False
        text += self.group_by_clause(cs, **dict(asfrom=asfrom, **kwargs))
        text += self.order_by_clause(cs, **kwargs)
        if cs._has_row_limiting_clause:
            text += self._row_limit_clause(cs, **kwargs)

        if self.ctes:
            # prepend the WITH clause for any CTEs gathered while compiling
            nesting_level = len(self.stack) if not toplevel else None
            text = (
                self._render_cte_clause(
                    nesting_level=nesting_level,
                    include_following_stack=True,
                )
                + text
            )

        self.stack.pop(-1)
        return text
3183
3184 def _row_limit_clause(self, cs, **kwargs):
3185 if cs._fetch_clause is not None:
3186 return self.fetch_clause(cs, **kwargs)
3187 else:
3188 return self.limit_clause(cs, **kwargs)
3189
3190 def _get_operator_dispatch(self, operator_, qualifier1, qualifier2):
3191 attrname = "visit_%s_%s%s" % (
3192 operator_.__name__,
3193 qualifier1,
3194 "_" + qualifier2 if qualifier2 else "",
3195 )
3196 return getattr(self, attrname, None)
3197
3198 def _get_custom_operator_dispatch(self, operator_, qualifier1):
3199 attrname = "visit_%s_op_%s" % (operator_.visit_name, qualifier1)
3200 return getattr(self, attrname, None)
3201
3202 def visit_unary(
3203 self, unary, add_to_result_map=None, result_map_targets=(), **kw
3204 ):
3205 if add_to_result_map is not None:
3206 result_map_targets += (unary,)
3207 kw["add_to_result_map"] = add_to_result_map
3208 kw["result_map_targets"] = result_map_targets
3209
3210 if unary.operator:
3211 if unary.modifier:
3212 raise exc.CompileError(
3213 "Unary expression does not support operator "
3214 "and modifier simultaneously"
3215 )
3216 disp = self._get_operator_dispatch(
3217 unary.operator, "unary", "operator"
3218 )
3219 if disp:
3220 return disp(unary, unary.operator, **kw)
3221 else:
3222 return self._generate_generic_unary_operator(
3223 unary, OPERATORS[unary.operator], **kw
3224 )
3225 elif unary.modifier:
3226 disp = self._get_operator_dispatch(
3227 unary.modifier, "unary", "modifier"
3228 )
3229 if disp:
3230 return disp(unary, unary.modifier, **kw)
3231 else:
3232 return self._generate_generic_unary_modifier(
3233 unary, OPERATORS[unary.modifier], **kw
3234 )
3235 else:
3236 raise exc.CompileError(
3237 "Unary expression has no operator or modifier"
3238 )
3239
3240 def visit_truediv_binary(self, binary, operator, **kw):
3241 if self.dialect.div_is_floordiv:
3242 return (
3243 self.process(binary.left, **kw)
3244 + " / "
3245 # TODO: would need a fast cast again here,
3246 # unless we want to use an implicit cast like "+ 0.0"
3247 + self.process(
3248 elements.Cast(
3249 binary.right,
3250 (
3251 binary.right.type
3252 if binary.right.type._type_affinity
3253 in (sqltypes.Numeric, sqltypes.Float)
3254 else sqltypes.Numeric()
3255 ),
3256 ),
3257 **kw,
3258 )
3259 )
3260 else:
3261 return (
3262 self.process(binary.left, **kw)
3263 + " / "
3264 + self.process(binary.right, **kw)
3265 )
3266
3267 def visit_floordiv_binary(self, binary, operator, **kw):
3268 if (
3269 self.dialect.div_is_floordiv
3270 and binary.right.type._type_affinity is sqltypes.Integer
3271 ):
3272 return (
3273 self.process(binary.left, **kw)
3274 + " / "
3275 + self.process(binary.right, **kw)
3276 )
3277 else:
3278 return "FLOOR(%s)" % (
3279 self.process(binary.left, **kw)
3280 + " / "
3281 + self.process(binary.right, **kw)
3282 )
3283
3284 def visit_is_true_unary_operator(self, element, operator, **kw):
3285 if (
3286 element._is_implicitly_boolean
3287 or self.dialect.supports_native_boolean
3288 ):
3289 return self.process(element.element, **kw)
3290 else:
3291 return "%s = 1" % self.process(element.element, **kw)
3292
3293 def visit_is_false_unary_operator(self, element, operator, **kw):
3294 if (
3295 element._is_implicitly_boolean
3296 or self.dialect.supports_native_boolean
3297 ):
3298 return "NOT %s" % self.process(element.element, **kw)
3299 else:
3300 return "%s = 0" % self.process(element.element, **kw)
3301
3302 def visit_not_match_op_binary(self, binary, operator, **kw):
3303 return "NOT %s" % self.visit_binary(
3304 binary, override_operator=operators.match_op
3305 )
3306
3307 def visit_not_in_op_binary(self, binary, operator, **kw):
3308 # The brackets are required in the NOT IN operation because the empty
3309 # case is handled using the form "(col NOT IN (null) OR 1 = 1)".
3310 # The presence of the OR makes the brackets required.
3311 return "(%s)" % self._generate_generic_binary(
3312 binary, OPERATORS[operator], **kw
3313 )
3314
3315 def visit_empty_set_op_expr(self, type_, expand_op, **kw):
3316 if expand_op is operators.not_in_op:
3317 if len(type_) > 1:
3318 return "(%s)) OR (1 = 1" % (
3319 ", ".join("NULL" for element in type_)
3320 )
3321 else:
3322 return "NULL) OR (1 = 1"
3323 elif expand_op is operators.in_op:
3324 if len(type_) > 1:
3325 return "(%s)) AND (1 != 1" % (
3326 ", ".join("NULL" for element in type_)
3327 )
3328 else:
3329 return "NULL) AND (1 != 1"
3330 else:
3331 return self.visit_empty_set_expr(type_)
3332
3333 def visit_empty_set_expr(self, element_types, **kw):
3334 raise NotImplementedError(
3335 "Dialect '%s' does not support empty set expression."
3336 % self.dialect.name
3337 )
3338
3339 def _literal_execute_expanding_parameter_literal_binds(
3340 self, parameter, values, bind_expression_template=None
3341 ):
3342 typ_dialect_impl = parameter.type._unwrapped_dialect_impl(self.dialect)
3343
3344 if not values:
3345 # empty IN expression. note we don't need to use
3346 # bind_expression_template here because there are no
3347 # expressions to render.
3348
3349 if typ_dialect_impl._is_tuple_type:
3350 replacement_expression = (
3351 "VALUES " if self.dialect.tuple_in_values else ""
3352 ) + self.visit_empty_set_op_expr(
3353 parameter.type.types, parameter.expand_op
3354 )
3355
3356 else:
3357 replacement_expression = self.visit_empty_set_op_expr(
3358 [parameter.type], parameter.expand_op
3359 )
3360
3361 elif typ_dialect_impl._is_tuple_type or (
3362 typ_dialect_impl._isnull
3363 and isinstance(values[0], collections_abc.Sequence)
3364 and not isinstance(values[0], (str, bytes))
3365 ):
3366 if typ_dialect_impl._has_bind_expression:
3367 raise NotImplementedError(
3368 "bind_expression() on TupleType not supported with "
3369 "literal_binds"
3370 )
3371
3372 replacement_expression = (
3373 "VALUES " if self.dialect.tuple_in_values else ""
3374 ) + ", ".join(
3375 "(%s)"
3376 % (
3377 ", ".join(
3378 self.render_literal_value(value, param_type)
3379 for value, param_type in zip(
3380 tuple_element, parameter.type.types
3381 )
3382 )
3383 )
3384 for i, tuple_element in enumerate(values)
3385 )
3386 else:
3387 if bind_expression_template:
3388 post_compile_pattern = self._post_compile_pattern
3389 m = post_compile_pattern.search(bind_expression_template)
3390 assert m and m.group(
3391 2
3392 ), "unexpected format for expanding parameter"
3393
3394 tok = m.group(2).split("~~")
3395 be_left, be_right = tok[1], tok[3]
3396 replacement_expression = ", ".join(
3397 "%s%s%s"
3398 % (
3399 be_left,
3400 self.render_literal_value(value, parameter.type),
3401 be_right,
3402 )
3403 for value in values
3404 )
3405 else:
3406 replacement_expression = ", ".join(
3407 self.render_literal_value(value, parameter.type)
3408 for value in values
3409 )
3410
3411 return (), replacement_expression
3412
    def _literal_execute_expanding_parameter(self, name, parameter, values):
        """Expand an IN-style parameter into individual bound parameters.

        Returns ``(to_update, replacement_expression)`` where
        ``to_update`` is a list of ``(param_name, value)`` pairs to add
        to the execution parameters and ``replacement_expression`` is the
        SQL fragment replacing the POSTCOMPILE placeholder.
        """
        if parameter.literal_execute:
            # literal_execute: render literal values instead of new
            # bound parameters
            return self._literal_execute_expanding_parameter_literal_binds(
                parameter, values
            )

        dialect = self.dialect
        typ_dialect_impl = parameter.type._unwrapped_dialect_impl(dialect)

        if self._numeric_binds:
            bind_template = self.compilation_bindtemplate
        else:
            bind_template = self.bindtemplate

        if (
            self.dialect._bind_typing_render_casts
            and typ_dialect_impl.render_bind_cast
        ):

            def _render_bindtemplate(name):
                # wrap each rendered parameter in a dialect bind cast
                return self.render_bind_cast(
                    parameter.type,
                    typ_dialect_impl,
                    bind_template % {"name": name},
                )

        else:

            def _render_bindtemplate(name):
                return bind_template % {"name": name}

        if not values:
            # empty IN list: no parameters; render the dialect's
            # "empty set" expression instead
            to_update = []
            if typ_dialect_impl._is_tuple_type:
                replacement_expression = self.visit_empty_set_op_expr(
                    parameter.type.types, parameter.expand_op
                )
            else:
                replacement_expression = self.visit_empty_set_op_expr(
                    [parameter.type], parameter.expand_op
                )

        elif typ_dialect_impl._is_tuple_type or (
            typ_dialect_impl._isnull
            and isinstance(values[0], collections_abc.Sequence)
            and not isinstance(values[0], (str, bytes))
        ):
            # tuple IN: one parameter per tuple member, named
            # "<name>_<tuple#>_<member#>" (1-based)
            assert not typ_dialect_impl._is_array
            to_update = [
                ("%s_%s_%s" % (name, i, j), value)
                for i, tuple_element in enumerate(values, 1)
                for j, value in enumerate(tuple_element, 1)
            ]

            # NOTE(review): the flat-index math below assumes every tuple
            # in *values* has the same length — confirm against callers
            replacement_expression = (
                "VALUES " if dialect.tuple_in_values else ""
            ) + ", ".join(
                "(%s)"
                % (
                    ", ".join(
                        _render_bindtemplate(
                            to_update[i * len(tuple_element) + j][0]
                        )
                        for j, value in enumerate(tuple_element)
                    )
                )
                for i, tuple_element in enumerate(values)
            )
        else:
            # scalar IN: one parameter per value, named "<name>_<i>"
            to_update = [
                ("%s_%s" % (name, i), value)
                for i, value in enumerate(values, 1)
            ]
            replacement_expression = ", ".join(
                _render_bindtemplate(key) for key, value in to_update
            )

        return to_update, replacement_expression
3491
3492 def visit_binary(
3493 self,
3494 binary,
3495 override_operator=None,
3496 eager_grouping=False,
3497 from_linter=None,
3498 lateral_from_linter=None,
3499 **kw,
3500 ):
3501 if from_linter and operators.is_comparison(binary.operator):
3502 if lateral_from_linter is not None:
3503 enclosing_lateral = kw["enclosing_lateral"]
3504 lateral_from_linter.edges.update(
3505 itertools.product(
3506 _de_clone(
3507 binary.left._from_objects + [enclosing_lateral]
3508 ),
3509 _de_clone(
3510 binary.right._from_objects + [enclosing_lateral]
3511 ),
3512 )
3513 )
3514 else:
3515 from_linter.edges.update(
3516 itertools.product(
3517 _de_clone(binary.left._from_objects),
3518 _de_clone(binary.right._from_objects),
3519 )
3520 )
3521
3522 # don't allow "? = ?" to render
3523 if (
3524 self.ansi_bind_rules
3525 and isinstance(binary.left, elements.BindParameter)
3526 and isinstance(binary.right, elements.BindParameter)
3527 ):
3528 kw["literal_execute"] = True
3529
3530 operator_ = override_operator or binary.operator
3531 disp = self._get_operator_dispatch(operator_, "binary", None)
3532 if disp:
3533 return disp(binary, operator_, **kw)
3534 else:
3535 try:
3536 opstring = OPERATORS[operator_]
3537 except KeyError as err:
3538 raise exc.UnsupportedCompilationError(self, operator_) from err
3539 else:
3540 return self._generate_generic_binary(
3541 binary,
3542 opstring,
3543 from_linter=from_linter,
3544 lateral_from_linter=lateral_from_linter,
3545 **kw,
3546 )
3547
3548 def visit_function_as_comparison_op_binary(self, element, operator, **kw):
3549 return self.process(element.sql_function, **kw)
3550
3551 def visit_mod_binary(self, binary, operator, **kw):
3552 if self.preparer._double_percents:
3553 return (
3554 self.process(binary.left, **kw)
3555 + " %% "
3556 + self.process(binary.right, **kw)
3557 )
3558 else:
3559 return (
3560 self.process(binary.left, **kw)
3561 + " % "
3562 + self.process(binary.right, **kw)
3563 )
3564
3565 def visit_custom_op_binary(self, element, operator, **kw):
3566 if operator.visit_name:
3567 disp = self._get_custom_operator_dispatch(operator, "binary")
3568 if disp:
3569 return disp(element, operator, **kw)
3570
3571 kw["eager_grouping"] = operator.eager_grouping
3572 return self._generate_generic_binary(
3573 element,
3574 " " + self.escape_literal_column(operator.opstring) + " ",
3575 **kw,
3576 )
3577
3578 def visit_custom_op_unary_operator(self, element, operator, **kw):
3579 if operator.visit_name:
3580 disp = self._get_custom_operator_dispatch(operator, "unary")
3581 if disp:
3582 return disp(element, operator, **kw)
3583
3584 return self._generate_generic_unary_operator(
3585 element, self.escape_literal_column(operator.opstring) + " ", **kw
3586 )
3587
3588 def visit_custom_op_unary_modifier(self, element, operator, **kw):
3589 if operator.visit_name:
3590 disp = self._get_custom_operator_dispatch(operator, "unary")
3591 if disp:
3592 return disp(element, operator, **kw)
3593
3594 return self._generate_generic_unary_modifier(
3595 element, " " + self.escape_literal_column(operator.opstring), **kw
3596 )
3597
3598 def _generate_generic_binary(
3599 self,
3600 binary: BinaryExpression[Any],
3601 opstring: str,
3602 eager_grouping: bool = False,
3603 **kw: Any,
3604 ) -> str:
3605 _in_operator_expression = kw.get("_in_operator_expression", False)
3606
3607 kw["_in_operator_expression"] = True
3608 kw["_binary_op"] = binary.operator
3609 text = (
3610 binary.left._compiler_dispatch(
3611 self, eager_grouping=eager_grouping, **kw
3612 )
3613 + opstring
3614 + binary.right._compiler_dispatch(
3615 self, eager_grouping=eager_grouping, **kw
3616 )
3617 )
3618
3619 if _in_operator_expression and eager_grouping:
3620 text = "(%s)" % text
3621 return text
3622
3623 def _generate_generic_unary_operator(self, unary, opstring, **kw):
3624 return opstring + unary.element._compiler_dispatch(self, **kw)
3625
3626 def _generate_generic_unary_modifier(self, unary, opstring, **kw):
3627 return unary.element._compiler_dispatch(self, **kw) + opstring
3628
    @util.memoized_property
    def _like_percent_literal(self):
        # memoized literal '%' column used to build LIKE patterns for the
        # contains/startswith/endswith family; typed as a plain string so
        # concat renders as string concatenation
        return elements.literal_column("'%'", type_=sqltypes.STRINGTYPE)
3632
3633 def visit_ilike_case_insensitive_operand(self, element, **kw):
3634 return f"lower({element.element._compiler_dispatch(self, **kw)})"
3635
3636 def visit_contains_op_binary(self, binary, operator, **kw):
3637 binary = binary._clone()
3638 percent = self._like_percent_literal
3639 binary.right = percent.concat(binary.right).concat(percent)
3640 return self.visit_like_op_binary(binary, operator, **kw)
3641
3642 def visit_not_contains_op_binary(self, binary, operator, **kw):
3643 binary = binary._clone()
3644 percent = self._like_percent_literal
3645 binary.right = percent.concat(binary.right).concat(percent)
3646 return self.visit_not_like_op_binary(binary, operator, **kw)
3647
3648 def visit_icontains_op_binary(self, binary, operator, **kw):
3649 binary = binary._clone()
3650 percent = self._like_percent_literal
3651 binary.left = ilike_case_insensitive(binary.left)
3652 binary.right = percent.concat(
3653 ilike_case_insensitive(binary.right)
3654 ).concat(percent)
3655 return self.visit_ilike_op_binary(binary, operator, **kw)
3656
3657 def visit_not_icontains_op_binary(self, binary, operator, **kw):
3658 binary = binary._clone()
3659 percent = self._like_percent_literal
3660 binary.left = ilike_case_insensitive(binary.left)
3661 binary.right = percent.concat(
3662 ilike_case_insensitive(binary.right)
3663 ).concat(percent)
3664 return self.visit_not_ilike_op_binary(binary, operator, **kw)
3665
3666 def visit_startswith_op_binary(self, binary, operator, **kw):
3667 binary = binary._clone()
3668 percent = self._like_percent_literal
3669 binary.right = percent._rconcat(binary.right)
3670 return self.visit_like_op_binary(binary, operator, **kw)
3671
3672 def visit_not_startswith_op_binary(self, binary, operator, **kw):
3673 binary = binary._clone()
3674 percent = self._like_percent_literal
3675 binary.right = percent._rconcat(binary.right)
3676 return self.visit_not_like_op_binary(binary, operator, **kw)
3677
3678 def visit_istartswith_op_binary(self, binary, operator, **kw):
3679 binary = binary._clone()
3680 percent = self._like_percent_literal
3681 binary.left = ilike_case_insensitive(binary.left)
3682 binary.right = percent._rconcat(ilike_case_insensitive(binary.right))
3683 return self.visit_ilike_op_binary(binary, operator, **kw)
3684
3685 def visit_not_istartswith_op_binary(self, binary, operator, **kw):
3686 binary = binary._clone()
3687 percent = self._like_percent_literal
3688 binary.left = ilike_case_insensitive(binary.left)
3689 binary.right = percent._rconcat(ilike_case_insensitive(binary.right))
3690 return self.visit_not_ilike_op_binary(binary, operator, **kw)
3691
3692 def visit_endswith_op_binary(self, binary, operator, **kw):
3693 binary = binary._clone()
3694 percent = self._like_percent_literal
3695 binary.right = percent.concat(binary.right)
3696 return self.visit_like_op_binary(binary, operator, **kw)
3697
3698 def visit_not_endswith_op_binary(self, binary, operator, **kw):
3699 binary = binary._clone()
3700 percent = self._like_percent_literal
3701 binary.right = percent.concat(binary.right)
3702 return self.visit_not_like_op_binary(binary, operator, **kw)
3703
3704 def visit_iendswith_op_binary(self, binary, operator, **kw):
3705 binary = binary._clone()
3706 percent = self._like_percent_literal
3707 binary.left = ilike_case_insensitive(binary.left)
3708 binary.right = percent.concat(ilike_case_insensitive(binary.right))
3709 return self.visit_ilike_op_binary(binary, operator, **kw)
3710
3711 def visit_not_iendswith_op_binary(self, binary, operator, **kw):
3712 binary = binary._clone()
3713 percent = self._like_percent_literal
3714 binary.left = ilike_case_insensitive(binary.left)
3715 binary.right = percent.concat(ilike_case_insensitive(binary.right))
3716 return self.visit_not_ilike_op_binary(binary, operator, **kw)
3717
3718 def visit_like_op_binary(self, binary, operator, **kw):
3719 escape = binary.modifiers.get("escape", None)
3720
3721 return "%s LIKE %s" % (
3722 binary.left._compiler_dispatch(self, **kw),
3723 binary.right._compiler_dispatch(self, **kw),
3724 ) + (
3725 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
3726 if escape is not None
3727 else ""
3728 )
3729
3730 def visit_not_like_op_binary(self, binary, operator, **kw):
3731 escape = binary.modifiers.get("escape", None)
3732 return "%s NOT LIKE %s" % (
3733 binary.left._compiler_dispatch(self, **kw),
3734 binary.right._compiler_dispatch(self, **kw),
3735 ) + (
3736 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
3737 if escape is not None
3738 else ""
3739 )
3740
3741 def visit_ilike_op_binary(self, binary, operator, **kw):
3742 if operator is operators.ilike_op:
3743 binary = binary._clone()
3744 binary.left = ilike_case_insensitive(binary.left)
3745 binary.right = ilike_case_insensitive(binary.right)
3746 # else we assume ilower() has been applied
3747
3748 return self.visit_like_op_binary(binary, operator, **kw)
3749
3750 def visit_not_ilike_op_binary(self, binary, operator, **kw):
3751 if operator is operators.not_ilike_op:
3752 binary = binary._clone()
3753 binary.left = ilike_case_insensitive(binary.left)
3754 binary.right = ilike_case_insensitive(binary.right)
3755 # else we assume ilower() has been applied
3756
3757 return self.visit_not_like_op_binary(binary, operator, **kw)
3758
3759 def visit_between_op_binary(self, binary, operator, **kw):
3760 symmetric = binary.modifiers.get("symmetric", False)
3761 return self._generate_generic_binary(
3762 binary, " BETWEEN SYMMETRIC " if symmetric else " BETWEEN ", **kw
3763 )
3764
3765 def visit_not_between_op_binary(self, binary, operator, **kw):
3766 symmetric = binary.modifiers.get("symmetric", False)
3767 return self._generate_generic_binary(
3768 binary,
3769 " NOT BETWEEN SYMMETRIC " if symmetric else " NOT BETWEEN ",
3770 **kw,
3771 )
3772
    def visit_regexp_match_op_binary(
        self, binary: BinaryExpression[Any], operator: Any, **kw: Any
    ) -> str:
        """Hook for dialect-specific regexp matching; the base compiler
        has no generic SQL form, so this always raises."""
        raise exc.CompileError(
            "%s dialect does not support regular expressions"
            % self.dialect.name
        )
3780
    def visit_not_regexp_match_op_binary(
        self, binary: BinaryExpression[Any], operator: Any, **kw: Any
    ) -> str:
        """Hook for dialect-specific negated regexp matching; the base
        compiler has no generic SQL form, so this always raises."""
        raise exc.CompileError(
            "%s dialect does not support regular expressions"
            % self.dialect.name
        )
3788
    def visit_regexp_replace_op_binary(
        self, binary: BinaryExpression[Any], operator: Any, **kw: Any
    ) -> str:
        """Hook for dialect-specific regexp replacement; the base
        compiler has no generic SQL form, so this always raises."""
        raise exc.CompileError(
            "%s dialect does not support regular expression replacements"
            % self.dialect.name
        )
3796
3797 def visit_dmltargetcopy(self, element, *, bindmarkers=None, **kw):
3798 if bindmarkers is None:
3799 raise exc.CompileError(
3800 "DML target objects may only be used with "
3801 "compiled INSERT or UPDATE statements"
3802 )
3803
3804 bindmarkers[element.column.key] = element
3805 return f"__BINDMARKER_~~{element.column.key}~~"
3806
    def visit_bindparam(
        self,
        bindparam,
        within_columns_clause=False,
        literal_binds=False,
        skip_bind_expression=False,
        literal_execute=False,
        render_postcompile=False,
        **kwargs,
    ):
        """Render a bound parameter, registering it in ``self.binds``
        and returning a bound-parameter token, a POSTCOMPILE
        placeholder, or an inline literal.

        :param within_columns_clause: True when rendering within the
         columns clause; combined with ``ansi_bind_rules`` this forces
         literal execution.
        :param literal_binds: render the parameter's value inline.
        :param skip_bind_expression: internal flag to avoid re-applying
         a type's bind_expression() when re-entering.
        :param literal_execute: render a placeholder resolved to a
         literal at execution time.
        :param render_postcompile: resolve POSTCOMPILE placeholders
         immediately in the string output.
        """

        if not skip_bind_expression:
            impl = bindparam.type.dialect_impl(self.dialect)
            if impl._has_bind_expression:
                # re-process with the wrapping bind expression, guarding
                # against re-entry via skip_bind_expression
                bind_expression = impl.bind_expression(bindparam)
                wrapped = self.process(
                    bind_expression,
                    skip_bind_expression=True,
                    within_columns_clause=within_columns_clause,
                    literal_binds=literal_binds and not bindparam.expanding,
                    literal_execute=literal_execute,
                    render_postcompile=render_postcompile,
                    **kwargs,
                )
                if bindparam.expanding:
                    # for postcompile w/ expanding, move the "wrapped" part
                    # of this into the inside

                    m = re.match(
                        r"^(.*)\(__\[POSTCOMPILE_(\S+?)\]\)(.*)$", wrapped
                    )
                    assert m, "unexpected format for expanding parameter"
                    wrapped = "(__[POSTCOMPILE_%s~~%s~~REPL~~%s~~])" % (
                        m.group(2),
                        m.group(1),
                        m.group(3),
                    )

                if literal_binds:
                    ret = self.render_literal_bindparam(
                        bindparam,
                        within_columns_clause=True,
                        bind_expression_template=wrapped,
                        **kwargs,
                    )
                    return f"({ret})"

                return wrapped

        if not literal_binds:
            literal_execute = (
                literal_execute
                or bindparam.literal_execute
                or (within_columns_clause and self.ansi_bind_rules)
            )
            post_compile = literal_execute or bindparam.expanding
        else:
            post_compile = False

        if literal_binds:
            ret = self.render_literal_bindparam(
                bindparam, within_columns_clause=True, **kwargs
            )
            if bindparam.expanding:
                ret = f"({ret})"
            return ret

        name = self._truncate_bindparam(bindparam)

        if name in self.binds:
            # a parameter of the same name already exists; verify the
            # reuse is legal before overwriting
            existing = self.binds[name]
            if existing is not bindparam:
                if (
                    (existing.unique or bindparam.unique)
                    and not existing.proxy_set.intersection(
                        bindparam.proxy_set
                    )
                    and not existing._cloned_set.intersection(
                        bindparam._cloned_set
                    )
                ):
                    raise exc.CompileError(
                        "Bind parameter '%s' conflicts with "
                        "unique bind parameter of the same name" % name
                    )
                elif existing.expanding != bindparam.expanding:
                    raise exc.CompileError(
                        "Can't reuse bound parameter name '%s' in both "
                        "'expanding' (e.g. within an IN expression) and "
                        "non-expanding contexts. If this parameter is to "
                        "receive a list/array value, set 'expanding=True' on "
                        "it for expressions that aren't IN, otherwise use "
                        "a different parameter name." % (name,)
                    )
                elif existing._is_crud or bindparam._is_crud:
                    if existing._is_crud and bindparam._is_crud:
                        # TODO: this condition is not well understood.
                        # see tests in test/sql/test_update.py
                        raise exc.CompileError(
                            "Encountered unsupported case when compiling an "
                            "INSERT or UPDATE statement. If this is a "
                            "multi-table "
                            "UPDATE statement, please provide string-named "
                            "arguments to the "
                            "values() method with distinct names; support for "
                            "multi-table UPDATE statements that "
                            "target multiple tables for UPDATE is very "
                            "limited",
                        )
                    else:
                        raise exc.CompileError(
                            f"bindparam() name '{bindparam.key}' is reserved "
                            "for automatic usage in the VALUES or SET "
                            "clause of this "
                            "insert/update statement. Please use a "
                            "name other than column name when using "
                            "bindparam() "
                            "with insert() or update() (for example, "
                            f"'b_{bindparam.key}')."
                        )

        self.binds[bindparam.key] = self.binds[name] = bindparam

        # if we are given a cache key that we're going to match against,
        # relate the bindparam here to one that is most likely present
        # in the "extracted params" portion of the cache key. this is used
        # to set up a positional mapping that is used to determine the
        # correct parameters for a subsequent use of this compiled with
        # a different set of parameter values. here, we accommodate for
        # parameters that may have been cloned both before and after the cache
        # key was been generated.
        ckbm_tuple = self._cache_key_bind_match

        if ckbm_tuple:
            ckbm, cksm = ckbm_tuple
            for bp in bindparam._cloned_set:
                if bp.key in cksm:
                    cb = cksm[bp.key]
                    ckbm[cb].append(bindparam)

        if bindparam.isoutparam:
            self.has_out_parameters = True

        if post_compile:
            if render_postcompile:
                self._render_postcompile = True

            if literal_execute:
                self.literal_execute_params |= {bindparam}
            else:
                self.post_compile_params |= {bindparam}

        ret = self.bindparam_string(
            name,
            post_compile=post_compile,
            expanding=bindparam.expanding,
            bindparam_type=bindparam.type,
            **kwargs,
        )

        if bindparam.expanding:
            # expanding parameters always render parenthesized
            ret = f"({ret})"

        return ret
3971
    def render_bind_cast(self, type_, dbapi_type, sqltext):
        """Render a dialect-specific cast around a bound parameter;
        overridden by dialects that support bind casts."""
        raise NotImplementedError()
3974
    def render_literal_bindparam(
        self,
        bindparam,
        render_literal_value=NO_ARG,
        bind_expression_template=None,
        **kw,
    ):
        """Render a bound parameter inline as a literal value.

        :param render_literal_value: explicit value to render, bypassing
         the parameter's own value/callable.
        :param bind_expression_template: pre-rendered bind-expression
         text used when expanding an IN parameter.
        """
        if render_literal_value is not NO_ARG:
            value = render_literal_value
        else:
            if bindparam.value is None and bindparam.callable is None:
                # no value present: render SQL NULL, warning when the
                # comparison operator isn't IS / IS NOT
                op = kw.get("_binary_op", None)
                if op and op not in (operators.is_, operators.is_not):
                    util.warn_limited(
                        "Bound parameter '%s' rendering literal NULL in a SQL "
                        "expression; comparisons to NULL should not use "
                        "operators outside of 'is' or 'is not'",
                        (bindparam.key,),
                    )
                return self.process(sqltypes.NULLTYPE, **kw)
            value = bindparam.effective_value

        if bindparam.expanding:
            # expanding (IN) parameter: delegate to produce the full
            # comma-separated literal list
            leep = self._literal_execute_expanding_parameter_literal_binds
            to_update, replacement_expr = leep(
                bindparam,
                value,
                bind_expression_template=bind_expression_template,
            )
            return replacement_expr
        else:
            return self.render_literal_value(value, bindparam.type)
4007
4008 def render_literal_value(
4009 self, value: Any, type_: sqltypes.TypeEngine[Any]
4010 ) -> str:
4011 """Render the value of a bind parameter as a quoted literal.
4012
4013 This is used for statement sections that do not accept bind parameters
4014 on the target driver/database.
4015
4016 This should be implemented by subclasses using the quoting services
4017 of the DBAPI.
4018
4019 """
4020
4021 if value is None and not type_.should_evaluate_none:
4022 # issue #10535 - handle NULL in the compiler without placing
4023 # this onto each type, except for "evaluate None" types
4024 # (e.g. JSON)
4025 return self.process(elements.Null._instance())
4026
4027 processor = type_._cached_literal_processor(self.dialect)
4028 if processor:
4029 try:
4030 return processor(value)
4031 except Exception as e:
4032 raise exc.CompileError(
4033 f"Could not render literal value "
4034 f'"{sql_util._repr_single_value(value)}" '
4035 f"with datatype "
4036 f"{type_}; see parent stack trace for "
4037 "more detail."
4038 ) from e
4039
4040 else:
4041 raise exc.CompileError(
4042 f"No literal value renderer is available for literal value "
4043 f'"{sql_util._repr_single_value(value)}" '
4044 f"with datatype {type_}"
4045 )
4046
4047 def _truncate_bindparam(self, bindparam):
4048 if bindparam in self.bind_names:
4049 return self.bind_names[bindparam]
4050
4051 bind_name = bindparam.key
4052 if isinstance(bind_name, elements._truncated_label):
4053 bind_name = self._truncated_identifier("bindparam", bind_name)
4054
4055 # add to bind_names for translation
4056 self.bind_names[bindparam] = bind_name
4057
4058 return bind_name
4059
4060 def _truncated_identifier(
4061 self, ident_class: str, name: _truncated_label
4062 ) -> str:
4063 if (ident_class, name) in self.truncated_names:
4064 return self.truncated_names[(ident_class, name)]
4065
4066 anonname = name.apply_map(self.anon_map)
4067
4068 if len(anonname) > self.label_length - 6:
4069 counter = self._truncated_counters.get(ident_class, 1)
4070 truncname = (
4071 anonname[0 : max(self.label_length - 6, 0)]
4072 + "_"
4073 + hex(counter)[2:]
4074 )
4075 self._truncated_counters[ident_class] = counter + 1
4076 else:
4077 truncname = anonname
4078 self.truncated_names[(ident_class, name)] = truncname
4079 return truncname
4080
    def _anonymize(self, name: str) -> str:
        # interpolate anon_map into a "%(...)s"-style anonymized name
        return name % self.anon_map
4083
    def bindparam_string(
        self,
        name: str,
        post_compile: bool = False,
        expanding: bool = False,
        escaped_from: Optional[str] = None,
        bindparam_type: Optional[TypeEngine[Any]] = None,
        accumulate_bind_names: Optional[Set[str]] = None,
        visited_bindparam: Optional[List[str]] = None,
        **kw: Any,
    ) -> str:
        """Render the string form of a bound parameter: either a
        dialect-paramstyle token or a ``__[POSTCOMPILE_...]``
        placeholder, applying name escaping and bind casts as needed.
        """
        # TODO: accumulate_bind_names is passed by crud.py to gather
        # names on a per-value basis, visited_bindparam is passed by
        # visit_insert() to collect all parameters in the statement.
        # see if this gathering can be simplified somehow
        if accumulate_bind_names is not None:
            accumulate_bind_names.add(name)
        if visited_bindparam is not None:
            visited_bindparam.append(name)

        if not escaped_from:
            if self._bind_translate_re.search(name):
                # not quite the translate use case as we want to
                # also get a quick boolean if we even found
                # unusual characters in the name
                new_name = self._bind_translate_re.sub(
                    lambda m: self._bind_translate_chars[m.group(0)],
                    name,
                )
                escaped_from = name
                name = new_name

        if escaped_from:
            # record original-name -> escaped-name so execution can
            # translate incoming parameter dictionaries
            self.escaped_bind_names = self.escaped_bind_names.union(
                {escaped_from: name}
            )
        if post_compile:
            ret = "__[POSTCOMPILE_%s]" % name
            if expanding:
                # for expanding, bound parameters or literal values will be
                # rendered per item
                return ret

            # otherwise, for non-expanding "literal execute", apply
            # bind casts as determined by the datatype
            if bindparam_type is not None:
                type_impl = bindparam_type._unwrapped_dialect_impl(
                    self.dialect
                )
                if type_impl.render_literal_cast:
                    ret = self.render_bind_cast(bindparam_type, type_impl, ret)
            return ret
        elif self.state is CompilerState.COMPILING:
            # interim template used during the positional-translation pass
            ret = self.compilation_bindtemplate % {"name": name}
        else:
            ret = self.bindtemplate % {"name": name}

        if (
            bindparam_type is not None
            and self.dialect._bind_typing_render_casts
        ):
            type_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
            if type_impl.render_bind_cast:
                ret = self.render_bind_cast(bindparam_type, type_impl, ret)

        return ret
4150
4151 def _dispatch_independent_ctes(self, stmt, kw):
4152 local_kw = kw.copy()
4153 local_kw.pop("cte_opts", None)
4154 for cte, opt in zip(
4155 stmt._independent_ctes, stmt._independent_ctes_opts
4156 ):
4157 cte._compiler_dispatch(self, cte_opts=opt, **local_kw)
4158
    def visit_cte(
        self,
        cte: CTE,
        asfrom: bool = False,
        ashint: bool = False,
        fromhints: Optional[_FromHintsType] = None,
        visiting_cte: Optional[CTE] = None,
        from_linter: Optional[FromLinter] = None,
        cte_opts: selectable._CTEOpts = selectable._CTEOpts(False),
        **kwargs: Any,
    ) -> Optional[str]:
        """Compile a CTE, registering its WITH-clause text in
        ``self.ctes`` and returning the name/alias text to render at the
        point of reference, or ``None`` when nothing should render inline.

        Handles deduplication of same-named CTEs across nesting levels
        (keyed by ``(level, name)``), relocation of a CTE to a
        "nest_here" position, and pre-aliased CTEs
        (``cte._cte_alias``).
        """
        self_ctes = self._init_cte_state()
        assert self_ctes is self.ctes

        kwargs["visiting_cte"] = cte

        cte_name = cte.name

        if isinstance(cte_name, elements._truncated_label):
            cte_name = self._truncated_identifier("alias", cte_name)

        is_new_cte = True
        embedded_in_current_named_cte = False

        _reference_cte = cte._get_reference_cte()

        nesting = cte.nesting or cte_opts.nesting

        # check for CTE already encountered
        if _reference_cte in self.level_name_by_cte:
            cte_level, _, existing_cte_opts = self.level_name_by_cte[
                _reference_cte
            ]
            assert _ == cte_name

            cte_level_name = (cte_level, cte_name)
            existing_cte = self.ctes_by_level_name[cte_level_name]

            # check if we are receiving it here with a specific
            # "nest_here" location; if so, move it to this location

            if cte_opts.nesting:
                if existing_cte_opts.nesting:
                    raise exc.CompileError(
                        "CTE is stated as 'nest_here' in "
                        "more than one location"
                    )

                old_level_name = (cte_level, cte_name)
                cte_level = len(self.stack) if nesting else 1
                cte_level_name = new_level_name = (cte_level, cte_name)

                del self.ctes_by_level_name[old_level_name]
                self.ctes_by_level_name[new_level_name] = existing_cte
                self.level_name_by_cte[_reference_cte] = new_level_name + (
                    cte_opts,
                )

        else:
            # not seen before; nested CTEs are keyed to the current stack
            # depth, non-nested ones to the toplevel (level 1)
            cte_level = len(self.stack) if nesting else 1
            cte_level_name = (cte_level, cte_name)

            if cte_level_name in self.ctes_by_level_name:
                existing_cte = self.ctes_by_level_name[cte_level_name]
            else:
                existing_cte = None

        if existing_cte is not None:
            embedded_in_current_named_cte = visiting_cte is existing_cte

            # we've generated a same-named CTE that we are enclosed in,
            # or this is the same CTE. just return the name.
            if cte is existing_cte._restates or cte is existing_cte:
                is_new_cte = False
            elif existing_cte is cte._restates:
                # we've generated a same-named CTE that is
                # enclosed in us - we take precedence, so
                # discard the text for the "inner".
                del self_ctes[existing_cte]

                existing_cte_reference_cte = existing_cte._get_reference_cte()

                assert existing_cte_reference_cte is _reference_cte
                assert existing_cte_reference_cte is existing_cte

                del self.level_name_by_cte[existing_cte_reference_cte]
            else:
                if (
                    # if the two CTEs have the same hash, which we expect
                    # here means that one/both is an annotated of the other
                    (hash(cte) == hash(existing_cte))
                    # or...
                    or (
                        (
                            # if they are clones, i.e. they came from the ORM
                            # or some other visit method
                            cte._is_clone_of is not None
                            or existing_cte._is_clone_of is not None
                        )
                        # and are deep-copy identical
                        and cte.compare(existing_cte)
                    )
                ):
                    # then consider these two CTEs the same
                    is_new_cte = False
                else:
                    # otherwise these are two CTEs that either will render
                    # differently, or were indicated separately by the user,
                    # with the same name
                    raise exc.CompileError(
                        "Multiple, unrelated CTEs found with "
                        "the same name: %r" % cte_name
                    )

        if not asfrom and not is_new_cte:
            return None

        if cte._cte_alias is not None:
            pre_alias_cte = cte._cte_alias
            cte_pre_alias_name = cte._cte_alias.name
            if isinstance(cte_pre_alias_name, elements._truncated_label):
                cte_pre_alias_name = self._truncated_identifier(
                    "alias", cte_pre_alias_name
                )
        else:
            pre_alias_cte = cte
            cte_pre_alias_name = None

        if is_new_cte:
            self.ctes_by_level_name[cte_level_name] = cte
            self.level_name_by_cte[_reference_cte] = cte_level_name + (
                cte_opts,
            )

            if pre_alias_cte not in self.ctes:
                self.visit_cte(pre_alias_cte, **kwargs)

            if not cte_pre_alias_name and cte not in self_ctes:
                if cte.recursive:
                    self.ctes_recursive = True
                text = self.preparer.format_alias(cte, cte_name)
                if cte.recursive or cte.element.name_cte_columns:
                    # render the explicit column list, e.g.
                    # "name(col1, col2)", required for RECURSIVE
                    col_source = cte.element

                    # TODO: can we get at the .columns_plus_names collection
                    # that is already (or will be?) generated for the SELECT
                    # rather than calling twice?
                    recur_cols = [
                        # TODO: proxy_name is not technically safe,
                        # see test_cte->
                        # test_with_recursive_no_name_currently_buggy. not
                        # clear what should be done with such a case
                        fallback_label_name or proxy_name
                        for (
                            _,
                            proxy_name,
                            fallback_label_name,
                            c,
                            repeated,
                        ) in (col_source._generate_columns_plus_names(True))
                        if not repeated
                    ]

                    text += "(%s)" % (
                        ", ".join(
                            self.preparer.format_label_name(
                                ident, anon_map=self.anon_map
                            )
                            for ident in recur_cols
                        )
                    )

                assert kwargs.get("subquery", False) is False

                if not self.stack:
                    # toplevel, this is a stringify of the
                    # cte directly. just compile the inner
                    # the way alias() does.
                    return cte.element._compiler_dispatch(
                        self, asfrom=asfrom, **kwargs
                    )
                else:
                    prefixes = self._generate_prefixes(
                        cte, cte._prefixes, **kwargs
                    )
                    inner = cte.element._compiler_dispatch(
                        self, asfrom=True, **kwargs
                    )

                    text += " AS %s\n(%s)" % (prefixes, inner)

                    if cte._suffixes:
                        text += " " + self._generate_prefixes(
                            cte, cte._suffixes, **kwargs
                        )

                    # store the WITH-clause text; rendered later by
                    # _render_cte_clause()
                    self_ctes[cte] = text

        if asfrom:
            if from_linter:
                from_linter.froms[cte._de_clone()] = cte_name

            if not is_new_cte and embedded_in_current_named_cte:
                return self.preparer.format_alias(cte, cte_name)

            if cte_pre_alias_name:
                text = self.preparer.format_alias(cte, cte_pre_alias_name)
                if self.preparer._requires_quotes(cte_name):
                    cte_name = self.preparer.quote(cte_name)
                text += self.get_render_as_alias_suffix(cte_name)
                return text  # type: ignore[no-any-return]
            else:
                return self.preparer.format_alias(cte, cte_name)

        return None
4374
4375 def visit_table_valued_alias(self, element, **kw):
4376 if element.joins_implicitly:
4377 kw["from_linter"] = None
4378 if element._is_lateral:
4379 return self.visit_lateral(element, **kw)
4380 else:
4381 return self.visit_alias(element, **kw)
4382
    def visit_table_valued_column(self, element, **kw):
        # a table-valued column renders the same way as an ordinary column
        return self.visit_column(element, **kw)
4385
    def visit_alias(
        self,
        alias,
        asfrom=False,
        ashint=False,
        iscrud=False,
        fromhints=None,
        subquery=False,
        lateral=False,
        enclosing_alias=None,
        from_linter=None,
        **kwargs,
    ):
        """Compile an Alias construct.

        When rendered as a FROM element (``asfrom``), produces
        "<inner> AS <name>" with optional parenthesization, derived
        column lists and from-hints; when rendered as a hint target
        (``ashint``), only the alias name; otherwise the inner element
        is compiled directly.  Also maintains lateral-specific linter
        state and detects self-enclosing aliases.
        """
        if lateral:
            if "enclosing_lateral" not in kwargs:
                # if lateral is set and enclosing_lateral is not
                # present, we assume we are being called directly
                # from visit_lateral() and we need to set enclosing_lateral.
                assert alias._is_lateral
                kwargs["enclosing_lateral"] = alias

            # for lateral objects, we track a second from_linter that is...
            # lateral! to the level above us.
            if (
                from_linter
                and "lateral_from_linter" not in kwargs
                and "enclosing_lateral" in kwargs
            ):
                kwargs["lateral_from_linter"] = from_linter

        if enclosing_alias is not None and enclosing_alias.element is alias:
            # the enclosing alias directly wraps this one; render the
            # inner element without re-applying the alias name
            inner = alias.element._compiler_dispatch(
                self,
                asfrom=asfrom,
                ashint=ashint,
                iscrud=iscrud,
                fromhints=fromhints,
                lateral=lateral,
                enclosing_alias=alias,
                **kwargs,
            )
            if subquery and (asfrom or lateral):
                inner = "(%s)" % (inner,)
            return inner
        else:
            kwargs["enclosing_alias"] = alias

        if asfrom or ashint:
            if isinstance(alias.name, elements._truncated_label):
                alias_name = self._truncated_identifier("alias", alias.name)
            else:
                alias_name = alias.name

            if ashint:
                return self.preparer.format_alias(alias, alias_name)
            elif asfrom:
                if from_linter:
                    from_linter.froms[alias._de_clone()] = alias_name

                inner = alias.element._compiler_dispatch(
                    self, asfrom=True, lateral=lateral, **kwargs
                )
                if subquery:
                    inner = "(%s)" % (inner,)

                ret = inner + self.get_render_as_alias_suffix(
                    self.preparer.format_alias(alias, alias_name)
                )

                if alias._supports_derived_columns and alias._render_derived:
                    # render the derived column list, e.g.
                    # "AS name(col1 type, col2 type)"
                    ret += "(%s)" % (
                        ", ".join(
                            "%s%s"
                            % (
                                self.preparer.quote(col.name),
                                (
                                    " %s"
                                    % self.dialect.type_compiler_instance.process(
                                        col.type, **kwargs
                                    )
                                    if alias._render_derived_w_types
                                    else ""
                                ),
                            )
                            for col in alias.c
                        )
                    )

                if fromhints and alias in fromhints:
                    ret = self.format_from_hint_text(
                        ret, alias, fromhints[alias], iscrud
                    )

                return ret
        else:
            # note we cancel the "subquery" flag here as well
            return alias.element._compiler_dispatch(
                self, lateral=lateral, **kwargs
            )
4485
4486 def visit_subquery(self, subquery, **kw):
4487 kw["subquery"] = True
4488 return self.visit_alias(subquery, **kw)
4489
4490 def visit_lateral(self, lateral_, **kw):
4491 kw["lateral"] = True
4492 return "LATERAL %s" % self.visit_alias(lateral_, **kw)
4493
4494 def visit_tablesample(self, tablesample, asfrom=False, **kw):
4495 text = "%s TABLESAMPLE %s" % (
4496 self.visit_alias(tablesample, asfrom=True, **kw),
4497 tablesample._get_method()._compiler_dispatch(self, **kw),
4498 )
4499
4500 if tablesample.seed is not None:
4501 text += " REPEATABLE (%s)" % (
4502 tablesample.seed._compiler_dispatch(self, **kw)
4503 )
4504
4505 return text
4506
4507 def _render_values(self, element, **kw):
4508 kw.setdefault("literal_binds", element.literal_binds)
4509 tuples = ", ".join(
4510 self.process(
4511 elements.Tuple(
4512 types=element._column_types, *elem
4513 ).self_group(),
4514 **kw,
4515 )
4516 for chunk in element._data
4517 for elem in chunk
4518 )
4519 return f"VALUES {tuples}"
4520
    def visit_values(
        self, element, asfrom=False, from_linter=None, visiting_cte=None, **kw
    ):
        """Compile a VALUES construct, optionally named, aliased with an
        explicit column list, and/or prefixed with LATERAL when used as a
        FROM element.

        When this VALUES is the element of the CTE currently being
        visited, no alias is applied (the CTE itself supplies the name);
        LATERAL is rejected in that case.
        """

        if element._independent_ctes:
            self._dispatch_independent_ctes(element, kw)

        v = self._render_values(element, **kw)

        if element._unnamed:
            name = None
        elif isinstance(element.name, elements._truncated_label):
            name = self._truncated_identifier("values", element.name)
        else:
            name = element.name

        if element._is_lateral:
            lateral = "LATERAL "
        else:
            lateral = ""

        if asfrom:
            if from_linter:
                from_linter.froms[element._de_clone()] = (
                    name if name is not None else "(unnamed VALUES element)"
                )

            if visiting_cte is not None and visiting_cte.element is element:
                # this VALUES is the body of the enclosing CTE; leave it
                # un-aliased
                if element._is_lateral:
                    raise exc.CompileError(
                        "Can't use a LATERAL VALUES expression inside of a CTE"
                    )
            elif name:
                # named VALUES; render alias plus explicit column list
                kw["include_table"] = False
                v = "%s(%s)%s (%s)" % (
                    lateral,
                    v,
                    self.get_render_as_alias_suffix(self.preparer.quote(name)),
                    (
                        ", ".join(
                            c._compiler_dispatch(self, **kw)
                            for c in element.columns
                        )
                    ),
                )
            else:
                v = "%s(%s)" % (lateral, v)
        return v
4569
4570 def visit_scalar_values(self, element, **kw):
4571 return f"({self._render_values(element, **kw)})"
4572
4573 def get_render_as_alias_suffix(self, alias_name_text):
4574 return " AS " + alias_name_text
4575
4576 def _add_to_result_map(
4577 self,
4578 keyname: str,
4579 name: str,
4580 objects: Tuple[Any, ...],
4581 type_: TypeEngine[Any],
4582 ) -> None:
4583
4584 # note objects must be non-empty for cursor.py to handle the
4585 # collection properly
4586 assert objects
4587
4588 if keyname is None or keyname == "*":
4589 self._ordered_columns = False
4590 self._ad_hoc_textual = True
4591 if type_._is_tuple_type:
4592 raise exc.CompileError(
4593 "Most backends don't support SELECTing "
4594 "from a tuple() object. If this is an ORM query, "
4595 "consider using the Bundle object."
4596 )
4597 self._result_columns.append(
4598 ResultColumnsEntry(keyname, name, objects, type_)
4599 )
4600
4601 def _label_returning_column(
4602 self, stmt, column, populate_result_map, column_clause_args=None, **kw
4603 ):
4604 """Render a column with necessary labels inside of a RETURNING clause.
4605
4606 This method is provided for individual dialects in place of calling
4607 the _label_select_column method directly, so that the two use cases
4608 of RETURNING vs. SELECT can be disambiguated going forward.
4609
4610 .. versionadded:: 1.4.21
4611
4612 """
4613 return self._label_select_column(
4614 None,
4615 column,
4616 populate_result_map,
4617 False,
4618 {} if column_clause_args is None else column_clause_args,
4619 **kw,
4620 )
4621
    def _label_select_column(
        self,
        select,
        column,
        populate_result_map,
        asfrom,
        column_clause_args,
        name=None,
        proxy_name=None,
        fallback_label_name=None,
        within_columns_clause=True,
        column_is_repeated=False,
        need_column_expressions=False,
        include_table=True,
    ):
        """produce labeled columns present in a select().

        Decides whether *column* renders with an "AS <label>" and with
        which label text, applies any type-level column_expression()
        wrapping, and wires result-map population for the compiled
        column.  Used both for SELECT columns clauses and (via
        _label_returning_column) for RETURNING.
        """
        impl = column.type.dialect_impl(self.dialect)

        if impl._has_column_expression and (
            need_column_expressions or populate_result_map
        ):
            # the type rewrites the fetched expression, e.g. for
            # type-level SQL-side conversions
            col_expr = impl.column_expression(column)
        else:
            col_expr = column

        if populate_result_map:
            # pass an "add_to_result_map" callable into the compilation
            # of embedded columns.  this collects information about the
            # column as it will be fetched in the result and is coordinated
            # with cursor.description when the query is executed.
            add_to_result_map = self._add_to_result_map

            # if the SELECT statement told us this column is a repeat,
            # wrap the callable with one that prevents the addition of the
            # targets
            if column_is_repeated:
                _add_to_result_map = add_to_result_map

                def add_to_result_map(keyname, name, objects, type_):
                    _add_to_result_map(keyname, name, (keyname,), type_)

            # if we redefined col_expr for type expressions, wrap the
            # callable with one that adds the original column to the targets
            elif col_expr is not column:
                _add_to_result_map = add_to_result_map

                def add_to_result_map(keyname, name, objects, type_):
                    _add_to_result_map(
                        keyname, name, (column,) + objects, type_
                    )

        else:
            add_to_result_map = None

        # this method is used by some of the dialects for RETURNING,
        # which has different inputs.  _label_returning_column was added
        # as the better target for this now however for 1.4 we will keep
        # _label_select_column directly compatible with this use case.
        # these assertions right now set up the current expected inputs
        assert within_columns_clause, (
            "_label_select_column is only relevant within "
            "the columns clause of a SELECT or RETURNING"
        )
        if isinstance(column, elements.Label):
            if col_expr is not column:
                result_expr = _CompileLabel(
                    col_expr, column.name, alt_names=(column.element,)
                )
            else:
                result_expr = col_expr

        elif name:
            # here, _columns_plus_names has determined there's an explicit
            # label name we need to use.  this is the default for
            # tablenames_plus_columnnames as well as when columns are being
            # deduplicated on name

            assert (
                proxy_name is not None
            ), "proxy_name is required if 'name' is passed"

            result_expr = _CompileLabel(
                col_expr,
                name,
                alt_names=(
                    proxy_name,
                    # this is a hack to allow legacy result column lookups
                    # to work as they did before; this goes away in 2.0.
                    # TODO: this only seems to be tested indirectly
                    # via test/orm/test_deprecations.py.  should be a
                    # resultset test for this
                    column._tq_label,
                ),
            )
        else:
            # determine here whether this column should be rendered in
            # a labelled context or not, as we were given no required label
            # name from the caller.  Here we apply heuristics based on the
            # kind of SQL expression involved.

            if col_expr is not column:
                # type-specific expression wrapping the given column,
                # so we render a label
                render_with_label = True
            elif isinstance(column, elements.ColumnClause):
                # table-bound column, we render its name as a label if we are
                # inside of a subquery only
                render_with_label = (
                    asfrom
                    and not column.is_literal
                    and column.table is not None
                )
            elif isinstance(column, elements.TextClause):
                render_with_label = False
            elif isinstance(column, elements.UnaryExpression):
                # unary expression.  notes added as of #12681
                #
                # By convention, the visit_unary() method
                # itself does not add an entry to the result map, and relies
                # upon either the inner expression creating a result map
                # entry, or if not, by creating a label here that produces
                # the result map entry.  Where that happens is based on
                # whether or not the element immediately inside the unary is
                # a NamedColumn subclass or not.
                #
                # Now, this also impacts how the SELECT is written; if
                # we decide to generate a label here, we get the usual
                # "~(x+y) AS anon_1" thing in the columns clause.  If we
                # don't, we don't get an AS at all, we get like
                # "~table.column".
                #
                # But here is the important thing as of modernish (like 1.4)
                # versions of SQLAlchemy - **whether or not the AS <label>
                # is present in the statement is not actually important**.
                # We target result columns **positionally** for a fully
                # compiled ``Select()`` object; before 1.4 we needed those
                # labels to match in cursor.description etc etc but now it
                # really doesn't matter.
                # So really, we could set render_with_label True in all
                # cases.  Or we could just have visit_unary() populate the
                # result map in all cases.
                #
                # What we're doing here is strictly trying to not rock the
                # boat too much with when we do/don't render "AS label";
                # labels being present helps in the edge cases that we
                # "fall back" to named cursor.description matching, labels
                # not being present for columns keeps us from having awkward
                # phrases like "SELECT DISTINCT table.x AS x".
                render_with_label = (
                    (
                        # exception case to detect if we render "not boolean"
                        # as "not <col>" for native boolean or "<col> = 1"
                        # for non-native boolean.  this is controlled by
                        # visit_is_<true|false>_unary_operator
                        column.operator
                        in (operators.is_false, operators.is_true)
                        and not self.dialect.supports_native_boolean
                    )
                    or column._wraps_unnamed_column()
                    or asfrom
                )
            elif (
                # general class of expressions that don't have a SQL-column
                # addressible name.  includes scalar selects, bind parameters,
                # SQL functions, others
                not isinstance(column, elements.NamedColumn)
                # deeper check that indicates there's no natural "name" to
                # this element, which accommodates for custom SQL constructs
                # that might have a ".name" attribute (but aren't SQL
                # functions) but are not implementing this more recently added
                # base class.  in theory the "NamedColumn" check should be
                # enough, however here we seek to maintain legacy behaviors
                # as well.
                and column._non_anon_label is None
            ):
                render_with_label = True
            else:
                render_with_label = False

            if render_with_label:
                if not fallback_label_name:
                    # used by the RETURNING case right now.  we generate it
                    # here as 3rd party dialects may be referring to
                    # _label_select_column method directly instead of the
                    # just-added _label_returning_column method
                    assert not column_is_repeated
                    fallback_label_name = column._anon_name_label

                fallback_label_name = (
                    elements._truncated_label(fallback_label_name)
                    if not isinstance(
                        fallback_label_name, elements._truncated_label
                    )
                    else fallback_label_name
                )

                result_expr = _CompileLabel(
                    col_expr, fallback_label_name, alt_names=(proxy_name,)
                )
            else:
                result_expr = col_expr

        column_clause_args.update(
            within_columns_clause=within_columns_clause,
            add_to_result_map=add_to_result_map,
            include_table=include_table,
        )
        return result_expr._compiler_dispatch(self, **column_clause_args)
4830
4831 def format_from_hint_text(self, sqltext, table, hint, iscrud):
4832 hinttext = self.get_from_hint_text(table, hint)
4833 if hinttext:
4834 sqltext += " " + hinttext
4835 return sqltext
4836
    def get_select_hint_text(self, byfroms):
        # dialect hook; return statement-level hint text for a SELECT,
        # or None for no hint (default)
        return None
4839
    def get_from_hint_text(
        self, table: FromClause, text: Optional[str]
    ) -> Optional[str]:
        # dialect hook; return hint text to render after a FROM element,
        # or None for no hint (default)
        return None
4844
    def get_crud_hint_text(self, table, text):
        # dialect hook; return hint text for INSERT/UPDATE/DELETE,
        # or None for no hint (default)
        return None
4847
    def get_statement_hint_text(self, hint_texts):
        # dialect hook; combine per-statement hints into a single string
        return " ".join(hint_texts)
4850
    # immutable sentinel entry used in place of self.stack[-1] when the
    # compiler stack is empty (i.e. a toplevel statement); supplies empty
    # correlation sets
    _default_stack_entry: _CompilerStackEntry

    if not typing.TYPE_CHECKING:
        _default_stack_entry = util.immutabledict(
            [("correlate_froms", frozenset()), ("asfrom_froms", frozenset())]
        )
4857
4858 def _display_froms_for_select(
4859 self, select_stmt, asfrom, lateral=False, **kw
4860 ):
4861 # utility method to help external dialects
4862 # get the correct from list for a select.
4863 # specifically the oracle dialect needs this feature
4864 # right now.
4865 toplevel = not self.stack
4866 entry = self._default_stack_entry if toplevel else self.stack[-1]
4867
4868 compile_state = select_stmt._compile_state_factory(select_stmt, self)
4869
4870 correlate_froms = entry["correlate_froms"]
4871 asfrom_froms = entry["asfrom_froms"]
4872
4873 if asfrom and not lateral:
4874 froms = compile_state._get_display_froms(
4875 explicit_correlate_froms=correlate_froms.difference(
4876 asfrom_froms
4877 ),
4878 implicit_correlate_froms=(),
4879 )
4880 else:
4881 froms = compile_state._get_display_froms(
4882 explicit_correlate_froms=correlate_froms,
4883 implicit_correlate_froms=asfrom_froms,
4884 )
4885 return froms
4886
    translate_select_structure: Any = None
    """if not ``None``, should be a callable which accepts ``(select_stmt,
    **kw)`` and returns a select object.   this is used for structural changes
    mostly to accommodate for LIMIT/OFFSET schemes, e.g. as used by the
    Oracle and SQL Server dialects (see visit_select()).

    """
4893
    def visit_select(
        self,
        select_stmt,
        asfrom=False,
        insert_into=False,
        fromhints=None,
        compound_index=None,
        select_wraps_for=None,
        lateral=False,
        from_linter=None,
        **kwargs,
    ):
        """Compile a SELECT statement to its SQL string.

        Orchestrates compile-state creation, dialect-level structural
        translation (LIMIT/OFFSET schemes), hints, prefixes/suffixes,
        the columns clause (with result-map population), the statement
        body, and the WITH clause for collected CTEs.
        """
        assert select_wraps_for is None, (
            "SQLAlchemy 1.4 requires use of "
            "the translate_select_structure hook for structural "
            "translations of SELECT objects"
        )
        if self._collect_params:
            self._add_to_params(select_stmt)

        # initial setup of SELECT.  the compile_state_factory may now
        # be creating a totally different SELECT from the one that was
        # passed in.  for ORM use this will convert from an ORM-state
        # SELECT to a regular "Core" SELECT.  other composed operations
        # such as computation of joins will be performed.

        kwargs["within_columns_clause"] = False

        compile_state = select_stmt._compile_state_factory(
            select_stmt, self, **kwargs
        )
        kwargs["ambiguous_table_name_map"] = (
            compile_state._ambiguous_table_name_map
        )

        select_stmt = compile_state.statement

        toplevel = not self.stack

        if toplevel and not self.compile_state:
            self.compile_state = compile_state

        is_embedded_select = compound_index is not None or insert_into

        # translate step for Oracle, SQL Server which often need to
        # restructure the SELECT to allow for LIMIT/OFFSET and possibly
        # other conditions
        if self.translate_select_structure:
            new_select_stmt = self.translate_select_structure(
                select_stmt, asfrom=asfrom, **kwargs
            )

            # if SELECT was restructured, maintain a link to the originals
            # and assemble a new compile state
            if new_select_stmt is not select_stmt:
                compile_state_wraps_for = compile_state
                select_wraps_for = select_stmt
                select_stmt = new_select_stmt

                compile_state = select_stmt._compile_state_factory(
                    select_stmt, self, **kwargs
                )
                select_stmt = compile_state.statement

        entry = self._default_stack_entry if toplevel else self.stack[-1]

        populate_result_map = need_column_expressions = (
            toplevel
            or entry.get("need_result_map_for_compound", False)
            or entry.get("need_result_map_for_nested", False)
        )

        # indicates there is a CompoundSelect in play and we are not the
        # first select
        if compound_index:
            populate_result_map = False

        # this was first proposed as part of #3372; however, it is not
        # reached in current tests and could possibly be an assertion
        # instead.
        if not populate_result_map and "add_to_result_map" in kwargs:
            del kwargs["add_to_result_map"]

        froms = self._setup_select_stack(
            select_stmt, compile_state, entry, asfrom, lateral, compound_index
        )

        column_clause_args = kwargs.copy()
        column_clause_args.update(
            {"within_label_clause": False, "within_columns_clause": False}
        )

        text = "SELECT "  # we're off to a good start !

        if select_stmt._post_select_clause is not None:
            psc = self.process(select_stmt._post_select_clause, **kwargs)
            if psc is not None:
                text += psc + " "

        if select_stmt._hints:
            hint_text, byfrom = self._setup_select_hints(select_stmt)
            if hint_text:
                text += hint_text + " "
        else:
            byfrom = None

        if select_stmt._independent_ctes:
            self._dispatch_independent_ctes(select_stmt, kwargs)

        if select_stmt._prefixes:
            text += self._generate_prefixes(
                select_stmt, select_stmt._prefixes, **kwargs
            )

        text += self.get_select_precolumns(select_stmt, **kwargs)

        if select_stmt._pre_columns_clause is not None:
            pcc = self.process(select_stmt._pre_columns_clause, **kwargs)
            if pcc is not None:
                text += pcc + " "

        # the actual list of columns to print in the SELECT column list.
        inner_columns = [
            c
            for c in [
                self._label_select_column(
                    select_stmt,
                    column,
                    populate_result_map,
                    asfrom,
                    column_clause_args,
                    name=name,
                    proxy_name=proxy_name,
                    fallback_label_name=fallback_label_name,
                    column_is_repeated=repeated,
                    need_column_expressions=need_column_expressions,
                )
                for (
                    name,
                    proxy_name,
                    fallback_label_name,
                    column,
                    repeated,
                ) in compile_state.columns_plus_names
            ]
            if c is not None
        ]

        if populate_result_map and select_wraps_for is not None:
            # if this select was generated from translate_select,
            # rewrite the targeted columns in the result map

            translate = dict(
                zip(
                    [
                        name
                        for (
                            key,
                            proxy_name,
                            fallback_label_name,
                            name,
                            repeated,
                        ) in compile_state.columns_plus_names
                    ],
                    [
                        name
                        for (
                            key,
                            proxy_name,
                            fallback_label_name,
                            name,
                            repeated,
                        ) in compile_state_wraps_for.columns_plus_names
                    ],
                )
            )

            self._result_columns = [
                ResultColumnsEntry(
                    key, name, tuple(translate.get(o, o) for o in obj), type_
                )
                for key, name, obj, type_ in self._result_columns
            ]

        text = self._compose_select_body(
            text,
            select_stmt,
            compile_state,
            inner_columns,
            froms,
            byfrom,
            toplevel,
            kwargs,
        )

        if select_stmt._post_body_clause is not None:
            pbc = self.process(select_stmt._post_body_clause, **kwargs)
            if pbc:
                text += " " + pbc

        if select_stmt._statement_hints:
            per_dialect = [
                ht
                for (dialect_name, ht) in select_stmt._statement_hints
                if dialect_name in ("*", self.dialect.name)
            ]
            if per_dialect:
                text += " " + self.get_statement_hint_text(per_dialect)

        # In compound query, CTEs are shared at the compound level
        if self.ctes and (not is_embedded_select or toplevel):
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if select_stmt._suffixes:
            text += " " + self._generate_prefixes(
                select_stmt, select_stmt._suffixes, **kwargs
            )

        self.stack.pop(-1)

        return text
5116
5117 def _setup_select_hints(
5118 self, select: Select[Unpack[TupleAny]]
5119 ) -> Tuple[str, _FromHintsType]:
5120 byfrom = {
5121 from_: hinttext
5122 % {"name": from_._compiler_dispatch(self, ashint=True)}
5123 for (from_, dialect), hinttext in select._hints.items()
5124 if dialect in ("*", self.dialect.name)
5125 }
5126 hint_text = self.get_select_hint_text(byfrom)
5127 return hint_text, byfrom
5128
5129 def _setup_select_stack(
5130 self, select, compile_state, entry, asfrom, lateral, compound_index
5131 ):
5132 correlate_froms = entry["correlate_froms"]
5133 asfrom_froms = entry["asfrom_froms"]
5134
5135 if compound_index == 0:
5136 entry["select_0"] = select
5137 elif compound_index:
5138 select_0 = entry["select_0"]
5139 numcols = len(select_0._all_selected_columns)
5140
5141 if len(compile_state.columns_plus_names) != numcols:
5142 raise exc.CompileError(
5143 "All selectables passed to "
5144 "CompoundSelect must have identical numbers of "
5145 "columns; select #%d has %d columns, select "
5146 "#%d has %d"
5147 % (
5148 1,
5149 numcols,
5150 compound_index + 1,
5151 len(select._all_selected_columns),
5152 )
5153 )
5154
5155 if asfrom and not lateral:
5156 froms = compile_state._get_display_froms(
5157 explicit_correlate_froms=correlate_froms.difference(
5158 asfrom_froms
5159 ),
5160 implicit_correlate_froms=(),
5161 )
5162 else:
5163 froms = compile_state._get_display_froms(
5164 explicit_correlate_froms=correlate_froms,
5165 implicit_correlate_froms=asfrom_froms,
5166 )
5167
5168 new_correlate_froms = set(_from_objects(*froms))
5169 all_correlate_froms = new_correlate_froms.union(correlate_froms)
5170
5171 new_entry: _CompilerStackEntry = {
5172 "asfrom_froms": new_correlate_froms,
5173 "correlate_froms": all_correlate_froms,
5174 "selectable": select,
5175 "compile_state": compile_state,
5176 }
5177 self.stack.append(new_entry)
5178
5179 return froms
5180
    def _compose_select_body(
        self,
        text,
        select,
        compile_state,
        inner_columns,
        froms,
        byfrom,
        toplevel,
        kwargs,
    ):
        """Assemble the body of a SELECT statement onto ``text``, which
        already contains the "SELECT" keyword plus any prefixes/hints.

        Renders, in order: the column list, the FROM list (with
        per-FROM dialect hints when present), WHERE, GROUP BY, HAVING,
        post-criteria clause, ORDER BY, row-limiting clauses, and
        FOR UPDATE; returns the completed string.
        """
        text += ", ".join(inner_columns)

        # set up a FromLinter to collect cartesian-product candidates if
        # linting is enabled; only the toplevel SELECT installs it as
        # self.from_linter, and only the toplevel warns
        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        # adjust the whitespace for no inner columns, part of #9440,
        # so that a no-col SELECT comes out as "SELECT WHERE..." or
        # "SELECT FROM ...".
        # while it would be better to have built the SELECT starting string
        # without trailing whitespace first, then add whitespace only if inner
        # cols were present, this breaks compatibility with various custom
        # compilation schemes that are currently being tested.
        if not inner_columns:
            text = text.rstrip()

        if froms:
            text += " \nFROM "

            # two nearly identical dispatch loops; the hints variant
            # passes the per-FROM hint map through to each FROM element
            if select._hints:
                text += ", ".join(
                    [
                        f._compiler_dispatch(
                            self,
                            asfrom=True,
                            fromhints=byfrom,
                            from_linter=from_linter,
                            **kwargs,
                        )
                        for f in froms
                    ]
                )
            else:
                text += ", ".join(
                    [
                        f._compiler_dispatch(
                            self,
                            asfrom=True,
                            from_linter=from_linter,
                            **kwargs,
                        )
                        for f in froms
                    ]
                )
        else:
            # no FROM elements; dialects may render a placeholder here
            # (e.g. "FROM DUAL")
            text += self.default_from()

        if select._where_criteria:
            t = self._generate_delimited_and_list(
                select._where_criteria, from_linter=from_linter, **kwargs
            )
            if t:
                text += " \nWHERE " + t

        # warn after WHERE has been processed, since WHERE criteria can
        # add edges to the linter
        if warn_linting:
            assert from_linter is not None
            from_linter.warn()

        if select._group_by_clauses:
            text += self.group_by_clause(select, **kwargs)

        if select._having_criteria:
            t = self._generate_delimited_and_list(
                select._having_criteria, **kwargs
            )
            if t:
                text += " \nHAVING " + t

        if select._post_criteria_clause is not None:
            pcc = self.process(select._post_criteria_clause, **kwargs)
            if pcc is not None:
                text += " \n" + pcc

        if select._order_by_clauses:
            text += self.order_by_clause(select, **kwargs)

        if select._has_row_limiting_clause:
            text += self._row_limit_clause(select, **kwargs)

        if select._for_update_arg is not None:
            text += self.for_update_clause(select, **kwargs)

        return text
5280
5281 def _generate_prefixes(self, stmt, prefixes, **kw):
5282 clause = " ".join(
5283 prefix._compiler_dispatch(self, **kw)
5284 for prefix, dialect_name in prefixes
5285 if dialect_name in (None, "*") or dialect_name == self.dialect.name
5286 )
5287 if clause:
5288 clause += " "
5289 return clause
5290
5291 def _render_cte_clause(
5292 self,
5293 nesting_level=None,
5294 include_following_stack=False,
5295 ):
5296 """
5297 include_following_stack
5298 Also render the nesting CTEs on the next stack. Useful for
5299 SQL structures like UNION or INSERT that can wrap SELECT
5300 statements containing nesting CTEs.
5301 """
5302 if not self.ctes:
5303 return ""
5304
5305 ctes: MutableMapping[CTE, str]
5306
5307 if nesting_level and nesting_level > 1:
5308 ctes = util.OrderedDict()
5309 for cte in list(self.ctes.keys()):
5310 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5311 cte._get_reference_cte()
5312 ]
5313 nesting = cte.nesting or cte_opts.nesting
5314 is_rendered_level = cte_level == nesting_level or (
5315 include_following_stack and cte_level == nesting_level + 1
5316 )
5317 if not (nesting and is_rendered_level):
5318 continue
5319
5320 ctes[cte] = self.ctes[cte]
5321
5322 else:
5323 ctes = self.ctes
5324
5325 if not ctes:
5326 return ""
5327 ctes_recursive = any([cte.recursive for cte in ctes])
5328
5329 cte_text = self.get_cte_preamble(ctes_recursive) + " "
5330 cte_text += ", \n".join([txt for txt in ctes.values()])
5331 cte_text += "\n "
5332
5333 if nesting_level and nesting_level > 1:
5334 for cte in list(ctes.keys()):
5335 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5336 cte._get_reference_cte()
5337 ]
5338 del self.ctes[cte]
5339 del self.ctes_by_level_name[(cte_level, cte_name)]
5340 del self.level_name_by_cte[cte._get_reference_cte()]
5341
5342 return cte_text
5343
5344 def get_cte_preamble(self, recursive):
5345 if recursive:
5346 return "WITH RECURSIVE"
5347 else:
5348 return "WITH"
5349
5350 def get_select_precolumns(self, select: Select[Any], **kw: Any) -> str:
5351 """Called when building a ``SELECT`` statement, position is just
5352 before column list.
5353
5354 """
5355 if select._distinct_on:
5356 util.warn_deprecated(
5357 "DISTINCT ON is currently supported only by the PostgreSQL "
5358 "dialect. Use of DISTINCT ON for other backends is currently "
5359 "silently ignored, however this usage is deprecated, and will "
5360 "raise CompileError in a future release for all backends "
5361 "that do not support this syntax.",
5362 version="1.4",
5363 )
5364 return "DISTINCT " if select._distinct else ""
5365
5366 def group_by_clause(self, select, **kw):
5367 """allow dialects to customize how GROUP BY is rendered."""
5368
5369 group_by = self._generate_delimited_list(
5370 select._group_by_clauses, OPERATORS[operators.comma_op], **kw
5371 )
5372 if group_by:
5373 return " GROUP BY " + group_by
5374 else:
5375 return ""
5376
5377 def order_by_clause(self, select, **kw):
5378 """allow dialects to customize how ORDER BY is rendered."""
5379
5380 order_by = self._generate_delimited_list(
5381 select._order_by_clauses, OPERATORS[operators.comma_op], **kw
5382 )
5383
5384 if order_by:
5385 return " ORDER BY " + order_by
5386 else:
5387 return ""
5388
5389 def for_update_clause(self, select, **kw):
5390 return " FOR UPDATE"
5391
5392 def returning_clause(
5393 self,
5394 stmt: UpdateBase,
5395 returning_cols: Sequence[_ColumnsClauseElement],
5396 *,
5397 populate_result_map: bool,
5398 **kw: Any,
5399 ) -> str:
5400 columns = [
5401 self._label_returning_column(
5402 stmt,
5403 column,
5404 populate_result_map,
5405 fallback_label_name=fallback_label_name,
5406 column_is_repeated=repeated,
5407 name=name,
5408 proxy_name=proxy_name,
5409 **kw,
5410 )
5411 for (
5412 name,
5413 proxy_name,
5414 fallback_label_name,
5415 column,
5416 repeated,
5417 ) in stmt._generate_columns_plus_names(
5418 True, cols=base._select_iterables(returning_cols)
5419 )
5420 ]
5421
5422 return "RETURNING " + ", ".join(columns)
5423
5424 def limit_clause(self, select, **kw):
5425 text = ""
5426 if select._limit_clause is not None:
5427 text += "\n LIMIT " + self.process(select._limit_clause, **kw)
5428 if select._offset_clause is not None:
5429 if select._limit_clause is None:
5430 text += "\n LIMIT -1"
5431 text += " OFFSET " + self.process(select._offset_clause, **kw)
5432 return text
5433
5434 def fetch_clause(
5435 self,
5436 select,
5437 fetch_clause=None,
5438 require_offset=False,
5439 use_literal_execute_for_simple_int=False,
5440 **kw,
5441 ):
5442 if fetch_clause is None:
5443 fetch_clause = select._fetch_clause
5444 fetch_clause_options = select._fetch_clause_options
5445 else:
5446 fetch_clause_options = {"percent": False, "with_ties": False}
5447
5448 text = ""
5449
5450 if select._offset_clause is not None:
5451 offset_clause = select._offset_clause
5452 if (
5453 use_literal_execute_for_simple_int
5454 and select._simple_int_clause(offset_clause)
5455 ):
5456 offset_clause = offset_clause.render_literal_execute()
5457 offset_str = self.process(offset_clause, **kw)
5458 text += "\n OFFSET %s ROWS" % offset_str
5459 elif require_offset:
5460 text += "\n OFFSET 0 ROWS"
5461
5462 if fetch_clause is not None:
5463 if (
5464 use_literal_execute_for_simple_int
5465 and select._simple_int_clause(fetch_clause)
5466 ):
5467 fetch_clause = fetch_clause.render_literal_execute()
5468 text += "\n FETCH FIRST %s%s ROWS %s" % (
5469 self.process(fetch_clause, **kw),
5470 " PERCENT" if fetch_clause_options["percent"] else "",
5471 "WITH TIES" if fetch_clause_options["with_ties"] else "ONLY",
5472 )
5473 return text
5474
    def visit_table(
        self,
        table,
        asfrom=False,
        iscrud=False,
        ashint=False,
        fromhints=None,
        use_schema=True,
        from_linter=None,
        ambiguous_table_name_map=None,
        enclosing_alias=None,
        **kwargs,
    ):
        """Render a Table in FROM-clause or hint context; in plain
        column-expression context (neither ``asfrom`` nor ``ashint``)
        the table itself renders as an empty string.
        """
        # register this table with the cartesian-product linter
        if from_linter:
            from_linter.froms[table] = table.fullname

        if asfrom or ashint:
            # schema may be remapped per-object (schema_translate_map)
            effective_schema = self.preparer.schema_for_object(table)

            if use_schema and effective_schema:
                ret = (
                    self.preparer.quote_schema(effective_schema)
                    + "."
                    + self.preparer.quote(table.name)
                )
            else:
                ret = self.preparer.quote(table.name)

            # when a schema-less table name is ambiguous within the
            # enclosing statement, render an anonymous alias suffix to
            # disambiguate -- unless this table is the element of the
            # enclosing alias itself
            if (
                (
                    enclosing_alias is None
                    or enclosing_alias.element is not table
                )
                and not effective_schema
                and ambiguous_table_name_map
                and table.name in ambiguous_table_name_map
            ):
                anon_name = self._truncated_identifier(
                    "alias", ambiguous_table_name_map[table.name]
                )

                ret = ret + self.get_render_as_alias_suffix(
                    self.preparer.format_alias(None, anon_name)
                )

            # apply any dialect-specific table hint text
            if fromhints and table in fromhints:
                ret = self.format_from_hint_text(
                    ret, table, fromhints[table], iscrud
                )
            return ret
        else:
            return ""
5527
5528 def visit_join(self, join, asfrom=False, from_linter=None, **kwargs):
5529 if from_linter:
5530 from_linter.edges.update(
5531 itertools.product(
5532 _de_clone(join.left._from_objects),
5533 _de_clone(join.right._from_objects),
5534 )
5535 )
5536
5537 if join.full:
5538 join_type = " FULL OUTER JOIN "
5539 elif join.isouter:
5540 join_type = " LEFT OUTER JOIN "
5541 else:
5542 join_type = " JOIN "
5543 return (
5544 join.left._compiler_dispatch(
5545 self, asfrom=True, from_linter=from_linter, **kwargs
5546 )
5547 + join_type
5548 + join.right._compiler_dispatch(
5549 self, asfrom=True, from_linter=from_linter, **kwargs
5550 )
5551 + " ON "
5552 # TODO: likely need asfrom=True here?
5553 + join.onclause._compiler_dispatch(
5554 self, from_linter=from_linter, **kwargs
5555 )
5556 )
5557
5558 def _setup_crud_hints(self, stmt, table_text):
5559 dialect_hints = {
5560 table: hint_text
5561 for (table, dialect), hint_text in stmt._hints.items()
5562 if dialect in ("*", self.dialect.name)
5563 }
5564 if stmt.table in dialect_hints:
5565 table_text = self.format_from_hint_text(
5566 table_text, stmt.table, dialect_hints[stmt.table], True
5567 )
5568 return dialect_hints, table_text
5569
    # within the realm of "insertmanyvalues sentinel columns",
    # these lookups match different kinds of Column() configurations
    # to specific backend capabilities. they are broken into two
    # lookups, one for autoincrement columns and the other for non
    # autoincrement columns.
    # each maps a _SentinelDefaultCharacterization value to the
    # InsertmanyvaluesSentinelOpts flag that a dialect must declare for
    # that kind of column to be usable as a sentinel (see
    # _get_sentinel_column_for_table, which consults these via .get()
    # with a 0 default meaning "not usable").
    _sentinel_col_non_autoinc_lookup = util.immutabledict(
        {
            # these three characterizations are accepted by any dialect
            # that supports sentinels at all
            _SentinelDefaultCharacterization.CLIENTSIDE: (
                InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
            ),
            _SentinelDefaultCharacterization.SENTINEL_DEFAULT: (
                InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
            ),
            _SentinelDefaultCharacterization.NONE: (
                InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
            ),
            # these require the dialect to declare the matching
            # capability flag explicitly
            _SentinelDefaultCharacterization.IDENTITY: (
                InsertmanyvaluesSentinelOpts.IDENTITY
            ),
            _SentinelDefaultCharacterization.SEQUENCE: (
                InsertmanyvaluesSentinelOpts.SEQUENCE
            ),
        }
    )
    # the autoincrement lookup is identical except that a column with
    # the NONE characterization maps to AUTOINCREMENT rather than
    # _SUPPORTED_OR_NOT
    _sentinel_col_autoinc_lookup = _sentinel_col_non_autoinc_lookup.union(
        {
            _SentinelDefaultCharacterization.NONE: (
                InsertmanyvaluesSentinelOpts.AUTOINCREMENT
            ),
        }
    )
5601
5602 def _get_sentinel_column_for_table(
5603 self, table: Table
5604 ) -> Optional[Sequence[Column[Any]]]:
5605 """given a :class:`.Table`, return a usable sentinel column or
5606 columns for this dialect if any.
5607
5608 Return None if no sentinel columns could be identified, or raise an
5609 error if a column was marked as a sentinel explicitly but isn't
5610 compatible with this dialect.
5611
5612 """
5613
5614 sentinel_opts = self.dialect.insertmanyvalues_implicit_sentinel
5615 sentinel_characteristics = table._sentinel_column_characteristics
5616
5617 sent_cols = sentinel_characteristics.columns
5618
5619 if sent_cols is None:
5620 return None
5621
5622 if sentinel_characteristics.is_autoinc:
5623 bitmask = self._sentinel_col_autoinc_lookup.get(
5624 sentinel_characteristics.default_characterization, 0
5625 )
5626 else:
5627 bitmask = self._sentinel_col_non_autoinc_lookup.get(
5628 sentinel_characteristics.default_characterization, 0
5629 )
5630
5631 if sentinel_opts & bitmask:
5632 return sent_cols
5633
5634 if sentinel_characteristics.is_explicit:
5635 # a column was explicitly marked as insert_sentinel=True,
5636 # however it is not compatible with this dialect. they should
5637 # not indicate this column as a sentinel if they need to include
5638 # this dialect.
5639
5640 # TODO: do we want non-primary key explicit sentinel cols
5641 # that can gracefully degrade for some backends?
5642 # insert_sentinel="degrade" perhaps. not for the initial release.
5643 # I am hoping people are generally not dealing with this sentinel
5644 # business at all.
5645
5646 # if is_explicit is True, there will be only one sentinel column.
5647
5648 raise exc.InvalidRequestError(
5649 f"Column {sent_cols[0]} can't be explicitly "
5650 "marked as a sentinel column when using the "
5651 f"{self.dialect.name} dialect, as the "
5652 "particular type of default generation on this column is "
5653 "not currently compatible with this dialect's specific "
5654 f"INSERT..RETURNING syntax which can receive the "
5655 "server-generated value in "
5656 "a deterministic way. To remove this error, remove "
5657 "insert_sentinel=True from primary key autoincrement "
5658 "columns; these columns are automatically used as "
5659 "sentinels for supported dialects in any case."
5660 )
5661
5662 return None
5663
    def _deliver_insertmanyvalues_batches(
        self,
        statement: str,
        parameters: _DBAPIMultiExecuteParams,
        compiled_parameters: List[_MutableCoreSingleExecuteParams],
        generic_setinputsizes: Optional[_GenericSetInputSizesType],
        batch_size: int,
        sort_by_parameter_order: bool,
        schema_translate_map: Optional[SchemaTranslateMapType],
    ) -> Iterator[_InsertManyValuesBatch]:
        """Expand a single-row INSERT..VALUES statement into batched
        multi-row statements, yielding one :class:`._InsertManyValuesBatch`
        per batch of at most *batch_size* parameter sets.

        Falls back to yielding one single-row statement per parameter
        set when the dialect can't render the multi-row form for this
        statement (the ``use_row_at_a_time`` paths below).
        """
        imv = self._insertmanyvalues
        assert imv is not None

        # getter that pulls the client-side sentinel values out of a
        # compiled parameter dict, when sentinel param keys are present
        if not imv.sentinel_param_keys:
            _sentinel_from_params = None
        else:
            _sentinel_from_params = operator.itemgetter(
                *imv.sentinel_param_keys
            )

        lenparams = len(parameters)
        if imv.is_default_expr and not self.dialect.supports_default_metavalue:
            # backend doesn't support
            # INSERT INTO table (pk_col) VALUES (DEFAULT), (DEFAULT), ...
            # at the moment this is basically SQL Server due to
            # not being able to use DEFAULT for identity column
            # just yield out that many single statements! still
            # faster than a whole connection.execute() call ;)
            #
            # note we still are taking advantage of the fact that we know
            # we are using RETURNING. The generalized approach of fetching
            # cursor.lastrowid etc. still goes through the more heavyweight
            # "ExecutionContext per statement" system as it isn't usable
            # as a generic "RETURNING" approach
            use_row_at_a_time = True
            downgraded = False
        elif not self.dialect.supports_multivalues_insert or (
            sort_by_parameter_order
            and self._result_columns
            and (imv.sentinel_columns is None or imv.includes_upsert_behaviors)
        ):
            # deterministic order was requested and the compiler could
            # not organize sentinel columns for this dialect/statement.
            # use row at a time
            use_row_at_a_time = True
            downgraded = True
        else:
            use_row_at_a_time = False
            downgraded = False

        if use_row_at_a_time:
            # emit the original single-row statement once per parameter
            # set; each yielded batch holds exactly one row
            for batchnum, (param, compiled_param) in enumerate(
                cast(
                    "Sequence[Tuple[_DBAPISingleExecuteParams, _MutableCoreSingleExecuteParams]]",  # noqa: E501
                    zip(parameters, compiled_parameters),
                ),
                1,
            ):
                yield _InsertManyValuesBatch(
                    statement,
                    param,
                    generic_setinputsizes,
                    [param],
                    (
                        [_sentinel_from_params(compiled_param)]
                        if _sentinel_from_params
                        else []
                    ),
                    1,
                    batchnum,
                    lenparams,
                    sort_by_parameter_order,
                    downgraded,
                )
            return

        # renderer that applies schema_translate_map tokens to SQL text,
        # when a translate map is in play
        if schema_translate_map:
            rst = functools.partial(
                self.preparer._render_schema_translates,
                schema_translate_map=schema_translate_map,
            )
        else:
            rst = None

        imv_single_values_expr = imv.single_values_expr
        if rst:
            imv_single_values_expr = rst(imv_single_values_expr)

        # swap the single VALUES tuple out for a placeholder token that
        # will be replaced per-batch with the expanded VALUES text
        executemany_values = f"({imv_single_values_expr})"
        statement = statement.replace(executemany_values, "__EXECMANY_TOKEN__")

        # Use optional insertmanyvalues_max_parameters
        # to further shrink the batch size so that there are no more than
        # insertmanyvalues_max_parameters params.
        # Currently used by SQL Server, which limits statements to 2100 bound
        # parameters (actually 2099).
        max_params = self.dialect.insertmanyvalues_max_parameters
        if max_params:
            total_num_of_params = len(self.bind_names)
            num_params_per_batch = len(imv.insert_crud_params)
            num_params_outside_of_batch = (
                total_num_of_params - num_params_per_batch
            )
            batch_size = min(
                batch_size,
                (
                    (max_params - num_params_outside_of_batch)
                    // num_params_per_batch
                ),
            )

        batches = cast("List[Sequence[Any]]", list(parameters))
        compiled_batches = cast(
            "List[Sequence[Any]]", list(compiled_parameters)
        )

        processed_setinputsizes: Optional[_GenericSetInputSizesType] = None
        batchnum = 1
        total_batches = lenparams // batch_size + (
            1 if lenparams % batch_size else 0
        )

        insert_crud_params = imv.insert_crud_params
        assert insert_crud_params is not None

        if rst:
            insert_crud_params = [
                (col, key, rst(expr), st)
                for col, key, expr, st in insert_crud_params
            ]

        escaped_bind_names: Mapping[str, str]
        expand_pos_lower_index = expand_pos_upper_index = 0

        if not self.positional:
            # named parameter style: each row's bind names get an
            # "__<rownum>" suffix; build a template VALUES clause where
            # the suffix is itself a placeholder to fill in per row
            if self.escaped_bind_names:
                escaped_bind_names = self.escaped_bind_names
            else:
                escaped_bind_names = {}

            all_keys = set(parameters[0])

            def apply_placeholders(keys, formatted):
                # rewrite each bind name in the formatted expression to
                # carry the per-row index placeholder
                for key in keys:
                    key = escaped_bind_names.get(key, key)
                    formatted = formatted.replace(
                        self.bindtemplate % {"name": key},
                        self.bindtemplate
                        % {"name": f"{key}__EXECMANY_INDEX__"},
                    )
                return formatted

            if imv.embed_values_counter:
                imv_values_counter = ", _IMV_VALUES_COUNTER"
            else:
                imv_values_counter = ""
            formatted_values_clause = f"""({', '.join(
                apply_placeholders(bind_keys, formatted)
                for _, _, formatted, bind_keys in insert_crud_params
            )}{imv_values_counter})"""

            # params belonging to the VALUES clause get renamed per row;
            # everything else (RETURNING, CTEs, etc.) stays as-is
            keys_to_replace = all_keys.intersection(
                escaped_bind_names.get(key, key)
                for _, _, _, bind_keys in insert_crud_params
                for key in bind_keys
            )
            base_parameters = {
                key: parameters[0][key]
                for key in all_keys.difference(keys_to_replace)
            }
            executemany_values_w_comma = ""
        else:
            # positional parameter style: repeat the single VALUES tuple
            # once per row and splice the flattened parameters together
            formatted_values_clause = ""
            keys_to_replace = set()
            base_parameters = {}

            if imv.embed_values_counter:
                executemany_values_w_comma = (
                    f"({imv_single_values_expr}, _IMV_VALUES_COUNTER), "
                )
            else:
                executemany_values_w_comma = f"({imv_single_values_expr}), "

            all_names_we_will_expand: Set[str] = set()
            for elem in imv.insert_crud_params:
                all_names_we_will_expand.update(elem[3])

            # get the start and end position in a particular list
            # of parameters where we will be doing the "expanding".
            # statements can have params on either side or both sides,
            # given RETURNING and CTEs
            if all_names_we_will_expand:
                positiontup = self.positiontup
                assert positiontup is not None

                all_expand_positions = {
                    idx
                    for idx, name in enumerate(positiontup)
                    if name in all_names_we_will_expand
                }
                expand_pos_lower_index = min(all_expand_positions)
                expand_pos_upper_index = max(all_expand_positions) + 1
                # the expanded params must be contiguous in the
                # positional list for slice-based splicing to work
                assert (
                    len(all_expand_positions)
                    == expand_pos_upper_index - expand_pos_lower_index
                )

            if self._numeric_binds:
                escaped = re.escape(self._numeric_binds_identifier_char)
                executemany_values_w_comma = re.sub(
                    rf"{escaped}\d+", "%s", executemany_values_w_comma
                )

        # consume the parameter sets batch_size rows at a time
        while batches:
            batch = batches[0:batch_size]
            compiled_batch = compiled_batches[0:batch_size]

            batches[0:batch_size] = []
            compiled_batches[0:batch_size] = []

            if batches:
                current_batch_size = batch_size
            else:
                current_batch_size = len(batch)

            if generic_setinputsizes:
                # if setinputsizes is present, expand this collection to
                # suit the batch length as well
                # currently this will be mssql+pyodbc for internal dialects
                processed_setinputsizes = [
                    (new_key, len_, typ)
                    for new_key, len_, typ in (
                        (f"{key}_{index}", len_, typ)
                        for index in range(current_batch_size)
                        for key, len_, typ in generic_setinputsizes
                    )
                ]

            replaced_parameters: Any
            if self.positional:
                num_ins_params = imv.num_positional_params_counted

                batch_iterator: Iterable[Sequence[Any]]
                extra_params_left: Sequence[Any]
                extra_params_right: Sequence[Any]

                if num_ins_params == len(batch[0]):
                    # all params belong to the VALUES clause
                    extra_params_left = extra_params_right = ()
                    batch_iterator = batch
                else:
                    # split off params outside the expanded region
                    extra_params_left = batch[0][:expand_pos_lower_index]
                    extra_params_right = batch[0][expand_pos_upper_index:]
                    batch_iterator = (
                        b[expand_pos_lower_index:expand_pos_upper_index]
                        for b in batch
                    )

                if imv.embed_values_counter:
                    expanded_values_string = (
                        "".join(
                            executemany_values_w_comma.replace(
                                "_IMV_VALUES_COUNTER", str(i)
                            )
                            for i, _ in enumerate(batch)
                        )
                    )[:-2]
                else:
                    # repeat the tuple per row; [:-2] trims the trailing
                    # ", " separator
                    expanded_values_string = (
                        (executemany_values_w_comma * current_batch_size)
                    )[:-2]

                if self._numeric_binds and num_ins_params > 0:
                    # numeric will always number the parameters inside of
                    # VALUES (and thus order self.positiontup) to be higher
                    # than non-VALUES parameters, no matter where in the
                    # statement those non-VALUES parameters appear (this is
                    # ensured in _process_numeric by numbering first all
                    # params that are not in _values_bindparam)
                    # therefore all extra params are always
                    # on the left side and numbered lower than the VALUES
                    # parameters
                    assert not extra_params_right

                    start = expand_pos_lower_index + 1
                    end = num_ins_params * (current_batch_size) + start

                    # need to format here, since statement may contain
                    # unescaped %, while values_string contains just (%s, %s)
                    positions = tuple(
                        f"{self._numeric_binds_identifier_char}{i}"
                        for i in range(start, end)
                    )
                    expanded_values_string = expanded_values_string % positions

                replaced_statement = statement.replace(
                    "__EXECMANY_TOKEN__", expanded_values_string
                )

                replaced_parameters = tuple(
                    itertools.chain.from_iterable(batch_iterator)
                )

                replaced_parameters = (
                    extra_params_left
                    + replaced_parameters
                    + extra_params_right
                )

            else:
                # named style: stamp each row's index into the VALUES
                # template and into the corresponding parameter keys
                replaced_values_clauses = []
                replaced_parameters = base_parameters.copy()

                for i, param in enumerate(batch):
                    fmv = formatted_values_clause.replace(
                        "EXECMANY_INDEX__", str(i)
                    )
                    if imv.embed_values_counter:
                        fmv = fmv.replace("_IMV_VALUES_COUNTER", str(i))

                    replaced_values_clauses.append(fmv)
                    replaced_parameters.update(
                        {f"{key}__{i}": param[key] for key in keys_to_replace}
                    )

                replaced_statement = statement.replace(
                    "__EXECMANY_TOKEN__",
                    ", ".join(replaced_values_clauses),
                )

            yield _InsertManyValuesBatch(
                replaced_statement,
                replaced_parameters,
                processed_setinputsizes,
                batch,
                (
                    [_sentinel_from_params(cb) for cb in compiled_batch]
                    if _sentinel_from_params
                    else []
                ),
                current_batch_size,
                batchnum,
                total_batches,
                sort_by_parameter_order,
                False,
            )
            batchnum += 1
6010
6011 def visit_insert(
6012 self, insert_stmt, visited_bindparam=None, visiting_cte=None, **kw
6013 ):
6014 compile_state = insert_stmt._compile_state_factory(
6015 insert_stmt, self, **kw
6016 )
6017 insert_stmt = compile_state.statement
6018
6019 if visiting_cte is not None:
6020 kw["visiting_cte"] = visiting_cte
6021 toplevel = False
6022 else:
6023 toplevel = not self.stack
6024
6025 if toplevel:
6026 self.isinsert = True
6027 if not self.dml_compile_state:
6028 self.dml_compile_state = compile_state
6029 if not self.compile_state:
6030 self.compile_state = compile_state
6031
6032 self.stack.append(
6033 {
6034 "correlate_froms": set(),
6035 "asfrom_froms": set(),
6036 "selectable": insert_stmt,
6037 }
6038 )
6039
6040 counted_bindparam = 0
6041
6042 # reset any incoming "visited_bindparam" collection
6043 visited_bindparam = None
6044
6045 # for positional, insertmanyvalues needs to know how many
6046 # bound parameters are in the VALUES sequence; there's no simple
6047 # rule because default expressions etc. can have zero or more
6048 # params inside them. After multiple attempts to figure this out,
6049 # this very simplistic "count after" works and is
6050 # likely the least amount of callcounts, though looks clumsy
6051 if self.positional and visiting_cte is None:
6052 # if we are inside a CTE, don't count parameters
6053 # here since they wont be for insertmanyvalues. keep
6054 # visited_bindparam at None so no counting happens.
6055 # see #9173
6056 visited_bindparam = []
6057
6058 crud_params_struct = crud._get_crud_params(
6059 self,
6060 insert_stmt,
6061 compile_state,
6062 toplevel,
6063 visited_bindparam=visited_bindparam,
6064 **kw,
6065 )
6066
6067 if self.positional and visited_bindparam is not None:
6068 counted_bindparam = len(visited_bindparam)
6069 if self._numeric_binds:
6070 if self._values_bindparam is not None:
6071 self._values_bindparam += visited_bindparam
6072 else:
6073 self._values_bindparam = visited_bindparam
6074
6075 crud_params_single = crud_params_struct.single_params
6076
6077 if (
6078 not crud_params_single
6079 and not self.dialect.supports_default_values
6080 and not self.dialect.supports_default_metavalue
6081 and not self.dialect.supports_empty_insert
6082 ):
6083 raise exc.CompileError(
6084 "The '%s' dialect with current database "
6085 "version settings does not support empty "
6086 "inserts." % self.dialect.name
6087 )
6088
6089 if compile_state._has_multi_parameters:
6090 if not self.dialect.supports_multivalues_insert:
6091 raise exc.CompileError(
6092 "The '%s' dialect with current database "
6093 "version settings does not support "
6094 "in-place multirow inserts." % self.dialect.name
6095 )
6096 elif (
6097 self.implicit_returning or insert_stmt._returning
6098 ) and insert_stmt._sort_by_parameter_order:
6099 raise exc.CompileError(
6100 "RETURNING cannot be determinstically sorted when "
6101 "using an INSERT which includes multi-row values()."
6102 )
6103 crud_params_single = crud_params_struct.single_params
6104 else:
6105 crud_params_single = crud_params_struct.single_params
6106
6107 preparer = self.preparer
6108 supports_default_values = self.dialect.supports_default_values
6109
6110 text = "INSERT "
6111
6112 if insert_stmt._prefixes:
6113 text += self._generate_prefixes(
6114 insert_stmt, insert_stmt._prefixes, **kw
6115 )
6116
6117 text += "INTO "
6118 table_text = preparer.format_table(insert_stmt.table)
6119
6120 if insert_stmt._hints:
6121 _, table_text = self._setup_crud_hints(insert_stmt, table_text)
6122
6123 if insert_stmt._independent_ctes:
6124 self._dispatch_independent_ctes(insert_stmt, kw)
6125
6126 text += table_text
6127
6128 if crud_params_single or not supports_default_values:
6129 text += " (%s)" % ", ".join(
6130 [expr for _, expr, _, _ in crud_params_single]
6131 )
6132
6133 # look for insertmanyvalues attributes that would have been configured
6134 # by crud.py as it scanned through the columns to be part of the
6135 # INSERT
6136 use_insertmanyvalues = crud_params_struct.use_insertmanyvalues
6137 named_sentinel_params: Optional[Sequence[str]] = None
6138 add_sentinel_cols = None
6139 implicit_sentinel = False
6140
6141 returning_cols = self.implicit_returning or insert_stmt._returning
6142 if returning_cols:
6143 add_sentinel_cols = crud_params_struct.use_sentinel_columns
6144 if add_sentinel_cols is not None:
6145 assert use_insertmanyvalues
6146
6147 # search for the sentinel column explicitly present
6148 # in the INSERT columns list, and additionally check that
6149 # this column has a bound parameter name set up that's in the
6150 # parameter list. If both of these cases are present, it means
6151 # we will have a client side value for the sentinel in each
6152 # parameter set.
6153
6154 _params_by_col = {
6155 col: param_names
6156 for col, _, _, param_names in crud_params_single
6157 }
6158 named_sentinel_params = []
6159 for _add_sentinel_col in add_sentinel_cols:
6160 if _add_sentinel_col not in _params_by_col:
6161 named_sentinel_params = None
6162 break
6163 param_name = self._within_exec_param_key_getter(
6164 _add_sentinel_col
6165 )
6166 if param_name not in _params_by_col[_add_sentinel_col]:
6167 named_sentinel_params = None
6168 break
6169 named_sentinel_params.append(param_name)
6170
6171 if named_sentinel_params is None:
6172 # if we are not going to have a client side value for
6173 # the sentinel in the parameter set, that means it's
6174 # an autoincrement, an IDENTITY, or a server-side SQL
6175 # expression like nextval('seqname'). So this is
6176 # an "implicit" sentinel; we will look for it in
6177 # RETURNING
6178 # only, and then sort on it. For this case on PG,
6179 # SQL Server we have to use a special INSERT form
6180 # that guarantees the server side function lines up with
6181 # the entries in the VALUES.
6182 if (
6183 self.dialect.insertmanyvalues_implicit_sentinel
6184 & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
6185 ):
6186 implicit_sentinel = True
6187 else:
6188 # here, we are not using a sentinel at all
6189 # and we are likely the SQLite dialect.
6190 # The first add_sentinel_col that we have should not
6191 # be marked as "insert_sentinel=True". if it was,
6192 # an error should have been raised in
6193 # _get_sentinel_column_for_table.
6194 assert not add_sentinel_cols[0]._insert_sentinel, (
6195 "sentinel selection rules should have prevented "
6196 "us from getting here for this dialect"
6197 )
6198
6199 # always put the sentinel columns last. even if they are
6200 # in the returning list already, they will be there twice
6201 # then.
6202 returning_cols = list(returning_cols) + list(add_sentinel_cols)
6203
6204 returning_clause = self.returning_clause(
6205 insert_stmt,
6206 returning_cols,
6207 populate_result_map=toplevel,
6208 )
6209
6210 if self.returning_precedes_values:
6211 text += " " + returning_clause
6212
6213 else:
6214 returning_clause = None
6215
6216 if insert_stmt.select is not None:
6217 # placed here by crud.py
6218 select_text = self.process(
6219 self.stack[-1]["insert_from_select"], insert_into=True, **kw
6220 )
6221
6222 if self.ctes and self.dialect.cte_follows_insert:
6223 nesting_level = len(self.stack) if not toplevel else None
6224 text += " %s%s" % (
6225 self._render_cte_clause(
6226 nesting_level=nesting_level,
6227 include_following_stack=True,
6228 ),
6229 select_text,
6230 )
6231 else:
6232 text += " %s" % select_text
6233 elif not crud_params_single and supports_default_values:
6234 text += " DEFAULT VALUES"
6235 if use_insertmanyvalues:
6236 self._insertmanyvalues = _InsertManyValues(
6237 True,
6238 self.dialect.default_metavalue_token,
6239 cast(
6240 "List[crud._CrudParamElementStr]", crud_params_single
6241 ),
6242 counted_bindparam,
6243 sort_by_parameter_order=(
6244 insert_stmt._sort_by_parameter_order
6245 ),
6246 includes_upsert_behaviors=(
6247 insert_stmt._post_values_clause is not None
6248 ),
6249 sentinel_columns=add_sentinel_cols,
6250 num_sentinel_columns=(
6251 len(add_sentinel_cols) if add_sentinel_cols else 0
6252 ),
6253 implicit_sentinel=implicit_sentinel,
6254 )
6255 elif compile_state._has_multi_parameters:
6256 text += " VALUES %s" % (
6257 ", ".join(
6258 "(%s)"
6259 % (", ".join(value for _, _, value, _ in crud_param_set))
6260 for crud_param_set in crud_params_struct.all_multi_params
6261 ),
6262 )
6263 else:
6264 insert_single_values_expr = ", ".join(
6265 [
6266 value
6267 for _, _, value, _ in cast(
6268 "List[crud._CrudParamElementStr]",
6269 crud_params_single,
6270 )
6271 ]
6272 )
6273
6274 if use_insertmanyvalues:
6275 if (
6276 implicit_sentinel
6277 and (
6278 self.dialect.insertmanyvalues_implicit_sentinel
6279 & InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
6280 )
6281 # this is checking if we have
6282 # INSERT INTO table (id) VALUES (DEFAULT).
6283 and not (crud_params_struct.is_default_metavalue_only)
6284 ):
6285 # if we have a sentinel column that is server generated,
6286 # then for selected backends render the VALUES list as a
6287 # subquery. This is the orderable form supported by
6288 # PostgreSQL and SQL Server.
6289 embed_sentinel_value = True
6290
6291 render_bind_casts = (
6292 self.dialect.insertmanyvalues_implicit_sentinel
6293 & InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
6294 )
6295
6296 colnames = ", ".join(
6297 f"p{i}" for i, _ in enumerate(crud_params_single)
6298 )
6299
6300 if render_bind_casts:
6301 # render casts for the SELECT list. For PG, we are
6302 # already rendering bind casts in the parameter list,
6303 # selectively for the more "tricky" types like ARRAY.
6304 # however, even for the "easy" types, if the parameter
6305 # is NULL for every entry, PG gives up and says
6306 # "it must be TEXT", which fails for other easy types
6307 # like ints. So we cast on this side too.
6308 colnames_w_cast = ", ".join(
6309 self.render_bind_cast(
6310 col.type,
6311 col.type._unwrapped_dialect_impl(self.dialect),
6312 f"p{i}",
6313 )
6314 for i, (col, *_) in enumerate(crud_params_single)
6315 )
6316 else:
6317 colnames_w_cast = colnames
6318
6319 text += (
6320 f" SELECT {colnames_w_cast} FROM "
6321 f"(VALUES ({insert_single_values_expr})) "
6322 f"AS imp_sen({colnames}, sen_counter) "
6323 "ORDER BY sen_counter"
6324 )
6325 else:
6326 # otherwise, if no sentinel or backend doesn't support
6327 # orderable subquery form, use a plain VALUES list
6328 embed_sentinel_value = False
6329 text += f" VALUES ({insert_single_values_expr})"
6330
6331 self._insertmanyvalues = _InsertManyValues(
6332 is_default_expr=False,
6333 single_values_expr=insert_single_values_expr,
6334 insert_crud_params=cast(
6335 "List[crud._CrudParamElementStr]",
6336 crud_params_single,
6337 ),
6338 num_positional_params_counted=counted_bindparam,
6339 sort_by_parameter_order=(
6340 insert_stmt._sort_by_parameter_order
6341 ),
6342 includes_upsert_behaviors=(
6343 insert_stmt._post_values_clause is not None
6344 ),
6345 sentinel_columns=add_sentinel_cols,
6346 num_sentinel_columns=(
6347 len(add_sentinel_cols) if add_sentinel_cols else 0
6348 ),
6349 sentinel_param_keys=named_sentinel_params,
6350 implicit_sentinel=implicit_sentinel,
6351 embed_values_counter=embed_sentinel_value,
6352 )
6353
6354 else:
6355 text += f" VALUES ({insert_single_values_expr})"
6356
6357 if insert_stmt._post_values_clause is not None:
6358 post_values_clause = self.process(
6359 insert_stmt._post_values_clause, **kw
6360 )
6361 if post_values_clause:
6362 text += " " + post_values_clause
6363
6364 if returning_clause and not self.returning_precedes_values:
6365 text += " " + returning_clause
6366
6367 if self.ctes and not self.dialect.cte_follows_insert:
6368 nesting_level = len(self.stack) if not toplevel else None
6369 text = (
6370 self._render_cte_clause(
6371 nesting_level=nesting_level,
6372 include_following_stack=True,
6373 )
6374 + text
6375 )
6376
6377 self.stack.pop(-1)
6378
6379 return text
6380
6381 def update_tables_clause(self, update_stmt, from_table, extra_froms, **kw):
6382 """Provide a hook to override the initial table clause
6383 in an UPDATE statement.
6384
6385 MySQL overrides this.
6386
6387 """
6388 kw["asfrom"] = True
6389 return from_table._compiler_dispatch(self, iscrud=True, **kw)
6390
6391 def update_from_clause(
6392 self, update_stmt, from_table, extra_froms, from_hints, **kw
6393 ):
6394 """Provide a hook to override the generation of an
6395 UPDATE..FROM clause.
6396 MySQL and MSSQL override this.
6397 """
6398 raise NotImplementedError(
6399 "This backend does not support multiple-table "
6400 "criteria within UPDATE"
6401 )
6402
6403 def update_post_criteria_clause(
6404 self, update_stmt: Update, **kw: Any
6405 ) -> Optional[str]:
6406 """provide a hook to override generation after the WHERE criteria
6407 in an UPDATE statement
6408
6409 .. versionadded:: 2.1
6410
6411 """
6412 if update_stmt._post_criteria_clause is not None:
6413 return self.process(
6414 update_stmt._post_criteria_clause,
6415 **kw,
6416 )
6417 else:
6418 return None
6419
6420 def delete_post_criteria_clause(
6421 self, delete_stmt: Delete, **kw: Any
6422 ) -> Optional[str]:
6423 """provide a hook to override generation after the WHERE criteria
6424 in a DELETE statement
6425
6426 .. versionadded:: 2.1
6427
6428 """
6429 if delete_stmt._post_criteria_clause is not None:
6430 return self.process(
6431 delete_stmt._post_criteria_clause,
6432 **kw,
6433 )
6434 else:
6435 return None
6436
    def visit_update(
        self,
        update_stmt: Update,
        visiting_cte: Optional[CTE] = None,
        **kw: Any,
    ) -> str:
        """Compile an :class:`.Update` construct into a SQL string.

        Assembles, in order: prefixes, the target table clause, the SET
        clause, optional RETURNING (before or after values depending on
        dialect), UPDATE..FROM extra tables, WHERE criteria, a
        post-criteria hook, and finally any WITH / CTE prefix.
        """
        compile_state = update_stmt._compile_state_factory(
            update_stmt, self, **kw
        )
        if TYPE_CHECKING:
            assert isinstance(compile_state, UpdateDMLState)
        # the compile_state may have produced a new statement object
        update_stmt = compile_state.statement  # type: ignore[assignment]

        # a statement compiled inside a CTE is never "toplevel"
        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            toplevel = not self.stack

        if toplevel:
            self.isupdate = True
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        # set up cartesian-product linting; only installed on the
        # compiler when this is the topmost statement
        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        extra_froms = compile_state._extra_froms
        is_multitable = bool(extra_froms)

        if is_multitable:
            # main table might be a JOIN
            main_froms = set(_from_objects(update_stmt.table))
            render_extra_froms = [
                f for f in extra_froms if f not in main_froms
            ]
            correlate_froms = main_froms.union(extra_froms)
        else:
            render_extra_froms = []
            correlate_froms = {update_stmt.table}

        self.stack.append(
            {
                "correlate_froms": correlate_froms,
                "asfrom_froms": correlate_froms,
                "selectable": update_stmt,
            }
        )

        text = "UPDATE "

        if update_stmt._prefixes:
            text += self._generate_prefixes(
                update_stmt, update_stmt._prefixes, **kw
            )

        table_text = self.update_tables_clause(
            update_stmt,
            update_stmt.table,
            render_extra_froms,
            from_linter=from_linter,
            **kw,
        )
        # generate SET parameters before hints / CTE dispatch so that
        # embedded expressions are processed in statement order
        crud_params_struct = crud._get_crud_params(
            self, update_stmt, compile_state, toplevel, **kw
        )
        crud_params = crud_params_struct.single_params

        if update_stmt._hints:
            dialect_hints, table_text = self._setup_crud_hints(
                update_stmt, table_text
            )
        else:
            dialect_hints = None

        if update_stmt._independent_ctes:
            self._dispatch_independent_ctes(update_stmt, kw)

        text += table_text

        text += " SET "
        text += ", ".join(
            expr + "=" + value
            for _, expr, value, _ in cast(
                "List[Tuple[Any, str, str, Any]]", crud_params
            )
        )

        if self.implicit_returning or update_stmt._returning:
            if self.returning_precedes_values:
                text += " " + self.returning_clause(
                    update_stmt,
                    self.implicit_returning or update_stmt._returning,
                    populate_result_map=toplevel,
                )

        if extra_froms:
            extra_from_text = self.update_from_clause(
                update_stmt,
                update_stmt.table,
                render_extra_froms,
                dialect_hints,
                from_linter=from_linter,
                **kw,
            )
            if extra_from_text:
                text += " " + extra_from_text

        if update_stmt._where_criteria:
            t = self._generate_delimited_and_list(
                update_stmt._where_criteria, from_linter=from_linter, **kw
            )
            if t:
                text += " WHERE " + t

        ulc = self.update_post_criteria_clause(
            update_stmt, from_linter=from_linter, **kw
        )
        if ulc:
            text += " " + ulc

        if (
            self.implicit_returning or update_stmt._returning
        ) and not self.returning_precedes_values:
            text += " " + self.returning_clause(
                update_stmt,
                self.implicit_returning or update_stmt._returning,
                populate_result_map=toplevel,
            )

        # CTEs render as a WITH prefix on the completed statement
        if self.ctes:
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if warn_linting:
            assert from_linter is not None
            from_linter.warn(stmt_type="UPDATE")

        self.stack.pop(-1)

        return text  # type: ignore[no-any-return]
6586
6587 def delete_extra_from_clause(
6588 self, delete_stmt, from_table, extra_froms, from_hints, **kw
6589 ):
6590 """Provide a hook to override the generation of an
6591 DELETE..FROM clause.
6592
6593 This can be used to implement DELETE..USING for example.
6594
6595 MySQL and MSSQL override this.
6596
6597 """
6598 raise NotImplementedError(
6599 "This backend does not support multiple-table "
6600 "criteria within DELETE"
6601 )
6602
6603 def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw):
6604 return from_table._compiler_dispatch(
6605 self, asfrom=True, iscrud=True, **kw
6606 )
6607
    def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
        """Compile a :class:`.Delete` construct into a SQL string.

        Assembles, in order: prefixes, the target table clause, optional
        RETURNING (position per dialect), extra-table clause, WHERE
        criteria, a post-criteria hook, and any WITH / CTE prefix.
        """
        compile_state = delete_stmt._compile_state_factory(
            delete_stmt, self, **kw
        )
        # the compile_state may have produced a new statement object
        delete_stmt = compile_state.statement

        # a statement compiled inside a CTE is never "toplevel"
        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            toplevel = not self.stack

        if toplevel:
            self.isdelete = True
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        # set up cartesian-product linting; only installed on the
        # compiler when this is the topmost statement
        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        extra_froms = compile_state._extra_froms

        correlate_froms = {delete_stmt.table}.union(extra_froms)
        self.stack.append(
            {
                "correlate_froms": correlate_froms,
                "asfrom_froms": correlate_froms,
                "selectable": delete_stmt,
            }
        )

        text = "DELETE "

        if delete_stmt._prefixes:
            text += self._generate_prefixes(
                delete_stmt, delete_stmt._prefixes, **kw
            )

        text += "FROM "

        try:
            table_text = self.delete_table_clause(
                delete_stmt,
                delete_stmt.table,
                extra_froms,
                from_linter=from_linter,
            )
        except TypeError:
            # anticipate 3rd party dialects that don't include **kw
            # TODO: remove in 2.1
            table_text = self.delete_table_clause(
                delete_stmt, delete_stmt.table, extra_froms
            )
            # the fallback signature couldn't receive the linter, so
            # feed the table through the linter separately
            if from_linter:
                _ = self.process(delete_stmt.table, from_linter=from_linter)

        # invoked for its parameter-collection side effects; DELETE
        # renders no SET/VALUES clause
        crud._get_crud_params(self, delete_stmt, compile_state, toplevel, **kw)

        if delete_stmt._hints:
            dialect_hints, table_text = self._setup_crud_hints(
                delete_stmt, table_text
            )
        else:
            dialect_hints = None

        if delete_stmt._independent_ctes:
            self._dispatch_independent_ctes(delete_stmt, kw)

        text += table_text

        if (
            self.implicit_returning or delete_stmt._returning
        ) and self.returning_precedes_values:
            text += " " + self.returning_clause(
                delete_stmt,
                self.implicit_returning or delete_stmt._returning,
                populate_result_map=toplevel,
            )

        if extra_froms:
            extra_from_text = self.delete_extra_from_clause(
                delete_stmt,
                delete_stmt.table,
                extra_froms,
                dialect_hints,
                from_linter=from_linter,
                **kw,
            )
            if extra_from_text:
                text += " " + extra_from_text

        if delete_stmt._where_criteria:
            t = self._generate_delimited_and_list(
                delete_stmt._where_criteria, from_linter=from_linter, **kw
            )
            if t:
                text += " WHERE " + t

        dlc = self.delete_post_criteria_clause(
            delete_stmt, from_linter=from_linter, **kw
        )
        if dlc:
            text += " " + dlc

        if (
            self.implicit_returning or delete_stmt._returning
        ) and not self.returning_precedes_values:
            text += " " + self.returning_clause(
                delete_stmt,
                self.implicit_returning or delete_stmt._returning,
                populate_result_map=toplevel,
            )

        # CTEs render as a WITH prefix on the completed statement
        if self.ctes:
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if warn_linting:
            assert from_linter is not None
            from_linter.warn(stmt_type="DELETE")

        self.stack.pop(-1)

        return text
6740
6741 def visit_savepoint(self, savepoint_stmt, **kw):
6742 return "SAVEPOINT %s" % self.preparer.format_savepoint(savepoint_stmt)
6743
6744 def visit_rollback_to_savepoint(self, savepoint_stmt, **kw):
6745 return "ROLLBACK TO SAVEPOINT %s" % self.preparer.format_savepoint(
6746 savepoint_stmt
6747 )
6748
6749 def visit_release_savepoint(self, savepoint_stmt, **kw):
6750 return "RELEASE SAVEPOINT %s" % self.preparer.format_savepoint(
6751 savepoint_stmt
6752 )
6753
6754
class StrSQLCompiler(SQLCompiler):
    """A :class:`.SQLCompiler` subclass used for plain stringification.

    This compiler is selected whenever a Core expression element is
    stringified directly, without going through
    :meth:`_expression.ClauseElement.compile`.  It renders a small
    selection of non-standard SQL constructs in a generic, display-only
    form; for more substantial custom or dialect-specific SQL constructs
    it is necessary to use :meth:`_expression.ClauseElement.compile`
    directly.

    .. seealso::

        :ref:`faq_sql_expression_string`

    """

    def _fallback_column_name(self, column):
        # display placeholder for a column with no usable name
        return "<name unknown>"

    @util.preload_module("sqlalchemy.engine.url")
    def visit_unsupported_compilation(self, element, err, **kw):
        # if the element names a specific dialect for stringification,
        # delegate to that dialect's compiler rather than failing
        if element.stringify_dialect != "default":
            url = util.preloaded.engine_url
            dialect = url.URL.create(element.stringify_dialect).get_dialect()()

            compiler = dialect.statement_compiler(
                dialect, None, _supporting_against=self
            )
            if not isinstance(compiler, StrSQLCompiler):
                return compiler.process(element, **kw)

        return super().visit_unsupported_compilation(element, err)

    def visit_getitem_binary(self, binary, operator, **kw):
        lhs = self.process(binary.left, **kw)
        rhs = self.process(binary.right, **kw)
        return f"{lhs}[{rhs}]"

    def visit_json_getitem_op_binary(self, binary, operator, **kw):
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_sequence(self, sequence, **kw):
        seq_name = self.preparer.format_sequence(sequence)
        return f"<next sequence value: {seq_name}>"

    def returning_clause(
        self,
        stmt: UpdateBase,
        returning_cols: Sequence[_ColumnsClauseElement],
        *,
        populate_result_map: bool,
        **kw: Any,
    ) -> str:
        rendered = [
            self._label_select_column(None, col, True, False, {})
            for col in base._select_iterables(returning_cols)
        ]
        return "RETURNING " + ", ".join(rendered)

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        # generic UPDATE..FROM rendering for display purposes
        kw["asfrom"] = True
        rendered = [
            extra._compiler_dispatch(self, fromhints=from_hints, **kw)
            for extra in extra_froms
        ]
        return "FROM " + ", ".join(rendered)

    def delete_extra_from_clause(
        self, delete_stmt, from_table, extra_froms, from_hints, **kw
    ):
        # generic multi-table DELETE rendering for display purposes
        kw["asfrom"] = True
        rendered = [
            extra._compiler_dispatch(self, fromhints=from_hints, **kw)
            for extra in extra_froms
        ]
        return ", " + ", ".join(rendered)

    def visit_empty_set_expr(self, element_types, **kw):
        return "SELECT 1 WHERE 1!=1"

    def get_from_hint_text(self, table, text):
        return f"[{text}]"

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " <regexp> ", **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " <not regexp> ", **kw)

    def visit_regexp_replace_op_binary(self, binary, operator, **kw):
        lhs = binary.left._compiler_dispatch(self, **kw)
        rhs = binary.right._compiler_dispatch(self, **kw)
        return f"<regexp replace>({lhs}, {rhs})"

    def visit_try_cast(self, cast, **kwargs):
        value_text = cast.clause._compiler_dispatch(self, **kwargs)
        type_text = cast.typeclause._compiler_dispatch(self, **kwargs)
        return f"TRY_CAST({value_text} AS {type_text})"
6864
6865
class DDLCompiler(Compiled):
    """Base compiler for DDL constructs (CREATE / DROP / ALTER)."""

    # marker distinguishing DDL compilers from DML/query compilers
    is_ddl = True

    # typing-only constructor signature; the runtime __init__ is
    # inherited from Compiled
    if TYPE_CHECKING:

        def __init__(
            self,
            dialect: Dialect,
            statement: ExecutableDDLElement,
            schema_translate_map: Optional[SchemaTranslateMapType] = ...,
            render_schema_translate: bool = ...,
            compile_kwargs: Mapping[str, Any] = ...,
        ): ...
6879
    @util.ro_memoized_property
    def sql_compiler(self) -> SQLCompiler:
        # a statement compiler from the same dialect, used to render
        # embedded SQL expressions (defaults, CHECK clauses, index
        # expressions); memoized per DDLCompiler instance
        return self.dialect.statement_compiler(
            self.dialect, None, schema_translate_map=self.schema_translate_map
        )
6885
    @util.memoized_property
    def type_compiler(self):
        # the dialect's shared type compiler, used to render column types
        return self.dialect.type_compiler_instance
6889
    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        """Return ``None``; DDL statements carry no bound parameters."""
        return None
6897
6898 def visit_ddl(self, ddl, **kwargs):
6899 # table events can substitute table and schema name
6900 context = ddl.context
6901 if isinstance(ddl.target, schema.Table):
6902 context = context.copy()
6903
6904 preparer = self.preparer
6905 path = preparer.format_table_seq(ddl.target)
6906 if len(path) == 1:
6907 table, sch = path[0], ""
6908 else:
6909 table, sch = path[-1], path[0]
6910
6911 context.setdefault("table", table)
6912 context.setdefault("schema", sch)
6913 context.setdefault("fullname", preparer.format_table(ddl.target))
6914
6915 return self.sql_compiler.post_process_text(ddl.statement % context)
6916
6917 def visit_create_schema(self, create, **kw):
6918 text = "CREATE SCHEMA "
6919 if create.if_not_exists:
6920 text += "IF NOT EXISTS "
6921 return text + self.preparer.format_schema(create.element)
6922
6923 def visit_drop_schema(self, drop, **kw):
6924 text = "DROP SCHEMA "
6925 if drop.if_exists:
6926 text += "IF EXISTS "
6927 text += self.preparer.format_schema(drop.element)
6928 if drop.cascade:
6929 text += " CASCADE"
6930 return text
6931
    def visit_create_table(self, create, **kw):
        """Render a full CREATE TABLE statement, including column
        specifications and table-level constraints."""
        table = create.element
        preparer = self.preparer

        text = "\nCREATE "
        if table._prefixes:
            text += " ".join(table._prefixes) + " "

        text += "TABLE "
        if create.if_not_exists:
            text += "IF NOT EXISTS "

        text += preparer.format_table(table) + " "

        create_table_suffix = self.create_table_suffix(table)
        if create_table_suffix:
            text += create_table_suffix + " "

        text += "("

        # first column is preceded by a newline; subsequent entries by
        # a comma-newline
        separator = "\n"

        # if only one primary key, specify it along with the column
        first_pk = False
        for create_column in create.columns:
            column = create_column.element
            try:
                # first_pk is passed True only for the first primary-key
                # column encountered
                processed = self.process(
                    create_column, first_pk=column.primary_key and not first_pk
                )
                # a None result means the column is omitted from DDL
                # (e.g. "system" columns)
                if processed is not None:
                    text += separator
                    separator = ", \n"
                    text += "\t" + processed
                if column.primary_key:
                    first_pk = True
            except exc.CompileError as ce:
                # augment compilation failures with table/column context
                raise exc.CompileError(
                    "(in table '%s', column '%s'): %s"
                    % (table.description, column.name, ce.args[0])
                ) from ce

        const = self.create_table_constraints(
            table,
            _include_foreign_key_constraints=create.include_foreign_key_constraints,  # noqa
        )
        if const:
            text += separator + "\t" + const

        text += "\n)%s\n\n" % self.post_create_table(table)
        return text
6983
    def visit_create_view(self, element: CreateView, **kw: Any) -> str:
        """Render CREATE [MATERIALIZED] VIEW via the shared renderer."""
        return self._generate_table_select(element, "view", **kw)
6986
    def visit_create_table_as(self, element: CreateTableAs, **kw: Any) -> str:
        """Render CREATE TABLE .. AS SELECT via the shared renderer."""
        return self._generate_table_select(element, "create_table_as", **kw)
6989
6990 def _generate_table_select(
6991 self,
6992 element: _TableViaSelect,
6993 type_: str,
6994 if_not_exists: Optional[bool] = None,
6995 **kw: Any,
6996 ) -> str:
6997 prep = self.preparer
6998
6999 inner_kw = dict(kw)
7000 inner_kw["literal_binds"] = True
7001 select_sql = self.sql_compiler.process(element.selectable, **inner_kw)
7002
7003 # Use if_not_exists parameter if provided, otherwise use element's
7004 use_if_not_exists = (
7005 if_not_exists
7006 if if_not_exists is not None
7007 else element.if_not_exists
7008 )
7009
7010 parts = [
7011 "CREATE",
7012 "OR REPLACE" if getattr(element, "or_replace", False) else None,
7013 "TEMPORARY" if element.temporary else None,
7014 (
7015 "MATERIALIZED VIEW"
7016 if type_ == "view" and getattr(element, "materialized", False)
7017 else "TABLE" if type_ == "create_table_as" else "VIEW"
7018 ),
7019 "IF NOT EXISTS" if use_if_not_exists else None,
7020 prep.format_table(element.table),
7021 "AS",
7022 select_sql,
7023 ]
7024 return " ".join(p for p in parts if p)
7025
7026 def visit_create_column(self, create, first_pk=False, **kw):
7027 column = create.element
7028
7029 if column.system:
7030 return None
7031
7032 text = self.get_column_specification(column, first_pk=first_pk)
7033 const = " ".join(
7034 self.process(constraint) for constraint in column.constraints
7035 )
7036 if const:
7037 text += " " + const
7038
7039 return text
7040
7041 def create_table_constraints(
7042 self, table, _include_foreign_key_constraints=None, **kw
7043 ):
7044 # On some DB order is significant: visit PK first, then the
7045 # other constraints (engine.ReflectionTest.testbasic failed on FB2)
7046 constraints = []
7047 if table.primary_key:
7048 constraints.append(table.primary_key)
7049
7050 all_fkcs = table.foreign_key_constraints
7051 if _include_foreign_key_constraints is not None:
7052 omit_fkcs = all_fkcs.difference(_include_foreign_key_constraints)
7053 else:
7054 omit_fkcs = set()
7055
7056 constraints.extend(
7057 [
7058 c
7059 for c in table._sorted_constraints
7060 if c is not table.primary_key and c not in omit_fkcs
7061 ]
7062 )
7063
7064 return ", \n\t".join(
7065 p
7066 for p in (
7067 self.process(constraint)
7068 for constraint in constraints
7069 if (constraint._should_create_for_compiler(self))
7070 and (
7071 not self.dialect.supports_alter
7072 or not getattr(constraint, "use_alter", False)
7073 )
7074 )
7075 if p is not None
7076 )
7077
7078 def visit_drop_table(self, drop, **kw):
7079 text = "\nDROP TABLE "
7080 if drop.if_exists:
7081 text += "IF EXISTS "
7082 return text + self.preparer.format_table(drop.element)
7083
7084 def visit_drop_view(self, drop, **kw):
7085 text = "\nDROP "
7086 if drop.materialized:
7087 text += "MATERIALIZED VIEW "
7088 else:
7089 text += "VIEW "
7090 if drop.if_exists:
7091 text += "IF EXISTS "
7092 return text + self.preparer.format_table(drop.element)
7093
7094 def _verify_index_table(self, index: Index) -> None:
7095 if index.table is None:
7096 raise exc.CompileError(
7097 "Index '%s' is not associated with any table." % index.name
7098 )
7099
7100 def visit_create_index(
7101 self, create, include_schema=False, include_table_schema=True, **kw
7102 ):
7103 index = create.element
7104 self._verify_index_table(index)
7105 preparer = self.preparer
7106 text = "CREATE "
7107 if index.unique:
7108 text += "UNIQUE "
7109 if index.name is None:
7110 raise exc.CompileError(
7111 "CREATE INDEX requires that the index have a name"
7112 )
7113
7114 text += "INDEX "
7115 if create.if_not_exists:
7116 text += "IF NOT EXISTS "
7117
7118 text += "%s ON %s (%s)" % (
7119 self._prepared_index_name(index, include_schema=include_schema),
7120 preparer.format_table(
7121 index.table, use_schema=include_table_schema
7122 ),
7123 ", ".join(
7124 self.sql_compiler.process(
7125 expr, include_table=False, literal_binds=True
7126 )
7127 for expr in index.expressions
7128 ),
7129 )
7130 return text
7131
7132 def visit_drop_index(self, drop, **kw):
7133 index = drop.element
7134
7135 if index.name is None:
7136 raise exc.CompileError(
7137 "DROP INDEX requires that the index have a name"
7138 )
7139 text = "\nDROP INDEX "
7140 if drop.if_exists:
7141 text += "IF EXISTS "
7142
7143 return text + self._prepared_index_name(index, include_schema=True)
7144
7145 def _prepared_index_name(
7146 self, index: Index, include_schema: bool = False
7147 ) -> str:
7148 if index.table is not None:
7149 effective_schema = self.preparer.schema_for_object(index.table)
7150 else:
7151 effective_schema = None
7152 if include_schema and effective_schema:
7153 schema_name = self.preparer.quote_schema(effective_schema)
7154 else:
7155 schema_name = None
7156
7157 index_name: str = self.preparer.format_index(index)
7158
7159 if schema_name:
7160 index_name = schema_name + "." + index_name
7161 return index_name
7162
7163 def visit_add_constraint(self, create, **kw):
7164 return "ALTER TABLE %s ADD %s" % (
7165 self.preparer.format_table(create.element.table),
7166 self.process(create.element),
7167 )
7168
7169 def visit_set_table_comment(self, create, **kw):
7170 return "COMMENT ON TABLE %s IS %s" % (
7171 self.preparer.format_table(create.element),
7172 self.sql_compiler.render_literal_value(
7173 create.element.comment, sqltypes.String()
7174 ),
7175 )
7176
7177 def visit_drop_table_comment(self, drop, **kw):
7178 return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table(
7179 drop.element
7180 )
7181
7182 def visit_set_column_comment(self, create, **kw):
7183 return "COMMENT ON COLUMN %s IS %s" % (
7184 self.preparer.format_column(
7185 create.element, use_table=True, use_schema=True
7186 ),
7187 self.sql_compiler.render_literal_value(
7188 create.element.comment, sqltypes.String()
7189 ),
7190 )
7191
7192 def visit_drop_column_comment(self, drop, **kw):
7193 return "COMMENT ON COLUMN %s IS NULL" % self.preparer.format_column(
7194 drop.element, use_table=True
7195 )
7196
    def visit_set_constraint_comment(self, create, **kw):
        """Unsupported by the base DDL compiler; dialects override."""
        raise exc.UnsupportedCompilationError(self, type(create))
7199
    def visit_drop_constraint_comment(self, drop, **kw):
        """Unsupported by the base DDL compiler; dialects override."""
        raise exc.UnsupportedCompilationError(self, type(drop))
7202
7203 def get_identity_options(self, identity_options):
7204 text = []
7205 if identity_options.increment is not None:
7206 text.append("INCREMENT BY %d" % identity_options.increment)
7207 if identity_options.start is not None:
7208 text.append("START WITH %d" % identity_options.start)
7209 if identity_options.minvalue is not None:
7210 text.append("MINVALUE %d" % identity_options.minvalue)
7211 if identity_options.maxvalue is not None:
7212 text.append("MAXVALUE %d" % identity_options.maxvalue)
7213 if identity_options.nominvalue is not None:
7214 text.append("NO MINVALUE")
7215 if identity_options.nomaxvalue is not None:
7216 text.append("NO MAXVALUE")
7217 if identity_options.cache is not None:
7218 text.append("CACHE %d" % identity_options.cache)
7219 if identity_options.cycle is not None:
7220 text.append("CYCLE" if identity_options.cycle else "NO CYCLE")
7221 return " ".join(text)
7222
7223 def visit_create_sequence(self, create, prefix=None, **kw):
7224 text = "CREATE SEQUENCE "
7225 if create.if_not_exists:
7226 text += "IF NOT EXISTS "
7227 text += self.preparer.format_sequence(create.element)
7228
7229 if prefix:
7230 text += prefix
7231 options = self.get_identity_options(create.element)
7232 if options:
7233 text += " " + options
7234 return text
7235
7236 def visit_drop_sequence(self, drop, **kw):
7237 text = "DROP SEQUENCE "
7238 if drop.if_exists:
7239 text += "IF EXISTS "
7240 return text + self.preparer.format_sequence(drop.element)
7241
7242 def visit_drop_constraint(self, drop, **kw):
7243 constraint = drop.element
7244 if constraint.name is not None:
7245 formatted_name = self.preparer.format_constraint(constraint)
7246 else:
7247 formatted_name = None
7248
7249 if formatted_name is None:
7250 raise exc.CompileError(
7251 "Can't emit DROP CONSTRAINT for constraint %r; "
7252 "it has no name" % drop.element
7253 )
7254 return "ALTER TABLE %s DROP CONSTRAINT %s%s%s" % (
7255 self.preparer.format_table(drop.element.table),
7256 "IF EXISTS " if drop.if_exists else "",
7257 formatted_name,
7258 " CASCADE" if drop.cascade else "",
7259 )
7260
7261 def get_column_specification(self, column, **kwargs):
7262 colspec = (
7263 self.preparer.format_column(column)
7264 + " "
7265 + self.dialect.type_compiler_instance.process(
7266 column.type, type_expression=column
7267 )
7268 )
7269 default = self.get_column_default_string(column)
7270 if default is not None:
7271 colspec += " DEFAULT " + default
7272
7273 if column.computed is not None:
7274 colspec += " " + self.process(column.computed)
7275
7276 if (
7277 column.identity is not None
7278 and self.dialect.supports_identity_columns
7279 ):
7280 colspec += " " + self.process(column.identity)
7281
7282 if not column.nullable and (
7283 not column.identity or not self.dialect.supports_identity_columns
7284 ):
7285 colspec += " NOT NULL"
7286 return colspec
7287
    def create_table_suffix(self, table):
        """Hook returning text placed after the table name in CREATE
        TABLE; empty by default, dialects override."""
        return ""
7290
    def post_create_table(self, table):
        """Hook returning text placed after the closing parenthesis of
        CREATE TABLE; empty by default, dialects override."""
        return ""
7293
7294 def get_column_default_string(self, column: Column[Any]) -> Optional[str]:
7295 if isinstance(column.server_default, schema.DefaultClause):
7296 return self.render_default_string(column.server_default.arg)
7297 else:
7298 return None
7299
7300 def render_default_string(self, default: Union[Visitable, str]) -> str:
7301 if isinstance(default, str):
7302 return self.sql_compiler.render_literal_value(
7303 default, sqltypes.STRINGTYPE
7304 )
7305 else:
7306 return self.sql_compiler.process(default, literal_binds=True)
7307
7308 def visit_table_or_column_check_constraint(self, constraint, **kw):
7309 if constraint.is_column_level:
7310 return self.visit_column_check_constraint(constraint)
7311 else:
7312 return self.visit_check_constraint(constraint)
7313
7314 def visit_check_constraint(self, constraint, **kw):
7315 text = ""
7316 if constraint.name is not None:
7317 formatted_name = self.preparer.format_constraint(constraint)
7318 if formatted_name is not None:
7319 text += "CONSTRAINT %s " % formatted_name
7320 text += "CHECK (%s)" % self.sql_compiler.process(
7321 constraint.sqltext, include_table=False, literal_binds=True
7322 )
7323 text += self.define_constraint_deferrability(constraint)
7324 return text
7325
7326 def visit_column_check_constraint(self, constraint, **kw):
7327 text = ""
7328 if constraint.name is not None:
7329 formatted_name = self.preparer.format_constraint(constraint)
7330 if formatted_name is not None:
7331 text += "CONSTRAINT %s " % formatted_name
7332 text += "CHECK (%s)" % self.sql_compiler.process(
7333 constraint.sqltext, include_table=False, literal_binds=True
7334 )
7335 text += self.define_constraint_deferrability(constraint)
7336 return text
7337
7338 def visit_primary_key_constraint(
7339 self, constraint: PrimaryKeyConstraint, **kw: Any
7340 ) -> str:
7341 if len(constraint) == 0:
7342 return ""
7343 text = ""
7344 if constraint.name is not None:
7345 formatted_name = self.preparer.format_constraint(constraint)
7346 if formatted_name is not None:
7347 text += "CONSTRAINT %s " % formatted_name
7348 text += "PRIMARY KEY "
7349 text += "(%s)" % ", ".join(
7350 self.preparer.quote(c.name)
7351 for c in (
7352 constraint.columns_autoinc_first
7353 if constraint._implicit_generated
7354 else constraint.columns
7355 )
7356 )
7357 text += self.define_constraint_deferrability(constraint)
7358 return text
7359
7360 def visit_foreign_key_constraint(self, constraint, **kw):
7361 preparer = self.preparer
7362 text = ""
7363 if constraint.name is not None:
7364 formatted_name = self.preparer.format_constraint(constraint)
7365 if formatted_name is not None:
7366 text += "CONSTRAINT %s " % formatted_name
7367 remote_table = list(constraint.elements)[0].column.table
7368 text += "FOREIGN KEY(%s) REFERENCES %s (%s)" % (
7369 ", ".join(
7370 preparer.quote(f.parent.name) for f in constraint.elements
7371 ),
7372 self.define_constraint_remote_table(
7373 constraint, remote_table, preparer
7374 ),
7375 ", ".join(
7376 preparer.quote(f.column.name) for f in constraint.elements
7377 ),
7378 )
7379 text += self.define_constraint_match(constraint)
7380 text += self.define_constraint_cascades(constraint)
7381 text += self.define_constraint_deferrability(constraint)
7382 return text
7383
    def define_constraint_remote_table(self, constraint, table, preparer):
        """Format the remote table clause of a CREATE CONSTRAINT clause.

        Hook for dialects that need special rendering of the REFERENCES
        target table.
        """

        return preparer.format_table(table)
7388
7389 def visit_unique_constraint(
7390 self, constraint: UniqueConstraint, **kw: Any
7391 ) -> str:
7392 if len(constraint) == 0:
7393 return ""
7394 text = ""
7395 if constraint.name is not None:
7396 formatted_name = self.preparer.format_constraint(constraint)
7397 if formatted_name is not None:
7398 text += "CONSTRAINT %s " % formatted_name
7399 text += "UNIQUE %s(%s)" % (
7400 self.define_unique_constraint_distinct(constraint, **kw),
7401 ", ".join(self.preparer.quote(c.name) for c in constraint),
7402 )
7403 text += self.define_constraint_deferrability(constraint)
7404 return text
7405
    def define_unique_constraint_distinct(
        self, constraint: UniqueConstraint, **kw: Any
    ) -> str:
        """Hook for a dialect-specific qualifier between "UNIQUE" and the
        column list (e.g. a NULLS NOT DISTINCT style keyword); the base
        implementation renders nothing."""
        return ""
7410
7411 def define_constraint_cascades(
7412 self, constraint: ForeignKeyConstraint
7413 ) -> str:
7414 text = ""
7415 if constraint.ondelete is not None:
7416 text += self.define_constraint_ondelete_cascade(constraint)
7417
7418 if constraint.onupdate is not None:
7419 text += self.define_constraint_onupdate_cascade(constraint)
7420 return text
7421
7422 def define_constraint_ondelete_cascade(
7423 self, constraint: ForeignKeyConstraint
7424 ) -> str:
7425 return " ON DELETE %s" % self.preparer.validate_sql_phrase(
7426 constraint.ondelete, FK_ON_DELETE
7427 )
7428
7429 def define_constraint_onupdate_cascade(
7430 self, constraint: ForeignKeyConstraint
7431 ) -> str:
7432 return " ON UPDATE %s" % self.preparer.validate_sql_phrase(
7433 constraint.onupdate, FK_ON_UPDATE
7434 )
7435
7436 def define_constraint_deferrability(self, constraint: Constraint) -> str:
7437 text = ""
7438 if constraint.deferrable is not None:
7439 if constraint.deferrable:
7440 text += " DEFERRABLE"
7441 else:
7442 text += " NOT DEFERRABLE"
7443 if constraint.initially is not None:
7444 text += " INITIALLY %s" % self.preparer.validate_sql_phrase(
7445 constraint.initially, FK_INITIALLY
7446 )
7447 return text
7448
7449 def define_constraint_match(self, constraint):
7450 text = ""
7451 if constraint.match is not None:
7452 text += " MATCH %s" % constraint.match
7453 return text
7454
7455 def visit_computed_column(self, generated, **kw):
7456 text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
7457 generated.sqltext, include_table=False, literal_binds=True
7458 )
7459 if generated.persisted is True:
7460 text += " STORED"
7461 elif generated.persisted is False:
7462 text += " VIRTUAL"
7463 return text
7464
7465 def visit_identity_column(self, identity, **kw):
7466 text = "GENERATED %s AS IDENTITY" % (
7467 "ALWAYS" if identity.always else "BY DEFAULT",
7468 )
7469 options = self.get_identity_options(identity)
7470 if options:
7471 text += " (%s)" % options
7472 return text
7473
7474
class GenericTypeCompiler(TypeCompiler):
    """Render DDL type strings for the generic SQL types.

    Uppercase ``visit_`` methods render a literal SQL type name and are
    what dialects typically override; lowercase ``visit_`` methods map
    the CamelCase generic types onto those uppercase renderings.
    """

    def visit_FLOAT(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return "FLOAT"

    def visit_DOUBLE(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return "DOUBLE"

    def visit_DOUBLE_PRECISION(
        self, type_: sqltypes.DOUBLE_PRECISION[Any], **kw: Any
    ) -> str:
        return "DOUBLE PRECISION"

    def visit_REAL(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return "REAL"

    def visit_NUMERIC(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        # render precision/scale only as far as they are specified
        if type_.precision is None:
            return "NUMERIC"
        elif type_.scale is None:
            return "NUMERIC(%(precision)s)" % {"precision": type_.precision}
        else:
            return "NUMERIC(%(precision)s, %(scale)s)" % {
                "precision": type_.precision,
                "scale": type_.scale,
            }

    def visit_DECIMAL(self, type_: sqltypes.DECIMAL[Any], **kw: Any) -> str:
        # same precision/scale handling as visit_NUMERIC
        if type_.precision is None:
            return "DECIMAL"
        elif type_.scale is None:
            return "DECIMAL(%(precision)s)" % {"precision": type_.precision}
        else:
            return "DECIMAL(%(precision)s, %(scale)s)" % {
                "precision": type_.precision,
                "scale": type_.scale,
            }

    def visit_INTEGER(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return "INTEGER"

    def visit_SMALLINT(self, type_: sqltypes.SmallInteger, **kw: Any) -> str:
        return "SMALLINT"

    def visit_BIGINT(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return "BIGINT"

    def visit_TIMESTAMP(self, type_: sqltypes.TIMESTAMP, **kw: Any) -> str:
        return "TIMESTAMP"

    def visit_DATETIME(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return "DATETIME"

    def visit_DATE(self, type_: sqltypes.Date, **kw: Any) -> str:
        return "DATE"

    def visit_TIME(self, type_: sqltypes.Time, **kw: Any) -> str:
        return "TIME"

    def visit_CLOB(self, type_: sqltypes.CLOB, **kw: Any) -> str:
        return "CLOB"

    # annotated with Text: no generic NCLOB type class is visible here —
    # presumably dialect-level NCLOB types dispatch to this method
    def visit_NCLOB(self, type_: sqltypes.Text, **kw: Any) -> str:
        return "NCLOB"

    def _render_string_type(
        self, name: str, length: Optional[int], collation: Optional[str]
    ) -> str:
        """Render ``NAME(length) COLLATE "collation"``, omitting the
        length and/or collation parts when not given."""
        text = name
        if length:
            text += f"({length})"
        if collation:
            text += f' COLLATE "{collation}"'
        return text

    def visit_CHAR(self, type_: sqltypes.CHAR, **kw: Any) -> str:
        return self._render_string_type("CHAR", type_.length, type_.collation)

    def visit_NCHAR(self, type_: sqltypes.NCHAR, **kw: Any) -> str:
        return self._render_string_type("NCHAR", type_.length, type_.collation)

    def visit_VARCHAR(self, type_: sqltypes.String, **kw: Any) -> str:
        return self._render_string_type(
            "VARCHAR", type_.length, type_.collation
        )

    def visit_NVARCHAR(self, type_: sqltypes.NVARCHAR, **kw: Any) -> str:
        return self._render_string_type(
            "NVARCHAR", type_.length, type_.collation
        )

    def visit_TEXT(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self._render_string_type("TEXT", type_.length, type_.collation)

    def visit_UUID(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        return "UUID"

    def visit_BLOB(self, type_: sqltypes.LargeBinary, **kw: Any) -> str:
        return "BLOB"

    def visit_BINARY(self, type_: sqltypes.BINARY, **kw: Any) -> str:
        # "and/or" idiom: renders bare "BINARY" when length is None or 0
        return "BINARY" + (type_.length and "(%d)" % type_.length or "")

    def visit_VARBINARY(self, type_: sqltypes.VARBINARY, **kw: Any) -> str:
        # same "and/or" idiom as visit_BINARY
        return "VARBINARY" + (type_.length and "(%d)" % type_.length or "")

    def visit_BOOLEAN(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return "BOOLEAN"

    # lowercase visitors: map generic CamelCase types onto the
    # uppercase SQL-standard renderings above

    def visit_uuid(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        # fall back to CHAR(32) (hex string, no dashes) when native
        # UUID support is unavailable or not requested
        if not type_.native_uuid or not self.dialect.supports_native_uuid:
            return self._render_string_type("CHAR", length=32, collation=None)
        else:
            return self.visit_UUID(type_, **kw)

    def visit_large_binary(
        self, type_: sqltypes.LargeBinary, **kw: Any
    ) -> str:
        return self.visit_BLOB(type_, **kw)

    def visit_boolean(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return self.visit_BOOLEAN(type_, **kw)

    def visit_time(self, type_: sqltypes.Time, **kw: Any) -> str:
        return self.visit_TIME(type_, **kw)

    def visit_datetime(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return self.visit_DATETIME(type_, **kw)

    def visit_date(self, type_: sqltypes.Date, **kw: Any) -> str:
        return self.visit_DATE(type_, **kw)

    def visit_big_integer(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return self.visit_BIGINT(type_, **kw)

    def visit_small_integer(
        self, type_: sqltypes.SmallInteger, **kw: Any
    ) -> str:
        return self.visit_SMALLINT(type_, **kw)

    def visit_integer(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return self.visit_INTEGER(type_, **kw)

    def visit_real(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return self.visit_REAL(type_, **kw)

    def visit_float(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return self.visit_FLOAT(type_, **kw)

    def visit_double(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return self.visit_DOUBLE(type_, **kw)

    def visit_numeric(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        return self.visit_NUMERIC(type_, **kw)

    def visit_string(self, type_: sqltypes.String, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_unicode(self, type_: sqltypes.Unicode, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_text(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_unicode_text(
        self, type_: sqltypes.UnicodeText, **kw: Any
    ) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_enum(self, type_: sqltypes.Enum, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_null(self, type_, **kw):
        # NullType has no DDL representation; a Column was likely
        # declared without a type
        raise exc.CompileError(
            "Can't generate DDL for %r; "
            "did you forget to specify a "
            "type on this Column?" % type_
        )

    def visit_type_decorator(
        self, type_: TypeDecorator[Any], **kw: Any
    ) -> str:
        # compile the decorated type's dialect-level implementation
        return self.process(type_.type_engine(self.dialect), **kw)

    def visit_user_defined(
        self, type_: UserDefinedType[Any], **kw: Any
    ) -> str:
        return type_.get_col_spec(**kw)
7662
7663
class StrSQLTypeCompiler(GenericTypeCompiler):
    """Type compiler for the string-only compiler: renders a best-effort
    type name for any type object, never raising for unknown types."""

    def process(self, type_, **kw):
        # types lacking a _compiler_dispatch get the fallback rendering
        dispatcher = getattr(type_, "_compiler_dispatch", None)
        if dispatcher is None:
            return self._visit_unknown(type_, **kw)
        return dispatcher(self, **kw)

    def __getattr__(self, key):
        # any unknown visit_XYZ attribute resolves to the fallback
        if not key.startswith("visit_"):
            raise AttributeError(key)
        return self._visit_unknown

    def _visit_unknown(self, type_, **kw):
        clsname = type_.__class__.__name__
        # an all-caps class name is taken to be the SQL type name itself
        if clsname == clsname.upper():
            return clsname
        return repr(type_)

    def visit_null(self, type_, **kw):
        return "NULL"

    def visit_user_defined(self, type_, **kw):
        get_col_spec = getattr(type_, "get_col_spec", None)
        if get_col_spec is None:
            return repr(type_)
        return get_col_spec(**kw)
7695
7696
class _SchemaForObjectCallable(Protocol):
    """Callable returning the effective schema name for a schema object."""

    def __call__(self, obj: Any, /) -> str: ...
7699
7700
class _BindNameForColProtocol(Protocol):
    """Callable returning the bind parameter name to use for a column."""

    def __call__(self, col: ColumnClause[Any]) -> str: ...
7703
7704
class IdentifierPreparer:
    """Handle quoting and case-folding of identifiers based on options."""

    # words that must always be quoted when used as identifiers
    reserved_words = RESERVED_WORDS

    # regex matching identifiers that are legal without quoting
    legal_characters = LEGAL_CHARACTERS

    # characters that may not begin an unquoted identifier
    illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS

    # opening / closing quote characters, assigned in __init__
    initial_quote: str

    final_quote: str

    # cache of identifier -> rendered form, populated by quote()
    _strings: MutableMapping[str, str]

    schema_for_object: _SchemaForObjectCallable = operator.attrgetter("schema")
    """Return the .schema attribute for an object.

    For the default IdentifierPreparer, the schema for an object is always
    the value of the ".schema" attribute. if the preparer is replaced
    with one that has a non-empty schema_translate_map, the value of the
    ".schema" attribute is rendered a symbol that will be converted to a
    real schema name from the mapping post-compile.

    """

    # set True by _with_schema_translate() when the applied
    # schema_translate_map contained None as a key
    _includes_none_schema_translate: bool = False
7732
7733 def __init__(
7734 self,
7735 dialect: Dialect,
7736 initial_quote: str = '"',
7737 final_quote: Optional[str] = None,
7738 escape_quote: str = '"',
7739 quote_case_sensitive_collations: bool = True,
7740 omit_schema: bool = False,
7741 ):
7742 """Construct a new ``IdentifierPreparer`` object.
7743
7744 initial_quote
7745 Character that begins a delimited identifier.
7746
7747 final_quote
7748 Character that ends a delimited identifier. Defaults to
7749 `initial_quote`.
7750
7751 omit_schema
7752 Prevent prepending schema name. Useful for databases that do
7753 not support schemae.
7754 """
7755
7756 self.dialect = dialect
7757 self.initial_quote = initial_quote
7758 self.final_quote = final_quote or self.initial_quote
7759 self.escape_quote = escape_quote
7760 self.escape_to_quote = self.escape_quote * 2
7761 self.omit_schema = omit_schema
7762 self.quote_case_sensitive_collations = quote_case_sensitive_collations
7763 self._strings = {}
7764 self._double_percents = self.dialect.paramstyle in (
7765 "format",
7766 "pyformat",
7767 )
7768
7769 def _with_schema_translate(self, schema_translate_map):
7770 prep = self.__class__.__new__(self.__class__)
7771 prep.__dict__.update(self.__dict__)
7772
7773 includes_none = None in schema_translate_map
7774
7775 def symbol_getter(obj):
7776 name = obj.schema
7777 if obj._use_schema_map and (name is not None or includes_none):
7778 if name is not None and ("[" in name or "]" in name):
7779 raise exc.CompileError(
7780 "Square bracket characters ([]) not supported "
7781 "in schema translate name '%s'" % name
7782 )
7783 return quoted_name(
7784 "__[SCHEMA_%s]" % (name or "_none"), quote=False
7785 )
7786 else:
7787 return obj.schema
7788
7789 prep.schema_for_object = symbol_getter
7790 prep._includes_none_schema_translate = includes_none
7791 return prep
7792
7793 def _render_schema_translates(
7794 self, statement: str, schema_translate_map: SchemaTranslateMapType
7795 ) -> str:
7796 d = schema_translate_map
7797 if None in d:
7798 if not self._includes_none_schema_translate:
7799 raise exc.InvalidRequestError(
7800 "schema translate map which previously did not have "
7801 "`None` present as a key now has `None` present; compiled "
7802 "statement may lack adequate placeholders. Please use "
7803 "consistent keys in successive "
7804 "schema_translate_map dictionaries."
7805 )
7806
7807 d["_none"] = d[None] # type: ignore[index]
7808
7809 def replace(m):
7810 name = m.group(2)
7811 if name in d:
7812 effective_schema = d[name]
7813 else:
7814 if name in (None, "_none"):
7815 raise exc.InvalidRequestError(
7816 "schema translate map which previously had `None` "
7817 "present as a key now no longer has it present; don't "
7818 "know how to apply schema for compiled statement. "
7819 "Please use consistent keys in successive "
7820 "schema_translate_map dictionaries."
7821 )
7822 effective_schema = name
7823
7824 if not effective_schema:
7825 effective_schema = self.dialect.default_schema_name
7826 if not effective_schema:
7827 # TODO: no coverage here
7828 raise exc.CompileError(
7829 "Dialect has no default schema name; can't "
7830 "use None as dynamic schema target."
7831 )
7832 return self.quote_schema(effective_schema)
7833
7834 return re.sub(r"(__\[SCHEMA_([^\]]+)\])", replace, statement)
7835
7836 def _escape_identifier(self, value: str) -> str:
7837 """Escape an identifier.
7838
7839 Subclasses should override this to provide database-dependent
7840 escaping behavior.
7841 """
7842
7843 value = value.replace(self.escape_quote, self.escape_to_quote)
7844 if self._double_percents:
7845 value = value.replace("%", "%%")
7846 return value
7847
    def _unescape_identifier(self, value: str) -> str:
        """Canonicalize an escaped identifier.

        Subclasses should override this to provide database-dependent
        unescaping behavior that reverses _escape_identifier.
        """

        # collapse doubled quote characters back to single ones; note the
        # "%%" doubling done for %-paramstyles is not reversed here
        return value.replace(self.escape_to_quote, self.escape_quote)
7856
7857 def validate_sql_phrase(self, element, reg):
7858 """keyword sequence filter.
7859
7860 a filter for elements that are intended to represent keyword sequences,
7861 such as "INITIALLY", "INITIALLY DEFERRED", etc. no special characters
7862 should be present.
7863
7864 """
7865
7866 if element is not None and not reg.match(element):
7867 raise exc.CompileError(
7868 "Unexpected SQL phrase: %r (matching against %r)"
7869 % (element, reg.pattern)
7870 )
7871 return element
7872
7873 def quote_identifier(self, value: str) -> str:
7874 """Quote an identifier.
7875
7876 Subclasses should override this to provide database-dependent
7877 quoting behavior.
7878 """
7879
7880 return (
7881 self.initial_quote
7882 + self._escape_identifier(value)
7883 + self.final_quote
7884 )
7885
7886 def _requires_quotes(self, value: str) -> bool:
7887 """Return True if the given identifier requires quoting."""
7888 lc_value = value.lower()
7889 return (
7890 lc_value in self.reserved_words
7891 or value[0] in self.illegal_initial_characters
7892 or not self.legal_characters.match(str(value))
7893 or (lc_value != value)
7894 )
7895
7896 def _requires_quotes_illegal_chars(self, value):
7897 """Return True if the given identifier requires quoting, but
7898 not taking case convention into account."""
7899 return not self.legal_characters.match(str(value))
7900
    def quote_schema(self, schema: str) -> str:
        """Conditionally quote a schema name.


        The name is quoted if it is a reserved word, contains quote-necessary
        characters, or is an instance of :class:`.quoted_name` which includes
        ``quote`` set to ``True``.

        Subclasses can override this to provide database-dependent
        quoting behavior for schema names.

        :param schema: string schema name
        """
        # delegates to the general identifier rules by default
        return self.quote(schema)
7915
7916 def quote(self, ident: str) -> str:
7917 """Conditionally quote an identifier.
7918
7919 The identifier is quoted if it is a reserved word, contains
7920 quote-necessary characters, or is an instance of
7921 :class:`.quoted_name` which includes ``quote`` set to ``True``.
7922
7923 Subclasses can override this to provide database-dependent
7924 quoting behavior for identifier names.
7925
7926 :param ident: string identifier
7927 """
7928 force = getattr(ident, "quote", None)
7929
7930 if force is None:
7931 if ident in self._strings:
7932 return self._strings[ident]
7933 else:
7934 if self._requires_quotes(ident):
7935 self._strings[ident] = self.quote_identifier(ident)
7936 else:
7937 self._strings[ident] = ident
7938 return self._strings[ident]
7939 elif force:
7940 return self.quote_identifier(ident)
7941 else:
7942 return ident
7943
7944 def format_collation(self, collation_name):
7945 if self.quote_case_sensitive_collations:
7946 return self.quote(collation_name)
7947 else:
7948 return collation_name
7949
7950 def format_sequence(
7951 self, sequence: schema.Sequence, use_schema: bool = True
7952 ) -> str:
7953 name = self.quote(sequence.name)
7954
7955 effective_schema = self.schema_for_object(sequence)
7956
7957 if (
7958 not self.omit_schema
7959 and use_schema
7960 and effective_schema is not None
7961 ):
7962 name = self.quote_schema(effective_schema) + "." + name
7963 return name
7964
7965 def format_label(
7966 self, label: Label[Any], name: Optional[str] = None
7967 ) -> str:
7968 return self.quote(name or label.name)
7969
7970 def format_alias(
7971 self, alias: Optional[AliasedReturnsRows], name: Optional[str] = None
7972 ) -> str:
7973 if name is None:
7974 assert alias is not None
7975 return self.quote(alias.name)
7976 else:
7977 return self.quote(name)
7978
7979 def format_savepoint(self, savepoint, name=None):
7980 # Running the savepoint name through quoting is unnecessary
7981 # for all known dialects. This is here to support potential
7982 # third party use cases
7983 ident = name or savepoint.ident
7984 if self._requires_quotes(ident):
7985 ident = self.quote_identifier(ident)
7986 return ident
7987
7988 @util.preload_module("sqlalchemy.sql.naming")
7989 def format_constraint(
7990 self, constraint: Union[Constraint, Index], _alembic_quote: bool = True
7991 ) -> Optional[str]:
7992 naming = util.preloaded.sql_naming
7993
7994 if constraint.name is _NONE_NAME:
7995 name = naming._constraint_name_for_table(
7996 constraint, constraint.table
7997 )
7998
7999 if name is None:
8000 return None
8001 else:
8002 name = constraint.name
8003
8004 assert name is not None
8005 if constraint.__visit_name__ == "index":
8006 return self.truncate_and_render_index_name(
8007 name, _alembic_quote=_alembic_quote
8008 )
8009 else:
8010 return self.truncate_and_render_constraint_name(
8011 name, _alembic_quote=_alembic_quote
8012 )
8013
8014 def truncate_and_render_index_name(
8015 self, name: str, _alembic_quote: bool = True
8016 ) -> str:
8017 # calculate these at format time so that ad-hoc changes
8018 # to dialect.max_identifier_length etc. can be reflected
8019 # as IdentifierPreparer is long lived
8020 max_ = (
8021 self.dialect.max_index_name_length
8022 or self.dialect.max_identifier_length
8023 )
8024 return self._truncate_and_render_maxlen_name(
8025 name, max_, _alembic_quote
8026 )
8027
8028 def truncate_and_render_constraint_name(
8029 self, name: str, _alembic_quote: bool = True
8030 ) -> str:
8031 # calculate these at format time so that ad-hoc changes
8032 # to dialect.max_identifier_length etc. can be reflected
8033 # as IdentifierPreparer is long lived
8034 max_ = (
8035 self.dialect.max_constraint_name_length
8036 or self.dialect.max_identifier_length
8037 )
8038 return self._truncate_and_render_maxlen_name(
8039 name, max_, _alembic_quote
8040 )
8041
8042 def _truncate_and_render_maxlen_name(
8043 self, name: str, max_: int, _alembic_quote: bool
8044 ) -> str:
8045 if isinstance(name, elements._truncated_label):
8046 if len(name) > max_:
8047 name = name[0 : max_ - 8] + "_" + util.md5_hex(name)[-4:]
8048 else:
8049 self.dialect.validate_identifier(name)
8050
8051 if not _alembic_quote:
8052 return name
8053 else:
8054 return self.quote(name)
8055
    def format_index(self, index: Index) -> str:
        """Prepare a quoted index name."""
        # an Index resolves to a non-None name, hence the assertion
        name = self.format_constraint(index)
        assert name is not None
        return name
8060
8061 def format_table(
8062 self,
8063 table: FromClause,
8064 use_schema: bool = True,
8065 name: Optional[str] = None,
8066 ) -> str:
8067 """Prepare a quoted table and schema name."""
8068 if name is None:
8069 if TYPE_CHECKING:
8070 assert isinstance(table, NamedFromClause)
8071 name = table.name
8072
8073 result = self.quote(name)
8074
8075 effective_schema = self.schema_for_object(table)
8076
8077 if not self.omit_schema and use_schema and effective_schema:
8078 result = self.quote_schema(effective_schema) + "." + result
8079 return result
8080
    def format_schema(self, name):
        """Prepare a quoted schema name."""

        # schema names follow the general identifier quoting rules
        return self.quote(name)
8085
8086 def format_label_name(
8087 self,
8088 name,
8089 anon_map=None,
8090 ):
8091 """Prepare a quoted column name."""
8092
8093 if anon_map is not None and isinstance(
8094 name, elements._truncated_label
8095 ):
8096 name = name.apply_map(anon_map)
8097
8098 return self.quote(name)
8099
8100 def format_column(
8101 self,
8102 column: ColumnElement[Any],
8103 use_table: bool = False,
8104 name: Optional[str] = None,
8105 table_name: Optional[str] = None,
8106 use_schema: bool = False,
8107 anon_map: Optional[Mapping[str, Any]] = None,
8108 ) -> str:
8109 """Prepare a quoted column name."""
8110
8111 if name is None:
8112 name = column.name
8113 assert name is not None
8114
8115 if anon_map is not None and isinstance(
8116 name, elements._truncated_label
8117 ):
8118 name = name.apply_map(anon_map)
8119
8120 if not getattr(column, "is_literal", False):
8121 if use_table:
8122 return (
8123 self.format_table(
8124 column.table, use_schema=use_schema, name=table_name
8125 )
8126 + "."
8127 + self.quote(name)
8128 )
8129 else:
8130 return self.quote(name)
8131 else:
8132 # literal textual elements get stuck into ColumnClause a lot,
8133 # which shouldn't get quoted
8134
8135 if use_table:
8136 return (
8137 self.format_table(
8138 column.table, use_schema=use_schema, name=table_name
8139 )
8140 + "."
8141 + name
8142 )
8143 else:
8144 return name
8145
8146 def format_table_seq(self, table, use_schema=True):
8147 """Format table name and schema as a tuple."""
8148
8149 # Dialects with more levels in their fully qualified references
8150 # ('database', 'owner', etc.) could override this and return
8151 # a longer sequence.
8152
8153 effective_schema = self.schema_for_object(table)
8154
8155 if not self.omit_schema and use_schema and effective_schema:
8156 return (
8157 self.quote_schema(effective_schema),
8158 self.format_table(table, use_schema=False),
8159 )
8160 else:
8161 return (self.format_table(table, use_schema=False),)
8162
    @util.memoized_property
    def _r_identifiers(self):
        # regex splitting a dotted identifier chain into components,
        # honoring quoted segments (which may themselves contain dots
        # and escaped quote characters)
        initial, final, escaped_final = (
            re.escape(s)
            for s in (
                self.initial_quote,
                self.final_quote,
                self._escape_identifier(self.final_quote),
            )
        )
        # each segment is either a quoted identifier (captured in group 1)
        # or a plain run up to the next dot (captured in group 2)
        r = re.compile(
            r"(?:"
            r"(?:%(initial)s((?:%(escaped)s|[^%(final)s])+)%(final)s"
            r"|([^\.]+))(?=\.|$))+"
            % {"initial": initial, "final": final, "escaped": escaped_final}
        )
        return r
8180
8181 def unformat_identifiers(self, identifiers: str) -> Sequence[str]:
8182 """Unpack 'schema.table.column'-like strings into components."""
8183
8184 r = self._r_identifiers
8185 return [
8186 self._unescape_identifier(i)
8187 for i in [a or b for a, b in r.findall(identifiers)]
8188 ]