1# engine/default.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Default implementations of per-dialect sqlalchemy.engine classes.
10
11These are semi-private implementation classes which are only of importance
12to database dialect authors; dialects will usually use the classes here
13as the base class for their own corresponding classes.
14
15"""
16
17from __future__ import annotations
18
19import functools
20import operator
21import random
22import re
23from time import perf_counter
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Dict
29from typing import Final
30from typing import List
31from typing import Mapping
32from typing import MutableMapping
33from typing import MutableSequence
34from typing import Optional
35from typing import Sequence
36from typing import Set
37from typing import Tuple
38from typing import Type
39from typing import TYPE_CHECKING
40from typing import Union
41import weakref
42
43from . import characteristics
44from . import cursor as _cursor
45from . import interfaces
46from .base import Connection
47from .interfaces import CacheStats
48from .interfaces import DBAPICursor
49from .interfaces import Dialect
50from .interfaces import ExecuteStyle
51from .interfaces import ExecutionContext
52from .reflection import ObjectKind
53from .reflection import ObjectScope
54from .. import event
55from .. import exc
56from .. import pool
57from .. import util
58from ..sql import compiler
59from ..sql import dml
60from ..sql import expression
61from ..sql import type_api
62from ..sql import util as sql_util
63from ..sql._typing import is_tuple_type
64from ..sql.base import _NoArg
65from ..sql.compiler import DDLCompiler
66from ..sql.compiler import InsertmanyvaluesSentinelOpts
67from ..sql.compiler import SQLCompiler
68from ..sql.elements import quoted_name
69from ..util.typing import Literal
70from ..util.typing import TupleAny
71from ..util.typing import Unpack
72
73
74if typing.TYPE_CHECKING:
75 from types import ModuleType
76
77 from .base import Engine
78 from .cursor import ResultFetchStrategy
79 from .interfaces import _CoreMultiExecuteParams
80 from .interfaces import _CoreSingleExecuteParams
81 from .interfaces import _DBAPICursorDescription
82 from .interfaces import _DBAPIMultiExecuteParams
83 from .interfaces import _DBAPISingleExecuteParams
84 from .interfaces import _ExecuteOptions
85 from .interfaces import _MutableCoreSingleExecuteParams
86 from .interfaces import _ParamStyle
87 from .interfaces import ConnectArgsType
88 from .interfaces import DBAPIConnection
89 from .interfaces import DBAPIModule
90 from .interfaces import IsolationLevel
91 from .row import Row
92 from .url import URL
93 from ..event import _ListenerFnType
94 from ..pool import Pool
95 from ..pool import PoolProxiedConnection
96 from ..sql import Executable
97 from ..sql.compiler import Compiled
98 from ..sql.compiler import Linting
99 from ..sql.compiler import ResultColumnsEntry
100 from ..sql.dml import DMLState
101 from ..sql.dml import UpdateBase
102 from ..sql.elements import BindParameter
103 from ..sql.schema import Column
104 from ..sql.type_api import _BindProcessorType
105 from ..sql.type_api import _ResultProcessorType
106 from ..sql.type_api import TypeEngine
107
108
109# When we're handed literal SQL, ensure it's a SELECT query
110SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)
111
112
113(
114 CACHE_HIT,
115 CACHE_MISS,
116 CACHING_DISABLED,
117 NO_CACHE_KEY,
118 NO_DIALECT_SUPPORT,
119) = list(CacheStats)
120
121
122class DefaultDialect(Dialect):
123 """Default implementation of Dialect"""
124
125 statement_compiler = compiler.SQLCompiler
126 ddl_compiler = compiler.DDLCompiler
127 type_compiler_cls = compiler.GenericTypeCompiler
128
129 preparer = compiler.IdentifierPreparer
130 supports_alter = True
131 supports_comments = False
132 supports_constraint_comments = False
133 inline_comments = False
134 supports_statement_cache = True
135
136 div_is_floordiv = True
137
138 bind_typing = interfaces.BindTyping.NONE
139
140 include_set_input_sizes: Optional[Set[Any]] = None
141 exclude_set_input_sizes: Optional[Set[Any]] = None
142
143 # the first value we'd get for an autoincrement column.
144 default_sequence_base = 1
145
146 # most DBAPIs happy with this for execute().
147 # not cx_oracle.
148 execute_sequence_format = tuple
149
150 supports_schemas = True
151 supports_views = True
152 supports_sequences = False
153 sequences_optional = False
154 preexecute_autoincrement_sequences = False
155 supports_identity_columns = False
156 postfetch_lastrowid = True
157 favor_returning_over_lastrowid = False
158 insert_null_pk_still_autoincrements = False
159 update_returning = False
160 delete_returning = False
161 update_returning_multifrom = False
162 delete_returning_multifrom = False
163 insert_returning = False
164
165 cte_follows_insert = False
166
167 supports_native_enum = False
168 supports_native_boolean = False
169 supports_native_uuid = False
170 returns_native_bytes = False
171
172 non_native_boolean_check_constraint = True
173
174 supports_simple_order_by_label = True
175
176 tuple_in_values = False
177
178 connection_characteristics = util.immutabledict(
179 {
180 "isolation_level": characteristics.IsolationLevelCharacteristic(),
181 "logging_token": characteristics.LoggingTokenCharacteristic(),
182 }
183 )
184
185 engine_config_types: Mapping[str, Any] = util.immutabledict(
186 {
187 "pool_timeout": util.asint,
188 "echo": util.bool_or_str("debug"),
189 "echo_pool": util.bool_or_str("debug"),
190 "pool_recycle": util.asint,
191 "pool_size": util.asint,
192 "max_overflow": util.asint,
193 "future": util.asbool,
194 }
195 )
196
197 # if the NUMERIC type
198 # returns decimal.Decimal.
199 # *not* the FLOAT type however.
200 supports_native_decimal = False
201
202 name = "default"
203
204 # length at which to truncate
205 # any identifier.
206 max_identifier_length = 9999
207 _user_defined_max_identifier_length: Optional[int] = None
208
209 isolation_level: Optional[str] = None
210
211 # sub-categories of max_identifier_length.
212 # currently these accommodate for MySQL which allows alias names
213 # of 255 but DDL names only of 64.
214 max_index_name_length: Optional[int] = None
215 max_constraint_name_length: Optional[int] = None
216
217 supports_sane_rowcount = True
218 supports_sane_multi_rowcount = True
219 colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
220 default_paramstyle = "named"
221
222 supports_default_values = False
223 """dialect supports INSERT... DEFAULT VALUES syntax"""
224
225 supports_default_metavalue = False
226 """dialect supports INSERT... VALUES (DEFAULT) syntax"""
227
228 default_metavalue_token = "DEFAULT"
229 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
230 parenthesis."""
231
232 # not sure if this is a real thing but the compiler will deliver it
233 # if this is the only flag enabled.
234 supports_empty_insert = True
235 """dialect supports INSERT () VALUES ()"""
236
237 supports_multivalues_insert = False
238
239 use_insertmanyvalues: bool = False
240
241 use_insertmanyvalues_wo_returning: bool = False
242
243 insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
244 InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
245 )
246
247 insertmanyvalues_page_size: int = 1000
248 insertmanyvalues_max_parameters = 32700
249
250 supports_is_distinct_from = True
251
252 supports_server_side_cursors = False
253
254 server_side_cursors = False
255
256 # extra record-level locking features (#4860)
257 supports_for_update_of = False
258
259 server_version_info = None
260
261 default_schema_name: Optional[str] = None
262
263 # indicates symbol names are
264 # UPPERCASED if they are case insensitive
265 # within the database.
266 # if this is True, the methods normalize_name()
267 # and denormalize_name() must be provided.
268 requires_name_normalize = False
269
270 is_async = False
271
272 has_terminate = False
273
274 # TODO: this is not to be part of 2.0. implement rudimentary binary
275 # literals for SQLite, PostgreSQL, MySQL only within
276 # _Binary.literal_processor
277 _legacy_binary_type_literal_encoding = "utf-8"
278
279 @util.deprecated_params(
280 empty_in_strategy=(
281 "1.4",
282 "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
283 "deprecated, and no longer has any effect. All IN expressions "
284 "are now rendered using "
285 'the "expanding parameter" strategy which renders a set of bound'
286 'expressions, or an "empty set" SELECT, at statement execution'
287 "time.",
288 ),
289 server_side_cursors=(
290 "1.4",
291 "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
292 "is deprecated and will be removed in a future release. Please "
293 "use the "
294 ":paramref:`_engine.Connection.execution_options.stream_results` "
295 "parameter.",
296 ),
297 )
298 def __init__(
299 self,
300 paramstyle: Optional[_ParamStyle] = None,
301 isolation_level: Optional[IsolationLevel] = None,
302 dbapi: Optional[ModuleType] = None,
303 implicit_returning: Literal[True] = True,
304 supports_native_boolean: Optional[bool] = None,
305 max_identifier_length: Optional[int] = None,
306 label_length: Optional[int] = None,
307 insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
308 use_insertmanyvalues: Optional[bool] = None,
309 # util.deprecated_params decorator cannot render the
310 # Linting.NO_LINTING constant
311 compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore
312 server_side_cursors: bool = False,
313 skip_autocommit_rollback: bool = False,
314 **kwargs: Any,
315 ):
316 if server_side_cursors:
317 if not self.supports_server_side_cursors:
318 raise exc.ArgumentError(
319 "Dialect %s does not support server side cursors" % self
320 )
321 else:
322 self.server_side_cursors = True
323
324 if getattr(self, "use_setinputsizes", False):
325 util.warn_deprecated(
326 "The dialect-level use_setinputsizes attribute is "
327 "deprecated. Please use "
328 "bind_typing = BindTyping.SETINPUTSIZES",
329 "2.0",
330 )
331 self.bind_typing = interfaces.BindTyping.SETINPUTSIZES
332
333 self.positional = False
334 self._ischema = None
335
336 self.dbapi = dbapi
337
338 self.skip_autocommit_rollback = skip_autocommit_rollback
339
340 if paramstyle is not None:
341 self.paramstyle = paramstyle
342 elif self.dbapi is not None:
343 self.paramstyle = self.dbapi.paramstyle
344 else:
345 self.paramstyle = self.default_paramstyle
346 self.positional = self.paramstyle in (
347 "qmark",
348 "format",
349 "numeric",
350 "numeric_dollar",
351 )
352 self.identifier_preparer = self.preparer(self)
353 self._on_connect_isolation_level = isolation_level
354
355 legacy_tt_callable = getattr(self, "type_compiler", None)
356 if legacy_tt_callable is not None:
357 tt_callable = cast(
358 Type[compiler.GenericTypeCompiler],
359 self.type_compiler,
360 )
361 else:
362 tt_callable = self.type_compiler_cls
363
364 self.type_compiler_instance = self.type_compiler = tt_callable(self)
365
366 if supports_native_boolean is not None:
367 self.supports_native_boolean = supports_native_boolean
368
369 self._user_defined_max_identifier_length = max_identifier_length
370 if self._user_defined_max_identifier_length:
371 self.max_identifier_length = (
372 self._user_defined_max_identifier_length
373 )
374 self.label_length = label_length
375 self.compiler_linting = compiler_linting
376
377 if use_insertmanyvalues is not None:
378 self.use_insertmanyvalues = use_insertmanyvalues
379
380 if insertmanyvalues_page_size is not _NoArg.NO_ARG:
381 self.insertmanyvalues_page_size = insertmanyvalues_page_size
382
383 @property
384 @util.deprecated(
385 "2.0",
386 "full_returning is deprecated, please use insert_returning, "
387 "update_returning, delete_returning",
388 )
389 def full_returning(self):
390 return (
391 self.insert_returning
392 and self.update_returning
393 and self.delete_returning
394 )
395
396 @util.memoized_property
397 def insert_executemany_returning(self):
398 """Default implementation for insert_executemany_returning, if not
399 otherwise overridden by the specific dialect.
400
401 The default dialect determines "insert_executemany_returning" is
402 available if the dialect in use has opted into using the
403 "use_insertmanyvalues" feature. If they haven't opted into that, then
404 this attribute is False, unless the dialect in question overrides this
405 and provides some other implementation (such as the Oracle Database
406 dialects).
407
408 """
409 return self.insert_returning and self.use_insertmanyvalues
410
411 @util.memoized_property
412 def insert_executemany_returning_sort_by_parameter_order(self):
413 """Default implementation for
414 insert_executemany_returning_deterministic_order, if not otherwise
415 overridden by the specific dialect.
416
417 The default dialect determines "insert_executemany_returning" can have
418 deterministic order only if the dialect in use has opted into using the
419 "use_insertmanyvalues" feature, which implements deterministic ordering
420 using client side sentinel columns only by default. The
421 "insertmanyvalues" feature also features alternate forms that can
422 use server-generated PK values as "sentinels", but those are only
423 used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel`
424 bitflag enables those alternate SQL forms, which are disabled
425 by default.
426
427 If the dialect in use hasn't opted into that, then this attribute is
428 False, unless the dialect in question overrides this and provides some
429 other implementation (such as the Oracle Database dialects).
430
431 """
432 return self.insert_returning and self.use_insertmanyvalues
433
434 update_executemany_returning = False
435 delete_executemany_returning = False
436
437 @util.memoized_property
438 def loaded_dbapi(self) -> DBAPIModule:
439 if self.dbapi is None:
440 raise exc.InvalidRequestError(
441 f"Dialect {self} does not have a Python DBAPI established "
442 "and cannot be used for actual database interaction"
443 )
444 return self.dbapi
445
446 @util.memoized_property
447 def _bind_typing_render_casts(self):
448 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS
449
450 def _ensure_has_table_connection(self, arg: Connection) -> None:
451 if not isinstance(arg, Connection):
452 raise exc.ArgumentError(
453 "The argument passed to Dialect.has_table() should be a "
454 "%s, got %s. "
455 "Additionally, the Dialect.has_table() method is for "
456 "internal dialect "
457 "use only; please use "
458 "``inspect(some_engine).has_table(<tablename>>)`` "
459 "for public API use." % (Connection, type(arg))
460 )
461
462 @util.memoized_property
463 def _supports_statement_cache(self):
464 ssc = self.__class__.__dict__.get("supports_statement_cache", None)
465 if ssc is None:
466 util.warn(
467 "Dialect %s:%s will not make use of SQL compilation caching "
468 "as it does not set the 'supports_statement_cache' attribute "
469 "to ``True``. This can have "
470 "significant performance implications including some "
471 "performance degradations in comparison to prior SQLAlchemy "
472 "versions. Dialect maintainers should seek to set this "
473 "attribute to True after appropriate development and testing "
474 "for SQLAlchemy 1.4 caching support. Alternatively, this "
475 "attribute may be set to False which will disable this "
476 "warning." % (self.name, self.driver),
477 code="cprf",
478 )
479
480 return bool(ssc)
481
482 @util.memoized_property
483 def _type_memos(self):
484 return weakref.WeakKeyDictionary()
485
486 @property
487 def dialect_description(self): # type: ignore[override]
488 return self.name + "+" + self.driver
489
490 @property
491 def supports_sane_rowcount_returning(self):
492 """True if this dialect supports sane rowcount even if RETURNING is
493 in use.
494
495 For dialects that don't support RETURNING, this is synonymous with
496 ``supports_sane_rowcount``.
497
498 """
499 return self.supports_sane_rowcount
500
501 @classmethod
502 def get_pool_class(cls, url: URL) -> Type[Pool]:
503 default: Type[pool.Pool]
504 if cls.is_async:
505 default = pool.AsyncAdaptedQueuePool
506 else:
507 default = pool.QueuePool
508
509 return getattr(cls, "poolclass", default)
510
511 def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
512 return self.get_pool_class(url)
513
514 @classmethod
515 def load_provisioning(cls):
516 package = ".".join(cls.__module__.split(".")[0:-1])
517 try:
518 __import__(package + ".provision")
519 except ImportError:
520 pass
521
522 def _builtin_onconnect(self) -> Optional[_ListenerFnType]:
523 if self._on_connect_isolation_level is not None:
524
525 def builtin_connect(dbapi_conn, conn_rec):
526 self._assert_and_set_isolation_level(
527 dbapi_conn, self._on_connect_isolation_level
528 )
529
530 return builtin_connect
531 else:
532 return None
533
534 def initialize(self, connection: Connection) -> None:
535 try:
536 self.server_version_info = self._get_server_version_info(
537 connection
538 )
539 except NotImplementedError:
540 self.server_version_info = None
541 try:
542 self.default_schema_name = self._get_default_schema_name(
543 connection
544 )
545 except NotImplementedError:
546 self.default_schema_name = None
547
548 try:
549 self.default_isolation_level = self.get_default_isolation_level(
550 connection.connection.dbapi_connection
551 )
552 except NotImplementedError:
553 self.default_isolation_level = None
554
555 if not self._user_defined_max_identifier_length:
556 max_ident_length = self._check_max_identifier_length(connection)
557 if max_ident_length:
558 self.max_identifier_length = max_ident_length
559
560 if (
561 self.label_length
562 and self.label_length > self.max_identifier_length
563 ):
564 raise exc.ArgumentError(
565 "Label length of %d is greater than this dialect's"
566 " maximum identifier length of %d"
567 % (self.label_length, self.max_identifier_length)
568 )
569
570 def on_connect(self) -> Optional[Callable[[Any], None]]:
571 # inherits the docstring from interfaces.Dialect.on_connect
572 return None
573
574 def _check_max_identifier_length(self, connection):
575 """Perform a connection / server version specific check to determine
576 the max_identifier_length.
577
578 If the dialect's class level max_identifier_length should be used,
579 can return None.
580
581 """
582 return None
583
584 def get_default_isolation_level(self, dbapi_conn):
585 """Given a DBAPI connection, return its isolation level, or
586 a default isolation level if one cannot be retrieved.
587
588 May be overridden by subclasses in order to provide a
589 "fallback" isolation level for databases that cannot reliably
590 retrieve the actual isolation level.
591
592 By default, calls the :meth:`_engine.Interfaces.get_isolation_level`
593 method, propagating any exceptions raised.
594
595 """
596 return self.get_isolation_level(dbapi_conn)
597
598 def type_descriptor(self, typeobj):
599 """Provide a database-specific :class:`.TypeEngine` object, given
600 the generic object which comes from the types module.
601
602 This method looks for a dictionary called
603 ``colspecs`` as a class or instance-level variable,
604 and passes on to :func:`_types.adapt_type`.
605
606 """
607 return type_api.adapt_type(typeobj, self.colspecs)
608
609 def has_index(self, connection, table_name, index_name, schema=None, **kw):
610 if not self.has_table(connection, table_name, schema=schema, **kw):
611 return False
612 for idx in self.get_indexes(
613 connection, table_name, schema=schema, **kw
614 ):
615 if idx["name"] == index_name:
616 return True
617 else:
618 return False
619
620 def has_schema(
621 self, connection: Connection, schema_name: str, **kw: Any
622 ) -> bool:
623 return schema_name in self.get_schema_names(connection, **kw)
624
625 def validate_identifier(self, ident: str) -> None:
626 if len(ident) > self.max_identifier_length:
627 raise exc.IdentifierError(
628 "Identifier '%s' exceeds maximum length of %d characters"
629 % (ident, self.max_identifier_length)
630 )
631
632 def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection:
633 # inherits the docstring from interfaces.Dialect.connect
634 return self.loaded_dbapi.connect(*cargs, **cparams) # type: ignore[no-any-return] # NOQA: E501
635
636 def create_connect_args(self, url: URL) -> ConnectArgsType:
637 # inherits the docstring from interfaces.Dialect.create_connect_args
638 opts = url.translate_connect_args()
639 opts.update(url.query)
640 return ([], opts)
641
642 def set_engine_execution_options(
643 self, engine: Engine, opts: Mapping[str, Any]
644 ) -> None:
645 supported_names = set(self.connection_characteristics).intersection(
646 opts
647 )
648 if supported_names:
649 characteristics: Mapping[str, Any] = util.immutabledict(
650 (name, opts[name]) for name in supported_names
651 )
652
653 @event.listens_for(engine, "engine_connect")
654 def set_connection_characteristics(connection):
655 self._set_connection_characteristics(
656 connection, characteristics
657 )
658
659 def set_connection_execution_options(
660 self, connection: Connection, opts: Mapping[str, Any]
661 ) -> None:
662 supported_names = set(self.connection_characteristics).intersection(
663 opts
664 )
665 if supported_names:
666 characteristics: Mapping[str, Any] = util.immutabledict(
667 (name, opts[name]) for name in supported_names
668 )
669 self._set_connection_characteristics(connection, characteristics)
670
671 def _set_connection_characteristics(self, connection, characteristics):
672 characteristic_values = [
673 (name, self.connection_characteristics[name], value)
674 for name, value in characteristics.items()
675 ]
676
677 if connection.in_transaction():
678 trans_objs = [
679 (name, obj)
680 for name, obj, _ in characteristic_values
681 if obj.transactional
682 ]
683 if trans_objs:
684 raise exc.InvalidRequestError(
685 "This connection has already initialized a SQLAlchemy "
686 "Transaction() object via begin() or autobegin; "
687 "%s may not be altered unless rollback() or commit() "
688 "is called first."
689 % (", ".join(name for name, obj in trans_objs))
690 )
691
692 dbapi_connection = connection.connection.dbapi_connection
693 for _, characteristic, value in characteristic_values:
694 characteristic.set_connection_characteristic(
695 self, connection, dbapi_connection, value
696 )
697 connection.connection._connection_record.finalize_callback.append(
698 functools.partial(self._reset_characteristics, characteristics)
699 )
700
701 def _reset_characteristics(self, characteristics, dbapi_connection):
702 for characteristic_name in characteristics:
703 characteristic = self.connection_characteristics[
704 characteristic_name
705 ]
706 characteristic.reset_characteristic(self, dbapi_connection)
707
708 def do_begin(self, dbapi_connection):
709 pass
710
711 def do_rollback(self, dbapi_connection):
712 if self.skip_autocommit_rollback and self.detect_autocommit_setting(
713 dbapi_connection
714 ):
715 return
716 dbapi_connection.rollback()
717
718 def do_commit(self, dbapi_connection):
719 dbapi_connection.commit()
720
721 def do_terminate(self, dbapi_connection):
722 self.do_close(dbapi_connection)
723
724 def do_close(self, dbapi_connection):
725 dbapi_connection.close()
726
727 @util.memoized_property
728 def _dialect_specific_select_one(self):
729 return str(expression.select(1).compile(dialect=self))
730
731 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
732 try:
733 return self.do_ping(dbapi_connection)
734 except self.loaded_dbapi.Error as err:
735 is_disconnect = self.is_disconnect(err, dbapi_connection, None)
736
737 if self._has_events:
738 try:
739 Connection._handle_dbapi_exception_noconnection(
740 err,
741 self,
742 is_disconnect=is_disconnect,
743 invalidate_pool_on_disconnect=False,
744 is_pre_ping=True,
745 )
746 except exc.StatementError as new_err:
747 is_disconnect = new_err.connection_invalidated
748
749 if is_disconnect:
750 return False
751 else:
752 raise
753
754 def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
755 cursor = dbapi_connection.cursor()
756 try:
757 cursor.execute(self._dialect_specific_select_one)
758 finally:
759 cursor.close()
760 return True
761
762 def create_xid(self):
763 """Create a random two-phase transaction ID.
764
765 This id will be passed to do_begin_twophase(), do_rollback_twophase(),
766 do_commit_twophase(). Its format is unspecified.
767 """
768
769 return "_sa_%032x" % random.randint(0, 2**128)
770
771 def do_savepoint(self, connection, name):
772 connection.execute(expression.SavepointClause(name))
773
774 def do_rollback_to_savepoint(self, connection, name):
775 connection.execute(expression.RollbackToSavepointClause(name))
776
777 def do_release_savepoint(self, connection, name):
778 connection.execute(expression.ReleaseSavepointClause(name))
779
780 def _deliver_insertmanyvalues_batches(
781 self,
782 connection,
783 cursor,
784 statement,
785 parameters,
786 generic_setinputsizes,
787 context,
788 ):
789 context = cast(DefaultExecutionContext, context)
790 compiled = cast(SQLCompiler, context.compiled)
791
792 _composite_sentinel_proc: Sequence[
793 Optional[_ResultProcessorType[Any]]
794 ] = ()
795 _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
796 _sentinel_proc_initialized: bool = False
797
798 compiled_parameters = context.compiled_parameters
799
800 imv = compiled._insertmanyvalues
801 assert imv is not None
802
803 is_returning: Final[bool] = bool(compiled.effective_returning)
804 batch_size = context.execution_options.get(
805 "insertmanyvalues_page_size", self.insertmanyvalues_page_size
806 )
807
808 if compiled.schema_translate_map:
809 schema_translate_map = context.execution_options.get(
810 "schema_translate_map", {}
811 )
812 else:
813 schema_translate_map = None
814
815 if is_returning:
816 result: Optional[List[Any]] = []
817 context._insertmanyvalues_rows = result
818
819 sort_by_parameter_order = imv.sort_by_parameter_order
820
821 else:
822 sort_by_parameter_order = False
823 result = None
824
825 for imv_batch in compiled._deliver_insertmanyvalues_batches(
826 statement,
827 parameters,
828 compiled_parameters,
829 generic_setinputsizes,
830 batch_size,
831 sort_by_parameter_order,
832 schema_translate_map,
833 ):
834 yield imv_batch
835
836 if is_returning:
837
838 try:
839 rows = context.fetchall_for_returning(cursor)
840 except BaseException as be:
841 connection._handle_dbapi_exception(
842 be,
843 sql_util._long_statement(imv_batch.replaced_statement),
844 imv_batch.replaced_parameters,
845 None,
846 context,
847 is_sub_exec=True,
848 )
849
850 # I would have thought "is_returning: Final[bool]"
851 # would have assured this but pylance thinks not
852 assert result is not None
853
854 if imv.num_sentinel_columns and not imv_batch.is_downgraded:
855 composite_sentinel = imv.num_sentinel_columns > 1
856 if imv.implicit_sentinel:
857 # for implicit sentinel, which is currently single-col
858 # integer autoincrement, do a simple sort.
859 assert not composite_sentinel
860 result.extend(
861 sorted(rows, key=operator.itemgetter(-1))
862 )
863 continue
864
865 # otherwise, create dictionaries to match up batches
866 # with parameters
867 assert imv.sentinel_param_keys
868 assert imv.sentinel_columns
869
870 _nsc = imv.num_sentinel_columns
871
872 if not _sentinel_proc_initialized:
873 if composite_sentinel:
874 _composite_sentinel_proc = [
875 col.type._cached_result_processor(
876 self, cursor_desc[1]
877 )
878 for col, cursor_desc in zip(
879 imv.sentinel_columns,
880 cursor.description[-_nsc:],
881 )
882 ]
883 else:
884 _scalar_sentinel_proc = (
885 imv.sentinel_columns[0]
886 ).type._cached_result_processor(
887 self, cursor.description[-1][1]
888 )
889 _sentinel_proc_initialized = True
890
891 rows_by_sentinel: Union[
892 Dict[Tuple[Any, ...], Any],
893 Dict[Any, Any],
894 ]
895 if composite_sentinel:
896 rows_by_sentinel = {
897 tuple(
898 (proc(val) if proc else val)
899 for val, proc in zip(
900 row[-_nsc:], _composite_sentinel_proc
901 )
902 ): row
903 for row in rows
904 }
905 elif _scalar_sentinel_proc:
906 rows_by_sentinel = {
907 _scalar_sentinel_proc(row[-1]): row for row in rows
908 }
909 else:
910 rows_by_sentinel = {row[-1]: row for row in rows}
911
912 if len(rows_by_sentinel) != len(imv_batch.batch):
913 # see test_insert_exec.py::
914 # IMVSentinelTest::test_sentinel_incorrect_rowcount
915 # for coverage / demonstration
916 raise exc.InvalidRequestError(
917 f"Sentinel-keyed result set did not produce "
918 f"correct number of rows {len(imv_batch.batch)}; "
919 "produced "
920 f"{len(rows_by_sentinel)}. Please ensure the "
921 "sentinel column is fully unique and populated in "
922 "all cases."
923 )
924
925 try:
926 ordered_rows = [
927 rows_by_sentinel[sentinel_keys]
928 for sentinel_keys in imv_batch.sentinel_values
929 ]
930 except KeyError as ke:
931 # see test_insert_exec.py::
932 # IMVSentinelTest::test_sentinel_cant_match_keys
933 # for coverage / demonstration
934 raise exc.InvalidRequestError(
935 f"Can't match sentinel values in result set to "
936 f"parameter sets; key {ke.args[0]!r} was not "
937 "found. "
938 "There may be a mismatch between the datatype "
939 "passed to the DBAPI driver vs. that which it "
940 "returns in a result row. Ensure the given "
941 "Python value matches the expected result type "
942 "*exactly*, taking care to not rely upon implicit "
943 "conversions which may occur such as when using "
944 "strings in place of UUID or integer values, etc. "
945 ) from ke
946
947 result.extend(ordered_rows)
948
949 else:
950 result.extend(rows)
951
952 def do_executemany(self, cursor, statement, parameters, context=None):
953 cursor.executemany(statement, parameters)
954
955 def do_execute(self, cursor, statement, parameters, context=None):
956 cursor.execute(statement, parameters)
957
958 def do_execute_no_params(self, cursor, statement, context=None):
959 cursor.execute(statement)
960
961 def is_disconnect(
962 self,
963 e: DBAPIModule.Error,
964 connection: Union[
965 pool.PoolProxiedConnection, interfaces.DBAPIConnection, None
966 ],
967 cursor: Optional[interfaces.DBAPICursor],
968 ) -> bool:
969 return False
970
971 @util.memoized_instancemethod
972 def _gen_allowed_isolation_levels(self, dbapi_conn):
973 try:
974 raw_levels = list(self.get_isolation_level_values(dbapi_conn))
975 except NotImplementedError:
976 return None
977 else:
978 normalized_levels = [
979 level.replace("_", " ").upper() for level in raw_levels
980 ]
981 if raw_levels != normalized_levels:
982 raise ValueError(
983 f"Dialect {self.name!r} get_isolation_level_values() "
984 f"method should return names as UPPERCASE using spaces, "
985 f"not underscores; got "
986 f"{sorted(set(raw_levels).difference(normalized_levels))}"
987 )
988 return tuple(normalized_levels)
989
990 def _assert_and_set_isolation_level(self, dbapi_conn, level):
991 level = level.replace("_", " ").upper()
992
993 _allowed_isolation_levels = self._gen_allowed_isolation_levels(
994 dbapi_conn
995 )
996 if (
997 _allowed_isolation_levels
998 and level not in _allowed_isolation_levels
999 ):
1000 raise exc.ArgumentError(
1001 f"Invalid value {level!r} for isolation_level. "
1002 f"Valid isolation levels for {self.name!r} are "
1003 f"{', '.join(_allowed_isolation_levels)}"
1004 )
1005
1006 self.set_isolation_level(dbapi_conn, level)
1007
1008 def reset_isolation_level(self, dbapi_conn):
1009 if self._on_connect_isolation_level is not None:
1010 assert (
1011 self._on_connect_isolation_level == "AUTOCOMMIT"
1012 or self._on_connect_isolation_level
1013 == self.default_isolation_level
1014 )
1015 self._assert_and_set_isolation_level(
1016 dbapi_conn, self._on_connect_isolation_level
1017 )
1018 else:
1019 assert self.default_isolation_level is not None
1020 self._assert_and_set_isolation_level(
1021 dbapi_conn,
1022 self.default_isolation_level,
1023 )
1024
1025 def normalize_name(self, name):
1026 if name is None:
1027 return None
1028
1029 name_lower = name.lower()
1030 name_upper = name.upper()
1031
1032 if name_upper == name_lower:
1033 # name has no upper/lower conversion, e.g. non-european characters.
1034 # return unchanged
1035 return name
1036 elif name_upper == name and not (
1037 self.identifier_preparer._requires_quotes
1038 )(name_lower):
1039 # name is all uppercase and doesn't require quoting; normalize
1040 # to all lower case
1041 return name_lower
1042 elif name_lower == name:
1043 # name is all lower case, which if denormalized means we need to
1044 # force quoting on it
1045 return quoted_name(name, quote=True)
1046 else:
1047 # name is mixed case, means it will be quoted in SQL when used
1048 # later, no normalizes
1049 return name
1050
1051 def denormalize_name(self, name):
1052 if name is None:
1053 return None
1054
1055 name_lower = name.lower()
1056 name_upper = name.upper()
1057
1058 if name_upper == name_lower:
1059 # name has no upper/lower conversion, e.g. non-european characters.
1060 # return unchanged
1061 return name
1062 elif name_lower == name and not (
1063 self.identifier_preparer._requires_quotes
1064 )(name_lower):
1065 name = name_upper
1066 return name
1067
1068 def get_driver_connection(self, connection: DBAPIConnection) -> Any:
1069 return connection
1070
1071 def _overrides_default(self, method):
1072 return (
1073 getattr(type(self), method).__code__
1074 is not getattr(DefaultDialect, method).__code__
1075 )
1076
1077 def _default_multi_reflect(
1078 self,
1079 single_tbl_method,
1080 connection,
1081 kind,
1082 schema,
1083 filter_names,
1084 scope,
1085 **kw,
1086 ):
1087 names_fns = []
1088 temp_names_fns = []
1089 if ObjectKind.TABLE in kind:
1090 names_fns.append(self.get_table_names)
1091 temp_names_fns.append(self.get_temp_table_names)
1092 if ObjectKind.VIEW in kind:
1093 names_fns.append(self.get_view_names)
1094 temp_names_fns.append(self.get_temp_view_names)
1095 if ObjectKind.MATERIALIZED_VIEW in kind:
1096 names_fns.append(self.get_materialized_view_names)
1097 # no temp materialized view at the moment
1098 # temp_names_fns.append(self.get_temp_materialized_view_names)
1099
1100 unreflectable = kw.pop("unreflectable", {})
1101
1102 if (
1103 filter_names
1104 and scope is ObjectScope.ANY
1105 and kind is ObjectKind.ANY
1106 ):
1107 # if names are given and no qualification on type of table
1108 # (i.e. the Table(..., autoload) case), take the names as given,
1109 # don't run names queries. If a table does not exit
1110 # NoSuchTableError is raised and it's skipped
1111
1112 # this also suits the case for mssql where we can reflect
1113 # individual temp tables but there's no temp_names_fn
1114 names = filter_names
1115 else:
1116 names = []
1117 name_kw = {"schema": schema, **kw}
1118 fns = []
1119 if ObjectScope.DEFAULT in scope:
1120 fns.extend(names_fns)
1121 if ObjectScope.TEMPORARY in scope:
1122 fns.extend(temp_names_fns)
1123
1124 for fn in fns:
1125 try:
1126 names.extend(fn(connection, **name_kw))
1127 except NotImplementedError:
1128 pass
1129
1130 if filter_names:
1131 filter_names = set(filter_names)
1132
1133 # iterate over all the tables/views and call the single table method
1134 for table in names:
1135 if not filter_names or table in filter_names:
1136 key = (schema, table)
1137 try:
1138 yield (
1139 key,
1140 single_tbl_method(
1141 connection, table, schema=schema, **kw
1142 ),
1143 )
1144 except exc.UnreflectableTableError as err:
1145 if key not in unreflectable:
1146 unreflectable[key] = err
1147 except exc.NoSuchTableError:
1148 pass
1149
1150 def get_multi_table_options(self, connection, **kw):
1151 return self._default_multi_reflect(
1152 self.get_table_options, connection, **kw
1153 )
1154
1155 def get_multi_columns(self, connection, **kw):
1156 return self._default_multi_reflect(self.get_columns, connection, **kw)
1157
1158 def get_multi_pk_constraint(self, connection, **kw):
1159 return self._default_multi_reflect(
1160 self.get_pk_constraint, connection, **kw
1161 )
1162
1163 def get_multi_foreign_keys(self, connection, **kw):
1164 return self._default_multi_reflect(
1165 self.get_foreign_keys, connection, **kw
1166 )
1167
1168 def get_multi_indexes(self, connection, **kw):
1169 return self._default_multi_reflect(self.get_indexes, connection, **kw)
1170
1171 def get_multi_unique_constraints(self, connection, **kw):
1172 return self._default_multi_reflect(
1173 self.get_unique_constraints, connection, **kw
1174 )
1175
1176 def get_multi_check_constraints(self, connection, **kw):
1177 return self._default_multi_reflect(
1178 self.get_check_constraints, connection, **kw
1179 )
1180
1181 def get_multi_table_comment(self, connection, **kw):
1182 return self._default_multi_reflect(
1183 self.get_table_comment, connection, **kw
1184 )
1185
1186
1187class StrCompileDialect(DefaultDialect):
1188 statement_compiler = compiler.StrSQLCompiler
1189 ddl_compiler = compiler.DDLCompiler
1190 type_compiler_cls = compiler.StrSQLTypeCompiler
1191 preparer = compiler.IdentifierPreparer
1192
1193 insert_returning = True
1194 update_returning = True
1195 delete_returning = True
1196
1197 supports_statement_cache = True
1198
1199 supports_identity_columns = True
1200
1201 supports_sequences = True
1202 sequences_optional = True
1203 preexecute_autoincrement_sequences = False
1204
1205 supports_native_boolean = True
1206
1207 supports_multivalues_insert = True
1208 supports_simple_order_by_label = True
1209
1210
1211class DefaultExecutionContext(ExecutionContext):
1212 isinsert = False
1213 isupdate = False
1214 isdelete = False
1215 is_crud = False
1216 is_text = False
1217 isddl = False
1218
1219 execute_style: ExecuteStyle = ExecuteStyle.EXECUTE
1220
1221 compiled: Optional[Compiled] = None
1222 result_column_struct: Optional[
1223 Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
1224 ] = None
1225 returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None
1226
1227 execution_options: _ExecuteOptions = util.EMPTY_DICT
1228
1229 cursor_fetch_strategy = _cursor._DEFAULT_FETCH
1230
1231 invoked_statement: Optional[Executable] = None
1232
1233 _is_implicit_returning = False
1234 _is_explicit_returning = False
1235 _is_supplemental_returning = False
1236 _is_server_side = False
1237
1238 _soft_closed = False
1239
1240 _rowcount: Optional[int] = None
1241
1242 # a hook for SQLite's translation of
1243 # result column names
1244 # NOTE: pyhive is using this hook, can't remove it :(
1245 _translate_colname: Optional[Callable[[str], str]] = None
1246
1247 _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
1248 """used by set_input_sizes().
1249
1250 This collection comes from ``ExpandedState.parameter_expansion``.
1251
1252 """
1253
1254 cache_hit = NO_CACHE_KEY
1255
1256 root_connection: Connection
1257 _dbapi_connection: PoolProxiedConnection
1258 dialect: Dialect
1259 unicode_statement: str
1260 cursor: DBAPICursor
1261 compiled_parameters: List[_MutableCoreSingleExecuteParams]
1262 parameters: _DBAPIMultiExecuteParams
1263 extracted_parameters: Optional[Sequence[BindParameter[Any]]]
1264
1265 _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)
1266
1267 _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
1268 _num_sentinel_cols: int = 0
1269
1270 @classmethod
1271 def _init_ddl(
1272 cls,
1273 dialect: Dialect,
1274 connection: Connection,
1275 dbapi_connection: PoolProxiedConnection,
1276 execution_options: _ExecuteOptions,
1277 compiled_ddl: DDLCompiler,
1278 ) -> ExecutionContext:
1279 """Initialize execution context for an ExecutableDDLElement
1280 construct."""
1281
1282 self = cls.__new__(cls)
1283 self.root_connection = connection
1284 self._dbapi_connection = dbapi_connection
1285 self.dialect = connection.dialect
1286
1287 self.compiled = compiled = compiled_ddl
1288 self.isddl = True
1289
1290 self.execution_options = execution_options
1291
1292 self.unicode_statement = str(compiled)
1293 if compiled.schema_translate_map:
1294 schema_translate_map = self.execution_options.get(
1295 "schema_translate_map", {}
1296 )
1297
1298 rst = compiled.preparer._render_schema_translates
1299 self.unicode_statement = rst(
1300 self.unicode_statement, schema_translate_map
1301 )
1302
1303 self.statement = self.unicode_statement
1304
1305 self.cursor = self.create_cursor()
1306 self.compiled_parameters = []
1307
1308 if dialect.positional:
1309 self.parameters = [dialect.execute_sequence_format()]
1310 else:
1311 self.parameters = [self._empty_dict_params]
1312
1313 return self
1314
1315 @classmethod
1316 def _init_compiled(
1317 cls,
1318 dialect: Dialect,
1319 connection: Connection,
1320 dbapi_connection: PoolProxiedConnection,
1321 execution_options: _ExecuteOptions,
1322 compiled: SQLCompiler,
1323 parameters: _CoreMultiExecuteParams,
1324 invoked_statement: Executable,
1325 extracted_parameters: Optional[Sequence[BindParameter[Any]]],
1326 cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
1327 ) -> ExecutionContext:
1328 """Initialize execution context for a Compiled construct."""
1329
1330 self = cls.__new__(cls)
1331 self.root_connection = connection
1332 self._dbapi_connection = dbapi_connection
1333 self.dialect = connection.dialect
1334 self.extracted_parameters = extracted_parameters
1335 self.invoked_statement = invoked_statement
1336 self.compiled = compiled
1337 self.cache_hit = cache_hit
1338
1339 self.execution_options = execution_options
1340
1341 self.result_column_struct = (
1342 compiled._result_columns,
1343 compiled._ordered_columns,
1344 compiled._textual_ordered_columns,
1345 compiled._ad_hoc_textual,
1346 compiled._loose_column_name_matching,
1347 )
1348
1349 self.isinsert = ii = compiled.isinsert
1350 self.isupdate = iu = compiled.isupdate
1351 self.isdelete = id_ = compiled.isdelete
1352 self.is_text = compiled.isplaintext
1353
1354 if ii or iu or id_:
1355 dml_statement = compiled.compile_state.statement # type: ignore
1356 if TYPE_CHECKING:
1357 assert isinstance(dml_statement, UpdateBase)
1358 self.is_crud = True
1359 self._is_explicit_returning = ier = bool(dml_statement._returning)
1360 self._is_implicit_returning = iir = bool(
1361 compiled.implicit_returning
1362 )
1363 if iir and dml_statement._supplemental_returning:
1364 self._is_supplemental_returning = True
1365
1366 # dont mix implicit and explicit returning
1367 assert not (iir and ier)
1368
1369 if (ier or iir) and compiled.for_executemany:
1370 if ii and not self.dialect.insert_executemany_returning:
1371 raise exc.InvalidRequestError(
1372 f"Dialect {self.dialect.dialect_description} with "
1373 f"current server capabilities does not support "
1374 "INSERT..RETURNING when executemany is used"
1375 )
1376 elif (
1377 ii
1378 and dml_statement._sort_by_parameter_order
1379 and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501
1380 ):
1381 raise exc.InvalidRequestError(
1382 f"Dialect {self.dialect.dialect_description} with "
1383 f"current server capabilities does not support "
1384 "INSERT..RETURNING with deterministic row ordering "
1385 "when executemany is used"
1386 )
1387 elif (
1388 ii
1389 and self.dialect.use_insertmanyvalues
1390 and not compiled._insertmanyvalues
1391 ):
1392 raise exc.InvalidRequestError(
1393 'Statement does not have "insertmanyvalues" '
1394 "enabled, can't use INSERT..RETURNING with "
1395 "executemany in this case."
1396 )
1397 elif iu and not self.dialect.update_executemany_returning:
1398 raise exc.InvalidRequestError(
1399 f"Dialect {self.dialect.dialect_description} with "
1400 f"current server capabilities does not support "
1401 "UPDATE..RETURNING when executemany is used"
1402 )
1403 elif id_ and not self.dialect.delete_executemany_returning:
1404 raise exc.InvalidRequestError(
1405 f"Dialect {self.dialect.dialect_description} with "
1406 f"current server capabilities does not support "
1407 "DELETE..RETURNING when executemany is used"
1408 )
1409
1410 if not parameters:
1411 self.compiled_parameters = [
1412 compiled.construct_params(
1413 extracted_parameters=extracted_parameters,
1414 escape_names=False,
1415 )
1416 ]
1417 else:
1418 self.compiled_parameters = [
1419 compiled.construct_params(
1420 m,
1421 escape_names=False,
1422 _group_number=grp,
1423 extracted_parameters=extracted_parameters,
1424 )
1425 for grp, m in enumerate(parameters)
1426 ]
1427
1428 if len(parameters) > 1:
1429 if self.isinsert and compiled._insertmanyvalues:
1430 self.execute_style = ExecuteStyle.INSERTMANYVALUES
1431
1432 imv = compiled._insertmanyvalues
1433 if imv.sentinel_columns is not None:
1434 self._num_sentinel_cols = imv.num_sentinel_columns
1435 else:
1436 self.execute_style = ExecuteStyle.EXECUTEMANY
1437
1438 self.unicode_statement = compiled.string
1439
1440 self.cursor = self.create_cursor()
1441
1442 if self.compiled.insert_prefetch or self.compiled.update_prefetch:
1443 self._process_execute_defaults()
1444
1445 processors = compiled._bind_processors
1446
1447 flattened_processors: Mapping[
1448 str, _BindProcessorType[Any]
1449 ] = processors # type: ignore[assignment]
1450
1451 if compiled.literal_execute_params or compiled.post_compile_params:
1452 if self.executemany:
1453 raise exc.InvalidRequestError(
1454 "'literal_execute' or 'expanding' parameters can't be "
1455 "used with executemany()"
1456 )
1457
1458 expanded_state = compiled._process_parameters_for_postcompile(
1459 self.compiled_parameters[0]
1460 )
1461
1462 # re-assign self.unicode_statement
1463 self.unicode_statement = expanded_state.statement
1464
1465 self._expanded_parameters = expanded_state.parameter_expansion
1466
1467 flattened_processors = dict(processors) # type: ignore
1468 flattened_processors.update(expanded_state.processors)
1469 positiontup = expanded_state.positiontup
1470 elif compiled.positional:
1471 positiontup = self.compiled.positiontup
1472 else:
1473 positiontup = None
1474
1475 if compiled.schema_translate_map:
1476 schema_translate_map = self.execution_options.get(
1477 "schema_translate_map", {}
1478 )
1479 rst = compiled.preparer._render_schema_translates
1480 self.unicode_statement = rst(
1481 self.unicode_statement, schema_translate_map
1482 )
1483
1484 # final self.unicode_statement is now assigned, encode if needed
1485 # by dialect
1486 self.statement = self.unicode_statement
1487
1488 # Convert the dictionary of bind parameter values
1489 # into a dict or list to be sent to the DBAPI's
1490 # execute() or executemany() method.
1491
1492 if compiled.positional:
1493 core_positional_parameters: MutableSequence[Sequence[Any]] = []
1494 assert positiontup is not None
1495 for compiled_params in self.compiled_parameters:
1496 l_param: List[Any] = [
1497 (
1498 flattened_processors[key](compiled_params[key])
1499 if key in flattened_processors
1500 else compiled_params[key]
1501 )
1502 for key in positiontup
1503 ]
1504 core_positional_parameters.append(
1505 dialect.execute_sequence_format(l_param)
1506 )
1507
1508 self.parameters = core_positional_parameters
1509 else:
1510 core_dict_parameters: MutableSequence[Dict[str, Any]] = []
1511 escaped_names = compiled.escaped_bind_names
1512
1513 # note that currently, "expanded" parameters will be present
1514 # in self.compiled_parameters in their quoted form. This is
1515 # slightly inconsistent with the approach taken as of
1516 # #8056 where self.compiled_parameters is meant to contain unquoted
1517 # param names.
1518 d_param: Dict[str, Any]
1519 for compiled_params in self.compiled_parameters:
1520 if escaped_names:
1521 d_param = {
1522 escaped_names.get(key, key): (
1523 flattened_processors[key](compiled_params[key])
1524 if key in flattened_processors
1525 else compiled_params[key]
1526 )
1527 for key in compiled_params
1528 }
1529 else:
1530 d_param = {
1531 key: (
1532 flattened_processors[key](compiled_params[key])
1533 if key in flattened_processors
1534 else compiled_params[key]
1535 )
1536 for key in compiled_params
1537 }
1538
1539 core_dict_parameters.append(d_param)
1540
1541 self.parameters = core_dict_parameters
1542
1543 return self
1544
1545 @classmethod
1546 def _init_statement(
1547 cls,
1548 dialect: Dialect,
1549 connection: Connection,
1550 dbapi_connection: PoolProxiedConnection,
1551 execution_options: _ExecuteOptions,
1552 statement: str,
1553 parameters: _DBAPIMultiExecuteParams,
1554 ) -> ExecutionContext:
1555 """Initialize execution context for a string SQL statement."""
1556
1557 self = cls.__new__(cls)
1558 self.root_connection = connection
1559 self._dbapi_connection = dbapi_connection
1560 self.dialect = connection.dialect
1561 self.is_text = True
1562
1563 self.execution_options = execution_options
1564
1565 if not parameters:
1566 if self.dialect.positional:
1567 self.parameters = [dialect.execute_sequence_format()]
1568 else:
1569 self.parameters = [self._empty_dict_params]
1570 elif isinstance(parameters[0], dialect.execute_sequence_format):
1571 self.parameters = parameters
1572 elif isinstance(parameters[0], dict):
1573 self.parameters = parameters
1574 else:
1575 self.parameters = [
1576 dialect.execute_sequence_format(p) for p in parameters
1577 ]
1578
1579 if len(parameters) > 1:
1580 self.execute_style = ExecuteStyle.EXECUTEMANY
1581
1582 self.statement = self.unicode_statement = statement
1583
1584 self.cursor = self.create_cursor()
1585 return self
1586
1587 @classmethod
1588 def _init_default(
1589 cls,
1590 dialect: Dialect,
1591 connection: Connection,
1592 dbapi_connection: PoolProxiedConnection,
1593 execution_options: _ExecuteOptions,
1594 ) -> ExecutionContext:
1595 """Initialize execution context for a ColumnDefault construct."""
1596
1597 self = cls.__new__(cls)
1598 self.root_connection = connection
1599 self._dbapi_connection = dbapi_connection
1600 self.dialect = connection.dialect
1601
1602 self.execution_options = execution_options
1603
1604 self.cursor = self.create_cursor()
1605 return self
1606
1607 def _get_cache_stats(self) -> str:
1608 if self.compiled is None:
1609 return "raw sql"
1610
1611 now = perf_counter()
1612
1613 ch = self.cache_hit
1614
1615 gen_time = self.compiled._gen_time
1616 assert gen_time is not None
1617
1618 if ch is NO_CACHE_KEY:
1619 return "no key %.5fs" % (now - gen_time,)
1620 elif ch is CACHE_HIT:
1621 return "cached since %.4gs ago" % (now - gen_time,)
1622 elif ch is CACHE_MISS:
1623 return "generated in %.5fs" % (now - gen_time,)
1624 elif ch is CACHING_DISABLED:
1625 if "_cache_disable_reason" in self.execution_options:
1626 return "caching disabled (%s) %.5fs " % (
1627 self.execution_options["_cache_disable_reason"],
1628 now - gen_time,
1629 )
1630 else:
1631 return "caching disabled %.5fs" % (now - gen_time,)
1632 elif ch is NO_DIALECT_SUPPORT:
1633 return "dialect %s+%s does not support caching %.5fs" % (
1634 self.dialect.name,
1635 self.dialect.driver,
1636 now - gen_time,
1637 )
1638 else:
1639 return "unknown"
1640
1641 @property
1642 def executemany(self): # type: ignore[override]
1643 return self.execute_style in (
1644 ExecuteStyle.EXECUTEMANY,
1645 ExecuteStyle.INSERTMANYVALUES,
1646 )
1647
1648 @util.memoized_property
1649 def identifier_preparer(self):
1650 if self.compiled:
1651 return self.compiled.preparer
1652 elif "schema_translate_map" in self.execution_options:
1653 return self.dialect.identifier_preparer._with_schema_translate(
1654 self.execution_options["schema_translate_map"]
1655 )
1656 else:
1657 return self.dialect.identifier_preparer
1658
1659 @util.memoized_property
1660 def engine(self):
1661 return self.root_connection.engine
1662
1663 @util.memoized_property
1664 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1665 if TYPE_CHECKING:
1666 assert isinstance(self.compiled, SQLCompiler)
1667 return self.compiled.postfetch
1668
1669 @util.memoized_property
1670 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1671 if TYPE_CHECKING:
1672 assert isinstance(self.compiled, SQLCompiler)
1673 if self.isinsert:
1674 return self.compiled.insert_prefetch
1675 elif self.isupdate:
1676 return self.compiled.update_prefetch
1677 else:
1678 return ()
1679
1680 @util.memoized_property
1681 def no_parameters(self):
1682 return self.execution_options.get("no_parameters", False)
1683
1684 def _execute_scalar(
1685 self,
1686 stmt: str,
1687 type_: Optional[TypeEngine[Any]],
1688 parameters: Optional[_DBAPISingleExecuteParams] = None,
1689 ) -> Any:
1690 """Execute a string statement on the current cursor, returning a
1691 scalar result.
1692
1693 Used to fire off sequences, default phrases, and "select lastrowid"
1694 types of statements individually or in the context of a parent INSERT
1695 or UPDATE statement.
1696
1697 """
1698
1699 conn = self.root_connection
1700
1701 if "schema_translate_map" in self.execution_options:
1702 schema_translate_map = self.execution_options.get(
1703 "schema_translate_map", {}
1704 )
1705
1706 rst = self.identifier_preparer._render_schema_translates
1707 stmt = rst(stmt, schema_translate_map)
1708
1709 if not parameters:
1710 if self.dialect.positional:
1711 parameters = self.dialect.execute_sequence_format()
1712 else:
1713 parameters = {}
1714
1715 conn._cursor_execute(self.cursor, stmt, parameters, context=self)
1716 row = self.cursor.fetchone()
1717 if row is not None:
1718 r = row[0]
1719 else:
1720 r = None
1721 if type_ is not None:
1722 # apply type post processors to the result
1723 proc = type_._cached_result_processor(
1724 self.dialect, self.cursor.description[0][1]
1725 )
1726 if proc:
1727 return proc(r)
1728 return r
1729
1730 @util.memoized_property
1731 def connection(self):
1732 return self.root_connection
1733
1734 def _use_server_side_cursor(self):
1735 if not self.dialect.supports_server_side_cursors:
1736 return False
1737
1738 if self.dialect.server_side_cursors:
1739 # this is deprecated
1740 use_server_side = self.execution_options.get(
1741 "stream_results", True
1742 ) and (
1743 self.compiled
1744 and isinstance(self.compiled.statement, expression.Selectable)
1745 or (
1746 (
1747 not self.compiled
1748 or isinstance(
1749 self.compiled.statement, expression.TextClause
1750 )
1751 )
1752 and self.unicode_statement
1753 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
1754 )
1755 )
1756 else:
1757 use_server_side = self.execution_options.get(
1758 "stream_results", False
1759 )
1760
1761 return use_server_side
1762
1763 def create_cursor(self) -> DBAPICursor:
1764 if (
1765 # inlining initial preference checks for SS cursors
1766 self.dialect.supports_server_side_cursors
1767 and (
1768 self.execution_options.get("stream_results", False)
1769 or (
1770 self.dialect.server_side_cursors
1771 and self._use_server_side_cursor()
1772 )
1773 )
1774 ):
1775 self._is_server_side = True
1776 return self.create_server_side_cursor()
1777 else:
1778 self._is_server_side = False
1779 return self.create_default_cursor()
1780
1781 def fetchall_for_returning(self, cursor):
1782 return cursor.fetchall()
1783
1784 def create_default_cursor(self) -> DBAPICursor:
1785 return self._dbapi_connection.cursor()
1786
1787 def create_server_side_cursor(self) -> DBAPICursor:
1788 raise NotImplementedError()
1789
1790 def pre_exec(self):
1791 pass
1792
1793 def get_out_parameter_values(self, names):
1794 raise NotImplementedError(
1795 "This dialect does not support OUT parameters"
1796 )
1797
1798 def post_exec(self):
1799 pass
1800
1801 def get_result_processor(self, type_, colname, coltype):
1802 """Return a 'result processor' for a given type as present in
1803 cursor.description.
1804
1805 This has a default implementation that dialects can override
1806 for context-sensitive result type handling.
1807
1808 """
1809 return type_._cached_result_processor(self.dialect, coltype)
1810
1811 def get_lastrowid(self):
1812 """return self.cursor.lastrowid, or equivalent, after an INSERT.
1813
1814 This may involve calling special cursor functions, issuing a new SELECT
1815 on the cursor (or a new one), or returning a stored value that was
1816 calculated within post_exec().
1817
1818 This function will only be called for dialects which support "implicit"
1819 primary key generation, keep preexecute_autoincrement_sequences set to
1820 False, and when no explicit id value was bound to the statement.
1821
1822 The function is called once for an INSERT statement that would need to
1823 return the last inserted primary key for those dialects that make use
1824 of the lastrowid concept. In these cases, it is called directly after
1825 :meth:`.ExecutionContext.post_exec`.
1826
1827 """
1828 return self.cursor.lastrowid
1829
1830 def handle_dbapi_exception(self, e):
1831 pass
1832
1833 @util.non_memoized_property
1834 def rowcount(self) -> int:
1835 if self._rowcount is not None:
1836 return self._rowcount
1837 else:
1838 return self.cursor.rowcount
1839
1840 @property
1841 def _has_rowcount(self):
1842 return self._rowcount is not None
1843
1844 def supports_sane_rowcount(self):
1845 return self.dialect.supports_sane_rowcount
1846
1847 def supports_sane_multi_rowcount(self):
1848 return self.dialect.supports_sane_multi_rowcount
1849
1850 def _setup_result_proxy(self):
1851 exec_opt = self.execution_options
1852
1853 if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
1854 self._rowcount = self.cursor.rowcount
1855
1856 yp: Optional[Union[int, bool]]
1857 if self.is_crud or self.is_text:
1858 result = self._setup_dml_or_text_result()
1859 yp = False
1860 else:
1861 yp = exec_opt.get("yield_per", None)
1862 sr = self._is_server_side or exec_opt.get("stream_results", False)
1863 strategy = self.cursor_fetch_strategy
1864 if sr and strategy is _cursor._DEFAULT_FETCH:
1865 strategy = _cursor.BufferedRowCursorFetchStrategy(
1866 self.cursor, self.execution_options
1867 )
1868 cursor_description: _DBAPICursorDescription = (
1869 strategy.alternate_cursor_description
1870 or self.cursor.description
1871 )
1872 if cursor_description is None:
1873 strategy = _cursor._NO_CURSOR_DQL
1874
1875 result = _cursor.CursorResult(self, strategy, cursor_description)
1876
1877 compiled = self.compiled
1878
1879 if (
1880 compiled
1881 and not self.isddl
1882 and cast(SQLCompiler, compiled).has_out_parameters
1883 ):
1884 self._setup_out_parameters(result)
1885
1886 self._soft_closed = result._soft_closed
1887
1888 if yp:
1889 result = result.yield_per(yp)
1890
1891 return result
1892
1893 def _setup_out_parameters(self, result):
1894 compiled = cast(SQLCompiler, self.compiled)
1895
1896 out_bindparams = [
1897 (param, name)
1898 for param, name in compiled.bind_names.items()
1899 if param.isoutparam
1900 ]
1901 out_parameters = {}
1902
1903 for bindparam, raw_value in zip(
1904 [param for param, name in out_bindparams],
1905 self.get_out_parameter_values(
1906 [name for param, name in out_bindparams]
1907 ),
1908 ):
1909 type_ = bindparam.type
1910 impl_type = type_.dialect_impl(self.dialect)
1911 dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
1912 result_processor = impl_type.result_processor(
1913 self.dialect, dbapi_type
1914 )
1915 if result_processor is not None:
1916 raw_value = result_processor(raw_value)
1917 out_parameters[bindparam.key] = raw_value
1918
1919 result.out_parameters = out_parameters
1920
1921 def _setup_dml_or_text_result(self):
1922 compiled = cast(SQLCompiler, self.compiled)
1923
1924 strategy: ResultFetchStrategy = self.cursor_fetch_strategy
1925
1926 if self.isinsert:
1927 if (
1928 self.execute_style is ExecuteStyle.INSERTMANYVALUES
1929 and compiled.effective_returning
1930 ):
1931 strategy = _cursor.FullyBufferedCursorFetchStrategy(
1932 self.cursor,
1933 initial_buffer=self._insertmanyvalues_rows,
1934 # maintain alt cursor description if set by the
1935 # dialect, e.g. mssql preserves it
1936 alternate_description=(
1937 strategy.alternate_cursor_description
1938 ),
1939 )
1940
1941 if compiled.postfetch_lastrowid:
1942 self.inserted_primary_key_rows = (
1943 self._setup_ins_pk_from_lastrowid()
1944 )
1945 # else if not self._is_implicit_returning,
1946 # the default inserted_primary_key_rows accessor will
1947 # return an "empty" primary key collection when accessed.
1948
1949 if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
1950 strategy = _cursor.BufferedRowCursorFetchStrategy(
1951 self.cursor, self.execution_options
1952 )
1953
1954 if strategy is _cursor._NO_CURSOR_DML:
1955 cursor_description = None
1956 else:
1957 cursor_description = (
1958 strategy.alternate_cursor_description
1959 or self.cursor.description
1960 )
1961
1962 if cursor_description is None:
1963 strategy = _cursor._NO_CURSOR_DML
1964 elif self._num_sentinel_cols:
1965 assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
1966 # the sentinel columns are handled in CursorResult._init_metadata
1967 # using essentially _reduce
1968
1969 result: _cursor.CursorResult[Any] = _cursor.CursorResult(
1970 self, strategy, cursor_description
1971 )
1972
1973 if self.isinsert:
1974 if self._is_implicit_returning:
1975 rows = result.all()
1976
1977 self.returned_default_rows = rows
1978
1979 self.inserted_primary_key_rows = (
1980 self._setup_ins_pk_from_implicit_returning(result, rows)
1981 )
1982
1983 # test that it has a cursor metadata that is accurate. the
1984 # first row will have been fetched and current assumptions
1985 # are that the result has only one row, until executemany()
1986 # support is added here.
1987 assert result._metadata.returns_rows
1988
1989 # Insert statement has both return_defaults() and
1990 # returning(). rewind the result on the list of rows
1991 # we just used.
1992 if self._is_supplemental_returning:
1993 result._rewind(rows)
1994 else:
1995 result._soft_close()
1996 elif not self._is_explicit_returning:
1997 result._soft_close()
1998
1999 # we assume here the result does not return any rows.
2000 # *usually*, this will be true. However, some dialects
2001 # such as that of MSSQL/pyodbc need to SELECT a post fetch
2002 # function so this is not necessarily true.
2003 # assert not result.returns_rows
2004
2005 elif self._is_implicit_returning:
2006 rows = result.all()
2007
2008 if rows:
2009 self.returned_default_rows = rows
2010 self._rowcount = len(rows)
2011
2012 if self._is_supplemental_returning:
2013 result._rewind(rows)
2014 else:
2015 result._soft_close()
2016
2017 # test that it has a cursor metadata that is accurate.
2018 # the rows have all been fetched however.
2019 assert result._metadata.returns_rows
2020
2021 elif not result._metadata.returns_rows:
2022 # no results, get rowcount
2023 # (which requires open cursor on some drivers)
2024 if self._rowcount is None:
2025 self._rowcount = self.cursor.rowcount
2026 result._soft_close()
2027 elif self.isupdate or self.isdelete:
2028 if self._rowcount is None:
2029 self._rowcount = self.cursor.rowcount
2030 return result
2031
2032 @util.memoized_property
2033 def inserted_primary_key_rows(self):
2034 # if no specific "get primary key" strategy was set up
2035 # during execution, return a "default" primary key based
2036 # on what's in the compiled_parameters and nothing else.
2037 return self._setup_ins_pk_from_empty()
2038
2039 def _setup_ins_pk_from_lastrowid(self):
2040 getter = cast(
2041 SQLCompiler, self.compiled
2042 )._inserted_primary_key_from_lastrowid_getter
2043 lastrowid = self.get_lastrowid()
2044 return [getter(lastrowid, self.compiled_parameters[0])]
2045
2046 def _setup_ins_pk_from_empty(self):
2047 getter = cast(
2048 SQLCompiler, self.compiled
2049 )._inserted_primary_key_from_lastrowid_getter
2050 return [getter(None, param) for param in self.compiled_parameters]
2051
2052 def _setup_ins_pk_from_implicit_returning(self, result, rows):
2053 if not rows:
2054 return []
2055
2056 getter = cast(
2057 SQLCompiler, self.compiled
2058 )._inserted_primary_key_from_returning_getter
2059 compiled_params = self.compiled_parameters
2060
2061 return [
2062 getter(row, param) for row, param in zip(rows, compiled_params)
2063 ]
2064
2065 def lastrow_has_defaults(self):
2066 return (self.isinsert or self.isupdate) and bool(
2067 cast(SQLCompiler, self.compiled).postfetch
2068 )
2069
2070 def _prepare_set_input_sizes(
2071 self,
2072 ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
2073 """Given a cursor and ClauseParameters, prepare arguments
2074 in order to call the appropriate
2075 style of ``setinputsizes()`` on the cursor, using DB-API types
2076 from the bind parameter's ``TypeEngine`` objects.
2077
2078 This method only called by those dialects which set the
2079 :attr:`.Dialect.bind_typing` attribute to
2080 :attr:`.BindTyping.SETINPUTSIZES`. Python-oracledb and cx_Oracle are
2081 the only DBAPIs that requires setinputsizes(); pyodbc offers it as an
2082 option.
2083
2084 Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
2085 for pg8000 and asyncpg, which has been changed to inline rendering
2086 of casts.
2087
2088 """
2089 if self.isddl or self.is_text:
2090 return None
2091
2092 compiled = cast(SQLCompiler, self.compiled)
2093
2094 inputsizes = compiled._get_set_input_sizes_lookup()
2095
2096 if inputsizes is None:
2097 return None
2098
2099 dialect = self.dialect
2100
2101 # all of the rest of this... cython?
2102
2103 if dialect._has_events:
2104 inputsizes = dict(inputsizes)
2105 dialect.dispatch.do_setinputsizes(
2106 inputsizes, self.cursor, self.statement, self.parameters, self
2107 )
2108
2109 if compiled.escaped_bind_names:
2110 escaped_bind_names = compiled.escaped_bind_names
2111 else:
2112 escaped_bind_names = None
2113
2114 if dialect.positional:
2115 items = [
2116 (key, compiled.binds[key])
2117 for key in compiled.positiontup or ()
2118 ]
2119 else:
2120 items = [
2121 (key, bindparam)
2122 for bindparam, key in compiled.bind_names.items()
2123 ]
2124
2125 generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
2126 for key, bindparam in items:
2127 if bindparam in compiled.literal_execute_params:
2128 continue
2129
2130 if key in self._expanded_parameters:
2131 if is_tuple_type(bindparam.type):
2132 num = len(bindparam.type.types)
2133 dbtypes = inputsizes[bindparam]
2134 generic_inputsizes.extend(
2135 (
2136 (
2137 escaped_bind_names.get(paramname, paramname)
2138 if escaped_bind_names is not None
2139 else paramname
2140 ),
2141 dbtypes[idx % num],
2142 bindparam.type.types[idx % num],
2143 )
2144 for idx, paramname in enumerate(
2145 self._expanded_parameters[key]
2146 )
2147 )
2148 else:
2149 dbtype = inputsizes.get(bindparam, None)
2150 generic_inputsizes.extend(
2151 (
2152 (
2153 escaped_bind_names.get(paramname, paramname)
2154 if escaped_bind_names is not None
2155 else paramname
2156 ),
2157 dbtype,
2158 bindparam.type,
2159 )
2160 for paramname in self._expanded_parameters[key]
2161 )
2162 else:
2163 dbtype = inputsizes.get(bindparam, None)
2164
2165 escaped_name = (
2166 escaped_bind_names.get(key, key)
2167 if escaped_bind_names is not None
2168 else key
2169 )
2170
2171 generic_inputsizes.append(
2172 (escaped_name, dbtype, bindparam.type)
2173 )
2174
2175 return generic_inputsizes
2176
2177 def _exec_default(self, column, default, type_):
2178 if default.is_sequence:
2179 return self.fire_sequence(default, type_)
2180 elif default.is_callable:
2181 # this codepath is not normally used as it's inlined
2182 # into _process_execute_defaults
2183 self.current_column = column
2184 return default.arg(self)
2185 elif default.is_clause_element:
2186 return self._exec_default_clause_element(column, default, type_)
2187 else:
2188 # this codepath is not normally used as it's inlined
2189 # into _process_execute_defaults
2190 return default.arg
2191
2192 def _exec_default_clause_element(self, column, default, type_):
2193 # execute a default that's a complete clause element. Here, we have
2194 # to re-implement a miniature version of the compile->parameters->
2195 # cursor.execute() sequence, since we don't want to modify the state
2196 # of the connection / result in progress or create new connection/
2197 # result objects etc.
2198 # .. versionchanged:: 1.4
2199
2200 if not default._arg_is_typed:
2201 default_arg = expression.type_coerce(default.arg, type_)
2202 else:
2203 default_arg = default.arg
2204 compiled = expression.select(default_arg).compile(dialect=self.dialect)
2205 compiled_params = compiled.construct_params()
2206 processors = compiled._bind_processors
2207 if compiled.positional:
2208 parameters = self.dialect.execute_sequence_format(
2209 [
2210 (
2211 processors[key](compiled_params[key]) # type: ignore
2212 if key in processors
2213 else compiled_params[key]
2214 )
2215 for key in compiled.positiontup or ()
2216 ]
2217 )
2218 else:
2219 parameters = {
2220 key: (
2221 processors[key](compiled_params[key]) # type: ignore
2222 if key in processors
2223 else compiled_params[key]
2224 )
2225 for key in compiled_params
2226 }
2227 return self._execute_scalar(
2228 str(compiled), type_, parameters=parameters
2229 )
2230
2231 current_parameters: Optional[_CoreSingleExecuteParams] = None
2232 """A dictionary of parameters applied to the current row.
2233
2234 This attribute is only available in the context of a user-defined default
2235 generation function, e.g. as described at :ref:`context_default_functions`.
2236 It consists of a dictionary which includes entries for each column/value
2237 pair that is to be part of the INSERT or UPDATE statement. The keys of the
2238 dictionary will be the key value of each :class:`_schema.Column`,
2239 which is usually
2240 synonymous with the name.
2241
2242 Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
2243 does not accommodate for the "multi-values" feature of the
2244 :meth:`_expression.Insert.values` method. The
2245 :meth:`.DefaultExecutionContext.get_current_parameters` method should be
2246 preferred.
2247
2248 .. seealso::
2249
2250 :meth:`.DefaultExecutionContext.get_current_parameters`
2251
2252 :ref:`context_default_functions`
2253
2254 """
2255
2256 def get_current_parameters(self, isolate_multiinsert_groups=True):
2257 """Return a dictionary of parameters applied to the current row.
2258
2259 This method can only be used in the context of a user-defined default
2260 generation function, e.g. as described at
2261 :ref:`context_default_functions`. When invoked, a dictionary is
2262 returned which includes entries for each column/value pair that is part
2263 of the INSERT or UPDATE statement. The keys of the dictionary will be
2264 the key value of each :class:`_schema.Column`,
2265 which is usually synonymous
2266 with the name.
2267
2268 :param isolate_multiinsert_groups=True: indicates that multi-valued
2269 INSERT constructs created using :meth:`_expression.Insert.values`
2270 should be
2271 handled by returning only the subset of parameters that are local
2272 to the current column default invocation. When ``False``, the
2273 raw parameters of the statement are returned including the
2274 naming convention used in the case of multi-valued INSERT.
2275
2276 .. seealso::
2277
2278 :attr:`.DefaultExecutionContext.current_parameters`
2279
2280 :ref:`context_default_functions`
2281
2282 """
2283 try:
2284 parameters = self.current_parameters
2285 column = self.current_column
2286 except AttributeError:
2287 raise exc.InvalidRequestError(
2288 "get_current_parameters() can only be invoked in the "
2289 "context of a Python side column default function"
2290 )
2291 else:
2292 assert column is not None
2293 assert parameters is not None
2294 compile_state = cast(
2295 "DMLState", cast(SQLCompiler, self.compiled).compile_state
2296 )
2297 assert compile_state is not None
2298 if (
2299 isolate_multiinsert_groups
2300 and dml.isinsert(compile_state)
2301 and compile_state._has_multi_parameters
2302 ):
2303 if column._is_multiparam_column:
2304 index = column.index + 1
2305 d = {column.original.key: parameters[column.key]}
2306 else:
2307 d = {column.key: parameters[column.key]}
2308 index = 0
2309 assert compile_state._dict_parameters is not None
2310 keys = compile_state._dict_parameters.keys()
2311 d.update(
2312 (key, parameters["%s_m%d" % (key, index)]) for key in keys
2313 )
2314 return d
2315 else:
2316 return parameters
2317
2318 def get_insert_default(self, column):
2319 if column.default is None:
2320 return None
2321 else:
2322 return self._exec_default(column, column.default, column.type)
2323
2324 def get_update_default(self, column):
2325 if column.onupdate is None:
2326 return None
2327 else:
2328 return self._exec_default(column, column.onupdate, column.type)
2329
2330 def _process_execute_defaults(self):
2331 compiled = cast(SQLCompiler, self.compiled)
2332
2333 key_getter = compiled._within_exec_param_key_getter
2334
2335 sentinel_counter = 0
2336
2337 if compiled.insert_prefetch:
2338 prefetch_recs = [
2339 (
2340 c,
2341 key_getter(c),
2342 c._default_description_tuple,
2343 self.get_insert_default,
2344 )
2345 for c in compiled.insert_prefetch
2346 ]
2347 elif compiled.update_prefetch:
2348 prefetch_recs = [
2349 (
2350 c,
2351 key_getter(c),
2352 c._onupdate_description_tuple,
2353 self.get_update_default,
2354 )
2355 for c in compiled.update_prefetch
2356 ]
2357 else:
2358 prefetch_recs = []
2359
2360 for param in self.compiled_parameters:
2361 self.current_parameters = param
2362
2363 for (
2364 c,
2365 param_key,
2366 (arg, is_scalar, is_callable, is_sentinel),
2367 fallback,
2368 ) in prefetch_recs:
2369 if is_sentinel:
2370 param[param_key] = sentinel_counter
2371 sentinel_counter += 1
2372 elif is_scalar:
2373 param[param_key] = arg
2374 elif is_callable:
2375 self.current_column = c
2376 param[param_key] = arg(self)
2377 else:
2378 val = fallback(c)
2379 if val is not None:
2380 param[param_key] = val
2381
2382 del self.current_parameters
2383
2384
2385DefaultDialect.execution_ctx_cls = DefaultExecutionContext