1# sql/compiler.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Base SQL and DDL compiler implementations.
10
11Classes provided include:
12
13:class:`.compiler.SQLCompiler` - renders SQL
14strings
15
16:class:`.compiler.DDLCompiler` - renders DDL
17(data definition language) strings
18
19:class:`.compiler.GenericTypeCompiler` - renders
20type specification strings.
21
22To generate user-defined SQL strings, see
23:doc:`/ext/compiler`.
24
25"""
26from __future__ import annotations
27
28import collections
29import collections.abc as collections_abc
30import contextlib
31from enum import IntEnum
32import functools
33import itertools
34import operator
35import re
36from time import perf_counter
37import typing
38from typing import Any
39from typing import Callable
40from typing import cast
41from typing import ClassVar
42from typing import Dict
43from typing import Final
44from typing import FrozenSet
45from typing import Iterable
46from typing import Iterator
47from typing import List
48from typing import Literal
49from typing import Mapping
50from typing import MutableMapping
51from typing import NamedTuple
52from typing import NoReturn
53from typing import Optional
54from typing import Pattern
55from typing import Protocol
56from typing import Sequence
57from typing import Set
58from typing import Tuple
59from typing import Type
60from typing import TYPE_CHECKING
61from typing import TypedDict
62from typing import Union
63
64from . import base
65from . import coercions
66from . import crud
67from . import elements
68from . import functions
69from . import operators
70from . import roles
71from . import schema
72from . import selectable
73from . import sqltypes
74from . import util as sql_util
75from ._typing import is_column_element
76from ._typing import is_dml
77from .base import _de_clone
78from .base import _from_objects
79from .base import _NONE_NAME
80from .base import _SentinelDefaultCharacterization
81from .base import NO_ARG
82from .elements import quoted_name
83from .sqltypes import TupleType
84from .visitors import prefix_anon_map
85from .. import exc
86from .. import util
87from ..util import FastIntFlag
88from ..util.typing import Self
89from ..util.typing import TupleAny
90from ..util.typing import Unpack
91
92if typing.TYPE_CHECKING:
93 from .annotation import _AnnotationDict
94 from .base import _AmbiguousTableNameMap
95 from .base import CompileState
96 from .base import Executable
97 from .base import ExecutableStatement
98 from .cache_key import CacheKey
99 from .ddl import _TableViaSelect
100 from .ddl import CreateTableAs
101 from .ddl import CreateView
102 from .ddl import ExecutableDDLElement
103 from .dml import Delete
104 from .dml import Insert
105 from .dml import Update
106 from .dml import UpdateBase
107 from .dml import UpdateDMLState
108 from .dml import ValuesBase
109 from .elements import _truncated_label
110 from .elements import BinaryExpression
111 from .elements import BindParameter
112 from .elements import ClauseElement
113 from .elements import ColumnClause
114 from .elements import ColumnElement
115 from .elements import False_
116 from .elements import Label
117 from .elements import Null
118 from .elements import True_
119 from .functions import Function
120 from .schema import CheckConstraint
121 from .schema import Column
122 from .schema import Constraint
123 from .schema import ForeignKeyConstraint
124 from .schema import IdentityOptions
125 from .schema import Index
126 from .schema import PrimaryKeyConstraint
127 from .schema import Table
128 from .schema import UniqueConstraint
129 from .selectable import _ColumnsClauseElement
130 from .selectable import AliasedReturnsRows
131 from .selectable import CompoundSelectState
132 from .selectable import CTE
133 from .selectable import FromClause
134 from .selectable import NamedFromClause
135 from .selectable import ReturnsRows
136 from .selectable import Select
137 from .selectable import SelectState
138 from .type_api import _BindProcessorType
139 from .type_api import TypeDecorator
140 from .type_api import TypeEngine
141 from .type_api import UserDefinedType
142 from .visitors import Visitable
143 from ..engine.cursor import CursorResultMetaData
144 from ..engine.interfaces import _CoreSingleExecuteParams
145 from ..engine.interfaces import _DBAPIAnyExecuteParams
146 from ..engine.interfaces import _DBAPIMultiExecuteParams
147 from ..engine.interfaces import _DBAPISingleExecuteParams
148 from ..engine.interfaces import _ExecuteOptions
149 from ..engine.interfaces import _GenericSetInputSizesType
150 from ..engine.interfaces import _MutableCoreSingleExecuteParams
151 from ..engine.interfaces import Dialect
152 from ..engine.interfaces import SchemaTranslateMapType
153
154
# type alias: mapping of FROM clause elements to the hint text to be
# rendered for them
_FromHintsType = Dict["FromClause", str]

# baseline collection of lower-cased SQL reserved words; identifiers
# matching an entry here require quoting when rendered.
# NOTE(review): this particular list resembles PostgreSQL's keyword set;
# dialects presumably supply their own — confirm in dialect modules.
RESERVED_WORDS = {
    "all",
    "analyse",
    "analyze",
    "and",
    "any",
    "array",
    "as",
    "asc",
    "asymmetric",
    "authorization",
    "between",
    "binary",
    "both",
    "case",
    "cast",
    "check",
    "collate",
    "column",
    "constraint",
    "create",
    "cross",
    "current_date",
    "current_role",
    "current_time",
    "current_timestamp",
    "current_user",
    "default",
    "deferrable",
    "desc",
    "distinct",
    "do",
    "else",
    "end",
    "except",
    "false",
    "for",
    "foreign",
    "freeze",
    "from",
    "full",
    "grant",
    "group",
    "having",
    "ilike",
    "in",
    "initially",
    "inner",
    "intersect",
    "into",
    "is",
    "isnull",
    "join",
    "leading",
    "left",
    "like",
    "limit",
    "localtime",
    "localtimestamp",
    "natural",
    "new",
    "not",
    "notnull",
    "null",
    "off",
    "offset",
    "old",
    "on",
    "only",
    "or",
    "order",
    "outer",
    "overlaps",
    "placing",
    "primary",
    "references",
    "right",
    "select",
    "session_user",
    "set",
    "similar",
    "some",
    "symmetric",
    "table",
    "then",
    "to",
    "trailing",
    "true",
    "union",
    "unique",
    "user",
    "using",
    "verbose",
    "when",
    "where",
}

# an identifier composed entirely of these characters needs no quoting
LEGAL_CHARACTERS = re.compile(r"^[A-Z0-9_$]+$", re.I)
# as above, but additionally tolerating embedded spaces
LEGAL_CHARACTERS_PLUS_SPACE = re.compile(r"^[A-Z0-9_ $]+$", re.I)
# characters that may not begin an unquoted identifier: digits and "$"
ILLEGAL_INITIAL_CHARACTERS = {str(x) for x in range(0, 10)}.union(["$"])

# patterns recognizing the legal keyword phrases for FOREIGN KEY
# ON DELETE / ON UPDATE / INITIALLY clauses
FK_ON_DELETE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
)
FK_ON_UPDATE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I
)
FK_INITIALLY = re.compile(r"^(?:DEFERRED|IMMEDIATE)$", re.I)
# locates ":name" style bind parameters in textual SQL; the lookbehind /
# lookahead exclude double colons (e.g. casts), adjacent word characters,
# and backslash (\x5c) escaped colons
BIND_PARAMS = re.compile(r"(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])", re.UNICODE)
# locates backslash-escaped colon sequences, so a literal colon can be
# rendered rather than a bind parameter
BIND_PARAMS_ESC = re.compile(r"\x5c(:[\w\$]*)(?![:\w\$])", re.UNICODE)

# per-paramstyle templates for rendering a bound parameter; percent signs
# are doubled as the strings pass through %-interpolation later.
# NOTE(review): the "[_POSITION]" token appears to be substituted with the
# parameter's ordinal during compilation — confirm in SQLCompiler.
_pyformat_template = "%%(%(name)s)s"
BIND_TEMPLATES = {
    "pyformat": _pyformat_template,
    "qmark": "?",
    "format": "%%s",
    "numeric": ":[_POSITION]",
    "numeric_dollar": "$[_POSITION]",
    "named": ":%(name)s",
}
277
278
# default SQL text for operator functions from the .operators module;
# binary operators carry surrounding whitespace, unary prefixes carry a
# trailing space, modifiers a leading space
OPERATORS = {
    # binary
    operators.and_: " AND ",
    operators.or_: " OR ",
    operators.add: " + ",
    operators.mul: " * ",
    operators.sub: " - ",
    operators.mod: " % ",
    operators.neg: "-",
    operators.lt: " < ",
    operators.le: " <= ",
    operators.ne: " != ",
    operators.gt: " > ",
    operators.ge: " >= ",
    operators.eq: " = ",
    operators.is_distinct_from: " IS DISTINCT FROM ",
    operators.is_not_distinct_from: " IS NOT DISTINCT FROM ",
    operators.concat_op: " || ",
    operators.match_op: " MATCH ",
    operators.not_match_op: " NOT MATCH ",
    operators.in_op: " IN ",
    operators.not_in_op: " NOT IN ",
    operators.comma_op: ", ",
    operators.from_: " FROM ",
    operators.as_: " AS ",
    operators.is_: " IS ",
    operators.is_not: " IS NOT ",
    operators.collate: " COLLATE ",
    # unary
    operators.exists: "EXISTS ",
    operators.distinct_op: "DISTINCT ",
    operators.inv: "NOT ",
    operators.any_op: "ANY ",
    operators.all_op: "ALL ",
    # modifiers
    operators.desc_op: " DESC",
    operators.asc_op: " ASC",
    operators.nulls_first_op: " NULLS FIRST",
    operators.nulls_last_op: " NULLS LAST",
    # bitwise
    operators.bitwise_xor_op: " ^ ",
    operators.bitwise_or_op: " | ",
    operators.bitwise_and_op: " & ",
    operators.bitwise_not_op: "~",
    operators.bitwise_lshift_op: " << ",
    operators.bitwise_rshift_op: " >> ",
}

# generic function classes from the .functions module mapped to the exact
# text rendered for them
FUNCTIONS: Dict[Type[Function[Any]], str] = {
    functions.coalesce: "coalesce",
    functions.current_date: "CURRENT_DATE",
    functions.current_time: "CURRENT_TIME",
    functions.current_timestamp: "CURRENT_TIMESTAMP",
    functions.current_user: "CURRENT_USER",
    functions.localtime: "LOCALTIME",
    functions.localtimestamp: "LOCALTIMESTAMP",
    functions.random: "random",
    functions.sysdate: "sysdate",
    functions.session_user: "SESSION_USER",
    functions.user: "USER",
    functions.cube: "CUBE",
    functions.rollup: "ROLLUP",
    functions.grouping_sets: "GROUPING SETS",
}


# default mapping of datetime field names for the EXTRACT function;
# installed as SQLCompiler.extract_map
EXTRACT_MAP = {
    "month": "month",
    "day": "day",
    "year": "year",
    "second": "second",
    "hour": "hour",
    "doy": "doy",
    "minute": "minute",
    "quarter": "quarter",
    "dow": "dow",
    "week": "week",
    "epoch": "epoch",
    "milliseconds": "milliseconds",
    "microseconds": "microseconds",
    "timezone_hour": "timezone_hour",
    "timezone_minute": "timezone_minute",
}

# SQL keyword text for each compound-SELECT (set operation) variant
COMPOUND_KEYWORDS = {
    selectable._CompoundSelectKeyword.UNION: "UNION",
    selectable._CompoundSelectKeyword.UNION_ALL: "UNION ALL",
    selectable._CompoundSelectKeyword.EXCEPT: "EXCEPT",
    selectable._CompoundSelectKeyword.EXCEPT_ALL: "EXCEPT ALL",
    selectable._CompoundSelectKeyword.INTERSECT: "INTERSECT",
    selectable._CompoundSelectKeyword.INTERSECT_ALL: "INTERSECT ALL",
}
371
372
class ResultColumnsEntry(NamedTuple):
    """Tracks a column expression that is expected to be represented
    in the result rows for this statement.

    This normally refers to the columns clause of a SELECT statement
    but may also refer to a RETURNING clause, as well as for dialect-specific
    emulations.

    The module-level ``RM_RENDERED_NAME`` / ``RM_NAME`` / ``RM_OBJECTS`` /
    ``RM_TYPE`` integer constants index into instances of this tuple.

    """

    keyname: str
    """string name that's expected in cursor.description"""

    name: str
    """column name, may be labeled"""

    objects: Tuple[Any, ...]
    """sequence of objects that should be able to locate this column
    in a RowMapping. This is typically string names and aliases
    as well as Column objects.

    """

    type: TypeEngine[Any]
    """Datatype to be associated with this column. This is where
    the "result processing" logic directly links the compiled statement
    to the rows that come back from the cursor.

    """
402
403
class _ResultMapAppender(Protocol):
    """Callable signature for recording one result-column entry.

    The four arguments mirror the fields of :class:`.ResultColumnsEntry`.

    """

    def __call__(
        self,
        keyname: str,
        name: str,
        objects: Sequence[Any],
        type_: TypeEngine[Any],
    ) -> None: ...
412
413
# integer indexes into ResultColumnsEntry used by cursor.py.
# some profiling showed integer access faster than named tuple
RM_RENDERED_NAME: Literal[0] = 0  # ResultColumnsEntry.keyname
RM_NAME: Literal[1] = 1  # ResultColumnsEntry.name
RM_OBJECTS: Literal[2] = 2  # ResultColumnsEntry.objects
RM_TYPE: Literal[3] = 3  # ResultColumnsEntry.type
420
421
class _BaseCompilerStackEntry(TypedDict):
    """Required keys of one frame of the compiler's statement-nesting
    stack; ``_CompilerStackEntry`` adds the optional keys."""

    # NOTE(review): presumably the FROM objects this selectable establishes
    # for enclosed scopes vs. those available for correlation — confirm
    # against SQLCompiler stack usage (not shown in this chunk)
    asfrom_froms: Set[FromClause]
    correlate_froms: Set[FromClause]
    # the selectable construct this frame corresponds to
    selectable: ReturnsRows
426
427
class _CompilerStackEntry(_BaseCompilerStackEntry, total=False):
    """Optional keys that may additionally be present in a frame of the
    compiler's statement-nesting stack."""

    compile_state: CompileState
    need_result_map_for_nested: bool
    need_result_map_for_compound: bool
    select_0: ReturnsRows
    insert_from_select: Select[Unpack[TupleAny]]
434
435
class ExpandedState(NamedTuple):
    """State produced when "expanded" and "post compile" bound parameters
    are applied to a statement.

    "expanded" parameters are generated at execution time to match the
    number of values actually passed; the canonical example is the
    individual elements inside of an IN expression.

    "post compile" parameters have their literal values rendered directly
    into the SQL string at execution time instead of being sent to the
    driver as separate parameters.

    Use the :meth:`.SQLCompiler.construct_expanded_state` method on any
    :class:`.SQLCompiler` instance to create an :class:`.ExpandedState`.

    """

    statement: str
    """String SQL statement with parameters fully expanded"""

    parameters: _CoreSingleExecuteParams
    """Parameter dictionary with parameters fully expanded.

    For named paramstyles this maps exactly to the names present in the
    statement; for positional paramstyles, consult
    :attr:`.ExpandedState.positional_parameters` for the ordered tuple.

    """

    processors: Mapping[str, _BindProcessorType[Any]]
    """mapping of bound value processors"""

    positiontup: Optional[Sequence[str]]
    """Ordered parameter names for positional paramstyles, else ``None``"""

    parameter_expansion: Mapping[str, List[str]]
    """Intermediary link from each original parameter name to the list of
    "expanded" names it produced, for those parameters that were
    expanded."""

    @property
    def positional_parameters(self) -> Tuple[Any, ...]:
        """Tuple of positional parameters, for statements compiled with
        a positional paramstyle.

        """
        positions = self.positiontup
        if positions is None:
            raise exc.InvalidRequestError(
                "statement does not use a positional paramstyle"
            )
        params = self.parameters
        return tuple(params[name] for name in positions)

    @property
    def additional_parameters(self) -> _CoreSingleExecuteParams:
        """synonym for :attr:`.ExpandedState.parameters`."""
        return self.parameters
495
496
class _InsertManyValues(NamedTuple):
    """represents state to use for executing an "insertmanyvalues" statement.

    The primary consumers of this object are the
    :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
    :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods.

    .. versionadded:: 2.0

    """

    is_default_expr: bool
    """if True, the statement is of the form
    ``INSERT INTO TABLE DEFAULT VALUES``, and can't be rewritten as a "batch"

    """

    single_values_expr: str
    """The rendered "values" clause of the INSERT statement.

    This is typically the parenthesized section e.g. "(?, ?, ?)" or similar.
    The insertmanyvalues logic uses this string as a search and replace
    target.

    """

    insert_crud_params: List[crud._CrudParamElementStr]
    """List of Column / bind names etc. used while rewriting the statement"""

    num_positional_params_counted: int
    """the number of bound parameters in a single-row statement.

    This count may be larger or smaller than the actual number of columns
    targeted in the INSERT, as it accommodates for SQL expressions
    in the values list that may have zero or more parameters embedded
    within them.

    This count is part of what's used to organize rewritten parameter lists
    when batching.

    """

    sort_by_parameter_order: bool = False
    """if deterministic returned order, i.e. ``sort_by_parameter_order``,
    was requested for the insert.

    All of the attributes following this will only be used if this is True.

    """

    includes_upsert_behaviors: bool = False
    """if True, we have to accommodate for upsert behaviors.

    This will in some cases downgrade "insertmanyvalues" that requests
    deterministic ordering.

    """

    sentinel_columns: Optional[Sequence[Column[Any]]] = None
    """List of sentinel columns that were located.

    This list is only here if the INSERT asked for
    sort_by_parameter_order=True,
    and dialect-appropriate sentinel columns were located.

    .. versionadded:: 2.0.10

    """

    num_sentinel_columns: int = 0
    """how many sentinel columns are in the above list, if any.

    This is the same as
    ``len(sentinel_columns) if sentinel_columns is not None else 0``

    """

    sentinel_param_keys: Optional[Sequence[str]] = None
    """parameter str keys in each param dictionary / tuple
    that would link to the client side "sentinel" values for that row, which
    we can use to match up parameter sets to result rows.

    This is only present if sentinel_columns is present and the INSERT
    statement actually refers to client side values for these sentinel
    columns.

    .. versionadded:: 2.0.10

    .. versionchanged:: 2.0.29 - the sequence is now string dictionary keys
       only, used against the "compiled parameters" collection before
       the parameters were converted by bound parameter processors

    """

    implicit_sentinel: bool = False
    """if True, we have exactly one sentinel column and it uses a server side
    value, currently has to generate an incrementing integer value.

    The dialect in question would have asserted that it supports receiving
    these values back and sorting on that value as a means of guaranteeing
    correlation with the incoming parameter list.

    .. versionadded:: 2.0.10

    """

    embed_values_counter: bool = False
    """Whether to embed an incrementing integer counter in each parameter
    set within the VALUES clause as parameters are batched over.

    This is only used for a specific INSERT..SELECT..VALUES..RETURNING syntax
    where a subquery is used to produce value tuples. Current support
    includes PostgreSQL, Microsoft SQL Server.

    .. versionadded:: 2.0.10

    """
614
615
class _InsertManyValuesBatch(NamedTuple):
    """represents an individual batch SQL statement for insertmanyvalues.

    This is passed through the
    :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
    :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods out
    to the :class:`.Connection` within the
    :meth:`.Connection._exec_insertmany_context` method.

    .. versionadded:: 2.0.10

    """

    # the rewritten statement text for this batch
    replaced_statement: str
    # parameters corresponding to the rewritten statement
    replaced_parameters: _DBAPIAnyExecuteParams
    processed_setinputsizes: Optional[_GenericSetInputSizesType]
    # the original per-row parameter sets covered by this batch
    batch: Sequence[_DBAPISingleExecuteParams]
    # per-row sentinel values used to correlate rows back to parameters
    sentinel_values: Sequence[Tuple[Any, ...]]
    current_batch_size: int
    # 1-based index of this batch and the total number of batches
    batchnum: int
    total_batches: int
    # True if returned rows are sorted to match parameter order
    rows_sorted: bool
    # True if deterministic ordering was requested but had to be downgraded
    is_downgraded: bool
639
640
class InsertmanyvaluesSentinelOpts(FastIntFlag):
    """bitflag enum indicating styles of PK defaults
    which can work as implicit sentinel columns

    """

    NOT_SUPPORTED = 1
    AUTOINCREMENT = 2
    IDENTITY = 4
    SEQUENCE = 8
    MONOTONIC_FUNCTION = 16

    # composite: any of the server-generated incrementing default styles
    ANY_AUTOINCREMENT = (
        AUTOINCREMENT | IDENTITY | SEQUENCE | MONOTONIC_FUNCTION
    )
    # composite: mask covering every "supported or not" style above
    _SUPPORTED_OR_NOT = NOT_SUPPORTED | ANY_AUTOINCREMENT

    # additional dialect capability flags for insertmanyvalues rendering
    USE_INSERT_FROM_SELECT = 32
    RENDER_SELECT_COL_CASTS = 64
660
661
class AggregateOrderByStyle(IntEnum):
    """Describes backend database's capabilities with ORDER BY for aggregate
    functions

    Dialects select one of these members to indicate how (or whether) an
    aggregate function's ORDER BY should be rendered.

    .. versionadded:: 2.1

    """

    NONE = 0
    """database has no ORDER BY for aggregate functions"""

    INLINE = 1
    """ORDER BY is rendered inside the function's argument list, typically as
    the last element"""

    WITHIN_GROUP = 2
    """the WITHIN GROUP (ORDER BY ...) phrase is used for all aggregate
    functions (not just the ordered set ones)"""
680
681
class CompilerState(IntEnum):
    """Lifecycle states of a :class:`.Compiled` object; transitions are
    assigned within ``Compiled.__init__``."""

    COMPILING = 0
    """statement is present, compilation phase in progress"""

    STRING_APPLIED = 1
    """statement is present, string form of the statement has been applied.

    Additional processors by subclasses may still be pending.

    """

    NO_STATEMENT = 2
    """compiler does not have a statement to compile, is used
    for method access"""
696
697
class Linting(IntEnum):
    """represent preferences for the 'SQL linting' feature.

    this feature currently includes support for flagging cartesian products
    in SQL statements.

    The members are also re-exported as module-level names just below the
    class definition.

    """

    NO_LINTING = 0
    "Disable all linting."

    COLLECT_CARTESIAN_PRODUCTS = 1
    """Collect data on FROMs and cartesian products and gather into
    'self.from_linter'"""

    WARN_LINTING = 2
    "Emit warnings for linters that find problems"

    FROM_LINTING = COLLECT_CARTESIAN_PRODUCTS | WARN_LINTING
    """Warn for cartesian products; combines COLLECT_CARTESIAN_PRODUCTS
    and WARN_LINTING"""
719
720
# re-export the Linting members as module-level names, in definition order
NO_LINTING, COLLECT_CARTESIAN_PRODUCTS, WARN_LINTING, FROM_LINTING = tuple(
    Linting
)
724
725
class FromLinter(collections.namedtuple("FromLinter", ["froms", "edges"])):
    """Current state for the "cartesian product" detection feature.

    ``froms`` is a mapping of FROM element to its display name; ``edges``
    is a set of two-tuples of FROM elements that are joined to each other.

    """

    def lint(self, start=None):
        """Check that every FROM element is reachable from every other via
        the collected join edges.

        Returns ``(unreached_set, origin)`` when disconnected elements
        remain, else ``(None, None)``.

        """
        all_froms = self.froms
        if not all_froms:
            return None, None

        remaining_edges = set(self.edges)
        unvisited = set(all_froms)

        if start is None:
            origin = unvisited.pop()
        else:
            origin = start
            unvisited.remove(origin)

        queue = collections.deque([origin])

        # breadth-first traversal from the origin, consuming edges as
        # they are crossed
        while queue and unvisited:
            current = queue.popleft()
            unvisited.discard(current)

            # comparison of nodes in edges here is based on hash equality, as
            # there are "annotated" elements that match the non-annotated
            # ones.  to remove the need for in-python hash() calls, use
            # native containment routines (e.g. "node in edge",
            # "edge.index(node)")
            crossed = {edge for edge in remaining_edges if current in edge}

            # queue the opposite endpoint of each crossed edge
            queue.extendleft(
                edge[not edge.index(current)] for edge in crossed
            )
            remaining_edges.difference_update(crossed)

        # anything left unvisited is disconnected from the origin
        if unvisited:
            return unvisited, origin
        return None, None

    def warn(self, stmt_type="SELECT"):
        """Run :meth:`.lint` and emit a warning if a cartesian product
        was detected."""
        leftover, origin = self.lint()

        if not leftover:
            return

        element_names = ", ".join(
            '"%s"' % self.froms[from_] for from_ in leftover
        )
        util.warn(
            "{stmt_type} statement has a cartesian product between "
            "FROM element(s) {froms} and "
            'FROM element "{start}". Apply join condition(s) '
            "between each element to resolve.".format(
                stmt_type=stmt_type,
                froms=element_names,
                start=self.froms[origin],
            )
        )
