1# engine/default.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Default implementations of per-dialect sqlalchemy.engine classes.
10
11These are semi-private implementation classes which are only of importance
12to database dialect authors; dialects will usually use the classes here
13as the base class for their own corresponding classes.
14
15"""
16
17from __future__ import annotations
18
19import functools
20import operator
21import random
22import re
23from time import perf_counter
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Dict
29from typing import Final
30from typing import List
31from typing import Literal
32from typing import Mapping
33from typing import MutableMapping
34from typing import MutableSequence
35from typing import Optional
36from typing import Sequence
37from typing import Set
38from typing import Tuple
39from typing import Type
40from typing import TYPE_CHECKING
41from typing import Union
42import weakref
43
44from . import characteristics
45from . import cursor as _cursor
46from . import interfaces
47from .base import Connection
48from .interfaces import CacheStats
49from .interfaces import DBAPICursor
50from .interfaces import Dialect
51from .interfaces import ExecuteStyle
52from .interfaces import ExecutionContext
53from .reflection import ObjectKind
54from .reflection import ObjectScope
55from .. import event
56from .. import exc
57from .. import pool
58from .. import util
59from ..sql import compiler
60from ..sql import dml
61from ..sql import expression
62from ..sql import type_api
63from ..sql import util as sql_util
64from ..sql._typing import is_tuple_type
65from ..sql.base import _NoArg
66from ..sql.compiler import AggregateOrderByStyle
67from ..sql.compiler import DDLCompiler
68from ..sql.compiler import InsertmanyvaluesSentinelOpts
69from ..sql.compiler import SQLCompiler
70from ..sql.elements import quoted_name
71from ..util.typing import TupleAny
72from ..util.typing import Unpack
73
74if typing.TYPE_CHECKING:
75 from types import ModuleType
76
77 from .base import Engine
78 from .cursor import ResultFetchStrategy
79 from .interfaces import _CoreMultiExecuteParams
80 from .interfaces import _CoreSingleExecuteParams
81 from .interfaces import _DBAPICursorDescription
82 from .interfaces import _DBAPIMultiExecuteParams
83 from .interfaces import _DBAPISingleExecuteParams
84 from .interfaces import _ExecuteOptions
85 from .interfaces import _MutableCoreSingleExecuteParams
86 from .interfaces import _ParamStyle
87 from .interfaces import ConnectArgsType
88 from .interfaces import DBAPIConnection
89 from .interfaces import DBAPIModule
90 from .interfaces import IsolationLevel
91 from .row import Row
92 from .url import URL
93 from ..event import _ListenerFnType
94 from ..pool import Pool
95 from ..pool import PoolProxiedConnection
96 from ..sql import Executable
97 from ..sql.compiler import Compiled
98 from ..sql.compiler import Linting
99 from ..sql.compiler import ResultColumnsEntry
100 from ..sql.dml import DMLState
101 from ..sql.dml import UpdateBase
102 from ..sql.elements import BindParameter
103 from ..sql.schema import Column
104 from ..sql.type_api import _BindProcessorType
105 from ..sql.type_api import _ResultProcessorType
106 from ..sql.type_api import TypeEngine
107
108
109# When we're handed literal SQL, ensure it's a SELECT query
110SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)
111
112
113(
114 CACHE_HIT,
115 CACHE_MISS,
116 CACHING_DISABLED,
117 NO_CACHE_KEY,
118 NO_DIALECT_SUPPORT,
119) = list(CacheStats)
120
121
122class DefaultDialect(Dialect):
123 """Default implementation of Dialect"""
124
125 statement_compiler = compiler.SQLCompiler
126 ddl_compiler = compiler.DDLCompiler
127 type_compiler_cls = compiler.GenericTypeCompiler
128
129 preparer = compiler.IdentifierPreparer
130 supports_alter = True
131 supports_comments = False
132 supports_constraint_comments = False
133 inline_comments = False
134 supports_statement_cache = True
135
136 div_is_floordiv = True
137
138 bind_typing = interfaces.BindTyping.NONE
139
140 include_set_input_sizes: Optional[Set[Any]] = None
141 exclude_set_input_sizes: Optional[Set[Any]] = None
142
143 # the first value we'd get for an autoincrement column.
144 default_sequence_base = 1
145
146 # most DBAPIs happy with this for execute().
147 # not cx_oracle.
148 execute_sequence_format = tuple
149
150 supports_schemas = True
151 supports_views = True
152 supports_sequences = False
153 sequences_optional = False
154 preexecute_autoincrement_sequences = False
155 supports_identity_columns = False
156 postfetch_lastrowid = True
157 favor_returning_over_lastrowid = False
158 insert_null_pk_still_autoincrements = False
159 update_returning = False
160 delete_returning = False
161 update_returning_multifrom = False
162 delete_returning_multifrom = False
163 insert_returning = False
164
165 aggregate_order_by_style = AggregateOrderByStyle.INLINE
166
167 cte_follows_insert = False
168
169 supports_native_enum = False
170 supports_native_boolean = False
171 supports_native_uuid = False
172 returns_native_bytes = False
173
174 non_native_boolean_check_constraint = True
175
176 supports_simple_order_by_label = True
177
178 tuple_in_values = False
179
180 connection_characteristics = util.immutabledict(
181 {
182 "isolation_level": characteristics.IsolationLevelCharacteristic(),
183 "logging_token": characteristics.LoggingTokenCharacteristic(),
184 }
185 )
186
187 engine_config_types: Mapping[str, Any] = util.immutabledict(
188 {
189 "pool_timeout": util.asint,
190 "echo": util.bool_or_str("debug"),
191 "echo_pool": util.bool_or_str("debug"),
192 "pool_recycle": util.asint,
193 "pool_size": util.asint,
194 "max_overflow": util.asint,
195 "future": util.asbool,
196 }
197 )
198
199 # if the NUMERIC type
200 # returns decimal.Decimal.
201 # *not* the FLOAT type however.
202 supports_native_decimal = False
203
204 name = "default"
205
206 # length at which to truncate
207 # any identifier.
208 max_identifier_length = 9999
209 _user_defined_max_identifier_length: Optional[int] = None
210
211 isolation_level: Optional[str] = None
212
213 # sub-categories of max_identifier_length.
214 # currently these accommodate for MySQL which allows alias names
215 # of 255 but DDL names only of 64.
216 max_index_name_length: Optional[int] = None
217 max_constraint_name_length: Optional[int] = None
218
219 supports_sane_rowcount = True
220 supports_sane_multi_rowcount = True
221 colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
222 default_paramstyle = "named"
223
224 supports_default_values = False
225 """dialect supports INSERT... DEFAULT VALUES syntax"""
226
227 supports_default_metavalue = False
228 """dialect supports INSERT... VALUES (DEFAULT) syntax"""
229
230 default_metavalue_token = "DEFAULT"
231 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
232 parenthesis."""
233
234 # not sure if this is a real thing but the compiler will deliver it
235 # if this is the only flag enabled.
236 supports_empty_insert = True
237 """dialect supports INSERT () VALUES ()"""
238
239 supports_multivalues_insert = False
240
241 use_insertmanyvalues: bool = False
242
243 use_insertmanyvalues_wo_returning: bool = False
244
245 insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
246 InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
247 )
248
249 insertmanyvalues_page_size: int = 1000
250 insertmanyvalues_max_parameters = 32700
251
252 supports_is_distinct_from = True
253
254 supports_server_side_cursors = False
255
256 server_side_cursors = False
257
258 # extra record-level locking features (#4860)
259 supports_for_update_of = False
260
261 server_version_info = None
262
263 default_schema_name: Optional[str] = None
264
265 # indicates symbol names are
266 # UPPERCASED if they are case insensitive
267 # within the database.
268 # if this is True, the methods normalize_name()
269 # and denormalize_name() must be provided.
270 requires_name_normalize = False
271
272 is_async = False
273
274 has_terminate = False
275
276 # TODO: this is not to be part of 2.0. implement rudimentary binary
277 # literals for SQLite, PostgreSQL, MySQL only within
278 # _Binary.literal_processor
279 _legacy_binary_type_literal_encoding = "utf-8"
280
281 @util.deprecated_params(
282 empty_in_strategy=(
283 "1.4",
284 "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
285 "deprecated, and no longer has any effect. All IN expressions "
286 "are now rendered using "
287 'the "expanding parameter" strategy which renders a set of bound'
288 'expressions, or an "empty set" SELECT, at statement execution'
289 "time.",
290 ),
291 server_side_cursors=(
292 "1.4",
293 "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
294 "is deprecated and will be removed in a future release. Please "
295 "use the "
296 ":paramref:`_engine.Connection.execution_options.stream_results` "
297 "parameter.",
298 ),
299 )
300 def __init__(
301 self,
302 paramstyle: Optional[_ParamStyle] = None,
303 isolation_level: Optional[IsolationLevel] = None,
304 dbapi: Optional[ModuleType] = None,
305 implicit_returning: Literal[True] = True,
306 supports_native_boolean: Optional[bool] = None,
307 max_identifier_length: Optional[int] = None,
308 label_length: Optional[int] = None,
309 insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
310 use_insertmanyvalues: Optional[bool] = None,
311 # util.deprecated_params decorator cannot render the
312 # Linting.NO_LINTING constant
313 compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore
314 server_side_cursors: bool = False,
315 skip_autocommit_rollback: bool = False,
316 **kwargs: Any,
317 ):
318 if server_side_cursors:
319 if not self.supports_server_side_cursors:
320 raise exc.ArgumentError(
321 "Dialect %s does not support server side cursors" % self
322 )
323 else:
324 self.server_side_cursors = True
325
326 if getattr(self, "use_setinputsizes", False):
327 util.warn_deprecated(
328 "The dialect-level use_setinputsizes attribute is "
329 "deprecated. Please use "
330 "bind_typing = BindTyping.SETINPUTSIZES",
331 "2.0",
332 )
333 self.bind_typing = interfaces.BindTyping.SETINPUTSIZES
334
335 self.positional = False
336 self._ischema = None
337
338 self.dbapi = dbapi
339
340 self.skip_autocommit_rollback = skip_autocommit_rollback
341
342 if paramstyle is not None:
343 self.paramstyle = paramstyle
344 elif self.dbapi is not None:
345 self.paramstyle = self.dbapi.paramstyle
346 else:
347 self.paramstyle = self.default_paramstyle
348 self.positional = self.paramstyle in (
349 "qmark",
350 "format",
351 "numeric",
352 "numeric_dollar",
353 )
354 self.identifier_preparer = self.preparer(self)
355 self._on_connect_isolation_level = isolation_level
356
357 legacy_tt_callable = getattr(self, "type_compiler", None)
358 if legacy_tt_callable is not None:
359 tt_callable = cast(
360 Type[compiler.GenericTypeCompiler],
361 self.type_compiler,
362 )
363 else:
364 tt_callable = self.type_compiler_cls
365
366 self.type_compiler_instance = self.type_compiler = tt_callable(self)
367
368 if supports_native_boolean is not None:
369 self.supports_native_boolean = supports_native_boolean
370
371 self._user_defined_max_identifier_length = max_identifier_length
372 if self._user_defined_max_identifier_length:
373 self.max_identifier_length = (
374 self._user_defined_max_identifier_length
375 )
376 self.label_length = label_length
377 self.compiler_linting = compiler_linting
378
379 if use_insertmanyvalues is not None:
380 self.use_insertmanyvalues = use_insertmanyvalues
381
382 if insertmanyvalues_page_size is not _NoArg.NO_ARG:
383 self.insertmanyvalues_page_size = insertmanyvalues_page_size
384
385 @property
386 @util.deprecated(
387 "2.0",
388 "full_returning is deprecated, please use insert_returning, "
389 "update_returning, delete_returning",
390 )
391 def full_returning(self):
392 return (
393 self.insert_returning
394 and self.update_returning
395 and self.delete_returning
396 )
397
398 @util.memoized_property
399 def insert_executemany_returning(self):
400 """Default implementation for insert_executemany_returning, if not
401 otherwise overridden by the specific dialect.
402
403 The default dialect determines "insert_executemany_returning" is
404 available if the dialect in use has opted into using the
405 "use_insertmanyvalues" feature. If they haven't opted into that, then
406 this attribute is False, unless the dialect in question overrides this
407 and provides some other implementation (such as the Oracle Database
408 dialects).
409
410 """
411 return self.insert_returning and self.use_insertmanyvalues
412
413 @util.memoized_property
414 def insert_executemany_returning_sort_by_parameter_order(self):
415 """Default implementation for
416 insert_executemany_returning_deterministic_order, if not otherwise
417 overridden by the specific dialect.
418
419 The default dialect determines "insert_executemany_returning" can have
420 deterministic order only if the dialect in use has opted into using the
421 "use_insertmanyvalues" feature, which implements deterministic ordering
422 using client side sentinel columns only by default. The
423 "insertmanyvalues" feature also features alternate forms that can
424 use server-generated PK values as "sentinels", but those are only
425 used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel`
426 bitflag enables those alternate SQL forms, which are disabled
427 by default.
428
429 If the dialect in use hasn't opted into that, then this attribute is
430 False, unless the dialect in question overrides this and provides some
431 other implementation (such as the Oracle Database dialects).
432
433 """
434 return self.insert_returning and self.use_insertmanyvalues
435
436 update_executemany_returning = False
437 delete_executemany_returning = False
438
439 @util.memoized_property
440 def loaded_dbapi(self) -> DBAPIModule:
441 if self.dbapi is None:
442 raise exc.InvalidRequestError(
443 f"Dialect {self} does not have a Python DBAPI established "
444 "and cannot be used for actual database interaction"
445 )
446 return self.dbapi
447
448 @util.memoized_property
449 def _bind_typing_render_casts(self):
450 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS
451
452 def _ensure_has_table_connection(self, arg: Connection) -> None:
453 if not isinstance(arg, Connection):
454 raise exc.ArgumentError(
455 "The argument passed to Dialect.has_table() should be a "
456 "%s, got %s. "
457 "Additionally, the Dialect.has_table() method is for "
458 "internal dialect "
459 "use only; please use "
460 "``inspect(some_engine).has_table(<tablename>>)`` "
461 "for public API use." % (Connection, type(arg))
462 )
463
464 @util.memoized_property
465 def _supports_statement_cache(self):
466 ssc = self.__class__.__dict__.get("supports_statement_cache", None)
467 if ssc is None:
468 util.warn(
469 "Dialect %s:%s will not make use of SQL compilation caching "
470 "as it does not set the 'supports_statement_cache' attribute "
471 "to ``True``. This can have "
472 "significant performance implications including some "
473 "performance degradations in comparison to prior SQLAlchemy "
474 "versions. Dialect maintainers should seek to set this "
475 "attribute to True after appropriate development and testing "
476 "for SQLAlchemy 1.4 caching support. Alternatively, this "
477 "attribute may be set to False which will disable this "
478 "warning." % (self.name, self.driver),
479 code="cprf",
480 )
481
482 return bool(ssc)
483
484 @util.memoized_property
485 def _type_memos(self):
486 return weakref.WeakKeyDictionary()
487
488 @property
489 def dialect_description(self): # type: ignore[override]
490 return self.name + "+" + self.driver
491
492 @property
493 def supports_sane_rowcount_returning(self):
494 """True if this dialect supports sane rowcount even if RETURNING is
495 in use.
496
497 For dialects that don't support RETURNING, this is synonymous with
498 ``supports_sane_rowcount``.
499
500 """
501 return self.supports_sane_rowcount
502
503 @classmethod
504 def get_pool_class(cls, url: URL) -> Type[Pool]:
505 default: Type[pool.Pool]
506 if cls.is_async:
507 default = pool.AsyncAdaptedQueuePool
508 else:
509 default = pool.QueuePool
510
511 return getattr(cls, "poolclass", default)
512
513 def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
514 return self.get_pool_class(url)
515
516 @classmethod
517 def load_provisioning(cls):
518 package = ".".join(cls.__module__.split(".")[0:-1])
519 try:
520 __import__(package + ".provision")
521 except ImportError:
522 pass
523
524 def _builtin_onconnect(self) -> Optional[_ListenerFnType]:
525 if self._on_connect_isolation_level is not None:
526
527 def builtin_connect(dbapi_conn, conn_rec):
528 self._assert_and_set_isolation_level(
529 dbapi_conn, self._on_connect_isolation_level
530 )
531
532 return builtin_connect
533 else:
534 return None
535
536 def initialize(self, connection: Connection) -> None:
537 try:
538 self.server_version_info = self._get_server_version_info(
539 connection
540 )
541 except NotImplementedError:
542 self.server_version_info = None
543 try:
544 self.default_schema_name = self._get_default_schema_name(
545 connection
546 )
547 except NotImplementedError:
548 self.default_schema_name = None
549
550 try:
551 self.default_isolation_level = self.get_default_isolation_level(
552 connection.connection.dbapi_connection
553 )
554 except NotImplementedError:
555 self.default_isolation_level = None
556
557 if not self._user_defined_max_identifier_length:
558 max_ident_length = self._check_max_identifier_length(connection)
559 if max_ident_length:
560 self.max_identifier_length = max_ident_length
561
562 if (
563 self.label_length
564 and self.label_length > self.max_identifier_length
565 ):
566 raise exc.ArgumentError(
567 "Label length of %d is greater than this dialect's"
568 " maximum identifier length of %d"
569 % (self.label_length, self.max_identifier_length)
570 )
571
572 def on_connect(self) -> Optional[Callable[[Any], None]]:
573 # inherits the docstring from interfaces.Dialect.on_connect
574 return None
575
576 def _check_max_identifier_length(self, connection):
577 """Perform a connection / server version specific check to determine
578 the max_identifier_length.
579
580 If the dialect's class level max_identifier_length should be used,
581 can return None.
582
583 """
584 return None
585
586 def get_default_isolation_level(self, dbapi_conn):
587 """Given a DBAPI connection, return its isolation level, or
588 a default isolation level if one cannot be retrieved.
589
590 May be overridden by subclasses in order to provide a
591 "fallback" isolation level for databases that cannot reliably
592 retrieve the actual isolation level.
593
594 By default, calls the :meth:`_engine.Interfaces.get_isolation_level`
595 method, propagating any exceptions raised.
596
597 """
598 return self.get_isolation_level(dbapi_conn)
599
600 def type_descriptor(self, typeobj):
601 """Provide a database-specific :class:`.TypeEngine` object, given
602 the generic object which comes from the types module.
603
604 This method looks for a dictionary called
605 ``colspecs`` as a class or instance-level variable,
606 and passes on to :func:`_types.adapt_type`.
607
608 """
609 return type_api.adapt_type(typeobj, self.colspecs)
610
611 def has_index(self, connection, table_name, index_name, schema=None, **kw):
612 if not self.has_table(connection, table_name, schema=schema, **kw):
613 return False
614 for idx in self.get_indexes(
615 connection, table_name, schema=schema, **kw
616 ):
617 if idx["name"] == index_name:
618 return True
619 else:
620 return False
621
622 def has_schema(
623 self, connection: Connection, schema_name: str, **kw: Any
624 ) -> bool:
625 return schema_name in self.get_schema_names(connection, **kw)
626
627 def validate_identifier(self, ident: str) -> None:
628 if len(ident) > self.max_identifier_length:
629 raise exc.IdentifierError(
630 "Identifier '%s' exceeds maximum length of %d characters"
631 % (ident, self.max_identifier_length)
632 )
633
634 def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection:
635 # inherits the docstring from interfaces.Dialect.connect
636 return self.loaded_dbapi.connect(*cargs, **cparams) # type: ignore[no-any-return] # NOQA: E501
637
638 def create_connect_args(self, url: URL) -> ConnectArgsType:
639 # inherits the docstring from interfaces.Dialect.create_connect_args
640 opts = url.translate_connect_args()
641 opts.update(url.query)
642 return ([], opts)
643
644 def set_engine_execution_options(
645 self, engine: Engine, opts: Mapping[str, Any]
646 ) -> None:
647 supported_names = set(self.connection_characteristics).intersection(
648 opts
649 )
650 if supported_names:
651 characteristics: Mapping[str, Any] = util.immutabledict(
652 (name, opts[name]) for name in supported_names
653 )
654
655 @event.listens_for(engine, "engine_connect")
656 def set_connection_characteristics(connection):
657 self._set_connection_characteristics(
658 connection, characteristics
659 )
660
661 def set_connection_execution_options(
662 self, connection: Connection, opts: Mapping[str, Any]
663 ) -> None:
664 supported_names = set(self.connection_characteristics).intersection(
665 opts
666 )
667 if supported_names:
668 characteristics: Mapping[str, Any] = util.immutabledict(
669 (name, opts[name]) for name in supported_names
670 )
671 self._set_connection_characteristics(connection, characteristics)
672
673 def _set_connection_characteristics(self, connection, characteristics):
674 characteristic_values = [
675 (name, self.connection_characteristics[name], value)
676 for name, value in characteristics.items()
677 ]
678
679 if connection.in_transaction():
680 trans_objs = [
681 (name, obj)
682 for name, obj, _ in characteristic_values
683 if obj.transactional
684 ]
685 if trans_objs:
686 raise exc.InvalidRequestError(
687 "This connection has already initialized a SQLAlchemy "
688 "Transaction() object via begin() or autobegin; "
689 "%s may not be altered unless rollback() or commit() "
690 "is called first."
691 % (", ".join(name for name, obj in trans_objs))
692 )
693
694 dbapi_connection = connection.connection.dbapi_connection
695 for _, characteristic, value in characteristic_values:
696 characteristic.set_connection_characteristic(
697 self, connection, dbapi_connection, value
698 )
699 connection.connection._connection_record.finalize_callback.append(
700 functools.partial(self._reset_characteristics, characteristics)
701 )
702
703 def _reset_characteristics(self, characteristics, dbapi_connection):
704 for characteristic_name in characteristics:
705 characteristic = self.connection_characteristics[
706 characteristic_name
707 ]
708 characteristic.reset_characteristic(self, dbapi_connection)
709
710 def do_begin(self, dbapi_connection):
711 pass
712
713 def do_rollback(self, dbapi_connection):
714 if self.skip_autocommit_rollback and self.detect_autocommit_setting(
715 dbapi_connection
716 ):
717 return
718 dbapi_connection.rollback()
719
720 def do_commit(self, dbapi_connection):
721 dbapi_connection.commit()
722
723 def do_terminate(self, dbapi_connection):
724 self.do_close(dbapi_connection)
725
726 def do_close(self, dbapi_connection):
727 dbapi_connection.close()
728
729 @util.memoized_property
730 def _dialect_specific_select_one(self):
731 return str(expression.select(1).compile(dialect=self))
732
733 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
734 try:
735 return self.do_ping(dbapi_connection)
736 except self.loaded_dbapi.Error as err:
737 is_disconnect = self.is_disconnect(err, dbapi_connection, None)
738
739 if self._has_events:
740 try:
741 Connection._handle_dbapi_exception_noconnection(
742 err,
743 self,
744 is_disconnect=is_disconnect,
745 invalidate_pool_on_disconnect=False,
746 is_pre_ping=True,
747 )
748 except exc.StatementError as new_err:
749 is_disconnect = new_err.connection_invalidated
750
751 if is_disconnect:
752 return False
753 else:
754 raise
755
756 def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
757 cursor = dbapi_connection.cursor()
758 try:
759 cursor.execute(self._dialect_specific_select_one)
760 finally:
761 cursor.close()
762 return True
763
764 def create_xid(self):
765 """Create a random two-phase transaction ID.
766
767 This id will be passed to do_begin_twophase(), do_rollback_twophase(),
768 do_commit_twophase(). Its format is unspecified.
769 """
770
771 return "_sa_%032x" % random.randint(0, 2**128)
772
773 def do_savepoint(self, connection, name):
774 connection.execute(expression.SavepointClause(name))
775
776 def do_rollback_to_savepoint(self, connection, name):
777 connection.execute(expression.RollbackToSavepointClause(name))
778
779 def do_release_savepoint(self, connection, name):
780 connection.execute(expression.ReleaseSavepointClause(name))
781
782 def _deliver_insertmanyvalues_batches(
783 self,
784 connection,
785 cursor,
786 statement,
787 parameters,
788 generic_setinputsizes,
789 context,
790 ):
791 context = cast(DefaultExecutionContext, context)
792 compiled = cast(SQLCompiler, context.compiled)
793
794 _composite_sentinel_proc: Sequence[
795 Optional[_ResultProcessorType[Any]]
796 ] = ()
797 _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
798 _sentinel_proc_initialized: bool = False
799
800 compiled_parameters = context.compiled_parameters
801
802 imv = compiled._insertmanyvalues
803 assert imv is not None
804
805 is_returning: Final[bool] = bool(compiled.effective_returning)
806 batch_size = context.execution_options.get(
807 "insertmanyvalues_page_size", self.insertmanyvalues_page_size
808 )
809
810 if compiled.schema_translate_map:
811 schema_translate_map = context.execution_options.get(
812 "schema_translate_map", {}
813 )
814 else:
815 schema_translate_map = None
816
817 if is_returning:
818 result: Optional[List[Any]] = []
819 context._insertmanyvalues_rows = result
820
821 sort_by_parameter_order = imv.sort_by_parameter_order
822
823 else:
824 sort_by_parameter_order = False
825 result = None
826
827 for imv_batch in compiled._deliver_insertmanyvalues_batches(
828 statement,
829 parameters,
830 compiled_parameters,
831 generic_setinputsizes,
832 batch_size,
833 sort_by_parameter_order,
834 schema_translate_map,
835 ):
836 yield imv_batch
837
838 if is_returning:
839
840 try:
841 rows = context.fetchall_for_returning(cursor)
842 except BaseException as be:
843 connection._handle_dbapi_exception(
844 be,
845 sql_util._long_statement(imv_batch.replaced_statement),
846 imv_batch.replaced_parameters,
847 None,
848 context,
849 is_sub_exec=True,
850 )
851
852 # I would have thought "is_returning: Final[bool]"
853 # would have assured this but pylance thinks not
854 assert result is not None
855
856 if imv.num_sentinel_columns and not imv_batch.is_downgraded:
857 composite_sentinel = imv.num_sentinel_columns > 1
858 if imv.implicit_sentinel:
859 # for implicit sentinel, which is currently single-col
860 # integer autoincrement, do a simple sort.
861 assert not composite_sentinel
862 result.extend(
863 sorted(rows, key=operator.itemgetter(-1))
864 )
865 continue
866
867 # otherwise, create dictionaries to match up batches
868 # with parameters
869 assert imv.sentinel_param_keys
870 assert imv.sentinel_columns
871
872 _nsc = imv.num_sentinel_columns
873
874 if not _sentinel_proc_initialized:
875 if composite_sentinel:
876 _composite_sentinel_proc = [
877 col.type._cached_result_processor(
878 self, cursor_desc[1]
879 )
880 for col, cursor_desc in zip(
881 imv.sentinel_columns,
882 cursor.description[-_nsc:],
883 )
884 ]
885 else:
886 _scalar_sentinel_proc = (
887 imv.sentinel_columns[0]
888 ).type._cached_result_processor(
889 self, cursor.description[-1][1]
890 )
891 _sentinel_proc_initialized = True
892
893 rows_by_sentinel: Union[
894 Dict[Tuple[Any, ...], Any],
895 Dict[Any, Any],
896 ]
897 if composite_sentinel:
898 rows_by_sentinel = {
899 tuple(
900 (proc(val) if proc else val)
901 for val, proc in zip(
902 row[-_nsc:], _composite_sentinel_proc
903 )
904 ): row
905 for row in rows
906 }
907 elif _scalar_sentinel_proc:
908 rows_by_sentinel = {
909 _scalar_sentinel_proc(row[-1]): row for row in rows
910 }
911 else:
912 rows_by_sentinel = {row[-1]: row for row in rows}
913
914 if len(rows_by_sentinel) != len(imv_batch.batch):
915 # see test_insert_exec.py::
916 # IMVSentinelTest::test_sentinel_incorrect_rowcount
917 # for coverage / demonstration
918 raise exc.InvalidRequestError(
919 f"Sentinel-keyed result set did not produce "
920 f"correct number of rows {len(imv_batch.batch)}; "
921 "produced "
922 f"{len(rows_by_sentinel)}. Please ensure the "
923 "sentinel column is fully unique and populated in "
924 "all cases."
925 )
926
927 try:
928 ordered_rows = [
929 rows_by_sentinel[sentinel_keys]
930 for sentinel_keys in imv_batch.sentinel_values
931 ]
932 except KeyError as ke:
933 # see test_insert_exec.py::
934 # IMVSentinelTest::test_sentinel_cant_match_keys
935 # for coverage / demonstration
936 raise exc.InvalidRequestError(
937 f"Can't match sentinel values in result set to "
938 f"parameter sets; key {ke.args[0]!r} was not "
939 "found. "
940 "There may be a mismatch between the datatype "
941 "passed to the DBAPI driver vs. that which it "
942 "returns in a result row. Ensure the given "
943 "Python value matches the expected result type "
944 "*exactly*, taking care to not rely upon implicit "
945 "conversions which may occur such as when using "
946 "strings in place of UUID or integer values, etc. "
947 ) from ke
948
949 result.extend(ordered_rows)
950
951 else:
952 result.extend(rows)
953
954 def do_executemany(self, cursor, statement, parameters, context=None):
955 cursor.executemany(statement, parameters)
956
957 def do_execute(self, cursor, statement, parameters, context=None):
958 cursor.execute(statement, parameters)
959
960 def do_execute_no_params(self, cursor, statement, context=None):
961 cursor.execute(statement)
962
963 def is_disconnect(
964 self,
965 e: DBAPIModule.Error,
966 connection: Union[
967 pool.PoolProxiedConnection, interfaces.DBAPIConnection, None
968 ],
969 cursor: Optional[interfaces.DBAPICursor],
970 ) -> bool:
971 return False
972
973 @util.memoized_instancemethod
974 def _gen_allowed_isolation_levels(self, dbapi_conn):
975 try:
976 raw_levels = list(self.get_isolation_level_values(dbapi_conn))
977 except NotImplementedError:
978 return None
979 else:
980 normalized_levels = [
981 level.replace("_", " ").upper() for level in raw_levels
982 ]
983 if raw_levels != normalized_levels:
984 raise ValueError(
985 f"Dialect {self.name!r} get_isolation_level_values() "
986 f"method should return names as UPPERCASE using spaces, "
987 f"not underscores; got "
988 f"{sorted(set(raw_levels).difference(normalized_levels))}"
989 )
990 return tuple(normalized_levels)
991
992 def _assert_and_set_isolation_level(self, dbapi_conn, level):
993 level = level.replace("_", " ").upper()
994
995 _allowed_isolation_levels = self._gen_allowed_isolation_levels(
996 dbapi_conn
997 )
998 if (
999 _allowed_isolation_levels
1000 and level not in _allowed_isolation_levels
1001 ):
1002 raise exc.ArgumentError(
1003 f"Invalid value {level!r} for isolation_level. "
1004 f"Valid isolation levels for {self.name!r} are "
1005 f"{', '.join(_allowed_isolation_levels)}"
1006 )
1007
1008 self.set_isolation_level(dbapi_conn, level)
1009
1010 def reset_isolation_level(self, dbapi_conn):
1011 if self._on_connect_isolation_level is not None:
1012 assert (
1013 self._on_connect_isolation_level == "AUTOCOMMIT"
1014 or self._on_connect_isolation_level
1015 == self.default_isolation_level
1016 )
1017 self._assert_and_set_isolation_level(
1018 dbapi_conn, self._on_connect_isolation_level
1019 )
1020 else:
1021 assert self.default_isolation_level is not None
1022 self._assert_and_set_isolation_level(
1023 dbapi_conn,
1024 self.default_isolation_level,
1025 )
1026
1027 def normalize_name(self, name):
1028 if name is None:
1029 return None
1030
1031 name_lower = name.lower()
1032 name_upper = name.upper()
1033
1034 if name_upper == name_lower:
1035 # name has no upper/lower conversion, e.g. non-european characters.
1036 # return unchanged
1037 return name
1038 elif name_upper == name and not (
1039 self.identifier_preparer._requires_quotes
1040 )(name_lower):
1041 # name is all uppercase and doesn't require quoting; normalize
1042 # to all lower case
1043 return name_lower
1044 elif name_lower == name:
1045 # name is all lower case, which if denormalized means we need to
1046 # force quoting on it
1047 return quoted_name(name, quote=True)
1048 else:
1049 # name is mixed case, means it will be quoted in SQL when used
1050 # later, no normalizes
1051 return name
1052
1053 def denormalize_name(self, name):
1054 if name is None:
1055 return None
1056
1057 name_lower = name.lower()
1058 name_upper = name.upper()
1059
1060 if name_upper == name_lower:
1061 # name has no upper/lower conversion, e.g. non-european characters.
1062 # return unchanged
1063 return name
1064 elif name_lower == name and not (
1065 self.identifier_preparer._requires_quotes
1066 )(name_lower):
1067 name = name_upper
1068 return name
1069
1070 def get_driver_connection(self, connection: DBAPIConnection) -> Any:
1071 return connection
1072
1073 def _overrides_default(self, method):
1074 return (
1075 getattr(type(self), method).__code__
1076 is not getattr(DefaultDialect, method).__code__
1077 )
1078
1079 def _default_multi_reflect(
1080 self,
1081 single_tbl_method,
1082 connection,
1083 kind,
1084 schema,
1085 filter_names,
1086 scope,
1087 **kw,
1088 ):
1089 names_fns = []
1090 temp_names_fns = []
1091 if ObjectKind.TABLE in kind:
1092 names_fns.append(self.get_table_names)
1093 temp_names_fns.append(self.get_temp_table_names)
1094 if ObjectKind.VIEW in kind:
1095 names_fns.append(self.get_view_names)
1096 temp_names_fns.append(self.get_temp_view_names)
1097 if ObjectKind.MATERIALIZED_VIEW in kind:
1098 names_fns.append(self.get_materialized_view_names)
1099 # no temp materialized view at the moment
1100 # temp_names_fns.append(self.get_temp_materialized_view_names)
1101
1102 unreflectable = kw.pop("unreflectable", {})
1103
1104 if (
1105 filter_names
1106 and scope is ObjectScope.ANY
1107 and kind is ObjectKind.ANY
1108 ):
1109 # if names are given and no qualification on type of table
1110 # (i.e. the Table(..., autoload) case), take the names as given,
1111 # don't run names queries. If a table does not exit
1112 # NoSuchTableError is raised and it's skipped
1113
1114 # this also suits the case for mssql where we can reflect
1115 # individual temp tables but there's no temp_names_fn
1116 names = filter_names
1117 else:
1118 names = []
1119 name_kw = {"schema": schema, **kw}
1120 fns = []
1121 if ObjectScope.DEFAULT in scope:
1122 fns.extend(names_fns)
1123 if ObjectScope.TEMPORARY in scope:
1124 fns.extend(temp_names_fns)
1125
1126 for fn in fns:
1127 try:
1128 names.extend(fn(connection, **name_kw))
1129 except NotImplementedError:
1130 pass
1131
1132 if filter_names:
1133 filter_names = set(filter_names)
1134
1135 # iterate over all the tables/views and call the single table method
1136 for table in names:
1137 if not filter_names or table in filter_names:
1138 key = (schema, table)
1139 try:
1140 yield (
1141 key,
1142 single_tbl_method(
1143 connection, table, schema=schema, **kw
1144 ),
1145 )
1146 except exc.UnreflectableTableError as err:
1147 if key not in unreflectable:
1148 unreflectable[key] = err
1149 except exc.NoSuchTableError:
1150 pass
1151
1152 def get_multi_table_options(self, connection, **kw):
1153 return self._default_multi_reflect(
1154 self.get_table_options, connection, **kw
1155 )
1156
1157 def get_multi_columns(self, connection, **kw):
1158 return self._default_multi_reflect(self.get_columns, connection, **kw)
1159
1160 def get_multi_pk_constraint(self, connection, **kw):
1161 return self._default_multi_reflect(
1162 self.get_pk_constraint, connection, **kw
1163 )
1164
1165 def get_multi_foreign_keys(self, connection, **kw):
1166 return self._default_multi_reflect(
1167 self.get_foreign_keys, connection, **kw
1168 )
1169
1170 def get_multi_indexes(self, connection, **kw):
1171 return self._default_multi_reflect(self.get_indexes, connection, **kw)
1172
1173 def get_multi_unique_constraints(self, connection, **kw):
1174 return self._default_multi_reflect(
1175 self.get_unique_constraints, connection, **kw
1176 )
1177
1178 def get_multi_check_constraints(self, connection, **kw):
1179 return self._default_multi_reflect(
1180 self.get_check_constraints, connection, **kw
1181 )
1182
1183 def get_multi_table_comment(self, connection, **kw):
1184 return self._default_multi_reflect(
1185 self.get_table_comment, connection, **kw
1186 )
1187
1188
1189class StrCompileDialect(DefaultDialect):
1190 statement_compiler = compiler.StrSQLCompiler
1191 ddl_compiler = compiler.DDLCompiler
1192 type_compiler_cls = compiler.StrSQLTypeCompiler
1193 preparer = compiler.IdentifierPreparer
1194
1195 insert_returning = True
1196 update_returning = True
1197 delete_returning = True
1198
1199 supports_statement_cache = True
1200
1201 supports_identity_columns = True
1202
1203 supports_sequences = True
1204 sequences_optional = True
1205 preexecute_autoincrement_sequences = False
1206
1207 supports_native_boolean = True
1208
1209 supports_multivalues_insert = True
1210 supports_simple_order_by_label = True
1211
1212
1213class DefaultExecutionContext(ExecutionContext):
1214 isinsert = False
1215 isupdate = False
1216 isdelete = False
1217 is_crud = False
1218 is_text = False
1219 isddl = False
1220
1221 execute_style: ExecuteStyle = ExecuteStyle.EXECUTE
1222
1223 compiled: Optional[Compiled] = None
1224 result_column_struct: Optional[
1225 Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
1226 ] = None
1227 returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None
1228
1229 execution_options: _ExecuteOptions = util.EMPTY_DICT
1230
1231 cursor_fetch_strategy = _cursor._DEFAULT_FETCH
1232
1233 invoked_statement: Optional[Executable] = None
1234
1235 _is_implicit_returning = False
1236 _is_explicit_returning = False
1237 _is_supplemental_returning = False
1238 _is_server_side = False
1239
1240 _soft_closed = False
1241
1242 _rowcount: Optional[int] = None
1243
1244 # a hook for SQLite's translation of
1245 # result column names
1246 # NOTE: pyhive is using this hook, can't remove it :(
1247 _translate_colname: Optional[Callable[[str], str]] = None
1248
1249 _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
1250 """used by set_input_sizes().
1251
1252 This collection comes from ``ExpandedState.parameter_expansion``.
1253
1254 """
1255
1256 cache_hit = NO_CACHE_KEY
1257
1258 root_connection: Connection
1259 _dbapi_connection: PoolProxiedConnection
1260 dialect: Dialect
1261 unicode_statement: str
1262 cursor: DBAPICursor
1263 compiled_parameters: List[_MutableCoreSingleExecuteParams]
1264 parameters: _DBAPIMultiExecuteParams
1265 extracted_parameters: Optional[Sequence[BindParameter[Any]]]
1266
1267 _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)
1268
1269 _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
1270 _num_sentinel_cols: int = 0
1271
1272 @classmethod
1273 def _init_ddl(
1274 cls,
1275 dialect: Dialect,
1276 connection: Connection,
1277 dbapi_connection: PoolProxiedConnection,
1278 execution_options: _ExecuteOptions,
1279 compiled_ddl: DDLCompiler,
1280 ) -> ExecutionContext:
1281 """Initialize execution context for an ExecutableDDLElement
1282 construct."""
1283
1284 self = cls.__new__(cls)
1285 self.root_connection = connection
1286 self._dbapi_connection = dbapi_connection
1287 self.dialect = connection.dialect
1288
1289 self.compiled = compiled = compiled_ddl
1290 self.isddl = True
1291
1292 self.execution_options = execution_options
1293
1294 self.unicode_statement = str(compiled)
1295 if compiled.schema_translate_map:
1296 schema_translate_map = self.execution_options.get(
1297 "schema_translate_map", {}
1298 )
1299
1300 rst = compiled.preparer._render_schema_translates
1301 self.unicode_statement = rst(
1302 self.unicode_statement, schema_translate_map
1303 )
1304
1305 self.statement = self.unicode_statement
1306
1307 self.cursor = self.create_cursor()
1308 self.compiled_parameters = []
1309
1310 if dialect.positional:
1311 self.parameters = [dialect.execute_sequence_format()]
1312 else:
1313 self.parameters = [self._empty_dict_params]
1314
1315 return self
1316
1317 @classmethod
1318 def _init_compiled(
1319 cls,
1320 dialect: Dialect,
1321 connection: Connection,
1322 dbapi_connection: PoolProxiedConnection,
1323 execution_options: _ExecuteOptions,
1324 compiled: SQLCompiler,
1325 parameters: _CoreMultiExecuteParams,
1326 invoked_statement: Executable,
1327 extracted_parameters: Optional[Sequence[BindParameter[Any]]],
1328 cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
1329 ) -> ExecutionContext:
1330 """Initialize execution context for a Compiled construct."""
1331
1332 self = cls.__new__(cls)
1333 self.root_connection = connection
1334 self._dbapi_connection = dbapi_connection
1335 self.dialect = connection.dialect
1336 self.extracted_parameters = extracted_parameters
1337 self.invoked_statement = invoked_statement
1338 self.compiled = compiled
1339 self.cache_hit = cache_hit
1340
1341 self.execution_options = execution_options
1342
1343 self.result_column_struct = (
1344 compiled._result_columns,
1345 compiled._ordered_columns,
1346 compiled._textual_ordered_columns,
1347 compiled._ad_hoc_textual,
1348 compiled._loose_column_name_matching,
1349 )
1350
1351 self.isinsert = ii = compiled.isinsert
1352 self.isupdate = iu = compiled.isupdate
1353 self.isdelete = id_ = compiled.isdelete
1354 self.is_text = compiled.isplaintext
1355
1356 if ii or iu or id_:
1357 dml_statement = compiled.compile_state.statement # type: ignore
1358 if TYPE_CHECKING:
1359 assert isinstance(dml_statement, UpdateBase)
1360 self.is_crud = True
1361 self._is_explicit_returning = ier = bool(dml_statement._returning)
1362 self._is_implicit_returning = iir = bool(
1363 compiled.implicit_returning
1364 )
1365 if iir and dml_statement._supplemental_returning:
1366 self._is_supplemental_returning = True
1367
1368 # dont mix implicit and explicit returning
1369 assert not (iir and ier)
1370
1371 if (ier or iir) and compiled.for_executemany:
1372 if ii and not self.dialect.insert_executemany_returning:
1373 raise exc.InvalidRequestError(
1374 f"Dialect {self.dialect.dialect_description} with "
1375 f"current server capabilities does not support "
1376 "INSERT..RETURNING when executemany is used"
1377 )
1378 elif (
1379 ii
1380 and dml_statement._sort_by_parameter_order
1381 and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501
1382 ):
1383 raise exc.InvalidRequestError(
1384 f"Dialect {self.dialect.dialect_description} with "
1385 f"current server capabilities does not support "
1386 "INSERT..RETURNING with deterministic row ordering "
1387 "when executemany is used"
1388 )
1389 elif (
1390 ii
1391 and self.dialect.use_insertmanyvalues
1392 and not compiled._insertmanyvalues
1393 ):
1394 raise exc.InvalidRequestError(
1395 'Statement does not have "insertmanyvalues" '
1396 "enabled, can't use INSERT..RETURNING with "
1397 "executemany in this case."
1398 )
1399 elif iu and not self.dialect.update_executemany_returning:
1400 raise exc.InvalidRequestError(
1401 f"Dialect {self.dialect.dialect_description} with "
1402 f"current server capabilities does not support "
1403 "UPDATE..RETURNING when executemany is used"
1404 )
1405 elif id_ and not self.dialect.delete_executemany_returning:
1406 raise exc.InvalidRequestError(
1407 f"Dialect {self.dialect.dialect_description} with "
1408 f"current server capabilities does not support "
1409 "DELETE..RETURNING when executemany is used"
1410 )
1411
1412 if not parameters:
1413 self.compiled_parameters = [
1414 compiled.construct_params(
1415 extracted_parameters=extracted_parameters,
1416 escape_names=False,
1417 )
1418 ]
1419 else:
1420 self.compiled_parameters = [
1421 compiled.construct_params(
1422 m,
1423 escape_names=False,
1424 _group_number=grp,
1425 extracted_parameters=extracted_parameters,
1426 )
1427 for grp, m in enumerate(parameters)
1428 ]
1429
1430 if len(parameters) > 1:
1431 if self.isinsert and compiled._insertmanyvalues:
1432 self.execute_style = ExecuteStyle.INSERTMANYVALUES
1433
1434 imv = compiled._insertmanyvalues
1435 if imv.sentinel_columns is not None:
1436 self._num_sentinel_cols = imv.num_sentinel_columns
1437 else:
1438 self.execute_style = ExecuteStyle.EXECUTEMANY
1439
1440 self.unicode_statement = compiled.string
1441
1442 self.cursor = self.create_cursor()
1443
1444 if self.compiled.insert_prefetch or self.compiled.update_prefetch:
1445 self._process_execute_defaults()
1446
1447 processors = compiled._bind_processors
1448
1449 flattened_processors: Mapping[
1450 str, _BindProcessorType[Any]
1451 ] = processors # type: ignore[assignment]
1452
1453 if compiled.literal_execute_params or compiled.post_compile_params:
1454 if self.executemany:
1455 raise exc.InvalidRequestError(
1456 "'literal_execute' or 'expanding' parameters can't be "
1457 "used with executemany()"
1458 )
1459
1460 expanded_state = compiled._process_parameters_for_postcompile(
1461 self.compiled_parameters[0]
1462 )
1463
1464 # re-assign self.unicode_statement
1465 self.unicode_statement = expanded_state.statement
1466
1467 self._expanded_parameters = expanded_state.parameter_expansion
1468
1469 flattened_processors = dict(processors) # type: ignore
1470 flattened_processors.update(expanded_state.processors)
1471 positiontup = expanded_state.positiontup
1472 elif compiled.positional:
1473 positiontup = self.compiled.positiontup
1474 else:
1475 positiontup = None
1476
1477 if compiled.schema_translate_map:
1478 schema_translate_map = self.execution_options.get(
1479 "schema_translate_map", {}
1480 )
1481 rst = compiled.preparer._render_schema_translates
1482 self.unicode_statement = rst(
1483 self.unicode_statement, schema_translate_map
1484 )
1485
1486 # final self.unicode_statement is now assigned, encode if needed
1487 # by dialect
1488 self.statement = self.unicode_statement
1489
1490 # Convert the dictionary of bind parameter values
1491 # into a dict or list to be sent to the DBAPI's
1492 # execute() or executemany() method.
1493
1494 if compiled.positional:
1495 core_positional_parameters: MutableSequence[Sequence[Any]] = []
1496 assert positiontup is not None
1497 for compiled_params in self.compiled_parameters:
1498 l_param: List[Any] = [
1499 (
1500 flattened_processors[key](compiled_params[key])
1501 if key in flattened_processors
1502 else compiled_params[key]
1503 )
1504 for key in positiontup
1505 ]
1506 core_positional_parameters.append(
1507 dialect.execute_sequence_format(l_param)
1508 )
1509
1510 self.parameters = core_positional_parameters
1511 else:
1512 core_dict_parameters: MutableSequence[Dict[str, Any]] = []
1513 escaped_names = compiled.escaped_bind_names
1514
1515 # note that currently, "expanded" parameters will be present
1516 # in self.compiled_parameters in their quoted form. This is
1517 # slightly inconsistent with the approach taken as of
1518 # #8056 where self.compiled_parameters is meant to contain unquoted
1519 # param names.
1520 d_param: Dict[str, Any]
1521 for compiled_params in self.compiled_parameters:
1522 if escaped_names:
1523 d_param = {
1524 escaped_names.get(key, key): (
1525 flattened_processors[key](compiled_params[key])
1526 if key in flattened_processors
1527 else compiled_params[key]
1528 )
1529 for key in compiled_params
1530 }
1531 else:
1532 d_param = {
1533 key: (
1534 flattened_processors[key](compiled_params[key])
1535 if key in flattened_processors
1536 else compiled_params[key]
1537 )
1538 for key in compiled_params
1539 }
1540
1541 core_dict_parameters.append(d_param)
1542
1543 self.parameters = core_dict_parameters
1544
1545 return self
1546
1547 @classmethod
1548 def _init_statement(
1549 cls,
1550 dialect: Dialect,
1551 connection: Connection,
1552 dbapi_connection: PoolProxiedConnection,
1553 execution_options: _ExecuteOptions,
1554 statement: str,
1555 parameters: _DBAPIMultiExecuteParams,
1556 ) -> ExecutionContext:
1557 """Initialize execution context for a string SQL statement."""
1558
1559 self = cls.__new__(cls)
1560 self.root_connection = connection
1561 self._dbapi_connection = dbapi_connection
1562 self.dialect = connection.dialect
1563 self.is_text = True
1564
1565 self.execution_options = execution_options
1566
1567 if not parameters:
1568 if self.dialect.positional:
1569 self.parameters = [dialect.execute_sequence_format()]
1570 else:
1571 self.parameters = [self._empty_dict_params]
1572 elif isinstance(parameters[0], dialect.execute_sequence_format):
1573 self.parameters = parameters
1574 elif isinstance(parameters[0], dict):
1575 self.parameters = parameters
1576 else:
1577 self.parameters = [
1578 dialect.execute_sequence_format(p) for p in parameters
1579 ]
1580
1581 if len(parameters) > 1:
1582 self.execute_style = ExecuteStyle.EXECUTEMANY
1583
1584 self.statement = self.unicode_statement = statement
1585
1586 self.cursor = self.create_cursor()
1587 return self
1588
1589 @classmethod
1590 def _init_default(
1591 cls,
1592 dialect: Dialect,
1593 connection: Connection,
1594 dbapi_connection: PoolProxiedConnection,
1595 execution_options: _ExecuteOptions,
1596 ) -> ExecutionContext:
1597 """Initialize execution context for a ColumnDefault construct."""
1598
1599 self = cls.__new__(cls)
1600 self.root_connection = connection
1601 self._dbapi_connection = dbapi_connection
1602 self.dialect = connection.dialect
1603
1604 self.execution_options = execution_options
1605
1606 self.cursor = self.create_cursor()
1607 return self
1608
1609 def _get_cache_stats(self) -> str:
1610 if self.compiled is None:
1611 return "raw sql"
1612
1613 now = perf_counter()
1614
1615 ch = self.cache_hit
1616
1617 gen_time = self.compiled._gen_time
1618 assert gen_time is not None
1619
1620 if ch is NO_CACHE_KEY:
1621 return "no key %.5fs" % (now - gen_time,)
1622 elif ch is CACHE_HIT:
1623 return "cached since %.4gs ago" % (now - gen_time,)
1624 elif ch is CACHE_MISS:
1625 return "generated in %.5fs" % (now - gen_time,)
1626 elif ch is CACHING_DISABLED:
1627 if "_cache_disable_reason" in self.execution_options:
1628 return "caching disabled (%s) %.5fs " % (
1629 self.execution_options["_cache_disable_reason"],
1630 now - gen_time,
1631 )
1632 else:
1633 return "caching disabled %.5fs" % (now - gen_time,)
1634 elif ch is NO_DIALECT_SUPPORT:
1635 return "dialect %s+%s does not support caching %.5fs" % (
1636 self.dialect.name,
1637 self.dialect.driver,
1638 now - gen_time,
1639 )
1640 else:
1641 return "unknown"
1642
1643 @property
1644 def executemany(self): # type: ignore[override]
1645 return self.execute_style in (
1646 ExecuteStyle.EXECUTEMANY,
1647 ExecuteStyle.INSERTMANYVALUES,
1648 )
1649
1650 @util.memoized_property
1651 def identifier_preparer(self):
1652 if self.compiled:
1653 return self.compiled.preparer
1654 elif "schema_translate_map" in self.execution_options:
1655 return self.dialect.identifier_preparer._with_schema_translate(
1656 self.execution_options["schema_translate_map"]
1657 )
1658 else:
1659 return self.dialect.identifier_preparer
1660
1661 @util.memoized_property
1662 def engine(self):
1663 return self.root_connection.engine
1664
1665 @util.memoized_property
1666 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1667 if TYPE_CHECKING:
1668 assert isinstance(self.compiled, SQLCompiler)
1669 return self.compiled.postfetch
1670
1671 @util.memoized_property
1672 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1673 if TYPE_CHECKING:
1674 assert isinstance(self.compiled, SQLCompiler)
1675 if self.isinsert:
1676 return self.compiled.insert_prefetch
1677 elif self.isupdate:
1678 return self.compiled.update_prefetch
1679 else:
1680 return ()
1681
1682 @util.memoized_property
1683 def no_parameters(self):
1684 return self.execution_options.get("no_parameters", False)
1685
1686 def _execute_scalar(
1687 self,
1688 stmt: str,
1689 type_: Optional[TypeEngine[Any]],
1690 parameters: Optional[_DBAPISingleExecuteParams] = None,
1691 ) -> Any:
1692 """Execute a string statement on the current cursor, returning a
1693 scalar result.
1694
1695 Used to fire off sequences, default phrases, and "select lastrowid"
1696 types of statements individually or in the context of a parent INSERT
1697 or UPDATE statement.
1698
1699 """
1700
1701 conn = self.root_connection
1702
1703 if "schema_translate_map" in self.execution_options:
1704 schema_translate_map = self.execution_options.get(
1705 "schema_translate_map", {}
1706 )
1707
1708 rst = self.identifier_preparer._render_schema_translates
1709 stmt = rst(stmt, schema_translate_map)
1710
1711 if not parameters:
1712 if self.dialect.positional:
1713 parameters = self.dialect.execute_sequence_format()
1714 else:
1715 parameters = {}
1716
1717 conn._cursor_execute(self.cursor, stmt, parameters, context=self)
1718 row = self.cursor.fetchone()
1719 if row is not None:
1720 r = row[0]
1721 else:
1722 r = None
1723 if type_ is not None:
1724 # apply type post processors to the result
1725 proc = type_._cached_result_processor(
1726 self.dialect, self.cursor.description[0][1]
1727 )
1728 if proc:
1729 return proc(r)
1730 return r
1731
1732 @util.memoized_property
1733 def connection(self):
1734 return self.root_connection
1735
1736 def _use_server_side_cursor(self):
1737 if not self.dialect.supports_server_side_cursors:
1738 return False
1739
1740 if self.dialect.server_side_cursors:
1741 # this is deprecated
1742 use_server_side = self.execution_options.get(
1743 "stream_results", True
1744 ) and (
1745 self.compiled
1746 and isinstance(self.compiled.statement, expression.Selectable)
1747 or (
1748 (
1749 not self.compiled
1750 or isinstance(
1751 self.compiled.statement, expression.TextClause
1752 )
1753 )
1754 and self.unicode_statement
1755 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
1756 )
1757 )
1758 else:
1759 use_server_side = self.execution_options.get(
1760 "stream_results", False
1761 )
1762
1763 return use_server_side
1764
1765 def create_cursor(self) -> DBAPICursor:
1766 if (
1767 # inlining initial preference checks for SS cursors
1768 self.dialect.supports_server_side_cursors
1769 and (
1770 self.execution_options.get("stream_results", False)
1771 or (
1772 self.dialect.server_side_cursors
1773 and self._use_server_side_cursor()
1774 )
1775 )
1776 ):
1777 self._is_server_side = True
1778 return self.create_server_side_cursor()
1779 else:
1780 self._is_server_side = False
1781 return self.create_default_cursor()
1782
1783 def fetchall_for_returning(self, cursor):
1784 return cursor.fetchall()
1785
1786 def create_default_cursor(self) -> DBAPICursor:
1787 return self._dbapi_connection.cursor()
1788
1789 def create_server_side_cursor(self) -> DBAPICursor:
1790 raise NotImplementedError()
1791
1792 def pre_exec(self):
1793 pass
1794
1795 def get_out_parameter_values(self, names):
1796 raise NotImplementedError(
1797 "This dialect does not support OUT parameters"
1798 )
1799
1800 def post_exec(self):
1801 pass
1802
1803 def get_result_processor(self, type_, colname, coltype):
1804 """Return a 'result processor' for a given type as present in
1805 cursor.description.
1806
1807 This has a default implementation that dialects can override
1808 for context-sensitive result type handling.
1809
1810 """
1811 return type_._cached_result_processor(self.dialect, coltype)
1812
1813 def get_lastrowid(self):
1814 """return self.cursor.lastrowid, or equivalent, after an INSERT.
1815
1816 This may involve calling special cursor functions, issuing a new SELECT
1817 on the cursor (or a new one), or returning a stored value that was
1818 calculated within post_exec().
1819
1820 This function will only be called for dialects which support "implicit"
1821 primary key generation, keep preexecute_autoincrement_sequences set to
1822 False, and when no explicit id value was bound to the statement.
1823
1824 The function is called once for an INSERT statement that would need to
1825 return the last inserted primary key for those dialects that make use
1826 of the lastrowid concept. In these cases, it is called directly after
1827 :meth:`.ExecutionContext.post_exec`.
1828
1829 """
1830 return self.cursor.lastrowid
1831
1832 def handle_dbapi_exception(self, e):
1833 pass
1834
1835 @util.non_memoized_property
1836 def rowcount(self) -> int:
1837 if self._rowcount is not None:
1838 return self._rowcount
1839 else:
1840 return self.cursor.rowcount
1841
1842 @property
1843 def _has_rowcount(self):
1844 return self._rowcount is not None
1845
1846 def supports_sane_rowcount(self):
1847 return self.dialect.supports_sane_rowcount
1848
1849 def supports_sane_multi_rowcount(self):
1850 return self.dialect.supports_sane_multi_rowcount
1851
1852 def _setup_result_proxy(self):
1853 exec_opt = self.execution_options
1854
1855 if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
1856 self._rowcount = self.cursor.rowcount
1857
1858 yp: Optional[Union[int, bool]]
1859 if self.is_crud or self.is_text:
1860 result = self._setup_dml_or_text_result()
1861 yp = False
1862 else:
1863 yp = exec_opt.get("yield_per", None)
1864 sr = self._is_server_side or exec_opt.get("stream_results", False)
1865 strategy = self.cursor_fetch_strategy
1866 if sr and strategy is _cursor._DEFAULT_FETCH:
1867 strategy = _cursor.BufferedRowCursorFetchStrategy(
1868 self.cursor, self.execution_options
1869 )
1870 cursor_description: _DBAPICursorDescription = (
1871 strategy.alternate_cursor_description
1872 or self.cursor.description
1873 )
1874 if cursor_description is None:
1875 strategy = _cursor._NO_CURSOR_DQL
1876
1877 result = _cursor.CursorResult(self, strategy, cursor_description)
1878
1879 compiled = self.compiled
1880
1881 if (
1882 compiled
1883 and not self.isddl
1884 and cast(SQLCompiler, compiled).has_out_parameters
1885 ):
1886 self._setup_out_parameters(result)
1887
1888 self._soft_closed = result._soft_closed
1889
1890 if yp:
1891 result = result.yield_per(yp)
1892
1893 return result
1894
1895 def _setup_out_parameters(self, result):
1896 compiled = cast(SQLCompiler, self.compiled)
1897
1898 out_bindparams = [
1899 (param, name)
1900 for param, name in compiled.bind_names.items()
1901 if param.isoutparam
1902 ]
1903 out_parameters = {}
1904
1905 for bindparam, raw_value in zip(
1906 [param for param, name in out_bindparams],
1907 self.get_out_parameter_values(
1908 [name for param, name in out_bindparams]
1909 ),
1910 ):
1911 type_ = bindparam.type
1912 impl_type = type_.dialect_impl(self.dialect)
1913 dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
1914 result_processor = impl_type.result_processor(
1915 self.dialect, dbapi_type
1916 )
1917 if result_processor is not None:
1918 raw_value = result_processor(raw_value)
1919 out_parameters[bindparam.key] = raw_value
1920
1921 result.out_parameters = out_parameters
1922
1923 def _setup_dml_or_text_result(self):
1924 compiled = cast(SQLCompiler, self.compiled)
1925
1926 strategy: ResultFetchStrategy = self.cursor_fetch_strategy
1927
1928 if self.isinsert:
1929 if (
1930 self.execute_style is ExecuteStyle.INSERTMANYVALUES
1931 and compiled.effective_returning
1932 ):
1933 strategy = _cursor.FullyBufferedCursorFetchStrategy(
1934 self.cursor,
1935 initial_buffer=self._insertmanyvalues_rows,
1936 # maintain alt cursor description if set by the
1937 # dialect, e.g. mssql preserves it
1938 alternate_description=(
1939 strategy.alternate_cursor_description
1940 ),
1941 )
1942
1943 if compiled.postfetch_lastrowid:
1944 self.inserted_primary_key_rows = (
1945 self._setup_ins_pk_from_lastrowid()
1946 )
1947 # else if not self._is_implicit_returning,
1948 # the default inserted_primary_key_rows accessor will
1949 # return an "empty" primary key collection when accessed.
1950
1951 if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
1952 strategy = _cursor.BufferedRowCursorFetchStrategy(
1953 self.cursor, self.execution_options
1954 )
1955
1956 if strategy is _cursor._NO_CURSOR_DML:
1957 cursor_description = None
1958 else:
1959 cursor_description = (
1960 strategy.alternate_cursor_description
1961 or self.cursor.description
1962 )
1963
1964 if cursor_description is None:
1965 strategy = _cursor._NO_CURSOR_DML
1966 elif self._num_sentinel_cols:
1967 assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
1968 # the sentinel columns are handled in CursorResult._init_metadata
1969 # using essentially _reduce
1970
1971 result: _cursor.CursorResult[Any] = _cursor.CursorResult(
1972 self, strategy, cursor_description
1973 )
1974
1975 if self.isinsert:
1976 if self._is_implicit_returning:
1977 rows = result.all()
1978
1979 self.returned_default_rows = rows
1980
1981 self.inserted_primary_key_rows = (
1982 self._setup_ins_pk_from_implicit_returning(result, rows)
1983 )
1984
1985 # test that it has a cursor metadata that is accurate. the
1986 # first row will have been fetched and current assumptions
1987 # are that the result has only one row, until executemany()
1988 # support is added here.
1989 assert result._metadata.returns_rows
1990
1991 # Insert statement has both return_defaults() and
1992 # returning(). rewind the result on the list of rows
1993 # we just used.
1994 if self._is_supplemental_returning:
1995 result._rewind(rows)
1996 else:
1997 result._soft_close()
1998 elif not self._is_explicit_returning:
1999 result._soft_close()
2000
2001 # we assume here the result does not return any rows.
2002 # *usually*, this will be true. However, some dialects
2003 # such as that of MSSQL/pyodbc need to SELECT a post fetch
2004 # function so this is not necessarily true.
2005 # assert not result.returns_rows
2006
2007 elif self._is_implicit_returning:
2008 rows = result.all()
2009
2010 if rows:
2011 self.returned_default_rows = rows
2012 self._rowcount = len(rows)
2013
2014 if self._is_supplemental_returning:
2015 result._rewind(rows)
2016 else:
2017 result._soft_close()
2018
2019 # test that it has a cursor metadata that is accurate.
2020 # the rows have all been fetched however.
2021 assert result._metadata.returns_rows
2022
2023 elif not result._metadata.returns_rows:
2024 # no results, get rowcount
2025 # (which requires open cursor on some drivers)
2026 if self._rowcount is None:
2027 self._rowcount = self.cursor.rowcount
2028 result._soft_close()
2029 elif self.isupdate or self.isdelete:
2030 if self._rowcount is None:
2031 self._rowcount = self.cursor.rowcount
2032 return result
2033
2034 @util.memoized_property
2035 def inserted_primary_key_rows(self):
2036 # if no specific "get primary key" strategy was set up
2037 # during execution, return a "default" primary key based
2038 # on what's in the compiled_parameters and nothing else.
2039 return self._setup_ins_pk_from_empty()
2040
2041 def _setup_ins_pk_from_lastrowid(self):
2042 getter = cast(
2043 SQLCompiler, self.compiled
2044 )._inserted_primary_key_from_lastrowid_getter
2045 lastrowid = self.get_lastrowid()
2046 return [getter(lastrowid, self.compiled_parameters[0])]
2047
2048 def _setup_ins_pk_from_empty(self):
2049 getter = cast(
2050 SQLCompiler, self.compiled
2051 )._inserted_primary_key_from_lastrowid_getter
2052 return [getter(None, param) for param in self.compiled_parameters]
2053
2054 def _setup_ins_pk_from_implicit_returning(self, result, rows):
2055 if not rows:
2056 return []
2057
2058 getter = cast(
2059 SQLCompiler, self.compiled
2060 )._inserted_primary_key_from_returning_getter
2061 compiled_params = self.compiled_parameters
2062
2063 return [
2064 getter(row, param) for row, param in zip(rows, compiled_params)
2065 ]
2066
2067 def lastrow_has_defaults(self):
2068 return (self.isinsert or self.isupdate) and bool(
2069 cast(SQLCompiler, self.compiled).postfetch
2070 )
2071
2072 def _prepare_set_input_sizes(
2073 self,
2074 ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
2075 """Given a cursor and ClauseParameters, prepare arguments
2076 in order to call the appropriate
2077 style of ``setinputsizes()`` on the cursor, using DB-API types
2078 from the bind parameter's ``TypeEngine`` objects.
2079
2080 This method only called by those dialects which set the
2081 :attr:`.Dialect.bind_typing` attribute to
2082 :attr:`.BindTyping.SETINPUTSIZES`. Python-oracledb and cx_Oracle are
2083 the only DBAPIs that requires setinputsizes(); pyodbc offers it as an
2084 option.
2085
2086 Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
2087 for pg8000 and asyncpg, which has been changed to inline rendering
2088 of casts.
2089
2090 """
2091 if self.isddl or self.is_text:
2092 return None
2093
2094 compiled = cast(SQLCompiler, self.compiled)
2095
2096 inputsizes = compiled._get_set_input_sizes_lookup()
2097
2098 if inputsizes is None:
2099 return None
2100
2101 dialect = self.dialect
2102
2103 # all of the rest of this... cython?
2104
2105 if dialect._has_events:
2106 inputsizes = dict(inputsizes)
2107 dialect.dispatch.do_setinputsizes(
2108 inputsizes, self.cursor, self.statement, self.parameters, self
2109 )
2110
2111 if compiled.escaped_bind_names:
2112 escaped_bind_names = compiled.escaped_bind_names
2113 else:
2114 escaped_bind_names = None
2115
2116 if dialect.positional:
2117 items = [
2118 (key, compiled.binds[key])
2119 for key in compiled.positiontup or ()
2120 ]
2121 else:
2122 items = [
2123 (key, bindparam)
2124 for bindparam, key in compiled.bind_names.items()
2125 ]
2126
2127 generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
2128 for key, bindparam in items:
2129 if bindparam in compiled.literal_execute_params:
2130 continue
2131
2132 if key in self._expanded_parameters:
2133 if is_tuple_type(bindparam.type):
2134 num = len(bindparam.type.types)
2135 dbtypes = inputsizes[bindparam]
2136 generic_inputsizes.extend(
2137 (
2138 (
2139 escaped_bind_names.get(paramname, paramname)
2140 if escaped_bind_names is not None
2141 else paramname
2142 ),
2143 dbtypes[idx % num],
2144 bindparam.type.types[idx % num],
2145 )
2146 for idx, paramname in enumerate(
2147 self._expanded_parameters[key]
2148 )
2149 )
2150 else:
2151 dbtype = inputsizes.get(bindparam, None)
2152 generic_inputsizes.extend(
2153 (
2154 (
2155 escaped_bind_names.get(paramname, paramname)
2156 if escaped_bind_names is not None
2157 else paramname
2158 ),
2159 dbtype,
2160 bindparam.type,
2161 )
2162 for paramname in self._expanded_parameters[key]
2163 )
2164 else:
2165 dbtype = inputsizes.get(bindparam, None)
2166
2167 escaped_name = (
2168 escaped_bind_names.get(key, key)
2169 if escaped_bind_names is not None
2170 else key
2171 )
2172
2173 generic_inputsizes.append(
2174 (escaped_name, dbtype, bindparam.type)
2175 )
2176
2177 return generic_inputsizes
2178
2179 def _exec_default(self, column, default, type_):
2180 if default.is_sequence:
2181 return self.fire_sequence(default, type_)
2182 elif default.is_callable:
2183 # this codepath is not normally used as it's inlined
2184 # into _process_execute_defaults
2185 self.current_column = column
2186 return default.arg(self)
2187 elif default.is_clause_element:
2188 return self._exec_default_clause_element(column, default, type_)
2189 else:
2190 # this codepath is not normally used as it's inlined
2191 # into _process_execute_defaults
2192 return default.arg
2193
2194 def _exec_default_clause_element(self, column, default, type_):
2195 # execute a default that's a complete clause element. Here, we have
2196 # to re-implement a miniature version of the compile->parameters->
2197 # cursor.execute() sequence, since we don't want to modify the state
2198 # of the connection / result in progress or create new connection/
2199 # result objects etc.
2200 # .. versionchanged:: 1.4
2201
2202 if not default._arg_is_typed:
2203 default_arg = expression.type_coerce(default.arg, type_)
2204 else:
2205 default_arg = default.arg
2206 compiled = expression.select(default_arg).compile(dialect=self.dialect)
2207 compiled_params = compiled.construct_params()
2208 processors = compiled._bind_processors
2209 if compiled.positional:
2210 parameters = self.dialect.execute_sequence_format(
2211 [
2212 (
2213 processors[key](compiled_params[key]) # type: ignore
2214 if key in processors
2215 else compiled_params[key]
2216 )
2217 for key in compiled.positiontup or ()
2218 ]
2219 )
2220 else:
2221 parameters = {
2222 key: (
2223 processors[key](compiled_params[key]) # type: ignore
2224 if key in processors
2225 else compiled_params[key]
2226 )
2227 for key in compiled_params
2228 }
2229 return self._execute_scalar(
2230 str(compiled), type_, parameters=parameters
2231 )
2232
2233 current_parameters: Optional[_CoreSingleExecuteParams] = None
2234 """A dictionary of parameters applied to the current row.
2235
2236 This attribute is only available in the context of a user-defined default
2237 generation function, e.g. as described at :ref:`context_default_functions`.
2238 It consists of a dictionary which includes entries for each column/value
2239 pair that is to be part of the INSERT or UPDATE statement. The keys of the
2240 dictionary will be the key value of each :class:`_schema.Column`,
2241 which is usually
2242 synonymous with the name.
2243
2244 Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
2245 does not accommodate for the "multi-values" feature of the
2246 :meth:`_expression.Insert.values` method. The
2247 :meth:`.DefaultExecutionContext.get_current_parameters` method should be
2248 preferred.
2249
2250 .. seealso::
2251
2252 :meth:`.DefaultExecutionContext.get_current_parameters`
2253
2254 :ref:`context_default_functions`
2255
2256 """
2257
2258 def get_current_parameters(self, isolate_multiinsert_groups=True):
2259 """Return a dictionary of parameters applied to the current row.
2260
2261 This method can only be used in the context of a user-defined default
2262 generation function, e.g. as described at
2263 :ref:`context_default_functions`. When invoked, a dictionary is
2264 returned which includes entries for each column/value pair that is part
2265 of the INSERT or UPDATE statement. The keys of the dictionary will be
2266 the key value of each :class:`_schema.Column`,
2267 which is usually synonymous
2268 with the name.
2269
2270 :param isolate_multiinsert_groups=True: indicates that multi-valued
2271 INSERT constructs created using :meth:`_expression.Insert.values`
2272 should be
2273 handled by returning only the subset of parameters that are local
2274 to the current column default invocation. When ``False``, the
2275 raw parameters of the statement are returned including the
2276 naming convention used in the case of multi-valued INSERT.
2277
2278 .. seealso::
2279
2280 :attr:`.DefaultExecutionContext.current_parameters`
2281
2282 :ref:`context_default_functions`
2283
2284 """
2285 try:
2286 parameters = self.current_parameters
2287 column = self.current_column
2288 except AttributeError:
2289 raise exc.InvalidRequestError(
2290 "get_current_parameters() can only be invoked in the "
2291 "context of a Python side column default function"
2292 )
2293 else:
2294 assert column is not None
2295 assert parameters is not None
2296 compile_state = cast(
2297 "DMLState", cast(SQLCompiler, self.compiled).compile_state
2298 )
2299 assert compile_state is not None
2300 if (
2301 isolate_multiinsert_groups
2302 and dml.isinsert(compile_state)
2303 and compile_state._has_multi_parameters
2304 ):
2305 if column._is_multiparam_column:
2306 index = column.index + 1
2307 d = {column.original.key: parameters[column.key]}
2308 else:
2309 d = {column.key: parameters[column.key]}
2310 index = 0
2311 assert compile_state._dict_parameters is not None
2312 keys = compile_state._dict_parameters.keys()
2313 d.update(
2314 (key, parameters["%s_m%d" % (key, index)]) for key in keys
2315 )
2316 return d
2317 else:
2318 return parameters
2319
2320 def get_insert_default(self, column):
2321 if column.default is None:
2322 return None
2323 else:
2324 return self._exec_default(column, column.default, column.type)
2325
2326 def get_update_default(self, column):
2327 if column.onupdate is None:
2328 return None
2329 else:
2330 return self._exec_default(column, column.onupdate, column.type)
2331
2332 def _process_execute_defaults(self):
2333 compiled = cast(SQLCompiler, self.compiled)
2334
2335 key_getter = compiled._within_exec_param_key_getter
2336
2337 sentinel_counter = 0
2338
2339 if compiled.insert_prefetch:
2340 prefetch_recs = [
2341 (
2342 c,
2343 key_getter(c),
2344 c._default_description_tuple,
2345 self.get_insert_default,
2346 )
2347 for c in compiled.insert_prefetch
2348 ]
2349 elif compiled.update_prefetch:
2350 prefetch_recs = [
2351 (
2352 c,
2353 key_getter(c),
2354 c._onupdate_description_tuple,
2355 self.get_update_default,
2356 )
2357 for c in compiled.update_prefetch
2358 ]
2359 else:
2360 prefetch_recs = []
2361
2362 for param in self.compiled_parameters:
2363 self.current_parameters = param
2364
2365 for (
2366 c,
2367 param_key,
2368 (arg, is_scalar, is_callable, is_sentinel),
2369 fallback,
2370 ) in prefetch_recs:
2371 if is_sentinel:
2372 param[param_key] = sentinel_counter
2373 sentinel_counter += 1
2374 elif is_scalar:
2375 param[param_key] = arg
2376 elif is_callable:
2377 self.current_column = c
2378 param[param_key] = arg(self)
2379 else:
2380 val = fallback(c)
2381 if val is not None:
2382 param[param_key] = val
2383
2384 del self.current_parameters
2385
2386
2387DefaultDialect.execution_ctx_cls = DefaultExecutionContext