1# sql/compiler.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Base SQL and DDL compiler implementations.
10
11Classes provided include:
12
13:class:`.compiler.SQLCompiler` - renders SQL
14strings
15
16:class:`.compiler.DDLCompiler` - renders DDL
17(data definition language) strings
18
19:class:`.compiler.GenericTypeCompiler` - renders
20type specification strings.
21
22To generate user-defined SQL strings, see
23:doc:`/ext/compiler`.
24
25"""
26from __future__ import annotations
27
28import collections
29import collections.abc as collections_abc
30import contextlib
31from enum import IntEnum
32import functools
33import itertools
34import operator
35import re
36from time import perf_counter
37import typing
38from typing import Any
39from typing import Callable
40from typing import cast
41from typing import ClassVar
42from typing import Dict
43from typing import FrozenSet
44from typing import Iterable
45from typing import Iterator
46from typing import List
47from typing import Mapping
48from typing import MutableMapping
49from typing import NamedTuple
50from typing import NoReturn
51from typing import Optional
52from typing import Pattern
53from typing import Protocol
54from typing import Sequence
55from typing import Set
56from typing import Tuple
57from typing import Type
58from typing import TYPE_CHECKING
59from typing import TypedDict
60from typing import Union
61
62from . import base
63from . import coercions
64from . import crud
65from . import elements
66from . import functions
67from . import operators
68from . import roles
69from . import schema
70from . import selectable
71from . import sqltypes
72from . import util as sql_util
73from ._typing import is_column_element
74from ._typing import is_dml
75from .base import _de_clone
76from .base import _from_objects
77from .base import _NONE_NAME
78from .base import _SentinelDefaultCharacterization
79from .base import Executable
80from .base import NO_ARG
81from .elements import ClauseElement
82from .elements import quoted_name
83from .schema import Column
84from .sqltypes import TupleType
85from .type_api import TypeEngine
86from .visitors import prefix_anon_map
87from .visitors import Visitable
88from .. import exc
89from .. import util
90from ..util import FastIntFlag
91from ..util.typing import Literal
92from ..util.typing import TupleAny
93from ..util.typing import Unpack
94
95if typing.TYPE_CHECKING:
96 from .annotation import _AnnotationDict
97 from .base import _AmbiguousTableNameMap
98 from .base import CompileState
99 from .cache_key import CacheKey
100 from .ddl import ExecutableDDLElement
101 from .dml import Insert
102 from .dml import UpdateBase
103 from .dml import ValuesBase
104 from .elements import _truncated_label
105 from .elements import BindParameter
106 from .elements import ColumnClause
107 from .elements import ColumnElement
108 from .elements import Label
109 from .functions import Function
110 from .schema import Table
111 from .selectable import AliasedReturnsRows
112 from .selectable import CompoundSelectState
113 from .selectable import CTE
114 from .selectable import FromClause
115 from .selectable import NamedFromClause
116 from .selectable import ReturnsRows
117 from .selectable import Select
118 from .selectable import SelectState
119 from .type_api import _BindProcessorType
120 from ..engine.cursor import CursorResultMetaData
121 from ..engine.interfaces import _CoreSingleExecuteParams
122 from ..engine.interfaces import _DBAPIAnyExecuteParams
123 from ..engine.interfaces import _DBAPIMultiExecuteParams
124 from ..engine.interfaces import _DBAPISingleExecuteParams
125 from ..engine.interfaces import _ExecuteOptions
126 from ..engine.interfaces import _GenericSetInputSizesType
127 from ..engine.interfaces import _MutableCoreSingleExecuteParams
128 from ..engine.interfaces import Dialect
129 from ..engine.interfaces import SchemaTranslateMapType
130
131_FromHintsType = Dict["FromClause", str]
132
# Lower-case words considered reserved; grouped by initial letter purely
# for readability.
RESERVED_WORDS = {
    # a
    "all", "analyse", "analyze", "and", "any", "array", "as", "asc",
    "asymmetric", "authorization",
    # b
    "between", "binary", "both",
    # c
    "case", "cast", "check", "collate", "column", "constraint", "create",
    "cross", "current_date", "current_role", "current_time",
    "current_timestamp", "current_user",
    # d
    "default", "deferrable", "desc", "distinct", "do",
    # e
    "else", "end", "except",
    # f
    "false", "for", "foreign", "freeze", "from", "full",
    # g - h
    "grant", "group", "having",
    # i
    "ilike", "in", "initially", "inner", "intersect", "into", "is",
    "isnull",
    # j - l
    "join", "leading", "left", "like", "limit", "localtime",
    "localtimestamp",
    # n
    "natural", "new", "not", "notnull", "null",
    # o
    "off", "offset", "old", "on", "only", "or", "order", "outer",
    "overlaps",
    # p - r
    "placing", "primary", "references", "right",
    # s
    "select", "session_user", "set", "similar", "some", "symmetric",
    # t
    "table", "then", "to", "trailing", "true",
    # u - w
    "union", "unique", "user", "using", "verbose", "when", "where",
}
229
# Case-insensitive whole-string matchers for identifier text; the
# "plus space" variant additionally accepts embedded spaces.
LEGAL_CHARACTERS = re.compile(r"^[A-Z0-9_$]+$", re.IGNORECASE)
LEGAL_CHARACTERS_PLUS_SPACE = re.compile(r"^[A-Z0-9_ $]+$", re.IGNORECASE)
# The ten decimal digits plus "$".
ILLEGAL_INITIAL_CHARACTERS = set("0123456789") | {"$"}

# Case-insensitive whole-string matchers for the accepted foreign key
# ON DELETE / ON UPDATE / INITIALLY keywords.
FK_ON_DELETE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.IGNORECASE
)
FK_ON_UPDATE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.IGNORECASE
)
FK_INITIALLY = re.compile(r"^(?:DEFERRED|IMMEDIATE)$", re.IGNORECASE)
# ":name" tokens that are not preceded by a colon, word character, "$" or
# backslash and not followed by a colon, word character or "$".
BIND_PARAMS = re.compile(r"(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])", re.UNICODE)
# backslash-escaped ":name" tokens; group 1 is the token without the
# backslash.
BIND_PARAMS_ESC = re.compile(r"\x5c(:[\w\$]*)(?![:\w\$])", re.UNICODE)
243
244_pyformat_template = "%%(%(name)s)s"
245BIND_TEMPLATES = {
246 "pyformat": _pyformat_template,
247 "qmark": "?",
248 "format": "%%s",
249 "numeric": ":[_POSITION]",
250 "numeric_dollar": "$[_POSITION]",
251 "named": ":%(name)s",
252}
253
254
# Operator function -> SQL text.  The whitespace inside each string is
# significant: it is part of the text stored for the operator.
OPERATORS = {
    # binary operators: padded on both sides, apart from the bare
    # negation sign
    operators.and_: " AND ",
    operators.or_: " OR ",
    operators.add: " + ",
    operators.mul: " * ",
    operators.sub: " - ",
    operators.mod: " % ",
    operators.neg: "-",
    operators.lt: " < ",
    operators.le: " <= ",
    operators.ne: " != ",
    operators.gt: " > ",
    operators.ge: " >= ",
    operators.eq: " = ",
    operators.is_distinct_from: " IS DISTINCT FROM ",
    operators.is_not_distinct_from: " IS NOT DISTINCT FROM ",
    operators.concat_op: " || ",
    operators.match_op: " MATCH ",
    operators.not_match_op: " NOT MATCH ",
    operators.in_op: " IN ",
    operators.not_in_op: " NOT IN ",
    operators.comma_op: ", ",
    operators.from_: " FROM ",
    operators.as_: " AS ",
    operators.is_: " IS ",
    operators.is_not: " IS NOT ",
    operators.collate: " COLLATE ",
    # unary prefix operators: trailing space only
    operators.exists: "EXISTS ",
    operators.distinct_op: "DISTINCT ",
    operators.inv: "NOT ",
    operators.any_op: "ANY ",
    operators.all_op: "ALL ",
    # postfix modifiers: leading space only
    operators.desc_op: " DESC",
    operators.asc_op: " ASC",
    operators.nulls_first_op: " NULLS FIRST",
    operators.nulls_last_op: " NULLS LAST",
    # bitwise operators
    operators.bitwise_xor_op: " ^ ",
    operators.bitwise_or_op: " | ",
    operators.bitwise_and_op: " & ",
    operators.bitwise_not_op: "~",
    operators.bitwise_lshift_op: " << ",
    operators.bitwise_rshift_op: " >> ",
}
302
# Function class -> the text stored for it.  Case is preserved exactly as
# written here (e.g. "coalesce" and "random" are lower case while
# "CURRENT_DATE" is upper case).
FUNCTIONS: Dict[Type[Function[Any]], str] = {
    functions.coalesce: "coalesce",
    functions.current_date: "CURRENT_DATE",
    functions.current_time: "CURRENT_TIME",
    functions.current_timestamp: "CURRENT_TIMESTAMP",
    functions.current_user: "CURRENT_USER",
    functions.localtime: "LOCALTIME",
    functions.localtimestamp: "LOCALTIMESTAMP",
    functions.random: "random",
    functions.sysdate: "sysdate",
    functions.session_user: "SESSION_USER",
    functions.user: "USER",
    functions.cube: "CUBE",
    functions.rollup: "ROLLUP",
    functions.grouping_sets: "GROUPING SETS",
}
319
320
# Identity mapping of EXTRACT field names: every key maps to itself.
EXTRACT_MAP = {
    field: field
    for field in (
        "month",
        "day",
        "year",
        "second",
        "hour",
        "doy",
        "minute",
        "quarter",
        "dow",
        "week",
        "epoch",
        "milliseconds",
        "microseconds",
        "timezone_hour",
        "timezone_minute",
    )
}
338
# Compound-select keyword enum member -> SQL keyword text.
COMPOUND_KEYWORDS = {
    selectable._CompoundSelectKeyword.UNION: "UNION",
    selectable._CompoundSelectKeyword.UNION_ALL: "UNION ALL",
    selectable._CompoundSelectKeyword.EXCEPT: "EXCEPT",
    selectable._CompoundSelectKeyword.EXCEPT_ALL: "EXCEPT ALL",
    selectable._CompoundSelectKeyword.INTERSECT: "INTERSECT",
    selectable._CompoundSelectKeyword.INTERSECT_ALL: "INTERSECT ALL",
}
347
348
class ResultColumnsEntry(NamedTuple):
    """Record for one column expression that is expected to appear in the
    result rows of this statement.

    Most often these correspond to the columns clause of a SELECT
    statement, though they may equally come from a RETURNING clause or
    from dialect-specific emulations.

    """

    keyname: str
    """the string name expected to be present in cursor.description"""

    name: str
    """name of the column; may be a label"""

    objects: Tuple[Any, ...]
    """objects that should each be able to locate this column
    in a RowMapping; typically string names and aliases along with
    Column objects.

    """

    type: TypeEngine[Any]
    """Datatype associated with this column.  The "result processing"
    logic uses it to link the compiled statement directly to the rows
    that come back from the cursor.

    """
378
379
380class _ResultMapAppender(Protocol):
381 def __call__(
382 self,
383 keyname: str,
384 name: str,
385 objects: Sequence[Any],
386 type_: TypeEngine[Any],
387 ) -> None: ...
388
389
# Positional indexes of the ResultColumnsEntry fields, used by cursor.py.
# Profiling showed plain integer indexing to be faster than named tuple
# attribute access.
RM_RENDERED_NAME: Literal[0] = 0
RM_NAME: Literal[1] = 1
RM_OBJECTS: Literal[2] = 2
RM_TYPE: Literal[3] = 3
396
397
398class _BaseCompilerStackEntry(TypedDict):
399 asfrom_froms: Set[FromClause]
400 correlate_froms: Set[FromClause]
401 selectable: ReturnsRows
402
403
class _CompilerStackEntry(_BaseCompilerStackEntry, total=False):
    """Compiler stack entry: the required base keys plus the optional
    keys declared here (``total=False``)."""

    compile_state: CompileState
    need_result_map_for_nested: bool
    need_result_map_for_compound: bool
    select_0: ReturnsRows
    insert_from_select: Select[Unpack[TupleAny]]
410
411
class ExpandedState(NamedTuple):
    """State used when producing "expanded" and "post compile" bound
    parameters for a statement.

    "expanded" parameters are generated at statement execution time to
    suit the number of parameters passed; the most prominent example is
    the set of individual elements inside of an IN expression.

    "post compile" parameters are those whose SQL literal value is
    rendered into the SQL statement at execution time, rather than being
    passed to the driver as separate parameters.

    To create an :class:`.ExpandedState` instance, use the
    :meth:`.SQLCompiler.construct_expanded_state` method on any
    :class:`.SQLCompiler` instance.

    """

    statement: str
    """String SQL statement with parameters fully expanded"""

    parameters: _CoreSingleExecuteParams
    """Parameter dictionary with parameters fully expanded.

    For a statement that uses named parameters, this dictionary maps
    exactly to the names in the statement.  For a statement that uses
    positional parameters, :attr:`.ExpandedState.positional_parameters`
    yields a tuple with the positional parameter set.

    """

    processors: Mapping[str, _BindProcessorType[Any]]
    """mapping of bound value processors"""

    positiontup: Optional[Sequence[str]]
    """Sequence of string names indicating the order of positional
    parameters"""

    parameter_expansion: Mapping[str, List[str]]
    """Mapping that links each original parameter name to its list of
    "expanded" parameter names, for those parameters that were
    expanded."""

    @property
    def positional_parameters(self) -> Tuple[Any, ...]:
        """Tuple of positional parameters, for statements that were
        compiled using a positional paramstyle.

        :raises InvalidRequestError: if ``positiontup`` is ``None``.

        """
        ordered_names = self.positiontup
        if ordered_names is None:
            raise exc.InvalidRequestError(
                "statement does not use a positional paramstyle"
            )
        values = self.parameters
        return tuple(values[name] for name in ordered_names)

    @property
    def additional_parameters(self) -> _CoreSingleExecuteParams:
        """synonym for :attr:`.ExpandedState.parameters`."""
        return self.parameters
471
472
473class _InsertManyValues(NamedTuple):
474 """represents state to use for executing an "insertmanyvalues" statement.
475
476 The primary consumers of this object are the
477 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
478 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods.
479
480 .. versionadded:: 2.0
481
482 """
483
484 is_default_expr: bool
485 """if True, the statement is of the form
486 ``INSERT INTO TABLE DEFAULT VALUES``, and can't be rewritten as a "batch"
487
488 """
489
490 single_values_expr: str
491 """The rendered "values" clause of the INSERT statement.
492
493 This is typically the parenthesized section e.g. "(?, ?, ?)" or similar.
494 The insertmanyvalues logic uses this string as a search and replace
495 target.
496
497 """
498
499 insert_crud_params: List[crud._CrudParamElementStr]
500 """List of Column / bind names etc. used while rewriting the statement"""
501
502 num_positional_params_counted: int
503 """the number of bound parameters in a single-row statement.
504
505 This count may be larger or smaller than the actual number of columns
506 targeted in the INSERT, as it accommodates for SQL expressions
507 in the values list that may have zero or more parameters embedded
508 within them.
509
510 This count is part of what's used to organize rewritten parameter lists
511 when batching.
512
513 """
514
515 sort_by_parameter_order: bool = False
516 """if the deterministic_returnined_order parameter were used on the
517 insert.
518
519 All of the attributes following this will only be used if this is True.
520
521 """
522
523 includes_upsert_behaviors: bool = False
524 """if True, we have to accommodate for upsert behaviors.
525
526 This will in some cases downgrade "insertmanyvalues" that requests
527 deterministic ordering.
528
529 """
530
531 sentinel_columns: Optional[Sequence[Column[Any]]] = None
532 """List of sentinel columns that were located.
533
534 This list is only here if the INSERT asked for
535 sort_by_parameter_order=True,
536 and dialect-appropriate sentinel columns were located.
537
538 .. versionadded:: 2.0.10
539
540 """
541
542 num_sentinel_columns: int = 0
543 """how many sentinel columns are in the above list, if any.
544
545 This is the same as
546 ``len(sentinel_columns) if sentinel_columns is not None else 0``
547
548 """
549
550 sentinel_param_keys: Optional[Sequence[str]] = None
551 """parameter str keys in each param dictionary / tuple
552 that would link to the client side "sentinel" values for that row, which
553 we can use to match up parameter sets to result rows.
554
555 This is only present if sentinel_columns is present and the INSERT
556 statement actually refers to client side values for these sentinel
557 columns.
558
559 .. versionadded:: 2.0.10
560
561 .. versionchanged:: 2.0.29 - the sequence is now string dictionary keys
562 only, used against the "compiled parameteters" collection before
563 the parameters were converted by bound parameter processors
564
565 """
566
567 implicit_sentinel: bool = False
568 """if True, we have exactly one sentinel column and it uses a server side
569 value, currently has to generate an incrementing integer value.
570
571 The dialect in question would have asserted that it supports receiving
572 these values back and sorting on that value as a means of guaranteeing
573 correlation with the incoming parameter list.
574
575 .. versionadded:: 2.0.10
576
577 """
578
579 embed_values_counter: bool = False
580 """Whether to embed an incrementing integer counter in each parameter
581 set within the VALUES clause as parameters are batched over.
582
583 This is only used for a specific INSERT..SELECT..VALUES..RETURNING syntax
584 where a subquery is used to produce value tuples. Current support
585 includes PostgreSQL, Microsoft SQL Server.
586
587 .. versionadded:: 2.0.10
588
589 """
590
591
592class _InsertManyValuesBatch(NamedTuple):
593 """represents an individual batch SQL statement for insertmanyvalues.
594
595 This is passed through the
596 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and
597 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods out
598 to the :class:`.Connection` within the
599 :meth:`.Connection._exec_insertmany_context` method.
600
601 .. versionadded:: 2.0.10
602
603 """
604
605 replaced_statement: str
606 replaced_parameters: _DBAPIAnyExecuteParams
607 processed_setinputsizes: Optional[_GenericSetInputSizesType]
608 batch: Sequence[_DBAPISingleExecuteParams]
609 sentinel_values: Sequence[Tuple[Any, ...]]
610 current_batch_size: int
611 batchnum: int
612 total_batches: int
613 rows_sorted: bool
614 is_downgraded: bool
615
616
class InsertmanyvaluesSentinelOpts(FastIntFlag):
    """bitflag enum indicating the styles of PK defaults
    that can work as implicit sentinel columns

    """

    # individual bits
    NOT_SUPPORTED = 1
    AUTOINCREMENT = 2
    IDENTITY = 4
    SEQUENCE = 8

    # combined masks built from the bits above
    ANY_AUTOINCREMENT = AUTOINCREMENT | IDENTITY | SEQUENCE
    _SUPPORTED_OR_NOT = NOT_SUPPORTED | ANY_AUTOINCREMENT

    # further independent bits; note that the value 32 is not assigned
    USE_INSERT_FROM_SELECT = 16
    RENDER_SELECT_COL_CASTS = 64
633
634
class CompilerState(IntEnum):
    """Stages that a compiler object can be in."""

    COMPILING = 0
    """a statement is present and the compilation phase is in progress"""

    STRING_APPLIED = 1
    """a statement is present and its string form has been applied.

    Subclasses may still have additional processors pending.

    """

    NO_STATEMENT = 2
    """the compiler has no statement to compile and is used
    for method access"""
649
650
class Linting(IntEnum):
    """Preferences for the 'SQL linting' feature.

    The feature currently includes support for flagging cartesian products
    in SQL statements.

    """

    NO_LINTING = 0
    "Disable all linting."

    COLLECT_CARTESIAN_PRODUCTS = 1
    """Collect data on FROMs and cartesian products, gathering it into
    'self.from_linter'"""

    WARN_LINTING = 2
    "Emit warnings for linters that find problems"

    FROM_LINTING = COLLECT_CARTESIAN_PRODUCTS | WARN_LINTING
    """Warn for cartesian products; the combination of
    COLLECT_CARTESIAN_PRODUCTS and WARN_LINTING"""
672
673
# Re-export each Linting member under a module-level name.
NO_LINTING = Linting.NO_LINTING
COLLECT_CARTESIAN_PRODUCTS = Linting.COLLECT_CARTESIAN_PRODUCTS
WARN_LINTING = Linting.WARN_LINTING
FROM_LINTING = Linting.FROM_LINTING
677
678
class FromLinter(collections.namedtuple("FromLinter", ["froms", "edges"])):
    """Current state for the "cartesian product" detection feature.

    ``froms`` is iterated for its FROM elements and indexed by element to
    obtain the value rendered in the warning message; ``edges`` is a
    collection of two-element sequences, each linking a pair of FROM
    elements.

    """

    def lint(self, start=None):
        """Look for FROM elements not reachable from a starting element.

        :param start: element to begin the traversal from; it must be
         present in ``froms``.  When omitted, an arbitrary element is
         used.

        :return: a two-tuple ``(unreached, start_element)`` when some
         elements could not be reached by following edges, otherwise
         ``(None, None)``; also ``(None, None)`` when ``froms`` is empty.

        """
        all_froms = self.froms
        if not all_froms:
            return None, None

        remaining_edges = set(self.edges)
        unvisited = set(all_froms)

        if start is None:
            origin = unvisited.pop()
        else:
            origin = start
            unvisited.remove(origin)

        pending = collections.deque([origin])

        while pending and unvisited:
            current = pending.popleft()
            unvisited.discard(current)

            # comparison of nodes in edges here is based on hash equality, as
            # there are "annotated" elements that match the non-annotated
            # ones.  to remove the need for in-python hash() calls, use
            # native containment routines (e.g. "node in edge",
            # "edge.index(node)")
            touching = {edge for edge in remaining_edges if current in edge}

            # queue up, for each matching edge, the endpoint that is not
            # the current node: index() gives 0 or 1, and "not" flips it
            # to the other position.
            pending.extendleft(
                edge[not edge.index(current)] for edge in touching
            )
            remaining_edges.difference_update(touching)

        # FROMS left over? boom
        if unvisited:
            return unvisited, origin
        return None, None

    def warn(self, stmt_type="SELECT"):
        """Emit a warning via ``util.warn`` if :meth:`.lint` reports FROM
        elements that are not linked to the starting element.

        :param stmt_type: statement name placed at the start of the
         message.

        """
        leftover, origin = self.lint()

        # nothing left over means nothing to report
        if not leftover:
            return

        template = (
            "{stmt_type} statement has a cartesian product between "
            "FROM element(s) {froms} and "
            'FROM element "{start}". Apply join condition(s) '
            "between each element to resolve."
        )
        froms_str = ", ".join(
            f'"{self.froms[from_]}"' for from_ in leftover
        )
        util.warn(
            template.format(
                stmt_type=stmt_type,
                froms=froms_str,
                start=self.froms[origin],
            )
        )
743
744
class Compiled:
    """Represent a compiled SQL or DDL expression.

    The ``__str__`` method of the ``Compiled`` object should produce
    the actual text of the statement. ``Compiled`` objects are
    specific to their underlying database dialect, and also may
    or may not be specific to the columns referenced within a
    particular set of bind parameters. In no case should the
    ``Compiled`` object be dependent on the actual values of those
    bind parameters, even though it may reference those values as
    defaults.
    """

    statement: Optional[ClauseElement] = None
    "The statement to compile."
    string: str = ""
    "The string representation of the ``statement``"

    state: CompilerState
    """description of the compiler's state"""

    can_execute: bool = False
    """The ``supports_execution`` value of the compiled statement.

    Declared with a class-level default of ``False`` so that the attribute
    is also present on a compiler constructed without a statement
    (``CompilerState.NO_STATEMENT``); ``_execute_on_connection()`` then
    raises :class:`.ObjectNotExecutableError` instead of failing with
    ``AttributeError``.

    """

    is_sql = False
    is_ddl = False

    _cached_metadata: Optional[CursorResultMetaData] = None

    _result_columns: Optional[List[ResultColumnsEntry]] = None

    schema_translate_map: Optional[SchemaTranslateMapType] = None

    execution_options: _ExecuteOptions = util.EMPTY_DICT
    """
    Execution options propagated from the statement. In some cases,
    sub-elements of the statement can modify these.
    """

    preparer: IdentifierPreparer

    _annotations: _AnnotationDict = util.EMPTY_DICT

    compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` object that maintains additional
    state used by the compiler.

    Major executable objects such as :class:`_expression.Insert`,
    :class:`_expression.Update`, :class:`_expression.Delete`,
    :class:`_expression.Select` will generate this
    state when compiled in order to calculate additional information about the
    object. For the top level object that is to be executed, the state can be
    stored here where it can also have applicability towards result set
    processing.

    .. versionadded:: 1.4

    """

    dml_compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` assigned at the same point that
    .isinsert, .isupdate, or .isdelete is assigned.

    This will normally be the same object as .compile_state, with the
    exception of cases like the :class:`.ORMFromStatementCompileState`
    object.

    .. versionadded:: 1.4.40

    """

    cache_key: Optional[CacheKey] = None
    """The :class:`.CacheKey` that was generated ahead of creating this
    :class:`.Compiled` object.

    This is used for routines that need access to the original
    :class:`.CacheKey` instance generated when the :class:`.Compiled`
    instance was first cached, typically in order to reconcile
    the original list of :class:`.BindParameter` objects with a
    per-statement list that's generated on each call.

    """

    _gen_time: float
    """Generation time of this :class:`.Compiled`, used for reporting
    cache stats."""

    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        schema_translate_map: Optional[SchemaTranslateMapType] = None,
        render_schema_translate: bool = False,
        compile_kwargs: Mapping[str, Any] = util.immutabledict(),
    ):
        """Construct a new :class:`.Compiled` object.

        :param dialect: :class:`.Dialect` to compile against.

        :param statement: :class:`_expression.ClauseElement` to be compiled.
         When ``None``, nothing is compiled and the object is left in the
         ``CompilerState.NO_STATEMENT`` state.

        :param schema_translate_map: dictionary of schema names to be
         translated when forming the resultant SQL

         .. seealso::

            :ref:`schema_translating`

        :param render_schema_translate: if True, the compiled string is
         additionally passed, along with ``schema_translate_map``, through
         the preparer's ``_render_schema_translates()`` method.

        :param compile_kwargs: additional kwargs that will be
         passed to the initial call to :meth:`.Compiled.process`.


        """
        self.dialect = dialect
        self.preparer = self.dialect.identifier_preparer
        if schema_translate_map:
            # use a preparer that is aware of the schema translate map
            self.schema_translate_map = schema_translate_map
            self.preparer = self.preparer._with_schema_translate(
                schema_translate_map
            )

        if statement is not None:
            self.state = CompilerState.COMPILING
            self.statement = statement
            self.can_execute = statement.supports_execution
            self._annotations = statement._annotations
            if self.can_execute:
                if TYPE_CHECKING:
                    assert isinstance(statement, Executable)
                self.execution_options = statement._execution_options
            self.string = self.process(self.statement, **compile_kwargs)

            if render_schema_translate:
                self.string = self.preparer._render_schema_translates(
                    self.string, schema_translate_map
                )

            self.state = CompilerState.STRING_APPLIED
        else:
            # no statement: can_execute keeps its class-level default
            self.state = CompilerState.NO_STATEMENT

        self._gen_time = perf_counter()

    def __init_subclass__(cls) -> None:
        # give every subclass a chance to run class-level setup
        cls._init_compiler_cls()
        return super().__init_subclass__()

    @classmethod
    def _init_compiler_cls(cls):
        """Hook invoked for each new subclass; does nothing by default."""
        pass

    def _execute_on_connection(
        self, connection, distilled_params, execution_options
    ):
        """Execute this compiled object on the given connection.

        :raises ObjectNotExecutableError: if ``can_execute`` is false,
         which includes a compiler that was constructed without a
         statement.

        """
        if self.can_execute:
            return connection._execute_compiled(
                self, distilled_params, execution_options
            )
        else:
            raise exc.ObjectNotExecutableError(self.statement)

    def visit_unsupported_compilation(self, element, err, **kw):
        """Raise :class:`.UnsupportedCompilationError` for the type of the
        given element, chained from ``err``."""
        raise exc.UnsupportedCompilationError(self, type(element)) from err

    @property
    def sql_compiler(self):
        """Return a Compiled that is capable of processing SQL expressions.

        If this compiler is one, it would likely just return 'self'.

        """

        raise NotImplementedError()

    def process(self, obj: Visitable, **kwargs: Any) -> str:
        """Dispatch ``obj`` to this compiler via its
        ``_compiler_dispatch()`` method and return the result."""
        return obj._compiler_dispatch(self, **kwargs)

    def __str__(self) -> str:
        """Return the string text of the generated SQL or DDL.

        An empty string is returned unless the state is
        ``CompilerState.STRING_APPLIED``.

        """

        if self.state is CompilerState.STRING_APPLIED:
            return self.string
        else:
            return ""

    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        """Return the bind params for this compiled object.

        :param params: a dict of string/object pairs whose values will
                       override bind values compiled in to the
                       statement.
        """

        raise NotImplementedError()

    @property
    def params(self):
        """Return the bind params for this compiled object."""
        return self.construct_params()
946
947
class TypeCompiler(util.EnsureKWArg):
    """Produces DDL specification for TypeEngine objects."""

    ensure_kwarg = r"visit_\w+"

    def __init__(self, dialect: Dialect):
        self.dialect = dialect

    def process(self, type_: TypeEngine[Any], **kw: Any) -> str:
        """Dispatch the given type to this compiler and return the result.

        If the type has a variant mapping containing an entry for this
        dialect's name, that variant is dispatched in its place.

        """
        variants = type_._variant_mapping
        if variants:
            dialect_name = self.dialect.name
            if dialect_name in variants:
                type_ = variants[dialect_name]
        return type_._compiler_dispatch(self, **kw)

    def visit_unsupported_compilation(
        self, element: Any, err: Exception, **kw: Any
    ) -> NoReturn:
        """Raise :class:`.UnsupportedCompilationError` for the given
        element, chained from ``err``."""
        raise exc.UnsupportedCompilationError(self, element) from err
968
969
970# this was a Visitable, but to allow accurate detection of
971# column elements this is actually a column element
class _CompileLabel(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """lightweight label object which acts as an expression.Label."""

    __visit_name__ = "label"
    __slots__ = ("element", "name", "_alt_names")

    def __init__(self, col, name, alt_names=()):
        self.element = col
        self.name = name
        # the labeled column itself always comes first
        self._alt_names = (col,) + alt_names

    @property
    def proxy_set(self):
        """The ``proxy_set`` of the wrapped element."""
        wrapped = self.element
        return wrapped.proxy_set

    @property
    def type(self):
        """The ``type`` of the wrapped element."""
        wrapped = self.element
        return wrapped.type

    def self_group(self, **kw):
        """Return this object unchanged; no grouping is applied."""
        return self
995
996
class ilike_case_insensitive(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """Wrapping element for a case-insensitive portion of
    an ILIKE construct.

    The construct usually renders the ``lower()`` function, but on
    PostgreSQL will pass silently with the assumption that "ILIKE"
    is being used.

    .. versionadded:: 2.0

    """

    __visit_name__ = "ilike_case_insensitive_operand"
    __slots__ = ("element", "comparator")

    def __init__(self, element):
        self.element = element
        # reuse the wrapped element's comparator
        self.comparator = element.comparator

    @property
    def proxy_set(self):
        """The ``proxy_set`` of the wrapped element."""
        wrapped = self.element
        return wrapped.proxy_set

    @property
    def type(self):
        """The ``type`` of the wrapped element."""
        wrapped = self.element
        return wrapped.type

    def self_group(self, **kw):
        """Return this object unchanged; no grouping is applied."""
        return self

    def _with_binary_element_type(self, type_):
        """Return a new wrapper around the wrapped element's own
        ``_with_binary_element_type(type_)`` result."""
        retyped = self.element._with_binary_element_type(type_)
        return ilike_case_insensitive(retyped)
1033
1034
1035class SQLCompiler(Compiled):
1036 """Default implementation of :class:`.Compiled`.
1037
1038 Compiles :class:`_expression.ClauseElement` objects into SQL strings.
1039
1040 """
1041
1042 extract_map = EXTRACT_MAP
1043
1044 bindname_escape_characters: ClassVar[Mapping[str, str]] = (
1045 util.immutabledict(
1046 {
1047 "%": "P",
1048 "(": "A",
1049 ")": "Z",
1050 ":": "C",
1051 ".": "_",
1052 "[": "_",
1053 "]": "_",
1054 " ": "_",
1055 }
1056 )
1057 )
1058 """A mapping (e.g. dict or similar) containing a lookup of
1059 characters keyed to replacement characters which will be applied to all
1060 'bind names' used in SQL statements as a form of 'escaping'; the given
1061 characters are replaced entirely with the 'replacement' character when
1062 rendered in the SQL statement, and a similar translation is performed
1063 on the incoming names used in parameter dictionaries passed to methods
1064 like :meth:`_engine.Connection.execute`.
1065
1066 This allows bound parameter names used in :func:`_sql.bindparam` and
1067 other constructs to have any arbitrary characters present without any
1068 concern for characters that aren't allowed at all on the target database.
1069
1070 Third party dialects can establish their own dictionary here to replace the
1071 default mapping, which will ensure that the particular characters in the
1072 mapping will never appear in a bound parameter name.
1073
1074 The dictionary is evaluated at **class creation time**, so cannot be
1075 modified at runtime; it must be present on the class when the class
1076 is first declared.
1077
1078 Note that for dialects that have additional bound parameter rules such
1079 as additional restrictions on leading characters, the
1080 :meth:`_sql.SQLCompiler.bindparam_string` method may need to be augmented.
1081 See the cx_Oracle compiler for an example of this.
1082
1083 .. versionadded:: 2.0.0rc1
1084
1085 """
1086
1087 _bind_translate_re: ClassVar[Pattern[str]]
1088 _bind_translate_chars: ClassVar[Mapping[str, str]]
1089
1090 is_sql = True
1091
1092 compound_keywords = COMPOUND_KEYWORDS
1093
1094 isdelete: bool = False
1095 isinsert: bool = False
1096 isupdate: bool = False
1097 """class-level defaults which can be set at the instance
1098 level to define if this Compiled instance represents
1099 INSERT/UPDATE/DELETE
1100 """
1101
1102 postfetch: Optional[List[Column[Any]]]
1103 """list of columns that can be post-fetched after INSERT or UPDATE to
1104 receive server-updated values"""
1105
1106 insert_prefetch: Sequence[Column[Any]] = ()
1107 """list of columns for which default values should be evaluated before
1108 an INSERT takes place"""
1109
1110 update_prefetch: Sequence[Column[Any]] = ()
1111 """list of columns for which onupdate default values should be evaluated
1112 before an UPDATE takes place"""
1113
1114 implicit_returning: Optional[Sequence[ColumnElement[Any]]] = None
1115 """list of "implicit" returning columns for a toplevel INSERT or UPDATE
1116 statement, used to receive newly generated values of columns.
1117
1118 .. versionadded:: 2.0 ``implicit_returning`` replaces the previous
1119 ``returning`` collection, which was not a generalized RETURNING
1120 collection and instead was in fact specific to the "implicit returning"
1121 feature.
1122
1123 """
1124
1125 isplaintext: bool = False
1126
1127 binds: Dict[str, BindParameter[Any]]
1128 """a dictionary of bind parameter keys to BindParameter instances."""
1129
1130 bind_names: Dict[BindParameter[Any], str]
1131 """a dictionary of BindParameter instances to "compiled" names
1132 that are actually present in the generated SQL"""
1133
1134 stack: List[_CompilerStackEntry]
1135 """major statements such as SELECT, INSERT, UPDATE, DELETE are
1136 tracked in this stack using an entry format."""
1137
1138 returning_precedes_values: bool = False
1139 """set to True classwide to generate RETURNING
1140 clauses before the VALUES or WHERE clause (i.e. MSSQL)
1141 """
1142
1143 render_table_with_column_in_update_from: bool = False
1144 """set to True classwide to indicate the SET clause
1145 in a multi-table UPDATE statement should qualify
1146 columns with the table name (i.e. MySQL only)
1147 """
1148
1149 ansi_bind_rules: bool = False
    """SQL 92 doesn't allow bind parameters to be used
    in the columns clause of a SELECT, nor does it allow
    ambiguous expressions like "? = ?". A compiler
    subclass can set this flag to True if the target
    driver/DB enforces this
    """
1156
1157 bindtemplate: str
1158 """template to render bound parameters based on paramstyle."""
1159
1160 compilation_bindtemplate: str
1161 """template used by compiler to render parameters before positional
1162 paramstyle application"""
1163
1164 _numeric_binds_identifier_char: str
    """Character that's used as the identifier of a numerical bind param.
    For example if this char is set to ``$``, numerical binds will be rendered
    in the form ``$1, $2, $3``.
    """
1169
1170 _result_columns: List[ResultColumnsEntry]
1171 """relates label names in the final SQL to a tuple of local
1172 column/label name, ColumnElement object (if any) and
1173 TypeEngine. CursorResult uses this for type processing and
1174 column targeting"""
1175
1176 _textual_ordered_columns: bool = False
1177 """tell the result object that the column names as rendered are important,
1178 but they are also "ordered" vs. what is in the compiled object here.
1179
1180 As of 1.4.42 this condition is only present when the statement is a
1181 TextualSelect, e.g. text("....").columns(...), where it is required
1182 that the columns are considered positionally and not by name.
1183
1184 """
1185
1186 _ad_hoc_textual: bool = False
1187 """tell the result that we encountered text() or '*' constructs in the
1188 middle of the result columns, but we also have compiled columns, so
1189 if the number of columns in cursor.description does not match how many
1190 expressions we have, that means we can't rely on positional at all and
1191 should match on name.
1192
1193 """
1194
1195 _ordered_columns: bool = True
1196 """
1197 if False, means we can't be sure the list of entries
1198 in _result_columns is actually the rendered order. Usually
1199 True unless using an unordered TextualSelect.
1200 """
1201
1202 _loose_column_name_matching: bool = False
1203 """tell the result object that the SQL statement is textual, wants to match
1204 up to Column objects, and may be using the ._tq_label in the SELECT rather
1205 than the base name.
1206
1207 """
1208
1209 _numeric_binds: bool = False
1210 """
1211 True if paramstyle is "numeric". This paramstyle is trickier than
1212 all the others.
1213
1214 """
1215
1216 _render_postcompile: bool = False
1217 """
1218 whether to render out POSTCOMPILE params during the compile phase.
1219
1220 This attribute is used only for end-user invocation of stmt.compile();
1221 it's never used for actual statement execution, where instead the
1222 dialect internals access and render the internal postcompile structure
1223 directly.
1224
1225 """
1226
1227 _post_compile_expanded_state: Optional[ExpandedState] = None
1228 """When render_postcompile is used, the ``ExpandedState`` used to create
1229 the "expanded" SQL is assigned here, and then used by the ``.params``
1230 accessor and ``.construct_params()`` methods for their return values.
1231
1232 .. versionadded:: 2.0.0rc1
1233
1234 """
1235
1236 _pre_expanded_string: Optional[str] = None
    """Stores the original string SQL before 'post_compile' is applied,
    for cases where 'post_compile' was used.

    """
1241
1242 _pre_expanded_positiontup: Optional[List[str]] = None
1243
1244 _insertmanyvalues: Optional[_InsertManyValues] = None
1245
1246 _insert_crud_params: Optional[crud._CrudParamSequence] = None
1247
1248 literal_execute_params: FrozenSet[BindParameter[Any]] = frozenset()
1249 """bindparameter objects that are rendered as literal values at statement
1250 execution time.
1251
1252 """
1253
1254 post_compile_params: FrozenSet[BindParameter[Any]] = frozenset()
1255 """bindparameter objects that are rendered as bound parameter placeholders
1256 at statement execution time.
1257
1258 """
1259
1260 escaped_bind_names: util.immutabledict[str, str] = util.EMPTY_DICT
1261 """Late escaping of bound parameter names that has to be converted
1262 to the original name when looking in the parameter dictionary.
1263
1264 """
1265
1266 has_out_parameters = False
1267 """if True, there are bindparam() objects that have the isoutparam
1268 flag set."""
1269
1270 postfetch_lastrowid = False
    """if True, and this is an INSERT, use cursor.lastrowid to populate
    result.inserted_primary_key. """
1273
1274 _cache_key_bind_match: Optional[
1275 Tuple[
1276 Dict[
1277 BindParameter[Any],
1278 List[BindParameter[Any]],
1279 ],
1280 Dict[
1281 str,
1282 BindParameter[Any],
1283 ],
1284 ]
1285 ] = None
1286 """a mapping that will relate the BindParameter object we compile
1287 to those that are part of the extracted collection of parameters
1288 in the cache key, if we were given a cache key.
1289
1290 """
1291
1292 positiontup: Optional[List[str]] = None
1293 """for a compiled construct that uses a positional paramstyle, will be
1294 a sequence of strings, indicating the names of bound parameters in order.
1295
1296 This is used in order to render bound parameters in their correct order,
1297 and is combined with the :attr:`_sql.Compiled.params` dictionary to
1298 render parameters.
1299
1300 This sequence always contains the unescaped name of the parameters.
1301
1302 .. seealso::
1303
1304 :ref:`faq_sql_expression_string` - includes a usage example for
1305 debugging use cases.
1306
1307 """
1308 _values_bindparam: Optional[List[str]] = None
1309
1310 _visited_bindparam: Optional[List[str]] = None
1311
1312 inline: bool = False
1313
1314 ctes: Optional[MutableMapping[CTE, str]]
1315
1316 # Detect same CTE references - Dict[(level, name), cte]
1317 # Level is required for supporting nesting
1318 ctes_by_level_name: Dict[Tuple[int, str], CTE]
1319
1320 # To retrieve key/level in ctes_by_level_name -
1321 # Dict[cte_reference, (level, cte_name, cte_opts)]
1322 level_name_by_cte: Dict[CTE, Tuple[int, str, selectable._CTEOpts]]
1323
1324 ctes_recursive: bool
1325
    # Matches a late-rendered placeholder of the form
    # ``__[POSTCOMPILE_<name>]``.  Group 1 is the parameter name; the
    # optional group 2 is a ``~~...~~`` delimited section that may follow
    # the name inside the brackets.
    _post_compile_pattern = re.compile(r"__\[POSTCOMPILE_(\S+?)(~~.+?~~)?\]")
    # Matches a pyformat style placeholder ``%(<name>)s``; group 1 is the
    # name.
    _pyformat_pattern = re.compile(r"%\(([^)]+?)\)s")
    # Alternation of the two patterns above.  In a match, group 1 is set for
    # a pyformat placeholder and group 2 for a POSTCOMPILE placeholder.
    _positional_pattern = re.compile(
        f"{_pyformat_pattern.pattern}|{_post_compile_pattern.pattern}"
    )
1331
    @classmethod
    def _init_compiler_cls(cls):
        """Run class-level setup for this compiler class.

        Currently this only builds the bind name translation regular
        expression and lookup, by calling ``_init_bind_translate()``.

        """
        cls._init_bind_translate()
1335
1336 @classmethod
1337 def _init_bind_translate(cls):
1338 reg = re.escape("".join(cls.bindname_escape_characters))
1339 cls._bind_translate_re = re.compile(f"[{reg}]")
1340 cls._bind_translate_chars = cls.bindname_escape_characters
1341
    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        cache_key: Optional[CacheKey] = None,
        column_keys: Optional[Sequence[str]] = None,
        for_executemany: bool = False,
        linting: Linting = NO_LINTING,
        _supporting_against: Optional[SQLCompiler] = None,
        **kwargs: Any,
    ):
        """Construct a new :class:`.SQLCompiler` object.

        :param dialect: :class:`.Dialect` to be used

        :param statement: :class:`_expression.ClauseElement` to be compiled

        :param cache_key: optional cache key for the statement.  When
         given, the bound parameters in its second element are indexed
         (by object and by ``.key``) into ``_cache_key_bind_match``.

        :param column_keys: a list of column names to be compiled into an
         INSERT or UPDATE statement.

        :param for_executemany: whether INSERT / UPDATE statements should
         expect that they are to be invoked in an "executemany" style,
         which may impact how the statement will be expected to return the
         values of defaults and autoincrement / sequences and similar.
         Depending on the backend and driver in use, support for retrieving
         these values may be disabled which means SQL expressions may
         be rendered inline, RETURNING may not be rendered, etc.

        :param linting: linting flags; stored as ``self.linting``.

        :param _supporting_against: optional other :class:`.SQLCompiler`
         whose instance state is copied onto this compiler once
         construction is otherwise complete, excluding the attributes
         that are specific to this compiler's dialect and paramstyle.

        :param kwargs: additional keyword arguments to be consumed by the
         superclass.

        """
        self.column_keys = column_keys

        self.cache_key = cache_key

        if cache_key:
            # index the cache key's bound parameters both by key name and
            # by parameter object
            cksm = {b.key: b for b in cache_key[1]}
            ckbm = {b: [b] for b in cache_key[1]}
            self._cache_key_bind_match = (ckbm, cksm)

        # compile INSERT/UPDATE defaults/sequences to expect executemany
        # style execution, which may mean no pre-execute of defaults,
        # or no RETURNING
        self.for_executemany = for_executemany

        self.linting = linting

        # a dictionary of bind parameter keys to BindParameter
        # instances.
        self.binds = {}

        # a dictionary of BindParameter instances to "compiled" names
        # that are actually present in the generated SQL
        self.bind_names = util.column_dict()

        # stack which keeps track of nested SELECT statements
        self.stack = []

        self._result_columns = []

        # true if the paramstyle is positional
        self.positional = dialect.positional
        if self.positional:
            self._numeric_binds = nb = dialect.paramstyle.startswith("numeric")
            if nb:
                self._numeric_binds_identifier_char = (
                    "$" if dialect.paramstyle == "numeric_dollar" else ":"
                )

            # positional paramstyles are compiled using the pyformat
            # template first; the placeholders are rewritten by
            # _process_positional() / _process_numeric() below
            self.compilation_bindtemplate = _pyformat_template
        else:
            self.compilation_bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

        self.ctes = None

        self.label_length = (
            dialect.label_length or dialect.max_identifier_length
        )

        # a map which tracks "anonymous" identifiers that are created on
        # the fly here
        self.anon_map = prefix_anon_map()

        # a map which tracks "truncated" names based on
        # dialect.label_length or dialect.max_identifier_length
        self.truncated_names: Dict[Tuple[str, str], str] = {}
        self._truncated_counters: Dict[str, int] = {}

        # NOTE(review): the superclass constructor presumably performs the
        # actual string compilation, as self.state is tested for
        # STRING_APPLIED further below -- confirm against Compiled.__init__
        Compiled.__init__(self, dialect, statement, **kwargs)

        if self.isinsert or self.isupdate or self.isdelete:
            if TYPE_CHECKING:
                assert isinstance(statement, UpdateBase)

            if self.isinsert or self.isupdate:
                if TYPE_CHECKING:
                    assert isinstance(statement, ValuesBase)
                if statement._inline:
                    self.inline = True
                elif self.for_executemany and (
                    not self.isinsert
                    or (
                        self.dialect.insert_executemany_returning
                        and statement._return_defaults
                    )
                ):
                    self.inline = True

        self.bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

        if _supporting_against:
            # copy the other compiler's state, leaving alone the
            # attributes that depend on this compiler's own dialect and
            # paramstyle
            self.__dict__.update(
                {
                    k: v
                    for k, v in _supporting_against.__dict__.items()
                    if k
                    not in {
                        "state",
                        "dialect",
                        "preparer",
                        "positional",
                        "_numeric_binds",
                        "compilation_bindtemplate",
                        "bindtemplate",
                    }
                }
            )

        if self.state is CompilerState.STRING_APPLIED:
            # rewrite the pyformat placeholders into the dialect's
            # positional form and establish self.positiontup
            if self.positional:
                if self._numeric_binds:
                    self._process_numeric()
                else:
                    self._process_positional()

            if self._render_postcompile:
                parameters = self.construct_params(
                    escape_names=False,
                    _no_postcompile=True,
                )

                self._process_parameters_for_postcompile(
                    parameters, _populate_self=True
                )
1487
1488 @property
1489 def insert_single_values_expr(self) -> Optional[str]:
1490 """When an INSERT is compiled with a single set of parameters inside
1491 a VALUES expression, the string is assigned here, where it can be
1492 used for insert batching schemes to rewrite the VALUES expression.
1493
1494 .. versionadded:: 1.3.8
1495
1496 .. versionchanged:: 2.0 This collection is no longer used by
1497 SQLAlchemy's built-in dialects, in favor of the currently
1498 internal ``_insertmanyvalues`` collection that is used only by
1499 :class:`.SQLCompiler`.
1500
1501 """
1502 if self._insertmanyvalues is None:
1503 return None
1504 else:
1505 return self._insertmanyvalues.single_values_expr
1506
1507 @util.ro_memoized_property
1508 def effective_returning(self) -> Optional[Sequence[ColumnElement[Any]]]:
1509 """The effective "returning" columns for INSERT, UPDATE or DELETE.
1510
1511 This is either the so-called "implicit returning" columns which are
1512 calculated by the compiler on the fly, or those present based on what's
1513 present in ``self.statement._returning`` (expanded into individual
1514 columns using the ``._all_selected_columns`` attribute) i.e. those set
1515 explicitly using the :meth:`.UpdateBase.returning` method.
1516
1517 .. versionadded:: 2.0
1518
1519 """
1520 if self.implicit_returning:
1521 return self.implicit_returning
1522 elif self.statement is not None and is_dml(self.statement):
1523 return [
1524 c
1525 for c in self.statement._all_selected_columns
1526 if is_column_element(c)
1527 ]
1528
1529 else:
1530 return None
1531
    @property
    def returning(self):
        """Backwards compatibility accessor; returns the same value as
        the ``effective_returning`` attribute.

        """
        return self.effective_returning
1539
1540 @property
1541 def current_executable(self):
1542 """Return the current 'executable' that is being compiled.
1543
1544 This is currently the :class:`_sql.Select`, :class:`_sql.Insert`,
1545 :class:`_sql.Update`, :class:`_sql.Delete`,
1546 :class:`_sql.CompoundSelect` object that is being compiled.
1547 Specifically it's assigned to the ``self.stack`` list of elements.
1548
1549 When a statement like the above is being compiled, it normally
1550 is also assigned to the ``.statement`` attribute of the
1551 :class:`_sql.Compiler` object. However, all SQL constructs are
1552 ultimately nestable, and this attribute should never be consulted
1553 by a ``visit_`` method, as it is not guaranteed to be assigned
1554 nor guaranteed to correspond to the current statement being compiled.
1555
1556 .. versionadded:: 1.3.21
1557
1558 For compatibility with previous versions, use the following
1559 recipe::
1560
1561 statement = getattr(self, "current_executable", False)
1562 if statement is False:
1563 statement = self.stack[-1]["selectable"]
1564
1565 For versions 1.4 and above, ensure only .current_executable
1566 is used; the format of "self.stack" may change.
1567
1568
1569 """
1570 try:
1571 return self.stack[-1]["selectable"]
1572 except IndexError as ie:
1573 raise IndexError("Compiler does not have a stack entry") from ie
1574
1575 @property
1576 def prefetch(self):
1577 return list(self.insert_prefetch) + list(self.update_prefetch)
1578
    @util.memoized_property
    def _global_attributes(self) -> Dict[Any, Any]:
        """Return an initially empty dictionary.

        NOTE(review): being a memoized property, the same dictionary is
        presumably handed back on each access for a given compiler --
        confirm against ``util.memoized_property``.

        """
        return {}
1582
1583 @util.memoized_instancemethod
1584 def _init_cte_state(self) -> MutableMapping[CTE, str]:
1585 """Initialize collections related to CTEs only if
1586 a CTE is located, to save on the overhead of
1587 these collections otherwise.
1588
1589 """
1590 # collect CTEs to tack on top of a SELECT
1591 # To store the query to print - Dict[cte, text_query]
1592 ctes: MutableMapping[CTE, str] = util.OrderedDict()
1593 self.ctes = ctes
1594
1595 # Detect same CTE references - Dict[(level, name), cte]
1596 # Level is required for supporting nesting
1597 self.ctes_by_level_name = {}
1598
1599 # To retrieve key/level in ctes_by_level_name -
1600 # Dict[cte_reference, (level, cte_name, cte_opts)]
1601 self.level_name_by_cte = {}
1602
1603 self.ctes_recursive = False
1604
1605 return ctes
1606
1607 @contextlib.contextmanager
1608 def _nested_result(self):
1609 """special API to support the use case of 'nested result sets'"""
1610 result_columns, ordered_columns = (
1611 self._result_columns,
1612 self._ordered_columns,
1613 )
1614 self._result_columns, self._ordered_columns = [], False
1615
1616 try:
1617 if self.stack:
1618 entry = self.stack[-1]
1619 entry["need_result_map_for_nested"] = True
1620 else:
1621 entry = None
1622 yield self._result_columns, self._ordered_columns
1623 finally:
1624 if entry:
1625 entry.pop("need_result_map_for_nested")
1626 self._result_columns, self._ordered_columns = (
1627 result_columns,
1628 ordered_columns,
1629 )
1630
1631 def _process_positional(self):
1632 assert not self.positiontup
1633 assert self.state is CompilerState.STRING_APPLIED
1634 assert not self._numeric_binds
1635
1636 if self.dialect.paramstyle == "format":
1637 placeholder = "%s"
1638 else:
1639 assert self.dialect.paramstyle == "qmark"
1640 placeholder = "?"
1641
1642 positions = []
1643
1644 def find_position(m: re.Match[str]) -> str:
1645 normal_bind = m.group(1)
1646 if normal_bind:
1647 positions.append(normal_bind)
1648 return placeholder
1649 else:
1650 # this a post-compile bind
1651 positions.append(m.group(2))
1652 return m.group(0)
1653
1654 self.string = re.sub(
1655 self._positional_pattern, find_position, self.string
1656 )
1657
1658 if self.escaped_bind_names:
1659 reverse_escape = {v: k for k, v in self.escaped_bind_names.items()}
1660 assert len(self.escaped_bind_names) == len(reverse_escape)
1661 self.positiontup = [
1662 reverse_escape.get(name, name) for name in positions
1663 ]
1664 else:
1665 self.positiontup = positions
1666
1667 if self._insertmanyvalues:
1668 positions = []
1669
1670 single_values_expr = re.sub(
1671 self._positional_pattern,
1672 find_position,
1673 self._insertmanyvalues.single_values_expr,
1674 )
1675 insert_crud_params = [
1676 (
1677 v[0],
1678 v[1],
1679 re.sub(self._positional_pattern, find_position, v[2]),
1680 v[3],
1681 )
1682 for v in self._insertmanyvalues.insert_crud_params
1683 ]
1684
1685 self._insertmanyvalues = self._insertmanyvalues._replace(
1686 single_values_expr=single_values_expr,
1687 insert_crud_params=insert_crud_params,
1688 )
1689
1690 def _process_numeric(self):
1691 assert self._numeric_binds
1692 assert self.state is CompilerState.STRING_APPLIED
1693
1694 num = 1
1695 param_pos: Dict[str, str] = {}
1696 order: Iterable[str]
1697 if self._insertmanyvalues and self._values_bindparam is not None:
1698 # bindparams that are not in values are always placed first.
1699 # this avoids the need of changing them when using executemany
1700 # values () ()
1701 order = itertools.chain(
1702 (
1703 name
1704 for name in self.bind_names.values()
1705 if name not in self._values_bindparam
1706 ),
1707 self.bind_names.values(),
1708 )
1709 else:
1710 order = self.bind_names.values()
1711
1712 for bind_name in order:
1713 if bind_name in param_pos:
1714 continue
1715 bind = self.binds[bind_name]
1716 if (
1717 bind in self.post_compile_params
1718 or bind in self.literal_execute_params
1719 ):
1720 # set to None to just mark the in positiontup, it will not
1721 # be replaced below.
1722 param_pos[bind_name] = None # type: ignore
1723 else:
1724 ph = f"{self._numeric_binds_identifier_char}{num}"
1725 num += 1
1726 param_pos[bind_name] = ph
1727
1728 self.next_numeric_pos = num
1729
1730 self.positiontup = list(param_pos)
1731 if self.escaped_bind_names:
1732 len_before = len(param_pos)
1733 param_pos = {
1734 self.escaped_bind_names.get(name, name): pos
1735 for name, pos in param_pos.items()
1736 }
1737 assert len(param_pos) == len_before
1738
1739 # Can't use format here since % chars are not escaped.
1740 self.string = self._pyformat_pattern.sub(
1741 lambda m: param_pos[m.group(1)], self.string
1742 )
1743
1744 if self._insertmanyvalues:
1745 single_values_expr = (
1746 # format is ok here since single_values_expr includes only
1747 # place-holders
1748 self._insertmanyvalues.single_values_expr
1749 % param_pos
1750 )
1751 insert_crud_params = [
1752 (v[0], v[1], "%s", v[3])
1753 for v in self._insertmanyvalues.insert_crud_params
1754 ]
1755
1756 self._insertmanyvalues = self._insertmanyvalues._replace(
1757 # This has the numbers (:1, :2)
1758 single_values_expr=single_values_expr,
1759 # The single binds are instead %s so they can be formatted
1760 insert_crud_params=insert_crud_params,
1761 )
1762
1763 @util.memoized_property
1764 def _bind_processors(
1765 self,
1766 ) -> MutableMapping[
1767 str, Union[_BindProcessorType[Any], Sequence[_BindProcessorType[Any]]]
1768 ]:
1769 # mypy is not able to see the two value types as the above Union,
1770 # it just sees "object". don't know how to resolve
1771 return {
1772 key: value # type: ignore
1773 for key, value in (
1774 (
1775 self.bind_names[bindparam],
1776 (
1777 bindparam.type._cached_bind_processor(self.dialect)
1778 if not bindparam.type._is_tuple_type
1779 else tuple(
1780 elem_type._cached_bind_processor(self.dialect)
1781 for elem_type in cast(
1782 TupleType, bindparam.type
1783 ).types
1784 )
1785 ),
1786 )
1787 for bindparam in self.bind_names
1788 )
1789 if value is not None
1790 }
1791
    def is_subquery(self):
        """Return True if the compiler stack holds more than one entry."""
        return len(self.stack) > 1
1794
    @property
    def sql_compiler(self):
        """Return this compiler itself."""
        return self
1798
1799 def construct_expanded_state(
1800 self,
1801 params: Optional[_CoreSingleExecuteParams] = None,
1802 escape_names: bool = True,
1803 ) -> ExpandedState:
1804 """Return a new :class:`.ExpandedState` for a given parameter set.
1805
1806 For queries that use "expanding" or other late-rendered parameters,
1807 this method will provide for both the finalized SQL string as well
1808 as the parameters that would be used for a particular parameter set.
1809
1810 .. versionadded:: 2.0.0rc1
1811
1812 """
1813 parameters = self.construct_params(
1814 params,
1815 escape_names=escape_names,
1816 _no_postcompile=True,
1817 )
1818 return self._process_parameters_for_postcompile(
1819 parameters,
1820 )
1821
1822 def construct_params(
1823 self,
1824 params: Optional[_CoreSingleExecuteParams] = None,
1825 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
1826 escape_names: bool = True,
1827 _group_number: Optional[int] = None,
1828 _check: bool = True,
1829 _no_postcompile: bool = False,
1830 ) -> _MutableCoreSingleExecuteParams:
1831 """return a dictionary of bind parameter keys and values"""
1832
1833 if self._render_postcompile and not _no_postcompile:
1834 assert self._post_compile_expanded_state is not None
1835 if not params:
1836 return dict(self._post_compile_expanded_state.parameters)
1837 else:
1838 raise exc.InvalidRequestError(
1839 "can't construct new parameters when render_postcompile "
1840 "is used; the statement is hard-linked to the original "
1841 "parameters. Use construct_expanded_state to generate a "
1842 "new statement and parameters."
1843 )
1844
1845 has_escaped_names = escape_names and bool(self.escaped_bind_names)
1846
1847 if extracted_parameters:
1848 # related the bound parameters collected in the original cache key
1849 # to those collected in the incoming cache key. They will not have
1850 # matching names but they will line up positionally in the same
1851 # way. The parameters present in self.bind_names may be clones of
1852 # these original cache key params in the case of DML but the .key
1853 # will be guaranteed to match.
1854 if self.cache_key is None:
1855 raise exc.CompileError(
1856 "This compiled object has no original cache key; "
1857 "can't pass extracted_parameters to construct_params"
1858 )
1859 else:
1860 orig_extracted = self.cache_key[1]
1861
1862 ckbm_tuple = self._cache_key_bind_match
1863 assert ckbm_tuple is not None
1864 ckbm, _ = ckbm_tuple
1865 resolved_extracted = {
1866 bind: extracted
1867 for b, extracted in zip(orig_extracted, extracted_parameters)
1868 for bind in ckbm[b]
1869 }
1870 else:
1871 resolved_extracted = None
1872
1873 if params:
1874 pd = {}
1875 for bindparam, name in self.bind_names.items():
1876 escaped_name = (
1877 self.escaped_bind_names.get(name, name)
1878 if has_escaped_names
1879 else name
1880 )
1881
1882 if bindparam.key in params:
1883 pd[escaped_name] = params[bindparam.key]
1884 elif name in params:
1885 pd[escaped_name] = params[name]
1886
1887 elif _check and bindparam.required:
1888 if _group_number:
1889 raise exc.InvalidRequestError(
1890 "A value is required for bind parameter %r, "
1891 "in parameter group %d"
1892 % (bindparam.key, _group_number),
1893 code="cd3x",
1894 )
1895 else:
1896 raise exc.InvalidRequestError(
1897 "A value is required for bind parameter %r"
1898 % bindparam.key,
1899 code="cd3x",
1900 )
1901 else:
1902 if resolved_extracted:
1903 value_param = resolved_extracted.get(
1904 bindparam, bindparam
1905 )
1906 else:
1907 value_param = bindparam
1908
1909 if bindparam.callable:
1910 pd[escaped_name] = value_param.effective_value
1911 else:
1912 pd[escaped_name] = value_param.value
1913 return pd
1914 else:
1915 pd = {}
1916 for bindparam, name in self.bind_names.items():
1917 escaped_name = (
1918 self.escaped_bind_names.get(name, name)
1919 if has_escaped_names
1920 else name
1921 )
1922
1923 if _check and bindparam.required:
1924 if _group_number:
1925 raise exc.InvalidRequestError(
1926 "A value is required for bind parameter %r, "
1927 "in parameter group %d"
1928 % (bindparam.key, _group_number),
1929 code="cd3x",
1930 )
1931 else:
1932 raise exc.InvalidRequestError(
1933 "A value is required for bind parameter %r"
1934 % bindparam.key,
1935 code="cd3x",
1936 )
1937
1938 if resolved_extracted:
1939 value_param = resolved_extracted.get(bindparam, bindparam)
1940 else:
1941 value_param = bindparam
1942
1943 if bindparam.callable:
1944 pd[escaped_name] = value_param.effective_value
1945 else:
1946 pd[escaped_name] = value_param.value
1947
1948 return pd
1949
1950 @util.memoized_instancemethod
1951 def _get_set_input_sizes_lookup(self):
1952 dialect = self.dialect
1953
1954 include_types = dialect.include_set_input_sizes
1955 exclude_types = dialect.exclude_set_input_sizes
1956
1957 dbapi = dialect.dbapi
1958
1959 def lookup_type(typ):
1960 dbtype = typ._unwrapped_dialect_impl(dialect).get_dbapi_type(dbapi)
1961
1962 if (
1963 dbtype is not None
1964 and (exclude_types is None or dbtype not in exclude_types)
1965 and (include_types is None or dbtype in include_types)
1966 ):
1967 return dbtype
1968 else:
1969 return None
1970
1971 inputsizes = {}
1972
1973 literal_execute_params = self.literal_execute_params
1974
1975 for bindparam in self.bind_names:
1976 if bindparam in literal_execute_params:
1977 continue
1978
1979 if bindparam.type._is_tuple_type:
1980 inputsizes[bindparam] = [
1981 lookup_type(typ)
1982 for typ in cast(TupleType, bindparam.type).types
1983 ]
1984 else:
1985 inputsizes[bindparam] = lookup_type(bindparam.type)
1986
1987 return inputsizes
1988
    @property
    def params(self):
        """Return the bind param dictionary embedded into this
        compiled object, for those values that are present.

        The dictionary is built by ``construct_params()`` with the
        required-value check turned off (``_check=False``).

        .. seealso::

            :ref:`faq_sql_expression_string` - includes a usage example for
            debugging use cases.

        """
        return self.construct_params(_check=False)
2001
2002 def _process_parameters_for_postcompile(
2003 self,
2004 parameters: _MutableCoreSingleExecuteParams,
2005 _populate_self: bool = False,
2006 ) -> ExpandedState:
2007 """handle special post compile parameters.
2008
2009 These include:
2010
2011 * "expanding" parameters -typically IN tuples that are rendered
2012 on a per-parameter basis for an otherwise fixed SQL statement string.
2013
2014 * literal_binds compiled with the literal_execute flag. Used for
2015 things like SQL Server "TOP N" where the driver does not accommodate
2016 N as a bound parameter.
2017
2018 """
2019
2020 expanded_parameters = {}
2021 new_positiontup: Optional[List[str]]
2022
2023 pre_expanded_string = self._pre_expanded_string
2024 if pre_expanded_string is None:
2025 pre_expanded_string = self.string
2026
2027 if self.positional:
2028 new_positiontup = []
2029
2030 pre_expanded_positiontup = self._pre_expanded_positiontup
2031 if pre_expanded_positiontup is None:
2032 pre_expanded_positiontup = self.positiontup
2033
2034 else:
2035 new_positiontup = pre_expanded_positiontup = None
2036
2037 processors = self._bind_processors
2038 single_processors = cast(
2039 "Mapping[str, _BindProcessorType[Any]]", processors
2040 )
2041 tuple_processors = cast(
2042 "Mapping[str, Sequence[_BindProcessorType[Any]]]", processors
2043 )
2044
2045 new_processors: Dict[str, _BindProcessorType[Any]] = {}
2046
2047 replacement_expressions: Dict[str, Any] = {}
2048 to_update_sets: Dict[str, Any] = {}
2049
2050 # notes:
2051 # *unescaped* parameter names in:
2052 # self.bind_names, self.binds, self._bind_processors, self.positiontup
2053 #
2054 # *escaped* parameter names in:
2055 # construct_params(), replacement_expressions
2056
2057 numeric_positiontup: Optional[List[str]] = None
2058
2059 if self.positional and pre_expanded_positiontup is not None:
2060 names: Iterable[str] = pre_expanded_positiontup
2061 if self._numeric_binds:
2062 numeric_positiontup = []
2063 else:
2064 names = self.bind_names.values()
2065
2066 ebn = self.escaped_bind_names
2067 for name in names:
2068 escaped_name = ebn.get(name, name) if ebn else name
2069 parameter = self.binds[name]
2070
2071 if parameter in self.literal_execute_params:
2072 if escaped_name not in replacement_expressions:
2073 replacement_expressions[escaped_name] = (
2074 self.render_literal_bindparam(
2075 parameter,
2076 render_literal_value=parameters.pop(escaped_name),
2077 )
2078 )
2079 continue
2080
2081 if parameter in self.post_compile_params:
2082 if escaped_name in replacement_expressions:
2083 to_update = to_update_sets[escaped_name]
2084 values = None
2085 else:
2086 # we are removing the parameter from parameters
2087 # because it is a list value, which is not expected by
2088 # TypeEngine objects that would otherwise be asked to
2089 # process it. the single name is being replaced with
2090 # individual numbered parameters for each value in the
2091 # param.
2092 #
2093 # note we are also inserting *escaped* parameter names
2094 # into the given dictionary. default dialect will
2095 # use these param names directly as they will not be
2096 # in the escaped_bind_names dictionary.
2097 values = parameters.pop(name)
2098
2099 leep_res = self._literal_execute_expanding_parameter(
2100 escaped_name, parameter, values
2101 )
2102 (to_update, replacement_expr) = leep_res
2103
2104 to_update_sets[escaped_name] = to_update
2105 replacement_expressions[escaped_name] = replacement_expr
2106
2107 if not parameter.literal_execute:
2108 parameters.update(to_update)
2109 if parameter.type._is_tuple_type:
2110 assert values is not None
2111 new_processors.update(
2112 (
2113 "%s_%s_%s" % (name, i, j),
2114 tuple_processors[name][j - 1],
2115 )
2116 for i, tuple_element in enumerate(values, 1)
2117 for j, _ in enumerate(tuple_element, 1)
2118 if name in tuple_processors
2119 and tuple_processors[name][j - 1] is not None
2120 )
2121 else:
2122 new_processors.update(
2123 (key, single_processors[name])
2124 for key, _ in to_update
2125 if name in single_processors
2126 )
2127 if numeric_positiontup is not None:
2128 numeric_positiontup.extend(
2129 name for name, _ in to_update
2130 )
2131 elif new_positiontup is not None:
2132 # to_update has escaped names, but that's ok since
2133 # these are new names, that aren't in the
2134 # escaped_bind_names dict.
2135 new_positiontup.extend(name for name, _ in to_update)
2136 expanded_parameters[name] = [
2137 expand_key for expand_key, _ in to_update
2138 ]
2139 elif new_positiontup is not None:
2140 new_positiontup.append(name)
2141
2142 def process_expanding(m):
2143 key = m.group(1)
2144 expr = replacement_expressions[key]
2145
2146 # if POSTCOMPILE included a bind_expression, render that
2147 # around each element
2148 if m.group(2):
2149 tok = m.group(2).split("~~")
2150 be_left, be_right = tok[1], tok[3]
2151 expr = ", ".join(
2152 "%s%s%s" % (be_left, exp, be_right)
2153 for exp in expr.split(", ")
2154 )
2155 return expr
2156
2157 statement = re.sub(
2158 self._post_compile_pattern, process_expanding, pre_expanded_string
2159 )
2160
2161 if numeric_positiontup is not None:
2162 assert new_positiontup is not None
2163 param_pos = {
2164 key: f"{self._numeric_binds_identifier_char}{num}"
2165 for num, key in enumerate(
2166 numeric_positiontup, self.next_numeric_pos
2167 )
2168 }
2169 # Can't use format here since % chars are not escaped.
2170 statement = self._pyformat_pattern.sub(
2171 lambda m: param_pos[m.group(1)], statement
2172 )
2173 new_positiontup.extend(numeric_positiontup)
2174
2175 expanded_state = ExpandedState(
2176 statement,
2177 parameters,
2178 new_processors,
2179 new_positiontup,
2180 expanded_parameters,
2181 )
2182
2183 if _populate_self:
2184 # this is for the "render_postcompile" flag, which is not
2185 # otherwise used internally and is for end-user debugging and
2186 # special use cases.
2187 self._pre_expanded_string = pre_expanded_string
2188 self._pre_expanded_positiontup = pre_expanded_positiontup
2189 self.string = expanded_state.statement
2190 self.positiontup = (
2191 list(expanded_state.positiontup or ())
2192 if self.positional
2193 else None
2194 )
2195 self._post_compile_expanded_state = expanded_state
2196
2197 return expanded_state
2198
@util.preload_module("sqlalchemy.engine.cursor")
def _create_result_map(self):
    """utility method used for unit tests only.

    Returns the description match map that ``CursorResultMetaData``
    builds from this compiler's ``_result_columns``.

    """
    metadata_cls = util.preloaded.engine_cursor.CursorResultMetaData
    return metadata_cls._create_description_match_map(self._result_columns)
2206
2207 # assigned by crud.py for insert/update statements
2208 _get_bind_name_for_col: _BindNameForColProtocol
2209
@util.memoized_property
def _within_exec_param_key_getter(self) -> Callable[[Any], str]:
    """Return the callable stored as ``_get_bind_name_for_col``.

    The callable is invoked with a column and yields the key under which
    that column's value is looked up in the statement parameters.

    """
    return self._get_bind_name_for_col
2214
@util.memoized_property
@util.preload_module("sqlalchemy.engine.result")
def _inserted_primary_key_from_lastrowid_getter(self):
    """Return a callable that builds the inserted primary key row from
    ``cursor.lastrowid`` and the parameters sent with the INSERT.

    The returned ``get(lastrowid, parameters)`` callable produces a row
    created by ``result_tuple`` over the ``key`` of each primary key
    column of the statement's table.

    """
    result = util.preloaded.engine_result

    param_key_getter = self._within_exec_param_key_getter

    assert self.compile_state is not None
    statement = self.compile_state.statement

    if TYPE_CHECKING:
        assert isinstance(statement, Insert)

    table = statement.table

    # one (getter, column) pair per primary key column; each getter calls
    # ``.get(<bind key for the column>, None)`` on the object passed to it
    getters = [
        (operator.methodcaller("get", param_key_getter(col), None), col)
        for col in table.primary_key
    ]

    autoinc_getter = None
    autoinc_col = table._autoincrement_column
    if autoinc_col is not None:
        # apply type post processors to the lastrowid
        lastrowid_processor = autoinc_col.type._cached_result_processor(
            self.dialect, None
        )
        autoinc_key = param_key_getter(autoinc_col)

        # if a bind value is present for the autoincrement column
        # in the parameters, we need to do the logic dictated by
        # #7998; honor a non-None user-passed parameter over lastrowid.
        # previously in the 1.4 series we weren't fetching lastrowid
        # at all if the key were present in the parameters
        if autoinc_key in self.binds:

            def _autoinc_getter(lastrowid, parameters):
                # falls back to lastrowid when the key is absent from
                # the parameters dictionary
                param_value = parameters.get(autoinc_key, lastrowid)
                if param_value is not None:
                    # they supplied non-None parameter, use that.
                    # SQLite at least is observed to return the wrong
                    # cursor.lastrowid for INSERT..ON CONFLICT so it
                    # can't be used in all cases
                    return param_value
                else:
                    # use lastrowid
                    return lastrowid

            # work around mypy https://github.com/python/mypy/issues/14027
            autoinc_getter = _autoinc_getter

    else:
        # no autoincrement column; lastrowid is used unprocessed
        lastrowid_processor = None

    row_fn = result.result_tuple([col.key for col in table.primary_key])

    def get(lastrowid, parameters):
        """given cursor.lastrowid value and the parameters used for INSERT,
        return a "row" that represents the primary key, either by
        using the "lastrowid" or by extracting values from the parameters
        that were sent along with the INSERT.

        """
        if lastrowid_processor is not None:
            lastrowid = lastrowid_processor(lastrowid)

        if lastrowid is None:
            # no lastrowid available; every primary key value comes from
            # the parameters
            return row_fn(getter(parameters) for getter, col in getters)
        else:
            # the autoincrement column takes its value from lastrowid
            # (or from autoinc_getter when one was established above);
            # all other columns are read from the parameters
            return row_fn(
                (
                    (
                        autoinc_getter(lastrowid, parameters)
                        if autoinc_getter is not None
                        else lastrowid
                    )
                    if col is autoinc_col
                    else getter(parameters)
                )
                for getter, col in getters
            )

    return get
2298
@util.memoized_property
@util.preload_module("sqlalchemy.engine.result")
def _inserted_primary_key_from_returning_getter(self):
    """Return a callable that builds the inserted primary key row from
    a RETURNING row and the parameters sent with the INSERT.

    For each primary key column of the statement's table, the value is
    taken from the row by position when the column is present in
    ``implicit_returning``; otherwise it is looked up in the parameters.

    """
    if typing.TYPE_CHECKING:
        from ..engine import result
    else:
        result = util.preloaded.engine_result

    assert self.compile_state is not None
    statement = self.compile_state.statement

    if TYPE_CHECKING:
        assert isinstance(statement, Insert)

    key_for_col = self._within_exec_param_key_getter
    table = statement.table

    returning = self.implicit_returning
    assert returning is not None
    position_of = {col: idx for idx, col in enumerate(returning)}

    # (getter, use_row) pairs; use_row selects whether the getter is
    # applied to the returned row or to the parameters
    pk_getters: List[Tuple[Callable[[Any], Any], bool]] = []
    for col in table.primary_key:
        if col in position_of:
            pk_getters.append((operator.itemgetter(position_of[col]), True))
        else:
            pk_getters.append(
                (
                    operator.methodcaller("get", key_for_col(col), None),
                    False,
                )
            )

    row_fn = result.result_tuple([col.key for col in table.primary_key])

    def get(row, parameters):
        return row_fn(
            getter(row if use_row else parameters)
            for getter, use_row in pk_getters
        )

    return get
2346
def default_from(self):
    """Return the text used when a SELECT statement has no froms and no
    FROM clause is to be appended.

    This hook lets Oracle tack on a ``FROM DUAL`` to the string output;
    the base implementation contributes nothing.

    """
    no_from_text = ""
    return no_from_text
2355
def visit_override_binds(self, override_binds, **kw):
    """SQL compile the nested element of an _OverrideBinds with
    bindparams swapped out.

    The _OverrideBinds is not normally expected to be compiled; it
    is meant to be used when an already cached statement is to be used,
    the compilation was already performed, and only the bound params should
    be swapped in at execution time.

    However, there are test cases that exercise this object, and
    additionally the ORM subquery loader is known to feed in expressions
    which include this construct into new queries (discovered in #11173),
    so it has to do the right thing at compile time as well.

    Returns the SQL text of ``override_binds.element``; the compiler's
    bind collections are updated as a side effect.

    """

    # get SQL text first
    sqltext = override_binds.element._compiler_dispatch(self, **kw)

    # for a test compile that is not for caching, change binds after the
    # fact. note that we don't try to
    # swap the bindparam as we compile, because our element may be
    # elsewhere in the statement already (e.g. a subquery or perhaps a
    # CTE) and was already visited / compiled. See
    # test_relationship_criteria.py ->
    # test_selectinload_local_criteria_subquery
    for k in override_binds.translate:
        if k not in self.binds:
            continue
        bp = self.binds[k]

        # so this would work, just change the value of bp in place.
        # but we dont want to mutate things outside.
        # bp.value = override_binds.translate[bp.key]
        # continue

        # instead, need to replace bp with new_bp or otherwise accommodate
        # in all internal collections
        new_bp = bp._with_value(
            override_binds.translate[bp.key],
            maintain_key=True,
            required=False,
        )

        # re-point both the translate key and the rendered bind name at
        # the replacement parameter
        name = self.bind_names[bp]
        self.binds[k] = self.binds[name] = new_bp
        self.bind_names[new_bp] = name
        self.bind_names.pop(bp, None)

        # carry over membership in the post-compile / literal-execute
        # collections; ``|=`` with a new set rebinds the attribute
        if bp in self.post_compile_params:
            self.post_compile_params |= {new_bp}
        if bp in self.literal_execute_params:
            self.literal_execute_params |= {new_bp}

        ckbm_tuple = self._cache_key_bind_match
        if ckbm_tuple:
            ckbm, cksm = ckbm_tuple
            # NOTE(review): the loop variable below rebinds ``bp``; the
            # outer ``bp`` is not used again in this iteration
            for bp in bp._cloned_set:
                if bp.key in cksm:
                    cb = cksm[bp.key]
                    ckbm[cb].append(new_bp)

    return sqltext
2419
def visit_grouping(self, grouping, asfrom=False, **kwargs):
    """Render the grouped element wrapped in parenthesis.

    ``asfrom`` is accepted but not passed along to the inner element.

    """
    inner_sql = grouping.element._compiler_dispatch(self, **kwargs)
    return "(" + inner_sql + ")"
2422
def visit_select_statement_grouping(self, grouping, **kwargs):
    """Render the grouped SELECT statement wrapped in parenthesis."""
    inner_sql = grouping.element._compiler_dispatch(self, **kwargs)
    return "(" + inner_sql + ")"
2425
def visit_label_reference(
    self, element, within_columns_clause=False, **kwargs
):
    """Compile the element wrapped by a label reference.

    When a statement is on the stack and the dialect supports simple
    ORDER BY labels, the referenced label is looked up in the enclosing
    compile state; if it resolves, ``render_label_as_label`` is passed so
    that the label can be rendered by name.

    """
    if self.stack and self.dialect.supports_simple_order_by_label:
        try:
            compile_state = cast(
                "Union[SelectState, CompoundSelectState]",
                self.stack[-1]["compile_state"],
            )
        except KeyError as ke:
            raise exc.CompileError(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ) from ke

        _, only_froms, only_cols = compile_state._label_resolve_dict
        resolve_dict = only_froms if within_columns_clause else only_cols

        # this can be None in the case that a _label_reference()
        # were subject to a replacement operation, in which case
        # the replacement of the Label element may have changed
        # to something else like a ColumnClause expression.
        label_elem = element.element._order_by_label_element

        if label_elem is not None:
            label_name = label_elem.name
            if label_name in resolve_dict and label_elem.shares_lineage(
                resolve_dict[label_name]
            ):
                kwargs["render_label_as_label"] = label_elem

    return self.process(
        element.element,
        within_columns_clause=within_columns_clause,
        **kwargs,
    )
2472
def visit_textual_label_reference(
    self, element, within_columns_clause=False, **kwargs
):
    """Compile a label reference that was given as a plain string.

    Outside of any statement the text clause of the element is compiled
    as is.  Otherwise the string is looked up in the label resolution
    dictionaries of the enclosing compile state and the matching column
    is compiled with ``render_label_as_label`` set to that column.

    """
    if not self.stack:
        # compiling the element outside of the context of a SELECT
        return self.process(element._text_clause)

    try:
        compile_state = cast(
            "Union[SelectState, CompoundSelectState]",
            self.stack[-1]["compile_state"],
        )
    except KeyError as ke:
        coercions._no_text_coercion(
            element.element,
            extra=(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ),
            exc_cls=exc.CompileError,
            err=ke,
        )

    with_cols, only_froms, _ = compile_state._label_resolve_dict
    lookup = only_froms if within_columns_clause else with_cols
    try:
        col = lookup[element.element]
    except KeyError as err:
        coercions._no_text_coercion(
            element.element,
            extra=(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ),
            exc_cls=exc.CompileError,
            err=err,
        )
    else:
        kwargs["render_label_as_label"] = col
        return self.process(
            col, within_columns_clause=within_columns_clause, **kwargs
        )
2517
def visit_label(
    self,
    label,
    add_to_result_map=None,
    within_label_clause=False,
    within_columns_clause=False,
    render_label_as_label=None,
    result_map_targets=(),
    **kw,
):
    """Compile a label, as ``<expr> AS <name>``, as the bare label name,
    or as the labeled expression alone.

    """
    # only render labels within the columns clause
    # or ORDER BY clause of a select. dialect-specific compilers
    # can modify this behavior.
    with_as = within_columns_clause and not within_label_clause
    name_only = render_label_as_label is label

    if not (name_only or with_as):
        return label.element._compiler_dispatch(
            self, within_columns_clause=False, **kw
        )

    labelname = label.name
    if isinstance(labelname, elements._truncated_label):
        labelname = self._truncated_identifier("colident", labelname)

    if not with_as:
        return self.preparer.format_label(label, labelname)

    if add_to_result_map is not None:
        add_to_result_map(
            labelname,
            label.name,
            (label, labelname) + label._alt_names + result_map_targets,
            label.type,
        )
    expression_sql = label.element._compiler_dispatch(
        self,
        within_columns_clause=True,
        within_label_clause=True,
        **kw,
    )
    return (
        expression_sql
        + OPERATORS[operators.as_]
        + self.preparer.format_label(label, labelname)
    )
2566
def _fallback_column_name(self, column):
    """Hook invoked for a column whose ``name`` is None.

    The base implementation always raises ``CompileError``.

    """
    message = "Cannot compile Column object until its 'name' is assigned."
    raise exc.CompileError(message)
2571
def visit_lambda_element(self, element, **kw):
    """Compile the resolved SQL element of a lambda element."""
    return self.process(element._resolved, **kw)
2575
def visit_column(
    self,
    column: ColumnClause[Any],
    add_to_result_map: Optional[_ResultMapAppender] = None,
    include_table: bool = True,
    result_map_targets: Tuple[Any, ...] = (),
    ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] = None,
    **kwargs: Any,
) -> str:
    """Render a column name, qualified by schema and table as needed.

    :param column: the column being compiled
    :param add_to_result_map: optional callable that receives the
     rendered name, the original name, the lookup targets and the type
    :param include_table: when False, the table qualifier is omitted
    :param result_map_targets: extra targets appended to the result
     map entry
    :param ambiguous_table_name_map: optional mapping consulted to
     replace the table name when no schema is in effect
    :return: the rendered column expression

    """
    rendered = original_name = column.name
    if rendered is None:
        rendered = self._fallback_column_name(column)

    literal = column.is_literal
    if not literal and isinstance(rendered, elements._truncated_label):
        rendered = self._truncated_identifier("colident", rendered)

    if add_to_result_map is not None:
        targets = (column, rendered, column.key) + result_map_targets
        if column._tq_label:
            targets += (column._tq_label,)

        add_to_result_map(rendered, original_name, targets, column.type)

    # note we are not currently accommodating for
    # literal_column(quoted_name('ident', True)) in the literal case
    rendered = (
        self.escape_literal_column(rendered)
        if literal
        else self.preparer.quote(rendered)
    )

    table = column.table
    if table is None or not include_table or not table.named_with_column:
        return rendered

    effective_schema = self.preparer.schema_for_object(table)
    schema_prefix = (
        self.preparer.quote_schema(effective_schema) + "."
        if effective_schema
        else ""
    )

    if TYPE_CHECKING:
        assert isinstance(table, NamedFromClause)
    tablename = table.name

    if (
        not effective_schema
        and ambiguous_table_name_map
        and tablename in ambiguous_table_name_map
    ):
        tablename = ambiguous_table_name_map[tablename]

    if isinstance(tablename, elements._truncated_label):
        tablename = self._truncated_identifier("alias", tablename)

    return schema_prefix + self.preparer.quote(tablename) + "." + rendered
2634
def visit_collation(self, element, **kw):
    """Render the collation name as formatted by the identifier preparer."""
    collation_name = element.collation
    return self.preparer.format_collation(collation_name)
2637
def visit_fromclause(self, fromclause, **kwargs):
    """Render a FROM clause element as its ``name`` attribute, unmodified."""
    name = fromclause.name
    return name
2640
def visit_index(self, index, **kwargs):
    """Render an index as its ``name`` attribute, unmodified."""
    name = index.name
    return name
2643
def visit_typeclause(self, typeclause, **kw):
    """Render the type of a type clause using the dialect's type compiler.

    The type clause itself and this compiler's identifier preparer are
    handed to the type compiler as ``type_expression`` and
    ``identifier_preparer``.

    """
    kw.update(
        type_expression=typeclause,
        identifier_preparer=self.preparer,
    )
    type_compiler = self.dialect.type_compiler_instance
    return type_compiler.process(typeclause.type, **kw)
2650
def post_process_text(self, text):
    """Return ``text`` with percent signs doubled when the preparer's
    ``_double_percents`` flag is set; otherwise return it unchanged.

    """
    if not self.preparer._double_percents:
        return text
    return text.replace("%", "%%")
2655
def escape_literal_column(self, text):
    """Return literal column ``text`` with percent signs doubled when the
    preparer's ``_double_percents`` flag is set; otherwise unchanged.

    """
    if not self.preparer._double_percents:
        return text
    return text.replace("%", "%%")
2660
def visit_textclause(self, textclause, add_to_result_map=None, **kw):
    """Compile a text clause, substituting its bind parameter markers.

    Each marker matched by ``BIND_PARAMS`` is replaced with the compiled
    bind parameter registered on the text clause under that name, or
    with a plain bind string when none is registered.  Escaped markers
    matched by ``BIND_PARAMS_ESC`` are un-escaped afterwards.

    """

    def _render_bind(match):
        bind_name = match.group(1)
        if bind_name not in textclause._bindparams:
            return self.bindparam_string(bind_name, **kw)
        return self.process(textclause._bindparams[bind_name], **kw)

    if not self.stack:
        self.isplaintext = True

    if add_to_result_map:
        # text() object is present in the columns clause of a
        # select(). Add a no-name entry to the result map so that
        # row[text()] produces a result
        add_to_result_map(None, None, (textclause,), sqltypes.NULLTYPE)

    processed_text = self.post_process_text(textclause.text)
    with_binds = BIND_PARAMS.sub(_render_bind, processed_text)

    # un-escape any \:params
    return BIND_PARAMS_ESC.sub(lambda m: m.group(1), with_binds)
2685
def visit_textual_select(
    self, taf, compound_index=None, asfrom=False, **kw
):
    """Compile a textual SELECT construct (``taf``) and return its SQL.

    A new stack entry is pushed for the duration of the compilation.
    When the construct is at the top level, or when the enclosing stack
    entry requests a result map for a compound or nested statement, each
    element of ``taf.column_args`` is processed so that result map
    entries are added for it.  When the compiler has CTEs, the CTE
    clause is rendered in front of the text.

    """
    toplevel = not self.stack
    entry = self._default_stack_entry if toplevel else self.stack[-1]

    new_entry: _CompilerStackEntry = {
        "correlate_froms": set(),
        "asfrom_froms": set(),
        "selectable": taf,
    }
    self.stack.append(new_entry)

    if taf._independent_ctes:
        self._dispatch_independent_ctes(taf, kw)

    # the result map is populated for a top-level statement, for the
    # first element (index 0) of a compound that asked for one, or when
    # the enclosing entry asks for one for a nested statement
    populate_result_map = (
        toplevel
        or (
            compound_index == 0
            and entry.get("need_result_map_for_compound", False)
        )
        or entry.get("need_result_map_for_nested", False)
    )

    if populate_result_map:
        self._ordered_columns = self._textual_ordered_columns = (
            taf.positional
        )

        # enable looser result column matching when the SQL text links to
        # Column objects by name only
        self._loose_column_name_matching = not taf.positional and bool(
            taf.column_args
        )

        # the return value is discarded; the columns are processed only
        # for the add_to_result_map callback
        for c in taf.column_args:
            self.process(
                c,
                within_columns_clause=True,
                add_to_result_map=self._add_to_result_map,
            )

    text = self.process(taf.element, **kw)
    if self.ctes:
        # nesting_level is None at the top level, otherwise the current
        # stack depth (which includes the entry pushed above)
        nesting_level = len(self.stack) if not toplevel else None
        text = self._render_cte_clause(nesting_level=nesting_level) + text

    self.stack.pop(-1)

    return text
2737
def visit_null(self, expr, **kw):
    """Render the SQL ``NULL`` keyword."""
    null_keyword = "NULL"
    return null_keyword
2740
def visit_true(self, expr, **kw):
    """Render boolean true: ``true`` when the dialect supports native
    booleans, otherwise ``1``.

    """
    return "true" if self.dialect.supports_native_boolean else "1"
2746
def visit_false(self, expr, **kw):
    """Render boolean false: ``false`` when the dialect supports native
    booleans, otherwise ``0``.

    """
    return "false" if self.dialect.supports_native_boolean else "0"
2752
def _generate_delimited_list(self, elements, separator, **kw):
    """Compile each element and join the non-empty results with
    ``separator``.

    """
    rendered = [c._compiler_dispatch(self, **kw) for c in elements]
    return separator.join(filter(None, rendered))
2759
def _generate_delimited_and_list(self, clauses, **kw):
    """Compile ``clauses`` as a conjunction.

    The clauses are first passed through
    ``BooleanClauseList._process_clauses_for_boolean`` for the AND
    operator.  A single remaining clause is compiled on its own;
    otherwise the non-empty compiled clauses are joined with the AND
    operator string.

    """
    lcc, clauses = elements.BooleanClauseList._process_clauses_for_boolean(
        operators.and_,
        elements.True_._singleton,
        elements.False_._singleton,
        clauses,
    )
    if lcc == 1:
        return clauses[0]._compiler_dispatch(self, **kw)

    and_string = OPERATORS[operators.and_]
    rendered = (c._compiler_dispatch(self, **kw) for c in clauses)
    return and_string.join(s for s in rendered if s)
2776
def visit_tuple(self, clauselist, **kw):
    """Render a tuple as its clause list wrapped in parenthesis."""
    members_sql = self.visit_clauselist(clauselist, **kw)
    return f"({members_sql})"
2779
def visit_clauselist(self, clauselist, **kw):
    """Render the clauses of a clause list joined by its operator.

    A single space is used as the separator when the clause list has no
    operator.

    """
    op = clauselist.operator
    sep = " " if op is None else OPERATORS[op]
    return self._generate_delimited_list(clauselist.clauses, sep, **kw)
2788
def visit_expression_clauselist(self, clauselist, **kw):
    """Render a clause list that represents an operator expression.

    An operator-specific ``visit_<operator>_expression_clauselist``
    method is used when one is found; otherwise the clauses are joined
    with the operator's string and ``_in_operator_expression`` is set
    for the nested compilation.

    :raises UnsupportedCompilationError: if the operator has no string
     form in ``OPERATORS``

    """
    operator_ = clauselist.operator

    disp = self._get_operator_dispatch(
        operator_, "expression_clauselist", None
    )
    if disp:
        return disp(clauselist, operator_, **kw)

    try:
        opstring = OPERATORS[operator_]
    except KeyError as err:
        raise exc.UnsupportedCompilationError(self, operator_) from err

    kw["_in_operator_expression"] = True
    return self._generate_delimited_list(clauselist.clauses, opstring, **kw)
2807
def visit_case(self, clause, **kwargs):
    """Render a ``CASE [value] WHEN .. THEN .. [ELSE ..] END`` expression."""
    pieces = ["CASE "]

    if clause.value is not None:
        pieces.append(clause.value._compiler_dispatch(self, **kwargs) + " ")

    for cond, result in clause.whens:
        pieces.append(
            "WHEN "
            + cond._compiler_dispatch(self, **kwargs)
            + " THEN "
            + result._compiler_dispatch(self, **kwargs)
            + " "
        )

    if clause.else_ is not None:
        pieces.append(
            "ELSE " + clause.else_._compiler_dispatch(self, **kwargs) + " "
        )

    pieces.append("END")
    return "".join(pieces)
2826
def visit_type_coerce(self, type_coerce, **kw):
    """Compile the typed expression of a type coerce construct."""
    typed = type_coerce.typed_expression
    return typed._compiler_dispatch(self, **kw)
2829
def visit_cast(self, cast, **kwargs):
    """Render ``CAST(<expr> AS <type>)``.

    When the compiled type contains a `` COLLATE ...`` portion, that
    portion is moved outside of the closing parenthesis.

    """
    type_sql = cast.typeclause._compiler_dispatch(self, **kwargs)
    collate_match = re.match("(.*)( COLLATE .*)", type_sql)
    if collate_match:
        type_sql, collation = collate_match.group(1, 2)
    else:
        collation = ""
    expr_sql = cast.clause._compiler_dispatch(self, **kwargs)
    return "CAST(%s AS %s)%s" % (expr_sql, type_sql, collation)
2838
def visit_frame_clause(self, frameclause, **kw):
    """Render the ``<lower bound> AND <upper bound>`` portion of a
    window frame.

    """
    frame_type = elements._FrameClauseType

    def _render_bound(bound_type, bind_attr, unbounded_text):
        # the integer bind is only read and compiled for bounds that
        # are neither unbounded nor the current row
        if bound_type is frame_type.RANGE_UNBOUNDED:
            return unbounded_text
        if bound_type is frame_type.RANGE_CURRENT:
            return "CURRENT ROW"
        val = self.process(getattr(frameclause, bind_attr), **kw)
        if bound_type is frame_type.RANGE_PRECEDING:
            return f"{val} PRECEDING"
        return f"{val} FOLLOWING"

    left = _render_bound(
        frameclause.lower_type, "lower_integer_bind", "UNBOUNDED PRECEDING"
    )
    right = _render_bound(
        frameclause.upper_type, "upper_integer_bind", "UNBOUNDED FOLLOWING"
    )

    return f"{left} AND {right}"
2870
def visit_over(self, over, **kwargs):
    """Render ``<element> OVER (...)``.

    The window specification contains, in order, ``PARTITION BY`` and
    ``ORDER BY`` for each non-empty clause, followed by a ``RANGE
    BETWEEN`` or ``ROWS BETWEEN`` frame when one is set; ``range_`` takes
    precedence over ``rows``.

    """
    target_sql = over.element._compiler_dispatch(self, **kwargs)

    if over.range_ is not None:
        frame = f"RANGE BETWEEN {self.process(over.range_, **kwargs)}"
    elif over.rows is not None:
        frame = f"ROWS BETWEEN {self.process(over.rows, **kwargs)}"
    else:
        frame = None

    window_parts = []
    for word, clause in (
        ("PARTITION", over.partition_by),
        ("ORDER", over.order_by),
    ):
        if clause is not None and len(clause):
            window_parts.append(
                "%s BY %s" % (word, clause._compiler_dispatch(self, **kwargs))
            )
    if frame:
        window_parts.append(frame)

    return "%s OVER (%s)" % (target_sql, " ".join(window_parts))
2895
def visit_withingroup(self, withingroup, **kwargs):
    """Render ``<element> WITHIN GROUP (ORDER BY <order_by>)``."""
    element_sql = withingroup.element._compiler_dispatch(self, **kwargs)
    order_by_sql = withingroup.order_by._compiler_dispatch(self, **kwargs)
    return "%s WITHIN GROUP (ORDER BY %s)" % (element_sql, order_by_sql)
2901
def visit_funcfilter(self, funcfilter, **kwargs):
    """Render ``<func> FILTER (WHERE <criterion>)``."""
    func_sql = funcfilter.func._compiler_dispatch(self, **kwargs)
    criterion_sql = funcfilter.criterion._compiler_dispatch(self, **kwargs)
    return "%s FILTER (WHERE %s)" % (func_sql, criterion_sql)
2907
def visit_extract(self, extract, **kwargs):
    """Render ``EXTRACT(<field> FROM <expr>)``.

    The field name is translated through ``self.extract_map`` when an
    entry for it exists, and used as given otherwise.

    """
    requested = extract.field
    field = self.extract_map.get(requested, requested)
    expr_sql = extract.expr._compiler_dispatch(self, **kwargs)
    return "EXTRACT(%s FROM %s)" % (field, expr_sql)
2914
def visit_scalar_function_column(self, element, **kw):
    """Render ``(<function>).<column>`` for a scalar function column."""
    function_sql = self.visit_function(element.fn, **kw)
    column_sql = self.visit_column(element, **kw)
    return f"({function_sql}).{column_sql}"
2919
def visit_function(
    self,
    func: Function[Any],
    add_to_result_map: Optional[_ResultMapAppender] = None,
    **kwargs: Any,
) -> str:
    """Render a SQL function call.

    :param func: the function element being compiled
    :param add_to_result_map: optional callable that receives the
     function name (as rendered name, original name and sole lookup
     target) along with the function's type
    :return: the rendered function expression, with ``WITH ORDINALITY``
     appended when the function requests it

    A ``visit_<name>_func`` method on the compiler takes precedence when
    present.  Otherwise the name comes from ``FUNCTIONS`` for known
    function classes, or from ``func.name``, and is prefixed with the
    dot-separated ``func.packagenames``.

    """
    if add_to_result_map is not None:
        add_to_result_map(func.name, func.name, (func.name,), func.type)

    disp = getattr(self, "visit_%s_func" % func.name.lower(), None)

    text: str

    if disp:
        text = disp(func, **kwargs)
    else:
        name = FUNCTIONS.get(func._deannotate().__class__, None)
        if name:
            if func._has_args:
                name += "%(expr)s"
        else:
            name = func.name
            name = (
                self.preparer.quote(name)
                if self.preparer._requires_quotes_illegal_chars(name)
                or isinstance(name, elements.quoted_name)
                else name
            )
            name = name + "%(expr)s"
        text = ".".join(
            [
                (
                    self.preparer.quote(tok)
                    if self.preparer._requires_quotes_illegal_chars(tok)
                    # test the package token itself; by this point
                    # ``name`` is always a plain string, so checking it
                    # here could never route a quoted_name token
                    # through the preparer
                    or isinstance(tok, elements.quoted_name)
                    else tok
                )
                for tok in func.packagenames
            ]
            + [name]
        ) % {"expr": self.function_argspec(func, **kwargs)}

    if func._with_ordinality:
        text += " WITH ORDINALITY"
    return text
2965
def visit_next_value_func(self, next_value, **kw):
    """Render a next-value function by visiting its sequence.

    Keyword arguments are not passed on to ``visit_sequence``.

    """
    sequence = next_value.sequence
    return self.visit_sequence(sequence)
2968
def visit_sequence(self, sequence, **kw):
    """Render a sequence increment.

    The base implementation always raises ``NotImplementedError``,
    naming the dialect in the message.

    """
    message = (
        "Dialect '%s' does not support sequence increments."
        % self.dialect.name
    )
    raise NotImplementedError(message)
2974
def function_argspec(self, func, **kwargs):
    """Render the argument portion of a function by compiling its
    ``clause_expr``.

    """
    argument_clause = func.clause_expr
    return argument_clause._compiler_dispatch(self, **kwargs)
2977
def visit_compound_select(
    self, cs, asfrom=False, compound_index=None, **kwargs
):
    """Compile a compound SELECT (``cs``) and return its SQL text.

    The member selects are compiled in order and joined by the keyword
    looked up in ``self.compound_keywords``; GROUP BY, ORDER BY and a
    row limiting clause for the compound as a whole are appended, and
    the CTE clause is rendered in front when the compiler has CTEs.  A
    stack entry is pushed for the duration of the compilation.

    """
    toplevel = not self.stack

    compile_state = cs._compile_state_factory(cs, self, **kwargs)

    if toplevel and not self.compile_state:
        self.compile_state = compile_state

    compound_stmt = compile_state.statement

    entry = self._default_stack_entry if toplevel else self.stack[-1]
    # a result map is needed at the top level, or when this compound has
    # a falsy compound_index (None or 0) and the enclosing entry asked
    # for one
    need_result_map = toplevel or (
        not compound_index
        and entry.get("need_result_map_for_compound", False)
    )

    # indicates there is already a CompoundSelect in play
    if compound_index == 0:
        entry["select_0"] = cs

    # the new entry shares the correlate / asfrom froms of the enclosing
    # entry
    self.stack.append(
        {
            "correlate_froms": entry["correlate_froms"],
            "asfrom_froms": entry["asfrom_froms"],
            "selectable": cs,
            "compile_state": compile_state,
            "need_result_map_for_compound": need_result_map,
        }
    )

    if compound_stmt._independent_ctes:
        self._dispatch_independent_ctes(compound_stmt, kwargs)

    keyword = self.compound_keywords[cs.keyword]

    # each member select receives its position as compound_index
    text = (" " + keyword + " ").join(
        (
            c._compiler_dispatch(
                self, asfrom=asfrom, compound_index=i, **kwargs
            )
            for i, c in enumerate(cs.selects)
        )
    )

    # the trailing clauses of the compound are compiled with
    # include_table=False
    kwargs["include_table"] = False
    text += self.group_by_clause(cs, **dict(asfrom=asfrom, **kwargs))
    text += self.order_by_clause(cs, **kwargs)
    if cs._has_row_limiting_clause:
        text += self._row_limit_clause(cs, **kwargs)

    if self.ctes:
        # nesting_level is None at the top level, otherwise the current
        # stack depth (which includes the entry pushed above)
        nesting_level = len(self.stack) if not toplevel else None
        text = (
            self._render_cte_clause(
                nesting_level=nesting_level,
                include_following_stack=True,
            )
            + text
        )

    self.stack.pop(-1)
    return text
3042
def _row_limit_clause(self, cs, **kwargs):
    """Render the row limiting clause of ``cs``.

    ``fetch_clause`` is used when the statement has a fetch clause,
    ``limit_clause`` otherwise.

    """
    render = (
        self.limit_clause if cs._fetch_clause is None else self.fetch_clause
    )
    return render(cs, **kwargs)
3048
def _get_operator_dispatch(self, operator_, qualifier1, qualifier2):
    """Look up a ``visit_<operator>_<qualifier1>[_<qualifier2>]``
    attribute on this compiler.

    Returns the attribute, or None when the compiler has none by that
    name.  The operator contributes its ``__name__``.

    """
    suffix = "_" + qualifier2 if qualifier2 else ""
    attrname = "visit_%s_%s%s" % (operator_.__name__, qualifier1, suffix)
    return getattr(self, attrname, None)
3056
def visit_unary(
    self, unary, add_to_result_map=None, result_map_targets=(), **kw
):
    """Render a unary expression that has either an operator or a
    modifier.

    An operator- or modifier-specific visit method is used when the
    compiler has one; otherwise the generic unary rendering is applied
    with the string from ``OPERATORS``.

    :raises CompileError: if the expression has both an operator and a
     modifier, or neither

    """
    if add_to_result_map is not None:
        result_map_targets += (unary,)
        kw["add_to_result_map"] = add_to_result_map
        kw["result_map_targets"] = result_map_targets

    if unary.operator:
        if unary.modifier:
            raise exc.CompileError(
                "Unary expression does not support operator "
                "and modifier simultaneously"
            )
        qualifier, symbol = "operator", unary.operator
    elif unary.modifier:
        qualifier, symbol = "modifier", unary.modifier
    else:
        raise exc.CompileError(
            "Unary expression has no operator or modifier"
        )

    disp = self._get_operator_dispatch(symbol, "unary", qualifier)
    if disp:
        return disp(unary, symbol, **kw)

    if qualifier == "operator":
        return self._generate_generic_unary_operator(
            unary, OPERATORS[symbol], **kw
        )
    return self._generate_generic_unary_modifier(
        unary, OPERATORS[symbol], **kw
    )
3094
def visit_truediv_binary(self, binary, operator, **kw):
    """Render true division as ``<left> / <right>``.

    When the dialect's ``/`` performs floor division, the right side is
    wrapped in a CAST to a numeric type; its own type is reused when
    that type already has Numeric affinity.

    """
    needs_cast = self.dialect.div_is_floordiv
    numerator_sql = self.process(binary.left, **kw)

    divisor = binary.right
    if needs_cast:
        # TODO: would need a fast cast again here,
        # unless we want to use an implicit cast like "+ 0.0"
        divisor = elements.Cast(
            binary.right,
            (
                binary.right.type
                if binary.right.type._type_affinity is sqltypes.Numeric
                else sqltypes.Numeric()
            ),
        )

    return numerator_sql + " / " + self.process(divisor, **kw)
3121
def visit_floordiv_binary(self, binary, operator, **kw):
    """Render floor division.

    A plain ``<left> / <right>`` is emitted when the dialect's ``/``
    already floors and the right side has Integer affinity; otherwise
    the division is wrapped in ``FLOOR()``.

    """
    plain_division_floors = (
        self.dialect.div_is_floordiv
        and binary.right.type._type_affinity is sqltypes.Integer
    )
    quotient = (
        self.process(binary.left, **kw)
        + " / "
        + self.process(binary.right, **kw)
    )
    if plain_division_floors:
        return quotient
    return "FLOOR(%s)" % quotient
3138
def visit_is_true_unary_operator(self, element, operator, **kw):
    """Render an "is true" test.

    The inner expression is rendered as is when it is implicitly boolean
    or the dialect has native booleans; otherwise it is compared to 1.

    """
    renders_as_boolean = (
        element._is_implicitly_boolean
        or self.dialect.supports_native_boolean
    )
    inner_sql = self.process(element.element, **kw)
    return inner_sql if renders_as_boolean else "%s = 1" % inner_sql
3147
def visit_is_false_unary_operator(self, element, operator, **kw):
    """Render an "is false" test.

    ``NOT <expr>`` is used when the expression is implicitly boolean or
    the dialect has native booleans; otherwise it is compared to 0.

    """
    renders_as_boolean = (
        element._is_implicitly_boolean
        or self.dialect.supports_native_boolean
    )
    template = "NOT %s" if renders_as_boolean else "%s = 0"
    return template % self.process(element.element, **kw)
3156
def visit_not_match_op_binary(self, binary, operator, **kw):
    """Render ``NOT <match expression>`` for a negated MATCH operation.

    The positive form is rendered through ``visit_binary`` with the
    operator overridden to ``match_op`` and is then prefixed with
    ``NOT``.

    Compilation keyword arguments are forwarded to ``visit_binary`` so
    that flags such as ``literal_binds`` apply to the operands of the
    negated form as well; they were previously dropped here.

    """
    return "NOT %s" % self.visit_binary(
        binary, override_operator=operators.match_op, **kw
    )
3161
def visit_not_in_op_binary(self, binary, operator, **kw):
    """Render a NOT IN expression wrapped in parenthesis."""
    # The brackets are required in the NOT IN operation because the empty
    # case is handled using the form "(col NOT IN (null) OR 1 = 1)".
    # The presence of the OR makes the brackets required.
    expression_sql = self._generate_generic_binary(
        binary, OPERATORS[operator], **kw
    )
    return "(%s)" % expression_sql
3169
def visit_empty_set_op_expr(self, type_, expand_op, **kw):
    """Return the SQL fragment substituted for an empty IN / NOT IN list.

    The fragment deliberately closes the surrounding parenthesis and opens
    a new one, so that it can be dropped inside an existing ``(...)``:
    NOT IN yields ``NULL) OR (1 = 1`` and IN yields ``NULL) AND (1 != 1``.
    With more than one entry in ``type_`` a parenthesized row of NULLs, one
    per entry, is used instead of a single NULL.  Any other operator is
    delegated to ``visit_empty_set_expr``.
    """
    if expand_op is operators.not_in_op:
        if len(type_) <= 1:
            return "NULL) OR (1 = 1"
        null_row = ", ".join("NULL" for _ in type_)
        return "(%s)) OR (1 = 1" % null_row

    if expand_op is operators.in_op:
        if len(type_) <= 1:
            return "NULL) AND (1 != 1"
        null_row = ", ".join("NULL" for _ in type_)
        return "(%s)) AND (1 != 1" % null_row

    return self.visit_empty_set_expr(type_)
3187
def visit_empty_set_expr(self, element_types, **kw):
    """Always raise ``NotImplementedError`` naming the current dialect."""
    message = (
        "Dialect '%s' does not support empty set expression."
        % self.dialect.name
    )
    raise NotImplementedError(message)
3193
def _literal_execute_expanding_parameter_literal_binds(
    self, parameter, values, bind_expression_template=None
):
    """Render an "expanding" parameter as inline literal SQL.

    :param parameter: the expanding bind parameter; its ``type`` and
     ``expand_op`` are consulted.
    :param values: the collection of values to render.
    :param bind_expression_template: optional string containing a
     post-compile token from which the SQL text to place before and
     after each rendered literal is extracted.
    :return: a 2-tuple ``((), replacement_expression)``; the first member
     is always an empty tuple because no bound parameters are produced.
    :raises NotImplementedError: for a tuple-style value list whose type
     implementation has a bind expression.
    """
    typ_dialect_impl = parameter.type._unwrapped_dialect_impl(self.dialect)

    if not values:
        # empty IN expression.  note we don't need to use
        # bind_expression_template here because there are no
        # expressions to render.

        if typ_dialect_impl._is_tuple_type:
            # one NULL per tuple member type, optionally prefixed with
            # VALUES when the dialect asks for it
            replacement_expression = (
                "VALUES " if self.dialect.tuple_in_values else ""
            ) + self.visit_empty_set_op_expr(
                parameter.type.types, parameter.expand_op
            )

        else:
            replacement_expression = self.visit_empty_set_op_expr(
                [parameter.type], parameter.expand_op
            )

    elif typ_dialect_impl._is_tuple_type or (
        # a null-typed parameter whose first value is a non-string
        # sequence is also handled as a list of tuples
        typ_dialect_impl._isnull
        and isinstance(values[0], collections_abc.Sequence)
        and not isinstance(values[0], (str, bytes))
    ):
        if typ_dialect_impl._has_bind_expression:
            raise NotImplementedError(
                "bind_expression() on TupleType not supported with "
                "literal_binds"
            )

        # each tuple is rendered as "(v1, v2, ...)", each member using
        # the positionally matching entry of parameter.type.types
        replacement_expression = (
            "VALUES " if self.dialect.tuple_in_values else ""
        ) + ", ".join(
            "(%s)"
            % (
                ", ".join(
                    self.render_literal_value(value, param_type)
                    for value, param_type in zip(
                        tuple_element, parameter.type.types
                    )
                )
            )
            for i, tuple_element in enumerate(values)
        )
    else:
        if bind_expression_template:
            post_compile_pattern = self._post_compile_pattern
            m = post_compile_pattern.search(bind_expression_template)
            assert m and m.group(
                2
            ), "unexpected format for expanding parameter"

            # group 2 is split on "~~"; elements 1 and 3 are used as the
            # text emitted before and after each literal value
            tok = m.group(2).split("~~")
            be_left, be_right = tok[1], tok[3]
            replacement_expression = ", ".join(
                "%s%s%s"
                % (
                    be_left,
                    self.render_literal_value(value, parameter.type),
                    be_right,
                )
                for value in values
            )
        else:
            # plain scalar list: comma-separated literals
            replacement_expression = ", ".join(
                self.render_literal_value(value, parameter.type)
                for value in values
            )

    return (), replacement_expression
3267
def _literal_execute_expanding_parameter(self, name, parameter, values):
    """Expand an "expanding" parameter into individual bind placeholders.

    If ``parameter.literal_execute`` is set, the work is delegated to
    ``_literal_execute_expanding_parameter_literal_binds`` and inline
    literals are produced instead.

    :param name: base name from which the per-value parameter names are
     derived (``<name>_<i>`` or ``<name>_<i>_<j>`` for tuples, 1-based).
    :param parameter: the expanding bind parameter.
    :param values: the collection of values supplied for it.
    :return: a 2-tuple ``(to_update, replacement_expression)`` where
     ``to_update`` is a list of ``(new_name, value)`` pairs and
     ``replacement_expression`` is the SQL text holding the placeholders.
    """
    if parameter.literal_execute:
        return self._literal_execute_expanding_parameter_literal_binds(
            parameter, values
        )

    dialect = self.dialect
    typ_dialect_impl = parameter.type._unwrapped_dialect_impl(dialect)

    if self._numeric_binds:
        bind_template = self.compilation_bindtemplate
    else:
        bind_template = self.bindtemplate

    if (
        self.dialect._bind_typing_render_casts
        and typ_dialect_impl.render_bind_cast
    ):

        # placeholder wrapped by the dialect's render_bind_cast()
        def _render_bindtemplate(name):
            return self.render_bind_cast(
                parameter.type,
                typ_dialect_impl,
                bind_template % {"name": name},
            )

    else:

        # bare placeholder
        def _render_bindtemplate(name):
            return bind_template % {"name": name}

    if not values:
        # empty set: nothing to bind, only the empty-set SQL fragment.
        # NOTE(review): unlike the literal-binds variant, no "VALUES "
        # prefix is added here for tuple types - confirm this is intended.
        to_update = []
        if typ_dialect_impl._is_tuple_type:
            replacement_expression = self.visit_empty_set_op_expr(
                parameter.type.types, parameter.expand_op
            )
        else:
            replacement_expression = self.visit_empty_set_op_expr(
                [parameter.type], parameter.expand_op
            )

    elif typ_dialect_impl._is_tuple_type or (
        typ_dialect_impl._isnull
        and isinstance(values[0], collections_abc.Sequence)
        and not isinstance(values[0], (str, bytes))
    ):
        assert not typ_dialect_impl._is_array
        # flat list of (name, value), ordered tuple by tuple
        to_update = [
            ("%s_%s_%s" % (name, i, j), value)
            for i, tuple_element in enumerate(values, 1)
            for j, value in enumerate(tuple_element, 1)
        ]

        # the flat index i * len(tuple_element) + j (0-based i, j) maps
        # back into to_update; this is only correct when every tuple has
        # the same length
        replacement_expression = (
            "VALUES " if dialect.tuple_in_values else ""
        ) + ", ".join(
            "(%s)"
            % (
                ", ".join(
                    _render_bindtemplate(
                        to_update[i * len(tuple_element) + j][0]
                    )
                    for j, value in enumerate(tuple_element)
                )
            )
            for i, tuple_element in enumerate(values)
        )
    else:
        # scalar list: one placeholder per value
        to_update = [
            ("%s_%s" % (name, i), value)
            for i, value in enumerate(values, 1)
        ]
        replacement_expression = ", ".join(
            _render_bindtemplate(key) for key, value in to_update
        )

    return to_update, replacement_expression
3346
def visit_binary(
    self,
    binary,
    override_operator=None,
    eager_grouping=False,
    from_linter=None,
    lateral_from_linter=None,
    **kw,
):
    """Compile a binary expression.

    When a FROM linter is active and the operator is a comparison, edges
    between the FROM objects of both sides are recorded (on the lateral
    linter, including the enclosing lateral, when one is given).  The
    expression is then rendered by an operator-specific ``binary``
    dispatch method if one exists, otherwise generically using the
    operator's entry in ``OPERATORS``.

    :raises UnsupportedCompilationError: if no dispatch method exists and
     the operator is not present in ``OPERATORS``.
    """
    if from_linter and operators.is_comparison(binary.operator):
        if lateral_from_linter is None:
            linter = from_linter
            left_froms = binary.left._from_objects
            right_froms = binary.right._from_objects
        else:
            enclosing_lateral = kw["enclosing_lateral"]
            linter = lateral_from_linter
            left_froms = binary.left._from_objects + [enclosing_lateral]
            right_froms = binary.right._from_objects + [enclosing_lateral]

        linter.edges.update(
            itertools.product(_de_clone(left_froms), _de_clone(right_froms))
        )

    # don't allow "? = ?" to render
    both_sides_are_binds = (
        self.ansi_bind_rules
        and isinstance(binary.left, elements.BindParameter)
        and isinstance(binary.right, elements.BindParameter)
    )
    if both_sides_are_binds:
        kw["literal_execute"] = True

    operator_ = override_operator or binary.operator
    disp = self._get_operator_dispatch(operator_, "binary", None)
    if disp:
        return disp(binary, operator_, **kw)

    try:
        opstring = OPERATORS[operator_]
    except KeyError as err:
        raise exc.UnsupportedCompilationError(self, operator_) from err

    return self._generate_generic_binary(
        binary,
        opstring,
        from_linter=from_linter,
        lateral_from_linter=lateral_from_linter,
        **kw,
    )
3402
def visit_function_as_comparison_op_binary(self, element, operator, **kw):
    """Render the element by compiling its ``sql_function`` attribute."""
    return self.process(element.sql_function, **kw)
3405
def visit_mod_binary(self, binary, operator, **kw):
    """Render a modulo expression.

    The operator is written as ``%%`` when the identifier preparer has
    ``_double_percents`` set, otherwise as a single ``%``.
    """
    if self.preparer._double_percents:
        modulo = " %% "
    else:
        modulo = " % "
    lhs = self.process(binary.left, **kw)
    rhs = self.process(binary.right, **kw)
    return lhs + modulo + rhs
3419
def visit_custom_op_binary(self, element, operator, **kw):
    """Render a binary expression that uses a custom operator.

    The operator's ``opstring`` is escaped with ``escape_literal_column``
    and padded with a space on each side; the operator's own
    ``eager_grouping`` setting replaces any value present in ``kw``.
    """
    kw.update(eager_grouping=operator.eager_grouping)
    padded_op = " " + self.escape_literal_column(operator.opstring) + " "
    return self._generate_generic_binary(element, padded_op, **kw)
3427
def visit_custom_op_unary_operator(self, element, operator, **kw):
    """Render a custom prefix operator: escaped opstring, space, operand."""
    prefix = self.escape_literal_column(operator.opstring) + " "
    return self._generate_generic_unary_operator(element, prefix, **kw)
3432
def visit_custom_op_unary_modifier(self, element, operator, **kw):
    """Render a custom postfix operator: operand, space, escaped opstring."""
    suffix = " " + self.escape_literal_column(operator.opstring)
    return self._generate_generic_unary_modifier(element, suffix, **kw)
3437
3438 def _generate_generic_binary(
3439 self, binary, opstring, eager_grouping=False, **kw
3440 ):
3441 _in_operator_expression = kw.get("_in_operator_expression", False)
3442
3443 kw["_in_operator_expression"] = True
3444 kw["_binary_op"] = binary.operator
3445 text = (
3446 binary.left._compiler_dispatch(
3447 self, eager_grouping=eager_grouping, **kw
3448 )
3449 + opstring
3450 + binary.right._compiler_dispatch(
3451 self, eager_grouping=eager_grouping, **kw
3452 )
3453 )
3454
3455 if _in_operator_expression and eager_grouping:
3456 text = "(%s)" % text
3457 return text
3458
def _generate_generic_unary_operator(self, unary, opstring, **kw):
    """Return ``opstring`` followed by the compiled ``unary.element``."""
    return opstring + unary.element._compiler_dispatch(self, **kw)
3461
def _generate_generic_unary_modifier(self, unary, opstring, **kw):
    """Return the compiled ``unary.element`` followed by ``opstring``."""
    return unary.element._compiler_dispatch(self, **kw) + opstring
3464
@util.memoized_property
def _like_percent_literal(self):
    """A string-typed literal column ``'%'``.

    Used by the contains / startswith / endswith visitors in this class
    as the wildcard concatenated onto the right-hand operand.
    """
    return elements.literal_column("'%'", type_=sqltypes.STRINGTYPE)
3468
def visit_ilike_case_insensitive_operand(self, element, **kw):
    """Render the wrapped element inside a ``lower(...)`` call."""
    return f"lower({element.element._compiler_dispatch(self, **kw)})"
3471
def visit_contains_op_binary(self, binary, operator, **kw):
    """Render "contains" as LIKE with ``'%'`` on both sides of the operand.

    Works on a clone, so the caller's ``binary`` is left unmodified.
    """
    like_binary = binary._clone()
    wildcard = self._like_percent_literal
    like_binary.right = wildcard.concat(like_binary.right).concat(wildcard)
    return self.visit_like_op_binary(like_binary, operator, **kw)
3477
def visit_not_contains_op_binary(self, binary, operator, **kw):
    """Render "not contains" as NOT LIKE with ``'%'`` on both sides.

    Works on a clone, so the caller's ``binary`` is left unmodified.
    """
    like_binary = binary._clone()
    wildcard = self._like_percent_literal
    like_binary.right = wildcard.concat(like_binary.right).concat(wildcard)
    return self.visit_not_like_op_binary(like_binary, operator, **kw)
3483
def visit_icontains_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "contains".

    Both operands of a clone are wrapped with ``ilike_case_insensitive``,
    the right one is surrounded by ``'%'`` wildcards, and the result is
    handed to ``visit_ilike_op_binary``.
    """
    folded = binary._clone()
    wildcard = self._like_percent_literal
    folded.left = ilike_case_insensitive(folded.left)
    folded_right = ilike_case_insensitive(folded.right)
    folded.right = wildcard.concat(folded_right).concat(wildcard)
    return self.visit_ilike_op_binary(folded, operator, **kw)
3492
def visit_not_icontains_op_binary(self, binary, operator, **kw):
    """Render negated case-insensitive "contains".

    Both operands of a clone are wrapped with ``ilike_case_insensitive``,
    the right one is surrounded by ``'%'`` wildcards, and the result is
    handed to ``visit_not_ilike_op_binary``.
    """
    folded = binary._clone()
    wildcard = self._like_percent_literal
    folded.left = ilike_case_insensitive(folded.left)
    folded_right = ilike_case_insensitive(folded.right)
    folded.right = wildcard.concat(folded_right).concat(wildcard)
    return self.visit_not_ilike_op_binary(folded, operator, **kw)
3501
def visit_startswith_op_binary(self, binary, operator, **kw):
    """Render "startswith" as LIKE with a trailing ``'%'`` wildcard.

    Works on a clone, so the caller's ``binary`` is left unmodified.
    """
    like_binary = binary._clone()
    wildcard = self._like_percent_literal
    like_binary.right = wildcard._rconcat(like_binary.right)
    return self.visit_like_op_binary(like_binary, operator, **kw)
3507
def visit_not_startswith_op_binary(self, binary, operator, **kw):
    """Render "not startswith" as NOT LIKE with a trailing ``'%'``.

    Works on a clone, so the caller's ``binary`` is left unmodified.
    """
    like_binary = binary._clone()
    wildcard = self._like_percent_literal
    like_binary.right = wildcard._rconcat(like_binary.right)
    return self.visit_not_like_op_binary(like_binary, operator, **kw)
3513
def visit_istartswith_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "startswith".

    Both operands of a clone are wrapped with ``ilike_case_insensitive``,
    a trailing ``'%'`` wildcard is attached to the right one, and the
    result is handed to ``visit_ilike_op_binary``.
    """
    folded = binary._clone()
    wildcard = self._like_percent_literal
    folded.left = ilike_case_insensitive(folded.left)
    folded_right = ilike_case_insensitive(folded.right)
    folded.right = wildcard._rconcat(folded_right)
    return self.visit_ilike_op_binary(folded, operator, **kw)
3520
def visit_not_istartswith_op_binary(self, binary, operator, **kw):
    """Render negated case-insensitive "startswith".

    Both operands of a clone are wrapped with ``ilike_case_insensitive``,
    a trailing ``'%'`` wildcard is attached to the right one, and the
    result is handed to ``visit_not_ilike_op_binary``.
    """
    folded = binary._clone()
    wildcard = self._like_percent_literal
    folded.left = ilike_case_insensitive(folded.left)
    folded_right = ilike_case_insensitive(folded.right)
    folded.right = wildcard._rconcat(folded_right)
    return self.visit_not_ilike_op_binary(folded, operator, **kw)
3527
def visit_endswith_op_binary(self, binary, operator, **kw):
    """Render "endswith" as LIKE with a leading ``'%'`` wildcard.

    Works on a clone, so the caller's ``binary`` is left unmodified.
    """
    like_binary = binary._clone()
    wildcard = self._like_percent_literal
    like_binary.right = wildcard.concat(like_binary.right)
    return self.visit_like_op_binary(like_binary, operator, **kw)
3533
def visit_not_endswith_op_binary(self, binary, operator, **kw):
    """Render "not endswith" as NOT LIKE with a leading ``'%'`` wildcard.

    Works on a clone, so the caller's ``binary`` is left unmodified.
    """
    like_binary = binary._clone()
    wildcard = self._like_percent_literal
    like_binary.right = wildcard.concat(like_binary.right)
    return self.visit_not_like_op_binary(like_binary, operator, **kw)
3539
def visit_iendswith_op_binary(self, binary, operator, **kw):
    """Render case-insensitive "endswith".

    Both operands of a clone are wrapped with ``ilike_case_insensitive``,
    a leading ``'%'`` wildcard is attached to the right one, and the
    result is handed to ``visit_ilike_op_binary``.
    """
    folded = binary._clone()
    wildcard = self._like_percent_literal
    folded.left = ilike_case_insensitive(folded.left)
    folded_right = ilike_case_insensitive(folded.right)
    folded.right = wildcard.concat(folded_right)
    return self.visit_ilike_op_binary(folded, operator, **kw)
3546
def visit_not_iendswith_op_binary(self, binary, operator, **kw):
    """Render negated case-insensitive "endswith".

    Both operands of a clone are wrapped with ``ilike_case_insensitive``,
    a leading ``'%'`` wildcard is attached to the right one, and the
    result is handed to ``visit_not_ilike_op_binary``.
    """
    folded = binary._clone()
    wildcard = self._like_percent_literal
    folded.left = ilike_case_insensitive(folded.left)
    folded_right = ilike_case_insensitive(folded.right)
    folded.right = wildcard.concat(folded_right)
    return self.visit_not_ilike_op_binary(folded, operator, **kw)
3553
def visit_like_op_binary(self, binary, operator, **kw):
    """Render ``<left> LIKE <right>``.

    An ``ESCAPE <literal>`` clause is appended when the binary's
    modifiers contain a non-None ``"escape"`` entry.
    """
    escape = binary.modifiers.get("escape", None)

    lhs = binary.left._compiler_dispatch(self, **kw)
    rhs = binary.right._compiler_dispatch(self, **kw)
    sql = "%s LIKE %s" % (lhs, rhs)

    if escape is not None:
        sql += " ESCAPE " + self.render_literal_value(
            escape, sqltypes.STRINGTYPE
        )
    return sql
3565
def visit_not_like_op_binary(self, binary, operator, **kw):
    """Render ``<left> NOT LIKE <right>``.

    An ``ESCAPE <literal>`` clause is appended when the binary's
    modifiers contain a non-None ``"escape"`` entry.
    """
    escape = binary.modifiers.get("escape", None)

    lhs = binary.left._compiler_dispatch(self, **kw)
    rhs = binary.right._compiler_dispatch(self, **kw)
    sql = "%s NOT LIKE %s" % (lhs, rhs)

    if escape is not None:
        sql += " ESCAPE " + self.render_literal_value(
            escape, sqltypes.STRINGTYPE
        )
    return sql
3576
def visit_ilike_op_binary(self, binary, operator, **kw):
    """Render a case-insensitive LIKE via ``visit_like_op_binary``.

    Only when ``operator`` is exactly ``operators.ilike_op`` are the
    operands of a clone wrapped with ``ilike_case_insensitive``.
    """
    if operator is not operators.ilike_op:
        # any other operator: we assume the case-insensitive wrapping has
        # already been applied by the caller
        return self.visit_like_op_binary(binary, operator, **kw)

    folded = binary._clone()
    folded.left = ilike_case_insensitive(folded.left)
    folded.right = ilike_case_insensitive(folded.right)
    return self.visit_like_op_binary(folded, operator, **kw)
3585
def visit_not_ilike_op_binary(self, binary, operator, **kw):
    """Render a case-insensitive NOT LIKE via ``visit_not_like_op_binary``.

    Only when ``operator`` is exactly ``operators.not_ilike_op`` are the
    operands of a clone wrapped with ``ilike_case_insensitive``.
    """
    if operator is not operators.not_ilike_op:
        # any other operator: we assume the case-insensitive wrapping has
        # already been applied by the caller
        return self.visit_not_like_op_binary(binary, operator, **kw)

    folded = binary._clone()
    folded.left = ilike_case_insensitive(folded.left)
    folded.right = ilike_case_insensitive(folded.right)
    return self.visit_not_like_op_binary(folded, operator, **kw)
3594
def visit_between_op_binary(self, binary, operator, **kw):
    """Render BETWEEN, or BETWEEN SYMMETRIC if the modifier is set."""
    if binary.modifiers.get("symmetric", False):
        keyword = " BETWEEN SYMMETRIC "
    else:
        keyword = " BETWEEN "
    return self._generate_generic_binary(binary, keyword, **kw)
3600
def visit_not_between_op_binary(self, binary, operator, **kw):
    """Render NOT BETWEEN, or NOT BETWEEN SYMMETRIC if the modifier is set."""
    if binary.modifiers.get("symmetric", False):
        keyword = " NOT BETWEEN SYMMETRIC "
    else:
        keyword = " NOT BETWEEN "
    return self._generate_generic_binary(binary, keyword, **kw)
3608
def visit_regexp_match_op_binary(self, binary, operator, **kw):
    """Always raise ``CompileError``: no regular expression support here.

    The message names the current dialect.  Presumably overridden by
    compilers for dialects that do support regular expression matching.
    """
    raise exc.CompileError(
        "%s dialect does not support regular expressions"
        % self.dialect.name
    )
3614
def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
    """Always raise ``CompileError``: no regular expression support here.

    The message names the current dialect.  Presumably overridden by
    compilers for dialects that do support regular expression matching.
    """
    raise exc.CompileError(
        "%s dialect does not support regular expressions"
        % self.dialect.name
    )
3620
def visit_regexp_replace_op_binary(self, binary, operator, **kw):
    """Always raise ``CompileError``: no regexp replacement support here.

    The message names the current dialect.  Presumably overridden by
    compilers for dialects that do support regular expression replacement.
    """
    raise exc.CompileError(
        "%s dialect does not support regular expression replacements"
        % self.dialect.name
    )
3626
def visit_bindparam(
    self,
    bindparam,
    within_columns_clause=False,
    literal_binds=False,
    skip_bind_expression=False,
    literal_execute=False,
    render_postcompile=False,
    **kwargs,
):
    """Compile a bound parameter into its SQL placeholder or literal.

    Processing happens in this order:

    1. Unless ``skip_bind_expression`` is set, a type-level bind
       expression (if the dialect impl has one) is compiled around the
       parameter and returned.
    2. With ``literal_binds``, the value is rendered inline through
       ``render_literal_bindparam``.
    3. Otherwise the parameter is registered in ``self.binds`` (after
       name-conflict checks) and a placeholder is produced by
       ``bindparam_string``.

    Expanding parameters are always returned wrapped in parentheses.

    :raises CompileError: when the parameter's name clashes with an
     already registered, different parameter in one of the ways checked
     below.
    """

    if not skip_bind_expression:
        impl = bindparam.type.dialect_impl(self.dialect)
        if impl._has_bind_expression:
            bind_expression = impl.bind_expression(bindparam)
            # compile the wrapping expression; skip_bind_expression=True
            # prevents re-entering this branch for the inner parameter
            wrapped = self.process(
                bind_expression,
                skip_bind_expression=True,
                within_columns_clause=within_columns_clause,
                literal_binds=literal_binds and not bindparam.expanding,
                literal_execute=literal_execute,
                render_postcompile=render_postcompile,
                **kwargs,
            )
            if bindparam.expanding:
                # for postcompile w/ expanding, move the "wrapped" part
                # of this into the inside

                m = re.match(
                    r"^(.*)\(__\[POSTCOMPILE_(\S+?)\]\)(.*)$", wrapped
                )
                assert m, "unexpected format for expanding parameter"
                # token layout: name~~<text before>~~REPL~~<text after>~~
                wrapped = "(__[POSTCOMPILE_%s~~%s~~REPL~~%s~~])" % (
                    m.group(2),
                    m.group(1),
                    m.group(3),
                )

                if literal_binds:
                    ret = self.render_literal_bindparam(
                        bindparam,
                        within_columns_clause=True,
                        bind_expression_template=wrapped,
                        **kwargs,
                    )
                    return f"({ret})"

            return wrapped

    if not literal_binds:
        # literal execution may be requested by the caller, by the
        # parameter itself, or implied by ansi_bind_rules when rendering
        # inside the columns clause
        literal_execute = (
            literal_execute
            or bindparam.literal_execute
            or (within_columns_clause and self.ansi_bind_rules)
        )
        post_compile = literal_execute or bindparam.expanding
    else:
        post_compile = False

    if literal_binds:
        ret = self.render_literal_bindparam(
            bindparam, within_columns_clause=True, **kwargs
        )
        if bindparam.expanding:
            ret = f"({ret})"
        return ret

    name = self._truncate_bindparam(bindparam)

    if name in self.binds:
        existing = self.binds[name]
        if existing is not bindparam:
            if (
                (existing.unique or bindparam.unique)
                and not existing.proxy_set.intersection(
                    bindparam.proxy_set
                )
                and not existing._cloned_set.intersection(
                    bindparam._cloned_set
                )
            ):
                raise exc.CompileError(
                    "Bind parameter '%s' conflicts with "
                    "unique bind parameter of the same name" % name
                )
            elif existing.expanding != bindparam.expanding:
                raise exc.CompileError(
                    "Can't reuse bound parameter name '%s' in both "
                    "'expanding' (e.g. within an IN expression) and "
                    "non-expanding contexts.  If this parameter is to "
                    "receive a list/array value, set 'expanding=True' on "
                    "it for expressions that aren't IN, otherwise use "
                    "a different parameter name." % (name,)
                )
            elif existing._is_crud or bindparam._is_crud:
                if existing._is_crud and bindparam._is_crud:
                    # TODO: this condition is not well understood.
                    # see tests in test/sql/test_update.py
                    raise exc.CompileError(
                        "Encountered unsupported case when compiling an "
                        "INSERT or UPDATE statement.  If this is a "
                        "multi-table "
                        "UPDATE statement, please provide string-named "
                        "arguments to the "
                        "values() method with distinct names; support for "
                        "multi-table UPDATE statements that "
                        "target multiple tables for UPDATE is very "
                        "limited",
                    )
                else:
                    raise exc.CompileError(
                        f"bindparam() name '{bindparam.key}' is reserved "
                        "for automatic usage in the VALUES or SET "
                        "clause of this "
                        "insert/update statement.   Please use a "
                        "name other than column name when using "
                        "bindparam() "
                        "with insert() or update() (for example, "
                        f"'b_{bindparam.key}')."
                    )

    # register under both the original key and the (possibly truncated)
    # rendered name
    self.binds[bindparam.key] = self.binds[name] = bindparam

    # if we are given a cache key that we're going to match against,
    # relate the bindparam here to one that is most likely present
    # in the "extracted params" portion of the cache key.  this is used
    # to set up a positional mapping that is used to determine the
    # correct parameters for a subsequent use of this compiled with
    # a different set of parameter values.   here, we accommodate for
    # parameters that may have been cloned both before and after the cache
    # key was generated.
    ckbm_tuple = self._cache_key_bind_match

    if ckbm_tuple:
        ckbm, cksm = ckbm_tuple
        for bp in bindparam._cloned_set:
            if bp.key in cksm:
                cb = cksm[bp.key]
                ckbm[cb].append(bindparam)

    if bindparam.isoutparam:
        self.has_out_parameters = True

    if post_compile:
        if render_postcompile:
            self._render_postcompile = True

        # track the parameter in exactly one of the two post-compile sets
        if literal_execute:
            self.literal_execute_params |= {bindparam}
        else:
            self.post_compile_params |= {bindparam}

    ret = self.bindparam_string(
        name,
        post_compile=post_compile,
        expanding=bindparam.expanding,
        bindparam_type=bindparam.type,
        **kwargs,
    )

    if bindparam.expanding:
        ret = f"({ret})"

    return ret
3791
def render_bind_cast(self, type_, dbapi_type, sqltext):
    """Wrap the placeholder text ``sqltext`` in a type cast.

    Not implemented in this class; always raises ``NotImplementedError``.
    Other methods of this class call it only when the dialect type
    implementation has ``render_bind_cast`` or ``render_literal_cast``
    set.
    """
    raise NotImplementedError()
3794
def render_literal_bindparam(
    self,
    bindparam,
    render_literal_value=NO_ARG,
    bind_expression_template=None,
    **kw,
):
    """Render a bound parameter's value inline as literal SQL.

    :param bindparam: the parameter to render.
    :param render_literal_value: explicit value to render in place of the
     parameter's own effective value; ``NO_ARG`` means "not given".
    :param bind_expression_template: forwarded to the expanding-parameter
     renderer when ``bindparam.expanding`` is set.

    When no explicit value is given and the parameter has neither a value
    nor a callable, a NULL is rendered; a warning is emitted first if the
    enclosing binary operator (``_binary_op`` in ``kw``) is something
    other than ``is_`` / ``is_not``.
    """
    if render_literal_value is NO_ARG:
        if bindparam.value is None and bindparam.callable is None:
            binary_op = kw.get("_binary_op", None)
            if binary_op and binary_op not in (
                operators.is_,
                operators.is_not,
            ):
                util.warn_limited(
                    "Bound parameter '%s' rendering literal NULL in a SQL "
                    "expression; comparisons to NULL should not use "
                    "operators outside of 'is' or 'is not'",
                    (bindparam.key,),
                )
            return self.process(sqltypes.NULLTYPE, **kw)
        value = bindparam.effective_value
    else:
        value = render_literal_value

    if not bindparam.expanding:
        return self.render_literal_value(value, bindparam.type)

    # expanding parameter: only the SQL text of the result pair is used
    expand = self._literal_execute_expanding_parameter_literal_binds
    _, expanded_sql = expand(
        bindparam,
        value,
        bind_expression_template=bind_expression_template,
    )
    return expanded_sql
3827
def render_literal_value(self, value, type_):
    """Render the value of a bind parameter as a quoted literal.

    This is used for statement sections that do not accept bind parameters
    on the target driver/database.

    This should be implemented by subclasses using the quoting services
    of the DBAPI.

    :raises CompileError: if the type provides no literal processor, or
     if the processor raises while rendering ``value``.
    """

    if value is None and not type_.should_evaluate_none:
        # issue #10535 - handle NULL in the compiler without placing
        # this onto each type, except for "evaluate None" types
        # (e.g. JSON)
        return self.process(elements.Null._instance())

    processor = type_._cached_literal_processor(self.dialect)
    if not processor:
        raise exc.CompileError(
            f"No literal value renderer is available for literal value "
            f'"{sql_util._repr_single_value(value)}" '
            f"with datatype {type_}"
        )

    try:
        return processor(value)
    except Exception as e:
        raise exc.CompileError(
            f"Could not render literal value "
            f'"{sql_util._repr_single_value(value)}" '
            f"with datatype "
            f"{type_}; see parent stack trace for "
            "more detail."
        ) from e
3864
3865 def _truncate_bindparam(self, bindparam):
3866 if bindparam in self.bind_names:
3867 return self.bind_names[bindparam]
3868
3869 bind_name = bindparam.key
3870 if isinstance(bind_name, elements._truncated_label):
3871 bind_name = self._truncated_identifier("bindparam", bind_name)
3872
3873 # add to bind_names for translation
3874 self.bind_names[bindparam] = bind_name
3875
3876 return bind_name
3877
3878 def _truncated_identifier(
3879 self, ident_class: str, name: _truncated_label
3880 ) -> str:
3881 if (ident_class, name) in self.truncated_names:
3882 return self.truncated_names[(ident_class, name)]
3883
3884 anonname = name.apply_map(self.anon_map)
3885
3886 if len(anonname) > self.label_length - 6:
3887 counter = self._truncated_counters.get(ident_class, 1)
3888 truncname = (
3889 anonname[0 : max(self.label_length - 6, 0)]
3890 + "_"
3891 + hex(counter)[2:]
3892 )
3893 self._truncated_counters[ident_class] = counter + 1
3894 else:
3895 truncname = anonname
3896 self.truncated_names[(ident_class, name)] = truncname
3897 return truncname
3898
def _anonymize(self, name: str) -> str:
    """Return ``name`` %-formatted against ``self.anon_map``."""
    return name % self.anon_map
3901
def bindparam_string(
    self,
    name: str,
    post_compile: bool = False,
    expanding: bool = False,
    escaped_from: Optional[str] = None,
    bindparam_type: Optional[TypeEngine[Any]] = None,
    accumulate_bind_names: Optional[Set[str]] = None,
    visited_bindparam: Optional[List[str]] = None,
    **kw: Any,
) -> str:
    """Return the placeholder text for the bound parameter ``name``.

    :param post_compile: render a ``__[POSTCOMPILE_<name>]`` token instead
     of a bind template.
    :param expanding: with ``post_compile``, return the token without
     applying any cast.
    :param escaped_from: original name, if ``name`` was already escaped.
    :param bindparam_type: type used to decide whether a cast is rendered
     around the placeholder.
    :param accumulate_bind_names: optional set that receives ``name``.
    :param visited_bindparam: optional list that receives ``name``.
    """
    # TODO: accumulate_bind_names is passed by crud.py to gather
    # names on a per-value basis, visited_bindparam is passed by
    # visit_insert() to collect all parameters in the statement.
    # see if this gathering can be simplified somehow
    if accumulate_bind_names is not None:
        accumulate_bind_names.add(name)
    if visited_bindparam is not None:
        visited_bindparam.append(name)

    if not escaped_from and self._bind_translate_re.search(name):
        # not quite the translate use case as we want to
        # also get a quick boolean if we even found
        # unusual characters in the name
        translated = self._bind_translate_re.sub(
            lambda m: self._bind_translate_chars[m.group(0)],
            name,
        )
        escaped_from, name = name, translated

    if escaped_from:
        self.escaped_bind_names = self.escaped_bind_names.union(
            {escaped_from: name}
        )

    if post_compile:
        token = "__[POSTCOMPILE_%s]" % name

        # for expanding, bound parameters or literal values will be
        # rendered per item, so the bare token is returned.
        # otherwise, for non-expanding "literal execute", apply
        # bind casts as determined by the datatype
        if not expanding and bindparam_type is not None:
            literal_impl = bindparam_type._unwrapped_dialect_impl(
                self.dialect
            )
            if literal_impl.render_literal_cast:
                token = self.render_bind_cast(
                    bindparam_type, literal_impl, token
                )
        return token

    if self.state is CompilerState.COMPILING:
        placeholder = self.compilation_bindtemplate % {"name": name}
    else:
        placeholder = self.bindtemplate % {"name": name}

    if (
        bindparam_type is not None
        and self.dialect._bind_typing_render_casts
    ):
        bind_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
        if bind_impl.render_bind_cast:
            placeholder = self.render_bind_cast(
                bindparam_type, bind_impl, placeholder
            )

    return placeholder
3968
3969 def _dispatch_independent_ctes(self, stmt, kw):
3970 local_kw = kw.copy()
3971 local_kw.pop("cte_opts", None)
3972 for cte, opt in zip(
3973 stmt._independent_ctes, stmt._independent_ctes_opts
3974 ):
3975 cte._compiler_dispatch(self, cte_opts=opt, **local_kw)
3976
def visit_cte(
    self,
    cte: CTE,
    asfrom: bool = False,
    ashint: bool = False,
    fromhints: Optional[_FromHintsType] = None,
    visiting_cte: Optional[CTE] = None,
    from_linter: Optional[FromLinter] = None,
    cte_opts: selectable._CTEOpts = selectable._CTEOpts(False),
    **kwargs: Any,
) -> Optional[str]:
    """Compile a CTE, recording its definition text and/or returning the
    text by which it is referenced.

    The text of a newly encountered CTE is stored in the compiler's CTE
    collection (``self.ctes``) keyed by the CTE object; bookkeeping in
    ``ctes_by_level_name`` and ``level_name_by_cte`` tracks which CTE
    owns a given (nesting level, name) pair.

    :param asfrom: when true, the (possibly aliased) name used to refer
     to the CTE is returned.
    :param visiting_cte: the CTE currently being compiled by an enclosing
     call, if any; ``kwargs["visiting_cte"]`` is set to ``cte`` for
     nested calls.
    :param from_linter: if given and ``asfrom`` is set, the CTE is
     recorded in its ``froms``.
    :param cte_opts: options for this occurrence; ``nesting`` requests
     that the CTE be placed at the current stack level.
    :return: the reference text when ``asfrom`` is true; the compiled
     inner element when a new, un-aliased CTE is compiled with an empty
     compiler stack; otherwise ``None``.
    :raises CompileError: if the CTE is marked for nesting in more than
     one place, or if a different, unrelated CTE with the same name has
     already been compiled at the same level.
    """
    self_ctes = self._init_cte_state()
    assert self_ctes is self.ctes

    kwargs["visiting_cte"] = cte

    cte_name = cte.name

    if isinstance(cte_name, elements._truncated_label):
        cte_name = self._truncated_identifier("alias", cte_name)

    is_new_cte = True
    embedded_in_current_named_cte = False

    _reference_cte = cte._get_reference_cte()

    nesting = cte.nesting or cte_opts.nesting

    # check for CTE already encountered
    if _reference_cte in self.level_name_by_cte:
        # stored value is (level, name, opts); "_" receives the name
        cte_level, _, existing_cte_opts = self.level_name_by_cte[
            _reference_cte
        ]
        assert _ == cte_name

        cte_level_name = (cte_level, cte_name)
        existing_cte = self.ctes_by_level_name[cte_level_name]

        # check if we are receiving it here with a specific
        # "nest_here" location; if so, move it to this location

        if cte_opts.nesting:
            if existing_cte_opts.nesting:
                raise exc.CompileError(
                    "CTE is stated as 'nest_here' in "
                    "more than one location"
                )

            old_level_name = (cte_level, cte_name)
            cte_level = len(self.stack) if nesting else 1
            cte_level_name = new_level_name = (cte_level, cte_name)

            # re-key the existing entry under the new level
            del self.ctes_by_level_name[old_level_name]
            self.ctes_by_level_name[new_level_name] = existing_cte
            self.level_name_by_cte[_reference_cte] = new_level_name + (
                cte_opts,
            )

    else:
        # nested CTEs live at the current stack depth, all others at 1
        cte_level = len(self.stack) if nesting else 1
        cte_level_name = (cte_level, cte_name)

        if cte_level_name in self.ctes_by_level_name:
            existing_cte = self.ctes_by_level_name[cte_level_name]
        else:
            existing_cte = None

    if existing_cte is not None:
        embedded_in_current_named_cte = visiting_cte is existing_cte

        # we've generated a same-named CTE that we are enclosed in,
        # or this is the same CTE.  just return the name.
        if cte is existing_cte._restates or cte is existing_cte:
            is_new_cte = False
        elif existing_cte is cte._restates:
            # we've generated a same-named CTE that is
            # enclosed in us - we take precedence, so
            # discard the text for the "inner".
            del self_ctes[existing_cte]

            existing_cte_reference_cte = existing_cte._get_reference_cte()

            assert existing_cte_reference_cte is _reference_cte
            assert existing_cte_reference_cte is existing_cte

            del self.level_name_by_cte[existing_cte_reference_cte]
        else:
            # if the two CTEs are deep-copy identical, consider them
            # the same, **if** they are clones, that is, they came from
            # the ORM or other visit method
            if (
                cte._is_clone_of is not None
                or existing_cte._is_clone_of is not None
            ) and cte.compare(existing_cte):
                is_new_cte = False
            else:
                raise exc.CompileError(
                    "Multiple, unrelated CTEs found with "
                    "the same name: %r" % cte_name
                )

    # an already-known CTE that is not being rendered as a FROM element
    # has nothing to emit
    if not asfrom and not is_new_cte:
        return None

    if cte._cte_alias is not None:
        # this CTE is an alias of another CTE
        pre_alias_cte = cte._cte_alias
        cte_pre_alias_name = cte._cte_alias.name
        if isinstance(cte_pre_alias_name, elements._truncated_label):
            cte_pre_alias_name = self._truncated_identifier(
                "alias", cte_pre_alias_name
            )
    else:
        pre_alias_cte = cte
        cte_pre_alias_name = None

    if is_new_cte:
        self.ctes_by_level_name[cte_level_name] = cte
        self.level_name_by_cte[_reference_cte] = cte_level_name + (
            cte_opts,
        )

        # make sure the aliased CTE itself has been compiled
        if pre_alias_cte not in self.ctes:
            self.visit_cte(pre_alias_cte, **kwargs)

        if not cte_pre_alias_name and cte not in self_ctes:
            if cte.recursive:
                self.ctes_recursive = True
            text = self.preparer.format_alias(cte, cte_name)
            if cte.recursive:
                col_source = cte.element

                # TODO: can we get at the .columns_plus_names collection
                # that is already (or will be?) generated for the SELECT
                # rather than calling twice?
                recur_cols = [
                    # TODO: proxy_name is not technically safe,
                    # see test_cte->
                    # test_with_recursive_no_name_currently_buggy.  not
                    # clear what should be done with such a case
                    fallback_label_name or proxy_name
                    for (
                        _,
                        proxy_name,
                        fallback_label_name,
                        c,
                        repeated,
                    ) in (col_source._generate_columns_plus_names(True))
                    if not repeated
                ]

                # recursive CTEs render an explicit column list
                text += "(%s)" % (
                    ", ".join(
                        self.preparer.format_label_name(
                            ident, anon_map=self.anon_map
                        )
                        for ident in recur_cols
                    )
                )

            assert kwargs.get("subquery", False) is False

            if not self.stack:
                # toplevel, this is a stringify of the
                # cte directly.  just compile the inner
                # the way alias() does.
                return cte.element._compiler_dispatch(
                    self, asfrom=asfrom, **kwargs
                )
            else:
                prefixes = self._generate_prefixes(
                    cte, cte._prefixes, **kwargs
                )
                inner = cte.element._compiler_dispatch(
                    self, asfrom=True, **kwargs
                )

                text += " AS %s\n(%s)" % (prefixes, inner)

            if cte._suffixes:
                text += " " + self._generate_prefixes(
                    cte, cte._suffixes, **kwargs
                )

            self_ctes[cte] = text

    if asfrom:
        if from_linter:
            from_linter.froms[cte._de_clone()] = cte_name

        if not is_new_cte and embedded_in_current_named_cte:
            return self.preparer.format_alias(cte, cte_name)

        if cte_pre_alias_name:
            # "<original name> <alias suffix for cte_name>"
            text = self.preparer.format_alias(cte, cte_pre_alias_name)
            if self.preparer._requires_quotes(cte_name):
                cte_name = self.preparer.quote(cte_name)
            text += self.get_render_as_alias_suffix(cte_name)
            return text
        else:
            return self.preparer.format_alias(cte, cte_name)

    return None
4179
def visit_table_valued_alias(self, element, **kw):
    """Render a table-valued alias via the lateral or plain alias visitor.

    When ``element.joins_implicitly`` is set, the ``from_linter`` keyword
    is forced to ``None`` before delegating, so the delegate receives no
    linter for this element.
    """
    if element.joins_implicitly:
        kw["from_linter"] = None

    # pick the visitor by the element's lateral flag, then delegate
    visitor = self.visit_lateral if element._is_lateral else self.visit_alias
    return visitor(element, **kw)
4187
def visit_table_valued_column(self, element, **kw):
    """Render a table-valued column by delegating to ``visit_column``."""
    rendered = self.visit_column(element, **kw)
    return rendered
4190
def visit_alias(
    self,
    alias,
    asfrom=False,
    ashint=False,
    iscrud=False,
    fromhints=None,
    subquery=False,
    lateral=False,
    enclosing_alias=None,
    from_linter=None,
    **kwargs,
):
    """Render an alias, or only the element it wraps, depending on flags.

    The outcome is selected as follows:

    * if ``enclosing_alias`` is given and its ``.element`` is this
      alias, only the inner element is rendered (parenthesized when
      ``subquery`` is set together with ``asfrom`` or ``lateral``);
    * if ``ashint`` is set, only the formatted alias name is returned;
    * if ``asfrom`` is set, the inner element is rendered followed by the
      alias suffix from :meth:`get_render_as_alias_suffix`, an optional
      derived column list, and optional FROM hint text;
    * otherwise the inner element is rendered without any alias.

    :param alias: the alias construct being compiled.
    :param asfrom: render as an element of a FROM clause.
    :param ashint: render only the alias name, for use inside a hint.
    :param iscrud: passed through to :meth:`format_from_hint_text`.
    :param fromhints: optional mapping of FROM elements to hint text.
    :param subquery: when true, the inner element is wrapped in
     parenthesis.
    :param lateral: true when compiling within a LATERAL construct.
    :param enclosing_alias: the alias currently being rendered by an
     outer call, if any.
    :param from_linter: optional linter object; when rendering
     ``asfrom`` its ``froms`` mapping receives this alias.
    :return: the rendered SQL string.
    """
    if lateral:
        if "enclosing_lateral" not in kwargs:
            # if lateral is set and enclosing_lateral is not
            # present, we assume we are being called directly
            # from visit_lateral() and we need to set enclosing_lateral.
            assert alias._is_lateral
            kwargs["enclosing_lateral"] = alias

        # for lateral objects, we track a second from_linter that is...
        # lateral! to the level above us.
        if (
            from_linter
            and "lateral_from_linter" not in kwargs
            and "enclosing_lateral" in kwargs
        ):
            kwargs["lateral_from_linter"] = from_linter

    if enclosing_alias is not None and enclosing_alias.element is alias:
        # the enclosing alias wraps this alias directly; render only the
        # inner element and let the enclosing call apply the alias name
        inner = alias.element._compiler_dispatch(
            self,
            asfrom=asfrom,
            ashint=ashint,
            iscrud=iscrud,
            fromhints=fromhints,
            lateral=lateral,
            enclosing_alias=alias,
            **kwargs,
        )
        if subquery and (asfrom or lateral):
            inner = "(%s)" % (inner,)
        return inner
    else:
        # record this alias as the enclosing one for all nested dispatches
        enclosing_alias = kwargs["enclosing_alias"] = alias

    if asfrom or ashint:
        # the alias name is only needed when it will actually be rendered
        if isinstance(alias.name, elements._truncated_label):
            alias_name = self._truncated_identifier("alias", alias.name)
        else:
            alias_name = alias.name

    if ashint:
        return self.preparer.format_alias(alias, alias_name)
    elif asfrom:
        if from_linter:
            from_linter.froms[alias._de_clone()] = alias_name

        inner = alias.element._compiler_dispatch(
            self, asfrom=True, lateral=lateral, **kwargs
        )
        if subquery:
            inner = "(%s)" % (inner,)

        ret = inner + self.get_render_as_alias_suffix(
            self.preparer.format_alias(alias, alias_name)
        )

        if alias._supports_derived_columns and alias._render_derived:
            # derived column list "(col1 [type], col2 [type], ...)";
            # the type is included only if _render_derived_w_types is set
            ret += "(%s)" % (
                ", ".join(
                    "%s%s"
                    % (
                        self.preparer.quote(col.name),
                        (
                            " %s"
                            % self.dialect.type_compiler_instance.process(
                                col.type, **kwargs
                            )
                            if alias._render_derived_w_types
                            else ""
                        ),
                    )
                    for col in alias.c
                )
            )

        if fromhints and alias in fromhints:
            ret = self.format_from_hint_text(
                ret, alias, fromhints[alias], iscrud
            )

        return ret
    else:
        # note we cancel the "subquery" flag here as well
        return alias.element._compiler_dispatch(
            self, lateral=lateral, **kwargs
        )
4290
4291 def visit_subquery(self, subquery, **kw):
4292 kw["subquery"] = True
4293 return self.visit_alias(subquery, **kw)
4294
def visit_lateral(self, lateral_, **kw):
    """Render ``LATERAL`` followed by the alias form of ``lateral_``.

    ``lateral=True`` is passed down to ``visit_alias``.
    """
    kw.update(lateral=True)
    aliased = self.visit_alias(lateral_, **kw)
    return f"LATERAL {aliased}"
4298
def visit_tablesample(self, tablesample, asfrom=False, **kw):
    """Render ``<alias> TABLESAMPLE <method> [REPEATABLE (<seed>)]``.

    The alias portion is always rendered with ``asfrom=True``; the
    ``asfrom`` argument given to this method is not consulted.
    """
    aliased = self.visit_alias(tablesample, asfrom=True, **kw)
    method = tablesample._get_method()._compiler_dispatch(self, **kw)
    pieces = [f"{aliased} TABLESAMPLE {method}"]

    seed = tablesample.seed
    if seed is not None:
        seed_text = seed._compiler_dispatch(self, **kw)
        pieces.append(f" REPEATABLE ({seed_text})")

    return "".join(pieces)
4311
def _render_values(self, element, **kw):
    """Render the ``VALUES (...), (...)`` text for a values construct.

    ``literal_binds`` defaults to the element's own ``literal_binds``
    setting unless the caller supplied one. Each row of every chunk in
    ``element._data`` is rendered as a grouped tuple.
    """
    kw.setdefault("literal_binds", element.literal_binds)

    rendered_rows = []
    for chunk in element._data:
        for row in chunk:
            row_tuple = elements.Tuple(
                types=element._column_types, *row
            ).self_group()
            rendered_rows.append(self.process(row_tuple, **kw))

    return "VALUES " + ", ".join(rendered_rows)
4325
def visit_values(self, element, asfrom=False, from_linter=None, **kw):
    """Render a VALUES construct, with alias and column list in FROM.

    Outside of a FROM context only the bare ``VALUES ...`` text is
    returned. In a FROM context the text is parenthesized, prefixed with
    ``LATERAL`` if the element is lateral, and, when the element has a
    name, followed by the alias suffix and a column name list.
    """
    body = self._render_values(element, **kw)

    # resolve the name to render, truncating generated labels
    if element._unnamed:
        name = None
    else:
        name = element.name
        if isinstance(name, elements._truncated_label):
            name = self._truncated_identifier("values", name)

    lateral_kw = "LATERAL " if element._is_lateral else ""

    if not asfrom:
        return body

    if from_linter:
        linter_label = (
            "(unnamed VALUES element)" if name is None else name
        )
        from_linter.froms[element._de_clone()] = linter_label

    if not name:
        return f"{lateral_kw}({body})"

    # column names in the derived column list are rendered unqualified
    kw["include_table"] = False
    alias_suffix = self.get_render_as_alias_suffix(self.preparer.quote(name))
    column_list = ", ".join(
        col._compiler_dispatch(self, **kw) for col in element.columns
    )
    return f"{lateral_kw}({body}){alias_suffix} ({column_list})"
4363
def visit_scalar_values(self, element, **kw):
    """Render a scalar VALUES construct as parenthesized VALUES text."""
    values_text = self._render_values(element, **kw)
    return "(%s)" % (values_text,)
4366
def get_render_as_alias_suffix(self, alias_name_text):
    """Return the ``" AS <name>"`` suffix appended after an aliased element."""
    return "".join((" AS ", alias_name_text))
4369
def _add_to_result_map(
    self,
    keyname: str,
    name: str,
    objects: Tuple[Any, ...],
    type_: TypeEngine[Any],
) -> None:
    """Append one entry describing a result column to ``_result_columns``.

    A ``keyname`` of ``None`` or ``"*"`` marks the statement as having
    unordered, ad-hoc textual columns.

    :raises CompileError: if ``type_`` is a tuple type.
    """

    # note objects must be non-empty for cursor.py to handle the
    # collection properly
    assert objects

    is_wildcard_or_anonymous = keyname is None or keyname == "*"
    if is_wildcard_or_anonymous:
        self._ordered_columns = False
        self._ad_hoc_textual = True

    if type_._is_tuple_type:
        raise exc.CompileError(
            "Most backends don't support SELECTing "
            "from a tuple() object. If this is an ORM query, "
            "consider using the Bundle object."
        )

    entry = ResultColumnsEntry(keyname, name, objects, type_)
    self._result_columns.append(entry)
4394
4395 def _label_returning_column(
4396 self, stmt, column, populate_result_map, column_clause_args=None, **kw
4397 ):
4398 """Render a column with necessary labels inside of a RETURNING clause.
4399
4400 This method is provided for individual dialects in place of calling
4401 the _label_select_column method directly, so that the two use cases
4402 of RETURNING vs. SELECT can be disambiguated going forward.
4403
4404 .. versionadded:: 1.4.21
4405
4406 """
4407 return self._label_select_column(
4408 None,
4409 column,
4410 populate_result_map,
4411 False,
4412 {} if column_clause_args is None else column_clause_args,
4413 **kw,
4414 )
4415
def _label_select_column(
    self,
    select,
    column,
    populate_result_map,
    asfrom,
    column_clause_args,
    name=None,
    proxy_name=None,
    fallback_label_name=None,
    within_columns_clause=True,
    column_is_repeated=False,
    need_column_expressions=False,
    include_table=True,
):
    """produce labeled columns present in a select().

    Decides whether ``column`` is rendered as-is or wrapped in a
    ``_CompileLabel``, then compiles the result and returns its string.

    :param select: not read by this method; ``_label_returning_column``
     passes ``None`` here.
    :param column: the column expression to render.
    :param populate_result_map: when true, an ``add_to_result_map``
     callable is passed into the compilation of the column.
    :param asfrom: whether the enclosing SELECT is rendered as a FROM
     element; used by the labeling heuristics.
    :param column_clause_args: dictionary of keyword arguments for the
     column's compilation. NOTE: this dictionary is updated in place.
    :param name: explicit label name; requires ``proxy_name``.
    :param proxy_name: alternate name recorded on the generated label.
    :param fallback_label_name: label name used when the heuristics
     decide a label is needed and ``name`` was not given.
    :param within_columns_clause: must be true; asserted.
    :param column_is_repeated: when true, the result map entry for this
     column carries only its key name as its target.
    :param need_column_expressions: apply the type's column expression
     even when ``populate_result_map`` is false.
    :param include_table: passed through to the column's compilation.
    """
    impl = column.type.dialect_impl(self.dialect)

    # apply a type-level column expression only when the result of this
    # column is actually consumed
    if impl._has_column_expression and (
        need_column_expressions or populate_result_map
    ):
        col_expr = impl.column_expression(column)
    else:
        col_expr = column

    if populate_result_map:
        # pass an "add_to_result_map" callable into the compilation
        # of embedded columns. this collects information about the
        # column as it will be fetched in the result and is coordinated
        # with cursor.description when the query is executed.
        add_to_result_map = self._add_to_result_map

        # if the SELECT statement told us this column is a repeat,
        # wrap the callable with one that prevents the addition of the
        # targets
        if column_is_repeated:
            _add_to_result_map = add_to_result_map

            def add_to_result_map(keyname, name, objects, type_):
                _add_to_result_map(keyname, name, (keyname,), type_)

        # if we redefined col_expr for type expressions, wrap the
        # callable with one that adds the original column to the targets
        elif col_expr is not column:
            _add_to_result_map = add_to_result_map

            def add_to_result_map(keyname, name, objects, type_):
                _add_to_result_map(
                    keyname, name, (column,) + objects, type_
                )

    else:
        add_to_result_map = None

    # this method is used by some of the dialects for RETURNING,
    # which has different inputs. _label_returning_column was added
    # as the better target for this now however for 1.4 we will keep
    # _label_select_column directly compatible with this use case.
    # these assertions right now set up the current expected inputs
    assert within_columns_clause, (
        "_label_select_column is only relevant within "
        "the columns clause of a SELECT or RETURNING"
    )
    if isinstance(column, elements.Label):
        # the column already carries a label; re-wrap only if the type
        # expression replaced it, keeping the original label name
        if col_expr is not column:
            result_expr = _CompileLabel(
                col_expr, column.name, alt_names=(column.element,)
            )
        else:
            result_expr = col_expr

    elif name:
        # here, _columns_plus_names has determined there's an explicit
        # label name we need to use. this is the default for
        # tablenames_plus_columnnames as well as when columns are being
        # deduplicated on name

        assert (
            proxy_name is not None
        ), "proxy_name is required if 'name' is passed"

        result_expr = _CompileLabel(
            col_expr,
            name,
            alt_names=(
                proxy_name,
                # this is a hack to allow legacy result column lookups
                # to work as they did before; this goes away in 2.0.
                # TODO: this only seems to be tested indirectly
                # via test/orm/test_deprecations.py. should be a
                # resultset test for this
                column._tq_label,
            ),
        )
    else:
        # determine here whether this column should be rendered in
        # a labelled context or not, as we were given no required label
        # name from the caller. Here we apply heuristics based on the kind
        # of SQL expression involved.

        if col_expr is not column:
            # type-specific expression wrapping the given column,
            # so we render a label
            render_with_label = True
        elif isinstance(column, elements.ColumnClause):
            # table-bound column, we render its name as a label if we are
            # inside of a subquery only
            render_with_label = (
                asfrom
                and not column.is_literal
                and column.table is not None
            )
        elif isinstance(column, elements.TextClause):
            render_with_label = False
        elif isinstance(column, elements.UnaryExpression):
            render_with_label = column.wraps_column_expression or asfrom
        elif (
            # general class of expressions that don't have a SQL-column
            # addressible name. includes scalar selects, bind parameters,
            # SQL functions, others
            not isinstance(column, elements.NamedColumn)
            # deeper check that indicates there's no natural "name" to
            # this element, which accommodates for custom SQL constructs
            # that might have a ".name" attribute (but aren't SQL
            # functions) but are not implementing this more recently added
            # base class. in theory the "NamedColumn" check should be
            # enough, however here we seek to maintain legacy behaviors
            # as well.
            and column._non_anon_label is None
        ):
            render_with_label = True
        else:
            render_with_label = False

        if render_with_label:
            if not fallback_label_name:
                # used by the RETURNING case right now. we generate it
                # here as 3rd party dialects may be referring to
                # _label_select_column method directly instead of the
                # just-added _label_returning_column method
                assert not column_is_repeated
                fallback_label_name = column._anon_name_label

            # ensure the label name is a _truncated_label instance
            fallback_label_name = (
                elements._truncated_label(fallback_label_name)
                if not isinstance(
                    fallback_label_name, elements._truncated_label
                )
                else fallback_label_name
            )

            result_expr = _CompileLabel(
                col_expr, fallback_label_name, alt_names=(proxy_name,)
            )
        else:
            result_expr = col_expr

    # note: mutates the caller's dictionary; the same keys are
    # overwritten on every call
    column_clause_args.update(
        within_columns_clause=within_columns_clause,
        add_to_result_map=add_to_result_map,
        include_table=include_table,
    )
    return result_expr._compiler_dispatch(self, **column_clause_args)
4579
def format_from_hint_text(self, sqltext, table, hint, iscrud):
    """Append the dialect's FROM hint text, if any, to ``sqltext``.

    The hint text comes from ``get_from_hint_text``; when that is empty
    or ``None``, ``sqltext`` is returned unchanged. ``iscrud`` is not
    consulted by this implementation.
    """
    hinttext = self.get_from_hint_text(table, hint)
    if not hinttext:
        return sqltext
    return sqltext + (" " + hinttext)
4585
def get_select_hint_text(self, byfroms):
    """Return SELECT-level hint text; this base version renders none."""
    no_hint = None
    return no_hint
4588
def get_from_hint_text(self, table, text):
    """Return hint text for a FROM element; this base version renders none."""
    no_hint = None
    return no_hint
4591
def get_crud_hint_text(self, table, text):
    """Return hint text for a DML target; this base version renders none."""
    no_hint = None
    return no_hint
4594
def get_statement_hint_text(self, hint_texts):
    """Join the given statement-level hint strings with single spaces."""
    separator = " "
    return separator.join(hint_texts)
4597
# stack entry substituted for ``self.stack[-1]`` when the compiler stack is
# empty, i.e. when compiling at the top level; it carries empty
# "correlate_froms" and "asfrom_froms" collections.
_default_stack_entry: _CompilerStackEntry

# the value is assigned only at runtime; under TYPE_CHECKING only the
# annotation above is seen by the type checker.
if not typing.TYPE_CHECKING:
    _default_stack_entry = util.immutabledict(
        [("correlate_froms", frozenset()), ("asfrom_froms", frozenset())]
    )
4604
4605 def _display_froms_for_select(
4606 self, select_stmt, asfrom, lateral=False, **kw
4607 ):
4608 # utility method to help external dialects
4609 # get the correct from list for a select.
4610 # specifically the oracle dialect needs this feature
4611 # right now.
4612 toplevel = not self.stack
4613 entry = self._default_stack_entry if toplevel else self.stack[-1]
4614
4615 compile_state = select_stmt._compile_state_factory(select_stmt, self)
4616
4617 correlate_froms = entry["correlate_froms"]
4618 asfrom_froms = entry["asfrom_froms"]
4619
4620 if asfrom and not lateral:
4621 froms = compile_state._get_display_froms(
4622 explicit_correlate_froms=correlate_froms.difference(
4623 asfrom_froms
4624 ),
4625 implicit_correlate_froms=(),
4626 )
4627 else:
4628 froms = compile_state._get_display_froms(
4629 explicit_correlate_froms=correlate_froms,
4630 implicit_correlate_froms=asfrom_froms,
4631 )
4632 return froms
4633
# optional hook consulted by visit_select(); when set it is invoked as
# ``translate_select_structure(select_stmt, asfrom=asfrom, **kwargs)`` and,
# if it returns a different statement, that statement is compiled in place
# of the original.
translate_select_structure: Any = None
"""if not ``None``, should be a callable which accepts ``(select_stmt,
**kw)`` and returns a select object. this is used for structural changes
mostly to accommodate for LIMIT/OFFSET schemes

"""
4640
def visit_select(
    self,
    select_stmt,
    asfrom=False,
    insert_into=False,
    fromhints=None,
    compound_index=None,
    select_wraps_for=None,
    lateral=False,
    from_linter=None,
    **kwargs,
):
    """Compile a SELECT statement into its SQL string.

    The method builds the compile state for the statement, optionally
    restructures it through ``translate_select_structure``, pushes a new
    entry onto the compiler stack, renders hints, prefixes, the column
    list and the statement body, prepends the CTE clause where
    applicable, and finally pops the stack entry.

    :param select_stmt: the SELECT construct to compile.
    :param asfrom: true when the SELECT is rendered as a FROM element.
    :param insert_into: true when the SELECT is embedded in another
     statement; together with ``compound_index`` this suppresses CTE
     rendering at this level unless compiling at the top level.
    :param fromhints: accepted for signature compatibility; not read in
     this method.
    :param compound_index: position of this SELECT within a compound
     select, or ``None``; a nonzero index disables result map
     population.
    :param select_wraps_for: must be ``None`` on entry; asserted.
    :param lateral: true when compiling within a LATERAL construct.
    :param from_linter: accepted for signature compatibility; not read
     in this method.
    :return: the rendered SQL string.
    """
    assert select_wraps_for is None, (
        "SQLAlchemy 1.4 requires use of "
        "the translate_select_structure hook for structural "
        "translations of SELECT objects"
    )

    # initial setup of SELECT. the compile_state_factory may now
    # be creating a totally different SELECT from the one that was
    # passed in. for ORM use this will convert from an ORM-state
    # SELECT to a regular "Core" SELECT. other composed operations
    # such as computation of joins will be performed.

    kwargs["within_columns_clause"] = False

    compile_state = select_stmt._compile_state_factory(
        select_stmt, self, **kwargs
    )
    kwargs["ambiguous_table_name_map"] = (
        compile_state._ambiguous_table_name_map
    )

    select_stmt = compile_state.statement

    toplevel = not self.stack

    if toplevel and not self.compile_state:
        self.compile_state = compile_state

    is_embedded_select = compound_index is not None or insert_into

    # translate step for Oracle, SQL Server which often need to
    # restructure the SELECT to allow for LIMIT/OFFSET and possibly
    # other conditions
    if self.translate_select_structure:
        new_select_stmt = self.translate_select_structure(
            select_stmt, asfrom=asfrom, **kwargs
        )

        # if SELECT was restructured, maintain a link to the originals
        # and assemble a new compile state
        if new_select_stmt is not select_stmt:
            compile_state_wraps_for = compile_state
            select_wraps_for = select_stmt
            select_stmt = new_select_stmt

            compile_state = select_stmt._compile_state_factory(
                select_stmt, self, **kwargs
            )
            select_stmt = compile_state.statement

    entry = self._default_stack_entry if toplevel else self.stack[-1]

    # the result map is populated at the top level, or when the
    # enclosing stack entry requests it
    populate_result_map = need_column_expressions = (
        toplevel
        or entry.get("need_result_map_for_compound", False)
        or entry.get("need_result_map_for_nested", False)
    )

    # indicates there is a CompoundSelect in play and we are not the
    # first select
    if compound_index:
        populate_result_map = False

    # this was first proposed as part of #3372; however, it is not
    # reached in current tests and could possibly be an assertion
    # instead.
    if not populate_result_map and "add_to_result_map" in kwargs:
        del kwargs["add_to_result_map"]

    # pushes a new entry onto self.stack; popped at the end of this method
    froms = self._setup_select_stack(
        select_stmt, compile_state, entry, asfrom, lateral, compound_index
    )

    column_clause_args = kwargs.copy()
    column_clause_args.update(
        {"within_label_clause": False, "within_columns_clause": False}
    )

    text = "SELECT "  # we're off to a good start !

    if select_stmt._hints:
        hint_text, byfrom = self._setup_select_hints(select_stmt)
        if hint_text:
            text += hint_text + " "
    else:
        byfrom = None

    if select_stmt._independent_ctes:
        self._dispatch_independent_ctes(select_stmt, kwargs)

    if select_stmt._prefixes:
        text += self._generate_prefixes(
            select_stmt, select_stmt._prefixes, **kwargs
        )

    text += self.get_select_precolumns(select_stmt, **kwargs)
    # the actual list of columns to print in the SELECT column list.
    inner_columns = [
        c
        for c in [
            self._label_select_column(
                select_stmt,
                column,
                populate_result_map,
                asfrom,
                column_clause_args,
                name=name,
                proxy_name=proxy_name,
                fallback_label_name=fallback_label_name,
                column_is_repeated=repeated,
                need_column_expressions=need_column_expressions,
            )
            for (
                name,
                proxy_name,
                fallback_label_name,
                column,
                repeated,
            ) in compile_state.columns_plus_names
        ]
        if c is not None
    ]

    if populate_result_map and select_wraps_for is not None:
        # if this select was generated from translate_select,
        # rewrite the targeted columns in the result map

        # note: in the two comprehensions below, ``name`` is bound to the
        # fourth member of each columns_plus_names tuple, which is the
        # column object; the mapping is translated column -> original
        # column, paired by position
        translate = dict(
            zip(
                [
                    name
                    for (
                        key,
                        proxy_name,
                        fallback_label_name,
                        name,
                        repeated,
                    ) in compile_state.columns_plus_names
                ],
                [
                    name
                    for (
                        key,
                        proxy_name,
                        fallback_label_name,
                        name,
                        repeated,
                    ) in compile_state_wraps_for.columns_plus_names
                ],
            )
        )

        self._result_columns = [
            ResultColumnsEntry(
                key, name, tuple(translate.get(o, o) for o in obj), type_
            )
            for key, name, obj, type_ in self._result_columns
        ]

    text = self._compose_select_body(
        text,
        select_stmt,
        compile_state,
        inner_columns,
        froms,
        byfrom,
        toplevel,
        kwargs,
    )

    if select_stmt._statement_hints:
        # keep only hints registered for any dialect ("*") or for the
        # dialect currently in use
        per_dialect = [
            ht
            for (dialect_name, ht) in select_stmt._statement_hints
            if dialect_name in ("*", self.dialect.name)
        ]
        if per_dialect:
            text += " " + self.get_statement_hint_text(per_dialect)

    # In compound query, CTEs are shared at the compound level
    if self.ctes and (not is_embedded_select or toplevel):
        nesting_level = len(self.stack) if not toplevel else None
        text = self._render_cte_clause(nesting_level=nesting_level) + text

    if select_stmt._suffixes:
        text += " " + self._generate_prefixes(
            select_stmt, select_stmt._suffixes, **kwargs
        )

    # remove the stack entry added by _setup_select_stack()
    self.stack.pop(-1)

    return text
4845
4846 def _setup_select_hints(
4847 self, select: Select[Unpack[TupleAny]]
4848 ) -> Tuple[str, _FromHintsType]:
4849 byfrom = {
4850 from_: hinttext
4851 % {"name": from_._compiler_dispatch(self, ashint=True)}
4852 for (from_, dialect), hinttext in select._hints.items()
4853 if dialect in ("*", self.dialect.name)
4854 }
4855 hint_text = self.get_select_hint_text(byfrom)
4856 return hint_text, byfrom
4857
def _setup_select_stack(
    self, select, compile_state, entry, asfrom, lateral, compound_index
):
    """Push a stack entry for ``select`` and return its displayed FROMs.

    For the first member of a compound select (index 0) the statement is
    recorded on the enclosing entry as ``select_0``; for later members
    the column count is checked against it.

    :raises CompileError: if a later compound member has a different
     number of columns than the first.
    """
    correlate_froms = entry["correlate_froms"]
    asfrom_froms = entry["asfrom_froms"]

    if compound_index == 0:
        entry["select_0"] = select
    elif compound_index:
        first_select = entry["select_0"]
        expected_cols = len(first_select._all_selected_columns)

        if len(compile_state.columns_plus_names) != expected_cols:
            message_args = (
                1,
                expected_cols,
                compound_index + 1,
                len(select._all_selected_columns),
            )
            raise exc.CompileError(
                "All selectables passed to "
                "CompoundSelect must have identical numbers of "
                "columns; select #%d has %d columns, select "
                "#%d has %d" % message_args
            )

    if asfrom and not lateral:
        explicit = correlate_froms.difference(asfrom_froms)
        implicit = ()
    else:
        explicit = correlate_froms
        implicit = asfrom_froms

    froms = compile_state._get_display_froms(
        explicit_correlate_froms=explicit,
        implicit_correlate_froms=implicit,
    )

    # everything displayed at this level may be correlated to by
    # statements nested inside of it
    new_correlate_froms = set(_from_objects(*froms))
    all_correlate_froms = new_correlate_froms.union(correlate_froms)

    new_entry: _CompilerStackEntry = {
        "asfrom_froms": new_correlate_froms,
        "correlate_froms": all_correlate_froms,
        "selectable": select,
        "compile_state": compile_state,
    }
    self.stack.append(new_entry)

    return froms
4909
def _compose_select_body(
    self,
    text,
    select,
    compile_state,
    inner_columns,
    froms,
    byfrom,
    toplevel,
    kwargs,
):
    """Append the column list and all trailing clauses to ``text``.

    Clauses are emitted in this order: column list, FROM (or the
    dialect's ``default_from()`` when there are no FROM elements),
    WHERE, GROUP BY, HAVING, ORDER BY, row limiting clause, FOR UPDATE.

    :param text: the statement text rendered so far.
    :param select: the SELECT construct being rendered.
    :param compile_state: not read by this method.
    :param inner_columns: list of already rendered column strings.
    :param froms: FROM elements to render.
    :param byfrom: per-FROM hint mapping, passed as ``fromhints`` when the
     statement has hints.
    :param toplevel: when true and linting is enabled, the linter is
     stored on ``self.from_linter``.
    :param kwargs: dictionary of keyword arguments forwarded to nested
     compilation calls.
    :return: the extended statement text.
    """
    text += ", ".join(inner_columns)

    # a FromLinter is created only when cartesian product collection is
    # enabled in self.linting; it is passed to FROM and WHERE rendering
    if self.linting & COLLECT_CARTESIAN_PRODUCTS:
        from_linter = FromLinter({}, set())
        warn_linting = self.linting & WARN_LINTING
        if toplevel:
            self.from_linter = from_linter
    else:
        from_linter = None
        warn_linting = False

    # adjust the whitespace for no inner columns, part of #9440,
    # so that a no-col SELECT comes out as "SELECT WHERE..." or
    # "SELECT FROM ...".
    # while it would be better to have built the SELECT starting string
    # without trailing whitespace first, then add whitespace only if inner
    # cols were present, this breaks compatibility with various custom
    # compilation schemes that are currently being tested.
    if not inner_columns:
        text = text.rstrip()

    if froms:
        text += " \nFROM "

        # the two branches differ only in whether the hint mapping is
        # passed along as ``fromhints``
        if select._hints:
            text += ", ".join(
                [
                    f._compiler_dispatch(
                        self,
                        asfrom=True,
                        fromhints=byfrom,
                        from_linter=from_linter,
                        **kwargs,
                    )
                    for f in froms
                ]
            )
        else:
            text += ", ".join(
                [
                    f._compiler_dispatch(
                        self,
                        asfrom=True,
                        from_linter=from_linter,
                        **kwargs,
                    )
                    for f in froms
                ]
            )
    else:
        text += self.default_from()

    if select._where_criteria:
        t = self._generate_delimited_and_list(
            select._where_criteria, from_linter=from_linter, **kwargs
        )
        if t:
            text += " \nWHERE " + t

    # the linter warns after FROM and WHERE have been rendered, the two
    # clauses that received it
    if warn_linting:
        assert from_linter is not None
        from_linter.warn()

    if select._group_by_clauses:
        text += self.group_by_clause(select, **kwargs)

    if select._having_criteria:
        t = self._generate_delimited_and_list(
            select._having_criteria, **kwargs
        )
        if t:
            text += " \nHAVING " + t

    if select._order_by_clauses:
        text += self.order_by_clause(select, **kwargs)

    if select._has_row_limiting_clause:
        text += self._row_limit_clause(select, **kwargs)

    if select._for_update_arg is not None:
        text += self.for_update_clause(select, **kwargs)

    return text
5004
5005 def _generate_prefixes(self, stmt, prefixes, **kw):
5006 clause = " ".join(
5007 prefix._compiler_dispatch(self, **kw)
5008 for prefix, dialect_name in prefixes
5009 if dialect_name in (None, "*") or dialect_name == self.dialect.name
5010 )
5011 if clause:
5012 clause += " "
5013 return clause
5014
5015 def _render_cte_clause(
5016 self,
5017 nesting_level=None,
5018 include_following_stack=False,
5019 ):
5020 """
5021 include_following_stack
5022 Also render the nesting CTEs on the next stack. Useful for
5023 SQL structures like UNION or INSERT that can wrap SELECT
5024 statements containing nesting CTEs.
5025 """
5026 if not self.ctes:
5027 return ""
5028
5029 ctes: MutableMapping[CTE, str]
5030
5031 if nesting_level and nesting_level > 1:
5032 ctes = util.OrderedDict()
5033 for cte in list(self.ctes.keys()):
5034 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5035 cte._get_reference_cte()
5036 ]
5037 nesting = cte.nesting or cte_opts.nesting
5038 is_rendered_level = cte_level == nesting_level or (
5039 include_following_stack and cte_level == nesting_level + 1
5040 )
5041 if not (nesting and is_rendered_level):
5042 continue
5043
5044 ctes[cte] = self.ctes[cte]
5045
5046 else:
5047 ctes = self.ctes
5048
5049 if not ctes:
5050 return ""
5051 ctes_recursive = any([cte.recursive for cte in ctes])
5052
5053 cte_text = self.get_cte_preamble(ctes_recursive) + " "
5054 cte_text += ", \n".join([txt for txt in ctes.values()])
5055 cte_text += "\n "
5056
5057 if nesting_level and nesting_level > 1:
5058 for cte in list(ctes.keys()):
5059 cte_level, cte_name, cte_opts = self.level_name_by_cte[
5060 cte._get_reference_cte()
5061 ]
5062 del self.ctes[cte]
5063 del self.ctes_by_level_name[(cte_level, cte_name)]
5064 del self.level_name_by_cte[cte._get_reference_cte()]
5065
5066 return cte_text
5067
def get_cte_preamble(self, recursive):
    """Return the keyword(s) that open a CTE clause.

    ``WITH RECURSIVE`` is used when ``recursive`` is true, plain
    ``WITH`` otherwise.
    """
    return "WITH RECURSIVE" if recursive else "WITH"
5073
def get_select_precolumns(self, select, **kw):
    """Called when building a ``SELECT`` statement, position is just
    before column list.

    Returns ``"DISTINCT "`` when the statement is distinct, otherwise an
    empty string. A deprecation warning is emitted if DISTINCT ON
    expressions are present, as this base implementation ignores them.

    """
    if select._distinct_on:
        util.warn_deprecated(
            "DISTINCT ON is currently supported only by the PostgreSQL "
            "dialect. Use of DISTINCT ON for other backends is currently "
            "silently ignored, however this usage is deprecated, and will "
            "raise CompileError in a future release for all backends "
            "that do not support this syntax.",
            version="1.4",
        )

    if select._distinct:
        return "DISTINCT "
    return ""
5089
def group_by_clause(self, select, **kw):
    """allow dialects to customize how GROUP BY is rendered.

    Returns the ``GROUP BY`` clause with a leading space, or an empty
    string when the rendered expression list is empty.
    """
    rendered = self._generate_delimited_list(
        select._group_by_clauses, OPERATORS[operators.comma_op], **kw
    )
    return (" GROUP BY " + rendered) if rendered else ""
5100
def order_by_clause(self, select, **kw):
    """allow dialects to customize how ORDER BY is rendered.

    Returns the ``ORDER BY`` clause with a leading space, or an empty
    string when the rendered expression list is empty.
    """
    rendered = self._generate_delimited_list(
        select._order_by_clauses, OPERATORS[operators.comma_op], **kw
    )
    return (" ORDER BY " + rendered) if rendered else ""
5112
def for_update_clause(self, select, **kw):
    """Return the FOR UPDATE clause text, including its leading space."""
    clause = " FOR UPDATE"
    return clause
5115
def returning_clause(
    self,
    stmt: UpdateBase,
    returning_cols: Sequence[ColumnElement[Any]],
    *,
    populate_result_map: bool,
    **kw: Any,
) -> str:
    """Render the ``RETURNING`` clause for a DML statement.

    Each returned column is labeled through ``_label_returning_column``
    using the names generated by the statement for the expanded
    ``returning_cols``.
    """
    labeled_columns = stmt._generate_columns_plus_names(
        True, cols=base._select_iterables(returning_cols)
    )

    rendered = []
    for (
        name,
        proxy_name,
        fallback_label_name,
        column,
        repeated,
    ) in labeled_columns:
        rendered.append(
            self._label_returning_column(
                stmt,
                column,
                populate_result_map,
                fallback_label_name=fallback_label_name,
                column_is_repeated=repeated,
                name=name,
                proxy_name=proxy_name,
                **kw,
            )
        )

    return "RETURNING " + ", ".join(rendered)
5147
def limit_clause(self, select, **kw):
    """Render the ``LIMIT`` / ``OFFSET`` portion of a SELECT.

    When only an offset is present, ``LIMIT -1`` is emitted ahead of the
    ``OFFSET``.
    """
    pieces = []

    if select._limit_clause is not None:
        pieces.append("\n LIMIT " + self.process(select._limit_clause, **kw))

    if select._offset_clause is not None:
        if select._limit_clause is None:
            pieces.append("\n LIMIT -1")
        pieces.append(" OFFSET " + self.process(select._offset_clause, **kw))

    return "".join(pieces)
5157
def fetch_clause(
    self,
    select,
    fetch_clause=None,
    require_offset=False,
    use_literal_execute_for_simple_int=False,
    **kw,
):
    """Render ``OFFSET n ROWS`` and ``FETCH FIRST n ROWS ...`` clauses.

    :param select: the SELECT supplying the offset and, unless
     ``fetch_clause`` is passed, the fetch expression and its options.
    :param fetch_clause: explicit fetch expression; when given, the
     PERCENT and WITH TIES options are both off.
    :param require_offset: emit ``OFFSET 0 ROWS`` when the statement has
     no offset.
    :param use_literal_execute_for_simple_int: render expressions that
     ``select._simple_int_clause`` accepts via
     ``render_literal_execute()``.
    """
    if fetch_clause is None:
        fetch_clause = select._fetch_clause
        fetch_clause_options = select._fetch_clause_options
    else:
        fetch_clause_options = {"percent": False, "with_ties": False}

    def _as_literal_if_simple(clause):
        # swap in the literal-execute form for simple integer clauses
        if use_literal_execute_for_simple_int and select._simple_int_clause(
            clause
        ):
            return clause.render_literal_execute()
        return clause

    fragments = []

    if select._offset_clause is not None:
        offset_clause = _as_literal_if_simple(select._offset_clause)
        offset_str = self.process(offset_clause, **kw)
        fragments.append(f"\n OFFSET {offset_str} ROWS")
    elif require_offset:
        fragments.append("\n OFFSET 0 ROWS")

    if fetch_clause is not None:
        fetch_clause = _as_literal_if_simple(fetch_clause)
        fetch_str = self.process(fetch_clause, **kw)
        percent = " PERCENT" if fetch_clause_options["percent"] else ""
        ties = "WITH TIES" if fetch_clause_options["with_ties"] else "ONLY"
        fragments.append(f"\n FETCH FIRST {fetch_str}{percent} ROWS {ties}")

    return "".join(fragments)
5198
def visit_table(
    self,
    table,
    asfrom=False,
    iscrud=False,
    ashint=False,
    fromhints=None,
    use_schema=True,
    from_linter=None,
    ambiguous_table_name_map=None,
    **kwargs,
):
    """Render a table name for a FROM clause or a hint.

    Returns an empty string unless ``asfrom`` or ``ashint`` is set.
    Otherwise the quoted name is rendered, schema-qualified when
    ``use_schema`` is set and a schema applies; a schema-less table
    whose name appears in ``ambiguous_table_name_map`` additionally
    receives an alias suffix, and any matching FROM hint is appended.
    """
    if from_linter:
        from_linter.froms[table] = table.fullname

    if not (asfrom or ashint):
        return ""

    preparer = self.preparer
    effective_schema = preparer.schema_for_object(table)

    if use_schema and effective_schema:
        schema_text = preparer.quote_schema(effective_schema)
        ret = schema_text + "." + preparer.quote(table.name)
    else:
        ret = preparer.quote(table.name)

    needs_anon_alias = (
        not effective_schema
        and ambiguous_table_name_map
        and table.name in ambiguous_table_name_map
    )
    if needs_anon_alias:
        anon_name = self._truncated_identifier(
            "alias", ambiguous_table_name_map[table.name]
        )
        alias_text = preparer.format_alias(None, anon_name)
        ret = ret + self.get_render_as_alias_suffix(alias_text)

    if fromhints and table in fromhints:
        ret = self.format_from_hint_text(
            ret, table, fromhints[table], iscrud
        )
    return ret
5246
def visit_join(self, join, asfrom=False, from_linter=None, **kwargs):
    """Render ``<left> [FULL OUTER|LEFT OUTER] JOIN <right> ON <onclause>``.

    When a ``from_linter`` is given, an edge is recorded between every
    FROM object of the left side and every FROM object of the right
    side. Both sides are always rendered with ``asfrom=True``.
    """
    if from_linter:
        from_linter.edges.update(
            itertools.product(
                _de_clone(join.left._from_objects),
                _de_clone(join.right._from_objects),
            )
        )

    if join.full:
        join_type = " FULL OUTER JOIN "
    elif join.isouter:
        join_type = " LEFT OUTER JOIN "
    else:
        join_type = " JOIN "

    left_text = join.left._compiler_dispatch(
        self, asfrom=True, from_linter=from_linter, **kwargs
    )
    right_text = join.right._compiler_dispatch(
        self, asfrom=True, from_linter=from_linter, **kwargs
    )
    # TODO: likely need asfrom=True here?
    on_text = join.onclause._compiler_dispatch(
        self, from_linter=from_linter, **kwargs
    )

    return left_text + join_type + right_text + " ON " + on_text
5276
def _setup_crud_hints(self, stmt, table_text):
    """Select the hints of a DML statement that apply to this dialect.

    ``stmt._hints`` is keyed by ``(table, dialect_name)``; entries whose
    dialect name is ``"*"`` or equals ``self.dialect.name`` are kept.

    :return: a 2-tuple ``(dialect_hints, table_text)`` where
     ``dialect_hints`` maps table -> hint text, and ``table_text`` has
     been passed through ``format_from_hint_text()`` if the statement's
     own table has an applicable hint (otherwise it is returned as
     given).
    """
    dialect_hints = {}
    for (hinted_table, dialect_name), hint_text in stmt._hints.items():
        if dialect_name in ("*", self.dialect.name):
            dialect_hints[hinted_table] = hint_text

    target = stmt.table
    if target in dialect_hints:
        table_text = self.format_from_hint_text(
            table_text, target, dialect_hints[target], True
        )
    return dialect_hints, table_text
5288
# within the realm of "insertmanyvalues sentinel columns",
# these lookups match different kinds of Column() configurations
# to specific backend capabilities. they are broken into two
# lookups, one for autoincrement columns and the other for non
# autoincrement columns.
#
# Keys are _SentinelDefaultCharacterization members and values are
# InsertmanyvaluesSentinelOpts members.  _get_sentinel_column_for_table()
# picks one of the two lookups based on whether the table's sentinel is
# autoincrement, fetches the entry for the table's default
# characterization (falling back to 0 when there is no entry), and
# bitwise-ANDs it with the dialect's insertmanyvalues_implicit_sentinel
# to decide whether the sentinel column is usable.
_sentinel_col_non_autoinc_lookup = util.immutabledict(
    {
        _SentinelDefaultCharacterization.CLIENTSIDE: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.SENTINEL_DEFAULT: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.NONE: (
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
        ),
        _SentinelDefaultCharacterization.IDENTITY: (
            InsertmanyvaluesSentinelOpts.IDENTITY
        ),
        _SentinelDefaultCharacterization.SEQUENCE: (
            InsertmanyvaluesSentinelOpts.SEQUENCE
        ),
    }
)
# the autoincrement variant starts from the non-autoincrement lookup and
# supplies AUTOINCREMENT for the NONE characterization (presumably
# union() lets the passed-in entry replace the existing NONE entry --
# confirm against util.immutabledict)
_sentinel_col_autoinc_lookup = _sentinel_col_non_autoinc_lookup.union(
    {
        _SentinelDefaultCharacterization.NONE: (
            InsertmanyvaluesSentinelOpts.AUTOINCREMENT
        ),
    }
)
5320
def _get_sentinel_column_for_table(
    self, table: Table
) -> Optional[Sequence[Column[Any]]]:
    """Return the sentinel column(s) of ``table`` usable by this dialect.

    The table's sentinel characteristics are matched against the
    dialect's ``insertmanyvalues_implicit_sentinel`` flags, using the
    autoincrement or non-autoincrement lookup as appropriate.

    :return: the candidate sentinel columns when the dialect supports
     them; ``None`` when the table has no candidates or they are not
     supported and were not explicitly requested.
    :raises InvalidRequestError: a column was explicitly marked as a
     sentinel but its default generation is not supported by this
     dialect.
    """
    dialect_opts = self.dialect.insertmanyvalues_implicit_sentinel
    traits = table._sentinel_column_characteristics

    candidate_cols = traits.columns
    if candidate_cols is None:
        return None

    lookup = (
        self._sentinel_col_autoinc_lookup
        if traits.is_autoinc
        else self._sentinel_col_non_autoinc_lookup
    )
    # characterizations with no entry yield 0, i.e. "never supported"
    required_opts = lookup.get(traits.default_characterization, 0)

    if dialect_opts & required_opts:
        return candidate_cols

    if not traits.is_explicit:
        return None

    # a column was explicitly marked as insert_sentinel=True,
    # however it is not compatible with this dialect. they should
    # not indicate this column as a sentinel if they need to include
    # this dialect.

    # TODO: do we want non-primary key explicit sentinel cols
    # that can gracefully degrade for some backends?
    # insert_sentinel="degrade" perhaps. not for the initial release.
    # I am hoping people are generally not dealing with this sentinel
    # business at all.

    # if is_explicit is True, there will be only one sentinel column.
    raise exc.InvalidRequestError(
        f"Column {candidate_cols[0]} can't be explicitly "
        "marked as a sentinel column when using the "
        f"{self.dialect.name} dialect, as the "
        "particular type of default generation on this column is "
        "not currently compatible with this dialect's specific "
        f"INSERT..RETURNING syntax which can receive the "
        "server-generated value in "
        "a deterministic way. To remove this error, remove "
        "insert_sentinel=True from primary key autoincrement "
        "columns; these columns are automatically used as "
        "sentinels for supported dialects in any case."
    )
5382
def _deliver_insertmanyvalues_batches(
    self,
    statement: str,
    parameters: _DBAPIMultiExecuteParams,
    compiled_parameters: List[_MutableCoreSingleExecuteParams],
    generic_setinputsizes: Optional[_GenericSetInputSizesType],
    batch_size: int,
    sort_by_parameter_order: bool,
    schema_translate_map: Optional[SchemaTranslateMapType],
) -> Iterator[_InsertManyValuesBatch]:
    """Yield ``_InsertManyValuesBatch`` objects for an "insertmanyvalues"
    INSERT.

    The single-row ``(<values>)`` expression recorded in
    ``self._insertmanyvalues`` is located inside ``statement`` and
    replaced with one VALUES entry per parameter set in each batch,
    with the parameters rewritten to match.  When batching can't be
    used, one batch per parameter set is yielded with the statement
    unchanged.

    :param statement: the compiled INSERT string containing
     ``(<imv.single_values_expr>)``.
    :param parameters: the parameter sets to deliver; indexed by key for
     non-positional compilers and sliced by position for positional
     compilers.
    :param compiled_parameters: parameter sets parallel to
     ``parameters``; used here only to extract sentinel values via
     ``imv.sentinel_param_keys``.
    :param generic_setinputsizes: optional ``(key, len_, typ)`` entries;
     expanded to one entry per row per batch with keys suffixed
     ``_<row index>``.
    :param batch_size: maximum number of parameter sets per batch; may
     be lowered further by ``dialect.insertmanyvalues_max_parameters``.
    :param sort_by_parameter_order: passed through to every batch; also
     participates in the decision to fall back to row-at-a-time.
    :param schema_translate_map: when given, applied through
     ``preparer._render_schema_translates`` to the VALUES expressions
     before they are searched for in ``statement``.
    """
    imv = self._insertmanyvalues
    assert imv is not None

    # getter that pulls the sentinel value(s) out of a compiled
    # parameter set; None when there are no client-side sentinel keys
    if not imv.sentinel_param_keys:
        _sentinel_from_params = None
    else:
        _sentinel_from_params = operator.itemgetter(
            *imv.sentinel_param_keys
        )

    lenparams = len(parameters)
    if imv.is_default_expr and not self.dialect.supports_default_metavalue:
        # backend doesn't support
        # INSERT INTO table (pk_col) VALUES (DEFAULT), (DEFAULT), ...
        # at the moment this is basically SQL Server due to
        # not being able to use DEFAULT for identity column
        # just yield out that many single statements! still
        # faster than a whole connection.execute() call ;)
        #
        # note we still are taking advantage of the fact that we know
        # we are using RETURNING. The generalized approach of fetching
        # cursor.lastrowid etc. still goes through the more heavyweight
        # "ExecutionContext per statement" system as it isn't usable
        # as a generic "RETURNING" approach
        use_row_at_a_time = True
        downgraded = False
    elif not self.dialect.supports_multivalues_insert or (
        sort_by_parameter_order
        and self._result_columns
        and (imv.sentinel_columns is None or imv.includes_upsert_behaviors)
    ):
        # deterministic order was requested and the compiler could
        # not organize sentinel columns for this dialect/statement.
        # use row at a time
        use_row_at_a_time = True
        downgraded = True
    else:
        use_row_at_a_time = False
        downgraded = False

    if use_row_at_a_time:
        # one batch per parameter set, statement left as-is; batch
        # numbers start at 1 and the total equals the number of sets
        for batchnum, (param, compiled_param) in enumerate(
            cast(
                "Sequence[Tuple[_DBAPISingleExecuteParams, _MutableCoreSingleExecuteParams]]",  # noqa: E501
                zip(parameters, compiled_parameters),
            ),
            1,
        ):
            yield _InsertManyValuesBatch(
                statement,
                param,
                generic_setinputsizes,
                [param],
                (
                    [_sentinel_from_params(compiled_param)]
                    if _sentinel_from_params
                    else []
                ),
                1,
                batchnum,
                lenparams,
                sort_by_parameter_order,
                downgraded,
            )
        return

    # "rst" renders schema translates into a SQL fragment, or is None
    # when no translate map is in use
    if schema_translate_map:
        rst = functools.partial(
            self.preparer._render_schema_translates,
            schema_translate_map=schema_translate_map,
        )
    else:
        rst = None

    imv_single_values_expr = imv.single_values_expr
    if rst:
        imv_single_values_expr = rst(imv_single_values_expr)

    # swap the single-row VALUES expression for a token that each batch
    # replaces with its own expanded VALUES list
    executemany_values = f"({imv_single_values_expr})"
    statement = statement.replace(executemany_values, "__EXECMANY_TOKEN__")

    # Use optional insertmanyvalues_max_parameters
    # to further shrink the batch size so that there are no more than
    # insertmanyvalues_max_parameters params.
    # Currently used by SQL Server, which limits statements to 2100 bound
    # parameters (actually 2099).
    max_params = self.dialect.insertmanyvalues_max_parameters
    if max_params:
        total_num_of_params = len(self.bind_names)
        num_params_per_batch = len(imv.insert_crud_params)
        num_params_outside_of_batch = (
            total_num_of_params - num_params_per_batch
        )
        batch_size = min(
            batch_size,
            (
                (max_params - num_params_outside_of_batch)
                // num_params_per_batch
            ),
        )

    # mutable copies; the batching loop below consumes these from the
    # front
    batches = cast("List[Sequence[Any]]", list(parameters))
    compiled_batches = cast(
        "List[Sequence[Any]]", list(compiled_parameters)
    )

    processed_setinputsizes: Optional[_GenericSetInputSizesType] = None
    batchnum = 1
    # ceiling division of lenparams by batch_size
    total_batches = lenparams // batch_size + (
        1 if lenparams % batch_size else 0
    )

    insert_crud_params = imv.insert_crud_params
    assert insert_crud_params is not None

    if rst:
        insert_crud_params = [
            (col, key, rst(expr), st)
            for col, key, expr, st in insert_crud_params
        ]

    escaped_bind_names: Mapping[str, str]
    expand_pos_lower_index = expand_pos_upper_index = 0

    if not self.positional:
        # named parameters: build a VALUES template in which every bind
        # name inside VALUES carries an "__EXECMANY_INDEX__" suffix, to
        # be replaced by the row index per batch
        if self.escaped_bind_names:
            escaped_bind_names = self.escaped_bind_names
        else:
            escaped_bind_names = {}

        all_keys = set(parameters[0])

        def apply_placeholders(keys, formatted):
            # rewrite each rendered bind "<key>" in the fragment to
            # "<key>__EXECMANY_INDEX__", using the escaped name if any
            for key in keys:
                key = escaped_bind_names.get(key, key)
                formatted = formatted.replace(
                    self.bindtemplate % {"name": key},
                    self.bindtemplate
                    % {"name": f"{key}__EXECMANY_INDEX__"},
                )
            return formatted

        if imv.embed_values_counter:
            imv_values_counter = ", _IMV_VALUES_COUNTER"
        else:
            imv_values_counter = ""
        formatted_values_clause = f"""({', '.join(
            apply_placeholders(bind_keys, formatted)
            for _, _, formatted, bind_keys in insert_crud_params
        )}{imv_values_counter})"""

        # keys that belong to the VALUES clause get a per-row suffix;
        # all remaining keys are carried over unchanged from the first
        # parameter set
        keys_to_replace = all_keys.intersection(
            escaped_bind_names.get(key, key)
            for _, _, _, bind_keys in insert_crud_params
            for key in bind_keys
        )
        base_parameters = {
            key: parameters[0][key]
            for key in all_keys.difference(keys_to_replace)
        }
        executemany_values_w_comma = ""
    else:
        # positional parameters: the VALUES expression is repeated per
        # row, each repetition followed by ", "
        formatted_values_clause = ""
        keys_to_replace = set()
        base_parameters = {}

        if imv.embed_values_counter:
            executemany_values_w_comma = (
                f"({imv_single_values_expr}, _IMV_VALUES_COUNTER), "
            )
        else:
            executemany_values_w_comma = f"({imv_single_values_expr}), "

        all_names_we_will_expand: Set[str] = set()
        for elem in imv.insert_crud_params:
            all_names_we_will_expand.update(elem[3])

        # get the start and end position in a particular list
        # of parameters where we will be doing the "expanding".
        # statements can have params on either side or both sides,
        # given RETURNING and CTEs
        if all_names_we_will_expand:
            positiontup = self.positiontup
            assert positiontup is not None

            all_expand_positions = {
                idx
                for idx, name in enumerate(positiontup)
                if name in all_names_we_will_expand
            }
            expand_pos_lower_index = min(all_expand_positions)
            expand_pos_upper_index = max(all_expand_positions) + 1
            # the expanded positions must form one contiguous run
            assert (
                len(all_expand_positions)
                == expand_pos_upper_index - expand_pos_lower_index
            )

        if self._numeric_binds:
            # replace numbered binds with "%s" so that the batch loop
            # can fill in fresh numbers using the % operator
            escaped = re.escape(self._numeric_binds_identifier_char)
            executemany_values_w_comma = re.sub(
                rf"{escaped}\d+", "%s", executemany_values_w_comma
            )

    while batches:
        # take the next batch_size entries off the front of both lists
        batch = batches[0:batch_size]
        compiled_batch = compiled_batches[0:batch_size]

        batches[0:batch_size] = []
        compiled_batches[0:batch_size] = []

        # only the last batch can be shorter than batch_size
        if batches:
            current_batch_size = batch_size
        else:
            current_batch_size = len(batch)

        if generic_setinputsizes:
            # if setinputsizes is present, expand this collection to
            # suit the batch length as well
            # currently this will be mssql+pyodbc for internal dialects
            processed_setinputsizes = [
                (new_key, len_, typ)
                for new_key, len_, typ in (
                    (f"{key}_{index}", len_, typ)
                    for index in range(current_batch_size)
                    for key, len_, typ in generic_setinputsizes
                )
            ]

        replaced_parameters: Any
        if self.positional:
            num_ins_params = imv.num_positional_params_counted

            batch_iterator: Iterable[Sequence[Any]]
            extra_params_left: Sequence[Any]
            extra_params_right: Sequence[Any]

            if num_ins_params == len(batch[0]):
                # every parameter belongs to VALUES
                extra_params_left = extra_params_right = ()
                batch_iterator = batch
            else:
                # parameters outside of VALUES are taken from the first
                # row only; just the VALUES slice of each row is
                # repeated
                extra_params_left = batch[0][:expand_pos_lower_index]
                extra_params_right = batch[0][expand_pos_upper_index:]
                batch_iterator = (
                    b[expand_pos_lower_index:expand_pos_upper_index]
                    for b in batch
                )

            # the [:-2] slices strip the trailing ", " left by the last
            # repetition
            if imv.embed_values_counter:
                expanded_values_string = (
                    "".join(
                        executemany_values_w_comma.replace(
                            "_IMV_VALUES_COUNTER", str(i)
                        )
                        for i, _ in enumerate(batch)
                    )
                )[:-2]
            else:
                expanded_values_string = (
                    (executemany_values_w_comma * current_batch_size)
                )[:-2]

            if self._numeric_binds and num_ins_params > 0:
                # numeric will always number the parameters inside of
                # VALUES (and thus order self.positiontup) to be higher
                # than non-VALUES parameters, no matter where in the
                # statement those non-VALUES parameters appear (this is
                # ensured in _process_numeric by numbering first all
                # params that are not in _values_bindparam)
                # therefore all extra params are always
                # on the left side and numbered lower than the VALUES
                # parameters
                assert not extra_params_right

                start = expand_pos_lower_index + 1
                end = num_ins_params * (current_batch_size) + start

                # need to format here, since statement may contain
                # unescaped %, while values_string contains just (%s, %s)
                positions = tuple(
                    f"{self._numeric_binds_identifier_char}{i}"
                    for i in range(start, end)
                )
                expanded_values_string = expanded_values_string % positions

            replaced_statement = statement.replace(
                "__EXECMANY_TOKEN__", expanded_values_string
            )

            # flatten the per-row VALUES parameters into one tuple
            replaced_parameters = tuple(
                itertools.chain.from_iterable(batch_iterator)
            )

            replaced_parameters = (
                extra_params_left
                + replaced_parameters
                + extra_params_right
            )

        else:
            replaced_values_clauses = []
            replaced_parameters = base_parameters.copy()

            for i, param in enumerate(batch):
                # "<key>__EXECMANY_INDEX__" becomes "<key>__<i>", which
                # matches the parameter keys added below
                fmv = formatted_values_clause.replace(
                    "EXECMANY_INDEX__", str(i)
                )
                if imv.embed_values_counter:
                    fmv = fmv.replace("_IMV_VALUES_COUNTER", str(i))

                replaced_values_clauses.append(fmv)
                replaced_parameters.update(
                    {f"{key}__{i}": param[key] for key in keys_to_replace}
                )

            replaced_statement = statement.replace(
                "__EXECMANY_TOKEN__",
                ", ".join(replaced_values_clauses),
            )

        # the final False mirrors the "downgraded" position of the
        # row-at-a-time yield above; this path is never downgraded
        yield _InsertManyValuesBatch(
            replaced_statement,
            replaced_parameters,
            processed_setinputsizes,
            batch,
            (
                [_sentinel_from_params(cb) for cb in compiled_batch]
                if _sentinel_from_params
                else []
            ),
            current_batch_size,
            batchnum,
            total_batches,
            sort_by_parameter_order,
            False,
        )
        batchnum += 1
5729
def visit_insert(
    self, insert_stmt, visited_bindparam=None, visiting_cte=None, **kw
):
    """Compile an INSERT statement construct into its SQL string.

    :param insert_stmt: the insert construct; replaced early on by
     ``compile_state.statement``.
    :param visited_bindparam: accepted but always reset to ``None``
     before use; a fresh list is created internally when bound
     parameters need to be counted.
    :param visiting_cte: when not ``None``, the INSERT is being rendered
     inside of a CTE; it is then never treated as toplevel and its
     parameters are not counted.
    :param kw: passed along to the sub-compilation calls.
    :return: the rendered INSERT string.
    :raises CompileError: for an empty insert or an in-place multirow
     insert the dialect doesn't support, or when a multirow VALUES is
     combined with RETURNING and sort-by-parameter-order.

    Besides returning text, a toplevel call sets ``self.isinsert`` and,
    if unset, ``self.dml_compile_state`` / ``self.compile_state``; when
    "insertmanyvalues" applies, ``self._insertmanyvalues`` is assigned.
    An entry is pushed on ``self.stack`` for the duration of the call.
    """
    compile_state = insert_stmt._compile_state_factory(
        insert_stmt, self, **kw
    )
    insert_stmt = compile_state.statement

    if visiting_cte is not None:
        kw["visiting_cte"] = visiting_cte
        toplevel = False
    else:
        # toplevel when nothing else is currently being compiled
        toplevel = not self.stack

    if toplevel:
        self.isinsert = True
        if not self.dml_compile_state:
            self.dml_compile_state = compile_state
        if not self.compile_state:
            self.compile_state = compile_state

    self.stack.append(
        {
            "correlate_froms": set(),
            "asfrom_froms": set(),
            "selectable": insert_stmt,
        }
    )

    counted_bindparam = 0

    # reset any incoming "visited_bindparam" collection
    visited_bindparam = None

    # for positional, insertmanyvalues needs to know how many
    # bound parameters are in the VALUES sequence; there's no simple
    # rule because default expressions etc. can have zero or more
    # params inside them. After multiple attempts to figure this out,
    # this very simplistic "count after" works and is
    # likely the least amount of callcounts, though looks clumsy
    if self.positional and visiting_cte is None:
        # if we are inside a CTE, don't count parameters
        # here since they wont be for insertmanyvalues. keep
        # visited_bindparam at None so no counting happens.
        # see #9173
        visited_bindparam = []

    crud_params_struct = crud._get_crud_params(
        self,
        insert_stmt,
        compile_state,
        toplevel,
        visited_bindparam=visited_bindparam,
        **kw,
    )

    if self.positional and visited_bindparam is not None:
        counted_bindparam = len(visited_bindparam)
        if self._numeric_binds:
            # accumulate the bind parameters seen inside of VALUES
            if self._values_bindparam is not None:
                self._values_bindparam += visited_bindparam
            else:
                self._values_bindparam = visited_bindparam

    crud_params_single = crud_params_struct.single_params

    # no columns to insert and the dialect offers no way of rendering
    # an empty insert
    if (
        not crud_params_single
        and not self.dialect.supports_default_values
        and not self.dialect.supports_default_metavalue
        and not self.dialect.supports_empty_insert
    ):
        raise exc.CompileError(
            "The '%s' dialect with current database "
            "version settings does not support empty "
            "inserts." % self.dialect.name
        )

    # NOTE(review): both branches below re-assign crud_params_single to
    # the value it already holds; only the raises have an effect.
    if compile_state._has_multi_parameters:
        if not self.dialect.supports_multivalues_insert:
            raise exc.CompileError(
                "The '%s' dialect with current database "
                "version settings does not support "
                "in-place multirow inserts." % self.dialect.name
            )
        elif (
            self.implicit_returning or insert_stmt._returning
        ) and insert_stmt._sort_by_parameter_order:
            # NOTE(review): "determinstically" is misspelled in the
            # message below; left as-is since the text may be matched
            # elsewhere.
            raise exc.CompileError(
                "RETURNING cannot be determinstically sorted when "
                "using an INSERT which includes multi-row values()."
            )
        crud_params_single = crud_params_struct.single_params
    else:
        crud_params_single = crud_params_struct.single_params

    preparer = self.preparer
    supports_default_values = self.dialect.supports_default_values

    text = "INSERT "

    if insert_stmt._prefixes:
        text += self._generate_prefixes(
            insert_stmt, insert_stmt._prefixes, **kw
        )

    text += "INTO "
    table_text = preparer.format_table(insert_stmt.table)

    if insert_stmt._hints:
        _, table_text = self._setup_crud_hints(insert_stmt, table_text)

    if insert_stmt._independent_ctes:
        self._dispatch_independent_ctes(insert_stmt, kw)

    text += table_text

    # the column list; rendered as "()" when there are no columns and
    # DEFAULT VALUES is not supported
    if crud_params_single or not supports_default_values:
        text += " (%s)" % ", ".join(
            [expr for _, expr, _, _ in crud_params_single]
        )

    # look for insertmanyvalues attributes that would have been configured
    # by crud.py as it scanned through the columns to be part of the
    # INSERT
    use_insertmanyvalues = crud_params_struct.use_insertmanyvalues
    named_sentinel_params: Optional[Sequence[str]] = None
    add_sentinel_cols = None
    implicit_sentinel = False

    returning_cols = self.implicit_returning or insert_stmt._returning
    if returning_cols:
        add_sentinel_cols = crud_params_struct.use_sentinel_columns
        if add_sentinel_cols is not None:
            assert use_insertmanyvalues

            # search for the sentinel column explicitly present
            # in the INSERT columns list, and additionally check that
            # this column has a bound parameter name set up that's in the
            # parameter list. If both of these cases are present, it means
            # we will have a client side value for the sentinel in each
            # parameter set.

            _params_by_col = {
                col: param_names
                for col, _, _, param_names in crud_params_single
            }
            named_sentinel_params = []
            for _add_sentinel_col in add_sentinel_cols:
                if _add_sentinel_col not in _params_by_col:
                    named_sentinel_params = None
                    break
                param_name = self._within_exec_param_key_getter(
                    _add_sentinel_col
                )
                if param_name not in _params_by_col[_add_sentinel_col]:
                    named_sentinel_params = None
                    break
                named_sentinel_params.append(param_name)

            if named_sentinel_params is None:
                # if we are not going to have a client side value for
                # the sentinel in the parameter set, that means it's
                # an autoincrement, an IDENTITY, or a server-side SQL
                # expression like nextval('seqname'). So this is
                # an "implicit" sentinel; we will look for it in
                # RETURNING
                # only, and then sort on it. For this case on PG,
                # SQL Server we have to use a special INSERT form
                # that guarantees the server side function lines up with
                # the entries in the VALUES.
                if (
                    self.dialect.insertmanyvalues_implicit_sentinel
                    & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
                ):
                    implicit_sentinel = True
                else:
                    # here, we are not using a sentinel at all
                    # and we are likely the SQLite dialect.
                    # The first add_sentinel_col that we have should not
                    # be marked as "insert_sentinel=True". if it was,
                    # an error should have been raised in
                    # _get_sentinel_column_for_table.
                    assert not add_sentinel_cols[0]._insert_sentinel, (
                        "sentinel selection rules should have prevented "
                        "us from getting here for this dialect"
                    )

            # always put the sentinel columns last. even if they are
            # in the returning list already, they will be there twice
            # then.
            returning_cols = list(returning_cols) + list(add_sentinel_cols)

        returning_clause = self.returning_clause(
            insert_stmt,
            returning_cols,
            populate_result_map=toplevel,
        )

        # some dialects place RETURNING ahead of VALUES; otherwise it is
        # appended near the end of this method
        if self.returning_precedes_values:
            text += " " + returning_clause

    else:
        returning_clause = None

    # the remainder of the statement takes one of four forms:
    # INSERT..SELECT, DEFAULT VALUES, multi-row VALUES, single-row VALUES
    if insert_stmt.select is not None:
        # placed here by crud.py
        select_text = self.process(
            self.stack[-1]["insert_from_select"], insert_into=True, **kw
        )

        if self.ctes and self.dialect.cte_follows_insert:
            nesting_level = len(self.stack) if not toplevel else None
            text += " %s%s" % (
                self._render_cte_clause(
                    nesting_level=nesting_level,
                    include_following_stack=True,
                ),
                select_text,
            )
        else:
            text += " %s" % select_text
    elif not crud_params_single and supports_default_values:
        text += " DEFAULT VALUES"
        if use_insertmanyvalues:
            # the first four arguments are positional; they line up with
            # the is_default_expr, single_values_expr,
            # insert_crud_params and num_positional_params_counted
            # keywords used by the other _InsertManyValues call in this
            # method
            self._insertmanyvalues = _InsertManyValues(
                True,
                self.dialect.default_metavalue_token,
                cast(
                    "List[crud._CrudParamElementStr]", crud_params_single
                ),
                counted_bindparam,
                sort_by_parameter_order=(
                    insert_stmt._sort_by_parameter_order
                ),
                includes_upsert_behaviors=(
                    insert_stmt._post_values_clause is not None
                ),
                sentinel_columns=add_sentinel_cols,
                num_sentinel_columns=(
                    len(add_sentinel_cols) if add_sentinel_cols else 0
                ),
                implicit_sentinel=implicit_sentinel,
            )
    elif compile_state._has_multi_parameters:
        # one parenthesized group per parameter set
        text += " VALUES %s" % (
            ", ".join(
                "(%s)"
                % (", ".join(value for _, _, value, _ in crud_param_set))
                for crud_param_set in crud_params_struct.all_multi_params
            ),
        )
    else:
        insert_single_values_expr = ", ".join(
            [
                value
                for _, _, value, _ in cast(
                    "List[crud._CrudParamElementStr]",
                    crud_params_single,
                )
            ]
        )

        if use_insertmanyvalues:
            if (
                implicit_sentinel
                and (
                    self.dialect.insertmanyvalues_implicit_sentinel
                    & InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
                )
                # this is checking if we have
                # INSERT INTO table (id) VALUES (DEFAULT).
                and not (crud_params_struct.is_default_metavalue_only)
            ):
                # if we have a sentinel column that is server generated,
                # then for selected backends render the VALUES list as a
                # subquery. This is the orderable form supported by
                # PostgreSQL and SQL Server.
                embed_sentinel_value = True

                render_bind_casts = (
                    self.dialect.insertmanyvalues_implicit_sentinel
                    & InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
                )

                # synthetic column names p0, p1, ... for the subquery
                colnames = ", ".join(
                    f"p{i}" for i, _ in enumerate(crud_params_single)
                )

                if render_bind_casts:
                    # render casts for the SELECT list. For PG, we are
                    # already rendering bind casts in the parameter list,
                    # selectively for the more "tricky" types like ARRAY.
                    # however, even for the "easy" types, if the parameter
                    # is NULL for every entry, PG gives up and says
                    # "it must be TEXT", which fails for other easy types
                    # like ints. So we cast on this side too.
                    colnames_w_cast = ", ".join(
                        self.render_bind_cast(
                            col.type,
                            col.type._unwrapped_dialect_impl(self.dialect),
                            f"p{i}",
                        )
                        for i, (col, *_) in enumerate(crud_params_single)
                    )
                else:
                    colnames_w_cast = colnames

                text += (
                    f" SELECT {colnames_w_cast} FROM "
                    f"(VALUES ({insert_single_values_expr})) "
                    f"AS imp_sen({colnames}, sen_counter) "
                    "ORDER BY sen_counter"
                )
            else:
                # otherwise, if no sentinel or backend doesn't support
                # orderable subquery form, use a plain VALUES list
                embed_sentinel_value = False
                text += f" VALUES ({insert_single_values_expr})"

            self._insertmanyvalues = _InsertManyValues(
                is_default_expr=False,
                single_values_expr=insert_single_values_expr,
                insert_crud_params=cast(
                    "List[crud._CrudParamElementStr]",
                    crud_params_single,
                ),
                num_positional_params_counted=counted_bindparam,
                sort_by_parameter_order=(
                    insert_stmt._sort_by_parameter_order
                ),
                includes_upsert_behaviors=(
                    insert_stmt._post_values_clause is not None
                ),
                sentinel_columns=add_sentinel_cols,
                num_sentinel_columns=(
                    len(add_sentinel_cols) if add_sentinel_cols else 0
                ),
                sentinel_param_keys=named_sentinel_params,
                implicit_sentinel=implicit_sentinel,
                embed_values_counter=embed_sentinel_value,
            )

        else:
            text += f" VALUES ({insert_single_values_expr})"

    if insert_stmt._post_values_clause is not None:
        post_values_clause = self.process(
            insert_stmt._post_values_clause, **kw
        )
        if post_values_clause:
            text += " " + post_values_clause

    if returning_clause and not self.returning_precedes_values:
        text += " " + returning_clause

    # CTEs that weren't rendered after INSERT are prepended to the whole
    # statement
    if self.ctes and not self.dialect.cte_follows_insert:
        nesting_level = len(self.stack) if not toplevel else None
        text = (
            self._render_cte_clause(
                nesting_level=nesting_level,
                include_following_stack=True,
            )
            + text
        )

    self.stack.pop(-1)

    return text
6099
def update_limit_clause(self, update_stmt):
    """Hook for rendering a LIMIT clause on an UPDATE statement.

    The base implementation renders nothing and returns ``None``;
    MySQL overrides this to add LIMIT to the UPDATE.

    """
    return None
6103
def update_tables_clause(self, update_stmt, from_table, extra_froms, **kw):
    """Hook for rendering the initial table clause of an UPDATE.

    The base implementation compiles ``from_table`` with
    ``asfrom=True`` (overriding any incoming ``asfrom``) and
    ``iscrud=True``; ``update_stmt`` and ``extra_froms`` are not used
    here.

    MySQL overrides this.

    """
    dispatch_kw = dict(kw, asfrom=True)
    return from_table._compiler_dispatch(self, iscrud=True, **dispatch_kw)
6113
def update_from_clause(
    self, update_stmt, from_table, extra_froms, from_hints, **kw
):
    """Hook for rendering the FROM portion of an UPDATE..FROM.

    The base implementation has no rendering for multiple-table UPDATE
    and always raises ``NotImplementedError``.

    MySQL and MSSQL override this.

    """
    message = (
        "This backend does not support multiple-table "
        "criteria within UPDATE"
    )
    raise NotImplementedError(message)
6127
def visit_update(self, update_stmt, visiting_cte=None, **kw):
    """Compile an UPDATE statement construct into its SQL string.

    :param update_stmt: the update construct; replaced early on by
     ``compile_state.statement``.
    :param visiting_cte: when not ``None``, the UPDATE is being rendered
     inside of a CTE and is never treated as toplevel.
    :param kw: passed along to the sub-compilation calls.
    :return: the rendered UPDATE string.

    A toplevel call sets ``self.isupdate`` and, if unset,
    ``self.dml_compile_state`` / ``self.compile_state``.  An entry is
    pushed on ``self.stack`` for the duration of the call.
    """
    compile_state = update_stmt._compile_state_factory(
        update_stmt, self, **kw
    )
    update_stmt = compile_state.statement

    if visiting_cte is not None:
        kw["visiting_cte"] = visiting_cte
        toplevel = False
    else:
        # toplevel when nothing else is currently being compiled
        toplevel = not self.stack

    if toplevel:
        self.isupdate = True
        if not self.dml_compile_state:
            self.dml_compile_state = compile_state
        if not self.compile_state:
            self.compile_state = compile_state

    # set up a FROM linter only when cartesian product collection is
    # enabled in self.linting
    if self.linting & COLLECT_CARTESIAN_PRODUCTS:
        from_linter = FromLinter({}, set())
        warn_linting = self.linting & WARN_LINTING
        if toplevel:
            self.from_linter = from_linter
    else:
        from_linter = None
        warn_linting = False

    extra_froms = compile_state._extra_froms
    is_multitable = bool(extra_froms)

    if is_multitable:
        # main table might be a JOIN
        main_froms = set(_from_objects(update_stmt.table))
        # extra FROMs already covered by the main table are not
        # rendered a second time
        render_extra_froms = [
            f for f in extra_froms if f not in main_froms
        ]
        correlate_froms = main_froms.union(extra_froms)
    else:
        render_extra_froms = []
        correlate_froms = {update_stmt.table}

    self.stack.append(
        {
            "correlate_froms": correlate_froms,
            "asfrom_froms": correlate_froms,
            "selectable": update_stmt,
        }
    )

    text = "UPDATE "

    if update_stmt._prefixes:
        text += self._generate_prefixes(
            update_stmt, update_stmt._prefixes, **kw
        )

    # the table clause is compiled before the SET parameters are
    # gathered below
    table_text = self.update_tables_clause(
        update_stmt,
        update_stmt.table,
        render_extra_froms,
        from_linter=from_linter,
        **kw,
    )
    crud_params_struct = crud._get_crud_params(
        self, update_stmt, compile_state, toplevel, **kw
    )
    crud_params = crud_params_struct.single_params

    if update_stmt._hints:
        dialect_hints, table_text = self._setup_crud_hints(
            update_stmt, table_text
        )
    else:
        dialect_hints = None

    if update_stmt._independent_ctes:
        self._dispatch_independent_ctes(update_stmt, kw)

    text += table_text

    # "SET <col>=<value>, ..."
    text += " SET "
    text += ", ".join(
        expr + "=" + value
        for _, expr, value, _ in cast(
            "List[Tuple[Any, str, str, Any]]", crud_params
        )
    )

    # RETURNING is rendered here for dialects that place it ahead of
    # the remaining clauses, otherwise after the LIMIT hook below
    if self.implicit_returning or update_stmt._returning:
        if self.returning_precedes_values:
            text += " " + self.returning_clause(
                update_stmt,
                self.implicit_returning or update_stmt._returning,
                populate_result_map=toplevel,
            )

    if extra_froms:
        extra_from_text = self.update_from_clause(
            update_stmt,
            update_stmt.table,
            render_extra_froms,
            dialect_hints,
            from_linter=from_linter,
            **kw,
        )
        if extra_from_text:
            text += " " + extra_from_text

    if update_stmt._where_criteria:
        t = self._generate_delimited_and_list(
            update_stmt._where_criteria, from_linter=from_linter, **kw
        )
        if t:
            text += " WHERE " + t

    limit_clause = self.update_limit_clause(update_stmt)
    if limit_clause:
        text += " " + limit_clause

    if (
        self.implicit_returning or update_stmt._returning
    ) and not self.returning_precedes_values:
        text += " " + self.returning_clause(
            update_stmt,
            self.implicit_returning or update_stmt._returning,
            populate_result_map=toplevel,
        )

    # CTEs are prepended to the whole statement
    if self.ctes:
        nesting_level = len(self.stack) if not toplevel else None
        text = self._render_cte_clause(nesting_level=nesting_level) + text

    if warn_linting:
        assert from_linter is not None
        from_linter.warn(stmt_type="UPDATE")

    self.stack.pop(-1)

    return text
6268
def delete_extra_from_clause(
    self, update_stmt, from_table, extra_froms, from_hints, **kw
):
    """Hook for rendering the extra FROM portion of a DELETE..FROM.

    Dialects can use this to implement DELETE..USING, for example.
    The base implementation has no rendering for multiple-table DELETE
    and always raises ``NotImplementedError``.

    MySQL and MSSQL override this.

    """
    message = (
        "This backend does not support multiple-table "
        "criteria within DELETE"
    )
    raise NotImplementedError(message)
6284
def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw):
    """Hook for rendering the table clause of a DELETE.

    The base implementation compiles ``from_table`` with ``asfrom=True``
    and ``iscrud=True``; ``delete_stmt`` and ``extra_froms`` are not
    used here.

    """
    rendered = from_table._compiler_dispatch(
        self, asfrom=True, iscrud=True, **kw
    )
    return rendered
6289
def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
    """Render a DELETE statement and return it as a string.

    :param delete_stmt: the DELETE construct.  It is replaced by
     ``compile_state.statement`` as soon as the compile state is built.
    :param visiting_cte: when not ``None`` it is forwarded inside ``kw``
     and the statement is treated as not being the top-level statement.
    :param kw: extra keyword arguments forwarded to the rendering calls
     made for the statement's sub-elements.
    :return: the DELETE text, with the CTE clause prepended when
     ``self.ctes`` is non-empty.

    """
    compile_state = delete_stmt._compile_state_factory(
        delete_stmt, self, **kw
    )
    delete_stmt = compile_state.statement

    # a statement visited as a CTE is never top level; otherwise it is
    # top level only when nothing is on the compiler stack yet
    if visiting_cte is not None:
        kw["visiting_cte"] = visiting_cte
        toplevel = False
    else:
        toplevel = not self.stack

    if toplevel:
        self.isdelete = True
        # only fill in the compile-state slots that are still unset
        if not self.dml_compile_state:
            self.dml_compile_state = compile_state
        if not self.compile_state:
            self.compile_state = compile_state

    # a linter is created only when the linting flags ask for
    # cartesian-product collection
    if self.linting & COLLECT_CARTESIAN_PRODUCTS:
        from_linter = FromLinter({}, set())
        warn_linting = self.linting & WARN_LINTING
        if toplevel:
            self.from_linter = from_linter
    else:
        from_linter = None
        warn_linting = False

    extra_froms = compile_state._extra_froms

    # the target table plus the extra FROMs serve as both the
    # "correlate_froms" and "asfrom_froms" entries of the stack frame
    correlate_froms = {delete_stmt.table}.union(extra_froms)
    self.stack.append(
        {
            "correlate_froms": correlate_froms,
            "asfrom_froms": correlate_froms,
            "selectable": delete_stmt,
        }
    )

    text = "DELETE "

    if delete_stmt._prefixes:
        text += self._generate_prefixes(
            delete_stmt, delete_stmt._prefixes, **kw
        )

    text += "FROM "

    try:
        table_text = self.delete_table_clause(
            delete_stmt,
            delete_stmt.table,
            extra_froms,
            from_linter=from_linter,
        )
    except TypeError:
        # anticipate 3rd party dialects that don't include **kw
        # TODO: remove in 2.1
        table_text = self.delete_table_clause(
            delete_stmt, delete_stmt.table, extra_froms
        )
        # the fallback call could not receive the linter, so the table
        # is processed once more with it; the rendered text is discarded
        if from_linter:
            _ = self.process(delete_stmt.table, from_linter=from_linter)

    # the return value is not used by this method
    crud._get_crud_params(self, delete_stmt, compile_state, toplevel, **kw)

    if delete_stmt._hints:
        dialect_hints, table_text = self._setup_crud_hints(
            delete_stmt, table_text
        )
    else:
        dialect_hints = None

    if delete_stmt._independent_ctes:
        self._dispatch_independent_ctes(delete_stmt, kw)

    text += table_text

    # RETURNING is rendered either here, right after the table, or after
    # the WHERE clause below, depending on returning_precedes_values
    if (
        self.implicit_returning or delete_stmt._returning
    ) and self.returning_precedes_values:
        text += " " + self.returning_clause(
            delete_stmt,
            self.implicit_returning or delete_stmt._returning,
            populate_result_map=toplevel,
        )

    if extra_froms:
        extra_from_text = self.delete_extra_from_clause(
            delete_stmt,
            delete_stmt.table,
            extra_froms,
            dialect_hints,
            from_linter=from_linter,
            **kw,
        )
        if extra_from_text:
            text += " " + extra_from_text

    if delete_stmt._where_criteria:
        t = self._generate_delimited_and_list(
            delete_stmt._where_criteria, from_linter=from_linter, **kw
        )
        if t:
            text += " WHERE " + t

    if (
        self.implicit_returning or delete_stmt._returning
    ) and not self.returning_precedes_values:
        text += " " + self.returning_clause(
            delete_stmt,
            self.implicit_returning or delete_stmt._returning,
            populate_result_map=toplevel,
        )

    if self.ctes:
        # a nesting level is passed only for non-top-level statements
        nesting_level = len(self.stack) if not toplevel else None
        text = self._render_cte_clause(nesting_level=nesting_level) + text

    if warn_linting:
        assert from_linter is not None
        from_linter.warn(stmt_type="DELETE")

    # remove the stack frame pushed above
    self.stack.pop(-1)

    return text
6416
def visit_savepoint(self, savepoint_stmt, **kw):
    """Render ``SAVEPOINT <name>`` using the preparer-formatted name."""
    savepoint_name = self.preparer.format_savepoint(savepoint_stmt)
    return "SAVEPOINT %s" % savepoint_name
6419
def visit_rollback_to_savepoint(self, savepoint_stmt, **kw):
    """Render ``ROLLBACK TO SAVEPOINT <name>`` using the
    preparer-formatted name."""
    savepoint_name = self.preparer.format_savepoint(savepoint_stmt)
    return "ROLLBACK TO SAVEPOINT %s" % savepoint_name
6424
def visit_release_savepoint(self, savepoint_stmt, **kw):
    """Render ``RELEASE SAVEPOINT <name>`` using the preparer-formatted
    name."""
    savepoint_name = self.preparer.format_savepoint(savepoint_stmt)
    return "RELEASE SAVEPOINT %s" % savepoint_name
6429
6430
class StrSQLCompiler(SQLCompiler):
    """A :class:`.SQLCompiler` subclass that can render a small set of
    non-standard SQL features as strings.

    The :class:`.StrSQLCompiler` is the compiler used when a Core
    expression element is stringified directly, i.e. without going through
    :meth:`_expression.ClauseElement.compile`.  It covers only a limited
    number of non-standard constructs, enough for basic stringification;
    anything more substantial or dialect-specific requires calling
    :meth:`_expression.ClauseElement.compile` directly.

    .. seealso::

        :ref:`faq_sql_expression_string`

    """

    def _fallback_column_name(self, column):
        """Return the placeholder text used in place of a column name."""
        return "<name unknown>"

    @util.preload_module("sqlalchemy.engine.url")
    def visit_unsupported_compilation(self, element, err, **kw):
        """Try the element's own ``stringify_dialect`` before giving up.

        When the element names a non-default dialect, a statement compiler
        for that dialect is built and, unless it is itself a
        :class:`.StrSQLCompiler`, used to process the element.  In every
        other case the superclass handling applies.
        """
        dialect_name = element.stringify_dialect
        if dialect_name != "default":
            engine_url = util.preloaded.engine_url
            dialect_cls = engine_url.URL.create(dialect_name).get_dialect()
            dialect = dialect_cls()

            alt_compiler = dialect.statement_compiler(
                dialect, None, _supporting_against=self
            )
            if not isinstance(alt_compiler, StrSQLCompiler):
                return alt_compiler.process(element, **kw)

        return super().visit_unsupported_compilation(element, err)

    def visit_getitem_binary(self, binary, operator, **kw):
        """Render an index operation as ``left[right]``."""
        left_text = self.process(binary.left, **kw)
        right_text = self.process(binary.right, **kw)
        return "%s[%s]" % (left_text, right_text)

    def visit_json_getitem_op_binary(self, binary, operator, **kw):
        """Render a JSON index operation like a plain index operation."""
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
        """Render a JSON path index operation like a plain index
        operation."""
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_sequence(self, seq, **kw):
        """Render a descriptive placeholder for a sequence's next value."""
        sequence_name = self.preparer.format_sequence(seq)
        return "<next sequence value: %s>" % sequence_name

    def returning_clause(
        self,
        stmt: UpdateBase,
        returning_cols: Sequence[ColumnElement[Any]],
        *,
        populate_result_map: bool,
        **kw: Any,
    ) -> str:
        """Render a ``RETURNING`` clause listing the given columns.

        ``stmt``, ``populate_result_map`` and ``kw`` are accepted but not
        used by this implementation.
        """
        rendered = []
        for col in base._select_iterables(returning_cols):
            rendered.append(
                self._label_select_column(None, col, True, False, {})
            )
        return "RETURNING " + ", ".join(rendered)

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the extra tables of an UPDATE as ``FROM a, b, ...``."""
        kw["asfrom"] = True
        rendered = [
            extra._compiler_dispatch(self, fromhints=from_hints, **kw)
            for extra in extra_froms
        ]
        return "FROM " + ", ".join(rendered)

    def delete_extra_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the extra tables of a DELETE as ``, a, b, ...``."""
        kw["asfrom"] = True
        rendered = [
            extra._compiler_dispatch(self, fromhints=from_hints, **kw)
            for extra in extra_froms
        ]
        return ", " + ", ".join(rendered)

    def visit_empty_set_expr(self, type_, **kw):
        """Render a SELECT that yields no rows."""
        return "SELECT 1 WHERE 1!=1"

    def get_from_hint_text(self, table, text):
        """Render a FROM hint as the hint text in square brackets."""
        return "[%s]" % text

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        """Render a regexp match with a ``<regexp>`` placeholder
        operator."""
        placeholder = " <regexp> "
        return self._generate_generic_binary(binary, placeholder, **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        """Render a negated regexp match with a ``<not regexp>``
        placeholder operator."""
        placeholder = " <not regexp> "
        return self._generate_generic_binary(binary, placeholder, **kw)

    def visit_regexp_replace_op_binary(self, binary, operator, **kw):
        """Render a regexp replace as a ``<regexp replace>(...)``
        placeholder call."""
        left_text = binary.left._compiler_dispatch(self, **kw)
        right_text = binary.right._compiler_dispatch(self, **kw)
        return "<regexp replace>(%s, %s)" % (left_text, right_text)

    def visit_try_cast(self, cast, **kwargs):
        """Render ``TRY_CAST(<clause> AS <type>)``."""
        clause_text = cast.clause._compiler_dispatch(self, **kwargs)
        type_text = cast.typeclause._compiler_dispatch(self, **kwargs)
        return "TRY_CAST(%s AS %s)" % (clause_text, type_text)
6538
6539
class DDLCompiler(Compiled):
    """Compiler that renders DDL constructs (CREATE / DROP / ALTER /
    COMMENT statements, column and constraint specifications) as strings.

    Expressions embedded in DDL are rendered through :attr:`.sql_compiler`.
    """

    is_ddl = True

    if TYPE_CHECKING:

        def __init__(
            self,
            dialect: Dialect,
            statement: ExecutableDDLElement,
            schema_translate_map: Optional[SchemaTranslateMapType] = ...,
            render_schema_translate: bool = ...,
            compile_kwargs: Mapping[str, Any] = ...,
        ): ...

    @util.memoized_property
    def sql_compiler(self):
        """A statement compiler from the dialect, built with no statement
        and with this compiler's ``schema_translate_map``."""
        return self.dialect.statement_compiler(
            self.dialect, None, schema_translate_map=self.schema_translate_map
        )

    @util.memoized_property
    def type_compiler(self):
        """The dialect's type compiler instance."""
        return self.dialect.type_compiler_instance

    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        """Return ``None``; this compiler produces no parameters."""
        return None

    def visit_ddl(self, ddl, **kwargs):
        """Render a textual DDL construct by ``%``-interpolating its
        context into its statement.

        When the target is a :class:`.Table`, the ``table``, ``schema`` and
        ``fullname`` keys are filled into a copy of the context unless
        already present.
        """
        # table events can substitute table and schema name
        context = ddl.context
        if isinstance(ddl.target, schema.Table):
            # copy so that the construct's own context is not modified
            context = context.copy()

            preparer = self.preparer
            path = preparer.format_table_seq(ddl.target)
            if len(path) == 1:
                table, sch = path[0], ""
            else:
                table, sch = path[-1], path[0]

            context.setdefault("table", table)
            context.setdefault("schema", sch)
            context.setdefault("fullname", preparer.format_table(ddl.target))

        return self.sql_compiler.post_process_text(ddl.statement % context)

    def visit_create_schema(self, create, **kw):
        """Render ``CREATE SCHEMA [IF NOT EXISTS] <name>``."""
        text = "CREATE SCHEMA "
        if create.if_not_exists:
            text += "IF NOT EXISTS "
        return text + self.preparer.format_schema(create.element)

    def visit_drop_schema(self, drop, **kw):
        """Render ``DROP SCHEMA [IF EXISTS] <name> [CASCADE]``."""
        text = "DROP SCHEMA "
        if drop.if_exists:
            text += "IF EXISTS "
        text += self.preparer.format_schema(drop.element)
        if drop.cascade:
            text += " CASCADE"
        return text

    def visit_create_table(self, create, **kw):
        """Render a complete ``CREATE TABLE`` statement.

        Columns are rendered one per line, followed by the table-level
        constraints and the result of :meth:`.post_create_table`.

        :raises CompileError: re-raised from a column's compilation with
         the table and column names prepended to the message.
        """
        table = create.element
        preparer = self.preparer

        text = "\nCREATE "
        if table._prefixes:
            text += " ".join(table._prefixes) + " "

        text += "TABLE "
        if create.if_not_exists:
            text += "IF NOT EXISTS "

        text += preparer.format_table(table) + " "

        create_table_suffix = self.create_table_suffix(table)
        if create_table_suffix:
            text += create_table_suffix + " "

        text += "("

        # the first rendered item gets a bare newline; later ones are
        # comma-separated
        separator = "\n"

        # if only one primary key, specify it along with the column
        first_pk = False
        for create_column in create.columns:
            column = create_column.element
            try:
                processed = self.process(
                    create_column, first_pk=column.primary_key and not first_pk
                )
                # None means the column renders nothing
                if processed is not None:
                    text += separator
                    separator = ", \n"
                    text += "\t" + processed
                if column.primary_key:
                    first_pk = True
            except exc.CompileError as ce:
                raise exc.CompileError(
                    "(in table '%s', column '%s'): %s"
                    % (table.description, column.name, ce.args[0])
                ) from ce

        const = self.create_table_constraints(
            table,
            _include_foreign_key_constraints=create.include_foreign_key_constraints,  # noqa
        )
        if const:
            text += separator + "\t" + const

        text += "\n)%s\n\n" % self.post_create_table(table)
        return text

    def visit_create_column(self, create, first_pk=False, **kw):
        """Render one column of a CREATE TABLE, including its column-level
        constraints.

        :return: the column text, or ``None`` for a ``system`` column.
        """
        column = create.element

        if column.system:
            return None

        text = self.get_column_specification(column, first_pk=first_pk)
        const = " ".join(
            self.process(constraint) for constraint in column.constraints
        )
        if const:
            text += " " + const

        return text

    def create_table_constraints(
        self, table, _include_foreign_key_constraints=None, **kw
    ):
        """Render the table-level constraints of a CREATE TABLE.

        :param _include_foreign_key_constraints: when not ``None``, foreign
         key constraints of the table that are not in this collection are
         left out.
        :return: the rendered constraints joined by ``", \\n\\t"``.
        """
        # On some DB order is significant: visit PK first, then the
        # other constraints (engine.ReflectionTest.testbasic failed on FB2)
        constraints = []
        if table.primary_key:
            constraints.append(table.primary_key)

        all_fkcs = table.foreign_key_constraints
        if _include_foreign_key_constraints is not None:
            omit_fkcs = all_fkcs.difference(_include_foreign_key_constraints)
        else:
            omit_fkcs = set()

        constraints.extend(
            [
                c
                for c in table._sorted_constraints
                if c is not table.primary_key and c not in omit_fkcs
            ]
        )

        # a constraint flagged "use_alter" is skipped here when the
        # dialect supports ALTER; constraints rendering as None are dropped
        return ", \n\t".join(
            p
            for p in (
                self.process(constraint)
                for constraint in constraints
                if (constraint._should_create_for_compiler(self))
                and (
                    not self.dialect.supports_alter
                    or not getattr(constraint, "use_alter", False)
                )
            )
            if p is not None
        )

    def visit_drop_table(self, drop, **kw):
        """Render ``DROP TABLE [IF EXISTS] <table>``."""
        text = "\nDROP TABLE "
        if drop.if_exists:
            text += "IF EXISTS "
        return text + self.preparer.format_table(drop.element)

    def visit_drop_view(self, drop, **kw):
        """Render ``DROP VIEW <view>``."""
        return "\nDROP VIEW " + self.preparer.format_table(drop.element)

    def _verify_index_table(self, index):
        """Raise :class:`.CompileError` if the index has no table."""
        if index.table is None:
            raise exc.CompileError(
                "Index '%s' is not associated with any table." % index.name
            )

    def visit_create_index(
        self, create, include_schema=False, include_table_schema=True, **kw
    ):
        """Render ``CREATE [UNIQUE] INDEX [IF NOT EXISTS] <name> ON
        <table> (<expressions>)``.

        :param include_schema: whether the index name is schema-qualified.
        :param include_table_schema: whether the table name is
         schema-qualified.
        :raises CompileError: if the index has no table or no name.
        """
        index = create.element
        self._verify_index_table(index)
        preparer = self.preparer
        text = "CREATE "
        if index.unique:
            text += "UNIQUE "
        if index.name is None:
            raise exc.CompileError(
                "CREATE INDEX requires that the index have a name"
            )

        text += "INDEX "
        if create.if_not_exists:
            text += "IF NOT EXISTS "

        text += "%s ON %s (%s)" % (
            self._prepared_index_name(index, include_schema=include_schema),
            preparer.format_table(
                index.table, use_schema=include_table_schema
            ),
            ", ".join(
                self.sql_compiler.process(
                    expr, include_table=False, literal_binds=True
                )
                for expr in index.expressions
            ),
        )
        return text

    def visit_drop_index(self, drop, **kw):
        """Render ``DROP INDEX [IF EXISTS] <schema-qualified name>``.

        :raises CompileError: if the index has no name.
        """
        index = drop.element

        if index.name is None:
            raise exc.CompileError(
                "DROP INDEX requires that the index have a name"
            )
        text = "\nDROP INDEX "
        if drop.if_exists:
            text += "IF EXISTS "

        return text + self._prepared_index_name(index, include_schema=True)

    def _prepared_index_name(self, index, include_schema=False):
        """Return the formatted index name, prefixed with the quoted schema
        of the index's table when ``include_schema`` is set and such a
        schema exists."""
        if index.table is not None:
            effective_schema = self.preparer.schema_for_object(index.table)
        else:
            effective_schema = None
        if include_schema and effective_schema:
            schema_name = self.preparer.quote_schema(effective_schema)
        else:
            schema_name = None

        index_name = self.preparer.format_index(index)

        if schema_name:
            index_name = schema_name + "." + index_name
        return index_name

    def visit_add_constraint(self, create, **kw):
        """Render ``ALTER TABLE <table> ADD <constraint>``."""
        return "ALTER TABLE %s ADD %s" % (
            self.preparer.format_table(create.element.table),
            self.process(create.element),
        )

    def visit_set_table_comment(self, create, **kw):
        """Render ``COMMENT ON TABLE <table> IS <literal>``."""
        return "COMMENT ON TABLE %s IS %s" % (
            self.preparer.format_table(create.element),
            self.sql_compiler.render_literal_value(
                create.element.comment, sqltypes.String()
            ),
        )

    def visit_drop_table_comment(self, drop, **kw):
        """Render ``COMMENT ON TABLE <table> IS NULL``."""
        return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table(
            drop.element
        )

    def visit_set_column_comment(self, create, **kw):
        """Render ``COMMENT ON COLUMN <schema.table.column> IS
        <literal>``."""
        return "COMMENT ON COLUMN %s IS %s" % (
            self.preparer.format_column(
                create.element, use_table=True, use_schema=True
            ),
            self.sql_compiler.render_literal_value(
                create.element.comment, sqltypes.String()
            ),
        )

    def visit_drop_column_comment(self, drop, **kw):
        """Render ``COMMENT ON COLUMN <schema.table.column> IS NULL``."""
        # the column is schema-qualified exactly as in
        # visit_set_column_comment, so that setting and dropping a comment
        # address the same column
        return "COMMENT ON COLUMN %s IS NULL" % self.preparer.format_column(
            drop.element, use_table=True, use_schema=True
        )

    def visit_set_constraint_comment(self, create, **kw):
        """Not supported by this compiler; always raises
        :class:`.UnsupportedCompilationError`."""
        raise exc.UnsupportedCompilationError(self, type(create))

    def visit_drop_constraint_comment(self, drop, **kw):
        """Not supported by this compiler; always raises
        :class:`.UnsupportedCompilationError`."""
        raise exc.UnsupportedCompilationError(self, type(drop))

    def get_identity_options(self, identity_options):
        """Render the options shared by sequences and identity columns.

        Value options (``increment``, ``start``, ``minvalue``,
        ``maxvalue``, ``cache``) are rendered when not ``None``.
        ``NO MINVALUE`` / ``NO MAXVALUE`` are rendered only when the
        corresponding flag is true, so an explicit ``False`` renders
        nothing.  ``cycle`` renders ``CYCLE`` or ``NO CYCLE`` when not
        ``None``.

        :return: the options joined by single spaces; empty when no option
         is set.
        """
        text = []
        if identity_options.increment is not None:
            text.append("INCREMENT BY %d" % identity_options.increment)
        if identity_options.start is not None:
            text.append("START WITH %d" % identity_options.start)
        if identity_options.minvalue is not None:
            text.append("MINVALUE %d" % identity_options.minvalue)
        if identity_options.maxvalue is not None:
            text.append("MAXVALUE %d" % identity_options.maxvalue)
        # these are flags: test truth rather than "is not None", otherwise
        # nominvalue=False would still emit NO MINVALUE
        if identity_options.nominvalue:
            text.append("NO MINVALUE")
        if identity_options.nomaxvalue:
            text.append("NO MAXVALUE")
        if identity_options.cache is not None:
            text.append("CACHE %d" % identity_options.cache)
        if identity_options.cycle is not None:
            text.append("CYCLE" if identity_options.cycle else "NO CYCLE")
        return " ".join(text)

    def visit_create_sequence(self, create, prefix=None, **kw):
        """Render ``CREATE SEQUENCE [IF NOT EXISTS] <name>`` followed by
        the optional ``prefix`` text and the identity options."""
        text = "CREATE SEQUENCE "
        if create.if_not_exists:
            text += "IF NOT EXISTS "
        text += self.preparer.format_sequence(create.element)

        if prefix:
            text += prefix
        options = self.get_identity_options(create.element)
        if options:
            text += " " + options
        return text

    def visit_drop_sequence(self, drop, **kw):
        """Render ``DROP SEQUENCE [IF EXISTS] <name>``."""
        text = "DROP SEQUENCE "
        if drop.if_exists:
            text += "IF EXISTS "
        return text + self.preparer.format_sequence(drop.element)

    def visit_drop_constraint(self, drop, **kw):
        """Render ``ALTER TABLE <table> DROP CONSTRAINT [IF EXISTS] <name>
        [CASCADE]``.

        :raises CompileError: if no name can be rendered for the
         constraint.
        """
        constraint = drop.element
        if constraint.name is not None:
            formatted_name = self.preparer.format_constraint(constraint)
        else:
            formatted_name = None

        if formatted_name is None:
            raise exc.CompileError(
                "Can't emit DROP CONSTRAINT for constraint %r; "
                "it has no name" % drop.element
            )
        return "ALTER TABLE %s DROP CONSTRAINT %s%s%s" % (
            self.preparer.format_table(drop.element.table),
            "IF EXISTS " if drop.if_exists else "",
            formatted_name,
            " CASCADE" if drop.cascade else "",
        )

    def get_column_specification(self, column, **kwargs):
        """Render a column's name, type, and the DEFAULT, computed,
        identity and NOT NULL parts that apply to it."""
        colspec = (
            self.preparer.format_column(column)
            + " "
            + self.dialect.type_compiler_instance.process(
                column.type, type_expression=column
            )
        )
        default = self.get_column_default_string(column)
        if default is not None:
            colspec += " DEFAULT " + default

        if column.computed is not None:
            colspec += " " + self.process(column.computed)

        if (
            column.identity is not None
            and self.dialect.supports_identity_columns
        ):
            colspec += " " + self.process(column.identity)

        # NOT NULL is left off an identity column when the dialect
        # renders identity columns
        if not column.nullable and (
            not column.identity or not self.dialect.supports_identity_columns
        ):
            colspec += " NOT NULL"
        return colspec

    def create_table_suffix(self, table):
        """Hook: text rendered between the table name and the opening
        parenthesis of CREATE TABLE.  Empty here."""
        return ""

    def post_create_table(self, table):
        """Hook: text rendered after the closing parenthesis of CREATE
        TABLE.  Empty here."""
        return ""

    def get_column_default_string(self, column):
        """Return the rendered server default of the column, or ``None``
        when its ``server_default`` is not a :class:`.DefaultClause`."""
        if isinstance(column.server_default, schema.DefaultClause):
            return self.render_default_string(column.server_default.arg)
        else:
            return None

    def render_default_string(self, default):
        """Render a default value: a plain string as a string literal,
        anything else as a SQL expression with literal binds."""
        if isinstance(default, str):
            return self.sql_compiler.render_literal_value(
                default, sqltypes.STRINGTYPE
            )
        else:
            return self.sql_compiler.process(default, literal_binds=True)

    def visit_table_or_column_check_constraint(self, constraint, **kw):
        """Dispatch to the column-level or table-level CHECK visitor
        according to ``constraint.is_column_level``."""
        if constraint.is_column_level:
            return self.visit_column_check_constraint(constraint)
        else:
            return self.visit_check_constraint(constraint)

    def visit_check_constraint(self, constraint, **kw):
        """Render ``[CONSTRAINT <name>] CHECK (<expr>)`` plus
        deferrability."""
        text = ""
        if constraint.name is not None:
            formatted_name = self.preparer.format_constraint(constraint)
            if formatted_name is not None:
                text += "CONSTRAINT %s " % formatted_name
        text += "CHECK (%s)" % self.sql_compiler.process(
            constraint.sqltext, include_table=False, literal_binds=True
        )
        text += self.define_constraint_deferrability(constraint)
        return text

    def visit_column_check_constraint(self, constraint, **kw):
        """Render a column-level ``[CONSTRAINT <name>] CHECK (<expr>)``
        plus deferrability."""
        text = ""
        if constraint.name is not None:
            formatted_name = self.preparer.format_constraint(constraint)
            if formatted_name is not None:
                text += "CONSTRAINT %s " % formatted_name
        text += "CHECK (%s)" % self.sql_compiler.process(
            constraint.sqltext, include_table=False, literal_binds=True
        )
        text += self.define_constraint_deferrability(constraint)
        return text

    def visit_primary_key_constraint(self, constraint, **kw):
        """Render ``[CONSTRAINT <name>] PRIMARY KEY (<cols>)`` plus
        deferrability; an empty constraint renders as an empty string."""
        if len(constraint) == 0:
            return ""
        text = ""
        if constraint.name is not None:
            formatted_name = self.preparer.format_constraint(constraint)
            if formatted_name is not None:
                text += "CONSTRAINT %s " % formatted_name
        text += "PRIMARY KEY "
        text += "(%s)" % ", ".join(
            self.preparer.quote(c.name)
            for c in (
                constraint.columns_autoinc_first
                if constraint._implicit_generated
                else constraint.columns
            )
        )
        text += self.define_constraint_deferrability(constraint)
        return text

    def visit_foreign_key_constraint(self, constraint, **kw):
        """Render ``[CONSTRAINT <name>] FOREIGN KEY(<cols>) REFERENCES
        <table> (<cols>)`` plus MATCH, cascade and deferrability
        clauses."""
        preparer = self.preparer
        text = ""
        if constraint.name is not None:
            formatted_name = self.preparer.format_constraint(constraint)
            if formatted_name is not None:
                text += "CONSTRAINT %s " % formatted_name
        # the referenced table is taken from the first element
        remote_table = list(constraint.elements)[0].column.table
        text += "FOREIGN KEY(%s) REFERENCES %s (%s)" % (
            ", ".join(
                preparer.quote(f.parent.name) for f in constraint.elements
            ),
            self.define_constraint_remote_table(
                constraint, remote_table, preparer
            ),
            ", ".join(
                preparer.quote(f.column.name) for f in constraint.elements
            ),
        )
        text += self.define_constraint_match(constraint)
        text += self.define_constraint_cascades(constraint)
        text += self.define_constraint_deferrability(constraint)
        return text

    def define_constraint_remote_table(self, constraint, table, preparer):
        """Format the remote table clause of a CREATE CONSTRAINT clause."""

        return preparer.format_table(table)

    def visit_unique_constraint(self, constraint, **kw):
        """Render ``[CONSTRAINT <name>] UNIQUE (<cols>)`` plus
        deferrability; an empty constraint renders as an empty string."""
        if len(constraint) == 0:
            return ""
        text = ""
        if constraint.name is not None:
            formatted_name = self.preparer.format_constraint(constraint)
            if formatted_name is not None:
                text += "CONSTRAINT %s " % formatted_name
        text += "UNIQUE %s(%s)" % (
            self.define_unique_constraint_distinct(constraint, **kw),
            ", ".join(self.preparer.quote(c.name) for c in constraint),
        )
        text += self.define_constraint_deferrability(constraint)
        return text

    def define_unique_constraint_distinct(self, constraint, **kw):
        """Hook: text rendered between ``UNIQUE`` and the column list.
        Empty here."""
        return ""

    def define_constraint_cascades(self, constraint):
        """Render the ``ON DELETE`` / ``ON UPDATE`` clauses that are set,
        each phrase validated by the preparer."""
        text = ""
        if constraint.ondelete is not None:
            text += " ON DELETE %s" % self.preparer.validate_sql_phrase(
                constraint.ondelete, FK_ON_DELETE
            )
        if constraint.onupdate is not None:
            text += " ON UPDATE %s" % self.preparer.validate_sql_phrase(
                constraint.onupdate, FK_ON_UPDATE
            )
        return text

    def define_constraint_deferrability(self, constraint):
        """Render ``[NOT] DEFERRABLE`` and ``INITIALLY <phrase>`` for the
        settings that are not ``None``."""
        text = ""
        if constraint.deferrable is not None:
            if constraint.deferrable:
                text += " DEFERRABLE"
            else:
                text += " NOT DEFERRABLE"
        if constraint.initially is not None:
            text += " INITIALLY %s" % self.preparer.validate_sql_phrase(
                constraint.initially, FK_INITIALLY
            )
        return text

    def define_constraint_match(self, constraint):
        """Render ``MATCH <value>`` when the constraint has a match
        setting.  The value is interpolated as given."""
        text = ""
        if constraint.match is not None:
            text += " MATCH %s" % constraint.match
        return text

    def visit_computed_column(self, generated, **kw):
        """Render ``GENERATED ALWAYS AS (<expr>)``, followed by ``STORED``
        or ``VIRTUAL`` when ``persisted`` is exactly ``True`` or
        ``False``."""
        text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
            generated.sqltext, include_table=False, literal_binds=True
        )
        if generated.persisted is True:
            text += " STORED"
        elif generated.persisted is False:
            text += " VIRTUAL"
        return text

    def visit_identity_column(self, identity, **kw):
        """Render ``GENERATED {ALWAYS | BY DEFAULT} AS IDENTITY`` with the
        identity options, if any, in parentheses."""
        text = "GENERATED %s AS IDENTITY" % (
            "ALWAYS" if identity.always else "BY DEFAULT",
        )
        options = self.get_identity_options(identity)
        if options:
            text += " (%s)" % options
        return text
7077
7078
class GenericTypeCompiler(TypeCompiler):
    """Type compiler that renders type specification strings.

    The upper-case ``visit_XYZ`` methods render a specific SQL type name;
    the lower-case ``visit_xyz`` methods delegate to one of them.
    """

    # -- numeric types -------------------------------------------------

    def visit_FLOAT(self, type_, **kw):
        return "FLOAT"

    def visit_DOUBLE(self, type_, **kw):
        return "DOUBLE"

    def visit_DOUBLE_PRECISION(self, type_, **kw):
        return "DOUBLE PRECISION"

    def visit_REAL(self, type_, **kw):
        return "REAL"

    def _render_precision_scale_type(self, type_, name):
        """Render ``name``, ``name(precision)`` or
        ``name(precision, scale)`` depending on which of ``precision`` and
        ``scale`` are set.  ``scale`` is ignored without a precision."""
        if type_.precision is None:
            return name
        elif type_.scale is None:
            return "%s(%s)" % (name, type_.precision)
        else:
            return "%s(%s, %s)" % (name, type_.precision, type_.scale)

    def visit_NUMERIC(self, type_, **kw):
        return self._render_precision_scale_type(type_, "NUMERIC")

    def visit_DECIMAL(self, type_, **kw):
        return self._render_precision_scale_type(type_, "DECIMAL")

    def visit_INTEGER(self, type_, **kw):
        return "INTEGER"

    def visit_SMALLINT(self, type_, **kw):
        return "SMALLINT"

    def visit_BIGINT(self, type_, **kw):
        return "BIGINT"

    # -- date / time types ---------------------------------------------

    def visit_TIMESTAMP(self, type_, **kw):
        return "TIMESTAMP"

    def visit_DATETIME(self, type_, **kw):
        return "DATETIME"

    def visit_DATE(self, type_, **kw):
        return "DATE"

    def visit_TIME(self, type_, **kw):
        return "TIME"

    # -- character types -----------------------------------------------

    def visit_CLOB(self, type_, **kw):
        return "CLOB"

    def visit_NCLOB(self, type_, **kw):
        return "NCLOB"

    def _render_string_type(self, type_, name, length_override=None):
        """Render ``name`` with an optional ``(length)`` and an optional
        ``COLLATE "<collation>"``.

        A truthy ``length_override`` takes precedence over
        ``type_.length``.
        """
        text = name
        if length_override:
            text += "(%d)" % length_override
        elif type_.length:
            text += "(%d)" % type_.length
        if type_.collation:
            text += ' COLLATE "%s"' % type_.collation
        return text

    def visit_CHAR(self, type_, **kw):
        return self._render_string_type(type_, "CHAR")

    def visit_NCHAR(self, type_, **kw):
        return self._render_string_type(type_, "NCHAR")

    def visit_VARCHAR(self, type_, **kw):
        return self._render_string_type(type_, "VARCHAR")

    def visit_NVARCHAR(self, type_, **kw):
        return self._render_string_type(type_, "NVARCHAR")

    def visit_TEXT(self, type_, **kw):
        return self._render_string_type(type_, "TEXT")

    def visit_UUID(self, type_, **kw):
        return "UUID"

    # -- binary / boolean types ----------------------------------------

    def visit_BLOB(self, type_, **kw):
        return "BLOB"

    def _render_binary_type(self, type_, name):
        """Render ``name``, followed by ``(length)`` when ``type_.length``
        is truthy."""
        length_text = "(%d)" % type_.length if type_.length else ""
        return name + length_text

    def visit_BINARY(self, type_, **kw):
        return self._render_binary_type(type_, "BINARY")

    def visit_VARBINARY(self, type_, **kw):
        return self._render_binary_type(type_, "VARBINARY")

    def visit_BOOLEAN(self, type_, **kw):
        return "BOOLEAN"

    # -- generic (lower-case) types, delegating to the SQL types above --

    def visit_uuid(self, type_, **kw):
        """Render ``UUID`` when both the type and the dialect use native
        UUIDs, otherwise ``CHAR(32)``."""
        if not type_.native_uuid or not self.dialect.supports_native_uuid:
            return self._render_string_type(type_, "CHAR", length_override=32)
        else:
            return self.visit_UUID(type_, **kw)

    def visit_large_binary(self, type_, **kw):
        return self.visit_BLOB(type_, **kw)

    def visit_boolean(self, type_, **kw):
        return self.visit_BOOLEAN(type_, **kw)

    def visit_time(self, type_, **kw):
        return self.visit_TIME(type_, **kw)

    def visit_datetime(self, type_, **kw):
        return self.visit_DATETIME(type_, **kw)

    def visit_date(self, type_, **kw):
        return self.visit_DATE(type_, **kw)

    def visit_big_integer(self, type_, **kw):
        return self.visit_BIGINT(type_, **kw)

    def visit_small_integer(self, type_, **kw):
        return self.visit_SMALLINT(type_, **kw)

    def visit_integer(self, type_, **kw):
        return self.visit_INTEGER(type_, **kw)

    def visit_real(self, type_, **kw):
        return self.visit_REAL(type_, **kw)

    def visit_float(self, type_, **kw):
        return self.visit_FLOAT(type_, **kw)

    def visit_double(self, type_, **kw):
        return self.visit_DOUBLE(type_, **kw)

    def visit_numeric(self, type_, **kw):
        return self.visit_NUMERIC(type_, **kw)

    def visit_string(self, type_, **kw):
        return self.visit_VARCHAR(type_, **kw)

    def visit_unicode(self, type_, **kw):
        return self.visit_VARCHAR(type_, **kw)

    def visit_text(self, type_, **kw):
        return self.visit_TEXT(type_, **kw)

    def visit_unicode_text(self, type_, **kw):
        return self.visit_TEXT(type_, **kw)

    def visit_enum(self, type_, **kw):
        return self.visit_VARCHAR(type_, **kw)

    def visit_null(self, type_, **kw):
        """Always raise :class:`.CompileError`: no DDL can be generated
        for this type."""
        raise exc.CompileError(
            "Can't generate DDL for %r; "
            "did you forget to specify a "
            "type on this Column?" % type_
        )

    def visit_type_decorator(self, type_, **kw):
        """Render the type that the decorator resolves to for this
        dialect."""
        return self.process(type_.type_engine(self.dialect), **kw)

    def visit_user_defined(self, type_, **kw):
        """Render whatever the type's ``get_col_spec()`` returns."""
        return type_.get_col_spec(**kw)
7250
7251
class StrSQLTypeCompiler(GenericTypeCompiler):
    """Type compiler that falls back to a descriptive string for types it
    has no visitor for, instead of failing."""

    def process(self, type_, **kw):
        """Dispatch ``type_`` to this compiler; an object without a
        ``_compiler_dispatch`` attribute goes to :meth:`._visit_unknown`."""
        try:
            dispatcher = type_._compiler_dispatch
        except AttributeError:
            return self._visit_unknown(type_, **kw)
        return dispatcher(self, **kw)

    def __getattr__(self, key):
        """Resolve any missing ``visit_*`` attribute to
        :meth:`._visit_unknown`; other names raise ``AttributeError``."""
        if not key.startswith("visit_"):
            raise AttributeError(key)
        return self._visit_unknown

    def _visit_unknown(self, type_, **kw):
        """Return the class name when it equals its own upper-cased form,
        otherwise ``repr(type_)``."""
        class_name = type_.__class__.__name__
        if class_name == class_name.upper():
            return class_name
        return repr(type_)

    def visit_null(self, type_, **kw):
        """Render the null type as ``NULL`` rather than raising."""
        return "NULL"

    def visit_user_defined(self, type_, **kw):
        """Render ``get_col_spec()`` when the type has one, else its
        ``repr()``."""
        try:
            col_spec_fn = type_.get_col_spec
        except AttributeError:
            return repr(type_)
        return col_spec_fn(**kw)
7283
7284
class _SchemaForObjectCallable(Protocol):
    """Callable protocol: one positional-only object in, a string out.

    Used as the annotation of ``IdentifierPreparer.schema_for_object``.
    """

    def __call__(self, obj: Any, /) -> str:
        """Return the string for ``obj``."""
7287
7288
class _BindNameForColProtocol(Protocol):
    """Callable protocol: a ``ColumnClause`` in, a string out."""

    def __call__(self, col: ColumnClause[Any]) -> str:
        """Return the string for ``col``."""
7291
7292
7293class IdentifierPreparer:
7294 """Handle quoting and case-folding of identifiers based on options."""
7295
7296 reserved_words = RESERVED_WORDS
7297
7298 legal_characters = LEGAL_CHARACTERS
7299
7300 illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS
7301
7302 initial_quote: str
7303
7304 final_quote: str
7305
7306 _strings: MutableMapping[str, str]
7307
7308 schema_for_object: _SchemaForObjectCallable = operator.attrgetter("schema")
7309 """Return the .schema attribute for an object.
7310
    For the default IdentifierPreparer, the schema for an object is always
    the value of the ".schema" attribute.  If the preparer is replaced
    with one that has a non-empty schema_translate_map, the value of the
    ".schema" attribute is rendered as a symbol that will be converted to a
    real schema name from the mapping post-compile.
7316
7317 """
7318
7319 _includes_none_schema_translate: bool = False
7320
def __init__(
    self,
    dialect,
    initial_quote='"',
    final_quote=None,
    escape_quote='"',
    quote_case_sensitive_collations=True,
    omit_schema=False,
):
    """Construct a new ``IdentifierPreparer`` object.

    dialect
      Dialect this preparer belongs to; its ``paramstyle`` decides
      whether ``_double_percents`` is set.

    initial_quote
      Character that begins a delimited identifier.

    final_quote
      Character that ends a delimited identifier. Defaults to
      `initial_quote`.

    escape_quote
      Stored as ``escape_quote``; the same value repeated twice is stored
      as ``escape_to_quote``.

    quote_case_sensitive_collations
      Stored as given.

    omit_schema
      Prevent prepending schema name. Useful for databases that do
      not support schemas.
    """

    self.dialect = dialect

    # quoting configuration
    self.initial_quote = initial_quote
    self.final_quote = final_quote or initial_quote
    self.escape_quote = escape_quote
    self.escape_to_quote = escape_quote * 2

    self.omit_schema = omit_schema
    self.quote_case_sensitive_collations = quote_case_sensitive_collations
    self._strings = {}

    percent_paramstyles = ("format", "pyformat")
    self._double_percents = dialect.paramstyle in percent_paramstyles
7356
7357 def _with_schema_translate(self, schema_translate_map):
7358 prep = self.__class__.__new__(self.__class__)
7359 prep.__dict__.update(self.__dict__)
7360
7361 includes_none = None in schema_translate_map
7362
7363 def symbol_getter(obj):
7364 name = obj.schema
7365 if obj._use_schema_map and (name is not None or includes_none):
7366 if name is not None and ("[" in name or "]" in name):
7367 raise exc.CompileError(
7368 "Square bracket characters ([]) not supported "
7369 "in schema translate name '%s'" % name
7370 )
7371 return quoted_name(
7372 "__[SCHEMA_%s]" % (name or "_none"), quote=False
7373 )
7374 else:
7375 return obj.schema
7376
7377 prep.schema_for_object = symbol_getter
7378 prep._includes_none_schema_translate = includes_none
7379 return prep
7380
7381 def _render_schema_translates(self, statement, schema_translate_map):
7382 d = schema_translate_map
7383 if None in d:
7384 if not self._includes_none_schema_translate:
7385 raise exc.InvalidRequestError(
7386 "schema translate map which previously did not have "
7387 "`None` present as a key now has `None` present; compiled "
7388 "statement may lack adequate placeholders. Please use "
7389 "consistent keys in successive "
7390 "schema_translate_map dictionaries."
7391 )
7392
7393 d["_none"] = d[None]
7394
7395 def replace(m):
7396 name = m.group(2)
7397 if name in d:
7398 effective_schema = d[name]
7399 else:
7400 if name in (None, "_none"):
7401 raise exc.InvalidRequestError(
7402 "schema translate map which previously had `None` "
7403 "present as a key now no longer has it present; don't "
7404 "know how to apply schema for compiled statement. "
7405 "Please use consistent keys in successive "
7406 "schema_translate_map dictionaries."
7407 )
7408 effective_schema = name
7409
7410 if not effective_schema:
7411 effective_schema = self.dialect.default_schema_name
7412 if not effective_schema:
7413 # TODO: no coverage here
7414 raise exc.CompileError(
7415 "Dialect has no default schema name; can't "
7416 "use None as dynamic schema target."
7417 )
7418 return self.quote_schema(effective_schema)
7419
7420 return re.sub(r"(__\[SCHEMA_([^\]]+)\])", replace, statement)
7421
7422 def _escape_identifier(self, value: str) -> str:
7423 """Escape an identifier.
7424
7425 Subclasses should override this to provide database-dependent
7426 escaping behavior.
7427 """
7428
7429 value = value.replace(self.escape_quote, self.escape_to_quote)
7430 if self._double_percents:
7431 value = value.replace("%", "%%")
7432 return value
7433
7434 def _unescape_identifier(self, value: str) -> str:
7435 """Canonicalize an escaped identifier.
7436
7437 Subclasses should override this to provide database-dependent
7438 unescaping behavior that reverses _escape_identifier.
7439 """
7440
7441 return value.replace(self.escape_to_quote, self.escape_quote)
7442
7443 def validate_sql_phrase(self, element, reg):
7444 """keyword sequence filter.
7445
7446 a filter for elements that are intended to represent keyword sequences,
7447 such as "INITIALLY", "INITIALLY DEFERRED", etc. no special characters
7448 should be present.
7449
7450 .. versionadded:: 1.3
7451
7452 """
7453
7454 if element is not None and not reg.match(element):
7455 raise exc.CompileError(
7456 "Unexpected SQL phrase: %r (matching against %r)"
7457 % (element, reg.pattern)
7458 )
7459 return element
7460
7461 def quote_identifier(self, value: str) -> str:
7462 """Quote an identifier.
7463
7464 Subclasses should override this to provide database-dependent
7465 quoting behavior.
7466 """
7467
7468 return (
7469 self.initial_quote
7470 + self._escape_identifier(value)
7471 + self.final_quote
7472 )
7473
7474 def _requires_quotes(self, value: str) -> bool:
7475 """Return True if the given identifier requires quoting."""
7476 lc_value = value.lower()
7477 return (
7478 lc_value in self.reserved_words
7479 or value[0] in self.illegal_initial_characters
7480 or not self.legal_characters.match(str(value))
7481 or (lc_value != value)
7482 )
7483
7484 def _requires_quotes_illegal_chars(self, value):
7485 """Return True if the given identifier requires quoting, but
7486 not taking case convention into account."""
7487 return not self.legal_characters.match(str(value))
7488
7489 def quote_schema(self, schema: str, force: Any = None) -> str:
7490 """Conditionally quote a schema name.
7491
7492
7493 The name is quoted if it is a reserved word, contains quote-necessary
7494 characters, or is an instance of :class:`.quoted_name` which includes
7495 ``quote`` set to ``True``.
7496
7497 Subclasses can override this to provide database-dependent
7498 quoting behavior for schema names.
7499
7500 :param schema: string schema name
7501 :param force: unused
7502
7503 .. deprecated:: 0.9
7504
7505 The :paramref:`.IdentifierPreparer.quote_schema.force`
7506 parameter is deprecated and will be removed in a future
7507 release. This flag has no effect on the behavior of the
7508 :meth:`.IdentifierPreparer.quote` method; please refer to
7509 :class:`.quoted_name`.
7510
7511 """
7512 if force is not None:
7513 # not using the util.deprecated_params() decorator in this
7514 # case because of the additional function call overhead on this
7515 # very performance-critical spot.
7516 util.warn_deprecated(
7517 "The IdentifierPreparer.quote_schema.force parameter is "
7518 "deprecated and will be removed in a future release. This "
7519 "flag has no effect on the behavior of the "
7520 "IdentifierPreparer.quote method; please refer to "
7521 "quoted_name().",
7522 # deprecated 0.9. warning from 1.3
7523 version="0.9",
7524 )
7525
7526 return self.quote(schema)
7527
7528 def quote(self, ident: str, force: Any = None) -> str:
7529 """Conditionally quote an identifier.
7530
7531 The identifier is quoted if it is a reserved word, contains
7532 quote-necessary characters, or is an instance of
7533 :class:`.quoted_name` which includes ``quote`` set to ``True``.
7534
7535 Subclasses can override this to provide database-dependent
7536 quoting behavior for identifier names.
7537
7538 :param ident: string identifier
7539 :param force: unused
7540
7541 .. deprecated:: 0.9
7542
7543 The :paramref:`.IdentifierPreparer.quote.force`
7544 parameter is deprecated and will be removed in a future
7545 release. This flag has no effect on the behavior of the
7546 :meth:`.IdentifierPreparer.quote` method; please refer to
7547 :class:`.quoted_name`.
7548
7549 """
7550 if force is not None:
7551 # not using the util.deprecated_params() decorator in this
7552 # case because of the additional function call overhead on this
7553 # very performance-critical spot.
7554 util.warn_deprecated(
7555 "The IdentifierPreparer.quote.force parameter is "
7556 "deprecated and will be removed in a future release. This "
7557 "flag has no effect on the behavior of the "
7558 "IdentifierPreparer.quote method; please refer to "
7559 "quoted_name().",
7560 # deprecated 0.9. warning from 1.3
7561 version="0.9",
7562 )
7563
7564 force = getattr(ident, "quote", None)
7565
7566 if force is None:
7567 if ident in self._strings:
7568 return self._strings[ident]
7569 else:
7570 if self._requires_quotes(ident):
7571 self._strings[ident] = self.quote_identifier(ident)
7572 else:
7573 self._strings[ident] = ident
7574 return self._strings[ident]
7575 elif force:
7576 return self.quote_identifier(ident)
7577 else:
7578 return ident
7579
7580 def format_collation(self, collation_name):
7581 if self.quote_case_sensitive_collations:
7582 return self.quote(collation_name)
7583 else:
7584 return collation_name
7585
7586 def format_sequence(self, sequence, use_schema=True):
7587 name = self.quote(sequence.name)
7588
7589 effective_schema = self.schema_for_object(sequence)
7590
7591 if (
7592 not self.omit_schema
7593 and use_schema
7594 and effective_schema is not None
7595 ):
7596 name = self.quote_schema(effective_schema) + "." + name
7597 return name
7598
7599 def format_label(
7600 self, label: Label[Any], name: Optional[str] = None
7601 ) -> str:
7602 return self.quote(name or label.name)
7603
7604 def format_alias(
7605 self, alias: Optional[AliasedReturnsRows], name: Optional[str] = None
7606 ) -> str:
7607 if name is None:
7608 assert alias is not None
7609 return self.quote(alias.name)
7610 else:
7611 return self.quote(name)
7612
7613 def format_savepoint(self, savepoint, name=None):
7614 # Running the savepoint name through quoting is unnecessary
7615 # for all known dialects. This is here to support potential
7616 # third party use cases
7617 ident = name or savepoint.ident
7618 if self._requires_quotes(ident):
7619 ident = self.quote_identifier(ident)
7620 return ident
7621
7622 @util.preload_module("sqlalchemy.sql.naming")
7623 def format_constraint(self, constraint, _alembic_quote=True):
7624 naming = util.preloaded.sql_naming
7625
7626 if constraint.name is _NONE_NAME:
7627 name = naming._constraint_name_for_table(
7628 constraint, constraint.table
7629 )
7630
7631 if name is None:
7632 return None
7633 else:
7634 name = constraint.name
7635
7636 if constraint.__visit_name__ == "index":
7637 return self.truncate_and_render_index_name(
7638 name, _alembic_quote=_alembic_quote
7639 )
7640 else:
7641 return self.truncate_and_render_constraint_name(
7642 name, _alembic_quote=_alembic_quote
7643 )
7644
7645 def truncate_and_render_index_name(self, name, _alembic_quote=True):
7646 # calculate these at format time so that ad-hoc changes
7647 # to dialect.max_identifier_length etc. can be reflected
7648 # as IdentifierPreparer is long lived
7649 max_ = (
7650 self.dialect.max_index_name_length
7651 or self.dialect.max_identifier_length
7652 )
7653 return self._truncate_and_render_maxlen_name(
7654 name, max_, _alembic_quote
7655 )
7656
7657 def truncate_and_render_constraint_name(self, name, _alembic_quote=True):
7658 # calculate these at format time so that ad-hoc changes
7659 # to dialect.max_identifier_length etc. can be reflected
7660 # as IdentifierPreparer is long lived
7661 max_ = (
7662 self.dialect.max_constraint_name_length
7663 or self.dialect.max_identifier_length
7664 )
7665 return self._truncate_and_render_maxlen_name(
7666 name, max_, _alembic_quote
7667 )
7668
7669 def _truncate_and_render_maxlen_name(self, name, max_, _alembic_quote):
7670 if isinstance(name, elements._truncated_label):
7671 if len(name) > max_:
7672 name = name[0 : max_ - 8] + "_" + util.md5_hex(name)[-4:]
7673 else:
7674 self.dialect.validate_identifier(name)
7675
7676 if not _alembic_quote:
7677 return name
7678 else:
7679 return self.quote(name)
7680
7681 def format_index(self, index):
7682 return self.format_constraint(index)
7683
7684 def format_table(self, table, use_schema=True, name=None):
7685 """Prepare a quoted table and schema name."""
7686
7687 if name is None:
7688 name = table.name
7689
7690 result = self.quote(name)
7691
7692 effective_schema = self.schema_for_object(table)
7693
7694 if not self.omit_schema and use_schema and effective_schema:
7695 result = self.quote_schema(effective_schema) + "." + result
7696 return result
7697
7698 def format_schema(self, name):
7699 """Prepare a quoted schema name."""
7700
7701 return self.quote(name)
7702
7703 def format_label_name(
7704 self,
7705 name,
7706 anon_map=None,
7707 ):
7708 """Prepare a quoted column name."""
7709
7710 if anon_map is not None and isinstance(
7711 name, elements._truncated_label
7712 ):
7713 name = name.apply_map(anon_map)
7714
7715 return self.quote(name)
7716
7717 def format_column(
7718 self,
7719 column,
7720 use_table=False,
7721 name=None,
7722 table_name=None,
7723 use_schema=False,
7724 anon_map=None,
7725 ):
7726 """Prepare a quoted column name."""
7727
7728 if name is None:
7729 name = column.name
7730
7731 if anon_map is not None and isinstance(
7732 name, elements._truncated_label
7733 ):
7734 name = name.apply_map(anon_map)
7735
7736 if not getattr(column, "is_literal", False):
7737 if use_table:
7738 return (
7739 self.format_table(
7740 column.table, use_schema=use_schema, name=table_name
7741 )
7742 + "."
7743 + self.quote(name)
7744 )
7745 else:
7746 return self.quote(name)
7747 else:
7748 # literal textual elements get stuck into ColumnClause a lot,
7749 # which shouldn't get quoted
7750
7751 if use_table:
7752 return (
7753 self.format_table(
7754 column.table, use_schema=use_schema, name=table_name
7755 )
7756 + "."
7757 + name
7758 )
7759 else:
7760 return name
7761
7762 def format_table_seq(self, table, use_schema=True):
7763 """Format table name and schema as a tuple."""
7764
7765 # Dialects with more levels in their fully qualified references
7766 # ('database', 'owner', etc.) could override this and return
7767 # a longer sequence.
7768
7769 effective_schema = self.schema_for_object(table)
7770
7771 if not self.omit_schema and use_schema and effective_schema:
7772 return (
7773 self.quote_schema(effective_schema),
7774 self.format_table(table, use_schema=False),
7775 )
7776 else:
7777 return (self.format_table(table, use_schema=False),)
7778
7779 @util.memoized_property
7780 def _r_identifiers(self):
7781 initial, final, escaped_final = (
7782 re.escape(s)
7783 for s in (
7784 self.initial_quote,
7785 self.final_quote,
7786 self._escape_identifier(self.final_quote),
7787 )
7788 )
7789 r = re.compile(
7790 r"(?:"
7791 r"(?:%(initial)s((?:%(escaped)s|[^%(final)s])+)%(final)s"
7792 r"|([^\.]+))(?=\.|$))+"
7793 % {"initial": initial, "final": final, "escaped": escaped_final}
7794 )
7795 return r
7796
7797 def unformat_identifiers(self, identifiers):
7798 """Unpack 'schema.table.column'-like strings into components."""
7799
7800 r = self._r_identifiers
7801 return [
7802 self._unescape_identifier(i)
7803 for i in [a or b for a, b in r.findall(identifiers)]
7804 ]