790
791
class Compiled:
    """Represent a compiled SQL or DDL expression.

    The ``__str__`` method of the ``Compiled`` object should produce
    the actual text of the statement. ``Compiled`` objects are
    specific to their underlying database dialect, and also may
    or may not be specific to the columns referenced within a
    particular set of bind parameters. In no case should the
    ``Compiled`` object be dependent on the actual values of those
    bind parameters, even though it may reference those values as
    defaults.
    """

    statement: Optional[ClauseElement] = None
    "The statement to compile."
    string: str = ""
    "The string representation of the ``statement``"

    state: CompilerState
    """description of the compiler's state"""

    # flags overridden by subclasses to indicate whether this is a SQL
    # compiler or a DDL compiler
    is_sql = False
    is_ddl = False

    # cache slot for result-set metadata associated with this compiled
    # statement; populated outside of this class
    _cached_metadata: Optional[CursorResultMetaData] = None

    _result_columns: Optional[List[ResultColumnsEntry]] = None

    schema_translate_map: Optional[SchemaTranslateMapType] = None

    execution_options: _ExecuteOptions = util.EMPTY_DICT
    """
    Execution options propagated from the statement. In some cases,
    sub-elements of the statement can modify these.
    """

    preparer: IdentifierPreparer

    _annotations: _AnnotationDict = util.EMPTY_DICT

    compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` object that maintains additional
    state used by the compiler.

    Major executable objects such as :class:`_expression.Insert`,
    :class:`_expression.Update`, :class:`_expression.Delete`,
    :class:`_expression.Select` will generate this
    state when compiled in order to calculate additional information about the
    object. For the top level object that is to be executed, the state can be
    stored here where it can also have applicability towards result set
    processing.

    .. versionadded:: 1.4

    """

    dml_compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` assigned at the same point that
    .isinsert, .isupdate, or .isdelete is assigned.

    This will normally be the same object as .compile_state, with the
    exception of cases like the :class:`.ORMFromStatementCompileState`
    object.

    .. versionadded:: 1.4.40

    """

    cache_key: Optional[CacheKey] = None
    """The :class:`.CacheKey` that was generated ahead of creating this
    :class:`.Compiled` object.

    This is used for routines that need access to the original
    :class:`.CacheKey` instance generated when the :class:`.Compiled`
    instance was first cached, typically in order to reconcile
    the original list of :class:`.BindParameter` objects with a
    per-statement list that's generated on each call.

    """

    _gen_time: float
    """Generation time of this :class:`.Compiled`, used for reporting
    cache stats."""

    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        schema_translate_map: Optional[SchemaTranslateMapType] = None,
        render_schema_translate: bool = False,
        compile_kwargs: Mapping[str, Any] = util.immutabledict(),
    ):
        """Construct a new :class:`.Compiled` object.

        :param dialect: :class:`.Dialect` to compile against.

        :param statement: :class:`_expression.ClauseElement` to be compiled.

        :param schema_translate_map: dictionary of schema names to be
         translated when forming the resultant SQL

         .. seealso::

            :ref:`schema_translating`

        :param compile_kwargs: additional kwargs that will be
         passed to the initial call to :meth:`.Compiled.process`.


        """
        self.dialect = dialect
        self.preparer = self.dialect.identifier_preparer
        if schema_translate_map:
            # install a schema-translating variant of the identifier
            # preparer for the duration of this compile
            self.schema_translate_map = schema_translate_map
            self.preparer = self.preparer._with_schema_translate(
                schema_translate_map
            )

        if statement is not None:
            # compilation happens eagerly; .string is populated before
            # __init__ returns and .state tracks the progression
            self.state = CompilerState.COMPILING
            self.statement = statement
            self.can_execute = statement.supports_execution
            self._annotations = statement._annotations
            if self.can_execute:
                if TYPE_CHECKING:
                    assert isinstance(statement, Executable)
                # only executable statements carry execution options
                self.execution_options = statement._execution_options
            self.string = self.process(self.statement, **compile_kwargs)

            if render_schema_translate:
                assert schema_translate_map is not None
                self.string = self.preparer._render_schema_translates(
                    self.string, schema_translate_map
                )

            self.state = CompilerState.STRING_APPLIED
        else:
            self.state = CompilerState.NO_STATEMENT

        # timestamp used for cache statistics reporting (see _gen_time)
        self._gen_time = perf_counter()

    def __init_subclass__(cls) -> None:
        # give each subclass a chance to set up class-level state at
        # declaration time
        cls._init_compiler_cls()
        return super().__init_subclass__()

    @classmethod
    def _init_compiler_cls(cls):
        # hook for subclasses; no-op by default
        pass

    def visit_unsupported_compilation(self, element, err, **kw):
        # fallback visitor invoked when no visit method exists for an
        # element; chains the originating error
        raise exc.UnsupportedCompilationError(self, type(element)) from err

    @property
    def sql_compiler(self) -> SQLCompiler:
        """Return a Compiled that is capable of processing SQL expressions.

        If this compiler is one, it would likely just return 'self'.

        """

        raise NotImplementedError()

    def process(self, obj: Visitable, **kwargs: Any) -> str:
        """Dispatch ``obj`` through the compiler visitor system and return
        its rendered string."""
        return obj._compiler_dispatch(self, **kwargs)

    def __str__(self) -> str:
        """Return the string text of the generated SQL or DDL."""

        # the string is only meaningful once fully applied; otherwise
        # render as an empty string
        if self.state is CompilerState.STRING_APPLIED:
            return self.string
        else:
            return ""

    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        """Return the bind params for this compiled object.

        :param params: a dict of string/object pairs whose values will
         override bind values compiled in to the
         statement.
        """

        raise NotImplementedError()

    @property
    def params(self):
        """Return the bind params for this compiled object."""
        return self.construct_params()
984
985
class TypeCompiler(util.EnsureKWArg):
    """Produces DDL specification for TypeEngine objects."""

    # require that all visit_XXX methods accept **kw
    ensure_kwarg = r"visit_\w+"

    def __init__(self, dialect: Dialect):
        self.dialect = dialect

    def process(self, type_: TypeEngine[Any], **kw: Any) -> str:
        """Render the type specification string for ``type_``.

        A variant registered under this compiler's dialect name takes
        precedence over the type as given.

        """
        variants = type_._variant_mapping
        dialect_name = self.dialect.name
        if variants and dialect_name in variants:
            type_ = variants[dialect_name]
        return type_._compiler_dispatch(self, **kw)

    def visit_unsupported_compilation(
        self, element: Any, err: Exception, **kw: Any
    ) -> NoReturn:
        """Raise :class:`.UnsupportedCompilationError` for ``element``,
        chaining the originating error."""
        raise exc.UnsupportedCompilationError(self, element) from err
1006
1007
# not merely a Visitable: deriving from CompilerColumnElement allows
# accurate detection of this object as a column element
class _CompileLabel(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """Lightweight stand-in for an expression.Label, used during
    compilation only."""

    __visit_name__ = "label"
    __slots__ = ("element", "name", "_alt_names")

    def __init__(self, col, name, alt_names=()):
        self.element = col
        self.name = name
        # the wrapped column itself always participates as an
        # alternate lookup name
        self._alt_names = (col, *alt_names)

    def self_group(self, **kw):
        # already acts as its own grouping
        return self

    @property
    def type(self):
        # datatype is that of the wrapped expression
        return self.element.type

    @property
    def proxy_set(self):
        # delegate proxying to the wrapped expression
        return self.element.proxy_set
1033
1034
class aggregate_orderby_inline(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """Wrapper that produces ORDER BY inside of a function's argument
    list."""

    __visit_name__ = "aggregate_orderby_inline"
    __slots__ = ("element", "aggregate_order_by")

    def __init__(self, element, orderby):
        self.element = element
        self.aggregate_order_by = orderby

    def __iter__(self):
        # iterate the wrapped expression directly
        return iter(self.element)

    def self_group(self, **kw):
        # already acts as its own grouping
        return self

    def _with_binary_element_type(self, type_):
        # rebuild around a retyped copy of the wrapped element, carrying
        # the ORDER BY criteria along unchanged
        adapted = self.element._with_binary_element_type(type_)
        return aggregate_orderby_inline(adapted, self.aggregate_order_by)

    @property
    def type(self):
        # datatype is that of the wrapped expression
        return self.element.type

    @property
    def proxy_set(self):
        # delegate proxying to the wrapped expression
        return self.element.proxy_set
1066
1067
class ilike_case_insensitive(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """Wrapping element marking the case-insensitive portion of an
    ILIKE construct.

    Rendering usually applies the ``lower()`` function; on PostgreSQL the
    wrapper passes through silently on the assumption that "ILIKE" itself
    is being used.

    .. versionadded:: 2.0

    """

    __visit_name__ = "ilike_case_insensitive_operand"
    __slots__ = ("element", "comparator")

    def __init__(self, element):
        self.element = element
        # mirror the wrapped element's comparator
        self.comparator = element.comparator

    def self_group(self, **kw):
        # already acts as its own grouping
        return self

    def _with_binary_element_type(self, type_):
        # rebuild around a retyped copy of the wrapped element
        adapted = self.element._with_binary_element_type(type_)
        return ilike_case_insensitive(adapted)

    @property
    def type(self):
        # datatype is that of the wrapped expression
        return self.element.type

    @property
    def proxy_set(self):
        # delegate proxying to the wrapped expression
        return self.element.proxy_set
1104
1105
class SQLCompiler(Compiled):
    """Default implementation of :class:`.Compiled`.

    Compiles :class:`_expression.ClauseElement` objects into SQL strings.

    """

    extract_map = EXTRACT_MAP

    bindname_escape_characters: ClassVar[Mapping[str, str]] = (
        util.immutabledict(
            {
                "%": "P",
                "(": "A",
                ")": "Z",
                ":": "C",
                ".": "_",
                "[": "_",
                "]": "_",
                " ": "_",
            }
        )
    )
    """A mapping (e.g. dict or similar) containing a lookup of
    characters keyed to replacement characters which will be applied to all
    'bind names' used in SQL statements as a form of 'escaping'; the given
    characters are replaced entirely with the 'replacement' character when
    rendered in the SQL statement, and a similar translation is performed
    on the incoming names used in parameter dictionaries passed to methods
    like :meth:`_engine.Connection.execute`.

    This allows bound parameter names used in :func:`_sql.bindparam` and
    other constructs to have any arbitrary characters present without any
    concern for characters that aren't allowed at all on the target database.

    Third party dialects can establish their own dictionary here to replace the
    default mapping, which will ensure that the particular characters in the
    mapping will never appear in a bound parameter name.

    The dictionary is evaluated at **class creation time**, so cannot be
    modified at runtime; it must be present on the class when the class
    is first declared.

    Note that for dialects that have additional bound parameter rules such
    as additional restrictions on leading characters, the
    :meth:`_sql.SQLCompiler.bindparam_string` method may need to be augmented.
    See the cx_Oracle compiler for an example of this.

    .. versionadded:: 2.0.0rc1

    """

    # both populated from bindname_escape_characters by
    # _init_bind_translate() at class-creation time
    _bind_translate_re: ClassVar[Pattern[str]]
    _bind_translate_chars: ClassVar[Mapping[str, str]]

    is_sql = True

    compound_keywords = COMPOUND_KEYWORDS

    isdelete: bool = False
    isinsert: bool = False
    isupdate: bool = False
    """class-level defaults which can be set at the instance
    level to define if this Compiled instance represents
    INSERT/UPDATE/DELETE
    """

    postfetch: Optional[List[Column[Any]]]
    """list of columns that can be post-fetched after INSERT or UPDATE to
    receive server-updated values"""

    insert_prefetch: Sequence[Column[Any]] = ()
    """list of columns for which default values should be evaluated before
    an INSERT takes place"""

    update_prefetch: Sequence[Column[Any]] = ()
    """list of columns for which onupdate default values should be evaluated
    before an UPDATE takes place"""

    implicit_returning: Optional[Sequence[ColumnElement[Any]]] = None
    """list of "implicit" returning columns for a toplevel INSERT or UPDATE
    statement, used to receive newly generated values of columns.

    .. versionadded:: 2.0 ``implicit_returning`` replaces the previous
       ``returning`` collection, which was not a generalized RETURNING
       collection and instead was in fact specific to the "implicit returning"
       feature.

    """

    isplaintext: bool = False

    binds: Dict[str, BindParameter[Any]]
    """a dictionary of bind parameter keys to BindParameter instances."""

    bind_names: Dict[BindParameter[Any], str]
    """a dictionary of BindParameter instances to "compiled" names
    that are actually present in the generated SQL"""

    stack: List[_CompilerStackEntry]
    """major statements such as SELECT, INSERT, UPDATE, DELETE are
    tracked in this stack using an entry format."""

    returning_precedes_values: bool = False
    """set to True classwide to generate RETURNING
    clauses before the VALUES or WHERE clause (i.e. MSSQL)
    """

    render_table_with_column_in_update_from: bool = False
    """set to True classwide to indicate the SET clause
    in a multi-table UPDATE statement should qualify
    columns with the table name (i.e. MySQL only)
    """

    ansi_bind_rules: bool = False
    """SQL 92 doesn't allow bind parameters to be used
    in the columns clause of a SELECT, nor does it allow
    ambiguous expressions like "? = ?". A compiler
    subclass can set this flag to True if the target
    driver/DB enforces this
    """

    bindtemplate: str
    """template to render bound parameters based on paramstyle."""

    compilation_bindtemplate: str
    """template used by compiler to render parameters before positional
    paramstyle application"""

    _numeric_binds_identifier_char: str
    """Character that's used to as the identifier of a numerical bind param.
    For example if this char is set to ``$``, numerical binds will be rendered
    in the form ``$1, $2, $3``.
    """

    _result_columns: List[ResultColumnsEntry]
    """relates label names in the final SQL to a tuple of local
    column/label name, ColumnElement object (if any) and
    TypeEngine. CursorResult uses this for type processing and
    column targeting"""

    _textual_ordered_columns: bool = False
    """tell the result object that the column names as rendered are important,
    but they are also "ordered" vs. what is in the compiled object here.

    As of 1.4.42 this condition is only present when the statement is a
    TextualSelect, e.g. text("....").columns(...), where it is required
    that the columns are considered positionally and not by name.

    """

    _ad_hoc_textual: bool = False
    """tell the result that we encountered text() or '*' constructs in the
    middle of the result columns, but we also have compiled columns, so
    if the number of columns in cursor.description does not match how many
    expressions we have, that means we can't rely on positional at all and
    should match on name.

    """

    _ordered_columns: bool = True
    """
    if False, means we can't be sure the list of entries
    in _result_columns is actually the rendered order. Usually
    True unless using an unordered TextualSelect.
    """

    _loose_column_name_matching: bool = False
    """tell the result object that the SQL statement is textual, wants to match
    up to Column objects, and may be using the ._tq_label in the SELECT rather
    than the base name.

    """

    _numeric_binds: bool = False
    """
    True if paramstyle is "numeric". This paramstyle is trickier than
    all the others.

    """

    _render_postcompile: bool = False
    """
    whether to render out POSTCOMPILE params during the compile phase.

    This attribute is used only for end-user invocation of stmt.compile();
    it's never used for actual statement execution, where instead the
    dialect internals access and render the internal postcompile structure
    directly.

    """

    _post_compile_expanded_state: Optional[ExpandedState] = None
    """When render_postcompile is used, the ``ExpandedState`` used to create
    the "expanded" SQL is assigned here, and then used by the ``.params``
    accessor and ``.construct_params()`` methods for their return values.

    .. versionadded:: 2.0.0rc1

    """

    _pre_expanded_string: Optional[str] = None
    """Stores the original string SQL before 'post_compile' is applied,
    for cases where 'post_compile' were used.

    """

    _pre_expanded_positiontup: Optional[List[str]] = None

    _insertmanyvalues: Optional[_InsertManyValues] = None

    _insert_crud_params: Optional[crud._CrudParamSequence] = None

    literal_execute_params: FrozenSet[BindParameter[Any]] = frozenset()
    """bindparameter objects that are rendered as literal values at statement
    execution time.

    """

    post_compile_params: FrozenSet[BindParameter[Any]] = frozenset()
    """bindparameter objects that are rendered as bound parameter placeholders
    at statement execution time.

    """

    escaped_bind_names: util.immutabledict[str, str] = util.EMPTY_DICT
    """Late escaping of bound parameter names that has to be converted
    to the original name when looking in the parameter dictionary.

    """

    has_out_parameters = False
    """if True, there are bindparam() objects that have the isoutparam
    flag set."""

    postfetch_lastrowid = False
    """if True, and this in insert, use cursor.lastrowid to populate
    result.inserted_primary_key. """

    _cache_key_bind_match: Optional[
        Tuple[
            Dict[
                BindParameter[Any],
                List[BindParameter[Any]],
            ],
            Dict[
                str,
                BindParameter[Any],
            ],
        ]
    ] = None
    """a mapping that will relate the BindParameter object we compile
    to those that are part of the extracted collection of parameters
    in the cache key, if we were given a cache key.

    """

    positiontup: Optional[List[str]] = None
    """for a compiled construct that uses a positional paramstyle, will be
    a sequence of strings, indicating the names of bound parameters in order.

    This is used in order to render bound parameters in their correct order,
    and is combined with the :attr:`_sql.Compiled.params` dictionary to
    render parameters.

    This sequence always contains the unescaped name of the parameters.

    .. seealso::

        :ref:`faq_sql_expression_string` - includes a usage example for
        debugging use cases.

    """
    _values_bindparam: Optional[List[str]] = None

    _visited_bindparam: Optional[List[str]] = None

    inline: bool = False

    ctes: Optional[MutableMapping[CTE, str]]

    # Detect same CTE references - Dict[(level, name), cte]
    # Level is required for supporting nesting
    ctes_by_level_name: Dict[Tuple[int, str], CTE]

    # To retrieve key/level in ctes_by_level_name -
    # Dict[cte_reference, (level, cte_name, cte_opts)]
    level_name_by_cte: Dict[CTE, Tuple[int, str, selectable._CTEOpts]]

    ctes_recursive: bool

    # regexes used to locate POSTCOMPILE and pyformat placeholders
    # within the compiled SQL string
    _post_compile_pattern = re.compile(r"__\[POSTCOMPILE_(\S+?)(~~.+?~~)?\]")
    _pyformat_pattern = re.compile(r"%\(([^)]+?)\)s")
    _positional_pattern = re.compile(
        f"{_pyformat_pattern.pattern}|{_post_compile_pattern.pattern}"
    )
    _collect_params: Final[bool]
    _collected_params: util.immutabledict[str, Any]
1404
1405 @classmethod
1406 def _init_compiler_cls(cls):
1407 cls._init_bind_translate()
1408
1409 @classmethod
1410 def _init_bind_translate(cls):
1411 reg = re.escape("".join(cls.bindname_escape_characters))
1412 cls._bind_translate_re = re.compile(f"[{reg}]")
1413 cls._bind_translate_chars = cls.bindname_escape_characters
1414
    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        cache_key: Optional[CacheKey] = None,
        column_keys: Optional[Sequence[str]] = None,
        for_executemany: bool = False,
        linting: Linting = NO_LINTING,
        _supporting_against: Optional[SQLCompiler] = None,
        **kwargs: Any,
    ):
        """Construct a new :class:`.SQLCompiler` object.

        :param dialect: :class:`.Dialect` to be used

        :param statement: :class:`_expression.ClauseElement` to be compiled

        :param cache_key: an optional :class:`.CacheKey`; when given, its
         extracted bound parameters are related to those compiled here via
         ``_cache_key_bind_match``.

        :param column_keys: a list of column names to be compiled into an
         INSERT or UPDATE statement.

        :param for_executemany: whether INSERT / UPDATE statements should
         expect that they are to be invoked in an "executemany" style,
         which may impact how the statement will be expected to return the
         values of defaults and autoincrement / sequences and similar.
         Depending on the backend and driver in use, support for retrieving
         these values may be disabled which means SQL expressions may
         be rendered inline, RETURNING may not be rendered, etc.

        :param kwargs: additional keyword arguments to be consumed by the
         superclass.

        """
        self.column_keys = column_keys

        self.cache_key = cache_key

        if cache_key:
            # cache key name -> param, and param -> list of params, used
            # later by construct_params() with extracted_parameters
            cksm = {b.key: b for b in cache_key[1]}
            ckbm = {b: [b] for b in cache_key[1]}
            self._cache_key_bind_match = (ckbm, cksm)

        # compile INSERT/UPDATE defaults/sequences to expect executemany
        # style execution, which may mean no pre-execute of defaults,
        # or no RETURNING
        self.for_executemany = for_executemany

        self.linting = linting

        # a dictionary of bind parameter keys to BindParameter
        # instances.
        self.binds = {}

        # a dictionary of BindParameter instances to "compiled" names
        # that are actually present in the generated SQL
        self.bind_names = util.column_dict()

        # stack which keeps track of nested SELECT statements
        self.stack = []

        self._result_columns = []

        # true if the paramstyle is positional
        self.positional = dialect.positional
        if self.positional:
            self._numeric_binds = nb = dialect.paramstyle.startswith("numeric")
            if nb:
                # ":1, :2" for "numeric"; "$1, $2" for "numeric_dollar"
                self._numeric_binds_identifier_char = (
                    "$" if dialect.paramstyle == "numeric_dollar" else ":"
                )

            # positional styles compile with pyformat placeholders first;
            # these are rewritten by _process_positional / _process_numeric
            self.compilation_bindtemplate = _pyformat_template
        else:
            self.compilation_bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

        self.ctes = None

        self.label_length = (
            dialect.label_length or dialect.max_identifier_length
        )

        # a map which tracks "anonymous" identifiers that are created on
        # the fly here
        self.anon_map = prefix_anon_map()

        # a map which tracks "truncated" names based on
        # dialect.label_length or dialect.max_identifier_length
        self.truncated_names: Dict[Tuple[str, str], str] = {}
        self._truncated_counters: Dict[str, int] = {}
        if not cache_key:
            # no cache key; params attached via Executable.params() are
            # collected directly on this compiler
            self._collect_params = True
            self._collected_params = util.EMPTY_DICT
        else:
            self._collect_params = False  # type: ignore[misc]

        # superclass __init__ performs the actual string compilation
        Compiled.__init__(self, dialect, statement, **kwargs)

        if self.isinsert or self.isupdate or self.isdelete:
            if TYPE_CHECKING:
                assert isinstance(statement, UpdateBase)

            if self.isinsert or self.isupdate:
                if TYPE_CHECKING:
                    assert isinstance(statement, ValuesBase)
                if statement._inline:
                    self.inline = True
                elif self.for_executemany and (
                    not self.isinsert
                    or (
                        self.dialect.insert_executemany_returning
                        and statement._return_defaults
                    )
                ):
                    self.inline = True

        self.bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

        if _supporting_against:
            # adopt state from another compiler, except for the
            # dialect- and paramstyle-specific attributes listed
            self.__dict__.update(
                {
                    k: v
                    for k, v in _supporting_against.__dict__.items()
                    if k
                    not in {
                        "state",
                        "dialect",
                        "preparer",
                        "positional",
                        "_numeric_binds",
                        "compilation_bindtemplate",
                        "bindtemplate",
                    }
                }
            )

        if self.state is CompilerState.STRING_APPLIED:
            if self.positional:
                # rewrite pyformat placeholders into the dialect's
                # actual positional paramstyle
                if self._numeric_binds:
                    self._process_numeric()
                else:
                    self._process_positional()

            if self._render_postcompile:
                parameters = self.construct_params(
                    escape_names=False,
                    _no_postcompile=True,
                )

                self._process_parameters_for_postcompile(
                    parameters, _populate_self=True
                )
1565
1566 @property
1567 def insert_single_values_expr(self) -> Optional[str]:
1568 """When an INSERT is compiled with a single set of parameters inside
1569 a VALUES expression, the string is assigned here, where it can be
1570 used for insert batching schemes to rewrite the VALUES expression.
1571
1572 .. versionchanged:: 2.0 This collection is no longer used by
1573 SQLAlchemy's built-in dialects, in favor of the currently
1574 internal ``_insertmanyvalues`` collection that is used only by
1575 :class:`.SQLCompiler`.
1576
1577 """
1578 if self._insertmanyvalues is None:
1579 return None
1580 else:
1581 return self._insertmanyvalues.single_values_expr
1582
1583 @util.ro_memoized_property
1584 def effective_returning(self) -> Optional[Sequence[ColumnElement[Any]]]:
1585 """The effective "returning" columns for INSERT, UPDATE or DELETE.
1586
1587 This is either the so-called "implicit returning" columns which are
1588 calculated by the compiler on the fly, or those present based on what's
1589 present in ``self.statement._returning`` (expanded into individual
1590 columns using the ``._all_selected_columns`` attribute) i.e. those set
1591 explicitly using the :meth:`.UpdateBase.returning` method.
1592
1593 .. versionadded:: 2.0
1594
1595 """
1596 if self.implicit_returning:
1597 return self.implicit_returning
1598 elif self.statement is not None and is_dml(self.statement):
1599 return [
1600 c
1601 for c in self.statement._all_selected_columns
1602 if is_column_element(c)
1603 ]
1604
1605 else:
1606 return None
1607
1608 @property
1609 def returning(self):
1610 """backwards compatibility; returns the
1611 effective_returning collection.
1612
1613 """
1614 return self.effective_returning
1615
1616 @property
1617 def current_executable(self):
1618 """Return the current 'executable' that is being compiled.
1619
1620 This is currently the :class:`_sql.Select`, :class:`_sql.Insert`,
1621 :class:`_sql.Update`, :class:`_sql.Delete`,
1622 :class:`_sql.CompoundSelect` object that is being compiled.
1623 Specifically it's assigned to the ``self.stack`` list of elements.
1624
1625 When a statement like the above is being compiled, it normally
1626 is also assigned to the ``.statement`` attribute of the
1627 :class:`_sql.Compiler` object. However, all SQL constructs are
1628 ultimately nestable, and this attribute should never be consulted
1629 by a ``visit_`` method, as it is not guaranteed to be assigned
1630 nor guaranteed to correspond to the current statement being compiled.
1631
1632 """
1633 try:
1634 return self.stack[-1]["selectable"]
1635 except IndexError as ie:
1636 raise IndexError("Compiler does not have a stack entry") from ie
1637
1638 @property
1639 def prefetch(self):
1640 return list(self.insert_prefetch) + list(self.update_prefetch)
1641
1642 @util.memoized_property
1643 def _global_attributes(self) -> Dict[Any, Any]:
1644 return {}
1645
1646 def _add_to_params(self, item: ExecutableStatement) -> None:
1647 # assumes that this is called before traversing the statement
1648 # so the call happens outer to inner, meaning that existing params
1649 # take precedence
1650 if item._params:
1651 self._collected_params = item._params | self._collected_params
1652
1653 @util.memoized_instancemethod
1654 def _init_cte_state(self) -> MutableMapping[CTE, str]:
1655 """Initialize collections related to CTEs only if
1656 a CTE is located, to save on the overhead of
1657 these collections otherwise.
1658
1659 """
1660 # collect CTEs to tack on top of a SELECT
1661 # To store the query to print - Dict[cte, text_query]
1662 ctes: MutableMapping[CTE, str] = util.OrderedDict()
1663 self.ctes = ctes
1664
1665 # Detect same CTE references - Dict[(level, name), cte]
1666 # Level is required for supporting nesting
1667 self.ctes_by_level_name = {}
1668
1669 # To retrieve key/level in ctes_by_level_name -
1670 # Dict[cte_reference, (level, cte_name, cte_opts)]
1671 self.level_name_by_cte = {}
1672
1673 self.ctes_recursive = False
1674
1675 return ctes
1676
1677 @contextlib.contextmanager
1678 def _nested_result(self):
1679 """special API to support the use case of 'nested result sets'"""
1680 result_columns, ordered_columns = (
1681 self._result_columns,
1682 self._ordered_columns,
1683 )
1684 self._result_columns, self._ordered_columns = [], False
1685
1686 try:
1687 if self.stack:
1688 entry = self.stack[-1]
1689 entry["need_result_map_for_nested"] = True
1690 else:
1691 entry = None
1692 yield self._result_columns, self._ordered_columns
1693 finally:
1694 if entry:
1695 entry.pop("need_result_map_for_nested")
1696 self._result_columns, self._ordered_columns = (
1697 result_columns,
1698 ordered_columns,
1699 )
1700
    def _process_positional(self):
        """Rewrite pyformat placeholders into "format" or "qmark"
        positional placeholders, recording parameter order in
        ``self.positiontup``.

        Runs once, after string compilation, for positional
        (non-numeric) paramstyles.
        """
        assert not self.positiontup
        assert self.state is CompilerState.STRING_APPLIED
        assert not self._numeric_binds

        if self.dialect.paramstyle == "format":
            placeholder = "%s"
        else:
            assert self.dialect.paramstyle == "qmark"
            placeholder = "?"

        positions = []

        def find_position(m: re.Match[str]) -> str:
            # group(1) matches a normal pyformat bind, which is replaced
            # by the positional placeholder; group(2) matches a
            # POSTCOMPILE marker, which is left in place for later
            # expansion
            normal_bind = m.group(1)
            if normal_bind:
                positions.append(normal_bind)
                return placeholder
            else:
                # this a post-compile bind
                positions.append(m.group(2))
                return m.group(0)

        self.string = re.sub(
            self._positional_pattern, find_position, self.string
        )

        if self.escaped_bind_names:
            # positiontup always carries the *unescaped* parameter names
            reverse_escape = {v: k for k, v in self.escaped_bind_names.items()}
            assert len(self.escaped_bind_names) == len(reverse_escape)
            self.positiontup = [
                reverse_escape.get(name, name) for name in positions
            ]
        else:
            self.positiontup = positions

        if self._insertmanyvalues:
            # rewrite the stored insertmanyvalues expressions the same way.
            # rebinding "positions" here also redirects the find_position
            # closure, so names collected below don't affect positiontup
            positions = []

            single_values_expr = re.sub(
                self._positional_pattern,
                find_position,
                self._insertmanyvalues.single_values_expr,
            )
            insert_crud_params = [
                (
                    v[0],
                    v[1],
                    re.sub(self._positional_pattern, find_position, v[2]),
                    v[3],
                )
                for v in self._insertmanyvalues.insert_crud_params
            ]

            self._insertmanyvalues = self._insertmanyvalues._replace(
                single_values_expr=single_values_expr,
                insert_crud_params=insert_crud_params,
            )
1759
    def _process_numeric(self):
        """Rewrite pyformat placeholders into numeric placeholders
        (e.g. ``:1`` / ``$1``), numbering parameters in bind order.

        Runs once, after string compilation, for the "numeric" /
        "numeric_dollar" paramstyles.
        """
        assert self._numeric_binds
        assert self.state is CompilerState.STRING_APPLIED

        num = 1
        param_pos: Dict[str, str] = {}
        order: Iterable[str]
        if self._insertmanyvalues and self._values_bindparam is not None:
            # bindparams that are not in values are always placed first.
            # this avoids the need of changing them when using executemany
            # values () ()
            order = itertools.chain(
                (
                    name
                    for name in self.bind_names.values()
                    if name not in self._values_bindparam
                ),
                self.bind_names.values(),
            )
        else:
            order = self.bind_names.values()

        for bind_name in order:
            # a name may be yielded more than once; number it only once
            if bind_name in param_pos:
                continue
            bind = self.binds[bind_name]
            if (
                bind in self.post_compile_params
                or bind in self.literal_execute_params
            ):
                # set to None to just mark the in positiontup, it will not
                # be replaced below.
                param_pos[bind_name] = None  # type: ignore
            else:
                ph = f"{self._numeric_binds_identifier_char}{num}"
                num += 1
                param_pos[bind_name] = ph

        self.next_numeric_pos = num

        self.positiontup = list(param_pos)
        if self.escaped_bind_names:
            # the SQL string contains escaped names; re-key the mapping
            # before substituting into the string
            len_before = len(param_pos)
            param_pos = {
                self.escaped_bind_names.get(name, name): pos
                for name, pos in param_pos.items()
            }
            assert len(param_pos) == len_before

        # Can't use format here since % chars are not escaped.
        self.string = self._pyformat_pattern.sub(
            lambda m: param_pos[m.group(1)], self.string
        )

        if self._insertmanyvalues:
            single_values_expr = (
                # format is ok here since single_values_expr includes only
                # place-holders
                self._insertmanyvalues.single_values_expr
                % param_pos
            )
            insert_crud_params = [
                (v[0], v[1], "%s", v[3])
                for v in self._insertmanyvalues.insert_crud_params
            ]

            self._insertmanyvalues = self._insertmanyvalues._replace(
                # This has the numbers (:1, :2)
                single_values_expr=single_values_expr,
                # The single binds are instead %s so they can be formatted
                insert_crud_params=insert_crud_params,
            )
1832
1833 @util.memoized_property
1834 def _bind_processors(
1835 self,
1836 ) -> MutableMapping[
1837 str, Union[_BindProcessorType[Any], Sequence[_BindProcessorType[Any]]]
1838 ]:
1839 # mypy is not able to see the two value types as the above Union,
1840 # it just sees "object". don't know how to resolve
1841 return {
1842 key: value # type: ignore
1843 for key, value in (
1844 (
1845 self.bind_names[bindparam],
1846 (
1847 bindparam.type._cached_bind_processor(self.dialect)
1848 if not bindparam.type._is_tuple_type
1849 else tuple(
1850 elem_type._cached_bind_processor(self.dialect)
1851 for elem_type in cast(
1852 TupleType, bindparam.type
1853 ).types
1854 )
1855 ),
1856 )
1857 for bindparam in self.bind_names
1858 )
1859 if value is not None
1860 }
1861
1862 def is_subquery(self):
1863 return len(self.stack) > 1
1864
1865 @property
1866 def sql_compiler(self) -> Self:
1867 return self
1868
1869 def construct_expanded_state(
1870 self,
1871 params: Optional[_CoreSingleExecuteParams] = None,
1872 escape_names: bool = True,
1873 ) -> ExpandedState:
1874 """Return a new :class:`.ExpandedState` for a given parameter set.
1875
1876 For queries that use "expanding" or other late-rendered parameters,
1877 this method will provide for both the finalized SQL string as well
1878 as the parameters that would be used for a particular parameter set.
1879
1880 .. versionadded:: 2.0.0rc1
1881
1882 """
1883 parameters = self.construct_params(
1884 params,
1885 escape_names=escape_names,
1886 _no_postcompile=True,
1887 )
1888 return self._process_parameters_for_postcompile(
1889 parameters,
1890 )
1891
    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
        _group_number: Optional[int] = None,
        _check: bool = True,
        _no_postcompile: bool = False,
        _collected_params: _CoreSingleExecuteParams | None = None,
    ) -> _MutableCoreSingleExecuteParams:
        """return a dictionary of bind parameter keys and values.

        :param params: user-supplied values which take precedence over
         the values embedded in the compiled bound parameters.
        :param extracted_parameters: parameters from an incoming cache
         key, lined up positionally with this compiled object's
         original cache key parameters.
        :param escape_names: if True, keys in the result dictionary use
         the "escaped" bound parameter names.
        :param _group_number: executemany group index, used only to
         enrich error messages.
        :param _check: if True, raise for required bind parameters that
         have no value.
        :param _no_postcompile: bypass the render_postcompile shortcut.
        :param _collected_params: externally collected params; mutually
         exclusive with this compiler's own collection.
        """
        # merge in params collected via Executable.params(); explicitly
        # passed params take precedence
        if _collected_params is not None:
            assert not self._collect_params
        elif self._collect_params:
            _collected_params = self._collected_params

        if _collected_params:
            if not params:
                params = _collected_params
            else:
                params = {**_collected_params, **params}

        if self._render_postcompile and not _no_postcompile:
            assert self._post_compile_expanded_state is not None
            if not params:
                return dict(self._post_compile_expanded_state.parameters)
            else:
                raise exc.InvalidRequestError(
                    "can't construct new parameters when render_postcompile "
                    "is used; the statement is hard-linked to the original "
                    "parameters. Use construct_expanded_state to generate a "
                    "new statement and parameters."
                )

        has_escaped_names = escape_names and bool(self.escaped_bind_names)

        if extracted_parameters:
            # related the bound parameters collected in the original cache key
            # to those collected in the incoming cache key. They will not have
            # matching names but they will line up positionally in the same
            # way. The parameters present in self.bind_names may be clones of
            # these original cache key params in the case of DML but the .key
            # will be guaranteed to match.
            if self.cache_key is None:
                raise exc.CompileError(
                    "This compiled object has no original cache key; "
                    "can't pass extracted_parameters to construct_params"
                )
            else:
                orig_extracted = self.cache_key[1]

            ckbm_tuple = self._cache_key_bind_match
            assert ckbm_tuple is not None
            ckbm, _ = ckbm_tuple
            resolved_extracted = {
                bind: extracted
                for b, extracted in zip(orig_extracted, extracted_parameters)
                for bind in ckbm[b]
            }
        else:
            resolved_extracted = None

        if params:
            pd = {}
            for bindparam, name in self.bind_names.items():
                escaped_name = (
                    self.escaped_bind_names.get(name, name)
                    if has_escaped_names
                    else name
                )

                # user-supplied value wins, whether keyed by the
                # original .key or by the compiled name
                if bindparam.key in params:
                    pd[escaped_name] = params[bindparam.key]
                elif name in params:
                    pd[escaped_name] = params[name]

                elif _check and bindparam.required:
                    if _group_number:
                        raise exc.InvalidRequestError(
                            "A value is required for bind parameter %r, "
                            "in parameter group %d"
                            % (bindparam.key, _group_number),
                            code="cd3x",
                        )
                    else:
                        raise exc.InvalidRequestError(
                            "A value is required for bind parameter %r"
                            % bindparam.key,
                            code="cd3x",
                        )
                else:
                    # fall back to the value embedded in the bindparam,
                    # or its cache-key substitute when present
                    if resolved_extracted:
                        value_param = resolved_extracted.get(
                            bindparam, bindparam
                        )
                    else:
                        value_param = bindparam

                    if bindparam.callable:
                        pd[escaped_name] = value_param.effective_value
                    else:
                        pd[escaped_name] = value_param.value
            return pd
        else:
            # no user-supplied params; every value comes from the
            # bindparams themselves (or their cache-key substitutes)
            pd = {}
            for bindparam, name in self.bind_names.items():
                escaped_name = (
                    self.escaped_bind_names.get(name, name)
                    if has_escaped_names
                    else name
                )

                if _check and bindparam.required:
                    if _group_number:
                        raise exc.InvalidRequestError(
                            "A value is required for bind parameter %r, "
                            "in parameter group %d"
                            % (bindparam.key, _group_number),
                            code="cd3x",
                        )
                    else:
                        raise exc.InvalidRequestError(
                            "A value is required for bind parameter %r"
                            % bindparam.key,
                            code="cd3x",
                        )

                if resolved_extracted:
                    value_param = resolved_extracted.get(bindparam, bindparam)
                else:
                    value_param = bindparam

                if bindparam.callable:
                    pd[escaped_name] = value_param.effective_value
                else:
                    pd[escaped_name] = value_param.value

            return pd
2030
2031 @util.memoized_instancemethod
2032 def _get_set_input_sizes_lookup(self):
2033 dialect = self.dialect
2034
2035 include_types = dialect.include_set_input_sizes
2036 exclude_types = dialect.exclude_set_input_sizes
2037
2038 dbapi = dialect.dbapi
2039
2040 def lookup_type(typ):
2041 dbtype = typ._unwrapped_dialect_impl(dialect).get_dbapi_type(dbapi)
2042
2043 if (
2044 dbtype is not None
2045 and (exclude_types is None or dbtype not in exclude_types)
2046 and (include_types is None or dbtype in include_types)
2047 ):
2048 return dbtype
2049 else:
2050 return None
2051
2052 inputsizes = {}
2053
2054 literal_execute_params = self.literal_execute_params
2055
2056 for bindparam in self.bind_names:
2057 if bindparam in literal_execute_params:
2058 continue
2059
2060 if bindparam.type._is_tuple_type:
2061 inputsizes[bindparam] = [
2062 lookup_type(typ)
2063 for typ in cast(TupleType, bindparam.type).types
2064 ]
2065 else:
2066 inputsizes[bindparam] = lookup_type(bindparam.type)
2067
2068 return inputsizes
2069
2070 @property
2071 def params(self):
2072 """Return the bind param dictionary embedded into this
2073 compiled object, for those values that are present.
2074
2075 .. seealso::
2076
2077 :ref:`faq_sql_expression_string` - includes a usage example for
2078 debugging use cases.
2079
2080 """
2081 return self.construct_params(_check=False)
2082
    def _process_parameters_for_postcompile(
        self,
        parameters: _MutableCoreSingleExecuteParams,
        _populate_self: bool = False,
    ) -> ExpandedState:
        """handle special post compile parameters.

        These include:

        * "expanding" parameters -typically IN tuples that are rendered
        on a per-parameter basis for an otherwise fixed SQL statement string.

        * literal_binds compiled with the literal_execute flag. Used for
        things like SQL Server "TOP N" where the driver does not accommodate
        N as a bound parameter.

        Returns an :class:`.ExpandedState` with the final statement string,
        adjusted parameters, per-expanded-parameter bind processors, and
        (for positional dialects) the rebuilt position tuple.  When
        ``_populate_self`` is True (the ``render_postcompile`` debugging
        path), the results are also written back onto ``self``.
        """

        expanded_parameters = {}
        new_positiontup: Optional[List[str]]

        # if a previous expansion populated self (render_postcompile),
        # start over from the original un-expanded template string
        pre_expanded_string = self._pre_expanded_string
        if pre_expanded_string is None:
            pre_expanded_string = self.string

        if self.positional:
            new_positiontup = []

            pre_expanded_positiontup = self._pre_expanded_positiontup
            if pre_expanded_positiontup is None:
                pre_expanded_positiontup = self.positiontup

        else:
            new_positiontup = pre_expanded_positiontup = None

        # the same processors mapping is viewed under two typings; tuple
        # types store a sequence of processors per parameter name
        processors = self._bind_processors
        single_processors = cast(
            "Mapping[str, _BindProcessorType[Any]]", processors
        )
        tuple_processors = cast(
            "Mapping[str, Sequence[_BindProcessorType[Any]]]", processors
        )

        new_processors: Dict[str, _BindProcessorType[Any]] = {}

        replacement_expressions: Dict[str, Any] = {}
        to_update_sets: Dict[str, Any] = {}

        # notes:
        # *unescaped* parameter names in:
        # self.bind_names, self.binds, self._bind_processors, self.positiontup
        #
        # *escaped* parameter names in:
        # construct_params(), replacement_expressions

        numeric_positiontup: Optional[List[str]] = None

        if self.positional and pre_expanded_positiontup is not None:
            # positional: iterate in rendered-parameter order
            names: Iterable[str] = pre_expanded_positiontup
            if self._numeric_binds:
                numeric_positiontup = []
        else:
            names = self.bind_names.values()

        ebn = self.escaped_bind_names
        for name in names:
            escaped_name = ebn.get(name, name) if ebn else name
            parameter = self.binds[name]

            # literal_execute: render the value directly into the SQL and
            # remove it from the parameter dictionary
            if parameter in self.literal_execute_params:
                if escaped_name not in replacement_expressions:
                    replacement_expressions[escaped_name] = (
                        self.render_literal_bindparam(
                            parameter,
                            render_literal_value=parameters.pop(escaped_name),
                        )
                    )
                continue

            if parameter in self.post_compile_params:
                if escaped_name in replacement_expressions:
                    # the same parameter was already expanded (e.g. appears
                    # more than once in the statement); reuse its expansion
                    to_update = to_update_sets[escaped_name]
                    values = None
                else:
                    # we are removing the parameter from parameters
                    # because it is a list value, which is not expected by
                    # TypeEngine objects that would otherwise be asked to
                    # process it. the single name is being replaced with
                    # individual numbered parameters for each value in the
                    # param.
                    #
                    # note we are also inserting *escaped* parameter names
                    # into the given dictionary. default dialect will
                    # use these param names directly as they will not be
                    # in the escaped_bind_names dictionary.
                    values = parameters.pop(name)

                    leep_res = self._literal_execute_expanding_parameter(
                        escaped_name, parameter, values
                    )
                    (to_update, replacement_expr) = leep_res

                    to_update_sets[escaped_name] = to_update
                    replacement_expressions[escaped_name] = replacement_expr

                if not parameter.literal_execute:
                    parameters.update(to_update)
                    if parameter.type._is_tuple_type:
                        assert values is not None
                        # expanded names take the form name_i_j where i is
                        # the tuple index (1-based) and j the element index
                        new_processors.update(
                            (
                                "%s_%s_%s" % (name, i, j),
                                tuple_processors[name][j - 1],
                            )
                            for i, tuple_element in enumerate(values, 1)
                            for j, _ in enumerate(tuple_element, 1)
                            if name in tuple_processors
                            and tuple_processors[name][j - 1] is not None
                        )
                    else:
                        new_processors.update(
                            (key, single_processors[name])
                            for key, _ in to_update
                            if name in single_processors
                        )
                    if numeric_positiontup is not None:
                        numeric_positiontup.extend(
                            name for name, _ in to_update
                        )
                    elif new_positiontup is not None:
                        # to_update has escaped names, but that's ok since
                        # these are new names, that aren't in the
                        # escaped_bind_names dict.
                        new_positiontup.extend(name for name, _ in to_update)
                    expanded_parameters[name] = [
                        expand_key for expand_key, _ in to_update
                    ]
            elif new_positiontup is not None:
                new_positiontup.append(name)

        def process_expanding(m):
            # regex callback replacing each POSTCOMPILE token with its
            # pre-computed replacement expression
            key = m.group(1)
            expr = replacement_expressions[key]

            # if POSTCOMPILE included a bind_expression, render that
            # around each element
            if m.group(2):
                tok = m.group(2).split("~~")
                be_left, be_right = tok[1], tok[3]
                expr = ", ".join(
                    "%s%s%s" % (be_left, exp, be_right)
                    for exp in expr.split(", ")
                )
            return expr

        statement = re.sub(
            self._post_compile_pattern, process_expanding, pre_expanded_string
        )

        if numeric_positiontup is not None:
            assert new_positiontup is not None
            # numeric paramstyle: assign sequential numbered placeholders
            # to the newly expanded parameters
            param_pos = {
                key: f"{self._numeric_binds_identifier_char}{num}"
                for num, key in enumerate(
                    numeric_positiontup, self.next_numeric_pos
                )
            }
            # Can't use format here since % chars are not escaped.
            statement = self._pyformat_pattern.sub(
                lambda m: param_pos[m.group(1)], statement
            )
            new_positiontup.extend(numeric_positiontup)

        expanded_state = ExpandedState(
            statement,
            parameters,
            new_processors,
            new_positiontup,
            expanded_parameters,
        )

        if _populate_self:
            # this is for the "render_postcompile" flag, which is not
            # otherwise used internally and is for end-user debugging and
            # special use cases.
            self._pre_expanded_string = pre_expanded_string
            self._pre_expanded_positiontup = pre_expanded_positiontup
            self.string = expanded_state.statement
            self.positiontup = (
                list(expanded_state.positiontup or ())
                if self.positional
                else None
            )
            self._post_compile_expanded_state = expanded_state

        return expanded_state
2279
    @util.preload_module("sqlalchemy.engine.cursor")
    def _create_result_map(self):
        """utility method used for unit tests only."""
        # build the same description -> column match map that
        # CursorResultMetaData constructs at execution time, using our
        # compiled result columns
        cursor = util.preloaded.engine_cursor
        return cursor.CursorResultMetaData._create_description_match_map(
            self._result_columns
        )
2287
    # assigned by crud.py for insert/update statements
    _get_bind_name_for_col: _BindNameForColProtocol

    @util.memoized_property
    def _within_exec_param_key_getter(self) -> Callable[[Any], str]:
        """Memoized callable returning the bound parameter name used for a
        given column in the compiled INSERT/UPDATE statement."""
        getter = self._get_bind_name_for_col
        return getter
2295
    @util.memoized_property
    @util.preload_module("sqlalchemy.engine.result")
    def _inserted_primary_key_from_lastrowid_getter(self):
        """Return a memoized callable ``get(lastrowid, parameters)`` that
        produces the inserted primary key as a named row, combining
        ``cursor.lastrowid`` for the autoincrement column with values
        extracted from the INSERT parameters for the others."""
        result = util.preloaded.engine_result

        param_key_getter = self._within_exec_param_key_getter

        assert self.compile_state is not None
        statement = self.compile_state.statement

        if TYPE_CHECKING:
            assert isinstance(statement, Insert)

        table = statement.table

        # per-PK-column getters pulling the bound value (or None) from the
        # execution parameter dictionary
        getters = [
            (operator.methodcaller("get", param_key_getter(col), None), col)
            for col in table.primary_key
        ]

        autoinc_getter = None
        autoinc_col = table._autoincrement_column
        if autoinc_col is not None:
            # apply type post processors to the lastrowid
            lastrowid_processor = autoinc_col.type._cached_result_processor(
                self.dialect, None
            )
            autoinc_key = param_key_getter(autoinc_col)

            # if a bind value is present for the autoincrement column
            # in the parameters, we need to do the logic dictated by
            # #7998; honor a non-None user-passed parameter over lastrowid.
            # previously in the 1.4 series we weren't fetching lastrowid
            # at all if the key were present in the parameters
            if autoinc_key in self.binds:

                def _autoinc_getter(lastrowid, parameters):
                    param_value = parameters.get(autoinc_key, lastrowid)
                    if param_value is not None:
                        # they supplied non-None parameter, use that.
                        # SQLite at least is observed to return the wrong
                        # cursor.lastrowid for INSERT..ON CONFLICT so it
                        # can't be used in all cases
                        return param_value
                    else:
                        # use lastrowid
                        return lastrowid

                # work around mypy https://github.com/python/mypy/issues/14027
                autoinc_getter = _autoinc_getter

        else:
            lastrowid_processor = None

        row_fn = result.result_tuple([col.key for col in table.primary_key])

        def get(lastrowid, parameters):
            """given cursor.lastrowid value and the parameters used for INSERT,
            return a "row" that represents the primary key, either by
            using the "lastrowid" or by extracting values from the parameters
            that were sent along with the INSERT.

            """
            if lastrowid_processor is not None:
                lastrowid = lastrowid_processor(lastrowid)

            if lastrowid is None:
                # no lastrowid available; all PK values come from parameters
                return row_fn(getter(parameters) for getter, col in getters)
            else:
                return row_fn(
                    (
                        (
                            autoinc_getter(lastrowid, parameters)
                            if autoinc_getter is not None
                            else lastrowid
                        )
                        if col is autoinc_col
                        else getter(parameters)
                    )
                    for getter, col in getters
                )

        return get
2379
    @util.memoized_property
    @util.preload_module("sqlalchemy.engine.result")
    def _inserted_primary_key_from_returning_getter(self):
        """Return a memoized callable ``get(row, parameters)`` producing the
        inserted primary key as a named row, sourcing each PK column either
        from the RETURNING result row or from the INSERT parameters."""
        result = util.preloaded.engine_result

        assert self.compile_state is not None
        statement = self.compile_state.statement

        if TYPE_CHECKING:
            assert isinstance(statement, Insert)

        param_key_getter = self._within_exec_param_key_getter
        table = statement.table

        # map each RETURNING column to its index in the result row
        returning = self.implicit_returning
        assert returning is not None
        ret = {col: idx for idx, col in enumerate(returning)}

        # for each PK column: (getter, use_row) where use_row indicates
        # whether the getter reads the RETURNING row or the parameters
        getters = cast(
            "List[Tuple[Callable[[Any], Any], bool]]",
            [
                (
                    (operator.itemgetter(ret[col]), True)
                    if col in ret
                    else (
                        operator.methodcaller(
                            "get", param_key_getter(col), None
                        ),
                        False,
                    )
                )
                for col in table.primary_key
            ],
        )

        row_fn = result.result_tuple([col.key for col in table.primary_key])

        def get(row, parameters):
            return row_fn(
                getter(row) if use_row else getter(parameters)
                for getter, use_row in getters
            )

        return get
2424
2425 def default_from(self) -> str:
2426 """Called when a SELECT statement has no froms, and no FROM clause is
2427 to be appended.
2428
2429 Gives Oracle Database a chance to tack on a ``FROM DUAL`` to the string
2430 output.
2431
2432 """
2433 return ""
2434
    def visit_override_binds(self, override_binds, **kw):
        """SQL compile the nested element of an _OverrideBinds with
        bindparams swapped out.

        The _OverrideBinds is not normally expected to be compiled; it
        is meant to be used when an already cached statement is to be used,
        the compilation was already performed, and only the bound params should
        be swapped in at execution time.

        However, there are test cases that exericise this object, and
        additionally the ORM subquery loader is known to feed in expressions
        which include this construct into new queries (discovered in #11173),
        so it has to do the right thing at compile time as well.

        """

        # get SQL text first
        sqltext = override_binds.element._compiler_dispatch(self, **kw)

        # for a test compile that is not for caching, change binds after the
        # fact. note that we don't try to
        # swap the bindparam as we compile, because our element may be
        # elsewhere in the statement already (e.g. a subquery or perhaps a
        # CTE) and was already visited / compiled. See
        # test_relationship_criteria.py ->
        # test_selectinload_local_criteria_subquery
        for k in override_binds.translate:
            if k not in self.binds:
                continue
            bp = self.binds[k]

            # so this would work, just change the value of bp in place.
            # but we dont want to mutate things outside.
            # bp.value = override_binds.translate[bp.key]
            # continue

            # instead, need to replace bp with new_bp or otherwise accommodate
            # in all internal collections
            new_bp = bp._with_value(
                override_binds.translate[bp.key],
                maintain_key=True,
                required=False,
            )

            # re-key every internal collection from the old param to the new
            name = self.bind_names[bp]
            self.binds[k] = self.binds[name] = new_bp
            self.bind_names[new_bp] = name
            self.bind_names.pop(bp, None)

            if bp in self.post_compile_params:
                self.post_compile_params |= {new_bp}
            if bp in self.literal_execute_params:
                self.literal_execute_params |= {new_bp}

            # keep the cache-key bind matching structure in sync so cached
            # statements still map cache key entries to the new param
            ckbm_tuple = self._cache_key_bind_match
            if ckbm_tuple:
                ckbm, cksm = ckbm_tuple
                for bp in bp._cloned_set:
                    if bp.key in cksm:
                        cb = cksm[bp.key]
                        ckbm[cb].append(new_bp)

        return sqltext
2498
2499 def visit_grouping(self, grouping, asfrom=False, **kwargs):
2500 return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")"
2501
2502 def visit_select_statement_grouping(self, grouping, **kwargs):
2503 return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")"
2504
    def visit_label_reference(
        self, element, within_columns_clause=False, **kwargs
    ):
        """Render a reference to a labeled expression in ORDER BY /
        GROUP BY, using the bare label name when the dialect supports it
        and the label resolves within the current SELECT."""
        if self.stack and self.dialect.supports_simple_order_by_label:
            try:
                compile_state = cast(
                    "Union[SelectState, CompoundSelectState]",
                    self.stack[-1]["compile_state"],
                )
            except KeyError as ke:
                raise exc.CompileError(
                    "Can't resolve label reference for ORDER BY / "
                    "GROUP BY / DISTINCT etc."
                ) from ke

            (
                with_cols,
                only_froms,
                only_cols,
            ) = compile_state._label_resolve_dict
            if within_columns_clause:
                resolve_dict = only_froms
            else:
                resolve_dict = only_cols

            # this can be None in the case that a _label_reference()
            # were subject to a replacement operation, in which case
            # the replacement of the Label element may have changed
            # to something else like a ColumnClause expression.
            order_by_elem = element.element._order_by_label_element

            if (
                order_by_elem is not None
                and order_by_elem.name in resolve_dict
                and order_by_elem.shares_lineage(
                    resolve_dict[order_by_elem.name]
                )
            ):
                # render just the label name rather than the full expression
                kwargs["render_label_as_label"] = (
                    element.element._order_by_label_element
                )
        # fall through: render the underlying element, possibly with the
        # render_label_as_label hint set above
        return self.process(
            element.element,
            within_columns_clause=within_columns_clause,
            **kwargs,
        )
2551
    def visit_textual_label_reference(
        self, element, within_columns_clause=False, **kwargs
    ):
        """Resolve a string label reference (e.g. ``order_by("name")``)
        against the current SELECT's label dictionary and render the
        matched column, raising a CompileError when it can't be matched."""
        if not self.stack:
            # compiling the element outside of the context of a SELECT
            return self.process(element._text_clause)

        try:
            compile_state = cast(
                "Union[SelectState, CompoundSelectState]",
                self.stack[-1]["compile_state"],
            )
        except KeyError as ke:
            coercions._no_text_coercion(
                element.element,
                extra=(
                    "Can't resolve label reference for ORDER BY / "
                    "GROUP BY / DISTINCT etc."
                ),
                exc_cls=exc.CompileError,
                err=ke,
            )

        with_cols, only_froms, only_cols = compile_state._label_resolve_dict
        try:
            if within_columns_clause:
                col = only_froms[element.element]
            else:
                col = with_cols[element.element]
        except KeyError as err:
            # unmatched string; raise rather than pass raw text through
            coercions._no_text_coercion(
                element.element,
                extra=(
                    "Can't resolve label reference for ORDER BY / "
                    "GROUP BY / DISTINCT etc."
                ),
                exc_cls=exc.CompileError,
                err=err,
            )
        else:
            # matched: render the column as its label
            kwargs["render_label_as_label"] = col
            return self.process(
                col, within_columns_clause=within_columns_clause, **kwargs
            )
2596
    def visit_label(
        self,
        label,
        add_to_result_map=None,
        within_label_clause=False,
        within_columns_clause=False,
        render_label_as_label=None,
        result_map_targets=(),
        within_tstring=False,
        **kw,
    ):
        """Render a Label, as ``<expr> AS <name>`` in a columns clause,
        as just the label name when ``render_label_as_label`` points at
        this label (e.g. ORDER BY), or as the bare inner expression
        otherwise."""
        if within_tstring:
            raise exc.CompileError(
                "Using label() directly inside tstring is not supported "
                "as it is ambiguous how the label expression should be "
                "rendered without knowledge of how it's being used in SQL"
            )
        # only render labels within the columns clause
        # or ORDER BY clause of a select. dialect-specific compilers
        # can modify this behavior.
        render_label_with_as = (
            within_columns_clause and not within_label_clause
        )
        render_label_only = render_label_as_label is label

        if render_label_only or render_label_with_as:
            # truncate auto-generated label names to the dialect's
            # identifier length limit
            if isinstance(label.name, elements._truncated_label):
                labelname = self._truncated_identifier("colident", label.name)
            else:
                labelname = label.name

        if render_label_with_as:
            if add_to_result_map is not None:
                add_to_result_map(
                    labelname,
                    label.name,
                    (label, labelname) + label._alt_names + result_map_targets,
                    label.type,
                )
            return (
                label.element._compiler_dispatch(
                    self,
                    within_columns_clause=True,
                    within_label_clause=True,
                    **kw,
                )
                + OPERATORS[operators.as_]
                + self.preparer.format_label(label, labelname)
            )
        elif render_label_only:
            return self.preparer.format_label(label, labelname)
        else:
            # e.g. label inside a larger expression: render only the
            # inner element
            return label.element._compiler_dispatch(
                self, within_columns_clause=False, **kw
            )
2652
2653 def _fallback_column_name(self, column):
2654 raise exc.CompileError(
2655 "Cannot compile Column object until its 'name' is assigned."
2656 )
2657
2658 def visit_lambda_element(self, element, **kw):
2659 sql_element = element._resolved
2660 return self.process(sql_element, **kw)
2661
    def visit_column(
        self,
        column: ColumnClause[Any],
        add_to_result_map: Optional[_ResultMapAppender] = None,
        include_table: bool = True,
        result_map_targets: Tuple[Any, ...] = (),
        ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] = None,
        **kwargs: Any,
    ) -> str:
        """Render a column expression, optionally qualified with its table
        (and schema) name, registering it in the result map if requested.

        ``include_table=False`` suppresses the ``table.`` prefix;
        ``ambiguous_table_name_map`` substitutes a disambiguated table
        name when the plain name would collide.
        """
        name = orig_name = column.name
        if name is None:
            # raises CompileError in the base implementation
            name = self._fallback_column_name(column)

        is_literal = column.is_literal
        if not is_literal and isinstance(name, elements._truncated_label):
            # shorten auto-generated names to the dialect's identifier limit
            name = self._truncated_identifier("colident", name)

        if add_to_result_map is not None:
            targets = (column, name, column.key) + result_map_targets
            if column._tq_label:
                targets += (column._tq_label,)

            add_to_result_map(name, orig_name, targets, column.type)

        if is_literal:
            # note we are not currently accommodating for
            # literal_column(quoted_name('ident', True)) here
            name = self.escape_literal_column(name)
        else:
            name = self.preparer.quote(name)
        table = column.table
        if table is None or not include_table or not table.named_with_column:
            # unqualified column name
            return name
        else:
            effective_schema = self.preparer.schema_for_object(table)

            if effective_schema:
                schema_prefix = (
                    self.preparer.quote_schema(effective_schema) + "."
                )
            else:
                schema_prefix = ""

            if TYPE_CHECKING:
                assert isinstance(table, NamedFromClause)
            tablename = table.name

            if (
                not effective_schema
                and ambiguous_table_name_map
                and tablename in ambiguous_table_name_map
            ):
                tablename = ambiguous_table_name_map[tablename]

            if isinstance(tablename, elements._truncated_label):
                tablename = self._truncated_identifier("alias", tablename)

            return schema_prefix + self.preparer.quote(tablename) + "." + name
