1# engine/default.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Default implementations of per-dialect sqlalchemy.engine classes.
10
11These are semi-private implementation classes which are only of importance
12to database dialect authors; dialects will usually use the classes here
13as the base class for their own corresponding classes.
14
15"""
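# As a rough sketch of how these classes are consumed (hypothetical names,
# not taken from any real dialect), a third party dialect typically
# subclasses DefaultDialect and points it at a DBAPI module:
#
#     from sqlalchemy.engine import default
#
#     class MyDialect(default.DefaultDialect):
#         name = "mydb"
#         driver = "mydriver"
#         supports_statement_cache = True
#
#         @classmethod
#         def import_dbapi(cls):
#             import mydb_driver  # hypothetical DBAPI module
#             return mydb_driver
#
# compilers, the identifier preparer and the various capability flags
# defined below are then overridden as needed.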
16
17from __future__ import annotations
18
19import functools
20import operator
21import random
22import re
23from time import perf_counter
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Dict
29from typing import List
30from typing import Mapping
31from typing import MutableMapping
32from typing import MutableSequence
33from typing import Optional
34from typing import Sequence
35from typing import Set
36from typing import Tuple
37from typing import Type
38from typing import TYPE_CHECKING
39from typing import Union
40import weakref
41
42from . import characteristics
43from . import cursor as _cursor
44from . import interfaces
45from .base import Connection
46from .interfaces import CacheStats
47from .interfaces import DBAPICursor
48from .interfaces import Dialect
49from .interfaces import ExecuteStyle
50from .interfaces import ExecutionContext
51from .reflection import ObjectKind
52from .reflection import ObjectScope
53from .. import event
54from .. import exc
55from .. import pool
56from .. import util
57from ..sql import compiler
58from ..sql import dml
59from ..sql import expression
60from ..sql import type_api
61from ..sql import util as sql_util
62from ..sql._typing import is_tuple_type
63from ..sql.base import _NoArg
64from ..sql.compiler import DDLCompiler
65from ..sql.compiler import InsertmanyvaluesSentinelOpts
66from ..sql.compiler import SQLCompiler
67from ..sql.elements import quoted_name
68from ..util.typing import Final
69from ..util.typing import Literal
70
71if typing.TYPE_CHECKING:
72 from types import ModuleType
73
74 from .base import Engine
75 from .cursor import ResultFetchStrategy
76 from .interfaces import _CoreMultiExecuteParams
77 from .interfaces import _CoreSingleExecuteParams
78 from .interfaces import _DBAPICursorDescription
79 from .interfaces import _DBAPIMultiExecuteParams
80 from .interfaces import _DBAPISingleExecuteParams
81 from .interfaces import _ExecuteOptions
82 from .interfaces import _MutableCoreSingleExecuteParams
83 from .interfaces import _ParamStyle
84 from .interfaces import ConnectArgsType
85 from .interfaces import DBAPIConnection
86 from .interfaces import DBAPIModule
87 from .interfaces import IsolationLevel
88 from .row import Row
89 from .url import URL
90 from ..event import _ListenerFnType
91 from ..pool import Pool
92 from ..pool import PoolProxiedConnection
93 from ..sql import Executable
94 from ..sql.compiler import Compiled
95 from ..sql.compiler import Linting
96 from ..sql.compiler import ResultColumnsEntry
97 from ..sql.dml import DMLState
98 from ..sql.dml import UpdateBase
99 from ..sql.elements import BindParameter
100 from ..sql.schema import Column
101 from ..sql.type_api import _BindProcessorType
102 from ..sql.type_api import _ResultProcessorType
103 from ..sql.type_api import TypeEngine
104
105
106# When we're handed literal SQL, ensure it's a SELECT query
107SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)
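# e.g. (illustrative only) the pattern matches "  SELECT ..." and
# "select * from t" case-insensitively, but not "INSERT INTO t ...", so
# only plain-text SELECT statements are considered for server side cursors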
108
109
110(
111 CACHE_HIT,
112 CACHE_MISS,
113 CACHING_DISABLED,
114 NO_CACHE_KEY,
115 NO_DIALECT_SUPPORT,
116) = list(CacheStats)
117
118
119class DefaultDialect(Dialect):
120 """Default implementation of Dialect"""
121
122 statement_compiler = compiler.SQLCompiler
123 ddl_compiler = compiler.DDLCompiler
124 type_compiler_cls = compiler.GenericTypeCompiler
125
126 preparer = compiler.IdentifierPreparer
127 supports_alter = True
128 supports_comments = False
129 supports_constraint_comments = False
130 inline_comments = False
131 supports_statement_cache = True
132
133 div_is_floordiv = True
134
135 bind_typing = interfaces.BindTyping.NONE
136
137 include_set_input_sizes: Optional[Set[Any]] = None
138 exclude_set_input_sizes: Optional[Set[Any]] = None
139
140 # the first value we'd get for an autoincrement column.
141 default_sequence_base = 1
142
    # most DBAPIs are happy with this for execute();
    # not cx_oracle.
145 execute_sequence_format = tuple
146
147 supports_schemas = True
148 supports_views = True
149 supports_sequences = False
150 sequences_optional = False
151 preexecute_autoincrement_sequences = False
152 supports_identity_columns = False
153 postfetch_lastrowid = True
154 favor_returning_over_lastrowid = False
155 insert_null_pk_still_autoincrements = False
156 update_returning = False
157 delete_returning = False
158 update_returning_multifrom = False
159 delete_returning_multifrom = False
160 insert_returning = False
161
162 cte_follows_insert = False
163
164 supports_native_enum = False
165 supports_native_boolean = False
166 supports_native_uuid = False
167 returns_native_bytes = False
168
169 non_native_boolean_check_constraint = True
170
171 supports_simple_order_by_label = True
172
173 tuple_in_values = False
174
175 connection_characteristics = util.immutabledict(
176 {
177 "isolation_level": characteristics.IsolationLevelCharacteristic(),
178 "logging_token": characteristics.LoggingTokenCharacteristic(),
179 }
180 )
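    # these characteristic names double as execution option keys; as a
    # rough usage sketch (engine / connection names are hypothetical):
    #
    #     conn = engine.connect().execution_options(
    #         isolation_level="SERIALIZABLE", logging_token="worker1"
    #     )
    #
    # each supported key is then applied to the DBAPI connection through
    # its ConnectionCharacteristic implementation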
181
182 engine_config_types: Mapping[str, Any] = util.immutabledict(
183 {
184 "pool_timeout": util.asint,
185 "echo": util.bool_or_str("debug"),
186 "echo_pool": util.bool_or_str("debug"),
187 "pool_recycle": util.asint,
188 "pool_size": util.asint,
189 "max_overflow": util.asint,
190 "future": util.asbool,
191 }
192 )
193
    # whether the NUMERIC type returns decimal.Decimal;
    # this does *not* apply to the FLOAT type.
197 supports_native_decimal = False
198
199 name = "default"
200
201 # length at which to truncate
202 # any identifier.
203 max_identifier_length = 9999
204 _user_defined_max_identifier_length: Optional[int] = None
205
206 isolation_level: Optional[str] = None
207
    # sub-categories of max_identifier_length.
    # currently these accommodate MySQL, which allows alias names
    # of 255 but DDL names of only 64.
211 max_index_name_length: Optional[int] = None
212 max_constraint_name_length: Optional[int] = None
213
214 supports_sane_rowcount = True
215 supports_sane_multi_rowcount = True
216 colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
217 default_paramstyle = "named"
218
219 supports_default_values = False
220 """dialect supports INSERT... DEFAULT VALUES syntax"""
221
222 supports_default_metavalue = False
223 """dialect supports INSERT... VALUES (DEFAULT) syntax"""
224
225 default_metavalue_token = "DEFAULT"
    """for INSERT... VALUES (DEFAULT) syntax, the token to put inside the
    parentheses."""
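    # e.g. an INSERT that relies on the server default for a column may
    # render as "INSERT INTO some_table (id) VALUES (DEFAULT)"; dialects
    # may substitute a different token here (illustrative rendering only)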
228
229 # not sure if this is a real thing but the compiler will deliver it
230 # if this is the only flag enabled.
231 supports_empty_insert = True
232 """dialect supports INSERT () VALUES ()"""
233
234 supports_multivalues_insert = False
235
236 use_insertmanyvalues: bool = False
237
238 use_insertmanyvalues_wo_returning: bool = False
239
240 insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
241 InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
242 )
243
244 insertmanyvalues_page_size: int = 1000
245 insertmanyvalues_max_parameters = 32700
246
247 supports_is_distinct_from = True
248
249 supports_server_side_cursors = False
250
251 server_side_cursors = False
252
253 # extra record-level locking features (#4860)
254 supports_for_update_of = False
255
256 server_version_info = None
257
258 default_schema_name: Optional[str] = None
259
260 # indicates symbol names are
261 # UPPERCASED if they are case insensitive
262 # within the database.
263 # if this is True, the methods normalize_name()
264 # and denormalize_name() must be provided.
265 requires_name_normalize = False
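    # e.g. an Oracle-style dialect that stores case insensitive names as
    # uppercase sets this to True, so that "MY_TABLE" round-trips to
    # SQLAlchemy as "my_table" and back via normalize_name() /
    # denormalize_name()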
266
267 is_async = False
268
269 has_terminate = False
270
271 # TODO: this is not to be part of 2.0. implement rudimentary binary
272 # literals for SQLite, PostgreSQL, MySQL only within
273 # _Binary.literal_processor
274 _legacy_binary_type_literal_encoding = "utf-8"
275
276 @util.deprecated_params(
277 empty_in_strategy=(
278 "1.4",
279 "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
280 "deprecated, and no longer has any effect. All IN expressions "
281 "are now rendered using "
            'the "expanding parameter" strategy which renders a set of '
            'bound expressions, or an "empty set" SELECT, at statement '
            "execution time.",
285 ),
286 server_side_cursors=(
287 "1.4",
288 "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
289 "is deprecated and will be removed in a future release. Please "
290 "use the "
291 ":paramref:`_engine.Connection.execution_options.stream_results` "
292 "parameter.",
293 ),
294 )
295 def __init__(
296 self,
297 paramstyle: Optional[_ParamStyle] = None,
298 isolation_level: Optional[IsolationLevel] = None,
299 dbapi: Optional[ModuleType] = None,
300 implicit_returning: Literal[True] = True,
301 supports_native_boolean: Optional[bool] = None,
302 max_identifier_length: Optional[int] = None,
303 label_length: Optional[int] = None,
304 insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
305 use_insertmanyvalues: Optional[bool] = None,
306 # util.deprecated_params decorator cannot render the
307 # Linting.NO_LINTING constant
308 compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore
309 server_side_cursors: bool = False,
310 skip_autocommit_rollback: bool = False,
311 **kwargs: Any,
312 ):
313 if server_side_cursors:
314 if not self.supports_server_side_cursors:
315 raise exc.ArgumentError(
316 "Dialect %s does not support server side cursors" % self
317 )
318 else:
319 self.server_side_cursors = True
320
321 if getattr(self, "use_setinputsizes", False):
322 util.warn_deprecated(
323 "The dialect-level use_setinputsizes attribute is "
324 "deprecated. Please use "
325 "bind_typing = BindTyping.SETINPUTSIZES",
326 "2.0",
327 )
328 self.bind_typing = interfaces.BindTyping.SETINPUTSIZES
329
330 self.positional = False
331 self._ischema = None
332
333 self.dbapi = dbapi
334
335 self.skip_autocommit_rollback = skip_autocommit_rollback
336
337 if paramstyle is not None:
338 self.paramstyle = paramstyle
339 elif self.dbapi is not None:
340 self.paramstyle = self.dbapi.paramstyle
341 else:
342 self.paramstyle = self.default_paramstyle
343 self.positional = self.paramstyle in (
344 "qmark",
345 "format",
346 "numeric",
347 "numeric_dollar",
348 )
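        # e.g. paramstyle "qmark" renders "WHERE x = ?" and receives a
        # sequence of parameters, while "named" renders "WHERE x = :x_1"
        # and receives a dictionary; only the former group is "positional"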
349 self.identifier_preparer = self.preparer(self)
350 self._on_connect_isolation_level = isolation_level
351
352 legacy_tt_callable = getattr(self, "type_compiler", None)
353 if legacy_tt_callable is not None:
354 tt_callable = cast(
355 Type[compiler.GenericTypeCompiler],
356 self.type_compiler,
357 )
358 else:
359 tt_callable = self.type_compiler_cls
360
361 self.type_compiler_instance = self.type_compiler = tt_callable(self)
362
363 if supports_native_boolean is not None:
364 self.supports_native_boolean = supports_native_boolean
365
366 self._user_defined_max_identifier_length = max_identifier_length
367 if self._user_defined_max_identifier_length:
368 self.max_identifier_length = (
369 self._user_defined_max_identifier_length
370 )
371 self.label_length = label_length
372 self.compiler_linting = compiler_linting
373
374 if use_insertmanyvalues is not None:
375 self.use_insertmanyvalues = use_insertmanyvalues
376
377 if insertmanyvalues_page_size is not _NoArg.NO_ARG:
378 self.insertmanyvalues_page_size = insertmanyvalues_page_size
379
380 @property
381 @util.deprecated(
382 "2.0",
383 "full_returning is deprecated, please use insert_returning, "
384 "update_returning, delete_returning",
385 )
386 def full_returning(self):
387 return (
388 self.insert_returning
389 and self.update_returning
390 and self.delete_returning
391 )
392
393 @util.memoized_property
394 def insert_executemany_returning(self):
395 """Default implementation for insert_executemany_returning, if not
396 otherwise overridden by the specific dialect.
397
398 The default dialect determines "insert_executemany_returning" is
399 available if the dialect in use has opted into using the
        "use_insertmanyvalues" feature. If it hasn't opted into that, then
        this attribute is False, unless the dialect in question overrides this
402 and provides some other implementation (such as the Oracle Database
403 dialects).
404
405 """
406 return self.insert_returning and self.use_insertmanyvalues
407
408 @util.memoized_property
409 def insert_executemany_returning_sort_by_parameter_order(self):
410 """Default implementation for
411 insert_executemany_returning_deterministic_order, if not otherwise
412 overridden by the specific dialect.
413
414 The default dialect determines "insert_executemany_returning" can have
415 deterministic order only if the dialect in use has opted into using the
416 "use_insertmanyvalues" feature, which implements deterministic ordering
        using client side sentinel columns only by default. The
        "insertmanyvalues" feature also offers alternate forms that can
        use server-generated PK values as "sentinels", but those are only
420 used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel`
421 bitflag enables those alternate SQL forms, which are disabled
422 by default.
423
424 If the dialect in use hasn't opted into that, then this attribute is
425 False, unless the dialect in question overrides this and provides some
426 other implementation (such as the Oracle Database dialects).
427
428 """
429 return self.insert_returning and self.use_insertmanyvalues
430
431 update_executemany_returning = False
432 delete_executemany_returning = False
433
434 @util.memoized_property
435 def loaded_dbapi(self) -> DBAPIModule:
436 if self.dbapi is None:
437 raise exc.InvalidRequestError(
438 f"Dialect {self} does not have a Python DBAPI established "
439 "and cannot be used for actual database interaction"
440 )
441 return self.dbapi
442
443 @util.memoized_property
444 def _bind_typing_render_casts(self):
445 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS
446
447 def _ensure_has_table_connection(self, arg: Connection) -> None:
448 if not isinstance(arg, Connection):
449 raise exc.ArgumentError(
450 "The argument passed to Dialect.has_table() should be a "
451 "%s, got %s. "
452 "Additionally, the Dialect.has_table() method is for "
453 "internal dialect "
454 "use only; please use "
                "``inspect(some_engine).has_table(<tablename>)`` "
456 "for public API use." % (Connection, type(arg))
457 )
458
459 @util.memoized_property
460 def _supports_statement_cache(self):
461 ssc = self.__class__.__dict__.get("supports_statement_cache", None)
462 if ssc is None:
463 util.warn(
464 "Dialect %s:%s will not make use of SQL compilation caching "
465 "as it does not set the 'supports_statement_cache' attribute "
466 "to ``True``. This can have "
467 "significant performance implications including some "
468 "performance degradations in comparison to prior SQLAlchemy "
469 "versions. Dialect maintainers should seek to set this "
470 "attribute to True after appropriate development and testing "
471 "for SQLAlchemy 1.4 caching support. Alternatively, this "
472 "attribute may be set to False which will disable this "
473 "warning." % (self.name, self.driver),
474 code="cprf",
475 )
476
477 return bool(ssc)
478
479 @util.memoized_property
480 def _type_memos(self):
481 return weakref.WeakKeyDictionary()
482
483 @property
484 def dialect_description(self): # type: ignore[override]
485 return self.name + "+" + self.driver
486
487 @property
488 def supports_sane_rowcount_returning(self):
489 """True if this dialect supports sane rowcount even if RETURNING is
490 in use.
491
492 For dialects that don't support RETURNING, this is synonymous with
493 ``supports_sane_rowcount``.
494
495 """
496 return self.supports_sane_rowcount
497
498 @classmethod
499 def get_pool_class(cls, url: URL) -> Type[Pool]:
500 return getattr(cls, "poolclass", pool.QueuePool)
501
502 def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
503 return self.get_pool_class(url)
504
505 @classmethod
506 def load_provisioning(cls):
507 package = ".".join(cls.__module__.split(".")[0:-1])
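        # e.g. for a dialect class defined in "somelib.dialects.foo.driver",
        # this imports "somelib.dialects.foo.provision" if present
        # (module names here are hypothetical)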
508 try:
509 __import__(package + ".provision")
510 except ImportError:
511 pass
512
513 def _builtin_onconnect(self) -> Optional[_ListenerFnType]:
514 if self._on_connect_isolation_level is not None:
515
516 def builtin_connect(dbapi_conn, conn_rec):
517 self._assert_and_set_isolation_level(
518 dbapi_conn, self._on_connect_isolation_level
519 )
520
521 return builtin_connect
522 else:
523 return None
524
525 def initialize(self, connection: Connection) -> None:
526 try:
527 self.server_version_info = self._get_server_version_info(
528 connection
529 )
530 except NotImplementedError:
531 self.server_version_info = None
532 try:
533 self.default_schema_name = self._get_default_schema_name(
534 connection
535 )
536 except NotImplementedError:
537 self.default_schema_name = None
538
539 try:
540 self.default_isolation_level = self.get_default_isolation_level(
541 connection.connection.dbapi_connection
542 )
543 except NotImplementedError:
544 self.default_isolation_level = None
545
546 if not self._user_defined_max_identifier_length:
547 max_ident_length = self._check_max_identifier_length(connection)
548 if max_ident_length:
549 self.max_identifier_length = max_ident_length
550
551 if (
552 self.label_length
553 and self.label_length > self.max_identifier_length
554 ):
555 raise exc.ArgumentError(
556 "Label length of %d is greater than this dialect's"
557 " maximum identifier length of %d"
558 % (self.label_length, self.max_identifier_length)
559 )
560
561 def on_connect(self) -> Optional[Callable[[Any], None]]:
562 # inherits the docstring from interfaces.Dialect.on_connect
563 return None
564
565 def _check_max_identifier_length(self, connection):
566 """Perform a connection / server version specific check to determine
567 the max_identifier_length.
568
        May return None to indicate that the dialect's class-level
        max_identifier_length should be used.
571
572 .. versionadded:: 1.3.9
573
574 """
575 return None
576
577 def get_default_isolation_level(self, dbapi_conn):
578 """Given a DBAPI connection, return its isolation level, or
579 a default isolation level if one cannot be retrieved.
580
581 May be overridden by subclasses in order to provide a
582 "fallback" isolation level for databases that cannot reliably
583 retrieve the actual isolation level.
584
        By default, calls the :meth:`_engine.Dialect.get_isolation_level`
        method, propagating any exceptions raised.
587
588 .. versionadded:: 1.3.22
589
590 """
591 return self.get_isolation_level(dbapi_conn)
592
593 def type_descriptor(self, typeobj):
594 """Provide a database-specific :class:`.TypeEngine` object, given
595 the generic object which comes from the types module.
596
597 This method looks for a dictionary called
598 ``colspecs`` as a class or instance-level variable,
599 and passes on to :func:`_types.adapt_type`.
600
601 """
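        # e.g. a dialect that maps sqltypes.Numeric to its own subclass via
        # ``colspecs`` will have that subclass returned here when a plain
        # Numeric() is passed in (a hypothetical mapping, for illustration)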
602 return type_api.adapt_type(typeobj, self.colspecs)
603
604 def has_index(self, connection, table_name, index_name, schema=None, **kw):
605 if not self.has_table(connection, table_name, schema=schema, **kw):
606 return False
607 for idx in self.get_indexes(
608 connection, table_name, schema=schema, **kw
609 ):
610 if idx["name"] == index_name:
611 return True
612 else:
613 return False
614
615 def has_schema(
616 self, connection: Connection, schema_name: str, **kw: Any
617 ) -> bool:
618 return schema_name in self.get_schema_names(connection, **kw)
619
620 def validate_identifier(self, ident: str) -> None:
621 if len(ident) > self.max_identifier_length:
622 raise exc.IdentifierError(
623 "Identifier '%s' exceeds maximum length of %d characters"
624 % (ident, self.max_identifier_length)
625 )
626
627 def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection:
628 # inherits the docstring from interfaces.Dialect.connect
629 return self.loaded_dbapi.connect(*cargs, **cparams) # type: ignore[no-any-return] # NOQA: E501
630
631 def create_connect_args(self, url: URL) -> ConnectArgsType:
632 # inherits the docstring from interfaces.Dialect.create_connect_args
633 opts = url.translate_connect_args()
634 opts.update(url.query)
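        # e.g. for "somedriver://scott:tiger@localhost:5432/test?foo=bar",
        # opts comes out roughly as {"username": "scott", "password":
        # "tiger", "host": "localhost", "port": 5432, "database": "test",
        # "foo": "bar"}; individual dialects usually override this method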
635 return ([], opts)
636
637 def set_engine_execution_options(
638 self, engine: Engine, opts: Mapping[str, Any]
639 ) -> None:
640 supported_names = set(self.connection_characteristics).intersection(
641 opts
642 )
643 if supported_names:
644 characteristics: Mapping[str, Any] = util.immutabledict(
645 (name, opts[name]) for name in supported_names
646 )
647
648 @event.listens_for(engine, "engine_connect")
649 def set_connection_characteristics(connection):
650 self._set_connection_characteristics(
651 connection, characteristics
652 )
653
654 def set_connection_execution_options(
655 self, connection: Connection, opts: Mapping[str, Any]
656 ) -> None:
657 supported_names = set(self.connection_characteristics).intersection(
658 opts
659 )
660 if supported_names:
661 characteristics: Mapping[str, Any] = util.immutabledict(
662 (name, opts[name]) for name in supported_names
663 )
664 self._set_connection_characteristics(connection, characteristics)
665
666 def _set_connection_characteristics(self, connection, characteristics):
667 characteristic_values = [
668 (name, self.connection_characteristics[name], value)
669 for name, value in characteristics.items()
670 ]
671
672 if connection.in_transaction():
673 trans_objs = [
674 (name, obj)
675 for name, obj, _ in characteristic_values
676 if obj.transactional
677 ]
678 if trans_objs:
679 raise exc.InvalidRequestError(
680 "This connection has already initialized a SQLAlchemy "
681 "Transaction() object via begin() or autobegin; "
682 "%s may not be altered unless rollback() or commit() "
683 "is called first."
684 % (", ".join(name for name, obj in trans_objs))
685 )
686
687 dbapi_connection = connection.connection.dbapi_connection
688 for _, characteristic, value in characteristic_values:
689 characteristic.set_connection_characteristic(
690 self, connection, dbapi_connection, value
691 )
692 connection.connection._connection_record.finalize_callback.append(
693 functools.partial(self._reset_characteristics, characteristics)
694 )
695
696 def _reset_characteristics(self, characteristics, dbapi_connection):
697 for characteristic_name in characteristics:
698 characteristic = self.connection_characteristics[
699 characteristic_name
700 ]
701 characteristic.reset_characteristic(self, dbapi_connection)
702
703 def do_begin(self, dbapi_connection):
704 pass
705
706 def do_rollback(self, dbapi_connection):
707 if self.skip_autocommit_rollback and self.detect_autocommit_setting(
708 dbapi_connection
709 ):
710 return
711 dbapi_connection.rollback()
712
713 def do_commit(self, dbapi_connection):
714 dbapi_connection.commit()
715
716 def do_terminate(self, dbapi_connection):
717 self.do_close(dbapi_connection)
718
719 def do_close(self, dbapi_connection):
720 dbapi_connection.close()
721
722 @util.memoized_property
723 def _dialect_specific_select_one(self):
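        # compile a trivial "SELECT 1" (or this dialect's equivalent) once,
        # for use by the default do_ping() implementation below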
724 return str(expression.select(1).compile(dialect=self))
725
726 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
727 try:
728 return self.do_ping(dbapi_connection)
729 except self.loaded_dbapi.Error as err:
730 is_disconnect = self.is_disconnect(err, dbapi_connection, None)
731
732 if self._has_events:
733 try:
734 Connection._handle_dbapi_exception_noconnection(
735 err,
736 self,
737 is_disconnect=is_disconnect,
738 invalidate_pool_on_disconnect=False,
739 is_pre_ping=True,
740 )
741 except exc.StatementError as new_err:
742 is_disconnect = new_err.connection_invalidated
743
744 if is_disconnect:
745 return False
746 else:
747 raise
748
749 def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
750 cursor = dbapi_connection.cursor()
751 try:
752 cursor.execute(self._dialect_specific_select_one)
753 finally:
754 cursor.close()
755 return True
756
757 def create_xid(self):
758 """Create a random two-phase transaction ID.
759
760 This id will be passed to do_begin_twophase(), do_rollback_twophase(),
761 do_commit_twophase(). Its format is unspecified.
762 """
763
764 return "_sa_%032x" % random.randint(0, 2**128)
765
766 def do_savepoint(self, connection, name):
767 connection.execute(expression.SavepointClause(name))
768
769 def do_rollback_to_savepoint(self, connection, name):
770 connection.execute(expression.RollbackToSavepointClause(name))
771
772 def do_release_savepoint(self, connection, name):
773 connection.execute(expression.ReleaseSavepointClause(name))
774
775 def _deliver_insertmanyvalues_batches(
776 self,
777 connection,
778 cursor,
779 statement,
780 parameters,
781 generic_setinputsizes,
782 context,
783 ):
784 context = cast(DefaultExecutionContext, context)
785 compiled = cast(SQLCompiler, context.compiled)
786
787 _composite_sentinel_proc: Sequence[
788 Optional[_ResultProcessorType[Any]]
789 ] = ()
790 _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
791 _sentinel_proc_initialized: bool = False
792
793 compiled_parameters = context.compiled_parameters
794
795 imv = compiled._insertmanyvalues
796 assert imv is not None
797
798 is_returning: Final[bool] = bool(compiled.effective_returning)
799 batch_size = context.execution_options.get(
800 "insertmanyvalues_page_size", self.insertmanyvalues_page_size
801 )
802
803 if compiled.schema_translate_map:
804 schema_translate_map = context.execution_options.get(
805 "schema_translate_map", {}
806 )
807 else:
808 schema_translate_map = None
809
810 if is_returning:
811 result: Optional[List[Any]] = []
812 context._insertmanyvalues_rows = result
813
814 sort_by_parameter_order = imv.sort_by_parameter_order
815
816 else:
817 sort_by_parameter_order = False
818 result = None
819
820 for imv_batch in compiled._deliver_insertmanyvalues_batches(
821 statement,
822 parameters,
823 compiled_parameters,
824 generic_setinputsizes,
825 batch_size,
826 sort_by_parameter_order,
827 schema_translate_map,
828 ):
829 yield imv_batch
830
831 if is_returning:
832
833 try:
834 rows = context.fetchall_for_returning(cursor)
835 except BaseException as be:
836 connection._handle_dbapi_exception(
837 be,
838 sql_util._long_statement(imv_batch.replaced_statement),
839 imv_batch.replaced_parameters,
840 None,
841 context,
842 is_sub_exec=True,
843 )
844
845 # I would have thought "is_returning: Final[bool]"
846 # would have assured this but pylance thinks not
847 assert result is not None
848
849 if imv.num_sentinel_columns and not imv_batch.is_downgraded:
850 composite_sentinel = imv.num_sentinel_columns > 1
851 if imv.implicit_sentinel:
852 # for implicit sentinel, which is currently single-col
853 # integer autoincrement, do a simple sort.
854 assert not composite_sentinel
855 result.extend(
856 sorted(rows, key=operator.itemgetter(-1))
857 )
858 continue
859
860 # otherwise, create dictionaries to match up batches
861 # with parameters
862 assert imv.sentinel_param_keys
863 assert imv.sentinel_columns
864
865 _nsc = imv.num_sentinel_columns
866
867 if not _sentinel_proc_initialized:
868 if composite_sentinel:
869 _composite_sentinel_proc = [
870 col.type._cached_result_processor(
871 self, cursor_desc[1]
872 )
873 for col, cursor_desc in zip(
874 imv.sentinel_columns,
875 cursor.description[-_nsc:],
876 )
877 ]
878 else:
879 _scalar_sentinel_proc = (
880 imv.sentinel_columns[0]
881 ).type._cached_result_processor(
882 self, cursor.description[-1][1]
883 )
884 _sentinel_proc_initialized = True
885
886 rows_by_sentinel: Union[
887 Dict[Tuple[Any, ...], Any],
888 Dict[Any, Any],
889 ]
890 if composite_sentinel:
891 rows_by_sentinel = {
892 tuple(
893 (proc(val) if proc else val)
894 for val, proc in zip(
895 row[-_nsc:], _composite_sentinel_proc
896 )
897 ): row
898 for row in rows
899 }
900 elif _scalar_sentinel_proc:
901 rows_by_sentinel = {
902 _scalar_sentinel_proc(row[-1]): row for row in rows
903 }
904 else:
905 rows_by_sentinel = {row[-1]: row for row in rows}
906
907 if len(rows_by_sentinel) != len(imv_batch.batch):
908 # see test_insert_exec.py::
909 # IMVSentinelTest::test_sentinel_incorrect_rowcount
910 # for coverage / demonstration
911 raise exc.InvalidRequestError(
912 f"Sentinel-keyed result set did not produce "
913 f"correct number of rows {len(imv_batch.batch)}; "
914 "produced "
915 f"{len(rows_by_sentinel)}. Please ensure the "
916 "sentinel column is fully unique and populated in "
917 "all cases."
918 )
919
920 try:
921 ordered_rows = [
922 rows_by_sentinel[sentinel_keys]
923 for sentinel_keys in imv_batch.sentinel_values
924 ]
925 except KeyError as ke:
926 # see test_insert_exec.py::
927 # IMVSentinelTest::test_sentinel_cant_match_keys
928 # for coverage / demonstration
929 raise exc.InvalidRequestError(
930 f"Can't match sentinel values in result set to "
931 f"parameter sets; key {ke.args[0]!r} was not "
932 "found. "
933 "There may be a mismatch between the datatype "
934 "passed to the DBAPI driver vs. that which it "
935 "returns in a result row. Ensure the given "
936 "Python value matches the expected result type "
937 "*exactly*, taking care to not rely upon implicit "
938 "conversions which may occur such as when using "
939 "strings in place of UUID or integer values, etc. "
940 ) from ke
941
942 result.extend(ordered_rows)
943
944 else:
945 result.extend(rows)
946
947 def do_executemany(self, cursor, statement, parameters, context=None):
948 cursor.executemany(statement, parameters)
949
950 def do_execute(self, cursor, statement, parameters, context=None):
951 cursor.execute(statement, parameters)
952
953 def do_execute_no_params(self, cursor, statement, context=None):
954 cursor.execute(statement)
955
956 def is_disconnect(
957 self,
958 e: DBAPIModule.Error,
959 connection: Union[
960 pool.PoolProxiedConnection, interfaces.DBAPIConnection, None
961 ],
962 cursor: Optional[interfaces.DBAPICursor],
963 ) -> bool:
964 return False
965
966 @util.memoized_instancemethod
967 def _gen_allowed_isolation_levels(self, dbapi_conn):
968 try:
969 raw_levels = list(self.get_isolation_level_values(dbapi_conn))
970 except NotImplementedError:
971 return None
972 else:
973 normalized_levels = [
974 level.replace("_", " ").upper() for level in raw_levels
975 ]
976 if raw_levels != normalized_levels:
977 raise ValueError(
978 f"Dialect {self.name!r} get_isolation_level_values() "
979 f"method should return names as UPPERCASE using spaces, "
980 f"not underscores; got "
981 f"{sorted(set(raw_levels).difference(normalized_levels))}"
982 )
983 return tuple(normalized_levels)
984
985 def _assert_and_set_isolation_level(self, dbapi_conn, level):
986 level = level.replace("_", " ").upper()
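        # e.g. "REPEATABLE_READ", as accepted by execution options, becomes
        # "REPEATABLE READ" here before being validated against the
        # dialect's allowed levels and passed to set_isolation_level()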
987
988 _allowed_isolation_levels = self._gen_allowed_isolation_levels(
989 dbapi_conn
990 )
991 if (
992 _allowed_isolation_levels
993 and level not in _allowed_isolation_levels
994 ):
995 raise exc.ArgumentError(
996 f"Invalid value {level!r} for isolation_level. "
997 f"Valid isolation levels for {self.name!r} are "
998 f"{', '.join(_allowed_isolation_levels)}"
999 )
1000
1001 self.set_isolation_level(dbapi_conn, level)
1002
1003 def reset_isolation_level(self, dbapi_conn):
1004 if self._on_connect_isolation_level is not None:
1005 assert (
1006 self._on_connect_isolation_level == "AUTOCOMMIT"
1007 or self._on_connect_isolation_level
1008 == self.default_isolation_level
1009 )
1010 self._assert_and_set_isolation_level(
1011 dbapi_conn, self._on_connect_isolation_level
1012 )
1013 else:
1014 assert self.default_isolation_level is not None
1015 self._assert_and_set_isolation_level(
1016 dbapi_conn,
1017 self.default_isolation_level,
1018 )
1019
1020 def normalize_name(self, name):
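        # illustrative round trips for a backend that stores case
        # insensitive names as uppercase:
        #   "MY_TABLE" -> "my_table"   (normalized to lower case)
        #   "my_table" -> quoted_name("my_table", quote=True)
        #   "MyTable"  -> "MyTable"    (mixed case left unchanged)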
1021 if name is None:
1022 return None
1023
1024 name_lower = name.lower()
1025 name_upper = name.upper()
1026
1027 if name_upper == name_lower:
1028 # name has no upper/lower conversion, e.g. non-european characters.
1029 # return unchanged
1030 return name
1031 elif name_upper == name and not (
1032 self.identifier_preparer._requires_quotes
1033 )(name_lower):
1034 # name is all uppercase and doesn't require quoting; normalize
1035 # to all lower case
1036 return name_lower
1037 elif name_lower == name:
1038 # name is all lower case, which if denormalized means we need to
1039 # force quoting on it
1040 return quoted_name(name, quote=True)
1041 else:
            # name is mixed case, which means it will be quoted in SQL
            # when used later; no normalization needed
1044 return name
1045
1046 def denormalize_name(self, name):
1047 if name is None:
1048 return None
1049
1050 name_lower = name.lower()
1051 name_upper = name.upper()
1052
1053 if name_upper == name_lower:
1054 # name has no upper/lower conversion, e.g. non-european characters.
1055 # return unchanged
1056 return name
1057 elif name_lower == name and not (
1058 self.identifier_preparer._requires_quotes
1059 )(name_lower):
1060 name = name_upper
1061 return name
1062
1063 def get_driver_connection(self, connection: DBAPIConnection) -> Any:
1064 return connection
1065
1066 def _overrides_default(self, method):
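        # e.g. _overrides_default("get_multi_columns") is True only when a
        # dialect subclass defines its own get_multi_columns() rather than
        # inheriting the implementation on DefaultDialect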
1067 return (
1068 getattr(type(self), method).__code__
1069 is not getattr(DefaultDialect, method).__code__
1070 )
1071
1072 def _default_multi_reflect(
1073 self,
1074 single_tbl_method,
1075 connection,
1076 kind,
1077 schema,
1078 filter_names,
1079 scope,
1080 **kw,
1081 ):
1082 names_fns = []
1083 temp_names_fns = []
1084 if ObjectKind.TABLE in kind:
1085 names_fns.append(self.get_table_names)
1086 temp_names_fns.append(self.get_temp_table_names)
1087 if ObjectKind.VIEW in kind:
1088 names_fns.append(self.get_view_names)
1089 temp_names_fns.append(self.get_temp_view_names)
1090 if ObjectKind.MATERIALIZED_VIEW in kind:
1091 names_fns.append(self.get_materialized_view_names)
1092 # no temp materialized view at the moment
1093 # temp_names_fns.append(self.get_temp_materialized_view_names)
1094
1095 unreflectable = kw.pop("unreflectable", {})
1096
1097 if (
1098 filter_names
1099 and scope is ObjectScope.ANY
1100 and kind is ObjectKind.ANY
1101 ):
1102 # if names are given and no qualification on type of table
1103 # (i.e. the Table(..., autoload) case), take the names as given,
            # don't run names queries. If a table does not exist,
            # NoSuchTableError is raised and it's skipped
1106
1107 # this also suits the case for mssql where we can reflect
1108 # individual temp tables but there's no temp_names_fn
1109 names = filter_names
1110 else:
1111 names = []
1112 name_kw = {"schema": schema, **kw}
1113 fns = []
1114 if ObjectScope.DEFAULT in scope:
1115 fns.extend(names_fns)
1116 if ObjectScope.TEMPORARY in scope:
1117 fns.extend(temp_names_fns)
1118
1119 for fn in fns:
1120 try:
1121 names.extend(fn(connection, **name_kw))
1122 except NotImplementedError:
1123 pass
1124
1125 if filter_names:
1126 filter_names = set(filter_names)
1127
1128 # iterate over all the tables/views and call the single table method
1129 for table in names:
1130 if not filter_names or table in filter_names:
1131 key = (schema, table)
1132 try:
1133 yield (
1134 key,
1135 single_tbl_method(
1136 connection, table, schema=schema, **kw
1137 ),
1138 )
1139 except exc.UnreflectableTableError as err:
1140 if key not in unreflectable:
1141 unreflectable[key] = err
1142 except exc.NoSuchTableError:
1143 pass
1144
1145 def get_multi_table_options(self, connection, **kw):
1146 return self._default_multi_reflect(
1147 self.get_table_options, connection, **kw
1148 )
1149
1150 def get_multi_columns(self, connection, **kw):
1151 return self._default_multi_reflect(self.get_columns, connection, **kw)
1152
1153 def get_multi_pk_constraint(self, connection, **kw):
1154 return self._default_multi_reflect(
1155 self.get_pk_constraint, connection, **kw
1156 )
1157
1158 def get_multi_foreign_keys(self, connection, **kw):
1159 return self._default_multi_reflect(
1160 self.get_foreign_keys, connection, **kw
1161 )
1162
1163 def get_multi_indexes(self, connection, **kw):
1164 return self._default_multi_reflect(self.get_indexes, connection, **kw)
1165
1166 def get_multi_unique_constraints(self, connection, **kw):
1167 return self._default_multi_reflect(
1168 self.get_unique_constraints, connection, **kw
1169 )
1170
1171 def get_multi_check_constraints(self, connection, **kw):
1172 return self._default_multi_reflect(
1173 self.get_check_constraints, connection, **kw
1174 )
1175
1176 def get_multi_table_comment(self, connection, **kw):
1177 return self._default_multi_reflect(
1178 self.get_table_comment, connection, **kw
1179 )
1180
1181
1182class StrCompileDialect(DefaultDialect):
1183 statement_compiler = compiler.StrSQLCompiler
1184 ddl_compiler = compiler.DDLCompiler
1185 type_compiler_cls = compiler.StrSQLTypeCompiler
1186 preparer = compiler.IdentifierPreparer
1187
1188 insert_returning = True
1189 update_returning = True
1190 delete_returning = True
1191
1192 supports_statement_cache = True
1193
1194 supports_identity_columns = True
1195
1196 supports_sequences = True
1197 sequences_optional = True
1198 preexecute_autoincrement_sequences = False
1199
1200 supports_native_boolean = True
1201
1202 supports_multivalues_insert = True
1203 supports_simple_order_by_label = True
1204
1205
1206class DefaultExecutionContext(ExecutionContext):
1207 isinsert = False
1208 isupdate = False
1209 isdelete = False
1210 is_crud = False
1211 is_text = False
1212 isddl = False
1213
1214 execute_style: ExecuteStyle = ExecuteStyle.EXECUTE
1215
1216 compiled: Optional[Compiled] = None
1217 result_column_struct: Optional[
1218 Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
1219 ] = None
1220 returned_default_rows: Optional[Sequence[Row[Any]]] = None
1221
1222 execution_options: _ExecuteOptions = util.EMPTY_DICT
1223
1224 cursor_fetch_strategy = _cursor._DEFAULT_FETCH
1225
1226 invoked_statement: Optional[Executable] = None
1227
1228 _is_implicit_returning = False
1229 _is_explicit_returning = False
1230 _is_supplemental_returning = False
1231 _is_server_side = False
1232
1233 _soft_closed = False
1234
1235 _rowcount: Optional[int] = None
1236
1237 # a hook for SQLite's translation of
1238 # result column names
1239 # NOTE: pyhive is using this hook, can't remove it :(
1240 _translate_colname: Optional[Callable[[str], str]] = None
1241
1242 _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
1243 """used by set_input_sizes().
1244
1245 This collection comes from ``ExpandedState.parameter_expansion``.
1246
1247 """
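    # e.g. an expanding IN parameter named "ids" bound to three values may
    # expand to DBAPI-level parameter names such as ["ids_1", "ids_2",
    # "ids_3"]; the exact naming scheme is up to the compiler (illustrative)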
1248
1249 cache_hit = NO_CACHE_KEY
1250
1251 root_connection: Connection
1252 _dbapi_connection: PoolProxiedConnection
1253 dialect: Dialect
1254 unicode_statement: str
1255 cursor: DBAPICursor
1256 compiled_parameters: List[_MutableCoreSingleExecuteParams]
1257 parameters: _DBAPIMultiExecuteParams
1258 extracted_parameters: Optional[Sequence[BindParameter[Any]]]
1259
1260 _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)
1261
1262 _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
1263 _num_sentinel_cols: int = 0
1264
1265 @classmethod
1266 def _init_ddl(
1267 cls,
1268 dialect: Dialect,
1269 connection: Connection,
1270 dbapi_connection: PoolProxiedConnection,
1271 execution_options: _ExecuteOptions,
1272 compiled_ddl: DDLCompiler,
1273 ) -> ExecutionContext:
1274 """Initialize execution context for an ExecutableDDLElement
1275 construct."""
1276
1277 self = cls.__new__(cls)
1278 self.root_connection = connection
1279 self._dbapi_connection = dbapi_connection
1280 self.dialect = connection.dialect
1281
1282 self.compiled = compiled = compiled_ddl
1283 self.isddl = True
1284
1285 self.execution_options = execution_options
1286
1287 self.unicode_statement = str(compiled)
1288 if compiled.schema_translate_map:
1289 schema_translate_map = self.execution_options.get(
1290 "schema_translate_map", {}
1291 )
1292
1293 rst = compiled.preparer._render_schema_translates
1294 self.unicode_statement = rst(
1295 self.unicode_statement, schema_translate_map
1296 )
1297
1298 self.statement = self.unicode_statement
1299
1300 self.cursor = self.create_cursor()
1301 self.compiled_parameters = []
1302
1303 if dialect.positional:
1304 self.parameters = [dialect.execute_sequence_format()]
1305 else:
1306 self.parameters = [self._empty_dict_params]
1307
1308 return self
1309
1310 @classmethod
1311 def _init_compiled(
1312 cls,
1313 dialect: Dialect,
1314 connection: Connection,
1315 dbapi_connection: PoolProxiedConnection,
1316 execution_options: _ExecuteOptions,
1317 compiled: SQLCompiler,
1318 parameters: _CoreMultiExecuteParams,
1319 invoked_statement: Executable,
1320 extracted_parameters: Optional[Sequence[BindParameter[Any]]],
1321 cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
1322 ) -> ExecutionContext:
1323 """Initialize execution context for a Compiled construct."""
1324
1325 self = cls.__new__(cls)
1326 self.root_connection = connection
1327 self._dbapi_connection = dbapi_connection
1328 self.dialect = connection.dialect
1329 self.extracted_parameters = extracted_parameters
1330 self.invoked_statement = invoked_statement
1331 self.compiled = compiled
1332 self.cache_hit = cache_hit
1333
1334 self.execution_options = execution_options
1335
1336 self.result_column_struct = (
1337 compiled._result_columns,
1338 compiled._ordered_columns,
1339 compiled._textual_ordered_columns,
1340 compiled._ad_hoc_textual,
1341 compiled._loose_column_name_matching,
1342 )
1343
1344 self.isinsert = ii = compiled.isinsert
1345 self.isupdate = iu = compiled.isupdate
1346 self.isdelete = id_ = compiled.isdelete
1347 self.is_text = compiled.isplaintext
1348
1349 if ii or iu or id_:
1350 dml_statement = compiled.compile_state.statement # type: ignore
1351 if TYPE_CHECKING:
1352 assert isinstance(dml_statement, UpdateBase)
1353 self.is_crud = True
1354 self._is_explicit_returning = ier = bool(dml_statement._returning)
1355 self._is_implicit_returning = iir = bool(
1356 compiled.implicit_returning
1357 )
1358 if iir and dml_statement._supplemental_returning:
1359 self._is_supplemental_returning = True
1360
            # don't mix implicit and explicit returning
1362 assert not (iir and ier)
1363
1364 if (ier or iir) and compiled.for_executemany:
1365 if ii and not self.dialect.insert_executemany_returning:
1366 raise exc.InvalidRequestError(
1367 f"Dialect {self.dialect.dialect_description} with "
1368 f"current server capabilities does not support "
1369 "INSERT..RETURNING when executemany is used"
1370 )
1371 elif (
1372 ii
1373 and dml_statement._sort_by_parameter_order
1374 and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501
1375 ):
1376 raise exc.InvalidRequestError(
1377 f"Dialect {self.dialect.dialect_description} with "
1378 f"current server capabilities does not support "
1379 "INSERT..RETURNING with deterministic row ordering "
1380 "when executemany is used"
1381 )
1382 elif (
1383 ii
1384 and self.dialect.use_insertmanyvalues
1385 and not compiled._insertmanyvalues
1386 ):
1387 raise exc.InvalidRequestError(
1388 'Statement does not have "insertmanyvalues" '
1389 "enabled, can't use INSERT..RETURNING with "
1390 "executemany in this case."
1391 )
1392 elif iu and not self.dialect.update_executemany_returning:
1393 raise exc.InvalidRequestError(
1394 f"Dialect {self.dialect.dialect_description} with "
1395 f"current server capabilities does not support "
1396 "UPDATE..RETURNING when executemany is used"
1397 )
1398 elif id_ and not self.dialect.delete_executemany_returning:
1399 raise exc.InvalidRequestError(
1400 f"Dialect {self.dialect.dialect_description} with "
1401 f"current server capabilities does not support "
1402 "DELETE..RETURNING when executemany is used"
1403 )
1404
1405 if not parameters:
1406 self.compiled_parameters = [
1407 compiled.construct_params(
1408 extracted_parameters=extracted_parameters,
1409 escape_names=False,
1410 )
1411 ]
1412 else:
1413 self.compiled_parameters = [
1414 compiled.construct_params(
1415 m,
1416 escape_names=False,
1417 _group_number=grp,
1418 extracted_parameters=extracted_parameters,
1419 )
1420 for grp, m in enumerate(parameters)
1421 ]
1422
1423 if len(parameters) > 1:
1424 if self.isinsert and compiled._insertmanyvalues:
1425 self.execute_style = ExecuteStyle.INSERTMANYVALUES
1426
1427 imv = compiled._insertmanyvalues
1428 if imv.sentinel_columns is not None:
1429 self._num_sentinel_cols = imv.num_sentinel_columns
1430 else:
1431 self.execute_style = ExecuteStyle.EXECUTEMANY
1432
1433 self.unicode_statement = compiled.string
1434
1435 self.cursor = self.create_cursor()
1436
1437 if self.compiled.insert_prefetch or self.compiled.update_prefetch:
1438 self._process_execute_defaults()
1439
1440 processors = compiled._bind_processors
1441
1442 flattened_processors: Mapping[
1443 str, _BindProcessorType[Any]
1444 ] = processors # type: ignore[assignment]
1445
1446 if compiled.literal_execute_params or compiled.post_compile_params:
1447 if self.executemany:
1448 raise exc.InvalidRequestError(
1449 "'literal_execute' or 'expanding' parameters can't be "
1450 "used with executemany()"
1451 )
1452
1453 expanded_state = compiled._process_parameters_for_postcompile(
1454 self.compiled_parameters[0]
1455 )
1456
1457 # re-assign self.unicode_statement
1458 self.unicode_statement = expanded_state.statement
1459
1460 self._expanded_parameters = expanded_state.parameter_expansion
1461
1462 flattened_processors = dict(processors) # type: ignore
1463 flattened_processors.update(expanded_state.processors)
1464 positiontup = expanded_state.positiontup
1465 elif compiled.positional:
1466 positiontup = self.compiled.positiontup
1467 else:
1468 positiontup = None
1469
1470 if compiled.schema_translate_map:
1471 schema_translate_map = self.execution_options.get(
1472 "schema_translate_map", {}
1473 )
1474 rst = compiled.preparer._render_schema_translates
1475 self.unicode_statement = rst(
1476 self.unicode_statement, schema_translate_map
1477 )
1478
1479 # final self.unicode_statement is now assigned, encode if needed
1480 # by dialect
1481 self.statement = self.unicode_statement
1482
1483 # Convert the dictionary of bind parameter values
1484 # into a dict or list to be sent to the DBAPI's
1485 # execute() or executemany() method.
1486
1487 if compiled.positional:
1488 core_positional_parameters: MutableSequence[Sequence[Any]] = []
1489 assert positiontup is not None
1490 for compiled_params in self.compiled_parameters:
1491 l_param: List[Any] = [
1492 (
1493 flattened_processors[key](compiled_params[key])
1494 if key in flattened_processors
1495 else compiled_params[key]
1496 )
1497 for key in positiontup
1498 ]
1499 core_positional_parameters.append(
1500 dialect.execute_sequence_format(l_param)
1501 )
1502
1503 self.parameters = core_positional_parameters
1504 else:
1505 core_dict_parameters: MutableSequence[Dict[str, Any]] = []
1506 escaped_names = compiled.escaped_bind_names
1507
1508 # note that currently, "expanded" parameters will be present
1509 # in self.compiled_parameters in their quoted form. This is
1510 # slightly inconsistent with the approach taken as of
1511 # #8056 where self.compiled_parameters is meant to contain unquoted
1512 # param names.
1513 d_param: Dict[str, Any]
1514 for compiled_params in self.compiled_parameters:
1515 if escaped_names:
1516 d_param = {
1517 escaped_names.get(key, key): (
1518 flattened_processors[key](compiled_params[key])
1519 if key in flattened_processors
1520 else compiled_params[key]
1521 )
1522 for key in compiled_params
1523 }
1524 else:
1525 d_param = {
1526 key: (
1527 flattened_processors[key](compiled_params[key])
1528 if key in flattened_processors
1529 else compiled_params[key]
1530 )
1531 for key in compiled_params
1532 }
1533
1534 core_dict_parameters.append(d_param)
1535
1536 self.parameters = core_dict_parameters
1537
1538 return self
1539
1540 @classmethod
1541 def _init_statement(
1542 cls,
1543 dialect: Dialect,
1544 connection: Connection,
1545 dbapi_connection: PoolProxiedConnection,
1546 execution_options: _ExecuteOptions,
1547 statement: str,
1548 parameters: _DBAPIMultiExecuteParams,
1549 ) -> ExecutionContext:
1550 """Initialize execution context for a string SQL statement."""
1551
1552 self = cls.__new__(cls)
1553 self.root_connection = connection
1554 self._dbapi_connection = dbapi_connection
1555 self.dialect = connection.dialect
1556 self.is_text = True
1557
1558 self.execution_options = execution_options
1559
1560 if not parameters:
1561 if self.dialect.positional:
1562 self.parameters = [dialect.execute_sequence_format()]
1563 else:
1564 self.parameters = [self._empty_dict_params]
1565 elif isinstance(parameters[0], dialect.execute_sequence_format):
1566 self.parameters = parameters
1567 elif isinstance(parameters[0], dict):
1568 self.parameters = parameters
1569 else:
1570 self.parameters = [
1571 dialect.execute_sequence_format(p) for p in parameters
1572 ]
1573
1574 if len(parameters) > 1:
1575 self.execute_style = ExecuteStyle.EXECUTEMANY
1576
1577 self.statement = self.unicode_statement = statement
1578
1579 self.cursor = self.create_cursor()
1580 return self
1581
1582 @classmethod
1583 def _init_default(
1584 cls,
1585 dialect: Dialect,
1586 connection: Connection,
1587 dbapi_connection: PoolProxiedConnection,
1588 execution_options: _ExecuteOptions,
1589 ) -> ExecutionContext:
1590 """Initialize execution context for a ColumnDefault construct."""
1591
1592 self = cls.__new__(cls)
1593 self.root_connection = connection
1594 self._dbapi_connection = dbapi_connection
1595 self.dialect = connection.dialect
1596
1597 self.execution_options = execution_options
1598
1599 self.cursor = self.create_cursor()
1600 return self
1601
1602 def _get_cache_stats(self) -> str:
1603 if self.compiled is None:
1604 return "raw sql"
1605
1606 now = perf_counter()
1607
1608 ch = self.cache_hit
1609
1610 gen_time = self.compiled._gen_time
1611 assert gen_time is not None
1612
1613 if ch is NO_CACHE_KEY:
1614 return "no key %.5fs" % (now - gen_time,)
1615 elif ch is CACHE_HIT:
1616 return "cached since %.4gs ago" % (now - gen_time,)
1617 elif ch is CACHE_MISS:
1618 return "generated in %.5fs" % (now - gen_time,)
1619 elif ch is CACHING_DISABLED:
1620 if "_cache_disable_reason" in self.execution_options:
1621 return "caching disabled (%s) %.5fs " % (
1622 self.execution_options["_cache_disable_reason"],
1623 now - gen_time,
1624 )
1625 else:
1626 return "caching disabled %.5fs" % (now - gen_time,)
1627 elif ch is NO_DIALECT_SUPPORT:
1628 return "dialect %s+%s does not support caching %.5fs" % (
1629 self.dialect.name,
1630 self.dialect.driver,
1631 now - gen_time,
1632 )
1633 else:
1634 return "unknown"
1635
1636 @property
1637 def executemany(self): # type: ignore[override]
1638 return self.execute_style in (
1639 ExecuteStyle.EXECUTEMANY,
1640 ExecuteStyle.INSERTMANYVALUES,
1641 )
1642
1643 @util.memoized_property
1644 def identifier_preparer(self):
1645 if self.compiled:
1646 return self.compiled.preparer
1647 elif "schema_translate_map" in self.execution_options:
1648 return self.dialect.identifier_preparer._with_schema_translate(
1649 self.execution_options["schema_translate_map"]
1650 )
1651 else:
1652 return self.dialect.identifier_preparer
1653
1654 @util.memoized_property
1655 def engine(self):
1656 return self.root_connection.engine
1657
1658 @util.memoized_property
1659 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1660 if TYPE_CHECKING:
1661 assert isinstance(self.compiled, SQLCompiler)
1662 return self.compiled.postfetch
1663
1664 @util.memoized_property
1665 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1666 if TYPE_CHECKING:
1667 assert isinstance(self.compiled, SQLCompiler)
1668 if self.isinsert:
1669 return self.compiled.insert_prefetch
1670 elif self.isupdate:
1671 return self.compiled.update_prefetch
1672 else:
1673 return ()
1674
1675 @util.memoized_property
1676 def no_parameters(self):
1677 return self.execution_options.get("no_parameters", False)
1678
1679 def _execute_scalar(
1680 self,
1681 stmt: str,
1682 type_: Optional[TypeEngine[Any]],
1683 parameters: Optional[_DBAPISingleExecuteParams] = None,
1684 ) -> Any:
1685 """Execute a string statement on the current cursor, returning a
1686 scalar result.
1687
1688 Used to fire off sequences, default phrases, and "select lastrowid"
1689 types of statements individually or in the context of a parent INSERT
1690 or UPDATE statement.
1691
1692 """
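        # e.g. a PostgreSQL-style dialect might call
        # self._execute_scalar("select nextval('some_seq')", seq_type)
        # to pre-fetch a sequence value for an INSERT (illustrative only;
        # "some_seq" and seq_type are placeholders)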
1693
1694 conn = self.root_connection
1695
1696 if "schema_translate_map" in self.execution_options:
1697 schema_translate_map = self.execution_options.get(
1698 "schema_translate_map", {}
1699 )
1700
1701 rst = self.identifier_preparer._render_schema_translates
1702 stmt = rst(stmt, schema_translate_map)
1703
1704 if not parameters:
1705 if self.dialect.positional:
1706 parameters = self.dialect.execute_sequence_format()
1707 else:
1708 parameters = {}
1709
1710 conn._cursor_execute(self.cursor, stmt, parameters, context=self)
1711 row = self.cursor.fetchone()
1712 if row is not None:
1713 r = row[0]
1714 else:
1715 r = None
1716 if type_ is not None:
1717 # apply type post processors to the result
1718 proc = type_._cached_result_processor(
1719 self.dialect, self.cursor.description[0][1]
1720 )
1721 if proc:
1722 return proc(r)
1723 return r
1724
1725 @util.memoized_property
1726 def connection(self):
1727 return self.root_connection
1728
1729 def _use_server_side_cursor(self):
1730 if not self.dialect.supports_server_side_cursors:
1731 return False
1732
1733 if self.dialect.server_side_cursors:
1734 # this is deprecated
1735 use_server_side = self.execution_options.get(
1736 "stream_results", True
1737 ) and (
1738 self.compiled
1739 and isinstance(self.compiled.statement, expression.Selectable)
1740 or (
1741 (
1742 not self.compiled
1743 or isinstance(
1744 self.compiled.statement, expression.TextClause
1745 )
1746 )
1747 and self.unicode_statement
1748 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
1749 )
1750 )
1751 else:
1752 use_server_side = self.execution_options.get(
1753 "stream_results", False
1754 )
1755
1756 return use_server_side
1757
1758 def create_cursor(self) -> DBAPICursor:
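        # server side cursors are engaged when the dialect supports them and
        # either the "stream_results" execution option is set, e.g.
        #
        #     conn.execution_options(stream_results=True).execute(stmt)
        #
        # or the legacy dialect-level server_side_cursors flag applies to
        # the statement (see _use_server_side_cursor() above)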
1759 if (
1760 # inlining initial preference checks for SS cursors
1761 self.dialect.supports_server_side_cursors
1762 and (
1763 self.execution_options.get("stream_results", False)
1764 or (
1765 self.dialect.server_side_cursors
1766 and self._use_server_side_cursor()
1767 )
1768 )
1769 ):
1770 self._is_server_side = True
1771 return self.create_server_side_cursor()
1772 else:
1773 self._is_server_side = False
1774 return self.create_default_cursor()
1775
1776 def fetchall_for_returning(self, cursor):
1777 return cursor.fetchall()
1778
1779 def create_default_cursor(self) -> DBAPICursor:
1780 return self._dbapi_connection.cursor()
1781
1782 def create_server_side_cursor(self) -> DBAPICursor:
1783 raise NotImplementedError()
1784
1785 def pre_exec(self):
1786 pass
1787
1788 def get_out_parameter_values(self, names):
1789 raise NotImplementedError(
1790 "This dialect does not support OUT parameters"
1791 )
1792
1793 def post_exec(self):
1794 pass
1795
1796 def get_result_processor(self, type_, colname, coltype):
1797 """Return a 'result processor' for a given type as present in
1798 cursor.description.
1799
1800 This has a default implementation that dialects can override
1801 for context-sensitive result type handling.
1802
1803 """
1804 return type_._cached_result_processor(self.dialect, coltype)
1805
1806 def get_lastrowid(self):
1807 """return self.cursor.lastrowid, or equivalent, after an INSERT.
1808
1809 This may involve calling special cursor functions, issuing a new SELECT
1810 on the cursor (or a new one), or returning a stored value that was
1811 calculated within post_exec().
1812
1813 This function will only be called for dialects which support "implicit"
1814 primary key generation, keep preexecute_autoincrement_sequences set to
1815 False, and when no explicit id value was bound to the statement.
1816
1817 The function is called once for an INSERT statement that would need to
1818 return the last inserted primary key for those dialects that make use
1819 of the lastrowid concept. In these cases, it is called directly after
1820 :meth:`.ExecutionContext.post_exec`.
1821
1822 """
1823 return self.cursor.lastrowid
1824
1825 def handle_dbapi_exception(self, e):
1826 pass
1827
1828 @util.non_memoized_property
1829 def rowcount(self) -> int:
1830 if self._rowcount is not None:
1831 return self._rowcount
1832 else:
1833 return self.cursor.rowcount
1834
1835 @property
1836 def _has_rowcount(self):
1837 return self._rowcount is not None
1838
1839 def supports_sane_rowcount(self):
1840 return self.dialect.supports_sane_rowcount
1841
1842 def supports_sane_multi_rowcount(self):
1843 return self.dialect.supports_sane_multi_rowcount
1844
1845 def _setup_result_proxy(self):
1846 exec_opt = self.execution_options
1847
1848 if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
1849 self._rowcount = self.cursor.rowcount
1850
1851 yp: Optional[Union[int, bool]]
1852 if self.is_crud or self.is_text:
1853 result = self._setup_dml_or_text_result()
1854 yp = False
1855 else:
1856 yp = exec_opt.get("yield_per", None)
1857 sr = self._is_server_side or exec_opt.get("stream_results", False)
1858 strategy = self.cursor_fetch_strategy
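            # when streaming, swap in a buffered-row fetch strategy so that
            # rows from the (typically server side) cursor are fetched in
            # batches and buffered as the result is consumed, rather than
            # relying on the default fetch behavior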
1859 if sr and strategy is _cursor._DEFAULT_FETCH:
1860 strategy = _cursor.BufferedRowCursorFetchStrategy(
1861 self.cursor, self.execution_options
1862 )
1863 cursor_description: _DBAPICursorDescription = (
1864 strategy.alternate_cursor_description
1865 or self.cursor.description
1866 )
1867 if cursor_description is None:
1868 strategy = _cursor._NO_CURSOR_DQL
1869
1870 result = _cursor.CursorResult(self, strategy, cursor_description)
1871
1872 compiled = self.compiled
1873
1874 if (
1875 compiled
1876 and not self.isddl
1877 and cast(SQLCompiler, compiled).has_out_parameters
1878 ):
1879 self._setup_out_parameters(result)
1880
1881 self._soft_closed = result._soft_closed
1882
1883 if yp:
1884 result = result.yield_per(yp)
1885
1886 return result
1887
1888 def _setup_out_parameters(self, result):
1889 compiled = cast(SQLCompiler, self.compiled)
1890
1891 out_bindparams = [
1892 (param, name)
1893 for param, name in compiled.bind_names.items()
1894 if param.isoutparam
1895 ]
1896 out_parameters = {}
1897
1898 for bindparam, raw_value in zip(
1899 [param for param, name in out_bindparams],
1900 self.get_out_parameter_values(
1901 [name for param, name in out_bindparams]
1902 ),
1903 ):
1904 type_ = bindparam.type
1905 impl_type = type_.dialect_impl(self.dialect)
1906 dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
1907 result_processor = impl_type.result_processor(
1908 self.dialect, dbapi_type
1909 )
1910 if result_processor is not None:
1911 raw_value = result_processor(raw_value)
1912 out_parameters[bindparam.key] = raw_value
1913
1914 result.out_parameters = out_parameters
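
        # the .out_parameters collection is consumed by user-facing code;
        # for a statement that used outparam() bind parameters, an
        # application reads the values after execution, e.g. (illustrative
        # parameter name only):
        #
        #     result = conn.execute(stmt)
        #     value = result.out_parameters["my_out_param"]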
1915
1916 def _setup_dml_or_text_result(self):
1917 compiled = cast(SQLCompiler, self.compiled)
1918
1919 strategy: ResultFetchStrategy = self.cursor_fetch_strategy
1920
1921 if self.isinsert:
1922 if (
1923 self.execute_style is ExecuteStyle.INSERTMANYVALUES
1924 and compiled.effective_returning
1925 ):
1926 strategy = _cursor.FullyBufferedCursorFetchStrategy(
1927 self.cursor,
1928 initial_buffer=self._insertmanyvalues_rows,
1929 # maintain alt cursor description if set by the
1930 # dialect, e.g. mssql preserves it
1931 alternate_description=(
1932 strategy.alternate_cursor_description
1933 ),
1934 )
1935
1936 if compiled.postfetch_lastrowid:
1937 self.inserted_primary_key_rows = (
1938 self._setup_ins_pk_from_lastrowid()
1939 )
1940 # else if not self._is_implicit_returning,
1941 # the default inserted_primary_key_rows accessor will
1942 # return an "empty" primary key collection when accessed.
1943
1944 if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
1945 strategy = _cursor.BufferedRowCursorFetchStrategy(
1946 self.cursor, self.execution_options
1947 )
1948
1949 if strategy is _cursor._NO_CURSOR_DML:
1950 cursor_description = None
1951 else:
1952 cursor_description = (
1953 strategy.alternate_cursor_description
1954 or self.cursor.description
1955 )
1956
1957 if cursor_description is None:
1958 strategy = _cursor._NO_CURSOR_DML
1959 elif self._num_sentinel_cols:
1960 assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
1961 # strip out the sentinel columns from cursor description
1962 # a similar logic is done to the rows only in CursorResult
1963 cursor_description = cursor_description[
1964 0 : -self._num_sentinel_cols
1965 ]
1966
1967 result: _cursor.CursorResult[Any] = _cursor.CursorResult(
1968 self, strategy, cursor_description
1969 )
1970
1971 if self.isinsert:
1972 if self._is_implicit_returning:
1973 rows = result.all()
1974
1975 self.returned_default_rows = rows
1976
1977 self.inserted_primary_key_rows = (
1978 self._setup_ins_pk_from_implicit_returning(result, rows)
1979 )
1980
                # test that it has cursor metadata that is accurate. The
                # first row will have been fetched, and the current
                # assumption is that the result has only one row, until
                # executemany() support is added here.
1985 assert result._metadata.returns_rows
1986
1987 # Insert statement has both return_defaults() and
1988 # returning(). rewind the result on the list of rows
1989 # we just used.
1990 if self._is_supplemental_returning:
1991 result._rewind(rows)
1992 else:
1993 result._soft_close()
1994 elif not self._is_explicit_returning:
1995 result._soft_close()
1996
1997 # we assume here the result does not return any rows.
                # *usually* this will be true; however, some dialects,
                # such as MSSQL/pyodbc, need to SELECT a post-fetch
                # function, so this is not always the case.
2001 # assert not result.returns_rows
2002
2003 elif self._is_implicit_returning:
2004 rows = result.all()
2005
2006 if rows:
2007 self.returned_default_rows = rows
2008 self._rowcount = len(rows)
2009
2010 if self._is_supplemental_returning:
2011 result._rewind(rows)
2012 else:
2013 result._soft_close()
2014
            # test that it has cursor metadata that is accurate;
            # the rows have all been fetched at this point, however.
2017 assert result._metadata.returns_rows
2018
2019 elif not result._metadata.returns_rows:
2020 # no results, get rowcount
2021 # (which requires open cursor on some drivers)
2022 if self._rowcount is None:
2023 self._rowcount = self.cursor.rowcount
2024 result._soft_close()
2025 elif self.isupdate or self.isdelete:
2026 if self._rowcount is None:
2027 self._rowcount = self.cursor.rowcount
2028 return result
2029
2030 @util.memoized_property
2031 def inserted_primary_key_rows(self):
2032 # if no specific "get primary key" strategy was set up
2033 # during execution, return a "default" primary key based
2034 # on what's in the compiled_parameters and nothing else.
2035 return self._setup_ins_pk_from_empty()
2036
2037 def _setup_ins_pk_from_lastrowid(self):
2038 getter = cast(
2039 SQLCompiler, self.compiled
2040 )._inserted_primary_key_from_lastrowid_getter
2041 lastrowid = self.get_lastrowid()
2042 return [getter(lastrowid, self.compiled_parameters[0])]
2043
2044 def _setup_ins_pk_from_empty(self):
2045 getter = cast(
2046 SQLCompiler, self.compiled
2047 )._inserted_primary_key_from_lastrowid_getter
2048 return [getter(None, param) for param in self.compiled_parameters]
2049
2050 def _setup_ins_pk_from_implicit_returning(self, result, rows):
2051 if not rows:
2052 return []
2053
2054 getter = cast(
2055 SQLCompiler, self.compiled
2056 )._inserted_primary_key_from_returning_getter
2057 compiled_params = self.compiled_parameters
2058
2059 return [
2060 getter(row, param) for row, param in zip(rows, compiled_params)
2061 ]
2062
2063 def lastrow_has_defaults(self):
2064 return (self.isinsert or self.isupdate) and bool(
2065 cast(SQLCompiler, self.compiled).postfetch
2066 )
2067
2068 def _prepare_set_input_sizes(
2069 self,
2070 ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
2071 """Given a cursor and ClauseParameters, prepare arguments
2072 in order to call the appropriate
2073 style of ``setinputsizes()`` on the cursor, using DB-API types
2074 from the bind parameter's ``TypeEngine`` objects.
2075
        This method is only called by those dialects which set the
        :attr:`.Dialect.bind_typing` attribute to
        :attr:`.BindTyping.SETINPUTSIZES`.  The python-oracledb and cx_Oracle
        drivers are the only DBAPIs that require setinputsizes(); pyodbc
        offers it as an option.

        Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
        for pg8000 and asyncpg; those dialects have since been changed to use
        inline rendering of casts.
2085
2086 """
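        # each entry in the returned list is a
        # (paramname, dbapi_type, sqlalchemy_type) tuple, which the
        # dialect's do_set_input_sizes() hook then applies to the cursor,
        # e.g. (hypothetical values for a cx_Oracle-style dialect):
        #
        #     [("p1", cx_Oracle.NUMBER, Integer()),
        #      ("p2", cx_Oracle.STRING, String())]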
2087 if self.isddl or self.is_text:
2088 return None
2089
2090 compiled = cast(SQLCompiler, self.compiled)
2091
2092 inputsizes = compiled._get_set_input_sizes_lookup()
2093
2094 if inputsizes is None:
2095 return None
2096
2097 dialect = self.dialect
2098
2099 # all of the rest of this... cython?
2100
2101 if dialect._has_events:
2102 inputsizes = dict(inputsizes)
2103 dialect.dispatch.do_setinputsizes(
2104 inputsizes, self.cursor, self.statement, self.parameters, self
2105 )
2106
2107 if compiled.escaped_bind_names:
2108 escaped_bind_names = compiled.escaped_bind_names
2109 else:
2110 escaped_bind_names = None
2111
2112 if dialect.positional:
2113 items = [
2114 (key, compiled.binds[key])
2115 for key in compiled.positiontup or ()
2116 ]
2117 else:
2118 items = [
2119 (key, bindparam)
2120 for bindparam, key in compiled.bind_names.items()
2121 ]
2122
2123 generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
2124 for key, bindparam in items:
2125 if bindparam in compiled.literal_execute_params:
2126 continue
2127
2128 if key in self._expanded_parameters:
2129 if is_tuple_type(bindparam.type):
2130 num = len(bindparam.type.types)
2131 dbtypes = inputsizes[bindparam]
2132 generic_inputsizes.extend(
2133 (
2134 (
2135 escaped_bind_names.get(paramname, paramname)
2136 if escaped_bind_names is not None
2137 else paramname
2138 ),
2139 dbtypes[idx % num],
2140 bindparam.type.types[idx % num],
2141 )
2142 for idx, paramname in enumerate(
2143 self._expanded_parameters[key]
2144 )
2145 )
2146 else:
2147 dbtype = inputsizes.get(bindparam, None)
2148 generic_inputsizes.extend(
2149 (
2150 (
2151 escaped_bind_names.get(paramname, paramname)
2152 if escaped_bind_names is not None
2153 else paramname
2154 ),
2155 dbtype,
2156 bindparam.type,
2157 )
2158 for paramname in self._expanded_parameters[key]
2159 )
2160 else:
2161 dbtype = inputsizes.get(bindparam, None)
2162
2163 escaped_name = (
2164 escaped_bind_names.get(key, key)
2165 if escaped_bind_names is not None
2166 else key
2167 )
2168
2169 generic_inputsizes.append(
2170 (escaped_name, dbtype, bindparam.type)
2171 )
2172
2173 return generic_inputsizes
2174
2175 def _exec_default(self, column, default, type_):
2176 if default.is_sequence:
2177 return self.fire_sequence(default, type_)
2178 elif default.is_callable:
2179 # this codepath is not normally used as it's inlined
2180 # into _process_execute_defaults
2181 self.current_column = column
2182 return default.arg(self)
2183 elif default.is_clause_element:
2184 return self._exec_default_clause_element(column, default, type_)
2185 else:
2186 # this codepath is not normally used as it's inlined
2187 # into _process_execute_defaults
2188 return default.arg
2189
2190 def _exec_default_clause_element(self, column, default, type_):
2191 # execute a default that's a complete clause element. Here, we have
2192 # to re-implement a miniature version of the compile->parameters->
2193 # cursor.execute() sequence, since we don't want to modify the state
2194 # of the connection / result in progress or create new connection/
2195 # result objects etc.
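        # e.g. a column default given as a SQL expression, such as
        # Column("created", DateTime, default=func.now()) (an illustrative
        # case), may depending on the dialect be pre-executed here by
        # embedding the expression in a SELECT and using the scalar result
        # as the bound value.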
2196 # .. versionchanged:: 1.4
2197
2198 if not default._arg_is_typed:
2199 default_arg = expression.type_coerce(default.arg, type_)
2200 else:
2201 default_arg = default.arg
2202 compiled = expression.select(default_arg).compile(dialect=self.dialect)
2203 compiled_params = compiled.construct_params()
2204 processors = compiled._bind_processors
2205 if compiled.positional:
2206 parameters = self.dialect.execute_sequence_format(
2207 [
2208 (
2209 processors[key](compiled_params[key]) # type: ignore
2210 if key in processors
2211 else compiled_params[key]
2212 )
2213 for key in compiled.positiontup or ()
2214 ]
2215 )
2216 else:
2217 parameters = {
2218 key: (
2219 processors[key](compiled_params[key]) # type: ignore
2220 if key in processors
2221 else compiled_params[key]
2222 )
2223 for key in compiled_params
2224 }
2225 return self._execute_scalar(
2226 str(compiled), type_, parameters=parameters
2227 )
2228
2229 current_parameters: Optional[_CoreSingleExecuteParams] = None
2230 """A dictionary of parameters applied to the current row.
2231
2232 This attribute is only available in the context of a user-defined default
2233 generation function, e.g. as described at :ref:`context_default_functions`.
2234 It consists of a dictionary which includes entries for each column/value
2235 pair that is to be part of the INSERT or UPDATE statement. The keys of the
2236 dictionary will be the key value of each :class:`_schema.Column`,
2237 which is usually
2238 synonymous with the name.
2239
2240 Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
    does not accommodate the "multi-values" feature of the
2242 :meth:`_expression.Insert.values` method. The
2243 :meth:`.DefaultExecutionContext.get_current_parameters` method should be
2244 preferred.
2245
2246 .. seealso::
2247
2248 :meth:`.DefaultExecutionContext.get_current_parameters`
2249
2250 :ref:`context_default_functions`
2251
2252 """
2253
2254 def get_current_parameters(self, isolate_multiinsert_groups=True):
2255 """Return a dictionary of parameters applied to the current row.
2256
2257 This method can only be used in the context of a user-defined default
2258 generation function, e.g. as described at
2259 :ref:`context_default_functions`. When invoked, a dictionary is
2260 returned which includes entries for each column/value pair that is part
2261 of the INSERT or UPDATE statement. The keys of the dictionary will be
2262 the key value of each :class:`_schema.Column`,
2263 which is usually synonymous
2264 with the name.
2265
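        E.g., a Python-side default function can read the row's other values
        from this dictionary; a usage sketch consistent with
        :ref:`context_default_functions`::

            def increment_counter(context):
                return context.get_current_parameters()["counter"] + 12

            Column("counter_plus_twelve", Integer, default=increment_counter)
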
2266 :param isolate_multiinsert_groups=True: indicates that multi-valued
2267 INSERT constructs created using :meth:`_expression.Insert.values`
2268 should be
2269 handled by returning only the subset of parameters that are local
2270 to the current column default invocation. When ``False``, the
2271 raw parameters of the statement are returned including the
2272 naming convention used in the case of multi-valued INSERT.
2273
2274 .. versionadded:: 1.2 added
2275 :meth:`.DefaultExecutionContext.get_current_parameters`
           which provides more functionality than the existing
2277 :attr:`.DefaultExecutionContext.current_parameters`
2278 attribute.
2279
2280 .. seealso::
2281
2282 :attr:`.DefaultExecutionContext.current_parameters`
2283
2284 :ref:`context_default_functions`
2285
2286 """
2287 try:
2288 parameters = self.current_parameters
2289 column = self.current_column
2290 except AttributeError:
2291 raise exc.InvalidRequestError(
2292 "get_current_parameters() can only be invoked in the "
2293 "context of a Python side column default function"
2294 )
2295 else:
2296 assert column is not None
2297 assert parameters is not None
2298 compile_state = cast(
2299 "DMLState", cast(SQLCompiler, self.compiled).compile_state
2300 )
2301 assert compile_state is not None
2302 if (
2303 isolate_multiinsert_groups
2304 and dml.isinsert(compile_state)
2305 and compile_state._has_multi_parameters
2306 ):
2307 if column._is_multiparam_column:
2308 index = column.index + 1
2309 d = {column.original.key: parameters[column.key]}
2310 else:
2311 d = {column.key: parameters[column.key]}
2312 index = 0
2313 assert compile_state._dict_parameters is not None
2314 keys = compile_state._dict_parameters.keys()
2315 d.update(
2316 (key, parameters["%s_m%d" % (key, index)]) for key in keys
2317 )
2318 return d
2319 else:
2320 return parameters
2321
2322 def get_insert_default(self, column):
2323 if column.default is None:
2324 return None
2325 else:
2326 return self._exec_default(column, column.default, column.type)
2327
2328 def get_update_default(self, column):
2329 if column.onupdate is None:
2330 return None
2331 else:
2332 return self._exec_default(column, column.onupdate, column.type)
2333
2334 def _process_execute_defaults(self):
2335 compiled = cast(SQLCompiler, self.compiled)
2336
2337 key_getter = compiled._within_exec_param_key_getter
2338
2339 sentinel_counter = 0
2340
2341 if compiled.insert_prefetch:
2342 prefetch_recs = [
2343 (
2344 c,
2345 key_getter(c),
2346 c._default_description_tuple,
2347 self.get_insert_default,
2348 )
2349 for c in compiled.insert_prefetch
2350 ]
2351 elif compiled.update_prefetch:
2352 prefetch_recs = [
2353 (
2354 c,
2355 key_getter(c),
2356 c._onupdate_description_tuple,
2357 self.get_update_default,
2358 )
2359 for c in compiled.update_prefetch
2360 ]
2361 else:
2362 prefetch_recs = []
2363
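        # for each parameter set, apply the prefetched default records; each
        # record carries (arg, is_scalar, is_callable, is_sentinel) from the
        # column's description tuple, so the common scalar and Python-callable
        # cases are handled inline without further dispatch.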
2364 for param in self.compiled_parameters:
2365 self.current_parameters = param
2366
2367 for (
2368 c,
2369 param_key,
2370 (arg, is_scalar, is_callable, is_sentinel),
2371 fallback,
2372 ) in prefetch_recs:
2373 if is_sentinel:
2374 param[param_key] = sentinel_counter
2375 sentinel_counter += 1
2376 elif is_scalar:
2377 param[param_key] = arg
2378 elif is_callable:
2379 self.current_column = c
2380 param[param_key] = arg(self)
2381 else:
2382 val = fallback(c)
2383 if val is not None:
2384 param[param_key] = val
2385
2386 del self.current_parameters
2387
2388
2389DefaultDialect.execution_ctx_cls = DefaultExecutionContext