1# engine/default.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Default implementations of per-dialect sqlalchemy.engine classes.
10
11These are semi-private implementation classes which are only of importance
12to database dialect authors; dialects will usually use the classes here
13as the base class for their own corresponding classes.
14
15"""
16
17from __future__ import annotations
18
19import functools
20import operator
21import random
22import re
23from time import perf_counter
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Dict
29from typing import Final
30from typing import List
31from typing import Literal
32from typing import Mapping
33from typing import MutableMapping
34from typing import MutableSequence
35from typing import Optional
36from typing import Sequence
37from typing import Set
38from typing import Tuple
39from typing import Type
40from typing import TYPE_CHECKING
41from typing import Union
42import weakref
43
44from . import characteristics
45from . import cursor as _cursor
46from . import interfaces
47from . import reflection
48from .base import Connection
49from .interfaces import CacheStats
50from .interfaces import DBAPICursor
51from .interfaces import Dialect
52from .interfaces import ExecuteStyle
53from .interfaces import ExecutionContext
54from .reflection import ObjectKind
55from .reflection import ObjectScope
56from .. import event
57from .. import exc
58from .. import pool
59from .. import util
60from ..sql import compiler
61from ..sql import dml
62from ..sql import expression
63from ..sql import type_api
64from ..sql import util as sql_util
65from ..sql._typing import is_tuple_type
66from ..sql.base import _NoArg
67from ..sql.compiler import AggregateOrderByStyle
68from ..sql.compiler import DDLCompiler
69from ..sql.compiler import InsertmanyvaluesSentinelOpts
70from ..sql.compiler import SQLCompiler
71from ..sql.elements import quoted_name
72from ..util.typing import TupleAny
73from ..util.typing import Unpack
74
75if typing.TYPE_CHECKING:
76 from .base import Engine
77 from .cursor import ResultFetchStrategy
78 from .interfaces import _CoreMultiExecuteParams
79 from .interfaces import _CoreSingleExecuteParams
80 from .interfaces import _DBAPICursorDescription
81 from .interfaces import _DBAPIMultiExecuteParams
82 from .interfaces import _DBAPISingleExecuteParams
83 from .interfaces import _ExecuteOptions
84 from .interfaces import _MutableCoreSingleExecuteParams
85 from .interfaces import _ParamStyle
86 from .interfaces import ConnectArgsType
87 from .interfaces import DBAPIConnection
88 from .interfaces import DBAPIModule
89 from .interfaces import DBAPIType
90 from .interfaces import IsolationLevel
91 from .row import Row
92 from .url import URL
93 from ..event import _ListenerFnType
94 from ..pool import Pool
95 from ..pool import PoolProxiedConnection
96 from ..sql import Executable
97 from ..sql.compiler import Compiled
98 from ..sql.compiler import Linting
99 from ..sql.compiler import ResultColumnsEntry
100 from ..sql.dml import DMLState
101 from ..sql.dml import UpdateBase
102 from ..sql.elements import BindParameter
103 from ..sql.schema import Column
104 from ..sql.sqltypes import _JSON_VALUE
105 from ..sql.type_api import _BindProcessorType
106 from ..sql.type_api import _ResultProcessorType
107 from ..sql.type_api import TypeEngine
108
109
110# When we're handed literal SQL, ensure it's a SELECT query
111SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)
112
113
114(
115 CACHE_HIT,
116 CACHE_MISS,
117 CACHING_DISABLED,
118 NO_CACHE_KEY,
119 NO_DIALECT_SUPPORT,
120) = list(CacheStats)
121
122
123class _BackendsMultiReflection(Dialect):
124 """Mixin providing single-table reflection wrappers that delegate to
125 the corresponding ``get_multi_*`` methods.
126
127 Used by dialects that implement native multi-table reflection
128 (PostgreSQL, Oracle, MSSQL).
129 """
130
131 def _value_or_raise(self, data, table, schema):
132 try:
133 return dict(data)[(schema, table)]
134 except KeyError:
135 raise exc.NoSuchTableError(
136 f"{schema}.{table}" if schema else table
137 ) from None
138
139 @reflection.cache
140 def get_columns(self, connection, table_name, schema=None, **kw):
141 data = self.get_multi_columns(
142 connection,
143 schema=schema,
144 filter_names=[table_name],
145 scope=ObjectScope.ANY,
146 kind=ObjectKind.ANY,
147 **kw,
148 )
149 return self._value_or_raise(data, table_name, schema)
150
151 @reflection.cache
152 def get_table_options(self, connection, table_name, schema=None, **kw):
153 data = self.get_multi_table_options(
154 connection,
155 schema=schema,
156 filter_names=[table_name],
157 scope=ObjectScope.ANY,
158 kind=ObjectKind.ANY,
159 **kw,
160 )
161 return self._value_or_raise(data, table_name, schema)
162
163 @reflection.cache
164 def get_pk_constraint(self, connection, table_name, schema=None, **kw):
165 data = self.get_multi_pk_constraint(
166 connection,
167 schema=schema,
168 filter_names=[table_name],
169 scope=ObjectScope.ANY,
170 kind=ObjectKind.ANY,
171 **kw,
172 )
173 return self._value_or_raise(data, table_name, schema)
174
175 @reflection.cache
176 def get_foreign_keys(self, connection, table_name, schema=None, **kw):
177 data = self.get_multi_foreign_keys(
178 connection,
179 schema=schema,
180 filter_names=[table_name],
181 scope=ObjectScope.ANY,
182 kind=ObjectKind.ANY,
183 **kw,
184 )
185 return self._value_or_raise(data, table_name, schema)
186
187 @reflection.cache
188 def get_indexes(self, connection, table_name, schema=None, **kw):
189 data = self.get_multi_indexes(
190 connection,
191 schema=schema,
192 filter_names=[table_name],
193 scope=ObjectScope.ANY,
194 kind=ObjectKind.ANY,
195 **kw,
196 )
197 return self._value_or_raise(data, table_name, schema)
198
199 @reflection.cache
200 def get_unique_constraints(
201 self, connection, table_name, schema=None, **kw
202 ):
203 data = self.get_multi_unique_constraints(
204 connection,
205 schema=schema,
206 filter_names=[table_name],
207 scope=ObjectScope.ANY,
208 kind=ObjectKind.ANY,
209 **kw,
210 )
211 return self._value_or_raise(data, table_name, schema)
212
213 @reflection.cache
214 def get_check_constraints(self, connection, table_name, schema=None, **kw):
215 data = self.get_multi_check_constraints(
216 connection,
217 schema=schema,
218 filter_names=[table_name],
219 scope=ObjectScope.ANY,
220 kind=ObjectKind.ANY,
221 **kw,
222 )
223 return self._value_or_raise(data, table_name, schema)
224
225 @reflection.cache
226 def get_table_comment(self, connection, table_name, schema=None, **kw):
227 data = self.get_multi_table_comment(
228 connection,
229 schema=schema,
230 filter_names=[table_name],
231 scope=ObjectScope.ANY,
232 kind=ObjectKind.ANY,
233 **kw,
234 )
235 return self._value_or_raise(data, table_name, schema)
236
237
238class DefaultDialect(Dialect):
239 """Default implementation of Dialect"""
240
241 statement_compiler = compiler.SQLCompiler
242 ddl_compiler = compiler.DDLCompiler
243 type_compiler_cls = compiler.GenericTypeCompiler
244
245 preparer = compiler.IdentifierPreparer
246 supports_alter = True
247 supports_comments = False
248 supports_constraint_comments = False
249 inline_comments = False
250 supports_statement_cache = True
251
252 div_is_floordiv = True
253
254 bind_typing = interfaces.BindTyping.NONE
255
256 include_set_input_sizes: Optional[Set[Any]] = None
257 exclude_set_input_sizes: Optional[Set[Any]] = None
258
259 # the first value we'd get for an autoincrement column.
260 default_sequence_base = 1
261
262 # most DBAPIs happy with this for execute().
263 # not cx_oracle.
264 execute_sequence_format = tuple
265
266 supports_schemas = True
267 supports_views = True
268 supports_sequences = False
269 sequences_optional = False
270 preexecute_autoincrement_sequences = False
271 supports_identity_columns = False
272 postfetch_lastrowid = True
273 favor_returning_over_lastrowid = False
274 insert_null_pk_still_autoincrements = False
275 update_returning = False
276 delete_returning = False
277 update_returning_multifrom = False
278 delete_returning_multifrom = False
279 insert_returning = False
280
281 aggregate_order_by_style = AggregateOrderByStyle.INLINE
282
283 cte_follows_insert = False
284
285 supports_native_enum = False
286 supports_native_boolean = False
287 supports_native_uuid = False
288 returns_native_bytes = False
289
290 supports_native_json_serialization = False
291 supports_native_json_deserialization = False
292 dialect_injects_custom_json_deserializer = False
293 _json_serializer: Callable[[_JSON_VALUE], str] | None = None
294
295 _json_deserializer: Callable[[str], _JSON_VALUE] | None = None
296
297 non_native_boolean_check_constraint = True
298
299 supports_simple_order_by_label = True
300
301 tuple_in_values = False
302
303 connection_characteristics = util.immutabledict(
304 {
305 "isolation_level": characteristics.IsolationLevelCharacteristic(),
306 "logging_token": characteristics.LoggingTokenCharacteristic(),
307 }
308 )
309
310 engine_config_types: Mapping[str, Any] = util.immutabledict(
311 {
312 "pool_timeout": util.asint,
313 "echo": util.bool_or_str("debug"),
314 "echo_pool": util.bool_or_str("debug"),
315 "pool_recycle": util.asint,
316 "pool_size": util.asint,
317 "max_overflow": util.asint,
318 "future": util.asbool,
319 }
320 )
321
322 # if the NUMERIC type
323 # returns decimal.Decimal.
324 # *not* the FLOAT type however.
325 supports_native_decimal = False
326
327 name = "default"
328
329 # length at which to truncate
330 # any identifier.
331 max_identifier_length = 9999
332 _user_defined_max_identifier_length: Optional[int] = None
333
334 isolation_level: Optional[str] = None
335
336 # sub-categories of max_identifier_length.
337 # currently these accommodate for MySQL which allows alias names
338 # of 255 but DDL names only of 64.
339 max_index_name_length: Optional[int] = None
340 max_constraint_name_length: Optional[int] = None
341
342 supports_sane_rowcount = True
343 supports_sane_multi_rowcount = True
344 colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
345 default_paramstyle = "named"
346
347 supports_default_values = False
348 """dialect supports INSERT... DEFAULT VALUES syntax"""
349
350 supports_default_metavalue = False
351 """dialect supports INSERT... VALUES (DEFAULT) syntax"""
352
353 default_metavalue_token = "DEFAULT"
354 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
355 parenthesis."""
356
357 # not sure if this is a real thing but the compiler will deliver it
358 # if this is the only flag enabled.
359 supports_empty_insert = True
360 """dialect supports INSERT () VALUES ()"""
361
362 supports_multivalues_insert = False
363
364 use_insertmanyvalues: bool = False
365
366 use_insertmanyvalues_wo_returning: bool = False
367
368 insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
369 InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
370 )
371
372 insertmanyvalues_page_size: int = 1000
373 insertmanyvalues_max_parameters = 32700
374
375 supports_is_distinct_from = True
376
377 supports_server_side_cursors = False
378
379 server_side_cursors = False
380
381 # extra record-level locking features (#4860)
382 supports_for_update_of = False
383
384 server_version_info = None
385
386 default_schema_name: Optional[str] = None
387
388 # indicates symbol names are
389 # UPPERCASED if they are case insensitive
390 # within the database.
391 # if this is True, the methods normalize_name()
392 # and denormalize_name() must be provided.
393 requires_name_normalize = False
394
395 is_async = False
396
397 has_terminate = False
398
399 # TODO: this is not to be part of 2.0. implement rudimentary binary
400 # literals for SQLite, PostgreSQL, MySQL only within
401 # _Binary.literal_processor
402 _legacy_binary_type_literal_encoding = "utf-8"
403
404 @util.deprecated_params(
405 empty_in_strategy=(
406 "1.4",
407 "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
408 "deprecated, and no longer has any effect. All IN expressions "
409 "are now rendered using "
410 'the "expanding parameter" strategy which renders a set of bound'
411 'expressions, or an "empty set" SELECT, at statement execution'
412 "time.",
413 ),
414 server_side_cursors=(
415 "1.4",
416 "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
417 "is deprecated and will be removed in a future release. Please "
418 "use the "
419 ":paramref:`_engine.Connection.execution_options.stream_results` "
420 "parameter.",
421 ),
422 )
423 def __init__(
424 self,
425 paramstyle: Optional[_ParamStyle] = None,
426 isolation_level: Optional[IsolationLevel] = None,
427 dbapi: Optional[DBAPIModule] = None,
428 implicit_returning: Literal[True] = True,
429 supports_native_boolean: Optional[bool] = None,
430 max_identifier_length: Optional[int] = None,
431 label_length: Optional[int] = None,
432 insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
433 use_insertmanyvalues: Optional[bool] = None,
434 # util.deprecated_params decorator cannot render the
435 # Linting.NO_LINTING constant
436 compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore
437 server_side_cursors: bool = False,
438 skip_autocommit_rollback: bool = False,
439 **kwargs: Any,
440 ):
441 if server_side_cursors:
442 if not self.supports_server_side_cursors:
443 raise exc.ArgumentError(
444 "Dialect %s does not support server side cursors" % self
445 )
446 else:
447 self.server_side_cursors = True
448
449 if getattr(self, "use_setinputsizes", False):
450 util.warn_deprecated(
451 "The dialect-level use_setinputsizes attribute is "
452 "deprecated. Please use "
453 "bind_typing = BindTyping.SETINPUTSIZES",
454 "2.0",
455 )
456 self.bind_typing = interfaces.BindTyping.SETINPUTSIZES
457
458 self.positional = False
459 self._ischema = None
460
461 self.dbapi = dbapi
462
463 self.skip_autocommit_rollback = skip_autocommit_rollback
464
465 if paramstyle is not None:
466 self.paramstyle = paramstyle
467 elif self.dbapi is not None:
468 self.paramstyle = self.dbapi.paramstyle
469 else:
470 self.paramstyle = self.default_paramstyle
471 self.positional = self.paramstyle in (
472 "qmark",
473 "format",
474 "numeric",
475 "numeric_dollar",
476 )
477 self.identifier_preparer = self.preparer(self)
478 self._on_connect_isolation_level = isolation_level
479
480 legacy_tt_callable = getattr(self, "type_compiler", None)
481 if legacy_tt_callable is not None:
482 tt_callable = cast(
483 Type[compiler.GenericTypeCompiler],
484 self.type_compiler,
485 )
486 else:
487 tt_callable = self.type_compiler_cls
488
489 self.type_compiler_instance = self.type_compiler = tt_callable(self)
490
491 if supports_native_boolean is not None:
492 self.supports_native_boolean = supports_native_boolean
493
494 self._user_defined_max_identifier_length = max_identifier_length
495 if self._user_defined_max_identifier_length:
496 self.max_identifier_length = (
497 self._user_defined_max_identifier_length
498 )
499 self.label_length = label_length
500 self.compiler_linting = compiler_linting
501
502 if use_insertmanyvalues is not None:
503 self.use_insertmanyvalues = use_insertmanyvalues
504
505 if insertmanyvalues_page_size is not _NoArg.NO_ARG:
506 self.insertmanyvalues_page_size = insertmanyvalues_page_size
507
508 @property
509 @util.deprecated(
510 "2.0",
511 "full_returning is deprecated, please use insert_returning, "
512 "update_returning, delete_returning",
513 )
514 def full_returning(self):
515 return (
516 self.insert_returning
517 and self.update_returning
518 and self.delete_returning
519 )
520
521 @util.memoized_property
522 def insert_executemany_returning(self):
523 """Default implementation for insert_executemany_returning, if not
524 otherwise overridden by the specific dialect.
525
526 The default dialect determines "insert_executemany_returning" is
527 available if the dialect in use has opted into using the
528 "use_insertmanyvalues" feature. If they haven't opted into that, then
529 this attribute is False, unless the dialect in question overrides this
530 and provides some other implementation (such as the Oracle Database
531 dialects).
532
533 """
534 return self.insert_returning and self.use_insertmanyvalues
535
536 @util.memoized_property
537 def insert_executemany_returning_sort_by_parameter_order(self):
538 """Default implementation for
539 insert_executemany_returning_deterministic_order, if not otherwise
540 overridden by the specific dialect.
541
542 The default dialect determines "insert_executemany_returning" can have
543 deterministic order only if the dialect in use has opted into using the
544 "use_insertmanyvalues" feature, which implements deterministic ordering
545 using client side sentinel columns only by default. The
546 "insertmanyvalues" feature also features alternate forms that can
547 use server-generated PK values as "sentinels", but those are only
548 used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel`
549 bitflag enables those alternate SQL forms, which are disabled
550 by default.
551
552 If the dialect in use hasn't opted into that, then this attribute is
553 False, unless the dialect in question overrides this and provides some
554 other implementation (such as the Oracle Database dialects).
555
556 """
557 return self.insert_returning and self.use_insertmanyvalues
558
559 update_executemany_returning = False
560 delete_executemany_returning = False
561
562 @util.memoized_property
563 def loaded_dbapi(self) -> DBAPIModule:
564 if self.dbapi is None:
565 raise exc.InvalidRequestError(
566 f"Dialect {self} does not have a Python DBAPI established "
567 "and cannot be used for actual database interaction"
568 )
569 return self.dbapi
570
571 @util.memoized_property
572 def _bind_typing_render_casts(self):
573 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS
574
575 def _ensure_has_table_connection(self, arg: Connection) -> None:
576 if not isinstance(arg, Connection):
577 raise exc.ArgumentError(
578 "The argument passed to Dialect.has_table() should be a "
579 "%s, got %s. "
580 "Additionally, the Dialect.has_table() method is for "
581 "internal dialect "
582 "use only; please use "
583 "``inspect(some_engine).has_table(<tablename>>)`` "
584 "for public API use." % (Connection, type(arg))
585 )
586
587 @util.memoized_property
588 def _supports_statement_cache(self):
589 ssc = self.__class__.__dict__.get("supports_statement_cache", None)
590 if ssc is None:
591 util.warn(
592 "Dialect %s:%s will not make use of SQL compilation caching "
593 "as it does not set the 'supports_statement_cache' attribute "
594 "to ``True``. This can have "
595 "significant performance implications including some "
596 "performance degradations in comparison to prior SQLAlchemy "
597 "versions. Dialect maintainers should seek to set this "
598 "attribute to True after appropriate development and testing "
599 "for SQLAlchemy 1.4 caching support. Alternatively, this "
600 "attribute may be set to False which will disable this "
601 "warning." % (self.name, self.driver),
602 code="cprf",
603 )
604
605 return bool(ssc)
606
607 @util.memoized_property
608 def _type_memos(self):
609 return weakref.WeakKeyDictionary()
610
611 @property
612 def dialect_description(self): # type: ignore[override]
613 return self.name + "+" + self.driver
614
615 @property
616 def supports_sane_rowcount_returning(self):
617 """True if this dialect supports sane rowcount even if RETURNING is
618 in use.
619
620 For dialects that don't support RETURNING, this is synonymous with
621 ``supports_sane_rowcount``.
622
623 """
624 return self.supports_sane_rowcount
625
626 @classmethod
627 def get_pool_class(cls, url: URL) -> Type[Pool]:
628 default: Type[pool.Pool]
629 if cls.is_async:
630 default = pool.AsyncAdaptedQueuePool
631 else:
632 default = pool.QueuePool
633
634 return getattr(cls, "poolclass", default)
635
636 def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
637 return self.get_pool_class(url)
638
639 @classmethod
640 def load_provisioning(cls):
641 package = ".".join(cls.__module__.split(".")[0:-1])
642 try:
643 __import__(package + ".provision")
644 except ImportError:
645 pass
646
647 def _builtin_onconnect(self) -> Optional[_ListenerFnType]:
648 if self._on_connect_isolation_level is not None:
649
650 def builtin_connect(dbapi_conn, conn_rec):
651 self._assert_and_set_isolation_level(
652 dbapi_conn, self._on_connect_isolation_level
653 )
654
655 return builtin_connect
656 else:
657 return None
658
659 def initialize(self, connection: Connection) -> None:
660 try:
661 self.server_version_info = self._get_server_version_info(
662 connection
663 )
664 except NotImplementedError:
665 self.server_version_info = None
666 try:
667 self.default_schema_name = self._get_default_schema_name(
668 connection
669 )
670 except NotImplementedError:
671 self.default_schema_name = None
672
673 try:
674 self.default_isolation_level = self.get_default_isolation_level(
675 connection.connection.dbapi_connection
676 )
677 except NotImplementedError:
678 self.default_isolation_level = None
679
680 if not self._user_defined_max_identifier_length:
681 max_ident_length = self._check_max_identifier_length(connection)
682 if max_ident_length:
683 self.max_identifier_length = max_ident_length
684
685 if (
686 self.label_length
687 and self.label_length > self.max_identifier_length
688 ):
689 raise exc.ArgumentError(
690 "Label length of %d is greater than this dialect's"
691 " maximum identifier length of %d"
692 % (self.label_length, self.max_identifier_length)
693 )
694
695 def on_connect(self) -> Optional[Callable[[Any], None]]:
696 # inherits the docstring from interfaces.Dialect.on_connect
697 return None
698
699 def _check_max_identifier_length(self, connection):
700 """Perform a connection / server version specific check to determine
701 the max_identifier_length.
702
703 If the dialect's class level max_identifier_length should be used,
704 can return None.
705
706 """
707 return None
708
709 def get_default_isolation_level(self, dbapi_conn):
710 """Given a DBAPI connection, return its isolation level, or
711 a default isolation level if one cannot be retrieved.
712
713 May be overridden by subclasses in order to provide a
714 "fallback" isolation level for databases that cannot reliably
715 retrieve the actual isolation level.
716
717 By default, calls the :meth:`_engine.Interfaces.get_isolation_level`
718 method, propagating any exceptions raised.
719
720 """
721 return self.get_isolation_level(dbapi_conn)
722
723 def type_descriptor(self, typeobj):
724 """Provide a database-specific :class:`.TypeEngine` object, given
725 the generic object which comes from the types module.
726
727 This method looks for a dictionary called
728 ``colspecs`` as a class or instance-level variable,
729 and passes on to :func:`_types.adapt_type`.
730
731 """
732 return type_api.adapt_type(typeobj, self.colspecs)
733
734 def has_index(self, connection, table_name, index_name, schema=None, **kw):
735 if not self.has_table(connection, table_name, schema=schema, **kw):
736 return False
737 for idx in self.get_indexes(
738 connection, table_name, schema=schema, **kw
739 ):
740 if idx["name"] == index_name:
741 return True
742 else:
743 return False
744
745 def has_schema(
746 self, connection: Connection, schema_name: str, **kw: Any
747 ) -> bool:
748 return schema_name in self.get_schema_names(connection, **kw)
749
750 def validate_identifier(self, ident: str) -> None:
751 if len(ident) > self.max_identifier_length:
752 raise exc.IdentifierError(
753 "Identifier '%s' exceeds maximum length of %d characters"
754 % (ident, self.max_identifier_length)
755 )
756
757 def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection:
758 # inherits the docstring from interfaces.Dialect.connect
759 return self.loaded_dbapi.connect(*cargs, **cparams) # type: ignore[no-any-return] # NOQA: E501
760
761 def create_connect_args(self, url: URL) -> ConnectArgsType:
762 # inherits the docstring from interfaces.Dialect.create_connect_args
763 opts = url.translate_connect_args()
764 opts.update(url.query)
765 return ([], opts)
766
767 def set_engine_execution_options(
768 self, engine: Engine, opts: Mapping[str, Any]
769 ) -> None:
770 supported_names = set(self.connection_characteristics).intersection(
771 opts
772 )
773 if supported_names:
774 characteristics: Mapping[str, Any] = util.immutabledict(
775 (name, opts[name]) for name in supported_names
776 )
777
778 @event.listens_for(engine, "engine_connect")
779 def set_connection_characteristics(connection):
780 self._set_connection_characteristics(
781 connection, characteristics
782 )
783
784 def set_connection_execution_options(
785 self, connection: Connection, opts: Mapping[str, Any]
786 ) -> None:
787 supported_names = set(self.connection_characteristics).intersection(
788 opts
789 )
790 if supported_names:
791 characteristics: Mapping[str, Any] = util.immutabledict(
792 (name, opts[name]) for name in supported_names
793 )
794 self._set_connection_characteristics(connection, characteristics)
795
796 def _set_connection_characteristics(self, connection, characteristics):
797 characteristic_values = [
798 (name, self.connection_characteristics[name], value)
799 for name, value in characteristics.items()
800 ]
801
802 if connection.in_transaction():
803 trans_objs = [
804 (name, obj)
805 for name, obj, _ in characteristic_values
806 if obj.transactional
807 ]
808 if trans_objs:
809 raise exc.InvalidRequestError(
810 "This connection has already initialized a SQLAlchemy "
811 "Transaction() object via begin() or autobegin; "
812 "%s may not be altered unless rollback() or commit() "
813 "is called first."
814 % (", ".join(name for name, obj in trans_objs))
815 )
816
817 dbapi_connection = connection.connection.dbapi_connection
818 for _, characteristic, value in characteristic_values:
819 characteristic.set_connection_characteristic(
820 self, connection, dbapi_connection, value
821 )
822 connection.connection._connection_record.finalize_callback.append(
823 functools.partial(self._reset_characteristics, characteristics)
824 )
825
826 def _reset_characteristics(self, characteristics, dbapi_connection):
827 for characteristic_name in characteristics:
828 characteristic = self.connection_characteristics[
829 characteristic_name
830 ]
831 characteristic.reset_characteristic(self, dbapi_connection)
832
833 def do_begin(self, dbapi_connection):
834 pass
835
836 def do_rollback(self, dbapi_connection):
837 if self.skip_autocommit_rollback and self.detect_autocommit_setting(
838 dbapi_connection
839 ):
840 return
841 dbapi_connection.rollback()
842
843 def do_commit(self, dbapi_connection):
844 dbapi_connection.commit()
845
846 def do_terminate(self, dbapi_connection):
847 self.do_close(dbapi_connection)
848
849 def do_close(self, dbapi_connection):
850 dbapi_connection.close()
851
852 @util.memoized_property
853 def _dialect_specific_select_one(self):
854 return str(expression.select(1).compile(dialect=self))
855
856 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
857 try:
858 return self.do_ping(dbapi_connection)
859 except self.loaded_dbapi.Error as err:
860 is_disconnect = self.is_disconnect(err, dbapi_connection, None)
861
862 if self._has_events:
863 try:
864 Connection._handle_dbapi_exception_noconnection(
865 err,
866 self,
867 is_disconnect=is_disconnect,
868 invalidate_pool_on_disconnect=False,
869 is_pre_ping=True,
870 )
871 except exc.StatementError as new_err:
872 is_disconnect = new_err.connection_invalidated
873
874 if is_disconnect:
875 return False
876 else:
877 raise
878
879 def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
880 cursor = dbapi_connection.cursor()
881 try:
882 cursor.execute(self._dialect_specific_select_one)
883 finally:
884 cursor.close()
885 return True
886
887 def create_xid(self):
888 """Create a random two-phase transaction ID.
889
890 This id will be passed to do_begin_twophase(), do_rollback_twophase(),
891 do_commit_twophase(). Its format is unspecified.
892 """
893
894 return "_sa_%032x" % random.randint(0, 2**128)
895
896 def do_savepoint(self, connection, name):
897 connection.execute(expression.SavepointClause(name))
898
899 def do_rollback_to_savepoint(self, connection, name):
900 connection.execute(expression.RollbackToSavepointClause(name))
901
902 def do_release_savepoint(self, connection, name):
903 connection.execute(expression.ReleaseSavepointClause(name))
904
905 def _deliver_insertmanyvalues_batches(
906 self,
907 connection,
908 cursor,
909 statement,
910 parameters,
911 generic_setinputsizes,
912 context,
913 ):
914 context = cast(DefaultExecutionContext, context)
915 compiled = cast(SQLCompiler, context.compiled)
916
917 _composite_sentinel_proc: Sequence[
918 Optional[_ResultProcessorType[Any]]
919 ] = ()
920 _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
921 _sentinel_proc_initialized: bool = False
922
923 compiled_parameters = context.compiled_parameters
924
925 imv = compiled._insertmanyvalues
926 assert imv is not None
927
928 is_returning: Final[bool] = bool(compiled.effective_returning)
929 batch_size = context.execution_options.get(
930 "insertmanyvalues_page_size", self.insertmanyvalues_page_size
931 )
932
933 if compiled.schema_translate_map:
934 schema_translate_map = context.execution_options.get(
935 "schema_translate_map", {}
936 )
937 else:
938 schema_translate_map = None
939
940 if is_returning:
941 result: Optional[List[Any]] = []
942 context._insertmanyvalues_rows = result
943
944 sort_by_parameter_order = imv.sort_by_parameter_order
945
946 else:
947 sort_by_parameter_order = False
948 result = None
949
950 for imv_batch in compiled._deliver_insertmanyvalues_batches(
951 statement,
952 parameters,
953 compiled_parameters,
954 generic_setinputsizes,
955 batch_size,
956 sort_by_parameter_order,
957 schema_translate_map,
958 ):
959 yield imv_batch
960
961 if is_returning:
962
963 try:
964 rows = context.fetchall_for_returning(cursor)
965 except BaseException as be:
966 connection._handle_dbapi_exception(
967 be,
968 sql_util._long_statement(imv_batch.replaced_statement),
969 imv_batch.replaced_parameters,
970 None,
971 context,
972 is_sub_exec=True,
973 )
974
975 # I would have thought "is_returning: Final[bool]"
976 # would have assured this but pylance thinks not
977 assert result is not None
978
979 if imv.num_sentinel_columns and not imv_batch.is_downgraded:
980 composite_sentinel = imv.num_sentinel_columns > 1
981 if imv.implicit_sentinel:
982 # for implicit sentinel, which is currently single-col
983 # integer autoincrement, do a simple sort.
984 assert not composite_sentinel
985 result.extend(
986 sorted(rows, key=operator.itemgetter(-1))
987 )
988 continue
989
990 # otherwise, create dictionaries to match up batches
991 # with parameters
992 assert imv.sentinel_param_keys
993 assert imv.sentinel_columns
994
995 _nsc = imv.num_sentinel_columns
996
997 if not _sentinel_proc_initialized:
998 if composite_sentinel:
999 _composite_sentinel_proc = [
1000 col.type._cached_result_processor(
1001 self, cursor_desc[1]
1002 )
1003 for col, cursor_desc in zip(
1004 imv.sentinel_columns,
1005 cursor.description[-_nsc:],
1006 )
1007 ]
1008 else:
1009 _scalar_sentinel_proc = (
1010 imv.sentinel_columns[0]
1011 ).type._cached_result_processor(
1012 self, cursor.description[-1][1]
1013 )
1014 _sentinel_proc_initialized = True
1015
1016 rows_by_sentinel: Union[
1017 Dict[Tuple[Any, ...], Any],
1018 Dict[Any, Any],
1019 ]
1020
1021 if composite_sentinel:
1022 rows_by_sentinel = {
1023 tuple(
1024 (proc(val) if proc else val)
1025 for val, proc in zip(
1026 row[-_nsc:], _composite_sentinel_proc
1027 )
1028 ): row
1029 for row in rows
1030 }
1031 elif _scalar_sentinel_proc:
1032 rows_by_sentinel = {
1033 _scalar_sentinel_proc(row[-1]): row for row in rows
1034 }
1035 else:
1036 rows_by_sentinel = {row[-1]: row for row in rows}
1037
1038 if len(rows_by_sentinel) != len(imv_batch.batch):
1039 # see test_insert_exec.py::
1040 # IMVSentinelTest::test_sentinel_incorrect_rowcount
1041 # for coverage / demonstration
1042 raise exc.InvalidRequestError(
1043 f"Sentinel-keyed result set did not produce "
1044 f"correct number of rows {len(imv_batch.batch)}; "
1045 "produced "
1046 f"{len(rows_by_sentinel)}. Please ensure the "
1047 "sentinel column is fully unique and populated in "
1048 "all cases."
1049 )
1050
1051 try:
1052 ordered_rows = [
1053 rows_by_sentinel[sentinel_keys]
1054 for sentinel_keys in imv_batch.sentinel_values
1055 ]
1056 except KeyError as ke:
1057 # see test_insert_exec.py::
1058 # IMVSentinelTest::test_sentinel_cant_match_keys
1059 # for coverage / demonstration
1060 raise exc.InvalidRequestError(
1061 f"Can't match sentinel values in result set to "
1062 f"parameter sets; key {ke.args[0]!r} was not "
1063 "found. "
1064 "There may be a mismatch between the datatype "
1065 "passed to the DBAPI driver vs. that which it "
1066 "returns in a result row. Ensure the given "
1067 "Python value matches the expected result type "
1068 "*exactly*, taking care to not rely upon implicit "
1069 "conversions which may occur such as when using "
1070 "strings in place of UUID or integer values, etc. "
1071 ) from ke
1072
1073 result.extend(ordered_rows)
1074
1075 else:
1076 result.extend(rows)
1077
1078 def do_executemany(self, cursor, statement, parameters, context=None):
1079 cursor.executemany(statement, parameters)
1080
1081 def do_execute(self, cursor, statement, parameters, context=None):
1082 cursor.execute(statement, parameters)
1083
1084 def do_execute_no_params(self, cursor, statement, context=None):
1085 cursor.execute(statement)
1086
1087 def is_disconnect(
1088 self,
1089 e: DBAPIModule.Error,
1090 connection: Union[
1091 pool.PoolProxiedConnection, interfaces.DBAPIConnection, None
1092 ],
1093 cursor: Optional[interfaces.DBAPICursor],
1094 ) -> bool:
1095 return False
1096
1097 @util.memoized_instancemethod
1098 def _gen_allowed_isolation_levels(self, dbapi_conn):
1099 try:
1100 raw_levels = list(self.get_isolation_level_values(dbapi_conn))
1101 except NotImplementedError:
1102 return None
1103 else:
1104 normalized_levels = [
1105 level.replace("_", " ").upper() for level in raw_levels
1106 ]
1107 if raw_levels != normalized_levels:
1108 raise ValueError(
1109 f"Dialect {self.name!r} get_isolation_level_values() "
1110 f"method should return names as UPPERCASE using spaces, "
1111 f"not underscores; got "
1112 f"{sorted(set(raw_levels).difference(normalized_levels))}"
1113 )
1114 return tuple(normalized_levels)
1115
1116 def _assert_and_set_isolation_level(self, dbapi_conn, level):
1117 level = level.replace("_", " ").upper()
1118
1119 _allowed_isolation_levels = self._gen_allowed_isolation_levels(
1120 dbapi_conn
1121 )
1122 if (
1123 _allowed_isolation_levels
1124 and level not in _allowed_isolation_levels
1125 ):
1126 raise exc.ArgumentError(
1127 f"Invalid value {level!r} for isolation_level. "
1128 f"Valid isolation levels for {self.name!r} are "
1129 f"{', '.join(_allowed_isolation_levels)}"
1130 )
1131
1132 self.set_isolation_level(dbapi_conn, level)
1133
1134 def reset_isolation_level(self, dbapi_conn):
1135 if self._on_connect_isolation_level is not None:
1136 assert (
1137 self._on_connect_isolation_level == "AUTOCOMMIT"
1138 or self._on_connect_isolation_level
1139 == self.default_isolation_level
1140 )
1141 self._assert_and_set_isolation_level(
1142 dbapi_conn, self._on_connect_isolation_level
1143 )
1144 else:
1145 assert self.default_isolation_level is not None
1146 self._assert_and_set_isolation_level(
1147 dbapi_conn,
1148 self.default_isolation_level,
1149 )
1150
1151 def normalize_name(self, name):
1152 if name is None:
1153 return None
1154
1155 name_lower = name.lower()
1156 name_upper = name.upper()
1157
1158 if name_upper == name_lower:
1159 # name has no upper/lower conversion, e.g. non-european characters.
1160 # return unchanged
1161 return name
1162 elif name_upper == name and not (
1163 self.identifier_preparer._requires_quotes
1164 )(name_lower):
1165 # name is all uppercase and doesn't require quoting; normalize
1166 # to all lower case
1167 return name_lower
1168 elif name_lower == name:
1169 # name is all lower case, which if denormalized means we need to
1170 # force quoting on it
1171 return quoted_name(name, quote=True)
1172 else:
1173 # name is mixed case, means it will be quoted in SQL when used
1174 # later, no normalizes
1175 return name
1176
1177 def denormalize_name(self, name):
1178 if name is None:
1179 return None
1180
1181 name_lower = name.lower()
1182 name_upper = name.upper()
1183
1184 if name_upper == name_lower:
1185 # name has no upper/lower conversion, e.g. non-european characters.
1186 # return unchanged
1187 return name
1188 elif name_lower == name and not (
1189 self.identifier_preparer._requires_quotes
1190 )(name_lower):
1191 name = name_upper
1192 return name
1193
1194 def get_driver_connection(self, connection: DBAPIConnection) -> Any:
1195 return connection
1196
1197 def _overrides_default(self, method):
1198 return (
1199 getattr(type(self), method).__code__
1200 is not getattr(DefaultDialect, method).__code__
1201 )
1202
1203 def _default_multi_reflect(
1204 self,
1205 single_tbl_method,
1206 connection,
1207 kind,
1208 schema,
1209 filter_names,
1210 scope,
1211 **kw,
1212 ):
1213 names_fns = []
1214 temp_names_fns = []
1215 if ObjectKind.TABLE in kind:
1216 names_fns.append(self.get_table_names)
1217 temp_names_fns.append(self.get_temp_table_names)
1218 if ObjectKind.VIEW in kind:
1219 names_fns.append(self.get_view_names)
1220 temp_names_fns.append(self.get_temp_view_names)
1221 if ObjectKind.MATERIALIZED_VIEW in kind:
1222 names_fns.append(self.get_materialized_view_names)
1223 # no temp materialized view at the moment
1224 # temp_names_fns.append(self.get_temp_materialized_view_names)
1225
1226 unreflectable = kw.pop("unreflectable", {})
1227
1228 if (
1229 filter_names
1230 and scope is ObjectScope.ANY
1231 and kind is ObjectKind.ANY
1232 ):
1233 # if names are given and no qualification on type of table
1234 # (i.e. the Table(..., autoload) case), take the names as given,
1235 # don't run names queries. If a table does not exit
1236 # NoSuchTableError is raised and it's skipped
1237
1238 # this also suits the case for mssql where we can reflect
1239 # individual temp tables but there's no temp_names_fn
1240 names = filter_names
1241 else:
1242 names = []
1243 name_kw = {"schema": schema, **kw}
1244 fns = []
1245 if ObjectScope.DEFAULT in scope:
1246 fns.extend(names_fns)
1247 if ObjectScope.TEMPORARY in scope:
1248 fns.extend(temp_names_fns)
1249
1250 for fn in fns:
1251 try:
1252 names.extend(fn(connection, **name_kw))
1253 except NotImplementedError:
1254 pass
1255
1256 if filter_names:
1257 filter_names = set(filter_names)
1258
1259 # iterate over all the tables/views and call the single table method
1260 for table in names:
1261 if not filter_names or table in filter_names:
1262 key = (schema, table)
1263 try:
1264 yield (
1265 key,
1266 single_tbl_method(
1267 connection, table, schema=schema, **kw
1268 ),
1269 )
1270 except exc.UnreflectableTableError as err:
1271 if key not in unreflectable:
1272 unreflectable[key] = err
1273 except exc.NoSuchTableError:
1274 pass
1275
1276 def get_multi_table_options(self, connection, **kw):
1277 return self._default_multi_reflect(
1278 self.get_table_options, connection, **kw
1279 )
1280
1281 def get_multi_columns(self, connection, **kw):
1282 return self._default_multi_reflect(self.get_columns, connection, **kw)
1283
1284 def get_multi_pk_constraint(self, connection, **kw):
1285 return self._default_multi_reflect(
1286 self.get_pk_constraint, connection, **kw
1287 )
1288
1289 def get_multi_foreign_keys(self, connection, **kw):
1290 return self._default_multi_reflect(
1291 self.get_foreign_keys, connection, **kw
1292 )
1293
1294 def get_multi_indexes(self, connection, **kw):
1295 return self._default_multi_reflect(self.get_indexes, connection, **kw)
1296
1297 def get_multi_unique_constraints(self, connection, **kw):
1298 return self._default_multi_reflect(
1299 self.get_unique_constraints, connection, **kw
1300 )
1301
1302 def get_multi_check_constraints(self, connection, **kw):
1303 return self._default_multi_reflect(
1304 self.get_check_constraints, connection, **kw
1305 )
1306
1307 def get_multi_table_comment(self, connection, **kw):
1308 return self._default_multi_reflect(
1309 self.get_table_comment, connection, **kw
1310 )
1311
1312
1313class StrCompileDialect(DefaultDialect):
1314 statement_compiler = compiler.StrSQLCompiler
1315 ddl_compiler = compiler.DDLCompiler
1316 type_compiler_cls = compiler.StrSQLTypeCompiler
1317 preparer = compiler.IdentifierPreparer
1318
1319 insert_returning = True
1320 update_returning = True
1321 delete_returning = True
1322
1323 supports_statement_cache = True
1324
1325 supports_identity_columns = True
1326
1327 supports_sequences = True
1328 sequences_optional = True
1329 preexecute_autoincrement_sequences = False
1330
1331 supports_native_boolean = True
1332
1333 supports_multivalues_insert = True
1334 supports_simple_order_by_label = True
1335
1336
1337class DefaultExecutionContext(ExecutionContext):
1338 isinsert = False
1339 isupdate = False
1340 isdelete = False
1341 is_crud = False
1342 is_text = False
1343 isddl = False
1344
1345 execute_style: ExecuteStyle = ExecuteStyle.EXECUTE
1346
1347 compiled: Optional[Compiled] = None
1348 result_column_struct: Optional[
1349 Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
1350 ] = None
1351 returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None
1352
1353 execution_options: _ExecuteOptions = util.EMPTY_DICT
1354
1355 cursor_fetch_strategy = _cursor._DEFAULT_FETCH
1356
1357 invoked_statement: Optional[Executable] = None
1358
1359 _is_implicit_returning = False
1360 _is_explicit_returning = False
1361 _is_supplemental_returning = False
1362 _is_server_side = False
1363
1364 _soft_closed = False
1365
1366 _rowcount: Optional[int] = None
1367
1368 # a hook for SQLite's translation of
1369 # result column names
1370 # NOTE: pyhive is using this hook, can't remove it :(
1371 _translate_colname: Optional[
1372 Callable[[str], Tuple[str, Optional[str]]]
1373 ] = None
1374
1375 _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
1376 """used by set_input_sizes().
1377
1378 This collection comes from ``ExpandedState.parameter_expansion``.
1379
1380 """
1381
1382 cache_hit = NO_CACHE_KEY
1383
1384 root_connection: Connection
1385 _dbapi_connection: PoolProxiedConnection
1386 dialect: Dialect
1387 unicode_statement: str
1388 cursor: DBAPICursor
1389 compiled_parameters: List[_MutableCoreSingleExecuteParams]
1390 parameters: _DBAPIMultiExecuteParams
1391 extracted_parameters: Optional[Sequence[BindParameter[Any]]]
1392
1393 _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)
1394
1395 _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
1396 _num_sentinel_cols: int = 0
1397
1398 @classmethod
1399 def _init_ddl(
1400 cls,
1401 dialect: Dialect,
1402 connection: Connection,
1403 dbapi_connection: PoolProxiedConnection,
1404 execution_options: _ExecuteOptions,
1405 compiled_ddl: DDLCompiler,
1406 ) -> ExecutionContext:
1407 """Initialize execution context for an ExecutableDDLElement
1408 construct."""
1409
1410 self = cls.__new__(cls)
1411 self.root_connection = connection
1412 self._dbapi_connection = dbapi_connection
1413 self.dialect = connection.dialect
1414
1415 self.compiled = compiled = compiled_ddl
1416 self.isddl = True
1417
1418 self.execution_options = execution_options
1419
1420 self.unicode_statement = str(compiled)
1421 if compiled.schema_translate_map:
1422 schema_translate_map = self.execution_options.get(
1423 "schema_translate_map", {}
1424 )
1425
1426 rst = compiled.preparer._render_schema_translates
1427 self.unicode_statement = rst(
1428 self.unicode_statement, schema_translate_map
1429 )
1430
1431 self.statement = self.unicode_statement
1432
1433 self.cursor = self.create_cursor()
1434 self.compiled_parameters = []
1435
1436 if dialect.positional:
1437 self.parameters = [dialect.execute_sequence_format()]
1438 else:
1439 self.parameters = [self._empty_dict_params]
1440
1441 return self
1442
1443 @classmethod
1444 def _init_compiled(
1445 cls,
1446 dialect: Dialect,
1447 connection: Connection,
1448 dbapi_connection: PoolProxiedConnection,
1449 execution_options: _ExecuteOptions,
1450 compiled: SQLCompiler,
1451 parameters: _CoreMultiExecuteParams,
1452 invoked_statement: Executable,
1453 extracted_parameters: Optional[Sequence[BindParameter[Any]]],
1454 cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
1455 param_dict: _CoreSingleExecuteParams | None = None,
1456 ) -> ExecutionContext:
1457 """Initialize execution context for a Compiled construct."""
1458
1459 self = cls.__new__(cls)
1460 self.root_connection = connection
1461 self._dbapi_connection = dbapi_connection
1462 self.dialect = connection.dialect
1463 self.extracted_parameters = extracted_parameters
1464 self.invoked_statement = invoked_statement
1465 self.compiled = compiled
1466 self.cache_hit = cache_hit
1467
1468 self.execution_options = execution_options
1469
1470 self.result_column_struct = (
1471 compiled._result_columns,
1472 compiled._ordered_columns,
1473 compiled._textual_ordered_columns,
1474 compiled._ad_hoc_textual,
1475 compiled._loose_column_name_matching,
1476 )
1477
1478 self.isinsert = ii = compiled.isinsert
1479 self.isupdate = iu = compiled.isupdate
1480 self.isdelete = id_ = compiled.isdelete
1481 self.is_text = compiled.isplaintext
1482
1483 if ii or iu or id_:
1484 dml_statement = compiled.compile_state.statement # type: ignore
1485 if TYPE_CHECKING:
1486 assert isinstance(dml_statement, UpdateBase)
1487 self.is_crud = True
1488 self._is_explicit_returning = ier = bool(dml_statement._returning)
1489 self._is_implicit_returning = iir = bool(
1490 compiled.implicit_returning
1491 )
1492 if iir and dml_statement._supplemental_returning:
1493 self._is_supplemental_returning = True
1494
1495 # dont mix implicit and explicit returning
1496 assert not (iir and ier)
1497
1498 if (ier or iir) and compiled.for_executemany:
1499 if ii and not self.dialect.insert_executemany_returning:
1500 raise exc.InvalidRequestError(
1501 f"Dialect {self.dialect.dialect_description} with "
1502 f"current server capabilities does not support "
1503 "INSERT..RETURNING when executemany is used"
1504 )
1505 elif (
1506 ii
1507 and dml_statement._sort_by_parameter_order
1508 and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501
1509 ):
1510 raise exc.InvalidRequestError(
1511 f"Dialect {self.dialect.dialect_description} with "
1512 f"current server capabilities does not support "
1513 "INSERT..RETURNING with deterministic row ordering "
1514 "when executemany is used"
1515 )
1516 elif (
1517 ii
1518 and self.dialect.use_insertmanyvalues
1519 and not compiled._insertmanyvalues
1520 ):
1521 raise exc.InvalidRequestError(
1522 'Statement does not have "insertmanyvalues" '
1523 "enabled, can't use INSERT..RETURNING with "
1524 "executemany in this case."
1525 )
1526 elif iu and not self.dialect.update_executemany_returning:
1527 raise exc.InvalidRequestError(
1528 f"Dialect {self.dialect.dialect_description} with "
1529 f"current server capabilities does not support "
1530 "UPDATE..RETURNING when executemany is used"
1531 )
1532 elif id_ and not self.dialect.delete_executemany_returning:
1533 raise exc.InvalidRequestError(
1534 f"Dialect {self.dialect.dialect_description} with "
1535 f"current server capabilities does not support "
1536 "DELETE..RETURNING when executemany is used"
1537 )
1538
1539 if not parameters:
1540 self.compiled_parameters = [
1541 compiled.construct_params(
1542 extracted_parameters=extracted_parameters,
1543 escape_names=False,
1544 _collected_params=param_dict,
1545 )
1546 ]
1547 else:
1548 self.compiled_parameters = [
1549 compiled.construct_params(
1550 m,
1551 escape_names=False,
1552 _group_number=grp,
1553 extracted_parameters=extracted_parameters,
1554 _collected_params=param_dict,
1555 )
1556 for grp, m in enumerate(parameters)
1557 ]
1558
1559 if len(parameters) > 1:
1560 if self.isinsert and compiled._insertmanyvalues:
1561 self.execute_style = ExecuteStyle.INSERTMANYVALUES
1562
1563 imv = compiled._insertmanyvalues
1564 if imv.sentinel_columns is not None:
1565 self._num_sentinel_cols = imv.num_sentinel_columns
1566 else:
1567 self.execute_style = ExecuteStyle.EXECUTEMANY
1568
1569 self.unicode_statement = compiled.string
1570
1571 self.cursor = self.create_cursor()
1572
1573 if self.compiled.insert_prefetch or self.compiled.update_prefetch:
1574 self._process_execute_defaults()
1575
1576 processors = compiled._bind_processors
1577
1578 flattened_processors: Mapping[
1579 str, _BindProcessorType[Any]
1580 ] = processors # type: ignore[assignment]
1581
1582 if compiled.literal_execute_params or compiled.post_compile_params:
1583 if self.executemany:
1584 raise exc.InvalidRequestError(
1585 "'literal_execute' or 'expanding' parameters can't be "
1586 "used with executemany()"
1587 )
1588
1589 expanded_state = compiled._process_parameters_for_postcompile(
1590 self.compiled_parameters[0]
1591 )
1592
1593 # re-assign self.unicode_statement
1594 self.unicode_statement = expanded_state.statement
1595
1596 self._expanded_parameters = expanded_state.parameter_expansion
1597
1598 flattened_processors = dict(processors) # type: ignore
1599 flattened_processors.update(expanded_state.processors)
1600 positiontup = expanded_state.positiontup
1601 elif compiled.positional:
1602 positiontup = self.compiled.positiontup
1603 else:
1604 positiontup = None
1605
1606 if compiled.schema_translate_map:
1607 schema_translate_map = self.execution_options.get(
1608 "schema_translate_map", {}
1609 )
1610 rst = compiled.preparer._render_schema_translates
1611 self.unicode_statement = rst(
1612 self.unicode_statement, schema_translate_map
1613 )
1614
1615 # final self.unicode_statement is now assigned, encode if needed
1616 # by dialect
1617 self.statement = self.unicode_statement
1618
1619 # Convert the dictionary of bind parameter values
1620 # into a dict or list to be sent to the DBAPI's
1621 # execute() or executemany() method.
1622
1623 if compiled.positional:
1624 core_positional_parameters: MutableSequence[Sequence[Any]] = []
1625 assert positiontup is not None
1626 for compiled_params in self.compiled_parameters:
1627 l_param: List[Any] = [
1628 (
1629 flattened_processors[key](compiled_params[key])
1630 if key in flattened_processors
1631 else compiled_params[key]
1632 )
1633 for key in positiontup
1634 ]
1635 core_positional_parameters.append(
1636 dialect.execute_sequence_format(l_param)
1637 )
1638
1639 self.parameters = core_positional_parameters
1640 else:
1641 core_dict_parameters: MutableSequence[Dict[str, Any]] = []
1642 escaped_names = compiled.escaped_bind_names
1643
1644 # note that currently, "expanded" parameters will be present
1645 # in self.compiled_parameters in their quoted form. This is
1646 # slightly inconsistent with the approach taken as of
1647 # #8056 where self.compiled_parameters is meant to contain unquoted
1648 # param names.
1649 d_param: Dict[str, Any]
1650 for compiled_params in self.compiled_parameters:
1651 if escaped_names:
1652 d_param = {
1653 escaped_names.get(key, key): (
1654 flattened_processors[key](compiled_params[key])
1655 if key in flattened_processors
1656 else compiled_params[key]
1657 )
1658 for key in compiled_params
1659 }
1660 else:
1661 d_param = {
1662 key: (
1663 flattened_processors[key](compiled_params[key])
1664 if key in flattened_processors
1665 else compiled_params[key]
1666 )
1667 for key in compiled_params
1668 }
1669
1670 core_dict_parameters.append(d_param)
1671
1672 self.parameters = core_dict_parameters
1673
1674 return self
1675
1676 @classmethod
1677 def _init_statement(
1678 cls,
1679 dialect: Dialect,
1680 connection: Connection,
1681 dbapi_connection: PoolProxiedConnection,
1682 execution_options: _ExecuteOptions,
1683 statement: str,
1684 parameters: _DBAPIMultiExecuteParams,
1685 ) -> ExecutionContext:
1686 """Initialize execution context for a string SQL statement."""
1687
1688 self = cls.__new__(cls)
1689 self.root_connection = connection
1690 self._dbapi_connection = dbapi_connection
1691 self.dialect = connection.dialect
1692 self.is_text = True
1693
1694 self.execution_options = execution_options
1695
1696 if not parameters:
1697 if self.dialect.positional:
1698 self.parameters = [dialect.execute_sequence_format()]
1699 else:
1700 self.parameters = [self._empty_dict_params]
1701 elif isinstance(parameters[0], dialect.execute_sequence_format):
1702 self.parameters = parameters
1703 elif isinstance(parameters[0], dict):
1704 self.parameters = parameters
1705 else:
1706 self.parameters = [
1707 dialect.execute_sequence_format(p) for p in parameters
1708 ]
1709
1710 if len(parameters) > 1:
1711 self.execute_style = ExecuteStyle.EXECUTEMANY
1712
1713 self.statement = self.unicode_statement = statement
1714
1715 self.cursor = self.create_cursor()
1716 return self
1717
1718 @classmethod
1719 def _init_default(
1720 cls,
1721 dialect: Dialect,
1722 connection: Connection,
1723 dbapi_connection: PoolProxiedConnection,
1724 execution_options: _ExecuteOptions,
1725 ) -> ExecutionContext:
1726 """Initialize execution context for a ColumnDefault construct."""
1727
1728 self = cls.__new__(cls)
1729 self.root_connection = connection
1730 self._dbapi_connection = dbapi_connection
1731 self.dialect = connection.dialect
1732
1733 self.execution_options = execution_options
1734
1735 self.cursor = self.create_cursor()
1736 return self
1737
1738 def _get_cache_stats(self) -> str:
1739 if self.compiled is None:
1740 return "raw sql"
1741
1742 now = perf_counter()
1743
1744 ch = self.cache_hit
1745
1746 gen_time = self.compiled._gen_time
1747 assert gen_time is not None
1748
1749 if ch is NO_CACHE_KEY:
1750 return "no key %.5fs" % (now - gen_time,)
1751 elif ch is CACHE_HIT:
1752 return "cached since %.4gs ago" % (now - gen_time,)
1753 elif ch is CACHE_MISS:
1754 return "generated in %.5fs" % (now - gen_time,)
1755 elif ch is CACHING_DISABLED:
1756 if "_cache_disable_reason" in self.execution_options:
1757 return "caching disabled (%s) %.5fs " % (
1758 self.execution_options["_cache_disable_reason"],
1759 now - gen_time,
1760 )
1761 else:
1762 return "caching disabled %.5fs" % (now - gen_time,)
1763 elif ch is NO_DIALECT_SUPPORT:
1764 return "dialect %s+%s does not support caching %.5fs" % (
1765 self.dialect.name,
1766 self.dialect.driver,
1767 now - gen_time,
1768 )
1769 else:
1770 return "unknown"
1771
1772 @property
1773 def executemany(self): # type: ignore[override]
1774 return self.execute_style in (
1775 ExecuteStyle.EXECUTEMANY,
1776 ExecuteStyle.INSERTMANYVALUES,
1777 )
1778
1779 @util.memoized_property
1780 def identifier_preparer(self):
1781 if self.compiled:
1782 return self.compiled.preparer
1783 elif "schema_translate_map" in self.execution_options:
1784 return self.dialect.identifier_preparer._with_schema_translate(
1785 self.execution_options["schema_translate_map"]
1786 )
1787 else:
1788 return self.dialect.identifier_preparer
1789
1790 @util.memoized_property
1791 def engine(self):
1792 return self.root_connection.engine
1793
1794 @util.memoized_property
1795 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1796 if TYPE_CHECKING:
1797 assert isinstance(self.compiled, SQLCompiler)
1798 return self.compiled.postfetch
1799
1800 @util.memoized_property
1801 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1802 if TYPE_CHECKING:
1803 assert isinstance(self.compiled, SQLCompiler)
1804 if self.isinsert:
1805 return self.compiled.insert_prefetch
1806 elif self.isupdate:
1807 return self.compiled.update_prefetch
1808 else:
1809 return ()
1810
1811 @util.memoized_property
1812 def no_parameters(self):
1813 return self.execution_options.get("no_parameters", False)
1814
1815 def _execute_scalar(
1816 self,
1817 stmt: str,
1818 type_: Optional[TypeEngine[Any]],
1819 parameters: Optional[_DBAPISingleExecuteParams] = None,
1820 ) -> Any:
1821 """Execute a string statement on the current cursor, returning a
1822 scalar result.
1823
1824 Used to fire off sequences, default phrases, and "select lastrowid"
1825 types of statements individually or in the context of a parent INSERT
1826 or UPDATE statement.
1827
1828 """
1829
1830 conn = self.root_connection
1831
1832 if "schema_translate_map" in self.execution_options:
1833 schema_translate_map = self.execution_options.get(
1834 "schema_translate_map", {}
1835 )
1836
1837 rst = self.identifier_preparer._render_schema_translates
1838 stmt = rst(stmt, schema_translate_map)
1839
1840 if not parameters:
1841 if self.dialect.positional:
1842 parameters = self.dialect.execute_sequence_format()
1843 else:
1844 parameters = {}
1845
1846 conn._cursor_execute(self.cursor, stmt, parameters, context=self)
1847 row = self.cursor.fetchone()
1848 if row is not None:
1849 r = row[0]
1850 else:
1851 r = None
1852 if type_ is not None:
1853 # apply type post processors to the result
1854 proc = type_._cached_result_processor(
1855 self.dialect, self.cursor.description[0][1]
1856 )
1857 if proc:
1858 return proc(r)
1859 return r
1860
1861 @util.memoized_property
1862 def connection(self):
1863 return self.root_connection
1864
1865 def _use_server_side_cursor(self):
1866 if not self.dialect.supports_server_side_cursors:
1867 return False
1868
1869 if self.dialect.server_side_cursors:
1870 # this is deprecated
1871 use_server_side = self.execution_options.get(
1872 "stream_results", True
1873 ) and (
1874 self.compiled
1875 and isinstance(self.compiled.statement, expression.Selectable)
1876 or (
1877 (
1878 not self.compiled
1879 or isinstance(
1880 self.compiled.statement, expression.TextClause
1881 )
1882 )
1883 and self.unicode_statement
1884 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
1885 )
1886 )
1887 else:
1888 use_server_side = self.execution_options.get(
1889 "stream_results", False
1890 )
1891
1892 return use_server_side
1893
1894 def create_cursor(self) -> DBAPICursor:
1895 if (
1896 # inlining initial preference checks for SS cursors
1897 self.dialect.supports_server_side_cursors
1898 and (
1899 self.execution_options.get("stream_results", False)
1900 or (
1901 self.dialect.server_side_cursors
1902 and self._use_server_side_cursor()
1903 )
1904 )
1905 ):
1906 self._is_server_side = True
1907 return self.create_server_side_cursor()
1908 else:
1909 self._is_server_side = False
1910 return self.create_default_cursor()
1911
1912 def fetchall_for_returning(self, cursor):
1913 return cursor.fetchall()
1914
1915 def create_default_cursor(self) -> DBAPICursor:
1916 return self._dbapi_connection.cursor()
1917
1918 def create_server_side_cursor(self) -> DBAPICursor:
1919 raise NotImplementedError()
1920
1921 def pre_exec(self):
1922 pass
1923
1924 def get_out_parameter_values(self, names):
1925 raise NotImplementedError(
1926 "This dialect does not support OUT parameters"
1927 )
1928
1929 def post_exec(self):
1930 pass
1931
1932 def get_result_processor(
1933 self, type_: TypeEngine[Any], colname: str, coltype: DBAPIType
1934 ) -> Optional[_ResultProcessorType[Any]]:
1935 """Return a 'result processor' for a given type as present in
1936 cursor.description.
1937
1938 This has a default implementation that dialects can override
1939 for context-sensitive result type handling.
1940
1941 """
1942 return type_._cached_result_processor(self.dialect, coltype)
1943
1944 def get_lastrowid(self) -> int:
1945 """return self.cursor.lastrowid, or equivalent, after an INSERT.
1946
1947 This may involve calling special cursor functions, issuing a new SELECT
1948 on the cursor (or a new one), or returning a stored value that was
1949 calculated within post_exec().
1950
1951 This function will only be called for dialects which support "implicit"
1952 primary key generation, keep preexecute_autoincrement_sequences set to
1953 False, and when no explicit id value was bound to the statement.
1954
1955 The function is called once for an INSERT statement that would need to
1956 return the last inserted primary key for those dialects that make use
1957 of the lastrowid concept. In these cases, it is called directly after
1958 :meth:`.ExecutionContext.post_exec`.
1959
1960 """
1961 return self.cursor.lastrowid
1962
1963 def handle_dbapi_exception(self, e):
1964 pass
1965
1966 @util.non_memoized_property
1967 def rowcount(self) -> int:
1968 if self._rowcount is not None:
1969 return self._rowcount
1970 else:
1971 return self.cursor.rowcount
1972
1973 @property
1974 def _has_rowcount(self):
1975 return self._rowcount is not None
1976
1977 def supports_sane_rowcount(self):
1978 return self.dialect.supports_sane_rowcount
1979
1980 def supports_sane_multi_rowcount(self):
1981 return self.dialect.supports_sane_multi_rowcount
1982
1983 def _setup_result_proxy(self):
1984 exec_opt = self.execution_options
1985
1986 if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
1987 self._rowcount = self.cursor.rowcount
1988
1989 yp: Optional[Union[int, bool]]
1990 if self.is_crud or self.is_text:
1991 result = self._setup_dml_or_text_result()
1992 yp = False
1993 else:
1994 yp = exec_opt.get("yield_per", None)
1995 sr = self._is_server_side or exec_opt.get("stream_results", False)
1996 strategy = self.cursor_fetch_strategy
1997 if sr and strategy is _cursor._DEFAULT_FETCH:
1998 strategy = _cursor.BufferedRowCursorFetchStrategy(
1999 self.cursor, self.execution_options
2000 )
2001 cursor_description: _DBAPICursorDescription = (
2002 strategy.alternate_cursor_description
2003 or self.cursor.description
2004 )
2005 if cursor_description is None:
2006 strategy = _cursor._NO_CURSOR_DQL
2007
2008 result = _cursor.CursorResult(self, strategy, cursor_description)
2009
2010 compiled = self.compiled
2011
2012 if (
2013 compiled
2014 and not self.isddl
2015 and cast(SQLCompiler, compiled).has_out_parameters
2016 ):
2017 self._setup_out_parameters(result)
2018
2019 self._soft_closed = result._soft_closed
2020
2021 if yp:
2022 result = result.yield_per(yp)
2023
2024 return result
2025
2026 def _setup_out_parameters(self, result):
2027 compiled = cast(SQLCompiler, self.compiled)
2028
2029 out_bindparams = [
2030 (param, name)
2031 for param, name in compiled.bind_names.items()
2032 if param.isoutparam
2033 ]
2034 out_parameters = {}
2035
2036 for bindparam, raw_value in zip(
2037 [param for param, name in out_bindparams],
2038 self.get_out_parameter_values(
2039 [name for param, name in out_bindparams]
2040 ),
2041 ):
2042 type_ = bindparam.type
2043 impl_type = type_.dialect_impl(self.dialect)
2044 dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
2045 result_processor = impl_type.result_processor(
2046 self.dialect, dbapi_type
2047 )
2048 if result_processor is not None:
2049 raw_value = result_processor(raw_value)
2050 out_parameters[bindparam.key] = raw_value
2051
2052 result.out_parameters = out_parameters
2053
2054 def _setup_dml_or_text_result(self):
2055 compiled = cast(SQLCompiler, self.compiled)
2056
2057 strategy: ResultFetchStrategy = self.cursor_fetch_strategy
2058
2059 if self.isinsert:
2060 if (
2061 self.execute_style is ExecuteStyle.INSERTMANYVALUES
2062 and compiled.effective_returning
2063 ):
2064 strategy = _cursor.FullyBufferedCursorFetchStrategy(
2065 self.cursor,
2066 initial_buffer=self._insertmanyvalues_rows,
2067 # maintain alt cursor description if set by the
2068 # dialect, e.g. mssql preserves it
2069 alternate_description=(
2070 strategy.alternate_cursor_description
2071 ),
2072 )
2073
2074 if compiled.postfetch_lastrowid:
2075 self.inserted_primary_key_rows = (
2076 self._setup_ins_pk_from_lastrowid()
2077 )
2078 # else if not self._is_implicit_returning,
2079 # the default inserted_primary_key_rows accessor will
2080 # return an "empty" primary key collection when accessed.
2081
2082 if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
2083 strategy = _cursor.BufferedRowCursorFetchStrategy(
2084 self.cursor, self.execution_options
2085 )
2086
2087 if strategy is _cursor._NO_CURSOR_DML:
2088 cursor_description = None
2089 else:
2090 cursor_description = (
2091 strategy.alternate_cursor_description
2092 or self.cursor.description
2093 )
2094
2095 if cursor_description is None:
2096 strategy = _cursor._NO_CURSOR_DML
2097 elif self._num_sentinel_cols:
2098 assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
2099 # the sentinel columns are handled in CursorResult._init_metadata
2100 # using essentially _reduce
2101
2102 result: _cursor.CursorResult[Any] = _cursor.CursorResult(
2103 self, strategy, cursor_description
2104 )
2105
2106 if self.isinsert:
2107 if self._is_implicit_returning:
2108 rows = result.all()
2109
2110 self.returned_default_rows = rows
2111
2112 self.inserted_primary_key_rows = (
2113 self._setup_ins_pk_from_implicit_returning(result, rows)
2114 )
2115
2116 # test that it has a cursor metadata that is accurate. the
2117 # first row will have been fetched and current assumptions
2118 # are that the result has only one row, until executemany()
2119 # support is added here.
2120 assert result._metadata.returns_rows
2121
2122 # Insert statement has both return_defaults() and
2123 # returning(). rewind the result on the list of rows
2124 # we just used.
2125 if self._is_supplemental_returning:
2126 result._rewind(rows)
2127 else:
2128 result._soft_close()
2129 elif not self._is_explicit_returning:
2130 result._soft_close()
2131
2132 # we assume here the result does not return any rows.
2133 # *usually*, this will be true. However, some dialects
2134 # such as that of MSSQL/pyodbc need to SELECT a post fetch
2135 # function so this is not necessarily true.
2136 # assert not result.returns_rows
2137
2138 elif self._is_implicit_returning:
2139 rows = result.all()
2140
2141 if rows:
2142 self.returned_default_rows = rows
2143 self._rowcount = len(rows)
2144
2145 if self._is_supplemental_returning:
2146 result._rewind(rows)
2147 else:
2148 result._soft_close()
2149
2150 # test that it has a cursor metadata that is accurate.
2151 # the rows have all been fetched however.
2152 assert result._metadata.returns_rows
2153
2154 elif not result._metadata.returns_rows:
2155 # no results, get rowcount
2156 # (which requires open cursor on some drivers)
2157 if self._rowcount is None:
2158 self._rowcount = self.cursor.rowcount
2159 result._soft_close()
2160 elif self.isupdate or self.isdelete:
2161 if self._rowcount is None:
2162 self._rowcount = self.cursor.rowcount
2163 return result
2164
2165 @util.memoized_property
2166 def inserted_primary_key_rows(self):
2167 # if no specific "get primary key" strategy was set up
2168 # during execution, return a "default" primary key based
2169 # on what's in the compiled_parameters and nothing else.
2170 return self._setup_ins_pk_from_empty()
2171
2172 def _setup_ins_pk_from_lastrowid(self):
2173 getter = cast(
2174 SQLCompiler, self.compiled
2175 )._inserted_primary_key_from_lastrowid_getter
2176 lastrowid = self.get_lastrowid()
2177 return [getter(lastrowid, self.compiled_parameters[0])]
2178
2179 def _setup_ins_pk_from_empty(self):
2180 getter = cast(
2181 SQLCompiler, self.compiled
2182 )._inserted_primary_key_from_lastrowid_getter
2183 return [getter(None, param) for param in self.compiled_parameters]
2184
2185 def _setup_ins_pk_from_implicit_returning(self, result, rows):
2186 if not rows:
2187 return []
2188
2189 getter = cast(
2190 SQLCompiler, self.compiled
2191 )._inserted_primary_key_from_returning_getter
2192 compiled_params = self.compiled_parameters
2193
2194 return [
2195 getter(row, param) for row, param in zip(rows, compiled_params)
2196 ]
2197
2198 def lastrow_has_defaults(self) -> bool:
2199 return (self.isinsert or self.isupdate) and bool(
2200 cast(SQLCompiler, self.compiled).postfetch
2201 )
2202
2203 def _prepare_set_input_sizes(
2204 self,
2205 ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
2206 """Given a cursor and ClauseParameters, prepare arguments
2207 in order to call the appropriate
2208 style of ``setinputsizes()`` on the cursor, using DB-API types
2209 from the bind parameter's ``TypeEngine`` objects.
2210
2211 This method only called by those dialects which set the
2212 :attr:`.Dialect.bind_typing` attribute to
2213 :attr:`.BindTyping.SETINPUTSIZES`. Python-oracledb and cx_Oracle are
2214 the only DBAPIs that requires setinputsizes(); pyodbc offers it as an
2215 option.
2216
2217 Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
2218 for pg8000 and asyncpg, which has been changed to inline rendering
2219 of casts.
2220
2221 """
2222 if self.isddl or self.is_text:
2223 return None
2224
2225 compiled = cast(SQLCompiler, self.compiled)
2226
2227 inputsizes = compiled._get_set_input_sizes_lookup()
2228
2229 if inputsizes is None:
2230 return None
2231
2232 dialect = self.dialect
2233
2234 # all of the rest of this... cython?
2235
2236 if dialect._has_events:
2237 inputsizes = dict(inputsizes)
2238 dialect.dispatch.do_setinputsizes(
2239 inputsizes, self.cursor, self.statement, self.parameters, self
2240 )
2241
2242 if compiled.escaped_bind_names:
2243 escaped_bind_names = compiled.escaped_bind_names
2244 else:
2245 escaped_bind_names = None
2246
2247 if dialect.positional:
2248 items = [
2249 (key, compiled.binds[key])
2250 for key in compiled.positiontup or ()
2251 ]
2252 else:
2253 items = [
2254 (key, bindparam)
2255 for bindparam, key in compiled.bind_names.items()
2256 ]
2257
2258 generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
2259 for key, bindparam in items:
2260 if bindparam in compiled.literal_execute_params:
2261 continue
2262
2263 if key in self._expanded_parameters:
2264 if is_tuple_type(bindparam.type):
2265 num = len(bindparam.type.types)
2266 dbtypes = inputsizes[bindparam]
2267 generic_inputsizes.extend(
2268 (
2269 (
2270 escaped_bind_names.get(paramname, paramname)
2271 if escaped_bind_names is not None
2272 else paramname
2273 ),
2274 dbtypes[idx % num],
2275 bindparam.type.types[idx % num],
2276 )
2277 for idx, paramname in enumerate(
2278 self._expanded_parameters[key]
2279 )
2280 )
2281 else:
2282 dbtype = inputsizes.get(bindparam, None)
2283 generic_inputsizes.extend(
2284 (
2285 (
2286 escaped_bind_names.get(paramname, paramname)
2287 if escaped_bind_names is not None
2288 else paramname
2289 ),
2290 dbtype,
2291 bindparam.type,
2292 )
2293 for paramname in self._expanded_parameters[key]
2294 )
2295 else:
2296 dbtype = inputsizes.get(bindparam, None)
2297
2298 escaped_name = (
2299 escaped_bind_names.get(key, key)
2300 if escaped_bind_names is not None
2301 else key
2302 )
2303
2304 generic_inputsizes.append(
2305 (escaped_name, dbtype, bindparam.type)
2306 )
2307
2308 return generic_inputsizes
2309
2310 def _exec_default(self, column, default, type_):
2311 if default.is_sequence:
2312 return self.fire_sequence(default, type_)
2313 elif default.is_callable:
2314 # this codepath is not normally used as it's inlined
2315 # into _process_execute_defaults
2316 self.current_column = column
2317 return default.arg(self)
2318 elif default.is_clause_element:
2319 return self._exec_default_clause_element(column, default, type_)
2320 else:
2321 # this codepath is not normally used as it's inlined
2322 # into _process_execute_defaults
2323 return default.arg
2324
2325 def _exec_default_clause_element(self, column, default, type_):
2326 # execute a default that's a complete clause element. Here, we have
2327 # to re-implement a miniature version of the compile->parameters->
2328 # cursor.execute() sequence, since we don't want to modify the state
2329 # of the connection / result in progress or create new connection/
2330 # result objects etc.
2331 # .. versionchanged:: 1.4
2332
2333 if not default._arg_is_typed:
2334 default_arg = expression.type_coerce(default.arg, type_)
2335 else:
2336 default_arg = default.arg
2337 compiled = expression.select(default_arg).compile(dialect=self.dialect)
2338 compiled_params = compiled.construct_params()
2339 processors = compiled._bind_processors
2340 if compiled.positional:
2341 parameters = self.dialect.execute_sequence_format(
2342 [
2343 (
2344 processors[key](compiled_params[key]) # type: ignore
2345 if key in processors
2346 else compiled_params[key]
2347 )
2348 for key in compiled.positiontup or ()
2349 ]
2350 )
2351 else:
2352 parameters = {
2353 key: (
2354 processors[key](compiled_params[key]) # type: ignore
2355 if key in processors
2356 else compiled_params[key]
2357 )
2358 for key in compiled_params
2359 }
2360 return self._execute_scalar(
2361 str(compiled), type_, parameters=parameters
2362 )
2363
2364 current_parameters: Optional[_CoreSingleExecuteParams] = None
2365 """A dictionary of parameters applied to the current row.
2366
2367 This attribute is only available in the context of a user-defined default
2368 generation function, e.g. as described at :ref:`context_default_functions`.
2369 It consists of a dictionary which includes entries for each column/value
2370 pair that is to be part of the INSERT or UPDATE statement. The keys of the
2371 dictionary will be the key value of each :class:`_schema.Column`,
2372 which is usually
2373 synonymous with the name.
2374
2375 Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
2376 does not accommodate for the "multi-values" feature of the
2377 :meth:`_expression.Insert.values` method. The
2378 :meth:`.DefaultExecutionContext.get_current_parameters` method should be
2379 preferred.
2380
2381 .. seealso::
2382
2383 :meth:`.DefaultExecutionContext.get_current_parameters`
2384
2385 :ref:`context_default_functions`
2386
2387 """
2388
2389 def get_current_parameters(self, isolate_multiinsert_groups=True):
2390 """Return a dictionary of parameters applied to the current row.
2391
2392 This method can only be used in the context of a user-defined default
2393 generation function, e.g. as described at
2394 :ref:`context_default_functions`. When invoked, a dictionary is
2395 returned which includes entries for each column/value pair that is part
2396 of the INSERT or UPDATE statement. The keys of the dictionary will be
2397 the key value of each :class:`_schema.Column`,
2398 which is usually synonymous
2399 with the name.
2400
2401 :param isolate_multiinsert_groups=True: indicates that multi-valued
2402 INSERT constructs created using :meth:`_expression.Insert.values`
2403 should be
2404 handled by returning only the subset of parameters that are local
2405 to the current column default invocation. When ``False``, the
2406 raw parameters of the statement are returned including the
2407 naming convention used in the case of multi-valued INSERT.
2408
2409 .. seealso::
2410
2411 :attr:`.DefaultExecutionContext.current_parameters`
2412
2413 :ref:`context_default_functions`
2414
2415 """
2416 try:
2417 parameters = self.current_parameters
2418 column = self.current_column
2419 except AttributeError:
2420 raise exc.InvalidRequestError(
2421 "get_current_parameters() can only be invoked in the "
2422 "context of a Python side column default function"
2423 )
2424 else:
2425 assert column is not None
2426 assert parameters is not None
2427 compile_state = cast(
2428 "DMLState", cast(SQLCompiler, self.compiled).compile_state
2429 )
2430 assert compile_state is not None
2431 if (
2432 isolate_multiinsert_groups
2433 and dml.isinsert(compile_state)
2434 and compile_state._has_multi_parameters
2435 ):
2436 if column._is_multiparam_column:
2437 index = column.index + 1
2438 d = {column.original.key: parameters[column.key]}
2439 else:
2440 d = {column.key: parameters[column.key]}
2441 index = 0
2442 assert compile_state._dict_parameters is not None
2443 keys = compile_state._dict_parameters.keys()
2444 d.update(
2445 (key, parameters["%s_m%d" % (key, index)]) for key in keys
2446 )
2447 return d
2448 else:
2449 return parameters
2450
2451 def get_insert_default(self, column):
2452 if column.default is None:
2453 return None
2454 else:
2455 return self._exec_default(column, column.default, column.type)
2456
2457 def get_update_default(self, column):
2458 if column.onupdate is None:
2459 return None
2460 else:
2461 return self._exec_default(column, column.onupdate, column.type)
2462
2463 def _process_execute_defaults(self):
2464 compiled = cast(SQLCompiler, self.compiled)
2465
2466 key_getter = compiled._within_exec_param_key_getter
2467
2468 sentinel_counter = 0
2469
2470 if compiled.insert_prefetch:
2471 prefetch_recs = [
2472 (
2473 c,
2474 key_getter(c),
2475 c._default_description_tuple,
2476 self.get_insert_default,
2477 )
2478 for c in compiled.insert_prefetch
2479 ]
2480 elif compiled.update_prefetch:
2481 prefetch_recs = [
2482 (
2483 c,
2484 key_getter(c),
2485 c._onupdate_description_tuple,
2486 self.get_update_default,
2487 )
2488 for c in compiled.update_prefetch
2489 ]
2490 else:
2491 prefetch_recs = []
2492
2493 for param in self.compiled_parameters:
2494 self.current_parameters = param
2495
2496 for (
2497 c,
2498 param_key,
2499 (arg, is_scalar, is_callable, is_sentinel),
2500 fallback,
2501 ) in prefetch_recs:
2502 if is_sentinel:
2503 param[param_key] = sentinel_counter
2504 sentinel_counter += 1
2505 elif is_scalar:
2506 param[param_key] = arg
2507 elif is_callable:
2508 self.current_column = c
2509 param[param_key] = arg(self)
2510 else:
2511 val = fallback(c)
2512 if val is not None:
2513 param[param_key] = val
2514
2515 del self.current_parameters
2516
2517
2518DefaultDialect.execution_ctx_cls = DefaultExecutionContext