2720
2721 def visit_collation(self, element, **kw):
2722 return self.preparer.format_collation(element.collation)
2723
2724 def visit_fromclause(self, fromclause, **kwargs):
2725 return fromclause.name
2726
2727 def visit_index(self, index, **kwargs):
2728 return index.name
2729
2730 def visit_typeclause(self, typeclause, **kw):
2731 kw["type_expression"] = typeclause
2732 kw["identifier_preparer"] = self.preparer
2733 return self.dialect.type_compiler_instance.process(
2734 typeclause.type, **kw
2735 )
2736
2737 def post_process_text(self, text):
2738 if self.preparer._double_percents:
2739 text = text.replace("%", "%%")
2740 return text
2741
2742 def escape_literal_column(self, text):
2743 if self.preparer._double_percents:
2744 text = text.replace("%", "%%")
2745 return text
2746
    def visit_textclause(self, textclause, add_to_result_map=None, **kw):
        """Render a text() construct, substituting its ``:name`` bind
        markers with rendered bind parameters and un-escaping ``\\:``
        sequences."""
        if self._collect_params:
            self._add_to_params(textclause)

        def do_bindparam(m):
            # regex callback: render a declared bindparam, or synthesize
            # a plain string-named one for undeclared markers
            name = m.group(1)
            if name in textclause._bindparams:
                return self.process(textclause._bindparams[name], **kw)
            else:
                return self.bindparam_string(name, **kw)

        if not self.stack:
            # text rendered outside of any SELECT context
            self.isplaintext = True

        if add_to_result_map:
            # text() object is present in the columns clause of a
            # select().   Add a no-name entry to the result map so that
            # row[text()] produces a result
            add_to_result_map(None, None, (textclause,), sqltypes.NULLTYPE)

        # un-escape any \:params
        return BIND_PARAMS_ESC.sub(
            lambda m: m.group(1),
            BIND_PARAMS.sub(
                do_bindparam, self.post_process_text(textclause.text)
            ),
        )
2774
2775 def visit_tstring(self, tstring, add_to_result_map=None, **kw):
2776 if self._collect_params:
2777 self._add_to_params(tstring)
2778
2779 if not self.stack:
2780 self.isplaintext = True
2781
2782 if add_to_result_map:
2783 # tstring() object is present in the columns clause of a
2784 # select(). Add a no-name entry to the result map so that
2785 # row[tstring()] produces a result
2786 add_to_result_map(None, None, (tstring,), sqltypes.NULLTYPE)
2787
2788 # Process each part and concatenate
2789 kw["within_tstring"] = True
2790 return "".join(self.process(part, **kw) for part in tstring.parts)
2791
    def visit_textual_select(
        self, taf, compound_index=None, asfrom=False, **kw
    ):
        """Render a TextualSelect (text() with declared columns), pushing a
        compiler stack entry, populating the result map from the declared
        columns when this is the result-producing SELECT, and prepending
        any rendered CTEs."""
        if self._collect_params:
            self._add_to_params(taf)
        toplevel = not self.stack
        entry = self._default_stack_entry if toplevel else self.stack[-1]

        new_entry: _CompilerStackEntry = {
            "correlate_froms": set(),
            "asfrom_froms": set(),
            "selectable": taf,
        }
        self.stack.append(new_entry)

        if taf._independent_ctes:
            self._dispatch_independent_ctes(taf, kw)

        # result map is populated only for the statement whose columns
        # will actually describe the cursor result
        populate_result_map = (
            toplevel
            or (
                compound_index == 0
                and entry.get("need_result_map_for_compound", False)
            )
            or entry.get("need_result_map_for_nested", False)
        )

        if populate_result_map:
            self._ordered_columns = self._textual_ordered_columns = (
                taf.positional
            )

            # enable looser result column matching when the SQL text links to
            # Column objects by name only
            self._loose_column_name_matching = not taf.positional and bool(
                taf.column_args
            )

            for c in taf.column_args:
                self.process(
                    c,
                    within_columns_clause=True,
                    add_to_result_map=self._add_to_result_map,
                )

        text = self.process(taf.element, **kw)
        if self.ctes:
            # prepend the WITH clause collected during compilation
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        self.stack.pop(-1)

        return text
2845
2846 def visit_null(self, expr: Null, **kw: Any) -> str:
2847 return "NULL"
2848
2849 def visit_true(self, expr: True_, **kw: Any) -> str:
2850 if self.dialect.supports_native_boolean:
2851 return "true"
2852 else:
2853 return "1"
2854
2855 def visit_false(self, expr: False_, **kw: Any) -> str:
2856 if self.dialect.supports_native_boolean:
2857 return "false"
2858 else:
2859 return "0"
2860
2861 def _generate_delimited_list(self, elements, separator, **kw):
2862 return separator.join(
2863 s
2864 for s in (c._compiler_dispatch(self, **kw) for c in elements)
2865 if s
2866 )
2867
    def _generate_delimited_and_list(self, clauses, **kw):
        """Render a list of clauses joined by AND, first flattening /
        short-circuiting against the True/False singletons."""
        # lcc is the resulting clause count after boolean simplification
        lcc, clauses = elements.BooleanClauseList._process_clauses_for_boolean(
            operators.and_,
            elements.True_._singleton,
            elements.False_._singleton,
            clauses,
        )
        if lcc == 1:
            # simplified to a single clause; no separator needed
            return clauses[0]._compiler_dispatch(self, **kw)
        else:
            separator = OPERATORS[operators.and_]
            return separator.join(
                s
                for s in (c._compiler_dispatch(self, **kw) for c in clauses)
                if s
            )
2884
2885 def visit_tuple(self, clauselist, **kw):
2886 return "(%s)" % self.visit_clauselist(clauselist, **kw)
2887
2888 def visit_element_list(self, element, **kw):
2889 return self._generate_delimited_list(element.clauses, " ", **kw)
2890
2891 def visit_order_by_list(self, element, **kw):
2892 return self._generate_delimited_list(element.clauses, ", ", **kw)
2893
2894 def visit_clauselist(self, clauselist, **kw):
2895 sep = clauselist.operator
2896 if sep is None:
2897 sep = " "
2898 else:
2899 sep = OPERATORS[clauselist.operator]
2900
2901 return self._generate_delimited_list(clauselist.clauses, sep, **kw)
2902
    def visit_expression_clauselist(self, clauselist, **kw):
        """Render an operator-joined expression list, honoring a
        dialect-specific dispatch hook for the operator when present."""
        operator_ = clauselist.operator

        # dialects may define visit_<opname>_expression_clauselist hooks
        disp = self._get_operator_dispatch(
            operator_, "expression_clauselist", None
        )
        if disp:
            return disp(clauselist, operator_, **kw)

        try:
            opstring = OPERATORS[operator_]
        except KeyError as err:
            # operator has no generic SQL rendering
            raise exc.UnsupportedCompilationError(self, operator_) from err
        else:
            kw["_in_operator_expression"] = True
            return self._generate_delimited_list(
                clauselist.clauses, opstring, **kw
            )
2921
2922 def visit_case(self, clause, **kwargs):
2923 x = "CASE "
2924 if clause.value is not None:
2925 x += clause.value._compiler_dispatch(self, **kwargs) + " "
2926 for cond, result in clause.whens:
2927 x += (
2928 "WHEN "
2929 + cond._compiler_dispatch(self, **kwargs)
2930 + " THEN "
2931 + result._compiler_dispatch(self, **kwargs)
2932 + " "
2933 )
2934 if clause.else_ is not None:
2935 x += (
2936 "ELSE " + clause.else_._compiler_dispatch(self, **kwargs) + " "
2937 )
2938 x += "END"
2939 return x
2940
2941 def visit_type_coerce(self, type_coerce, **kw):
2942 return type_coerce.typed_expression._compiler_dispatch(self, **kw)
2943
2944 def visit_cast(self, cast, **kwargs):
2945 type_clause = cast.typeclause._compiler_dispatch(self, **kwargs)
2946 match = re.match("(.*)( COLLATE .*)", type_clause)
2947 return "CAST(%s AS %s)%s" % (
2948 cast.clause._compiler_dispatch(self, **kwargs),
2949 match.group(1) if match else type_clause,
2950 match.group(2) if match else "",
2951 )
2952
    def visit_frame_clause(self, frameclause, **kw):
        """Render the two bounds of a window frame specification, e.g.
        ``UNBOUNDED PRECEDING AND CURRENT ROW`` (without the leading
        RANGE/ROWS/GROUPS BETWEEN keywords, which the caller supplies)."""

        # lower bound
        if frameclause.lower_type is elements.FrameClauseType.UNBOUNDED:
            left = "UNBOUNDED PRECEDING"
        elif frameclause.lower_type is elements.FrameClauseType.CURRENT:
            left = "CURRENT ROW"
        else:
            # bounded offset; render the bound parameter
            val = self.process(frameclause.lower_bind, **kw)
            if frameclause.lower_type is elements.FrameClauseType.PRECEDING:
                left = f"{val} PRECEDING"
            else:
                left = f"{val} FOLLOWING"

        # upper bound
        if frameclause.upper_type is elements.FrameClauseType.UNBOUNDED:
            right = "UNBOUNDED FOLLOWING"
        elif frameclause.upper_type is elements.FrameClauseType.CURRENT:
            right = "CURRENT ROW"
        else:
            val = self.process(frameclause.upper_bind, **kw)
            if frameclause.upper_type is elements.FrameClauseType.PRECEDING:
                right = f"{val} PRECEDING"
            else:
                right = f"{val} FOLLOWING"

        return f"{left} AND {right}"
2978
2979 def visit_over(self, over, **kwargs):
2980 text = over.element._compiler_dispatch(self, **kwargs)
2981 if over.range_ is not None:
2982 range_ = f"RANGE BETWEEN {self.process(over.range_, **kwargs)}"
2983 elif over.rows is not None:
2984 range_ = f"ROWS BETWEEN {self.process(over.rows, **kwargs)}"
2985 elif over.groups is not None:
2986 range_ = f"GROUPS BETWEEN {self.process(over.groups, **kwargs)}"
2987 else:
2988 range_ = None
2989
2990 return "%s OVER (%s)" % (
2991 text,
2992 " ".join(
2993 [
2994 "%s BY %s"
2995 % (word, clause._compiler_dispatch(self, **kwargs))
2996 for word, clause in (
2997 ("PARTITION", over.partition_by),
2998 ("ORDER", over.order_by),
2999 )
3000 if clause is not None and len(clause)
3001 ]
3002 + ([range_] if range_ else [])
3003 ),
3004 )
3005
3006 def visit_withingroup(self, withingroup, **kwargs):
3007 return "%s WITHIN GROUP (ORDER BY %s)" % (
3008 withingroup.element._compiler_dispatch(self, **kwargs),
3009 withingroup.order_by._compiler_dispatch(self, **kwargs),
3010 )
3011
3012 def visit_funcfilter(self, funcfilter, **kwargs):
3013 return "%s FILTER (WHERE %s)" % (
3014 funcfilter.func._compiler_dispatch(self, **kwargs),
3015 funcfilter.criterion._compiler_dispatch(self, **kwargs),
3016 )
3017
    def visit_aggregateorderby(self, aggregateorderby, **kwargs):
        """Render an aggregate function with ORDER BY, using whichever
        style the dialect declares: inline ``ORDER BY`` inside the
        function parens, ``WITHIN GROUP``, or an error if unsupported."""
        if self.dialect.aggregate_order_by_style is AggregateOrderByStyle.NONE:
            raise exc.CompileError(
                "this dialect does not support "
                "ORDER BY within an aggregate function"
            )
        elif (
            self.dialect.aggregate_order_by_style
            is AggregateOrderByStyle.INLINE
        ):
            # clone the function and rewrap its argument list with the
            # ORDER BY rendered inline inside the parenthesis
            new_fn = aggregateorderby.element._clone()
            new_fn.clause_expr = elements.Grouping(
                aggregate_orderby_inline(
                    new_fn.clause_expr.element, aggregateorderby.order_by
                )
            )

            return new_fn._compiler_dispatch(self, **kwargs)
        else:
            # WITHIN_GROUP style
            return self.visit_withingroup(aggregateorderby, **kwargs)
3038
3039 def visit_aggregate_orderby_inline(self, element, **kw):
3040 return "%s ORDER BY %s" % (
3041 self.process(element.element, **kw),
3042 self.process(element.aggregate_order_by, **kw),
3043 )
3044
    def visit_aggregate_strings_func(self, fn, *, use_function_name, **kw):
        """Render an aggregate_strings() (string-concatenation aggregate)
        call under the dialect-specific function name, forcing the
        delimiter argument to render as a literal."""
        # aggregate_order_by attribute is present if visit_function
        # gave us a Function with aggregate_orderby_inline() as the inner
        # contents
        order_by = getattr(fn.clauses, "aggregate_order_by", None)

        literal_exec = dict(kw)
        literal_exec["literal_execute"] = True

        # break up the function into its components so we can apply
        # literal_execute to the second argument (the delimiter)
        cl = list(fn.clauses)
        expr, delimiter = cl[0:2]
        if (
            order_by is not None
            and self.dialect.aggregate_order_by_style
            is AggregateOrderByStyle.INLINE
        ):
            # render the ORDER BY inline, inside the function parenthesis
            return (
                f"{use_function_name}({expr._compiler_dispatch(self, **kw)}, "
                f"{delimiter._compiler_dispatch(self, **literal_exec)} "
                f"ORDER BY {order_by._compiler_dispatch(self, **kw)})"
            )
        else:
            return (
                f"{use_function_name}({expr._compiler_dispatch(self, **kw)}, "
                f"{delimiter._compiler_dispatch(self, **literal_exec)})"
            )
3073
3074 def visit_extract(self, extract, **kwargs):
3075 field = self.extract_map.get(extract.field, extract.field)
3076 return "EXTRACT(%s FROM %s)" % (
3077 field,
3078 extract.expr._compiler_dispatch(self, **kwargs),
3079 )
3080
3081 def visit_scalar_function_column(self, element, **kw):
3082 compiled_fn = self.visit_function(element.fn, **kw)
3083 compiled_col = self.visit_column(element, **kw)
3084 return "(%s).%s" % (compiled_fn, compiled_col)
3085
    def visit_function(
        self,
        func: Function[Any],
        add_to_result_map: Optional[_ResultMapAppender] = None,
        **kwargs: Any,
    ) -> str:
        """Render a SQL function call, preferring a per-function
        ``visit_<name>_func`` compiler hook, then the generic FUNCTIONS
        registry, then the function's own (quoted if needed) name."""
        if self._collect_params:
            self._add_to_params(func)
        if add_to_result_map is not None:
            add_to_result_map(func.name, func.name, (func.name,), func.type)

        # dialect/base compilers may define a dedicated hook per function
        disp = getattr(self, "visit_%s_func" % func.name.lower(), None)

        text: str

        if disp:
            text = disp(func, **kwargs)
        else:
            # known generic function: registry supplies the name template
            name = FUNCTIONS.get(func._deannotate().__class__, None)
            if name:
                if func._has_args:
                    name += "%(expr)s"
            else:
                # unknown function: use its given name, quoting when needed
                name = func.name
                name = (
                    self.preparer.quote(name)
                    if self.preparer._requires_quotes_illegal_chars(name)
                    or isinstance(name, elements.quoted_name)
                    else name
                )
                name = name + "%(expr)s"
            text = ".".join(
                [
                    (
                        self.preparer.quote(tok)
                        if self.preparer._requires_quotes_illegal_chars(tok)
                        # NOTE(review): this checks ``name`` rather than
                        # ``tok`` — looks like it may have been intended as
                        # ``tok``; confirm before changing
                        or isinstance(name, elements.quoted_name)
                        else tok
                    )
                    for tok in func.packagenames
                ]
                + [name]
            ) % {"expr": self.function_argspec(func, **kwargs)}

        if func._with_ordinality:
            text += " WITH ORDINALITY"
        return text
3133
    def visit_next_value_func(self, next_value, **kw):
        # render next_value() by delegating to the dialect's sequence
        # support; NOTE(review): **kw is intentionally not forwarded —
        # confirm visit_sequence implementations don't rely on it
        return self.visit_sequence(next_value.sequence)
3136
3137 def visit_sequence(self, sequence, **kw):
3138 raise NotImplementedError(
3139 "Dialect '%s' does not support sequence increments."
3140 % self.dialect.name
3141 )
3142
    def function_argspec(self, func: Function[Any], **kwargs: Any) -> str:
        # render the parenthesized argument list of a function by
        # compiling its clause_expr (a Grouping over the arguments)
        return func.clause_expr._compiler_dispatch(self, **kwargs)
3145
    def visit_compound_select(
        self, cs, asfrom=False, compound_index=None, **kwargs
    ):
        """Render a CompoundSelect (UNION / INTERSECT / EXCEPT, etc.).

        Compiles each member SELECT, joins them with the compound
        keyword, then appends GROUP BY / ORDER BY / row-limiting
        clauses and prepends any collected CTEs.
        """
        if self._collect_params:
            self._add_to_params(cs)
        toplevel = not self.stack

        compile_state = cs._compile_state_factory(cs, self, **kwargs)

        # the first top-level statement compiled becomes the compiler's
        # primary compile_state
        if toplevel and not self.compile_state:
            self.compile_state = compile_state

        compound_stmt = compile_state.statement

        entry = self._default_stack_entry if toplevel else self.stack[-1]
        # result-map population is needed at the top level, or for the
        # first element of an enclosing compound that requested it
        need_result_map = toplevel or (
            not compound_index
            and entry.get("need_result_map_for_compound", False)
        )

        # indicates there is already a CompoundSelect in play
        if compound_index == 0:
            entry["select_0"] = cs

        self.stack.append(
            {
                "correlate_froms": entry["correlate_froms"],
                "asfrom_froms": entry["asfrom_froms"],
                "selectable": cs,
                "compile_state": compile_state,
                "need_result_map_for_compound": need_result_map,
            }
        )

        # CTEs attached explicitly via add_cte() render independently
        # of the statement body
        if compound_stmt._independent_ctes:
            self._dispatch_independent_ctes(compound_stmt, kwargs)

        keyword = self.compound_keywords[cs.keyword]

        text = (" " + keyword + " ").join(
            (
                c._compiler_dispatch(
                    self, asfrom=asfrom, compound_index=i, **kwargs
                )
                for i, c in enumerate(cs.selects)
            )
        )

        kwargs["include_table"] = False
        text += self.group_by_clause(cs, **dict(asfrom=asfrom, **kwargs))
        text += self.order_by_clause(cs, **kwargs)
        if cs._has_row_limiting_clause:
            text += self._row_limit_clause(cs, **kwargs)

        if self.ctes:
            # prepend the WITH clause; rendered as nested unless we are
            # the outermost statement
            nesting_level = len(self.stack) if not toplevel else None
            text = (
                self._render_cte_clause(
                    nesting_level=nesting_level,
                    include_following_stack=True,
                )
                + text
            )

        self.stack.pop(-1)
        return text
3212
3213 def _row_limit_clause(self, cs, **kwargs):
3214 if cs._fetch_clause is not None:
3215 return self.fetch_clause(cs, **kwargs)
3216 else:
3217 return self.limit_clause(cs, **kwargs)
3218
3219 def _get_operator_dispatch(self, operator_, qualifier1, qualifier2):
3220 attrname = "visit_%s_%s%s" % (
3221 operator_.__name__,
3222 qualifier1,
3223 "_" + qualifier2 if qualifier2 else "",
3224 )
3225 return getattr(self, attrname, None)
3226
3227 def _get_custom_operator_dispatch(self, operator_, qualifier1):
3228 attrname = "visit_%s_op_%s" % (operator_.visit_name, qualifier1)
3229 return getattr(self, attrname, None)
3230
3231 def visit_unary(
3232 self, unary, add_to_result_map=None, result_map_targets=(), **kw
3233 ):
3234 if add_to_result_map is not None:
3235 result_map_targets += (unary,)
3236 kw["add_to_result_map"] = add_to_result_map
3237 kw["result_map_targets"] = result_map_targets
3238
3239 if unary.operator:
3240 if unary.modifier:
3241 raise exc.CompileError(
3242 "Unary expression does not support operator "
3243 "and modifier simultaneously"
3244 )
3245 disp = self._get_operator_dispatch(
3246 unary.operator, "unary", "operator"
3247 )
3248 if disp:
3249 return disp(unary, unary.operator, **kw)
3250 else:
3251 return self._generate_generic_unary_operator(
3252 unary, OPERATORS[unary.operator], **kw
3253 )
3254 elif unary.modifier:
3255 disp = self._get_operator_dispatch(
3256 unary.modifier, "unary", "modifier"
3257 )
3258 if disp:
3259 return disp(unary, unary.modifier, **kw)
3260 else:
3261 return self._generate_generic_unary_modifier(
3262 unary, OPERATORS[unary.modifier], **kw
3263 )
3264 else:
3265 raise exc.CompileError(
3266 "Unary expression has no operator or modifier"
3267 )
3268
3269 def visit_truediv_binary(self, binary, operator, **kw):
3270 if self.dialect.div_is_floordiv:
3271 return (
3272 self.process(binary.left, **kw)
3273 + " / "
3274 # TODO: would need a fast cast again here,
3275 # unless we want to use an implicit cast like "+ 0.0"
3276 + self.process(
3277 elements.Cast(
3278 binary.right,
3279 (
3280 binary.right.type
3281 if binary.right.type._type_affinity
3282 in (sqltypes.Numeric, sqltypes.Float)
3283 else sqltypes.Numeric()
3284 ),
3285 ),
3286 **kw,
3287 )
3288 )
3289 else:
3290 return (
3291 self.process(binary.left, **kw)
3292 + " / "
3293 + self.process(binary.right, **kw)
3294 )
3295
3296 def visit_floordiv_binary(self, binary, operator, **kw):
3297 if (
3298 self.dialect.div_is_floordiv
3299 and binary.right.type._type_affinity is sqltypes.Integer
3300 ):
3301 return (
3302 self.process(binary.left, **kw)
3303 + " / "
3304 + self.process(binary.right, **kw)
3305 )
3306 else:
3307 return "FLOOR(%s)" % (
3308 self.process(binary.left, **kw)
3309 + " / "
3310 + self.process(binary.right, **kw)
3311 )
3312
3313 def visit_is_true_unary_operator(self, element, operator, **kw):
3314 if (
3315 element._is_implicitly_boolean
3316 or self.dialect.supports_native_boolean
3317 ):
3318 return self.process(element.element, **kw)
3319 else:
3320 return "%s = 1" % self.process(element.element, **kw)
3321
3322 def visit_is_false_unary_operator(self, element, operator, **kw):
3323 if (
3324 element._is_implicitly_boolean
3325 or self.dialect.supports_native_boolean
3326 ):
3327 return "NOT %s" % self.process(element.element, **kw)
3328 else:
3329 return "%s = 0" % self.process(element.element, **kw)
3330
    def visit_not_match_op_binary(self, binary, operator, **kw):
        # render NOT MATCH as the negation of the MATCH rendering;
        # NOTE(review): **kw is not forwarded to visit_binary here —
        # presumably deliberate, but confirm dispatch kwargs aren't needed
        return "NOT %s" % self.visit_binary(
            binary, override_operator=operators.match_op
        )
3335
    def visit_not_in_op_binary(self, binary, operator, **kw):
        """Render NOT IN, parenthesized to protect the empty-set form."""
        # The brackets are required in the NOT IN operation because the empty
        # case is handled using the form "(col NOT IN (null) OR 1 = 1)".
        # The presence of the OR makes the brackets required.
        return "(%s)" % self._generate_generic_binary(
            binary, OPERATORS[operator], **kw
        )
3343
3344 def visit_empty_set_op_expr(self, type_, expand_op, **kw):
3345 if expand_op is operators.not_in_op:
3346 if len(type_) > 1:
3347 return "(%s)) OR (1 = 1" % (
3348 ", ".join("NULL" for element in type_)
3349 )
3350 else:
3351 return "NULL) OR (1 = 1"
3352 elif expand_op is operators.in_op:
3353 if len(type_) > 1:
3354 return "(%s)) AND (1 != 1" % (
3355 ", ".join("NULL" for element in type_)
3356 )
3357 else:
3358 return "NULL) AND (1 != 1"
3359 else:
3360 return self.visit_empty_set_expr(type_)
3361
3362 def visit_empty_set_expr(self, element_types, **kw):
3363 raise NotImplementedError(
3364 "Dialect '%s' does not support empty set expression."
3365 % self.dialect.name
3366 )
3367
    def _literal_execute_expanding_parameter_literal_binds(
        self, parameter, values, bind_expression_template=None
    ):
        """Render the values of an expanding (IN) parameter as inline
        literals.

        Returns a two-tuple ``(to_update, replacement_expression)``;
        ``to_update`` is always empty here since no bound parameters
        remain after literal rendering.
        """
        typ_dialect_impl = parameter.type._unwrapped_dialect_impl(self.dialect)

        if not values:
            # empty IN expression. note we don't need to use
            # bind_expression_template here because there are no
            # expressions to render.

            if typ_dialect_impl._is_tuple_type:
                replacement_expression = (
                    "VALUES " if self.dialect.tuple_in_values else ""
                ) + self.visit_empty_set_op_expr(
                    parameter.type.types, parameter.expand_op
                )

            else:
                replacement_expression = self.visit_empty_set_op_expr(
                    [parameter.type], parameter.expand_op
                )

        elif typ_dialect_impl._is_tuple_type or (
            typ_dialect_impl._isnull
            and isinstance(values[0], collections_abc.Sequence)
            and not isinstance(values[0], (str, bytes))
        ):
            # tuple IN, e.g. (a, b) IN ((1, 2), (3, 4)); render each
            # element tuple as a parenthesized literal group
            if typ_dialect_impl._has_bind_expression:
                raise NotImplementedError(
                    "bind_expression() on TupleType not supported with "
                    "literal_binds"
                )

            replacement_expression = (
                "VALUES " if self.dialect.tuple_in_values else ""
            ) + ", ".join(
                "(%s)"
                % (
                    ", ".join(
                        self.render_literal_value(value, param_type)
                        for value, param_type in zip(
                            tuple_element, parameter.type.types
                        )
                    )
                )
                for i, tuple_element in enumerate(values)
            )
        else:
            if bind_expression_template:
                # a bind_expression wrapped this parameter; the template
                # carries the SQL surrounding the expanding placeholder as
                # "left~~...~~REPL~~...~~right" - re-wrap each literal value
                post_compile_pattern = self._post_compile_pattern
                m = post_compile_pattern.search(bind_expression_template)
                assert m and m.group(
                    2
                ), "unexpected format for expanding parameter"

                tok = m.group(2).split("~~")
                be_left, be_right = tok[1], tok[3]
                replacement_expression = ", ".join(
                    "%s%s%s"
                    % (
                        be_left,
                        self.render_literal_value(value, parameter.type),
                        be_right,
                    )
                    for value in values
                )
            else:
                # plain scalar IN: comma-separated literal values
                replacement_expression = ", ".join(
                    self.render_literal_value(value, parameter.type)
                    for value in values
                )

        return (), replacement_expression
3441
    def _literal_execute_expanding_parameter(self, name, parameter, values):
        """Expand an expanding (IN) parameter into individual bound
        parameters.

        Returns ``(to_update, replacement_expression)`` where
        ``to_update`` is a list of ``(param_name, value)`` pairs to add
        to the parameter dictionary and ``replacement_expression`` is
        the SQL fragment replacing the POSTCOMPILE placeholder.
        """
        if parameter.literal_execute:
            # literal_execute parameters render inline instead
            return self._literal_execute_expanding_parameter_literal_binds(
                parameter, values
            )

        dialect = self.dialect
        typ_dialect_impl = parameter.type._unwrapped_dialect_impl(dialect)

        if self._numeric_binds:
            bind_template = self.compilation_bindtemplate
        else:
            bind_template = self.bindtemplate

        if (
            self.dialect._bind_typing_render_casts
            and typ_dialect_impl.render_bind_cast
        ):
            # wrap each rendered placeholder in a dialect-specific cast

            def _render_bindtemplate(name):
                return self.render_bind_cast(
                    parameter.type,
                    typ_dialect_impl,
                    bind_template % {"name": name},
                )

        else:

            def _render_bindtemplate(name):
                return bind_template % {"name": name}

        if not values:
            # empty IN: no parameters; render the dialect's empty-set form
            to_update = []
            if typ_dialect_impl._is_tuple_type:
                replacement_expression = self.visit_empty_set_op_expr(
                    parameter.type.types, parameter.expand_op
                )
            else:
                replacement_expression = self.visit_empty_set_op_expr(
                    [parameter.type], parameter.expand_op
                )

        elif typ_dialect_impl._is_tuple_type or (
            typ_dialect_impl._isnull
            and isinstance(values[0], collections_abc.Sequence)
            and not isinstance(values[0], (str, bytes))
        ):
            # tuple IN: one parameter per tuple member, named name_i_j
            assert not typ_dialect_impl._is_array
            to_update = [
                ("%s_%s_%s" % (name, i, j), value)
                for i, tuple_element in enumerate(values, 1)
                for j, value in enumerate(tuple_element, 1)
            ]

            # NOTE(review): the flat-index math below assumes every tuple
            # in ``values`` has the same length as the one currently being
            # iterated — presumably guaranteed upstream; confirm
            replacement_expression = (
                "VALUES " if dialect.tuple_in_values else ""
            ) + ", ".join(
                "(%s)"
                % (
                    ", ".join(
                        _render_bindtemplate(
                            to_update[i * len(tuple_element) + j][0]
                        )
                        for j, value in enumerate(tuple_element)
                    )
                )
                for i, tuple_element in enumerate(values)
            )
        else:
            # scalar IN: one parameter per value, named name_1..name_n
            to_update = [
                ("%s_%s" % (name, i), value)
                for i, value in enumerate(values, 1)
            ]
            replacement_expression = ", ".join(
                _render_bindtemplate(key) for key, value in to_update
            )

        return to_update, replacement_expression
3520
    def visit_binary(
        self,
        binary,
        override_operator=None,
        eager_grouping=False,
        from_linter=None,
        lateral_from_linter=None,
        **kw,
    ):
        """Render a BinaryExpression, dispatching to a
        ``visit_<opname>_binary`` method when one exists, otherwise
        rendering ``left <opstring> right`` from the OPERATORS table.
        """
        # record FROM-object relationships for cartesian product linting:
        # a comparison between two expressions relates their FROM objects
        if from_linter and operators.is_comparison(binary.operator):
            if lateral_from_linter is not None:
                enclosing_lateral = kw["enclosing_lateral"]
                lateral_from_linter.edges.update(
                    itertools.product(
                        _de_clone(
                            binary.left._from_objects + [enclosing_lateral]
                        ),
                        _de_clone(
                            binary.right._from_objects + [enclosing_lateral]
                        ),
                    )
                )
            else:
                from_linter.edges.update(
                    itertools.product(
                        _de_clone(binary.left._from_objects),
                        _de_clone(binary.right._from_objects),
                    )
                )

        # don't allow "? = ?" to render
        if (
            self.ansi_bind_rules
            and isinstance(binary.left, elements.BindParameter)
            and isinstance(binary.right, elements.BindParameter)
        ):
            kw["literal_execute"] = True

        operator_ = override_operator or binary.operator
        disp = self._get_operator_dispatch(operator_, "binary", None)
        if disp:
            return disp(binary, operator_, **kw)
        else:
            try:
                opstring = OPERATORS[operator_]
            except KeyError as err:
                raise exc.UnsupportedCompilationError(self, operator_) from err
            else:
                return self._generate_generic_binary(
                    binary,
                    opstring,
                    from_linter=from_linter,
                    lateral_from_linter=lateral_from_linter,
                    **kw,
                )
3576
    def visit_function_as_comparison_op_binary(self, element, operator, **kw):
        # a FunctionAsBinary wrapper renders as the underlying function call
        return self.process(element.sql_function, **kw)
3579
3580 def visit_mod_binary(self, binary, operator, **kw):
3581 if self.preparer._double_percents:
3582 return (
3583 self.process(binary.left, **kw)
3584 + " %% "
3585 + self.process(binary.right, **kw)
3586 )
3587 else:
3588 return (
3589 self.process(binary.left, **kw)
3590 + " % "
3591 + self.process(binary.right, **kw)
3592 )
3593
3594 def visit_custom_op_binary(self, element, operator, **kw):
3595 if operator.visit_name:
3596 disp = self._get_custom_operator_dispatch(operator, "binary")
3597 if disp:
3598 return disp(element, operator, **kw)
3599
3600 kw["eager_grouping"] = operator.eager_grouping
3601 return self._generate_generic_binary(
3602 element,
3603 " " + self.escape_literal_column(operator.opstring) + " ",
3604 **kw,
3605 )
3606
3607 def visit_custom_op_unary_operator(self, element, operator, **kw):
3608 if operator.visit_name:
3609 disp = self._get_custom_operator_dispatch(operator, "unary")
3610 if disp:
3611 return disp(element, operator, **kw)
3612
3613 return self._generate_generic_unary_operator(
3614 element, self.escape_literal_column(operator.opstring) + " ", **kw
3615 )
3616
3617 def visit_custom_op_unary_modifier(self, element, operator, **kw):
3618 if operator.visit_name:
3619 disp = self._get_custom_operator_dispatch(operator, "unary")
3620 if disp:
3621 return disp(element, operator, **kw)
3622
3623 return self._generate_generic_unary_modifier(
3624 element, " " + self.escape_literal_column(operator.opstring), **kw
3625 )
3626
3627 def _generate_generic_binary(
3628 self,
3629 binary: BinaryExpression[Any],
3630 opstring: str,
3631 eager_grouping: bool = False,
3632 **kw: Any,
3633 ) -> str:
3634 _in_operator_expression = kw.get("_in_operator_expression", False)
3635
3636 kw["_in_operator_expression"] = True
3637 kw["_binary_op"] = binary.operator
3638 text = (
3639 binary.left._compiler_dispatch(
3640 self, eager_grouping=eager_grouping, **kw
3641 )
3642 + opstring
3643 + binary.right._compiler_dispatch(
3644 self, eager_grouping=eager_grouping, **kw
3645 )
3646 )
3647
3648 if _in_operator_expression and eager_grouping:
3649 text = "(%s)" % text
3650 return text
3651
    def _generate_generic_unary_operator(self, unary, opstring, **kw):
        # prefix form: "<opstring><element>"
        return opstring + unary.element._compiler_dispatch(self, **kw)
3654
    def _generate_generic_unary_modifier(self, unary, opstring, **kw):
        # postfix form: "<element><opstring>"
        return unary.element._compiler_dispatch(self, **kw) + opstring
3657
    @util.memoized_property
    def _like_percent_literal(self):
        # literal '%' column used to assemble LIKE patterns for
        # contains/startswith/endswith; memoized as it never varies
        return elements.literal_column("'%'", type_=sqltypes.STRINGTYPE)
3661
3662 def visit_ilike_case_insensitive_operand(self, element, **kw):
3663 return f"lower({element.element._compiler_dispatch(self, **kw)})"
3664
3665 def visit_contains_op_binary(self, binary, operator, **kw):
3666 binary = binary._clone()
3667 percent = self._like_percent_literal
3668 binary.right = percent.concat(binary.right).concat(percent)
3669 return self.visit_like_op_binary(binary, operator, **kw)
3670
3671 def visit_not_contains_op_binary(self, binary, operator, **kw):
3672 binary = binary._clone()
3673 percent = self._like_percent_literal
3674 binary.right = percent.concat(binary.right).concat(percent)
3675 return self.visit_not_like_op_binary(binary, operator, **kw)
3676
3677 def visit_icontains_op_binary(self, binary, operator, **kw):
3678 binary = binary._clone()
3679 percent = self._like_percent_literal
3680 binary.left = ilike_case_insensitive(binary.left)
3681 binary.right = percent.concat(
3682 ilike_case_insensitive(binary.right)
3683 ).concat(percent)
3684 return self.visit_ilike_op_binary(binary, operator, **kw)
3685
3686 def visit_not_icontains_op_binary(self, binary, operator, **kw):
3687 binary = binary._clone()
3688 percent = self._like_percent_literal
3689 binary.left = ilike_case_insensitive(binary.left)
3690 binary.right = percent.concat(
3691 ilike_case_insensitive(binary.right)
3692 ).concat(percent)
3693 return self.visit_not_ilike_op_binary(binary, operator, **kw)
3694
3695 def visit_startswith_op_binary(self, binary, operator, **kw):
3696 binary = binary._clone()
3697 percent = self._like_percent_literal
3698 binary.right = percent._rconcat(binary.right)
3699 return self.visit_like_op_binary(binary, operator, **kw)
3700
3701 def visit_not_startswith_op_binary(self, binary, operator, **kw):
3702 binary = binary._clone()
3703 percent = self._like_percent_literal
3704 binary.right = percent._rconcat(binary.right)
3705 return self.visit_not_like_op_binary(binary, operator, **kw)
3706
3707 def visit_istartswith_op_binary(self, binary, operator, **kw):
3708 binary = binary._clone()
3709 percent = self._like_percent_literal
3710 binary.left = ilike_case_insensitive(binary.left)
3711 binary.right = percent._rconcat(ilike_case_insensitive(binary.right))
3712 return self.visit_ilike_op_binary(binary, operator, **kw)
3713
3714 def visit_not_istartswith_op_binary(self, binary, operator, **kw):
3715 binary = binary._clone()
3716 percent = self._like_percent_literal
3717 binary.left = ilike_case_insensitive(binary.left)
3718 binary.right = percent._rconcat(ilike_case_insensitive(binary.right))
3719 return self.visit_not_ilike_op_binary(binary, operator, **kw)
3720
3721 def visit_endswith_op_binary(self, binary, operator, **kw):
3722 binary = binary._clone()
3723 percent = self._like_percent_literal
3724 binary.right = percent.concat(binary.right)
3725 return self.visit_like_op_binary(binary, operator, **kw)
3726
3727 def visit_not_endswith_op_binary(self, binary, operator, **kw):
3728 binary = binary._clone()
3729 percent = self._like_percent_literal
3730 binary.right = percent.concat(binary.right)
3731 return self.visit_not_like_op_binary(binary, operator, **kw)
3732
3733 def visit_iendswith_op_binary(self, binary, operator, **kw):
3734 binary = binary._clone()
3735 percent = self._like_percent_literal
3736 binary.left = ilike_case_insensitive(binary.left)
3737 binary.right = percent.concat(ilike_case_insensitive(binary.right))
3738 return self.visit_ilike_op_binary(binary, operator, **kw)
3739
3740 def visit_not_iendswith_op_binary(self, binary, operator, **kw):
3741 binary = binary._clone()
3742 percent = self._like_percent_literal
3743 binary.left = ilike_case_insensitive(binary.left)
3744 binary.right = percent.concat(ilike_case_insensitive(binary.right))
3745 return self.visit_not_ilike_op_binary(binary, operator, **kw)
3746
3747 def visit_like_op_binary(self, binary, operator, **kw):
3748 escape = binary.modifiers.get("escape", None)
3749
3750 return "%s LIKE %s" % (
3751 binary.left._compiler_dispatch(self, **kw),
3752 binary.right._compiler_dispatch(self, **kw),
3753 ) + (
3754 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
3755 if escape is not None
3756 else ""
3757 )
3758
3759 def visit_not_like_op_binary(self, binary, operator, **kw):
3760 escape = binary.modifiers.get("escape", None)
3761 return "%s NOT LIKE %s" % (
3762 binary.left._compiler_dispatch(self, **kw),
3763 binary.right._compiler_dispatch(self, **kw),
3764 ) + (
3765 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
3766 if escape is not None
3767 else ""
3768 )
3769
3770 def visit_ilike_op_binary(self, binary, operator, **kw):
3771 if operator is operators.ilike_op:
3772 binary = binary._clone()
3773 binary.left = ilike_case_insensitive(binary.left)
3774 binary.right = ilike_case_insensitive(binary.right)
3775 # else we assume ilower() has been applied
3776
3777 return self.visit_like_op_binary(binary, operator, **kw)
3778
3779 def visit_not_ilike_op_binary(self, binary, operator, **kw):
3780 if operator is operators.not_ilike_op:
3781 binary = binary._clone()
3782 binary.left = ilike_case_insensitive(binary.left)
3783 binary.right = ilike_case_insensitive(binary.right)
3784 # else we assume ilower() has been applied
3785
3786 return self.visit_not_like_op_binary(binary, operator, **kw)
3787
3788 def visit_between_op_binary(self, binary, operator, **kw):
3789 symmetric = binary.modifiers.get("symmetric", False)
3790 return self._generate_generic_binary(
3791 binary, " BETWEEN SYMMETRIC " if symmetric else " BETWEEN ", **kw
3792 )
3793
3794 def visit_not_between_op_binary(self, binary, operator, **kw):
3795 symmetric = binary.modifiers.get("symmetric", False)
3796 return self._generate_generic_binary(
3797 binary,
3798 " NOT BETWEEN SYMMETRIC " if symmetric else " NOT BETWEEN ",
3799 **kw,
3800 )
3801
3802 def visit_regexp_match_op_binary(
3803 self, binary: BinaryExpression[Any], operator: Any, **kw: Any
3804 ) -> str:
3805 raise exc.CompileError(
3806 "%s dialect does not support regular expressions"
3807 % self.dialect.name
3808 )
3809
3810 def visit_not_regexp_match_op_binary(
3811 self, binary: BinaryExpression[Any], operator: Any, **kw: Any
3812 ) -> str:
3813 raise exc.CompileError(
3814 "%s dialect does not support regular expressions"
3815 % self.dialect.name
3816 )
3817
3818 def visit_regexp_replace_op_binary(
3819 self, binary: BinaryExpression[Any], operator: Any, **kw: Any
3820 ) -> str:
3821 raise exc.CompileError(
3822 "%s dialect does not support regular expression replacements"
3823 % self.dialect.name
3824 )
3825
3826 def visit_dmltargetcopy(self, element, *, bindmarkers=None, **kw):
3827 if bindmarkers is None:
3828 raise exc.CompileError(
3829 "DML target objects may only be used with "
3830 "compiled INSERT or UPDATE statements"
3831 )
3832
3833 bindmarkers[element.column.key] = element
3834 return f"__BINDMARKER_~~{element.column.key}~~"
3835
    def visit_bindparam(
        self,
        bindparam,
        within_columns_clause=False,
        literal_binds=False,
        skip_bind_expression=False,
        literal_execute=False,
        render_postcompile=False,
        **kwargs,
    ):
        """Render a BindParameter as a placeholder, a POSTCOMPILE token,
        or an inline literal, registering it in ``self.binds``.

        ``literal_binds`` renders the value inline; ``literal_execute``
        and expanding parameters defer rendering to a post-compile step.
        """

        if not skip_bind_expression:
            # a type-level bind_expression() wraps the parameter in
            # additional SQL; compile the wrapper with this parameter
            # re-dispatched (skip_bind_expression=True avoids recursion)
            impl = bindparam.type.dialect_impl(self.dialect)
            if impl._has_bind_expression:
                bind_expression = impl.bind_expression(bindparam)
                wrapped = self.process(
                    bind_expression,
                    skip_bind_expression=True,
                    within_columns_clause=within_columns_clause,
                    literal_binds=literal_binds and not bindparam.expanding,
                    literal_execute=literal_execute,
                    render_postcompile=render_postcompile,
                    **kwargs,
                )
                if bindparam.expanding:
                    # for postcompile w/ expanding, move the "wrapped" part
                    # of this into the inside

                    m = re.match(
                        r"^(.*)\(__\[POSTCOMPILE_(\S+?)\]\)(.*)$", wrapped
                    )
                    assert m, "unexpected format for expanding parameter"
                    wrapped = "(__[POSTCOMPILE_%s~~%s~~REPL~~%s~~])" % (
                        m.group(2),
                        m.group(1),
                        m.group(3),
                    )

                    if literal_binds:
                        ret = self.render_literal_bindparam(
                            bindparam,
                            within_columns_clause=True,
                            bind_expression_template=wrapped,
                            **kwargs,
                        )
                        return f"({ret})"

                return wrapped

        if not literal_binds:
            # ansi_bind_rules forces literal execution in columns clauses
            literal_execute = (
                literal_execute
                or bindparam.literal_execute
                or (within_columns_clause and self.ansi_bind_rules)
            )
            post_compile = literal_execute or bindparam.expanding
        else:
            post_compile = False

        if literal_binds:
            ret = self.render_literal_bindparam(
                bindparam, within_columns_clause=True, **kwargs
            )
            if bindparam.expanding:
                ret = f"({ret})"
            return ret

        name = self._truncate_bindparam(bindparam)

        if name in self.binds:
            # a parameter of this name was seen before; validate the
            # reuse is legal
            existing = self.binds[name]
            if existing is not bindparam:
                if (
                    (existing.unique or bindparam.unique)
                    and not existing.proxy_set.intersection(
                        bindparam.proxy_set
                    )
                    and not existing._cloned_set.intersection(
                        bindparam._cloned_set
                    )
                ):
                    raise exc.CompileError(
                        "Bind parameter '%s' conflicts with "
                        "unique bind parameter of the same name" % name
                    )
                elif existing.expanding != bindparam.expanding:
                    raise exc.CompileError(
                        "Can't reuse bound parameter name '%s' in both "
                        "'expanding' (e.g. within an IN expression) and "
                        "non-expanding contexts. If this parameter is to "
                        "receive a list/array value, set 'expanding=True' on "
                        "it for expressions that aren't IN, otherwise use "
                        "a different parameter name." % (name,)
                    )
                elif existing._is_crud or bindparam._is_crud:
                    if existing._is_crud and bindparam._is_crud:
                        # TODO: this condition is not well understood.
                        # see tests in test/sql/test_update.py
                        raise exc.CompileError(
                            "Encountered unsupported case when compiling an "
                            "INSERT or UPDATE statement. If this is a "
                            "multi-table "
                            "UPDATE statement, please provide string-named "
                            "arguments to the "
                            "values() method with distinct names; support for "
                            "multi-table UPDATE statements that "
                            "target multiple tables for UPDATE is very "
                            "limited",
                        )
                    else:
                        raise exc.CompileError(
                            f"bindparam() name '{bindparam.key}' is reserved "
                            "for automatic usage in the VALUES or SET "
                            "clause of this "
                            "insert/update statement. Please use a "
                            "name other than column name when using "
                            "bindparam() "
                            "with insert() or update() (for example, "
                            f"'b_{bindparam.key}')."
                        )

        self.binds[bindparam.key] = self.binds[name] = bindparam

        # if we are given a cache key that we're going to match against,
        # relate the bindparam here to one that is most likely present
        # in the "extracted params" portion of the cache key. this is used
        # to set up a positional mapping that is used to determine the
        # correct parameters for a subsequent use of this compiled with
        # a different set of parameter values. here, we accommodate for
        # parameters that may have been cloned both before and after the cache
        # key was been generated.
        ckbm_tuple = self._cache_key_bind_match

        if ckbm_tuple:
            ckbm, cksm = ckbm_tuple
            for bp in bindparam._cloned_set:
                if bp.key in cksm:
                    cb = cksm[bp.key]
                    ckbm[cb].append(bindparam)

        if bindparam.isoutparam:
            self.has_out_parameters = True

        if post_compile:
            if render_postcompile:
                self._render_postcompile = True

            if literal_execute:
                self.literal_execute_params |= {bindparam}
            else:
                self.post_compile_params |= {bindparam}

        ret = self.bindparam_string(
            name,
            post_compile=post_compile,
            expanding=bindparam.expanding,
            bindparam_type=bindparam.type,
            **kwargs,
        )

        # expanding parameters are parenthesized as they stand in for
        # the full IN list
        if bindparam.expanding:
            ret = f"({ret})"

        return ret
4000
    def render_bind_cast(self, type_, dbapi_type, sqltext):
        # hook for dialects that wrap bound parameters in explicit casts
        # (e.g. asyncpg "$1::INTEGER"); base compiler provides none
        raise NotImplementedError()
4003
    def render_literal_bindparam(
        self,
        bindparam,
        render_literal_value=NO_ARG,
        bind_expression_template=None,
        **kw,
    ):
        """Render a bound parameter as an inline literal value.

        ``render_literal_value``, when passed, supplies the value
        directly; otherwise the parameter's effective value is used.
        """
        if render_literal_value is not NO_ARG:
            value = render_literal_value
        else:
            if bindparam.value is None and bindparam.callable is None:
                # a literal NULL; warn when used with a comparison
                # operator other than IS / IS NOT
                op = kw.get("_binary_op", None)
                if op and op not in (operators.is_, operators.is_not):
                    util.warn_limited(
                        "Bound parameter '%s' rendering literal NULL in a SQL "
                        "expression; comparisons to NULL should not use "
                        "operators outside of 'is' or 'is not'",
                        (bindparam.key,),
                    )
                return self.process(sqltypes.NULLTYPE, **kw)
            value = bindparam.effective_value

        if bindparam.expanding:
            # expanding (IN) parameter: render each element literally
            leep = self._literal_execute_expanding_parameter_literal_binds
            to_update, replacement_expr = leep(
                bindparam,
                value,
                bind_expression_template=bind_expression_template,
            )
            return replacement_expr
        else:
            return self.render_literal_value(value, bindparam.type)
4036
    def render_literal_value(
        self, value: Any, type_: sqltypes.TypeEngine[Any]
    ) -> str:
        """Render the value of a bind parameter as a quoted literal.

        This is used for statement sections that do not accept bind parameters
        on the target driver/database.

        This should be implemented by subclasses using the quoting services
        of the DBAPI.

        :param value: the Python value to render.
        :param type_: the SQL type whose literal processor performs the
         rendering.
        :raises exc.CompileError: if the type has no literal processor,
         or if the processor fails on the given value.
        """

        if value is None and not type_.should_evaluate_none:
            # issue #10535 - handle NULL in the compiler without placing
            # this onto each type, except for "evaluate None" types
            # (e.g. JSON)
            return self.process(elements.Null._instance())

        processor = type_._cached_literal_processor(self.dialect)
        if processor:
            try:
                return processor(value)
            except Exception as e:
                raise exc.CompileError(
                    f"Could not render literal value "
                    f'"{sql_util._repr_single_value(value)}" '
                    f"with datatype "
                    f"{type_}; see parent stack trace for "
                    "more detail."
                ) from e

        else:
            raise exc.CompileError(
                f"No literal value renderer is available for literal value "
                f'"{sql_util._repr_single_value(value)}" '
                f"with datatype {type_}"
            )
4075
4076 def _truncate_bindparam(self, bindparam):
4077 if bindparam in self.bind_names:
4078 return self.bind_names[bindparam]
4079
4080 bind_name = bindparam.key
4081 if isinstance(bind_name, elements._truncated_label):
4082 bind_name = self._truncated_identifier("bindparam", bind_name)
4083
4084 # add to bind_names for translation
4085 self.bind_names[bindparam] = bind_name
4086
4087 return bind_name
4088
4089 def _truncated_identifier(
4090 self, ident_class: str, name: _truncated_label
4091 ) -> str:
4092 if (ident_class, name) in self.truncated_names:
4093 return self.truncated_names[(ident_class, name)]
4094
4095 anonname = name.apply_map(self.anon_map)
4096
4097 if len(anonname) > self.label_length - 6:
4098 counter = self._truncated_counters.get(ident_class, 1)
4099 truncname = (
4100 anonname[0 : max(self.label_length - 6, 0)]
4101 + "_"
4102 + hex(counter)[2:]
4103 )
4104 self._truncated_counters[ident_class] = counter + 1
4105 else:
4106 truncname = anonname
4107 self.truncated_names[(ident_class, name)] = truncname
4108 return truncname
4109
    def _anonymize(self, name: str) -> str:
        # resolve "%(...)s"-style anonymous tokens via the compiler's
        # anon_map, which generates stable per-compile identifiers
        return name % self.anon_map
4112
    def bindparam_string(
        self,
        name: str,
        post_compile: bool = False,
        expanding: bool = False,
        escaped_from: Optional[str] = None,
        bindparam_type: Optional[TypeEngine[Any]] = None,
        accumulate_bind_names: Optional[Set[str]] = None,
        visited_bindparam: Optional[List[str]] = None,
        **kw: Any,
    ) -> str:
        """Render the placeholder text for a bound parameter, either as
        a dialect bind template or as a ``__[POSTCOMPILE_...]`` token,
        applying bind casts where the dialect requires them.
        """
        # TODO: accumulate_bind_names is passed by crud.py to gather
        # names on a per-value basis, visited_bindparam is passed by
        # visit_insert() to collect all parameters in the statement.
        # see if this gathering can be simplified somehow
        if accumulate_bind_names is not None:
            accumulate_bind_names.add(name)
        if visited_bindparam is not None:
            visited_bindparam.append(name)

        if not escaped_from:
            if self._bind_translate_re.search(name):
                # not quite the translate use case as we want to
                # also get a quick boolean if we even found
                # unusual characters in the name
                new_name = self._bind_translate_re.sub(
                    lambda m: self._bind_translate_chars[m.group(0)],
                    name,
                )
                escaped_from = name
                name = new_name

        if escaped_from:
            # remember original -> escaped mapping for execution-time
            # parameter translation
            self.escaped_bind_names = self.escaped_bind_names.union(
                {escaped_from: name}
            )
        if post_compile:
            ret = "__[POSTCOMPILE_%s]" % name
            if expanding:
                # for expanding, bound parameters or literal values will be
                # rendered per item
                return ret

            # otherwise, for non-expanding "literal execute", apply
            # bind casts as determined by the datatype
            if bindparam_type is not None:
                type_impl = bindparam_type._unwrapped_dialect_impl(
                    self.dialect
                )
                if type_impl.render_literal_cast:
                    ret = self.render_bind_cast(bindparam_type, type_impl, ret)
            return ret
        elif self.state is CompilerState.COMPILING:
            # during statement compilation, numeric dialects render an
            # intermediate template that is repositioned later
            ret = self.compilation_bindtemplate % {"name": name}
        else:
            ret = self.bindtemplate % {"name": name}

        if (
            bindparam_type is not None
            and self.dialect._bind_typing_render_casts
        ):
            type_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
            if type_impl.render_bind_cast:
                ret = self.render_bind_cast(bindparam_type, type_impl, ret)

        return ret
4179
4180 def _dispatch_independent_ctes(self, stmt, kw):
4181 local_kw = kw.copy()
4182 local_kw.pop("cte_opts", None)
4183 for cte, opt in zip(
4184 stmt._independent_ctes, stmt._independent_ctes_opts
4185 ):
4186 cte._compiler_dispatch(self, cte_opts=opt, **local_kw)
4187
    def visit_cte(
        self,
        cte: CTE,
        asfrom: bool = False,
        ashint: bool = False,
        fromhints: Optional[_FromHintsType] = None,
        visiting_cte: Optional[CTE] = None,
        from_linter: Optional[FromLinter] = None,
        cte_opts: selectable._CTEOpts = selectable._CTEOpts(False),
        **kwargs: Any,
    ) -> Optional[str]:
        """Render a CTE, registering its text in the compiler's CTE
        collections; returns the string to render at the point of
        reference (the CTE name, possibly aliased), or ``None`` when
        nothing is to be rendered inline.
        """
        self_ctes = self._init_cte_state()
        assert self_ctes is self.ctes

        kwargs["visiting_cte"] = cte

        cte_name = cte.name

        if isinstance(cte_name, elements._truncated_label):
            cte_name = self._truncated_identifier("alias", cte_name)

        is_new_cte = True
        embedded_in_current_named_cte = False

        _reference_cte = cte._get_reference_cte()

        nesting = cte.nesting or cte_opts.nesting

        # check for CTE already encountered
        if _reference_cte in self.level_name_by_cte:
            cte_level, _, existing_cte_opts = self.level_name_by_cte[
                _reference_cte
            ]
            assert _ == cte_name

            cte_level_name = (cte_level, cte_name)
            existing_cte = self.ctes_by_level_name[cte_level_name]

            # check if we are receiving it here with a specific
            # "nest_here" location; if so, move it to this location

            if cte_opts.nesting:
                if existing_cte_opts.nesting:
                    raise exc.CompileError(
                        "CTE is stated as 'nest_here' in "
                        "more than one location"
                    )

                old_level_name = (cte_level, cte_name)
                cte_level = len(self.stack) if nesting else 1
                cte_level_name = new_level_name = (cte_level, cte_name)

                del self.ctes_by_level_name[old_level_name]
                self.ctes_by_level_name[new_level_name] = existing_cte
                self.level_name_by_cte[_reference_cte] = new_level_name + (
                    cte_opts,
                )

        else:
            cte_level = len(self.stack) if nesting else 1
            cte_level_name = (cte_level, cte_name)

            if cte_level_name in self.ctes_by_level_name:
                existing_cte = self.ctes_by_level_name[cte_level_name]
            else:
                existing_cte = None

        if existing_cte is not None:
            embedded_in_current_named_cte = visiting_cte is existing_cte

            # we've generated a same-named CTE that we are enclosed in,
            # or this is the same CTE. just return the name.
            if cte is existing_cte._restates or cte is existing_cte:
                is_new_cte = False
            elif existing_cte is cte._restates:
                # we've generated a same-named CTE that is
                # enclosed in us - we take precedence, so
                # discard the text for the "inner".
                del self_ctes[existing_cte]

                existing_cte_reference_cte = existing_cte._get_reference_cte()

                assert existing_cte_reference_cte is _reference_cte
                assert existing_cte_reference_cte is existing_cte

                del self.level_name_by_cte[existing_cte_reference_cte]
            else:
                if (
                    # if the two CTEs have the same hash, which we expect
                    # here means that one/both is an annotated of the other
                    (hash(cte) == hash(existing_cte))
                    # or...
                    or (
                        (
                            # if they are clones, i.e. they came from the ORM
                            # or some other visit method
                            cte._is_clone_of is not None
                            or existing_cte._is_clone_of is not None
                        )
                        # and are deep-copy identical
                        and cte.compare(existing_cte)
                    )
                ):
                    # then consider these two CTEs the same
                    is_new_cte = False
                else:
                    # otherwise these are two CTEs that either will render
                    # differently, or were indicated separately by the user,
                    # with the same name
                    raise exc.CompileError(
                        "Multiple, unrelated CTEs found with "
                        "the same name: %r" % cte_name
                    )

        if not asfrom and not is_new_cte:
            return None

        if cte._cte_alias is not None:
            pre_alias_cte = cte._cte_alias
            cte_pre_alias_name = cte._cte_alias.name
            if isinstance(cte_pre_alias_name, elements._truncated_label):
                cte_pre_alias_name = self._truncated_identifier(
                    "alias", cte_pre_alias_name
                )
        else:
            pre_alias_cte = cte
            cte_pre_alias_name = None

        if is_new_cte:
            self.ctes_by_level_name[cte_level_name] = cte
            self.level_name_by_cte[_reference_cte] = cte_level_name + (
                cte_opts,
            )

            if pre_alias_cte not in self.ctes:
                self.visit_cte(pre_alias_cte, **kwargs)

            if not cte_pre_alias_name and cte not in self_ctes:
                if cte.recursive:
                    self.ctes_recursive = True
                text = self.preparer.format_alias(cte, cte_name)
                if cte.recursive or cte.element.name_cte_columns:
                    col_source = cte.element

                    # TODO: can we get at the .columns_plus_names collection
                    # that is already (or will be?) generated for the SELECT
                    # rather than calling twice?
                    recur_cols = [
                        # TODO: proxy_name is not technically safe,
                        # see test_cte->
                        # test_with_recursive_no_name_currently_buggy. not
                        # clear what should be done with such a case
                        fallback_label_name or proxy_name
                        for (
                            _,
                            proxy_name,
                            fallback_label_name,
                            c,
                            repeated,
                        ) in (col_source._generate_columns_plus_names(True))
                        if not repeated
                    ]

                    text += "(%s)" % (
                        ", ".join(
                            self.preparer.format_label_name(
                                ident, anon_map=self.anon_map
                            )
                            for ident in recur_cols
                        )
                    )

                assert kwargs.get("subquery", False) is False

                if not self.stack:
                    # toplevel, this is a stringify of the
                    # cte directly. just compile the inner
                    # the way alias() does.
                    return cte.element._compiler_dispatch(
                        self, asfrom=asfrom, **kwargs
                    )
                else:
                    prefixes = self._generate_prefixes(
                        cte, cte._prefixes, **kwargs
                    )
                    inner = cte.element._compiler_dispatch(
                        self, asfrom=True, **kwargs
                    )

                    text += " AS %s\n(%s)" % (prefixes, inner)

                    if cte._suffixes:
                        text += " " + self._generate_prefixes(
                            cte, cte._suffixes, **kwargs
                        )

                    self_ctes[cte] = text

        if asfrom:
            if from_linter:
                from_linter.froms[cte._de_clone()] = cte_name

            if not is_new_cte and embedded_in_current_named_cte:
                return self.preparer.format_alias(cte, cte_name)

            if cte_pre_alias_name:
                text = self.preparer.format_alias(cte, cte_pre_alias_name)
                if self.preparer._requires_quotes(cte_name):
                    cte_name = self.preparer.quote(cte_name)
                text += self.get_render_as_alias_suffix(cte_name)
                return text  # type: ignore[no-any-return]
            else:
                return self.preparer.format_alias(cte, cte_name)

        return None
4403
4404 def visit_table_valued_alias(self, element, **kw):
4405 if element.joins_implicitly:
4406 kw["from_linter"] = None
4407 if element._is_lateral:
4408 return self.visit_lateral(element, **kw)
4409 else:
4410 return self.visit_alias(element, **kw)
4411
    def visit_table_valued_column(self, element, **kw):
        # a table-valued column renders the same way as a plain column
        return self.visit_column(element, **kw)
4414
    def visit_alias(
        self,
        alias,
        asfrom=False,
        ashint=False,
        iscrud=False,
        fromhints=None,
        subquery=False,
        lateral=False,
        enclosing_alias=None,
        from_linter=None,
        within_tstring=False,
        **kwargs,
    ):
        """Render an alias construct, either as just its aliased name or
        as the full "<inner> AS <name>" FROM-clause element, depending
        on the ``asfrom`` / ``ashint`` / ``within_tstring`` context
        flags.
        """
        if lateral:
            if "enclosing_lateral" not in kwargs:
                # if lateral is set and enclosing_lateral is not
                # present, we assume we are being called directly
                # from visit_lateral() and we need to set enclosing_lateral.
                assert alias._is_lateral
                kwargs["enclosing_lateral"] = alias

            # for lateral objects, we track a second from_linter that is...
            # lateral! to the level above us.
            if (
                from_linter
                and "lateral_from_linter" not in kwargs
                and "enclosing_lateral" in kwargs
            ):
                kwargs["lateral_from_linter"] = from_linter

        if enclosing_alias is not None and enclosing_alias.element is alias:
            # double-aliasing of the same element; compile the inner
            # element directly, carrying the flags through
            inner = alias.element._compiler_dispatch(
                self,
                asfrom=asfrom,
                ashint=ashint,
                iscrud=iscrud,
                fromhints=fromhints,
                lateral=lateral,
                enclosing_alias=alias,
                **kwargs,
            )
            if subquery and (asfrom or lateral):
                inner = "(%s)" % (inner,)
            return inner
        else:
            kwargs["enclosing_alias"] = alias

        if asfrom or ashint or within_tstring:
            if isinstance(alias.name, elements._truncated_label):
                alias_name = self._truncated_identifier("alias", alias.name)
            else:
                alias_name = alias.name

            if ashint:
                return self.preparer.format_alias(alias, alias_name)
            elif asfrom or within_tstring:
                if from_linter:
                    from_linter.froms[alias._de_clone()] = alias_name

                inner = alias.element._compiler_dispatch(
                    self, asfrom=True, lateral=lateral, **kwargs
                )
                if subquery:
                    inner = "(%s)" % (inner,)

                ret = inner + self.get_render_as_alias_suffix(
                    self.preparer.format_alias(alias, alias_name)
                )

                if alias._supports_derived_columns and alias._render_derived:
                    # render the derived column list, optionally with
                    # per-column datatypes
                    ret += "(%s)" % (
                        ", ".join(
                            "%s%s"
                            % (
                                self.preparer.quote(col.name),
                                (
                                    " %s"
                                    % self.dialect.type_compiler_instance.process(
                                        col.type, **kwargs
                                    )
                                    if alias._render_derived_w_types
                                    else ""
                                ),
                            )
                            for col in alias.c
                        )
                    )

                if fromhints and alias in fromhints:
                    ret = self.format_from_hint_text(
                        ret, alias, fromhints[alias], iscrud
                    )

                return ret
        else:
            # note we cancel the "subquery" flag here as well
            return alias.element._compiler_dispatch(
                self, lateral=lateral, **kwargs
            )
4515
4516 def visit_subquery(self, subquery, **kw):
4517 kw["subquery"] = True
4518 return self.visit_alias(subquery, **kw)
4519
4520 def visit_lateral(self, lateral_, **kw):
4521 kw["lateral"] = True
4522 return "LATERAL %s" % self.visit_alias(lateral_, **kw)
4523
4524 def visit_tablesample(self, tablesample, asfrom=False, **kw):
4525 text = "%s TABLESAMPLE %s" % (
4526 self.visit_alias(tablesample, asfrom=True, **kw),
4527 tablesample._get_method()._compiler_dispatch(self, **kw),
4528 )
4529
4530 if tablesample.seed is not None:
4531 text += " REPEATABLE (%s)" % (
4532 tablesample.seed._compiler_dispatch(self, **kw)
4533 )
4534
4535 return text
4536
4537 def _render_values(self, element, **kw):
4538 kw.setdefault("literal_binds", element.literal_binds)
4539 tuples = ", ".join(
4540 self.process(
4541 elements.Tuple(
4542 types=element._column_types, *elem
4543 ).self_group(),
4544 **kw,
4545 )
4546 for chunk in element._data
4547 for elem in chunk
4548 )
4549 return f"VALUES {tuples}"
4550
    def visit_values(
        self, element, asfrom=False, from_linter=None, visiting_cte=None, **kw
    ):
        """Render a VALUES construct, adding an alias name and derived
        column list when used as a named FROM element."""

        if element._independent_ctes:
            self._dispatch_independent_ctes(element, kw)

        v = self._render_values(element, **kw)

        # resolve the rendered name: unnamed VALUES get None, truncated
        # labels are run through the identifier truncation machinery
        if element._unnamed:
            name = None
        elif isinstance(element.name, elements._truncated_label):
            name = self._truncated_identifier("values", element.name)
        else:
            name = element.name

        if element._is_lateral:
            lateral = "LATERAL "
        else:
            lateral = ""

        if asfrom:
            if from_linter:
                from_linter.froms[element._de_clone()] = (
                    name if name is not None else "(unnamed VALUES element)"
                )

            if visiting_cte is not None and visiting_cte.element is element:
                if element._is_lateral:
                    raise exc.CompileError(
                        "Can't use a LATERAL VALUES expression inside of a CTE"
                    )
            elif name:
                kw["include_table"] = False
                v = "%s(%s)%s (%s)" % (
                    lateral,
                    v,
                    self.get_render_as_alias_suffix(self.preparer.quote(name)),
                    (
                        ", ".join(
                            c._compiler_dispatch(self, **kw)
                            for c in element.columns
                        )
                    ),
                )
            else:
                v = "%s(%s)" % (lateral, v)
        return v
4599
4600 def visit_scalar_values(self, element, **kw):
4601 return f"({self._render_values(element, **kw)})"
4602
4603 def get_render_as_alias_suffix(self, alias_name_text):
4604 return " AS " + alias_name_text
4605
    def _add_to_result_map(
        self,
        keyname: Optional[str],
        name: str,
        objects: Tuple[Any, ...],
        type_: TypeEngine[Any],
    ) -> None:
        """Record one result-column entry used for result processing.

        A ``None`` or ``"*"`` keyname disables ordered-column matching
        and switches result processing to ad-hoc textual mode.
        """

        # note objects must be non-empty for cursor.py to handle the
        # collection properly
        assert objects

        if keyname is None or keyname == "*":
            self._ordered_columns = False
            self._ad_hoc_textual = True
        if type_._is_tuple_type:
            raise exc.CompileError(
                "Most backends don't support SELECTing "
                "from a tuple() object. If this is an ORM query, "
                "consider using the Bundle object."
            )
        self._result_columns.append(
            ResultColumnsEntry(keyname, name, objects, type_)
        )
4630
4631 def _label_returning_column(
4632 self, stmt, column, populate_result_map, column_clause_args=None, **kw
4633 ):
4634 """Render a column with necessary labels inside of a RETURNING clause.
4635
4636 This method is provided for individual dialects in place of calling
4637 the _label_select_column method directly, so that the two use cases
4638 of RETURNING vs. SELECT can be disambiguated going forward.
4639
4640 .. versionadded:: 1.4.21
4641
4642 """
4643 return self._label_select_column(
4644 None,
4645 column,
4646 populate_result_map,
4647 False,
4648 {} if column_clause_args is None else column_clause_args,
4649 **kw,
4650 )
4651
    def _label_select_column(
        self,
        select,
        column,
        populate_result_map,
        asfrom,
        column_clause_args,
        name=None,
        proxy_name=None,
        fallback_label_name=None,
        within_columns_clause=True,
        column_is_repeated=False,
        need_column_expressions=False,
        include_table=True,
    ):
        """produce labeled columns present in a select().

        Determines whether ``column`` renders wrapped in a
        ``_CompileLabel`` ("expr AS name") or bare, sets up the
        result-map callback when ``populate_result_map`` is on, then
        dispatches the resulting expression with ``column_clause_args``.
        """
        impl = column.type.dialect_impl(self.dialect)

        if impl._has_column_expression and (
            need_column_expressions or populate_result_map
        ):
            col_expr = impl.column_expression(column)
        else:
            col_expr = column

        if populate_result_map:
            # pass an "add_to_result_map" callable into the compilation
            # of embedded columns. this collects information about the
            # column as it will be fetched in the result and is coordinated
            # with cursor.description when the query is executed.
            add_to_result_map = self._add_to_result_map

            # if the SELECT statement told us this column is a repeat,
            # wrap the callable with one that prevents the addition of the
            # targets
            if column_is_repeated:
                _add_to_result_map = add_to_result_map

                def add_to_result_map(keyname, name, objects, type_):
                    _add_to_result_map(keyname, name, (keyname,), type_)

            # if we redefined col_expr for type expressions, wrap the
            # callable with one that adds the original column to the targets
            elif col_expr is not column:
                _add_to_result_map = add_to_result_map

                def add_to_result_map(keyname, name, objects, type_):
                    _add_to_result_map(
                        keyname, name, (column,) + objects, type_
                    )

        else:
            add_to_result_map = None

        # this method is used by some of the dialects for RETURNING,
        # which has different inputs. _label_returning_column was added
        # as the better target for this now however for 1.4 we will keep
        # _label_select_column directly compatible with this use case.
        # these assertions right now set up the current expected inputs
        assert within_columns_clause, (
            "_label_select_column is only relevant within "
            "the columns clause of a SELECT or RETURNING"
        )
        if isinstance(column, elements.Label):
            if col_expr is not column:
                result_expr = _CompileLabel(
                    col_expr, column.name, alt_names=(column.element,)
                )
            else:
                result_expr = col_expr

        elif name:
            # here, _columns_plus_names has determined there's an explicit
            # label name we need to use. this is the default for
            # tablenames_plus_columnnames as well as when columns are being
            # deduplicated on name

            assert (
                proxy_name is not None
            ), "proxy_name is required if 'name' is passed"

            result_expr = _CompileLabel(
                col_expr,
                name,
                alt_names=(
                    proxy_name,
                    # this is a hack to allow legacy result column lookups
                    # to work as they did before; this goes away in 2.0.
                    # TODO: this only seems to be tested indirectly
                    # via test/orm/test_deprecations.py. should be a
                    # resultset test for this
                    column._tq_label,
                ),
            )
        else:
            # determine here whether this column should be rendered in
            # a labelled context or not, as we were given no required label
            # name from the caller. Here we apply heuristics based on the kind
            # of SQL expression involved.

            if col_expr is not column:
                # type-specific expression wrapping the given column,
                # so we render a label
                render_with_label = True
            elif isinstance(column, elements.ColumnClause):
                # table-bound column, we render its name as a label if we are
                # inside of a subquery only
                render_with_label = (
                    asfrom
                    and not column.is_literal
                    and column.table is not None
                )
            elif isinstance(column, elements.TextClause):
                render_with_label = False
            elif isinstance(column, elements.UnaryExpression):
                # unary expression. notes added as of #12681
                #
                # By convention, the visit_unary() method
                # itself does not add an entry to the result map, and relies
                # upon either the inner expression creating a result map
                # entry, or if not, by creating a label here that produces
                # the result map entry. Where that happens is based on whether
                # or not the element immediately inside the unary is a
                # NamedColumn subclass or not.
                #
                # Now, this also impacts how the SELECT is written; if
                # we decide to generate a label here, we get the usual
                # "~(x+y) AS anon_1" thing in the columns clause. If we
                # don't, we don't get an AS at all, we get like
                # "~table.column".
                #
                # But here is the important thing as of modernish (like 1.4)
                # versions of SQLAlchemy - **whether or not the AS <label>
                # is present in the statement is not actually important**.
                # We target result columns **positionally** for a fully
                # compiled ``Select()`` object; before 1.4 we needed those
                # labels to match in cursor.description etc etc but now it
                # really doesn't matter.
                # So really, we could set render_with_label True in all cases.
                # Or we could just have visit_unary() populate the result map
                # in all cases.
                #
                # What we're doing here is strictly trying to not rock the
                # boat too much with when we do/don't render "AS label";
                # labels being present helps in the edge cases that we
                # "fall back" to named cursor.description matching, labels
                # not being present for columns keeps us from having awkward
                # phrases like "SELECT DISTINCT table.x AS x".
                render_with_label = (
                    (
                        # exception case to detect if we render "not boolean"
                        # as "not <col>" for native boolean or "<col> = 1"
                        # for non-native boolean. this is controlled by
                        # visit_is_<true|false>_unary_operator
                        column.operator
                        in (operators.is_false, operators.is_true)
                        and not self.dialect.supports_native_boolean
                    )
                    or column._wraps_unnamed_column()
                    or asfrom
                )
            elif (
                # general class of expressions that don't have a SQL-column
                # addressable name. includes scalar selects, bind parameters,
                # SQL functions, others
                not isinstance(column, elements.NamedColumn)
                # deeper check that indicates there's no natural "name" to
                # this element, which accommodates for custom SQL constructs
                # that might have a ".name" attribute (but aren't SQL
                # functions) but are not implementing this more recently added
                # base class. in theory the "NamedColumn" check should be
                # enough, however here we seek to maintain legacy behaviors
                # as well.
                and column._non_anon_label is None
            ):
                render_with_label = True
            else:
                render_with_label = False

            if render_with_label:
                if not fallback_label_name:
                    # used by the RETURNING case right now. we generate it
                    # here as 3rd party dialects may be referring to
                    # _label_select_column method directly instead of the
                    # just-added _label_returning_column method
                    assert not column_is_repeated
                    fallback_label_name = column._anon_name_label

                fallback_label_name = (
                    elements._truncated_label(fallback_label_name)
                    if not isinstance(
                        fallback_label_name, elements._truncated_label
                    )
                    else fallback_label_name
                )

                result_expr = _CompileLabel(
                    col_expr, fallback_label_name, alt_names=(proxy_name,)
                )
            else:
                result_expr = col_expr

        column_clause_args.update(
            within_columns_clause=within_columns_clause,
            add_to_result_map=add_to_result_map,
            include_table=include_table,
            within_tstring=False,
        )
        return result_expr._compiler_dispatch(self, **column_clause_args)
4861
4862 def format_from_hint_text(self, sqltext, table, hint, iscrud):
4863 hinttext = self.get_from_hint_text(table, hint)
4864 if hinttext:
4865 sqltext += " " + hinttext
4866 return sqltext
4867
    def get_select_hint_text(self, byfroms):
        # dialect hook; the base compiler renders no statement-level
        # SELECT hint text
        return None
4870
    def get_from_hint_text(
        self, table: FromClause, text: Optional[str]
    ) -> Optional[str]:
        # dialect hook; the base compiler renders no per-FROM hint text
        return None
4875
    def get_crud_hint_text(self, table, text):
        # dialect hook; the base compiler renders no hint text for
        # INSERT/UPDATE/DELETE statements
        return None
4878
    def get_statement_hint_text(self, hint_texts):
        # combine all applicable statement-level hints, space separated
        return " ".join(hint_texts)
4881
    _default_stack_entry: _CompilerStackEntry

    if not typing.TYPE_CHECKING:
        # immutable sentinel stack entry used when the compiler stack
        # is empty (i.e. at statement toplevel)
        _default_stack_entry = util.immutabledict(
            [("correlate_froms", frozenset()), ("asfrom_froms", frozenset())]
        )
4888
4889 def _display_froms_for_select(
4890 self, select_stmt, asfrom, lateral=False, **kw
4891 ):
4892 # utility method to help external dialects
4893 # get the correct from list for a select.
4894 # specifically the oracle dialect needs this feature
4895 # right now.
4896 toplevel = not self.stack
4897 entry = self._default_stack_entry if toplevel else self.stack[-1]
4898
4899 compile_state = select_stmt._compile_state_factory(select_stmt, self)
4900
4901 correlate_froms = entry["correlate_froms"]
4902 asfrom_froms = entry["asfrom_froms"]
4903
4904 if asfrom and not lateral:
4905 froms = compile_state._get_display_froms(
4906 explicit_correlate_froms=correlate_froms.difference(
4907 asfrom_froms
4908 ),
4909 implicit_correlate_froms=(),
4910 )
4911 else:
4912 froms = compile_state._get_display_froms(
4913 explicit_correlate_froms=correlate_froms,
4914 implicit_correlate_froms=asfrom_froms,
4915 )
4916 return froms
4917
    translate_select_structure: Any = None
    """if not ``None``, should be a callable which accepts ``(select_stmt,
    **kw)`` and returns a select object.   this is used for structural changes
    mostly to accommodate for LIMIT/OFFSET schemes

    """
4924
    def visit_select(
        self,
        select_stmt,
        asfrom=False,
        insert_into=False,
        fromhints=None,
        compound_index=None,
        select_wraps_for=None,
        lateral=False,
        from_linter=None,
        **kwargs,
    ):
        """Render a SELECT statement: builds compile state, pushes a
        compiler stack entry, assembles hint/prefix/column/body text,
        and prepends the WITH clause when CTEs were collected."""
        assert select_wraps_for is None, (
            "SQLAlchemy 1.4 requires use of "
            "the translate_select_structure hook for structural "
            "translations of SELECT objects"
        )
        if self._collect_params:
            self._add_to_params(select_stmt)

        # initial setup of SELECT. the compile_state_factory may now
        # be creating a totally different SELECT from the one that was
        # passed in. for ORM use this will convert from an ORM-state
        # SELECT to a regular "Core" SELECT. other composed operations
        # such as computation of joins will be performed.

        kwargs["within_columns_clause"] = False

        compile_state = select_stmt._compile_state_factory(
            select_stmt, self, **kwargs
        )
        kwargs["ambiguous_table_name_map"] = (
            compile_state._ambiguous_table_name_map
        )

        select_stmt = compile_state.statement

        toplevel = not self.stack

        if toplevel and not self.compile_state:
            self.compile_state = compile_state

        is_embedded_select = compound_index is not None or insert_into

        # translate step for Oracle, SQL Server which often need to
        # restructure the SELECT to allow for LIMIT/OFFSET and possibly
        # other conditions
        if self.translate_select_structure:
            new_select_stmt = self.translate_select_structure(
                select_stmt, asfrom=asfrom, **kwargs
            )

            # if SELECT was restructured, maintain a link to the originals
            # and assemble a new compile state
            if new_select_stmt is not select_stmt:
                compile_state_wraps_for = compile_state
                select_wraps_for = select_stmt
                select_stmt = new_select_stmt

                compile_state = select_stmt._compile_state_factory(
                    select_stmt, self, **kwargs
                )
                select_stmt = compile_state.statement

        entry = self._default_stack_entry if toplevel else self.stack[-1]

        populate_result_map = need_column_expressions = (
            toplevel
            or entry.get("need_result_map_for_compound", False)
            or entry.get("need_result_map_for_nested", False)
        )

        # indicates there is a CompoundSelect in play and we are not the
        # first select
        if compound_index:
            populate_result_map = False

        # this was first proposed as part of #3372; however, it is not
        # reached in current tests and could possibly be an assertion
        # instead.
        if not populate_result_map and "add_to_result_map" in kwargs:
            del kwargs["add_to_result_map"]

        froms = self._setup_select_stack(
            select_stmt, compile_state, entry, asfrom, lateral, compound_index
        )

        column_clause_args = kwargs.copy()
        column_clause_args.update(
            {"within_label_clause": False, "within_columns_clause": False}
        )

        text = "SELECT "  # we're off to a good start !

        if select_stmt._post_select_clause is not None:
            psc = self.process(select_stmt._post_select_clause, **kwargs)
            if psc is not None:
                text += psc + " "

        if select_stmt._hints:
            hint_text, byfrom = self._setup_select_hints(select_stmt)
            if hint_text:
                text += hint_text + " "
        else:
            byfrom = None

        if select_stmt._independent_ctes:
            self._dispatch_independent_ctes(select_stmt, kwargs)

        if select_stmt._prefixes:
            text += self._generate_prefixes(
                select_stmt, select_stmt._prefixes, **kwargs
            )

        text += self.get_select_precolumns(select_stmt, **kwargs)

        if select_stmt._pre_columns_clause is not None:
            pcc = self.process(select_stmt._pre_columns_clause, **kwargs)
            if pcc is not None:
                text += pcc + " "

        # the actual list of columns to print in the SELECT column list.
        inner_columns = [
            c
            for c in [
                self._label_select_column(
                    select_stmt,
                    column,
                    populate_result_map,
                    asfrom,
                    column_clause_args,
                    name=name,
                    proxy_name=proxy_name,
                    fallback_label_name=fallback_label_name,
                    column_is_repeated=repeated,
                    need_column_expressions=need_column_expressions,
                )
                for (
                    name,
                    proxy_name,
                    fallback_label_name,
                    column,
                    repeated,
                ) in compile_state.columns_plus_names
            ]
            if c is not None
        ]

        if populate_result_map and select_wraps_for is not None:
            # if this select was generated from translate_select,
            # rewrite the targeted columns in the result map

            translate = dict(
                zip(
                    [
                        name
                        for (
                            key,
                            proxy_name,
                            fallback_label_name,
                            name,
                            repeated,
                        ) in compile_state.columns_plus_names
                    ],
                    [
                        name
                        for (
                            key,
                            proxy_name,
                            fallback_label_name,
                            name,
                            repeated,
                        ) in compile_state_wraps_for.columns_plus_names
                    ],
                )
            )

            self._result_columns = [
                ResultColumnsEntry(
                    key, name, tuple(translate.get(o, o) for o in obj), type_
                )
                for key, name, obj, type_ in self._result_columns
            ]

        text = self._compose_select_body(
            text,
            select_stmt,
            compile_state,
            inner_columns,
            froms,
            byfrom,
            toplevel,
            kwargs,
        )

        if select_stmt._post_body_clause is not None:
            pbc = self.process(select_stmt._post_body_clause, **kwargs)
            if pbc:
                text += " " + pbc

        if select_stmt._statement_hints:
            per_dialect = [
                ht
                for (dialect_name, ht) in select_stmt._statement_hints
                if dialect_name in ("*", self.dialect.name)
            ]
            if per_dialect:
                text += " " + self.get_statement_hint_text(per_dialect)

        # In compound query, CTEs are shared at the compound level
        if self.ctes and (not is_embedded_select or toplevel):
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if select_stmt._suffixes:
            text += " " + self._generate_prefixes(
                select_stmt, select_stmt._suffixes, **kwargs
            )

        self.stack.pop(-1)

        return text
5147
5148 def _setup_select_hints(
5149 self, select: Select[Unpack[TupleAny]]
5150 ) -> Tuple[str, _FromHintsType]:
5151 byfrom = {
5152 from_: hinttext
5153 % {"name": from_._compiler_dispatch(self, ashint=True)}
5154 for (from_, dialect), hinttext in select._hints.items()
5155 if dialect in ("*", self.dialect.name)
5156 }
5157 hint_text = self.get_select_hint_text(byfrom)
5158 return hint_text, byfrom
5159
    def _setup_select_stack(
        self, select, compile_state, entry, asfrom, lateral, compound_index
    ):
        """Validate compound-select column counts, compute the display
        FROM list, and push a new compiler stack entry for this SELECT.
        """
        correlate_froms = entry["correlate_froms"]
        asfrom_froms = entry["asfrom_froms"]

        if compound_index == 0:
            # remember the first SELECT of a compound so later members
            # can be checked against its column count
            entry["select_0"] = select
        elif compound_index:
            select_0 = entry["select_0"]
            numcols = len(select_0._all_selected_columns)

            if len(compile_state.columns_plus_names) != numcols:
                raise exc.CompileError(
                    "All selectables passed to "
                    "CompoundSelect must have identical numbers of "
                    "columns; select #%d has %d columns, select "
                    "#%d has %d"
                    % (
                        1,
                        numcols,
                        compound_index + 1,
                        len(select._all_selected_columns),
                    )
                )

        if asfrom and not lateral:
            froms = compile_state._get_display_froms(
                explicit_correlate_froms=correlate_froms.difference(
                    asfrom_froms
                ),
                implicit_correlate_froms=(),
            )
        else:
            froms = compile_state._get_display_froms(
                explicit_correlate_froms=correlate_froms,
                implicit_correlate_froms=asfrom_froms,
            )

        new_correlate_froms = set(_from_objects(*froms))
        all_correlate_froms = new_correlate_froms.union(correlate_froms)

        new_entry: _CompilerStackEntry = {
            "asfrom_froms": new_correlate_froms,
            "correlate_froms": all_correlate_froms,
            "selectable": select,
            "compile_state": compile_state,
        }
        self.stack.append(new_entry)

        return froms
5211
    def _compose_select_body(
        self,
        text,
        select,
        compile_state,
        inner_columns,
        froms,
        byfrom,
        toplevel,
        kwargs,
    ):
        """Assemble the body of a SELECT statement onto ``text``.

        Renders, in order: the column list, the FROM clause (with
        optional per-FROM hints and cartesian-product linting), WHERE,
        GROUP BY, HAVING, a dialect-specific post-criteria clause,
        ORDER BY, row-limiting clauses, and FOR UPDATE.

        :param text: statement string rendered so far (e.g. ``"SELECT "``
         plus any prefixes / distinct keywords).
        :param inner_columns: sequence of rendered column strings.
        :param froms: FROM elements computed by the compile state.
        :param byfrom: mapping of FROM element to hint text, or None
         when no hints apply.
        :param toplevel: True if this is the outermost SELECT being
         compiled.
        :param kwargs: keyword args passed through to child
         ``_compiler_dispatch`` calls.
        """
        text += ", ".join(inner_columns)

        # set up cartesian-product linting if enabled; only the
        # toplevel SELECT's linter is assigned onto the compiler itself
        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        # adjust the whitespace for no inner columns, part of #9440,
        # so that a no-col SELECT comes out as "SELECT WHERE..." or
        # "SELECT FROM ...".
        # while it would be better to have built the SELECT starting string
        # without trailing whitespace first, then add whitespace only if inner
        # cols were present, this breaks compatibility with various custom
        # compilation schemes that are currently being tested.
        if not inner_columns:
            text = text.rstrip()

        if froms:
            text += " \nFROM "

            # render each FROM element; pass the hint mapping through
            # only when statement-level hints are present
            if select._hints:
                text += ", ".join(
                    [
                        f._compiler_dispatch(
                            self,
                            asfrom=True,
                            fromhints=byfrom,
                            from_linter=from_linter,
                            **kwargs,
                        )
                        for f in froms
                    ]
                )
            else:
                text += ", ".join(
                    [
                        f._compiler_dispatch(
                            self,
                            asfrom=True,
                            from_linter=from_linter,
                            **kwargs,
                        )
                        for f in froms
                    ]
                )
        else:
            # no FROM elements; render the dialect's default FROM
            # text, if any (empty for most dialects)
            text += self.default_from()

        if select._where_criteria:
            t = self._generate_delimited_and_list(
                select._where_criteria, from_linter=from_linter, **kwargs
            )
            if t:
                text += " \nWHERE " + t

        # emit any cartesian product warnings only after the FROM and
        # WHERE clauses have been processed with the linter
        if warn_linting:
            assert from_linter is not None
            from_linter.warn()

        if select._group_by_clauses:
            text += self.group_by_clause(select, **kwargs)

        if select._having_criteria:
            t = self._generate_delimited_and_list(
                select._having_criteria, **kwargs
            )
            if t:
                text += " \nHAVING " + t

        # dialect-specific clause rendered between HAVING and ORDER BY
        if select._post_criteria_clause is not None:
            pcc = self.process(select._post_criteria_clause, **kwargs)
            if pcc is not None:
                text += " \n" + pcc

        if select._order_by_clauses:
            text += self.order_by_clause(select, **kwargs)

        if select._has_row_limiting_clause:
            text += self._row_limit_clause(select, **kwargs)

        if select._for_update_arg is not None:
            text += self.for_update_clause(select, **kwargs)

        return text
5311
5312 def _generate_prefixes(self, stmt, prefixes, **kw):
5313 clause = " ".join(
5314 prefix._compiler_dispatch(self, **kw)
5315 for prefix, dialect_name in prefixes
5316 if dialect_name in (None, "*") or dialect_name == self.dialect.name
5317 )
5318 if clause:
5319 clause += " "
5320 return clause
5321
5322 def _render_cte_clause(
5323 self,
5324 nesting_level=None,
5325 include_following_stack=False,
5326 ):
5327 """
5328 include_following_stack
5329 Also render the nesting CTEs on the next stack. Useful for
5330 SQL structures like UNION or INSERT that can wrap SELECT
5331 statements containing nesting CTEs.
5332 """
5333 if not self.ctes:
5334 return ""
5335
5336 ctes: MutableMapping[CTE, str]
5337
5338 if nesting_level and nesting_level > 1:
5339 ctes = util.OrderedDict()
5340 for cte in list(self.ctes.keys()):
5341 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5342 cte._get_reference_cte()
5343 ]
5344 nesting = cte.nesting or cte_opts.nesting
5345 is_rendered_level = cte_level == nesting_level or (
5346 include_following_stack and cte_level == nesting_level + 1
5347 )
5348 if not (nesting and is_rendered_level):
5349 continue
5350
5351 ctes[cte] = self.ctes[cte]
5352
5353 else:
5354 ctes = self.ctes
5355
5356 if not ctes:
5357 return ""
5358 ctes_recursive = any([cte.recursive for cte in ctes])
5359
5360 cte_text = self.get_cte_preamble(ctes_recursive) + " "
5361 cte_text += ", \n".join([txt for txt in ctes.values()])
5362 cte_text += "\n "
5363
5364 if nesting_level and nesting_level > 1:
5365 for cte in list(ctes.keys()):
5366 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5367 cte._get_reference_cte()
5368 ]
5369 del self.ctes[cte]
5370 del self.ctes_by_level_name[(cte_level, cte_name)]
5371 del self.level_name_by_cte[cte._get_reference_cte()]
5372
5373 return cte_text
5374
5375 def get_cte_preamble(self, recursive):
5376 if recursive:
5377 return "WITH RECURSIVE"
5378 else:
5379 return "WITH"
5380
5381 def get_select_precolumns(self, select: Select[Any], **kw: Any) -> str:
5382 """Called when building a ``SELECT`` statement, position is just
5383 before column list.
5384
5385 """
5386 if select._distinct_on:
5387 util.warn_deprecated(
5388 "DISTINCT ON is currently supported only by the PostgreSQL "
5389 "dialect. Use of DISTINCT ON for other backends is currently "
5390 "silently ignored, however this usage is deprecated, and will "
5391 "raise CompileError in a future release for all backends "
5392 "that do not support this syntax.",
5393 version="1.4",
5394 )
5395 return "DISTINCT " if select._distinct else ""
5396
5397 def group_by_clause(self, select, **kw):
5398 """allow dialects to customize how GROUP BY is rendered."""
5399
5400 group_by = self._generate_delimited_list(
5401 select._group_by_clauses, OPERATORS[operators.comma_op], **kw
5402 )
5403 if group_by:
5404 return " GROUP BY " + group_by
5405 else:
5406 return ""
5407
5408 def order_by_clause(self, select, **kw):
5409 """allow dialects to customize how ORDER BY is rendered."""
5410
5411 order_by = self._generate_delimited_list(
5412 select._order_by_clauses, OPERATORS[operators.comma_op], **kw
5413 )
5414
5415 if order_by:
5416 return " ORDER BY " + order_by
5417 else:
5418 return ""
5419
5420 def for_update_clause(self, select, **kw):
5421 return " FOR UPDATE"
5422
5423 def returning_clause(
5424 self,
5425 stmt: UpdateBase,
5426 returning_cols: Sequence[_ColumnsClauseElement],
5427 *,
5428 populate_result_map: bool,
5429 **kw: Any,
5430 ) -> str:
5431 columns = [
5432 self._label_returning_column(
5433 stmt,
5434 column,
5435 populate_result_map,
5436 fallback_label_name=fallback_label_name,
5437 column_is_repeated=repeated,
5438 name=name,
5439 proxy_name=proxy_name,
5440 **kw,
5441 )
5442 for (
5443 name,
5444 proxy_name,
5445 fallback_label_name,
5446 column,
5447 repeated,
5448 ) in stmt._generate_columns_plus_names(
5449 True, cols=base._select_iterables(returning_cols)
5450 )
5451 ]
5452
5453 return "RETURNING " + ", ".join(columns)
5454
5455 def limit_clause(self, select, **kw):
5456 text = ""
5457 if select._limit_clause is not None:
5458 text += "\n LIMIT " + self.process(select._limit_clause, **kw)
5459 if select._offset_clause is not None:
5460 if select._limit_clause is None:
5461 text += "\n LIMIT -1"
5462 text += " OFFSET " + self.process(select._offset_clause, **kw)
5463 return text
5464
5465 def fetch_clause(
5466 self,
5467 select,
5468 fetch_clause=None,
5469 require_offset=False,
5470 use_literal_execute_for_simple_int=False,
5471 **kw,
5472 ):
5473 if fetch_clause is None:
5474 fetch_clause = select._fetch_clause
5475 fetch_clause_options = select._fetch_clause_options
5476 else:
5477 fetch_clause_options = {"percent": False, "with_ties": False}
5478
5479 text = ""
5480
5481 if select._offset_clause is not None:
5482 offset_clause = select._offset_clause
5483 if (
5484 use_literal_execute_for_simple_int
5485 and select._simple_int_clause(offset_clause)
5486 ):
5487 offset_clause = offset_clause.render_literal_execute()
5488 offset_str = self.process(offset_clause, **kw)
5489 text += "\n OFFSET %s ROWS" % offset_str
5490 elif require_offset:
5491 text += "\n OFFSET 0 ROWS"
5492
5493 if fetch_clause is not None:
5494 if (
5495 use_literal_execute_for_simple_int
5496 and select._simple_int_clause(fetch_clause)
5497 ):
5498 fetch_clause = fetch_clause.render_literal_execute()
5499 text += "\n FETCH FIRST %s%s ROWS %s" % (
5500 self.process(fetch_clause, **kw),
5501 " PERCENT" if fetch_clause_options["percent"] else "",
5502 "WITH TIES" if fetch_clause_options["with_ties"] else "ONLY",
5503 )
5504 return text
5505
5506 def visit_table(
5507 self,
5508 table,
5509 asfrom=False,
5510 iscrud=False,
5511 ashint=False,
5512 fromhints=None,
5513 use_schema=True,
5514 from_linter=None,
5515 ambiguous_table_name_map=None,
5516 enclosing_alias=None,
5517 within_tstring=False,
5518 **kwargs,
5519 ):
5520 if from_linter:
5521 from_linter.froms[table] = table.fullname
5522
5523 if asfrom or ashint or within_tstring:
5524 effective_schema = self.preparer.schema_for_object(table)
5525
5526 if use_schema and effective_schema:
5527 ret = (
5528 self.preparer.quote_schema(effective_schema)
5529 + "."
5530 + self.preparer.quote(table.name)
5531 )
5532 else:
5533 ret = self.preparer.quote(table.name)
5534
5535 if (
5536 (
5537 enclosing_alias is None
5538 or enclosing_alias.element is not table
5539 )
5540 and not effective_schema
5541 and ambiguous_table_name_map
5542 and table.name in ambiguous_table_name_map
5543 ):
5544 anon_name = self._truncated_identifier(
5545 "alias", ambiguous_table_name_map[table.name]
5546 )
5547
5548 ret = ret + self.get_render_as_alias_suffix(
5549 self.preparer.format_alias(None, anon_name)
5550 )
5551
5552 if fromhints and table in fromhints:
5553 ret = self.format_from_hint_text(
5554 ret, table, fromhints[table], iscrud
5555 )
5556 return ret
5557 else:
5558 return ""
5559
5560 def visit_join(self, join, asfrom=False, from_linter=None, **kwargs):
5561 if from_linter:
5562 from_linter.edges.update(
5563 itertools.product(
5564 _de_clone(join.left._from_objects),
5565 _de_clone(join.right._from_objects),
5566 )
5567 )
5568
5569 if join.full:
5570 join_type = " FULL OUTER JOIN "
5571 elif join.isouter:
5572 join_type = " LEFT OUTER JOIN "
5573 else:
5574 join_type = " JOIN "
5575 return (
5576 join.left._compiler_dispatch(
5577 self, asfrom=True, from_linter=from_linter, **kwargs
5578 )
5579 + join_type
5580 + join.right._compiler_dispatch(
5581 self, asfrom=True, from_linter=from_linter, **kwargs
5582 )
5583 + " ON "
5584 # TODO: likely need asfrom=True here?
5585 + join.onclause._compiler_dispatch(
5586 self, from_linter=from_linter, **kwargs
5587 )
5588 )
5589
5590 def _setup_crud_hints(self, stmt, table_text):
5591 dialect_hints = {
5592 table: hint_text
5593 for (table, dialect), hint_text in stmt._hints.items()
5594 if dialect in ("*", self.dialect.name)
5595 }
5596 if stmt.table in dialect_hints:
5597 table_text = self.format_from_hint_text(
5598 table_text, stmt.table, dialect_hints[stmt.table], True
5599 )
5600 return dialect_hints, table_text
5601
    # within the realm of "insertmanyvalues sentinel columns",
    # these lookups match different kinds of Column() configurations
    # to specific backend capabilities.  they are broken into two
    # lookups, one for autoincrement columns and the other for non
    # autoincrement columns
    _sentinel_col_non_autoinc_lookup = util.immutabledict(
        {
            # these characterizations map to the generic
            # _SUPPORTED_OR_NOT flag rather than a specific
            # server-side capability flag
            _SentinelDefaultCharacterization.CLIENTSIDE: (
                InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
            ),
            _SentinelDefaultCharacterization.SENTINEL_DEFAULT: (
                InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
            ),
            _SentinelDefaultCharacterization.NONE: (
                InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
            ),
            # server-generated characterizations map to the dialect
            # capability flag of the same name
            _SentinelDefaultCharacterization.IDENTITY: (
                InsertmanyvaluesSentinelOpts.IDENTITY
            ),
            _SentinelDefaultCharacterization.SEQUENCE: (
                InsertmanyvaluesSentinelOpts.SEQUENCE
            ),
            _SentinelDefaultCharacterization.MONOTONIC_FUNCTION: (
                InsertmanyvaluesSentinelOpts.MONOTONIC_FUNCTION
            ),
        }
    )
    # autoincrement columns use the same mapping, except that a column
    # with no default characterization maps to the AUTOINCREMENT
    # capability flag instead of _SUPPORTED_OR_NOT
    _sentinel_col_autoinc_lookup = _sentinel_col_non_autoinc_lookup.union(
        {
            _SentinelDefaultCharacterization.NONE: (
                InsertmanyvaluesSentinelOpts.AUTOINCREMENT
            ),
        }
    )
5636
5637 def _get_sentinel_column_for_table(
5638 self, table: Table
5639 ) -> Optional[Sequence[Column[Any]]]:
5640 """given a :class:`.Table`, return a usable sentinel column or
5641 columns for this dialect if any.
5642
5643 Return None if no sentinel columns could be identified, or raise an
5644 error if a column was marked as a sentinel explicitly but isn't
5645 compatible with this dialect.
5646
5647 """
5648
5649 sentinel_opts = self.dialect.insertmanyvalues_implicit_sentinel
5650 sentinel_characteristics = table._sentinel_column_characteristics
5651
5652 sent_cols = sentinel_characteristics.columns
5653
5654 if sent_cols is None:
5655 return None
5656
5657 if sentinel_characteristics.is_autoinc:
5658 bitmask = self._sentinel_col_autoinc_lookup.get(
5659 sentinel_characteristics.default_characterization, 0
5660 )
5661 else:
5662 bitmask = self._sentinel_col_non_autoinc_lookup.get(
5663 sentinel_characteristics.default_characterization, 0
5664 )
5665
5666 if sentinel_opts & bitmask:
5667 return sent_cols
5668
5669 if sentinel_characteristics.is_explicit:
5670 # a column was explicitly marked as insert_sentinel=True,
5671 # however it is not compatible with this dialect. they should
5672 # not indicate this column as a sentinel if they need to include
5673 # this dialect.
5674
5675 # TODO: do we want non-primary key explicit sentinel cols
5676 # that can gracefully degrade for some backends?
5677 # insert_sentinel="degrade" perhaps. not for the initial release.
5678 # I am hoping people are generally not dealing with this sentinel
5679 # business at all.
5680
5681 # if is_explicit is True, there will be only one sentinel column.
5682
5683 raise exc.InvalidRequestError(
5684 f"Column {sent_cols[0]} can't be explicitly "
5685 "marked as a sentinel column when using the "
5686 f"{self.dialect.name} dialect, as the "
5687 "particular type of default generation on this column is "
5688 "not currently compatible with this dialect's specific "
5689 f"INSERT..RETURNING syntax which can receive the "
5690 "server-generated value in "
5691 "a deterministic way. To remove this error, remove "
5692 "insert_sentinel=True from primary key autoincrement "
5693 "columns; these columns are automatically used as "
5694 "sentinels for supported dialects in any case."
5695 )
5696
5697 return None
5698
    def _deliver_insertmanyvalues_batches(
        self,
        statement: str,
        parameters: _DBAPIMultiExecuteParams,
        compiled_parameters: List[_MutableCoreSingleExecuteParams],
        generic_setinputsizes: Optional[_GenericSetInputSizesType],
        batch_size: int,
        sort_by_parameter_order: bool,
        schema_translate_map: Optional[SchemaTranslateMapType],
    ) -> Iterator[_InsertManyValuesBatch]:
        """Generate :class:`._InsertManyValuesBatch` objects for an
        "insertmanyvalues" execution, expanding the single rendered
        VALUES group of ``statement`` into a multi-row VALUES clause
        for each batch of parameter sets.

        Falls back to yielding one single-row statement per parameter
        set when the dialect/statement combination can't support the
        batched multi-VALUES form.

        :param statement: the compiled INSERT string containing one
         VALUES group.
        :param parameters: the full list of DBAPI parameter sets.
        :param compiled_parameters: corresponding Core-level parameter
         dictionaries, used to extract sentinel values.
        :param generic_setinputsizes: optional setinputsizes data to be
         expanded per batch.
        :param batch_size: maximum number of parameter sets per batch;
         may be reduced further by dialect parameter limits.
        :param sort_by_parameter_order: True if deterministic RETURNING
         ordering was requested.
        :param schema_translate_map: optional schema translation to
         apply to rendered expressions.
        """
        imv = self._insertmanyvalues
        assert imv is not None

        # build a getter that extracts sentinel values from a compiled
        # parameter dictionary, if sentinel parameter keys are present
        if not imv.sentinel_param_keys:
            _sentinel_from_params = None
        else:
            _sentinel_from_params = operator.itemgetter(
                *imv.sentinel_param_keys
            )

        lenparams = len(parameters)
        if imv.is_default_expr and not self.dialect.supports_default_metavalue:
            # backend doesn't support
            # INSERT INTO table (pk_col) VALUES (DEFAULT), (DEFAULT), ...
            # at the moment this is basically SQL Server due to
            # not being able to use DEFAULT for identity column
            # just yield out that many single statements! still
            # faster than a whole connection.execute() call ;)
            #
            # note we still are taking advantage of the fact that we know
            # we are using RETURNING. The generalized approach of fetching
            # cursor.lastrowid etc. still goes through the more heavyweight
            # "ExecutionContext per statement" system as it isn't usable
            # as a generic "RETURNING" approach
            use_row_at_a_time = True
            downgraded = False
        elif not self.dialect.supports_multivalues_insert or (
            sort_by_parameter_order
            and self._result_columns
            and (
                imv.sentinel_columns is None
                or (
                    imv.includes_upsert_behaviors
                    and not imv.embed_values_counter
                )
            )
        ):
            # deterministic order was requested and the compiler could
            # not organize sentinel columns for this dialect/statement.
            # use row at a time. Note: if embed_values_counter is True,
            # the counter itself provides the ordering capability we need,
            # so we can use batch mode even with upsert behaviors.
            use_row_at_a_time = True
            downgraded = True
        else:
            use_row_at_a_time = False
            downgraded = False

        if use_row_at_a_time:
            # yield one single-row batch per parameter set, leaving the
            # statement text unchanged
            for batchnum, (param, compiled_param) in enumerate(
                cast(
                    "Sequence[Tuple[_DBAPISingleExecuteParams, _MutableCoreSingleExecuteParams]]",  # noqa: E501
                    zip(parameters, compiled_parameters),
                ),
                1,
            ):
                yield _InsertManyValuesBatch(
                    statement,
                    param,
                    generic_setinputsizes,
                    [param],
                    (
                        [_sentinel_from_params(compiled_param)]
                        if _sentinel_from_params
                        else []
                    ),
                    1,
                    batchnum,
                    lenparams,
                    sort_by_parameter_order,
                    downgraded,
                )
            return

        if schema_translate_map:
            rst = functools.partial(
                self.preparer._render_schema_translates,
                schema_translate_map=schema_translate_map,
            )
        else:
            rst = None

        imv_single_values_expr = imv.single_values_expr
        if rst:
            imv_single_values_expr = rst(imv_single_values_expr)

        # replace the single rendered VALUES group with a token that is
        # expanded into the full multi-VALUES clause for each batch below
        executemany_values = f"({imv_single_values_expr})"
        statement = statement.replace(executemany_values, "__EXECMANY_TOKEN__")

        # Use optional insertmanyvalues_max_parameters
        # to further shrink the batch size so that there are no more than
        # insertmanyvalues_max_parameters params.
        # Currently used by SQL Server, which limits statements to 2100 bound
        # parameters (actually 2099).
        max_params = self.dialect.insertmanyvalues_max_parameters
        if max_params:
            total_num_of_params = len(self.bind_names)
            num_params_per_batch = len(imv.insert_crud_params)
            num_params_outside_of_batch = (
                total_num_of_params - num_params_per_batch
            )
            batch_size = min(
                batch_size,
                (
                    (max_params - num_params_outside_of_batch)
                    // num_params_per_batch
                ),
            )

        batches = cast("List[Sequence[Any]]", list(parameters))
        compiled_batches = cast(
            "List[Sequence[Any]]", list(compiled_parameters)
        )

        processed_setinputsizes: Optional[_GenericSetInputSizesType] = None
        batchnum = 1
        total_batches = lenparams // batch_size + (
            1 if lenparams % batch_size else 0
        )

        insert_crud_params = imv.insert_crud_params
        assert insert_crud_params is not None

        if rst:
            # apply schema translation to the per-column expressions as well
            insert_crud_params = [
                (col, key, rst(expr), st)
                for col, key, expr, st in insert_crud_params
            ]

        escaped_bind_names: Mapping[str, str]
        expand_pos_lower_index = expand_pos_upper_index = 0

        if not self.positional:
            # named parameter style: each batched parameter name gets an
            # __EXECMANY_INDEX__ suffix, replaced per-row below
            if self.escaped_bind_names:
                escaped_bind_names = self.escaped_bind_names
            else:
                escaped_bind_names = {}

            all_keys = set(parameters[0])

            def apply_placeholders(keys, formatted):
                # rewrite each bound parameter placeholder in the
                # rendered expression to carry the per-row index suffix
                for key in keys:
                    key = escaped_bind_names.get(key, key)
                    formatted = formatted.replace(
                        self.bindtemplate % {"name": key},
                        self.bindtemplate
                        % {"name": f"{key}__EXECMANY_INDEX__"},
                    )
                return formatted

            if imv.embed_values_counter:
                imv_values_counter = ", _IMV_VALUES_COUNTER"
            else:
                imv_values_counter = ""
            formatted_values_clause = f"""({', '.join(
                apply_placeholders(bind_keys, formatted)
                for _, _, formatted, bind_keys in insert_crud_params
            )}{imv_values_counter})"""

            keys_to_replace = all_keys.intersection(
                escaped_bind_names.get(key, key)
                for _, _, _, bind_keys in insert_crud_params
                for key in bind_keys
            )
            base_parameters = {
                key: parameters[0][key]
                for key in all_keys.difference(keys_to_replace)
            }
            executemany_values_w_comma = ""
        else:
            formatted_values_clause = ""
            keys_to_replace = set()
            base_parameters = {}

            if imv.embed_values_counter:
                executemany_values_w_comma = (
                    f"({imv_single_values_expr}, _IMV_VALUES_COUNTER), "
                )
            else:
                executemany_values_w_comma = f"({imv_single_values_expr}), "

            all_names_we_will_expand: Set[str] = set()
            for elem in imv.insert_crud_params:
                all_names_we_will_expand.update(elem[3])

            # get the start and end position in a particular list
            # of parameters where we will be doing the "expanding".
            # statements can have params on either side or both sides,
            # given RETURNING and CTEs
            if all_names_we_will_expand:
                positiontup = self.positiontup
                assert positiontup is not None

                all_expand_positions = {
                    idx
                    for idx, name in enumerate(positiontup)
                    if name in all_names_we_will_expand
                }
                expand_pos_lower_index = min(all_expand_positions)
                expand_pos_upper_index = max(all_expand_positions) + 1
                assert (
                    len(all_expand_positions)
                    == expand_pos_upper_index - expand_pos_lower_index
                )

            if self._numeric_binds:
                escaped = re.escape(self._numeric_binds_identifier_char)
                executemany_values_w_comma = re.sub(
                    rf"{escaped}\d+", "%s", executemany_values_w_comma
                )

        # consume the parameter lists in slices of at most batch_size
        while batches:
            batch = batches[0:batch_size]
            compiled_batch = compiled_batches[0:batch_size]

            batches[0:batch_size] = []
            compiled_batches[0:batch_size] = []

            if batches:
                current_batch_size = batch_size
            else:
                current_batch_size = len(batch)

            if generic_setinputsizes:
                # if setinputsizes is present, expand this collection to
                # suit the batch length as well
                # currently this will be mssql+pyodbc for internal dialects
                processed_setinputsizes = [
                    (new_key, len_, typ)
                    for new_key, len_, typ in (
                        (f"{key}_{index}", len_, typ)
                        for index in range(current_batch_size)
                        for key, len_, typ in generic_setinputsizes
                    )
                ]

            replaced_parameters: Any
            if self.positional:
                num_ins_params = imv.num_positional_params_counted

                batch_iterator: Iterable[Sequence[Any]]
                extra_params_left: Sequence[Any]
                extra_params_right: Sequence[Any]

                if num_ins_params == len(batch[0]):
                    extra_params_left = extra_params_right = ()
                    batch_iterator = batch
                else:
                    extra_params_left = batch[0][:expand_pos_lower_index]
                    extra_params_right = batch[0][expand_pos_upper_index:]
                    batch_iterator = (
                        b[expand_pos_lower_index:expand_pos_upper_index]
                        for b in batch
                    )

                if imv.embed_values_counter:
                    expanded_values_string = (
                        "".join(
                            executemany_values_w_comma.replace(
                                "_IMV_VALUES_COUNTER", str(i)
                            )
                            for i, _ in enumerate(batch)
                        )
                    )[:-2]
                else:
                    expanded_values_string = (
                        (executemany_values_w_comma * current_batch_size)
                    )[:-2]

                if self._numeric_binds and num_ins_params > 0:
                    # numeric will always number the parameters inside of
                    # VALUES (and thus order self.positiontup) to be higher
                    # than non-VALUES parameters, no matter where in the
                    # statement those non-VALUES parameters appear (this is
                    # ensured in _process_numeric by numbering first all
                    # params that are not in _values_bindparam)
                    # therefore all extra params are always
                    # on the left side and numbered lower than the VALUES
                    # parameters
                    assert not extra_params_right

                    start = expand_pos_lower_index + 1
                    end = num_ins_params * (current_batch_size) + start

                    # need to format here, since statement may contain
                    # unescaped %, while values_string contains just (%s, %s)
                    positions = tuple(
                        f"{self._numeric_binds_identifier_char}{i}"
                        for i in range(start, end)
                    )
                    expanded_values_string = expanded_values_string % positions

                replaced_statement = statement.replace(
                    "__EXECMANY_TOKEN__", expanded_values_string
                )

                replaced_parameters = tuple(
                    itertools.chain.from_iterable(batch_iterator)
                )

                replaced_parameters = (
                    extra_params_left
                    + replaced_parameters
                    + extra_params_right
                )

            else:
                replaced_values_clauses = []
                replaced_parameters = base_parameters.copy()

                for i, param in enumerate(batch):
                    fmv = formatted_values_clause.replace(
                        "EXECMANY_INDEX__", str(i)
                    )
                    if imv.embed_values_counter:
                        fmv = fmv.replace("_IMV_VALUES_COUNTER", str(i))

                    replaced_values_clauses.append(fmv)
                    replaced_parameters.update(
                        {f"{key}__{i}": param[key] for key in keys_to_replace}
                    )

                replaced_statement = statement.replace(
                    "__EXECMANY_TOKEN__",
                    ", ".join(replaced_values_clauses),
                )

            yield _InsertManyValuesBatch(
                replaced_statement,
                replaced_parameters,
                processed_setinputsizes,
                batch,
                (
                    [_sentinel_from_params(cb) for cb in compiled_batch]
                    if _sentinel_from_params
                    else []
                ),
                current_batch_size,
                batchnum,
                total_batches,
                sort_by_parameter_order,
                False,
            )
            batchnum += 1
6053
6054 def visit_insert(
6055 self, insert_stmt, visited_bindparam=None, visiting_cte=None, **kw
6056 ):
6057 compile_state = insert_stmt._compile_state_factory(
6058 insert_stmt, self, **kw
6059 )
6060 insert_stmt = compile_state.statement
6061
6062 if visiting_cte is not None:
6063 kw["visiting_cte"] = visiting_cte
6064 toplevel = False
6065 else:
6066 toplevel = not self.stack
6067
6068 if toplevel:
6069 self.isinsert = True
6070 if not self.dml_compile_state:
6071 self.dml_compile_state = compile_state
6072 if not self.compile_state:
6073 self.compile_state = compile_state
6074
6075 self.stack.append(
6076 {
6077 "correlate_froms": set(),
6078 "asfrom_froms": set(),
6079 "selectable": insert_stmt,
6080 }
6081 )
6082
6083 counted_bindparam = 0
6084
6085 # reset any incoming "visited_bindparam" collection
6086 visited_bindparam = None
6087
6088 # for positional, insertmanyvalues needs to know how many
6089 # bound parameters are in the VALUES sequence; there's no simple
6090 # rule because default expressions etc. can have zero or more
6091 # params inside them. After multiple attempts to figure this out,
6092 # this very simplistic "count after" works and is
6093 # likely the least amount of callcounts, though looks clumsy
6094 if self.positional and visiting_cte is None:
6095 # if we are inside a CTE, don't count parameters
6096 # here since they won't be for insertmanyvalues. keep
6097 # visited_bindparam at None so no counting happens.
6098 # see #9173
6099 visited_bindparam = []
6100
6101 crud_params_struct = crud._get_crud_params(
6102 self,
6103 insert_stmt,
6104 compile_state,
6105 toplevel,
6106 visited_bindparam=visited_bindparam,
6107 **kw,
6108 )
6109
6110 if self.positional and visited_bindparam is not None:
6111 counted_bindparam = len(visited_bindparam)
6112 if self._numeric_binds:
6113 if self._values_bindparam is not None:
6114 self._values_bindparam += visited_bindparam
6115 else:
6116 self._values_bindparam = visited_bindparam
6117
6118 crud_params_single = crud_params_struct.single_params
6119
6120 if (
6121 not crud_params_single
6122 and not self.dialect.supports_default_values
6123 and not self.dialect.supports_default_metavalue
6124 and not self.dialect.supports_empty_insert
6125 ):
6126 raise exc.CompileError(
6127 "The '%s' dialect with current database "
6128 "version settings does not support empty "
6129 "inserts." % self.dialect.name
6130 )
6131
6132 if compile_state._has_multi_parameters:
6133 if not self.dialect.supports_multivalues_insert:
6134 raise exc.CompileError(
6135 "The '%s' dialect with current database "
6136 "version settings does not support "
6137 "in-place multirow inserts." % self.dialect.name
6138 )
6139 elif (
6140 self.implicit_returning or insert_stmt._returning
6141 ) and insert_stmt._sort_by_parameter_order:
6142 raise exc.CompileError(
6143 "RETURNING cannot be deterministically sorted when "
6144 "using an INSERT which includes multi-row values()."
6145 )
6146 crud_params_single = crud_params_struct.single_params
6147 else:
6148 crud_params_single = crud_params_struct.single_params
6149
6150 preparer = self.preparer
6151 supports_default_values = self.dialect.supports_default_values
6152
6153 text = "INSERT "
6154
6155 if insert_stmt._prefixes:
6156 text += self._generate_prefixes(
6157 insert_stmt, insert_stmt._prefixes, **kw
6158 )
6159
6160 text += "INTO "
6161 table_text = preparer.format_table(insert_stmt.table)
6162
6163 if insert_stmt._hints:
6164 _, table_text = self._setup_crud_hints(insert_stmt, table_text)
6165
6166 if insert_stmt._independent_ctes:
6167 self._dispatch_independent_ctes(insert_stmt, kw)
6168
6169 text += table_text
6170
6171 if crud_params_single or not supports_default_values:
6172 text += " (%s)" % ", ".join(
6173 [expr for _, expr, _, _ in crud_params_single]
6174 )
6175
6176 # look for insertmanyvalues attributes that would have been configured
6177 # by crud.py as it scanned through the columns to be part of the
6178 # INSERT
6179 use_insertmanyvalues = crud_params_struct.use_insertmanyvalues
6180 named_sentinel_params: Optional[Sequence[str]] = None
6181 add_sentinel_cols = None
6182 implicit_sentinel = False
6183
6184 returning_cols = self.implicit_returning or insert_stmt._returning
6185 if returning_cols:
6186 add_sentinel_cols = crud_params_struct.use_sentinel_columns
6187 if add_sentinel_cols is not None:
6188 assert use_insertmanyvalues
6189
6190 # search for the sentinel column explicitly present
6191 # in the INSERT columns list, and additionally check that
6192 # this column has a bound parameter name set up that's in the
6193 # parameter list. If both of these cases are present, it means
6194 # we will have a client side value for the sentinel in each
6195 # parameter set.
6196
6197 _params_by_col = {
6198 col: param_names
6199 for col, _, _, param_names in crud_params_single
6200 }
6201 named_sentinel_params = []
6202 for _add_sentinel_col in add_sentinel_cols:
6203 if _add_sentinel_col not in _params_by_col:
6204 named_sentinel_params = None
6205 break
6206 param_name = self._within_exec_param_key_getter(
6207 _add_sentinel_col
6208 )
6209 if param_name not in _params_by_col[_add_sentinel_col]:
6210 named_sentinel_params = None
6211 break
6212 named_sentinel_params.append(param_name)
6213
6214 if named_sentinel_params is None:
6215 # if we are not going to have a client side value for
6216 # the sentinel in the parameter set, that means it's
6217 # an autoincrement, an IDENTITY, or a server-side SQL
6218 # expression like nextval('seqname'). So this is
6219 # an "implicit" sentinel; we will look for it in
6220 # RETURNING
6221 # only, and then sort on it. For this case on PG,
6222 # SQL Server we have to use a special INSERT form
6223 # that guarantees the server side function lines up with
6224 # the entries in the VALUES.
6225 if (
6226 self.dialect.insertmanyvalues_implicit_sentinel
6227 & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
6228 ):
6229 implicit_sentinel = True
6230 else:
6231 # here, we are not using a sentinel at all
6232 # and we are likely the SQLite dialect.
6233 # The first add_sentinel_col that we have should not
6234 # be marked as "insert_sentinel=True". if it was,
6235 # an error should have been raised in
6236 # _get_sentinel_column_for_table.
6237 assert not add_sentinel_cols[0]._insert_sentinel, (
6238 "sentinel selection rules should have prevented "
6239 "us from getting here for this dialect"
6240 )
6241
6242 # always put the sentinel columns last. even if they are
6243 # in the returning list already, they will be there twice
6244 # then.
6245 returning_cols = list(returning_cols) + list(add_sentinel_cols)
6246
6247 returning_clause = self.returning_clause(
6248 insert_stmt,
6249 returning_cols,
6250 populate_result_map=toplevel,
6251 )
6252
6253 if self.returning_precedes_values:
6254 text += " " + returning_clause
6255
6256 else:
6257 returning_clause = None
6258
6259 if insert_stmt.select is not None:
6260 # placed here by crud.py
6261 select_text = self.process(
6262 self.stack[-1]["insert_from_select"], insert_into=True, **kw
6263 )
6264
6265 if self.ctes and self.dialect.cte_follows_insert:
6266 nesting_level = len(self.stack) if not toplevel else None
6267 text += " %s%s" % (
6268 self._render_cte_clause(
6269 nesting_level=nesting_level,
6270 include_following_stack=True,
6271 ),
6272 select_text,
6273 )
6274 else:
6275 text += " %s" % select_text
6276 elif not crud_params_single and supports_default_values:
6277 text += " DEFAULT VALUES"
6278 if use_insertmanyvalues:
6279 self._insertmanyvalues = _InsertManyValues(
6280 True,
6281 self.dialect.default_metavalue_token,
6282 crud_params_single,
6283 counted_bindparam,
6284 sort_by_parameter_order=(
6285 insert_stmt._sort_by_parameter_order
6286 ),
6287 includes_upsert_behaviors=(
6288 insert_stmt._post_values_clause is not None
6289 ),
6290 sentinel_columns=add_sentinel_cols,
6291 num_sentinel_columns=(
6292 len(add_sentinel_cols) if add_sentinel_cols else 0
6293 ),
6294 implicit_sentinel=implicit_sentinel,
6295 )
6296 elif compile_state._has_multi_parameters:
6297 text += " VALUES %s" % (
6298 ", ".join(
6299 "(%s)"
6300 % (", ".join(value for _, _, value, _ in crud_param_set))
6301 for crud_param_set in crud_params_struct.all_multi_params
6302 ),
6303 )
6304 elif use_insertmanyvalues:
6305 if (
6306 implicit_sentinel
6307 and (
6308 self.dialect.insertmanyvalues_implicit_sentinel
6309 & InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
6310 )
6311 # this is checking if we have
6312 # INSERT INTO table (id) VALUES (DEFAULT).
6313 and not (crud_params_struct.is_default_metavalue_only)
6314 ):
6315 # if we have a sentinel column that is server generated,
6316 # then for selected backends render the VALUES list as a
6317 # subquery. This is the orderable form supported by
6318 # PostgreSQL and in fewer cases SQL Server
6319 embed_sentinel_value = True
6320
6321 render_bind_casts = (
6322 self.dialect.insertmanyvalues_implicit_sentinel
6323 & InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
6324 )
6325
6326 add_sentinel_set = add_sentinel_cols or ()
6327
6328 insert_single_values_expr = ", ".join(
6329 [
6330 value
6331 for col, _, value, _ in crud_params_single
6332 if col not in add_sentinel_set
6333 ]
6334 )
6335
6336 colnames = ", ".join(
6337 f"p{i}"
6338 for i, cp in enumerate(crud_params_single)
6339 if cp[0] not in add_sentinel_set
6340 )
6341
6342 if render_bind_casts:
6343 # render casts for the SELECT list. For PG, we are
6344 # already rendering bind casts in the parameter list,
6345 # selectively for the more "tricky" types like ARRAY.
6346 # however, even for the "easy" types, if the parameter
6347 # is NULL for every entry, PG gives up and says
6348 # "it must be TEXT", which fails for other easy types
6349 # like ints. So we cast on this side too.
6350 colnames_w_cast = ", ".join(
6351 (
6352 self.render_bind_cast(
6353 col.type,
6354 col.type._unwrapped_dialect_impl(self.dialect),
6355 f"p{i}",
6356 )
6357 if col not in add_sentinel_set
6358 else expr
6359 )
6360 for i, (col, _, expr, _) in enumerate(
6361 crud_params_single
6362 )
6363 )
6364 else:
6365 colnames_w_cast = ", ".join(
6366 (f"p{i}" if col not in add_sentinel_set else expr)
6367 for i, (col, _, expr, _) in enumerate(
6368 crud_params_single
6369 )
6370 )
6371
6372 insert_crud_params = [
6373 elem
6374 for elem in crud_params_single
6375 if elem[0] not in add_sentinel_set
6376 ]
6377
6378 text += (
6379 f" SELECT {colnames_w_cast} FROM "
6380 f"(VALUES ({insert_single_values_expr})) "
6381 f"AS imp_sen({colnames}, sen_counter) "
6382 "ORDER BY sen_counter"
6383 )
6384
6385 else:
6386 # otherwise, if no sentinel or backend doesn't support
6387 # orderable subquery form, use a plain VALUES list
6388 embed_sentinel_value = False
6389 insert_crud_params = crud_params_single
6390 insert_single_values_expr = ", ".join(
6391 [value for _, _, value, _ in crud_params_single]
6392 )
6393
6394 text += f" VALUES ({insert_single_values_expr})"
6395
6396 self._insertmanyvalues = _InsertManyValues(
6397 is_default_expr=False,
6398 single_values_expr=insert_single_values_expr,
6399 insert_crud_params=insert_crud_params,
6400 num_positional_params_counted=counted_bindparam,
6401 sort_by_parameter_order=(insert_stmt._sort_by_parameter_order),
6402 includes_upsert_behaviors=(
6403 insert_stmt._post_values_clause is not None
6404 ),
6405 sentinel_columns=add_sentinel_cols,
6406 num_sentinel_columns=(
6407 len(add_sentinel_cols) if add_sentinel_cols else 0
6408 ),
6409 sentinel_param_keys=named_sentinel_params,
6410 implicit_sentinel=implicit_sentinel,
6411 embed_values_counter=embed_sentinel_value,
6412 )
6413
6414 else:
6415 insert_single_values_expr = ", ".join(
6416 [value for _, _, value, _ in crud_params_single]
6417 )
6418
6419 text += f" VALUES ({insert_single_values_expr})"
6420
6421 if insert_stmt._post_values_clause is not None:
6422 post_values_clause = self.process(
6423 insert_stmt._post_values_clause, **kw
6424 )
6425 if post_values_clause:
6426 text += " " + post_values_clause
6427
6428 if returning_clause and not self.returning_precedes_values:
6429 text += " " + returning_clause
6430
6431 if self.ctes and not self.dialect.cte_follows_insert:
6432 nesting_level = len(self.stack) if not toplevel else None
6433 text = (
6434 self._render_cte_clause(
6435 nesting_level=nesting_level,
6436 include_following_stack=True,
6437 )
6438 + text
6439 )
6440
6441 self.stack.pop(-1)
6442
6443 return text
6444
6445 def update_tables_clause(self, update_stmt, from_table, extra_froms, **kw):
6446 """Provide a hook to override the initial table clause
6447 in an UPDATE statement.
6448
6449 MySQL overrides this.
6450
6451 """
6452 kw["asfrom"] = True
6453 return from_table._compiler_dispatch(self, iscrud=True, **kw)
6454
6455 def update_from_clause(
6456 self, update_stmt, from_table, extra_froms, from_hints, **kw
6457 ):
6458 """Provide a hook to override the generation of an
6459 UPDATE..FROM clause.
6460 MySQL and MSSQL override this.
6461 """
6462 raise NotImplementedError(
6463 "This backend does not support multiple-table "
6464 "criteria within UPDATE"
6465 )
6466
6467 def update_post_criteria_clause(
6468 self, update_stmt: Update, **kw: Any
6469 ) -> Optional[str]:
6470 """provide a hook to override generation after the WHERE criteria
6471 in an UPDATE statement
6472
6473 .. versionadded:: 2.1
6474
6475 """
6476 if update_stmt._post_criteria_clause is not None:
6477 return self.process(
6478 update_stmt._post_criteria_clause,
6479 **kw,
6480 )
6481 else:
6482 return None
6483
6484 def delete_post_criteria_clause(
6485 self, delete_stmt: Delete, **kw: Any
6486 ) -> Optional[str]:
6487 """provide a hook to override generation after the WHERE criteria
6488 in a DELETE statement
6489
6490 .. versionadded:: 2.1
6491
6492 """
6493 if delete_stmt._post_criteria_clause is not None:
6494 return self.process(
6495 delete_stmt._post_criteria_clause,
6496 **kw,
6497 )
6498 else:
6499 return None
6500
    def visit_update(
        self,
        update_stmt: Update,
        visiting_cte: Optional[CTE] = None,
        **kw: Any,
    ) -> str:
        """Compile an :class:`.Update` construct into an UPDATE string,
        rendering prefixes, the SET clause, dialect-specific UPDATE..FROM,
        WHERE criteria, RETURNING, and CTEs."""
        compile_state = update_stmt._compile_state_factory(
            update_stmt, self, **kw
        )
        if TYPE_CHECKING:
            assert isinstance(compile_state, UpdateDMLState)
        # the compile state factory may substitute a new statement object
        update_stmt = compile_state.statement  # type: ignore[assignment]

        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            toplevel = not self.stack

        if toplevel:
            self.isupdate = True
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        # set up FROM-clause linting for cartesian products, if enabled
        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        extra_froms = compile_state._extra_froms
        is_multitable = bool(extra_froms)

        if is_multitable:
            # main table might be a JOIN
            main_froms = set(_from_objects(update_stmt.table))
            render_extra_froms = [
                f for f in extra_froms if f not in main_froms
            ]
            correlate_froms = main_froms.union(extra_froms)
        else:
            render_extra_froms = []
            correlate_froms = {update_stmt.table}

        self.stack.append(
            {
                "correlate_froms": correlate_froms,
                "asfrom_froms": correlate_froms,
                "selectable": update_stmt,
            }
        )

        text = "UPDATE "

        if update_stmt._prefixes:
            text += self._generate_prefixes(
                update_stmt, update_stmt._prefixes, **kw
            )

        table_text = self.update_tables_clause(
            update_stmt,
            update_stmt.table,
            render_extra_froms,
            from_linter=from_linter,
            **kw,
        )
        # collect the SET column/expression/value tuples from crud.py
        crud_params_struct = crud._get_crud_params(
            self, update_stmt, compile_state, toplevel, **kw
        )
        crud_params = crud_params_struct.single_params

        if update_stmt._hints:
            dialect_hints, table_text = self._setup_crud_hints(
                update_stmt, table_text
            )
        else:
            dialect_hints = None

        if update_stmt._independent_ctes:
            self._dispatch_independent_ctes(update_stmt, kw)

        text += table_text

        text += " SET "
        text += ", ".join(
            expr + "=" + value
            for _, expr, value, _ in cast(
                "List[Tuple[Any, str, str, Any]]", crud_params
            )
        )

        # some dialects place RETURNING before the FROM/WHERE sections
        if self.implicit_returning or update_stmt._returning:
            if self.returning_precedes_values:
                text += " " + self.returning_clause(
                    update_stmt,
                    self.implicit_returning or update_stmt._returning,
                    populate_result_map=toplevel,
                )

        if extra_froms:
            # dialect-specific multi-table UPDATE..FROM section
            extra_from_text = self.update_from_clause(
                update_stmt,
                update_stmt.table,
                render_extra_froms,
                dialect_hints,
                from_linter=from_linter,
                **kw,
            )
            if extra_from_text:
                text += " " + extra_from_text

        if update_stmt._where_criteria:
            t = self._generate_delimited_and_list(
                update_stmt._where_criteria, from_linter=from_linter, **kw
            )
            if t:
                text += " WHERE " + t

        ulc = self.update_post_criteria_clause(
            update_stmt, from_linter=from_linter, **kw
        )
        if ulc:
            text += " " + ulc

        if (
            self.implicit_returning or update_stmt._returning
        ) and not self.returning_precedes_values:
            text += " " + self.returning_clause(
                update_stmt,
                self.implicit_returning or update_stmt._returning,
                populate_result_map=toplevel,
            )

        if self.ctes:
            # WITH clauses render ahead of the UPDATE keyword
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if warn_linting:
            assert from_linter is not None
            from_linter.warn(stmt_type="UPDATE")

        self.stack.pop(-1)

        return text  # type: ignore[no-any-return]
6650
6651 def delete_extra_from_clause(
6652 self, delete_stmt, from_table, extra_froms, from_hints, **kw
6653 ):
6654 """Provide a hook to override the generation of an
6655 DELETE..FROM clause.
6656
6657 This can be used to implement DELETE..USING for example.
6658
6659 MySQL and MSSQL override this.
6660
6661 """
6662 raise NotImplementedError(
6663 "This backend does not support multiple-table "
6664 "criteria within DELETE"
6665 )
6666
6667 def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw):
6668 return from_table._compiler_dispatch(
6669 self, asfrom=True, iscrud=True, **kw
6670 )
6671
    def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
        """Compile a DELETE construct into a DELETE string, rendering
        prefixes, dialect-specific multi-table clauses, WHERE criteria,
        RETURNING, and CTEs."""
        compile_state = delete_stmt._compile_state_factory(
            delete_stmt, self, **kw
        )
        # the compile state factory may substitute a new statement object
        delete_stmt = compile_state.statement

        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            toplevel = not self.stack

        if toplevel:
            self.isdelete = True
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        # set up FROM-clause linting for cartesian products, if enabled
        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        extra_froms = compile_state._extra_froms

        correlate_froms = {delete_stmt.table}.union(extra_froms)
        self.stack.append(
            {
                "correlate_froms": correlate_froms,
                "asfrom_froms": correlate_froms,
                "selectable": delete_stmt,
            }
        )

        text = "DELETE "

        if delete_stmt._prefixes:
            text += self._generate_prefixes(
                delete_stmt, delete_stmt._prefixes, **kw
            )

        text += "FROM "

        try:
            table_text = self.delete_table_clause(
                delete_stmt,
                delete_stmt.table,
                extra_froms,
                from_linter=from_linter,
            )
        except TypeError:
            # anticipate 3rd party dialects that don't include **kw
            # TODO: remove in 2.1
            table_text = self.delete_table_clause(
                delete_stmt, delete_stmt.table, extra_froms
            )
            if from_linter:
                _ = self.process(delete_stmt.table, from_linter=from_linter)

        # return value not used here; invoked for its compiler side effects
        crud._get_crud_params(self, delete_stmt, compile_state, toplevel, **kw)

        if delete_stmt._hints:
            dialect_hints, table_text = self._setup_crud_hints(
                delete_stmt, table_text
            )
        else:
            dialect_hints = None

        if delete_stmt._independent_ctes:
            self._dispatch_independent_ctes(delete_stmt, kw)

        text += table_text

        # some dialects place RETURNING before the FROM/WHERE sections
        if (
            self.implicit_returning or delete_stmt._returning
        ) and self.returning_precedes_values:
            text += " " + self.returning_clause(
                delete_stmt,
                self.implicit_returning or delete_stmt._returning,
                populate_result_map=toplevel,
            )

        if extra_froms:
            # dialect-specific multi-table section, e.g. DELETE..USING
            extra_from_text = self.delete_extra_from_clause(
                delete_stmt,
                delete_stmt.table,
                extra_froms,
                dialect_hints,
                from_linter=from_linter,
                **kw,
            )
            if extra_from_text:
                text += " " + extra_from_text

        if delete_stmt._where_criteria:
            t = self._generate_delimited_and_list(
                delete_stmt._where_criteria, from_linter=from_linter, **kw
            )
            if t:
                text += " WHERE " + t

        dlc = self.delete_post_criteria_clause(
            delete_stmt, from_linter=from_linter, **kw
        )
        if dlc:
            text += " " + dlc

        if (
            self.implicit_returning or delete_stmt._returning
        ) and not self.returning_precedes_values:
            text += " " + self.returning_clause(
                delete_stmt,
                self.implicit_returning or delete_stmt._returning,
                populate_result_map=toplevel,
            )

        if self.ctes:
            # WITH clauses render ahead of the DELETE keyword
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if warn_linting:
            assert from_linter is not None
            from_linter.warn(stmt_type="DELETE")

        self.stack.pop(-1)

        return text
6804
6805 def visit_savepoint(self, savepoint_stmt, **kw):
6806 return "SAVEPOINT %s" % self.preparer.format_savepoint(savepoint_stmt)
6807
6808 def visit_rollback_to_savepoint(self, savepoint_stmt, **kw):
6809 return "ROLLBACK TO SAVEPOINT %s" % self.preparer.format_savepoint(
6810 savepoint_stmt
6811 )
6812
6813 def visit_release_savepoint(self, savepoint_stmt, **kw):
6814 return "RELEASE SAVEPOINT %s" % self.preparer.format_savepoint(
6815 savepoint_stmt
6816 )
6817
6818
class StrSQLCompiler(SQLCompiler):
    """A :class:`.SQLCompiler` subclass which allows a small selection
    of non-standard SQL features to render into a string value.

    The :class:`.StrSQLCompiler` is invoked whenever a Core expression
    element is directly stringified without calling upon the
    :meth:`_expression.ClauseElement.compile` method.
    It can render a limited set
    of non-standard SQL constructs to assist in basic stringification,
    however for more substantial custom or dialect-specific SQL constructs,
    it will be necessary to make use of
    :meth:`_expression.ClauseElement.compile`
    directly.

    .. seealso::

        :ref:`faq_sql_expression_string`

    """

    def _fallback_column_name(self, column):
        # plain placeholder text; strict dialects raise for unnamed columns
        return "<name unknown>"

    @util.preload_module("sqlalchemy.engine.url")
    def visit_unsupported_compilation(self, element, err, **kw):
        # if the element names a specific dialect, try compiling with the
        # real dialect's compiler before giving up
        if element.stringify_dialect != "default":
            url = util.preloaded.engine_url
            real_dialect = url.URL.create(
                element.stringify_dialect
            ).get_dialect()()

            real_compiler = real_dialect.statement_compiler(
                real_dialect, None, _supporting_against=self
            )
            if not isinstance(real_compiler, StrSQLCompiler):
                return real_compiler.process(element, **kw)

        return super().visit_unsupported_compilation(element, err)

    def visit_getitem_binary(self, binary, operator, **kw):
        lhs = self.process(binary.left, **kw)
        rhs = self.process(binary.right, **kw)
        return "%s[%s]" % (lhs, rhs)

    def visit_json_getitem_op_binary(self, binary, operator, **kw):
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_sequence(self, sequence, **kw):
        formatted = self.preparer.format_sequence(sequence)
        return f"<next sequence value: {formatted}>"

    def returning_clause(
        self,
        stmt: UpdateBase,
        returning_cols: Sequence[_ColumnsClauseElement],
        *,
        populate_result_map: bool,
        **kw: Any,
    ) -> str:
        rendered = [
            self._label_select_column(None, col, True, False, {})
            for col in base._select_iterables(returning_cols)
        ]
        return "RETURNING " + ", ".join(rendered)

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        kw["asfrom"] = True
        rendered = [
            extra._compiler_dispatch(self, fromhints=from_hints, **kw)
            for extra in extra_froms
        ]
        return "FROM " + ", ".join(rendered)

    def delete_extra_from_clause(
        self, delete_stmt, from_table, extra_froms, from_hints, **kw
    ):
        kw["asfrom"] = True
        rendered = [
            extra._compiler_dispatch(self, fromhints=from_hints, **kw)
            for extra in extra_froms
        ]
        return ", " + ", ".join(rendered)

    def visit_empty_set_expr(self, element_types, **kw):
        # a SELECT that is guaranteed to return no rows
        return "SELECT 1 WHERE 1!=1"

    def get_from_hint_text(self, table, text):
        return "[%s]" % text

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " <regexp> ", **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " <not regexp> ", **kw)

    def visit_regexp_replace_op_binary(self, binary, operator, **kw):
        lhs = binary.left._compiler_dispatch(self, **kw)
        rhs = binary.right._compiler_dispatch(self, **kw)
        return "<regexp replace>(%s, %s)" % (lhs, rhs)

    def visit_try_cast(self, cast, **kwargs):
        inner = cast.clause._compiler_dispatch(self, **kwargs)
        type_ = cast.typeclause._compiler_dispatch(self, **kwargs)
        return "TRY_CAST(%s AS %s)" % (inner, type_)
6928
6929
class DDLCompiler(Compiled):
    """Default compiler for DDL constructs: CREATE/DROP TABLE, indexes,
    sequences, schemas, and comments."""

    # marks this Compiled subclass as producing DDL rather than queries/DML
    is_ddl = True

    if TYPE_CHECKING:
        # typing-only constructor signature; the runtime __init__ is
        # inherited from Compiled

        def __init__(
            self,
            dialect: Dialect,
            statement: ExecutableDDLElement,
            schema_translate_map: Optional[SchemaTranslateMapType] = ...,
            render_schema_translate: bool = ...,
            compile_kwargs: Mapping[str, Any] = ...,
        ): ...
6943
    @util.ro_memoized_property
    def sql_compiler(self) -> SQLCompiler:
        """A memoized SQL expression compiler, used to render SQL
        expressions embedded within DDL (defaults, index expressions,
        comments)."""
        return self.dialect.statement_compiler(
            self.dialect, None, schema_translate_map=self.schema_translate_map
        )
6949
    @util.memoized_property
    def type_compiler(self):
        """Shorthand for the dialect's type compiler instance."""
        return self.dialect.type_compiler_instance
6953
    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        # DDL statements carry no bound parameters
        return None
6961
6962 def visit_ddl(self, ddl, **kwargs):
6963 # table events can substitute table and schema name
6964 context = ddl.context
6965 if isinstance(ddl.target, schema.Table):
6966 context = context.copy()
6967
6968 preparer = self.preparer
6969 path = preparer.format_table_seq(ddl.target)
6970 if len(path) == 1:
6971 table, sch = path[0], ""
6972 else:
6973 table, sch = path[-1], path[0]
6974
6975 context.setdefault("table", table)
6976 context.setdefault("schema", sch)
6977 context.setdefault("fullname", preparer.format_table(ddl.target))
6978
6979 return self.sql_compiler.post_process_text(ddl.statement % context)
6980
6981 def visit_create_schema(self, create, **kw):
6982 text = "CREATE SCHEMA "
6983 if create.if_not_exists:
6984 text += "IF NOT EXISTS "
6985 return text + self.preparer.format_schema(create.element)
6986
6987 def visit_drop_schema(self, drop, **kw):
6988 text = "DROP SCHEMA "
6989 if drop.if_exists:
6990 text += "IF EXISTS "
6991 text += self.preparer.format_schema(drop.element)
6992 if drop.cascade:
6993 text += " CASCADE"
6994 return text
6995
    def visit_create_table(self, create, **kw):
        """Render a CREATE TABLE statement including prefixes, columns,
        table-level constraints, and dialect suffixes."""
        table = create.element
        preparer = self.preparer

        text = "\nCREATE "
        if table._prefixes:
            text += " ".join(table._prefixes) + " "

        text += "TABLE "
        if create.if_not_exists:
            text += "IF NOT EXISTS "

        text += preparer.format_table(table) + " "

        create_table_suffix = self.create_table_suffix(table)
        if create_table_suffix:
            text += create_table_suffix + " "

        text += "("

        separator = "\n"

        # if only one primary key, specify it along with the column
        first_pk = False
        for create_column in create.columns:
            column = create_column.element
            try:
                processed = self.process(
                    create_column, first_pk=column.primary_key and not first_pk
                )
                # a None result means the column is omitted from the DDL
                # and contributes no separator
                if processed is not None:
                    text += separator
                    separator = ", \n"
                    text += "\t" + processed
                if column.primary_key:
                    first_pk = True
            except exc.CompileError as ce:
                # re-raise with table/column context for diagnostics
                raise exc.CompileError(
                    "(in table '%s', column '%s'): %s"
                    % (table.description, column.name, ce.args[0])
                ) from ce

        const = self.create_table_constraints(
            table,
            _include_foreign_key_constraints=create.include_foreign_key_constraints,  # noqa
        )
        if const:
            text += separator + "\t" + const

        text += "\n)%s\n\n" % self.post_create_table(table)
        return text
7047
7048 def visit_create_view(self, element: CreateView, **kw: Any) -> str:
7049 return self._generate_table_select(element, "view", **kw)
7050
7051 def visit_create_table_as(self, element: CreateTableAs, **kw: Any) -> str:
7052 return self._generate_table_select(element, "create_table_as", **kw)
7053
7054 def _generate_table_select(
7055 self,
7056 element: _TableViaSelect,
7057 type_: str,
7058 if_not_exists: Optional[bool] = None,
7059 **kw: Any,
7060 ) -> str:
7061 prep = self.preparer
7062
7063 inner_kw = dict(kw)
7064 inner_kw["literal_binds"] = True
7065 select_sql = self.sql_compiler.process(element.selectable, **inner_kw)
7066
7067 # Use if_not_exists parameter if provided, otherwise use element's
7068 use_if_not_exists = (
7069 if_not_exists
7070 if if_not_exists is not None
7071 else element.if_not_exists
7072 )
7073
7074 parts = [
7075 "CREATE",
7076 "OR REPLACE" if getattr(element, "or_replace", False) else None,
7077 "TEMPORARY" if element.temporary else None,
7078 (
7079 "MATERIALIZED VIEW"
7080 if type_ == "view" and getattr(element, "materialized", False)
7081 else "TABLE" if type_ == "create_table_as" else "VIEW"
7082 ),
7083 "IF NOT EXISTS" if use_if_not_exists else None,
7084 prep.format_table(element.table),
7085 "AS",
7086 select_sql,
7087 ]
7088 return " ".join(p for p in parts if p)
7089
7090 def visit_create_column(self, create, first_pk=False, **kw):
7091 column = create.element
7092
7093 if column.system:
7094 return None
7095
7096 text = self.get_column_specification(column, first_pk=first_pk)
7097 const = " ".join(
7098 self.process(constraint) for constraint in column.constraints
7099 )
7100 if const:
7101 text += " " + const
7102
7103 return text
7104
    def create_table_constraints(
        self, table, _include_foreign_key_constraints=None, **kw
    ):
        """Render the table-level constraint clauses of CREATE TABLE,
        joined with comma/newline/tab separators."""
        # On some DB order is significant: visit PK first, then the
        # other constraints (engine.ReflectionTest.testbasic failed on FB2)
        constraints = []
        if table.primary_key:
            constraints.append(table.primary_key)

        all_fkcs = table.foreign_key_constraints
        if _include_foreign_key_constraints is not None:
            # only the explicitly requested FK constraints render inline
            omit_fkcs = all_fkcs.difference(_include_foreign_key_constraints)
        else:
            omit_fkcs = set()

        constraints.extend(
            [
                c
                for c in table._sorted_constraints
                if c is not table.primary_key and c not in omit_fkcs
            ]
        )

        return ", \n\t".join(
            p
            for p in (
                self.process(constraint)
                for constraint in constraints
                if (constraint._should_create_for_compiler(self))
                and (
                    # use_alter constraints are skipped inline when the
                    # dialect can add them via ALTER afterwards
                    not self.dialect.supports_alter
                    or not getattr(constraint, "use_alter", False)
                )
            )
            if p is not None
        )
7141
7142 def visit_drop_table(self, drop, **kw):
7143 text = "\nDROP TABLE "
7144 if drop.if_exists:
7145 text += "IF EXISTS "
7146 return text + self.preparer.format_table(drop.element)
7147
7148 def visit_drop_view(self, drop, **kw):
7149 text = "\nDROP "
7150 if drop.materialized:
7151 text += "MATERIALIZED VIEW "
7152 else:
7153 text += "VIEW "
7154 if drop.if_exists:
7155 text += "IF EXISTS "
7156 return text + self.preparer.format_table(drop.element)
7157
7158 def _verify_index_table(self, index: Index) -> None:
7159 if index.table is None:
7160 raise exc.CompileError(
7161 "Index '%s' is not associated with any table." % index.name
7162 )
7163
7164 def visit_create_index(
7165 self, create, include_schema=False, include_table_schema=True, **kw
7166 ):
7167 index = create.element
7168 self._verify_index_table(index)
7169 preparer = self.preparer
7170 text = "CREATE "
7171 if index.unique:
7172 text += "UNIQUE "
7173 if index.name is None:
7174 raise exc.CompileError(
7175 "CREATE INDEX requires that the index have a name"
7176 )
7177
7178 text += "INDEX "
7179 if create.if_not_exists:
7180 text += "IF NOT EXISTS "
7181
7182 text += "%s ON %s (%s)" % (
7183 self._prepared_index_name(index, include_schema=include_schema),
7184 preparer.format_table(
7185 index.table, use_schema=include_table_schema
7186 ),
7187 ", ".join(
7188 self.sql_compiler.process(
7189 expr, include_table=False, literal_binds=True
7190 )
7191 for expr in index.expressions
7192 ),
7193 )
7194 return text
7195
7196 def visit_drop_index(self, drop, **kw):
7197 index = drop.element
7198
7199 if index.name is None:
7200 raise exc.CompileError(
7201 "DROP INDEX requires that the index have a name"
7202 )
7203 text = "\nDROP INDEX "
7204 if drop.if_exists:
7205 text += "IF EXISTS "
7206
7207 return text + self._prepared_index_name(index, include_schema=True)
7208
7209 def _prepared_index_name(
7210 self, index: Index, include_schema: bool = False
7211 ) -> str:
7212 if index.table is not None:
7213 effective_schema = self.preparer.schema_for_object(index.table)
7214 else:
7215 effective_schema = None
7216 if include_schema and effective_schema:
7217 schema_name = self.preparer.quote_schema(effective_schema)
7218 else:
7219 schema_name = None
7220
7221 index_name: str = self.preparer.format_index(index)
7222
7223 if schema_name:
7224 index_name = schema_name + "." + index_name
7225 return index_name
7226
7227 def visit_add_constraint(self, create, **kw):
7228 return "ALTER TABLE %s ADD %s" % (
7229 self.preparer.format_table(create.element.table),
7230 self.process(create.element),
7231 )
7232
7233 def visit_set_table_comment(self, create, **kw):
7234 return "COMMENT ON TABLE %s IS %s" % (
7235 self.preparer.format_table(create.element),
7236 self.sql_compiler.render_literal_value(
7237 create.element.comment, sqltypes.String()
7238 ),
7239 )
7240
7241 def visit_drop_table_comment(self, drop, **kw):
7242 return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table(
7243 drop.element
7244 )
7245
7246 def visit_set_column_comment(self, create, **kw):
7247 return "COMMENT ON COLUMN %s IS %s" % (
7248 self.preparer.format_column(
7249 create.element, use_table=True, use_schema=True
7250 ),
7251 self.sql_compiler.render_literal_value(
7252 create.element.comment, sqltypes.String()
7253 ),
7254 )
7255
7256 def visit_drop_column_comment(self, drop, **kw):
7257 return "COMMENT ON COLUMN %s IS NULL" % self.preparer.format_column(
7258 drop.element, use_table=True
7259 )
7260
    def visit_set_constraint_comment(self, create, **kw):
        # no generic SQL form for constraint comments; supporting
        # dialects override this
        raise exc.UnsupportedCompilationError(self, type(create))
7263
    def visit_drop_constraint_comment(self, drop, **kw):
        # no generic SQL form for constraint comments; supporting
        # dialects override this
        raise exc.UnsupportedCompilationError(self, type(drop))
7266
7267 def get_identity_options(self, identity_options: IdentityOptions) -> str:
7268 text = []
7269 if identity_options.increment is not None:
7270 text.append("INCREMENT BY %d" % identity_options.increment)
7271 if identity_options.start is not None:
7272 text.append("START WITH %d" % identity_options.start)
7273 if identity_options.minvalue is not None:
7274 text.append("MINVALUE %d" % identity_options.minvalue)
7275 if identity_options.maxvalue is not None:
7276 text.append("MAXVALUE %d" % identity_options.maxvalue)
7277 if identity_options.nominvalue is not None:
7278 text.append("NO MINVALUE")
7279 if identity_options.nomaxvalue is not None:
7280 text.append("NO MAXVALUE")
7281 if identity_options.cache is not None:
7282 text.append("CACHE %d" % identity_options.cache)
7283 if identity_options.cycle is not None:
7284 text.append("CYCLE" if identity_options.cycle else "NO CYCLE")
7285 return " ".join(text)
7286
7287 def visit_create_sequence(self, create, prefix=None, **kw):
7288 text = "CREATE SEQUENCE "
7289 if create.if_not_exists:
7290 text += "IF NOT EXISTS "
7291 text += self.preparer.format_sequence(create.element)
7292
7293 if prefix:
7294 text += prefix
7295 options = self.get_identity_options(create.element)
7296 if options:
7297 text += " " + options
7298 return text
7299
7300 def visit_drop_sequence(self, drop, **kw):
7301 text = "DROP SEQUENCE "
7302 if drop.if_exists:
7303 text += "IF EXISTS "
7304 return text + self.preparer.format_sequence(drop.element)
7305
7306 def visit_drop_constraint(self, drop, **kw):
7307 constraint = drop.element
7308 if constraint.name is not None:
7309 formatted_name = self.preparer.format_constraint(constraint)
7310 else:
7311 formatted_name = None
7312
7313 if formatted_name is None:
7314 raise exc.CompileError(
7315 "Can't emit DROP CONSTRAINT for constraint %r; "
7316 "it has no name" % drop.element
7317 )
7318 return "ALTER TABLE %s DROP CONSTRAINT %s%s%s" % (
7319 self.preparer.format_table(drop.element.table),
7320 "IF EXISTS " if drop.if_exists else "",
7321 formatted_name,
7322 " CASCADE" if drop.cascade else "",
7323 )
7324
7325 def get_column_specification(
7326 self, column: Column[Any], **kwargs: Any
7327 ) -> str:
7328 colspec = (
7329 self.preparer.format_column(column)
7330 + " "
7331 + self.dialect.type_compiler_instance.process(
7332 column.type, type_expression=column
7333 )
7334 )
7335 default = self.get_column_default_string(column)
7336 if default is not None:
7337 colspec += " DEFAULT " + default
7338
7339 if column.computed is not None:
7340 colspec += " " + self.process(column.computed)
7341
7342 if (
7343 column.identity is not None
7344 and self.dialect.supports_identity_columns
7345 ):
7346 colspec += " " + self.process(column.identity)
7347
7348 if not column.nullable and (
7349 not column.identity or not self.dialect.supports_identity_columns
7350 ):
7351 colspec += " NOT NULL"
7352 return colspec
7353
    def create_table_suffix(self, table: Table) -> str:
        """Hook for dialect-specific text after the table name in
        CREATE TABLE; the base compiler renders nothing."""
        return ""
7356
    def post_create_table(self, table: Table) -> str:
        """Hook for dialect-specific text after the CREATE TABLE column
        list; the base compiler renders nothing."""
        return ""
7359
7360 def get_column_default_string(self, column: Column[Any]) -> Optional[str]:
7361 if isinstance(column.server_default, schema.DefaultClause):
7362 return self.render_default_string(column.server_default.arg)
7363 else:
7364 return None
7365
7366 def render_default_string(self, default: Union[Visitable, str]) -> str:
7367 if isinstance(default, str):
7368 return self.sql_compiler.render_literal_value(
7369 default, sqltypes.STRINGTYPE
7370 )
7371 else:
7372 return self.sql_compiler.process(default, literal_binds=True)
7373
7374 def visit_table_or_column_check_constraint(self, constraint, **kw):
7375 if constraint.is_column_level:
7376 return self.visit_column_check_constraint(constraint)
7377 else:
7378 return self.visit_check_constraint(constraint)
7379
7380 def visit_check_constraint(self, constraint, **kw):
7381 text = self.define_constraint_preamble(constraint, **kw)
7382 text += self.define_check_body(constraint, **kw)
7383 text += self.define_constraint_deferrability(constraint)
7384 return text
7385
7386 def visit_column_check_constraint(self, constraint, **kw):
7387 text = self.define_constraint_preamble(constraint, **kw)
7388 text += self.define_check_body(constraint, **kw)
7389 text += self.define_constraint_deferrability(constraint)
7390 return text
7391
7392 def visit_primary_key_constraint(
7393 self, constraint: PrimaryKeyConstraint, **kw: Any
7394 ) -> str:
7395 if len(constraint) == 0:
7396 return ""
7397 text = self.define_constraint_preamble(constraint, **kw)
7398 text += self.define_primary_key_body(constraint, **kw)
7399 text += self.define_constraint_deferrability(constraint)
7400 return text
7401
7402 def visit_foreign_key_constraint(
7403 self, constraint: ForeignKeyConstraint, **kw: Any
7404 ) -> str:
7405 text = self.define_constraint_preamble(constraint, **kw)
7406 text += self.define_foreign_key_body(constraint, **kw)
7407 text += self.define_constraint_match(constraint)
7408 text += self.define_constraint_cascades(constraint)
7409 text += self.define_constraint_deferrability(constraint)
7410 return text
7411
7412 def define_constraint_remote_table(self, constraint, table, preparer):
7413 """Format the remote table clause of a CREATE CONSTRAINT clause."""
7414
7415 return preparer.format_table(table)
7416
7417 def visit_unique_constraint(
7418 self, constraint: UniqueConstraint, **kw: Any
7419 ) -> str:
7420 if len(constraint) == 0:
7421 return ""
7422 text = self.define_constraint_preamble(constraint, **kw)
7423 text += self.define_unique_body(constraint, **kw)
7424 text += self.define_constraint_deferrability(constraint)
7425 return text
7426
7427 def define_constraint_preamble(
7428 self, constraint: Constraint, **kw: Any
7429 ) -> str:
7430 text = ""
7431 if constraint.name is not None:
7432 formatted_name = self.preparer.format_constraint(constraint)
7433 if formatted_name is not None:
7434 text += "CONSTRAINT %s " % formatted_name
7435 return text
7436
7437 def define_primary_key_body(
7438 self, constraint: PrimaryKeyConstraint, **kw: Any
7439 ) -> str:
7440 text = ""
7441 text += "PRIMARY KEY "
7442 text += "(%s)" % ", ".join(
7443 self.preparer.quote(c.name)
7444 for c in (
7445 constraint.columns_autoinc_first
7446 if constraint._implicit_generated
7447 else constraint.columns
7448 )
7449 )
7450 return text
7451
7452 def define_foreign_key_body(
7453 self, constraint: ForeignKeyConstraint, **kw: Any
7454 ) -> str:
7455 preparer = self.preparer
7456 remote_table = list(constraint.elements)[0].column.table
7457 text = "FOREIGN KEY(%s) REFERENCES %s (%s)" % (
7458 ", ".join(
7459 preparer.quote(f.parent.name) for f in constraint.elements
7460 ),
7461 self.define_constraint_remote_table(
7462 constraint, remote_table, preparer
7463 ),
7464 ", ".join(
7465 preparer.quote(f.column.name) for f in constraint.elements
7466 ),
7467 )
7468 return text
7469
7470 def define_unique_body(
7471 self, constraint: UniqueConstraint, **kw: Any
7472 ) -> str:
7473 text = "UNIQUE %s(%s)" % (
7474 self.define_unique_constraint_distinct(constraint, **kw),
7475 ", ".join(self.preparer.quote(c.name) for c in constraint),
7476 )
7477 return text
7478
7479 def define_check_body(self, constraint: CheckConstraint, **kw: Any) -> str:
7480 text = "CHECK (%s)" % self.sql_compiler.process(
7481 constraint.sqltext, include_table=False, literal_binds=True
7482 )
7483 return text
7484
    def define_unique_constraint_distinct(
        self, constraint: UniqueConstraint, **kw: Any
    ) -> str:
        """Hook for a dialect-specific qualifier between ``UNIQUE`` and the
        column list (e.g. MySQL's NULLS DISTINCT); renders nothing here."""
        return ""
7489
7490 def define_constraint_cascades(
7491 self, constraint: ForeignKeyConstraint
7492 ) -> str:
7493 text = ""
7494 if constraint.ondelete is not None:
7495 text += self.define_constraint_ondelete_cascade(constraint)
7496
7497 if constraint.onupdate is not None:
7498 text += self.define_constraint_onupdate_cascade(constraint)
7499 return text
7500
7501 def define_constraint_ondelete_cascade(
7502 self, constraint: ForeignKeyConstraint
7503 ) -> str:
7504 return " ON DELETE %s" % self.preparer.validate_sql_phrase(
7505 constraint.ondelete, FK_ON_DELETE
7506 )
7507
7508 def define_constraint_onupdate_cascade(
7509 self, constraint: ForeignKeyConstraint
7510 ) -> str:
7511 return " ON UPDATE %s" % self.preparer.validate_sql_phrase(
7512 constraint.onupdate, FK_ON_UPDATE
7513 )
7514
7515 def define_constraint_deferrability(self, constraint: Constraint) -> str:
7516 text = ""
7517 if constraint.deferrable is not None:
7518 if constraint.deferrable:
7519 text += " DEFERRABLE"
7520 else:
7521 text += " NOT DEFERRABLE"
7522 if constraint.initially is not None:
7523 text += " INITIALLY %s" % self.preparer.validate_sql_phrase(
7524 constraint.initially, FK_INITIALLY
7525 )
7526 return text
7527
7528 def define_constraint_match(self, constraint: ForeignKeyConstraint) -> str:
7529 text = ""
7530 if constraint.match is not None:
7531 text += " MATCH %s" % constraint.match
7532 return text
7533
7534 def visit_computed_column(self, generated, **kw):
7535 text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
7536 generated.sqltext, include_table=False, literal_binds=True
7537 )
7538 if generated.persisted is True:
7539 text += " STORED"
7540 elif generated.persisted is False:
7541 text += " VIRTUAL"
7542 return text
7543
7544 def visit_identity_column(self, identity, **kw):
7545 text = "GENERATED %s AS IDENTITY" % (
7546 "ALWAYS" if identity.always else "BY DEFAULT",
7547 )
7548 options = self.get_identity_options(identity)
7549 if options:
7550 text += " (%s)" % options
7551 return text
7552
7553
class GenericTypeCompiler(TypeCompiler):
    """Generic type compiler, rendering mostly ANSI-standard type strings.

    Uppercase ``visit_XXX`` methods render the exact SQL type of the same
    name; lowercase ``visit_xxx`` methods handle the dialect-agnostic
    "CamelCase" types by delegating to an uppercase counterpart.
    """

    def visit_FLOAT(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return "FLOAT"

    def visit_DOUBLE(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return "DOUBLE"

    def visit_DOUBLE_PRECISION(
        self, type_: sqltypes.DOUBLE_PRECISION[Any], **kw: Any
    ) -> str:
        return "DOUBLE PRECISION"

    def visit_REAL(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return "REAL"

    def visit_NUMERIC(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        # precision/scale are optional; scale may only render alongside
        # precision
        if type_.precision is None:
            return "NUMERIC"
        elif type_.scale is None:
            return f"NUMERIC({type_.precision})"
        else:
            return f"NUMERIC({type_.precision}, {type_.scale})"

    def visit_DECIMAL(self, type_: sqltypes.DECIMAL[Any], **kw: Any) -> str:
        if type_.precision is None:
            return "DECIMAL"
        elif type_.scale is None:
            return f"DECIMAL({type_.precision})"
        else:
            return f"DECIMAL({type_.precision}, {type_.scale})"

    def visit_INTEGER(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return "INTEGER"

    def visit_SMALLINT(self, type_: sqltypes.SmallInteger, **kw: Any) -> str:
        return "SMALLINT"

    def visit_BIGINT(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return "BIGINT"

    def visit_TIMESTAMP(self, type_: sqltypes.TIMESTAMP, **kw: Any) -> str:
        return "TIMESTAMP"

    def visit_DATETIME(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return "DATETIME"

    def visit_DATE(self, type_: sqltypes.Date, **kw: Any) -> str:
        return "DATE"

    def visit_TIME(self, type_: sqltypes.Time, **kw: Any) -> str:
        return "TIME"

    def visit_CLOB(self, type_: sqltypes.CLOB, **kw: Any) -> str:
        return "CLOB"

    def visit_NCLOB(self, type_: sqltypes.Text, **kw: Any) -> str:
        return "NCLOB"

    def _render_string_type(
        self, name: str, length: Optional[int], collation: Optional[str]
    ) -> str:
        """Render a string type name with optional length and COLLATE."""
        text = name
        if length:
            text += f"({length})"
        if collation:
            text += f' COLLATE "{collation}"'
        return text

    def visit_CHAR(self, type_: sqltypes.CHAR, **kw: Any) -> str:
        return self._render_string_type("CHAR", type_.length, type_.collation)

    def visit_NCHAR(self, type_: sqltypes.NCHAR, **kw: Any) -> str:
        return self._render_string_type("NCHAR", type_.length, type_.collation)

    def visit_VARCHAR(self, type_: sqltypes.String, **kw: Any) -> str:
        return self._render_string_type(
            "VARCHAR", type_.length, type_.collation
        )

    def visit_NVARCHAR(self, type_: sqltypes.NVARCHAR, **kw: Any) -> str:
        return self._render_string_type(
            "NVARCHAR", type_.length, type_.collation
        )

    def visit_TEXT(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self._render_string_type("TEXT", type_.length, type_.collation)

    def visit_UUID(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        return "UUID"

    def visit_BLOB(self, type_: sqltypes.LargeBinary, **kw: Any) -> str:
        return "BLOB"

    def visit_BINARY(self, type_: sqltypes.BINARY, **kw: Any) -> str:
        # conditional expression instead of the fragile ``and/or`` idiom;
        # a falsy (None or zero) length renders no parenthesized length
        return "BINARY" + ("(%d)" % type_.length if type_.length else "")

    def visit_VARBINARY(self, type_: sqltypes.VARBINARY, **kw: Any) -> str:
        return "VARBINARY" + ("(%d)" % type_.length if type_.length else "")

    def visit_BOOLEAN(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return "BOOLEAN"

    def visit_uuid(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        # fall back to a 32-character hex string when either the type or
        # the dialect does not support a native UUID type
        if not type_.native_uuid or not self.dialect.supports_native_uuid:
            return self._render_string_type("CHAR", length=32, collation=None)
        else:
            return self.visit_UUID(type_, **kw)

    def visit_large_binary(
        self, type_: sqltypes.LargeBinary, **kw: Any
    ) -> str:
        return self.visit_BLOB(type_, **kw)

    def visit_boolean(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return self.visit_BOOLEAN(type_, **kw)

    def visit_time(self, type_: sqltypes.Time, **kw: Any) -> str:
        return self.visit_TIME(type_, **kw)

    def visit_datetime(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return self.visit_DATETIME(type_, **kw)

    def visit_date(self, type_: sqltypes.Date, **kw: Any) -> str:
        return self.visit_DATE(type_, **kw)

    def visit_big_integer(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return self.visit_BIGINT(type_, **kw)

    def visit_small_integer(
        self, type_: sqltypes.SmallInteger, **kw: Any
    ) -> str:
        return self.visit_SMALLINT(type_, **kw)

    def visit_integer(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return self.visit_INTEGER(type_, **kw)

    def visit_real(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return self.visit_REAL(type_, **kw)

    def visit_float(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return self.visit_FLOAT(type_, **kw)

    def visit_double(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return self.visit_DOUBLE(type_, **kw)

    def visit_numeric(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        return self.visit_NUMERIC(type_, **kw)

    def visit_string(self, type_: sqltypes.String, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_unicode(self, type_: sqltypes.Unicode, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_text(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_unicode_text(
        self, type_: sqltypes.UnicodeText, **kw: Any
    ) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_enum(self, type_: sqltypes.Enum, **kw: Any) -> str:
        # generic backends store enums as VARCHAR
        return self.visit_VARCHAR(type_, **kw)

    def visit_null(self, type_, **kw):
        raise exc.CompileError(
            "Can't generate DDL for %r; "
            "did you forget to specify a "
            "type on this Column?" % type_
        )

    def visit_type_decorator(
        self, type_: TypeDecorator[Any], **kw: Any
    ) -> str:
        # render the underlying implementation type for this dialect
        return self.process(type_.type_engine(self.dialect), **kw)

    def visit_user_defined(
        self, type_: UserDefinedType[Any], **kw: Any
    ) -> str:
        return type_.get_col_spec(**kw)
7741
7742
class StrSQLTypeCompiler(GenericTypeCompiler):
    """Type compiler for string-only compilation; renders best-effort
    placeholders for unknown or un-compilable types instead of raising."""

    def process(self, type_, **kw):
        try:
            dispatcher = type_._compiler_dispatch
        except AttributeError:
            # not a visitable type; render a placeholder
            return self._visit_unknown(type_, **kw)
        return dispatcher(self, **kw)

    def __getattr__(self, key):
        # any missing visit_* method falls back to the placeholder renderer
        if not key.startswith("visit_"):
            raise AttributeError(key)
        return self._visit_unknown

    def _visit_unknown(self, type_, **kw):
        cls_name = type_.__class__.__name__
        # all-uppercase class names read as SQL type names already
        if cls_name == cls_name.upper():
            return cls_name
        return repr(type_)

    def visit_null(self, type_, **kw):
        return "NULL"

    def visit_user_defined(self, type_, **kw):
        try:
            col_spec = type_.get_col_spec
        except AttributeError:
            return repr(type_)
        return col_spec(**kw)
7774
7775
class _SchemaForObjectCallable(Protocol):
    """Callable returning the effective schema name for a schema object."""

    def __call__(self, obj: Any, /) -> str: ...
7778
7779
class _BindNameForColProtocol(Protocol):
    """Callable mapping a column clause to its bind parameter name."""

    def __call__(self, col: ColumnClause[Any]) -> str: ...
7782
7783
class IdentifierPreparer:
    """Handle quoting and case-folding of identifiers based on options."""

    # words that must always be quoted when used as identifiers
    reserved_words = RESERVED_WORDS

    # pattern matching identifiers which need no quoting
    legal_characters = LEGAL_CHARACTERS

    # characters that may not begin an unquoted identifier
    illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS

    initial_quote: str

    final_quote: str

    # per-instance cache of identifier -> conditionally quoted form
    _strings: MutableMapping[str, str]

    schema_for_object: _SchemaForObjectCallable = operator.attrgetter("schema")
    """Return the .schema attribute for an object.

    For the default IdentifierPreparer, the schema for an object is always
    the value of the ".schema" attribute. if the preparer is replaced
    with one that has a non-empty schema_translate_map, the value of the
    ".schema" attribute is rendered a symbol that will be converted to a
    real schema name from the mapping post-compile.

    """

    # set True by _with_schema_translate() when the translate map
    # included a None key
    _includes_none_schema_translate: bool = False
7811
    def __init__(
        self,
        dialect: Dialect,
        initial_quote: str = '"',
        final_quote: Optional[str] = None,
        escape_quote: str = '"',
        quote_case_sensitive_collations: bool = True,
        omit_schema: bool = False,
    ):
        """Construct a new ``IdentifierPreparer`` object.

        :param dialect: the :class:`.Dialect` this preparer serves.

        :param initial_quote: Character that begins a delimited identifier.

        :param final_quote: Character that ends a delimited identifier.
          Defaults to `initial_quote`.

        :param escape_quote: character which, when doubled, escapes an
          embedded quote character inside a delimited identifier.

        :param quote_case_sensitive_collations: when True, collation names
          passed to :meth:`.format_collation` are conditionally quoted.

        :param omit_schema: Prevent prepending schema name. Useful for
          databases that do not support schemae.
        """

        self.dialect = dialect
        self.initial_quote = initial_quote
        self.final_quote = final_quote or self.initial_quote
        self.escape_quote = escape_quote
        # an embedded quote is escaped by doubling the escape character
        self.escape_to_quote = self.escape_quote * 2
        self.omit_schema = omit_schema
        self.quote_case_sensitive_collations = quote_case_sensitive_collations
        # identifier quoting cache; see quote()
        self._strings = {}
        # "format"/"pyformat" paramstyles use percent-based placeholders,
        # so literal percent signs in identifiers must be doubled
        self._double_percents = self.dialect.paramstyle in (
            "format",
            "pyformat",
        )
7847
    def _with_schema_translate(self, schema_translate_map):
        """Return a copy of this preparer whose schema lookup renders
        symbolic placeholder tokens for post-compile schema translation."""
        # shallow-copy this preparer without re-running __init__
        prep = self.__class__.__new__(self.__class__)
        prep.__dict__.update(self.__dict__)

        includes_none = None in schema_translate_map

        def symbol_getter(obj):
            # render a "__[SCHEMA_<name>]" token instead of the real
            # schema; _render_schema_translates() substitutes it after
            # compilation against the then-current map
            name = obj.schema
            if obj._use_schema_map and (name is not None or includes_none):
                # the token is delimited by square brackets, so they may
                # not appear within the schema name itself
                if name is not None and ("[" in name or "]" in name):
                    raise exc.CompileError(
                        "Square bracket characters ([]) not supported "
                        "in schema translate name '%s'" % name
                    )
                return quoted_name(
                    "__[SCHEMA_%s]" % (name or "_none"), quote=False
                )
            else:
                return obj.schema

        prep.schema_for_object = symbol_getter
        prep._includes_none_schema_translate = includes_none
        return prep
7871
    def _render_schema_translates(
        self, statement: str, schema_translate_map: SchemaTranslateMapType
    ) -> str:
        """Replace "__[SCHEMA_...]" placeholder tokens in a compiled
        statement with real schema names from ``schema_translate_map``.

        Raises ``InvalidRequestError`` when the map's keys are inconsistent
        with the map that was in effect at compile time.
        """
        d = schema_translate_map
        if None in d:
            if not self._includes_none_schema_translate:
                raise exc.InvalidRequestError(
                    "schema translate map which previously did not have "
                    "`None` present as a key now has `None` present; compiled "
                    "statement may lack adequate placeholders. Please use "
                    "consistent keys in successive "
                    "schema_translate_map dictionaries."
                )

            # the None key was rendered as the "_none" token at compile time
            d["_none"] = d[None]  # type: ignore[index]

        def replace(m):
            # group 2 is the schema name embedded in the placeholder token
            name = m.group(2)
            if name in d:
                effective_schema = d[name]
            else:
                if name in (None, "_none"):
                    raise exc.InvalidRequestError(
                        "schema translate map which previously had `None` "
                        "present as a key now no longer has it present; don't "
                        "know how to apply schema for compiled statement. "
                        "Please use consistent keys in successive "
                        "schema_translate_map dictionaries."
                    )
                effective_schema = name

            if not effective_schema:
                # fall back to the dialect's default schema for blank/None
                effective_schema = self.dialect.default_schema_name
                if not effective_schema:
                    # TODO: no coverage here
                    raise exc.CompileError(
                        "Dialect has no default schema name; can't "
                        "use None as dynamic schema target."
                    )
            return self.quote_schema(effective_schema)

        return re.sub(r"(__\[SCHEMA_([^\]]+)\])", replace, statement)
7914
7915 def _escape_identifier(self, value: str) -> str:
7916 """Escape an identifier.
7917
7918 Subclasses should override this to provide database-dependent
7919 escaping behavior.
7920 """
7921
7922 value = value.replace(self.escape_quote, self.escape_to_quote)
7923 if self._double_percents:
7924 value = value.replace("%", "%%")
7925 return value
7926
    def _unescape_identifier(self, value: str) -> str:
        """Canonicalize an escaped identifier.

        Subclasses should override this to provide database-dependent
        unescaping behavior that reverses _escape_identifier.
        """

        return value.replace(self.escape_to_quote, self.escape_quote)
7935
7936 def validate_sql_phrase(self, element, reg):
7937 """keyword sequence filter.
7938
7939 a filter for elements that are intended to represent keyword sequences,
7940 such as "INITIALLY", "INITIALLY DEFERRED", etc. no special characters
7941 should be present.
7942
7943 """
7944
7945 if element is not None and not reg.match(element):
7946 raise exc.CompileError(
7947 "Unexpected SQL phrase: %r (matching against %r)"
7948 % (element, reg.pattern)
7949 )
7950 return element
7951
7952 def quote_identifier(self, value: str) -> str:
7953 """Quote an identifier.
7954
7955 Subclasses should override this to provide database-dependent
7956 quoting behavior.
7957 """
7958
7959 return (
7960 self.initial_quote
7961 + self._escape_identifier(value)
7962 + self.final_quote
7963 )
7964
7965 def _requires_quotes(self, value: str) -> bool:
7966 """Return True if the given identifier requires quoting."""
7967 lc_value = value.lower()
7968 return (
7969 lc_value in self.reserved_words
7970 or value[0] in self.illegal_initial_characters
7971 or not self.legal_characters.match(str(value))
7972 or (lc_value != value)
7973 )
7974
    def _requires_quotes_illegal_chars(self, value):
        """Return True if the given identifier requires quoting, but
        not taking case convention into account."""
        return not self.legal_characters.match(str(value))
7979
    def quote_schema(self, schema: str) -> str:
        """Conditionally quote a schema name.


        The name is quoted if it is a reserved word, contains quote-necessary
        characters, or is an instance of :class:`.quoted_name` which includes
        ``quote`` set to ``True``.

        Subclasses can override this to provide database-dependent
        quoting behavior for schema names.

        :param schema: string schema name
        """
        return self.quote(schema)
7994
7995 def quote(self, ident: str) -> str:
7996 """Conditionally quote an identifier.
7997
7998 The identifier is quoted if it is a reserved word, contains
7999 quote-necessary characters, or is an instance of
8000 :class:`.quoted_name` which includes ``quote`` set to ``True``.
8001
8002 Subclasses can override this to provide database-dependent
8003 quoting behavior for identifier names.
8004
8005 :param ident: string identifier
8006 """
8007 force = getattr(ident, "quote", None)
8008
8009 if force is None:
8010 if ident in self._strings:
8011 return self._strings[ident]
8012 else:
8013 if self._requires_quotes(ident):
8014 self._strings[ident] = self.quote_identifier(ident)
8015 else:
8016 self._strings[ident] = ident
8017 return self._strings[ident]
8018 elif force:
8019 return self.quote_identifier(ident)
8020 else:
8021 return ident
8022
8023 def format_collation(self, collation_name):
8024 if self.quote_case_sensitive_collations:
8025 return self.quote(collation_name)
8026 else:
8027 return collation_name
8028
8029 def format_sequence(
8030 self, sequence: schema.Sequence, use_schema: bool = True
8031 ) -> str:
8032 name = self.quote(sequence.name)
8033
8034 effective_schema = self.schema_for_object(sequence)
8035
8036 if (
8037 not self.omit_schema
8038 and use_schema
8039 and effective_schema is not None
8040 ):
8041 name = self.quote_schema(effective_schema) + "." + name
8042 return name
8043
    def format_label(
        self, label: Label[Any], name: Optional[str] = None
    ) -> str:
        """Prepare a quoted label name, preferring an explicit ``name``."""
        return self.quote(name or label.name)
8048
8049 def format_alias(
8050 self, alias: Optional[AliasedReturnsRows], name: Optional[str] = None
8051 ) -> str:
8052 if name is None:
8053 assert alias is not None
8054 return self.quote(alias.name)
8055 else:
8056 return self.quote(name)
8057
8058 def format_savepoint(self, savepoint, name=None):
8059 # Running the savepoint name through quoting is unnecessary
8060 # for all known dialects. This is here to support potential
8061 # third party use cases
8062 ident = name or savepoint.ident
8063 if self._requires_quotes(ident):
8064 ident = self.quote_identifier(ident)
8065 return ident
8066
    @util.preload_module("sqlalchemy.sql.naming")
    def format_constraint(
        self, constraint: Union[Constraint, Index], _alembic_quote: bool = True
    ) -> Optional[str]:
        """Prepare the name of a constraint or index for rendering.

        Returns ``None`` when no explicit name was given and no name could
        be derived from the naming convention.
        """
        naming = util.preloaded.sql_naming

        # _NONE_NAME indicates no explicit name; attempt to derive one
        # from the table's naming convention, which may legitimately fail
        if constraint.name is _NONE_NAME:
            name = naming._constraint_name_for_table(
                constraint, constraint.table
            )

            if name is None:
                return None
        else:
            name = constraint.name

        assert name is not None
        # indexes and constraints may have distinct max-length limits
        if constraint.__visit_name__ == "index":
            return self.truncate_and_render_index_name(
                name, _alembic_quote=_alembic_quote
            )
        else:
            return self.truncate_and_render_constraint_name(
                name, _alembic_quote=_alembic_quote
            )
8092
8093 def truncate_and_render_index_name(
8094 self, name: str, _alembic_quote: bool = True
8095 ) -> str:
8096 # calculate these at format time so that ad-hoc changes
8097 # to dialect.max_identifier_length etc. can be reflected
8098 # as IdentifierPreparer is long lived
8099 max_ = (
8100 self.dialect.max_index_name_length
8101 or self.dialect.max_identifier_length
8102 )
8103 return self._truncate_and_render_maxlen_name(
8104 name, max_, _alembic_quote
8105 )
8106
8107 def truncate_and_render_constraint_name(
8108 self, name: str, _alembic_quote: bool = True
8109 ) -> str:
8110 # calculate these at format time so that ad-hoc changes
8111 # to dialect.max_identifier_length etc. can be reflected
8112 # as IdentifierPreparer is long lived
8113 max_ = (
8114 self.dialect.max_constraint_name_length
8115 or self.dialect.max_identifier_length
8116 )
8117 return self._truncate_and_render_maxlen_name(
8118 name, max_, _alembic_quote
8119 )
8120
8121 def _truncate_and_render_maxlen_name(
8122 self, name: str, max_: int, _alembic_quote: bool
8123 ) -> str:
8124 if isinstance(name, elements._truncated_label):
8125 if len(name) > max_:
8126 name = name[0 : max_ - 8] + "_" + util.md5_hex(name)[-4:]
8127 else:
8128 self.dialect.validate_identifier(name)
8129
8130 if not _alembic_quote:
8131 return name
8132 else:
8133 return self.quote(name)
8134
    def format_index(self, index: Index) -> str:
        """Prepare a quoted index name; an index always has a name, so the
        result is never ``None``."""
        name = self.format_constraint(index)
        assert name is not None
        return name
8139
8140 def format_table(
8141 self,
8142 table: FromClause,
8143 use_schema: bool = True,
8144 name: Optional[str] = None,
8145 ) -> str:
8146 """Prepare a quoted table and schema name."""
8147 if name is None:
8148 if TYPE_CHECKING:
8149 assert isinstance(table, NamedFromClause)
8150 name = table.name
8151
8152 result = self.quote(name)
8153
8154 effective_schema = self.schema_for_object(table)
8155
8156 if not self.omit_schema and use_schema and effective_schema:
8157 result = self.quote_schema(effective_schema) + "." + result
8158 return result
8159
    def format_schema(self, name):
        """Prepare a quoted schema name."""

        return self.quote(name)
8164
8165 def format_label_name(
8166 self,
8167 name,
8168 anon_map=None,
8169 ):
8170 """Prepare a quoted column name."""
8171
8172 if anon_map is not None and isinstance(
8173 name, elements._truncated_label
8174 ):
8175 name = name.apply_map(anon_map)
8176
8177 return self.quote(name)
8178
8179 def format_column(
8180 self,
8181 column: ColumnElement[Any],
8182 use_table: bool = False,
8183 name: Optional[str] = None,
8184 table_name: Optional[str] = None,
8185 use_schema: bool = False,
8186 anon_map: Optional[Mapping[str, Any]] = None,
8187 ) -> str:
8188 """Prepare a quoted column name."""
8189
8190 if name is None:
8191 name = column.name
8192 assert name is not None
8193
8194 if anon_map is not None and isinstance(
8195 name, elements._truncated_label
8196 ):
8197 name = name.apply_map(anon_map)
8198
8199 if not getattr(column, "is_literal", False):
8200 if use_table:
8201 return (
8202 self.format_table(
8203 column.table, use_schema=use_schema, name=table_name
8204 )
8205 + "."
8206 + self.quote(name)
8207 )
8208 else:
8209 return self.quote(name)
8210 else:
8211 # literal textual elements get stuck into ColumnClause a lot,
8212 # which shouldn't get quoted
8213
8214 if use_table:
8215 return (
8216 self.format_table(
8217 column.table, use_schema=use_schema, name=table_name
8218 )
8219 + "."
8220 + name
8221 )
8222 else:
8223 return name
8224
8225 def format_table_seq(self, table, use_schema=True):
8226 """Format table name and schema as a tuple."""
8227
8228 # Dialects with more levels in their fully qualified references
8229 # ('database', 'owner', etc.) could override this and return
8230 # a longer sequence.
8231
8232 effective_schema = self.schema_for_object(table)
8233
8234 if not self.omit_schema and use_schema and effective_schema:
8235 return (
8236 self.quote_schema(effective_schema),
8237 self.format_table(table, use_schema=False),
8238 )
8239 else:
8240 return (self.format_table(table, use_schema=False),)
8241
    @util.memoized_property
    def _r_identifiers(self):
        """Compiled pattern matching the dot-separated components of a
        (possibly quoted) multi-part identifier string."""
        initial, final, escaped_final = (
            re.escape(s)
            for s in (
                self.initial_quote,
                self.final_quote,
                self._escape_identifier(self.final_quote),
            )
        )
        # each component is either a quoted identifier (group 1, allowing
        # escaped closing quotes within) or an unquoted run up to the next
        # dot (group 2)
        r = re.compile(
            r"(?:"
            r"(?:%(initial)s((?:%(escaped)s|[^%(final)s])+)%(final)s"
            r"|([^\.]+))(?=\.|$))+"
            % {"initial": initial, "final": final, "escaped": escaped_final}
        )
        return r
8259
8260 def unformat_identifiers(self, identifiers: str) -> Sequence[str]:
8261 """Unpack 'schema.table.column'-like strings into components."""
8262
8263 r = self._r_identifiers
8264 return [
8265 self._unescape_identifier(i)
8266 for i in [a or b for a, b in r.findall(identifiers)]
8267 ]