1# engine/default.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Default implementations of per-dialect sqlalchemy.engine classes.
10
11These are semi-private implementation classes which are only of importance
12to database dialect authors; dialects will usually use the classes here
13as the base class for their own corresponding classes.
14
15"""
16
17from __future__ import annotations
18
19import functools
20import operator
21import random
22import re
23from time import perf_counter
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Dict
29from typing import Final
30from typing import List
31from typing import Mapping
32from typing import MutableMapping
33from typing import MutableSequence
34from typing import Optional
35from typing import Sequence
36from typing import Set
37from typing import Tuple
38from typing import Type
39from typing import TYPE_CHECKING
40from typing import Union
41import weakref
42
43from . import characteristics
44from . import cursor as _cursor
45from . import interfaces
46from .base import Connection
47from .interfaces import CacheStats
48from .interfaces import DBAPICursor
49from .interfaces import Dialect
50from .interfaces import ExecuteStyle
51from .interfaces import ExecutionContext
52from .reflection import ObjectKind
53from .reflection import ObjectScope
54from .. import event
55from .. import exc
56from .. import pool
57from .. import util
58from ..sql import compiler
59from ..sql import dml
60from ..sql import expression
61from ..sql import type_api
62from ..sql import util as sql_util
63from ..sql._typing import is_tuple_type
64from ..sql.base import _NoArg
65from ..sql.compiler import DDLCompiler
66from ..sql.compiler import InsertmanyvaluesSentinelOpts
67from ..sql.compiler import SQLCompiler
68from ..sql.elements import quoted_name
69from ..util.typing import Literal
70from ..util.typing import TupleAny
71from ..util.typing import Unpack
72
73
74if typing.TYPE_CHECKING:
75 from types import ModuleType
76
77 from .base import Engine
78 from .cursor import ResultFetchStrategy
79 from .interfaces import _CoreMultiExecuteParams
80 from .interfaces import _CoreSingleExecuteParams
81 from .interfaces import _DBAPICursorDescription
82 from .interfaces import _DBAPIMultiExecuteParams
83 from .interfaces import _DBAPISingleExecuteParams
84 from .interfaces import _ExecuteOptions
85 from .interfaces import _MutableCoreSingleExecuteParams
86 from .interfaces import _ParamStyle
87 from .interfaces import ConnectArgsType
88 from .interfaces import DBAPIConnection
89 from .interfaces import DBAPIModule
90 from .interfaces import IsolationLevel
91 from .row import Row
92 from .url import URL
93 from ..event import _ListenerFnType
94 from ..pool import Pool
95 from ..pool import PoolProxiedConnection
96 from ..sql import Executable
97 from ..sql.compiler import Compiled
98 from ..sql.compiler import Linting
99 from ..sql.compiler import ResultColumnsEntry
100 from ..sql.dml import DMLState
101 from ..sql.dml import UpdateBase
102 from ..sql.elements import BindParameter
103 from ..sql.schema import Column
104 from ..sql.type_api import _BindProcessorType
105 from ..sql.type_api import _ResultProcessorType
106 from ..sql.type_api import TypeEngine
107
108
109# When we're handed literal SQL, ensure it's a SELECT query
110SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)
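# Illustrative check only (kept as a comment): the pattern above merely has
# to confirm that literal SQL begins with SELECT, ignoring case and leading
# whitespace, e.g.:
#
#     assert SERVER_SIDE_CURSOR_RE.match("  select * from t")
#     assert SERVER_SIDE_CURSOR_RE.match("SELECT 1")
#     assert not SERVER_SIDE_CURSOR_RE.match("UPDATE t SET x = 1")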
111
112
113(
114 CACHE_HIT,
115 CACHE_MISS,
116 CACHING_DISABLED,
117 NO_CACHE_KEY,
118 NO_DIALECT_SUPPORT,
119) = list(CacheStats)
120
121
122class DefaultDialect(Dialect):
123 """Default implementation of Dialect"""
124
125 statement_compiler = compiler.SQLCompiler
126 ddl_compiler = compiler.DDLCompiler
127 type_compiler_cls = compiler.GenericTypeCompiler
128
129 preparer = compiler.IdentifierPreparer
130 supports_alter = True
131 supports_comments = False
132 supports_constraint_comments = False
133 inline_comments = False
134 supports_statement_cache = True
135
136 div_is_floordiv = True
137
138 bind_typing = interfaces.BindTyping.NONE
139
140 include_set_input_sizes: Optional[Set[Any]] = None
141 exclude_set_input_sizes: Optional[Set[Any]] = None
142
143 # the first value we'd get for an autoincrement column.
144 default_sequence_base = 1
145
    # most DBAPIs are happy with this for execute();
    # cx_oracle is not.
148 execute_sequence_format = tuple
149
150 supports_schemas = True
151 supports_views = True
152 supports_sequences = False
153 sequences_optional = False
154 preexecute_autoincrement_sequences = False
155 supports_identity_columns = False
156 postfetch_lastrowid = True
157 favor_returning_over_lastrowid = False
158 insert_null_pk_still_autoincrements = False
159 update_returning = False
160 delete_returning = False
161 update_returning_multifrom = False
162 delete_returning_multifrom = False
163 insert_returning = False
164
165 cte_follows_insert = False
166
167 supports_native_enum = False
168 supports_native_boolean = False
169 supports_native_uuid = False
170 returns_native_bytes = False
171
172 non_native_boolean_check_constraint = True
173
174 supports_simple_order_by_label = True
175
176 tuple_in_values = False
177
178 connection_characteristics = util.immutabledict(
179 {
180 "isolation_level": characteristics.IsolationLevelCharacteristic(),
181 "logging_token": characteristics.LoggingTokenCharacteristic(),
182 }
183 )
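
    # The characteristics above back the like-named execution options; a
    # minimal usage sketch, assuming an already-configured ``engine`` and
    # ``text`` imported from sqlalchemy (illustrative only):
    #
    #     with engine.connect() as conn:
    #         conn = conn.execution_options(
    #             isolation_level="READ COMMITTED",
    #             logging_token="worker-1",
    #         )
    #         conn.execute(text("select 1"))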
184
185 engine_config_types: Mapping[str, Any] = util.immutabledict(
186 {
187 "pool_timeout": util.asint,
188 "echo": util.bool_or_str("debug"),
189 "echo_pool": util.bool_or_str("debug"),
190 "pool_recycle": util.asint,
191 "pool_size": util.asint,
192 "max_overflow": util.asint,
193 "future": util.asbool,
194 }
195 )
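
    # These coercions apply when engine parameters arrive as strings, e.g.
    # via ``engine_from_config()``; a sketch assuming a typical config
    # mapping (illustrative only):
    #
    #     from sqlalchemy import engine_from_config
    #
    #     config = {
    #         "sqlalchemy.url": "sqlite://",
    #         "sqlalchemy.pool_recycle": "3600",  # -> 3600 via util.asint
    #         "sqlalchemy.echo": "debug",         # -> "debug" via bool_or_str
    #     }
    #     engine = engine_from_config(config, prefix="sqlalchemy.")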
196
197 # if the NUMERIC type
198 # returns decimal.Decimal.
199 # *not* the FLOAT type however.
200 supports_native_decimal = False
201
202 name = "default"
203
204 # length at which to truncate
205 # any identifier.
206 max_identifier_length = 9999
207 _user_defined_max_identifier_length: Optional[int] = None
208
209 isolation_level: Optional[str] = None
210
    # sub-categories of max_identifier_length.
    # currently these accommodate MySQL, which allows alias names
    # of 255 characters but DDL names of only 64.
214 max_index_name_length: Optional[int] = None
215 max_constraint_name_length: Optional[int] = None
216
217 supports_sane_rowcount = True
218 supports_sane_multi_rowcount = True
219 colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
220 default_paramstyle = "named"
221
222 supports_default_values = False
223 """dialect supports INSERT... DEFAULT VALUES syntax"""
224
225 supports_default_metavalue = False
226 """dialect supports INSERT... VALUES (DEFAULT) syntax"""
227
228 default_metavalue_token = "DEFAULT"
229 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
230 parenthesis."""
231
232 # not sure if this is a real thing but the compiler will deliver it
233 # if this is the only flag enabled.
234 supports_empty_insert = True
235 """dialect supports INSERT () VALUES ()"""
236
237 supports_multivalues_insert = False
238
239 use_insertmanyvalues: bool = False
240
241 use_insertmanyvalues_wo_returning: bool = False
242
243 insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
244 InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
245 )
246
247 insertmanyvalues_page_size: int = 1000
248 insertmanyvalues_max_parameters = 32700
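
    # The page size above may also be supplied per engine or per execution;
    # a sketch assuming an existing ``url`` and ``table`` (illustrative
    # only):
    #
    #     engine = create_engine(url, insertmanyvalues_page_size=500)
    #
    #     with engine.connect() as conn:
    #         conn.execution_options(insertmanyvalues_page_size=100).execute(
    #             table.insert().returning(table.c.id),
    #             [{"x": 1}, {"x": 2}, {"x": 3}],
    #         )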
249
250 supports_is_distinct_from = True
251
252 supports_server_side_cursors = False
253
254 server_side_cursors = False
255
256 # extra record-level locking features (#4860)
257 supports_for_update_of = False
258
259 server_version_info = None
260
261 default_schema_name: Optional[str] = None
262
263 # indicates symbol names are
264 # UPPERCASED if they are case insensitive
265 # within the database.
266 # if this is True, the methods normalize_name()
267 # and denormalize_name() must be provided.
268 requires_name_normalize = False
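
    # A dialect for a backend that stores case-insensitive names as
    # UPPERCASE would flip this flag and can rely on the default
    # normalize_name() / denormalize_name() implementations further down
    # in this class; hypothetical subclass for illustration only:
    #
    #     class UppercaseNameDialect(DefaultDialect):
    #         supports_statement_cache = True
    #         requires_name_normalize = True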
269
270 is_async = False
271
272 has_terminate = False
273
274 # TODO: this is not to be part of 2.0. implement rudimentary binary
275 # literals for SQLite, PostgreSQL, MySQL only within
276 # _Binary.literal_processor
277 _legacy_binary_type_literal_encoding = "utf-8"
278
279 @util.deprecated_params(
280 empty_in_strategy=(
281 "1.4",
282 "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
283 "deprecated, and no longer has any effect. All IN expressions "
284 "are now rendered using "
285 'the "expanding parameter" strategy which renders a set of bound'
286 'expressions, or an "empty set" SELECT, at statement execution'
287 "time.",
288 ),
289 server_side_cursors=(
290 "1.4",
291 "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
292 "is deprecated and will be removed in a future release. Please "
293 "use the "
294 ":paramref:`_engine.Connection.execution_options.stream_results` "
295 "parameter.",
296 ),
297 )
298 def __init__(
299 self,
300 paramstyle: Optional[_ParamStyle] = None,
301 isolation_level: Optional[IsolationLevel] = None,
302 dbapi: Optional[ModuleType] = None,
303 implicit_returning: Literal[True] = True,
304 supports_native_boolean: Optional[bool] = None,
305 max_identifier_length: Optional[int] = None,
306 label_length: Optional[int] = None,
307 insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
308 use_insertmanyvalues: Optional[bool] = None,
309 # util.deprecated_params decorator cannot render the
310 # Linting.NO_LINTING constant
311 compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore
312 server_side_cursors: bool = False,
313 **kwargs: Any,
314 ):
315 if server_side_cursors:
316 if not self.supports_server_side_cursors:
317 raise exc.ArgumentError(
318 "Dialect %s does not support server side cursors" % self
319 )
320 else:
321 self.server_side_cursors = True
322
323 if getattr(self, "use_setinputsizes", False):
324 util.warn_deprecated(
325 "The dialect-level use_setinputsizes attribute is "
326 "deprecated. Please use "
327 "bind_typing = BindTyping.SETINPUTSIZES",
328 "2.0",
329 )
330 self.bind_typing = interfaces.BindTyping.SETINPUTSIZES
331
332 self.positional = False
333 self._ischema = None
334
335 self.dbapi = dbapi
336
337 if paramstyle is not None:
338 self.paramstyle = paramstyle
339 elif self.dbapi is not None:
340 self.paramstyle = self.dbapi.paramstyle
341 else:
342 self.paramstyle = self.default_paramstyle
343 self.positional = self.paramstyle in (
344 "qmark",
345 "format",
346 "numeric",
347 "numeric_dollar",
348 )
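        # for reference, the placeholder syntax each paramstyle produces in
        # compiled SQL (the first four are the positional styles checked
        # above):
        #
        #     "qmark"          ->  ?
        #     "format"         ->  %s
        #     "numeric"        ->  :1
        #     "numeric_dollar" ->  $1
        #     "named"          ->  :param_name
        #     "pyformat"       ->  %(param_name)s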
349 self.identifier_preparer = self.preparer(self)
350 self._on_connect_isolation_level = isolation_level
351
352 legacy_tt_callable = getattr(self, "type_compiler", None)
353 if legacy_tt_callable is not None:
354 tt_callable = cast(
355 Type[compiler.GenericTypeCompiler],
356 self.type_compiler,
357 )
358 else:
359 tt_callable = self.type_compiler_cls
360
361 self.type_compiler_instance = self.type_compiler = tt_callable(self)
362
363 if supports_native_boolean is not None:
364 self.supports_native_boolean = supports_native_boolean
365
366 self._user_defined_max_identifier_length = max_identifier_length
367 if self._user_defined_max_identifier_length:
368 self.max_identifier_length = (
369 self._user_defined_max_identifier_length
370 )
371 self.label_length = label_length
372 self.compiler_linting = compiler_linting
373
374 if use_insertmanyvalues is not None:
375 self.use_insertmanyvalues = use_insertmanyvalues
376
377 if insertmanyvalues_page_size is not _NoArg.NO_ARG:
378 self.insertmanyvalues_page_size = insertmanyvalues_page_size
379
380 @property
381 @util.deprecated(
382 "2.0",
383 "full_returning is deprecated, please use insert_returning, "
384 "update_returning, delete_returning",
385 )
386 def full_returning(self):
387 return (
388 self.insert_returning
389 and self.update_returning
390 and self.delete_returning
391 )
392
393 @util.memoized_property
394 def insert_executemany_returning(self):
395 """Default implementation for insert_executemany_returning, if not
396 otherwise overridden by the specific dialect.
397
        The default dialect determines that "insert_executemany_returning"
        is available if the dialect in use has opted into using the
        "use_insertmanyvalues" feature. If it hasn't opted into that, then
401 this attribute is False, unless the dialect in question overrides this
402 and provides some other implementation (such as the Oracle Database
403 dialects).
404
405 """
406 return self.insert_returning and self.use_insertmanyvalues
407
408 @util.memoized_property
409 def insert_executemany_returning_sort_by_parameter_order(self):
410 """Default implementation for
411 insert_executemany_returning_deterministic_order, if not otherwise
412 overridden by the specific dialect.
413
414 The default dialect determines "insert_executemany_returning" can have
415 deterministic order only if the dialect in use has opted into using the
416 "use_insertmanyvalues" feature, which implements deterministic ordering
417 using client side sentinel columns only by default. The
418 "insertmanyvalues" feature also features alternate forms that can
419 use server-generated PK values as "sentinels", but those are only
420 used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel`
421 bitflag enables those alternate SQL forms, which are disabled
422 by default.
423
424 If the dialect in use hasn't opted into that, then this attribute is
425 False, unless the dialect in question overrides this and provides some
426 other implementation (such as the Oracle Database dialects).
427
428 """
429 return self.insert_returning and self.use_insertmanyvalues
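
    # both properties above therefore come "for free" once a dialect opts
    # into the insertmanyvalues feature; hypothetical opt-in for
    # illustration only:
    #
    #     class MyDialect(DefaultDialect):
    #         supports_statement_cache = True
    #         insert_returning = True
    #         use_insertmanyvalues = True
    #         # -> insert_executemany_returning and
    #         #    insert_executemany_returning_sort_by_parameter_order
    #         #    both evaluate to True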
430
431 update_executemany_returning = False
432 delete_executemany_returning = False
433
434 @util.memoized_property
435 def loaded_dbapi(self) -> DBAPIModule:
436 if self.dbapi is None:
437 raise exc.InvalidRequestError(
438 f"Dialect {self} does not have a Python DBAPI established "
439 "and cannot be used for actual database interaction"
440 )
441 return self.dbapi
442
443 @util.memoized_property
444 def _bind_typing_render_casts(self):
445 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS
446
447 def _ensure_has_table_connection(self, arg: Connection) -> None:
448 if not isinstance(arg, Connection):
449 raise exc.ArgumentError(
450 "The argument passed to Dialect.has_table() should be a "
451 "%s, got %s. "
452 "Additionally, the Dialect.has_table() method is for "
453 "internal dialect "
454 "use only; please use "
455 "``inspect(some_engine).has_table(<tablename>>)`` "
456 "for public API use." % (Connection, type(arg))
457 )
458
459 @util.memoized_property
460 def _supports_statement_cache(self):
461 ssc = self.__class__.__dict__.get("supports_statement_cache", None)
462 if ssc is None:
463 util.warn(
464 "Dialect %s:%s will not make use of SQL compilation caching "
465 "as it does not set the 'supports_statement_cache' attribute "
466 "to ``True``. This can have "
467 "significant performance implications including some "
468 "performance degradations in comparison to prior SQLAlchemy "
469 "versions. Dialect maintainers should seek to set this "
470 "attribute to True after appropriate development and testing "
471 "for SQLAlchemy 1.4 caching support. Alternatively, this "
472 "attribute may be set to False which will disable this "
473 "warning." % (self.name, self.driver),
474 code="cprf",
475 )
476
477 return bool(ssc)
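
    # third-party dialects enable compiled-SQL caching (and silence the
    # warning above) by declaring the attribute directly on their own class,
    # since the check reads the class __dict__ rather than the inherited
    # value; hypothetical dialect for illustration only:
    #
    #     class MyThirdPartyDialect(DefaultDialect):
    #         supports_statement_cache = True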
478
479 @util.memoized_property
480 def _type_memos(self):
481 return weakref.WeakKeyDictionary()
482
483 @property
484 def dialect_description(self): # type: ignore[override]
485 return self.name + "+" + self.driver
486
487 @property
488 def supports_sane_rowcount_returning(self):
489 """True if this dialect supports sane rowcount even if RETURNING is
490 in use.
491
492 For dialects that don't support RETURNING, this is synonymous with
493 ``supports_sane_rowcount``.
494
495 """
496 return self.supports_sane_rowcount
497
498 @classmethod
499 def get_pool_class(cls, url: URL) -> Type[Pool]:
500 default: Type[pool.Pool]
501 if cls.is_async:
502 default = pool.AsyncAdaptedQueuePool
503 else:
504 default = pool.QueuePool
505
506 return getattr(cls, "poolclass", default)
507
508 def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
509 return self.get_pool_class(url)
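
    # a dialect may pin a specific pool implementation by declaring a
    # ``poolclass`` attribute, which get_pool_class() returns in preference
    # to the QueuePool / AsyncAdaptedQueuePool defaults; hypothetical
    # dialect for illustration only:
    #
    #     from sqlalchemy.pool import NullPool
    #
    #     class MyDialect(DefaultDialect):
    #         supports_statement_cache = True
    #         poolclass = NullPool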
510
511 @classmethod
512 def load_provisioning(cls):
513 package = ".".join(cls.__module__.split(".")[0:-1])
514 try:
515 __import__(package + ".provision")
516 except ImportError:
517 pass
518
519 def _builtin_onconnect(self) -> Optional[_ListenerFnType]:
520 if self._on_connect_isolation_level is not None:
521
522 def builtin_connect(dbapi_conn, conn_rec):
523 self._assert_and_set_isolation_level(
524 dbapi_conn, self._on_connect_isolation_level
525 )
526
527 return builtin_connect
528 else:
529 return None
530
531 def initialize(self, connection: Connection) -> None:
532 try:
533 self.server_version_info = self._get_server_version_info(
534 connection
535 )
536 except NotImplementedError:
537 self.server_version_info = None
538 try:
539 self.default_schema_name = self._get_default_schema_name(
540 connection
541 )
542 except NotImplementedError:
543 self.default_schema_name = None
544
545 try:
546 self.default_isolation_level = self.get_default_isolation_level(
547 connection.connection.dbapi_connection
548 )
549 except NotImplementedError:
550 self.default_isolation_level = None
551
552 if not self._user_defined_max_identifier_length:
553 max_ident_length = self._check_max_identifier_length(connection)
554 if max_ident_length:
555 self.max_identifier_length = max_ident_length
556
557 if (
558 self.label_length
559 and self.label_length > self.max_identifier_length
560 ):
561 raise exc.ArgumentError(
562 "Label length of %d is greater than this dialect's"
563 " maximum identifier length of %d"
564 % (self.label_length, self.max_identifier_length)
565 )
566
567 def on_connect(self) -> Optional[Callable[[Any], None]]:
568 # inherits the docstring from interfaces.Dialect.on_connect
569 return None
570
571 def _check_max_identifier_length(self, connection):
572 """Perform a connection / server version specific check to determine
573 the max_identifier_length.
574
        May return None if the dialect's class-level max_identifier_length
        should be used.
577
578 """
579 return None
580
581 def get_default_isolation_level(self, dbapi_conn):
582 """Given a DBAPI connection, return its isolation level, or
583 a default isolation level if one cannot be retrieved.
584
585 May be overridden by subclasses in order to provide a
586 "fallback" isolation level for databases that cannot reliably
587 retrieve the actual isolation level.
588
        By default, calls the :meth:`_engine.Dialect.get_isolation_level`
590 method, propagating any exceptions raised.
591
592 """
593 return self.get_isolation_level(dbapi_conn)
594
595 def type_descriptor(self, typeobj):
596 """Provide a database-specific :class:`.TypeEngine` object, given
597 the generic object which comes from the types module.
598
599 This method looks for a dictionary called
600 ``colspecs`` as a class or instance-level variable,
601 and passes on to :func:`_types.adapt_type`.
602
603 """
604 return type_api.adapt_type(typeobj, self.colspecs)
605
606 def has_index(self, connection, table_name, index_name, schema=None, **kw):
607 if not self.has_table(connection, table_name, schema=schema, **kw):
608 return False
609 for idx in self.get_indexes(
610 connection, table_name, schema=schema, **kw
611 ):
612 if idx["name"] == index_name:
613 return True
614 else:
615 return False
616
617 def has_schema(
618 self, connection: Connection, schema_name: str, **kw: Any
619 ) -> bool:
620 return schema_name in self.get_schema_names(connection, **kw)
621
622 def validate_identifier(self, ident: str) -> None:
623 if len(ident) > self.max_identifier_length:
624 raise exc.IdentifierError(
625 "Identifier '%s' exceeds maximum length of %d characters"
626 % (ident, self.max_identifier_length)
627 )
628
629 def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection:
630 # inherits the docstring from interfaces.Dialect.connect
631 return self.loaded_dbapi.connect(*cargs, **cparams) # type: ignore[no-any-return] # NOQA: E501
632
633 def create_connect_args(self, url: URL) -> ConnectArgsType:
634 # inherits the docstring from interfaces.Dialect.create_connect_args
635 opts = url.translate_connect_args()
636 opts.update(url.query)
637 return ([], opts)
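
    # sketch of the default translation (illustrative only): all URL
    # components plus query-string entries become keyword arguments for the
    # DBAPI's connect():
    #
    #     from sqlalchemy.engine import make_url
    #
    #     url = make_url("dialect+driver://user:pw@host:5432/db?sslmode=require")
    #     cargs, cparams = DefaultDialect().create_connect_args(url)
    #     # cargs   -> []
    #     # cparams -> {'host': 'host', 'database': 'db', 'username': 'user',
    #     #             'password': 'pw', 'port': 5432, 'sslmode': 'require'}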
638
639 def set_engine_execution_options(
640 self, engine: Engine, opts: Mapping[str, Any]
641 ) -> None:
642 supported_names = set(self.connection_characteristics).intersection(
643 opts
644 )
645 if supported_names:
646 characteristics: Mapping[str, Any] = util.immutabledict(
647 (name, opts[name]) for name in supported_names
648 )
649
650 @event.listens_for(engine, "engine_connect")
651 def set_connection_characteristics(connection):
652 self._set_connection_characteristics(
653 connection, characteristics
654 )
655
656 def set_connection_execution_options(
657 self, connection: Connection, opts: Mapping[str, Any]
658 ) -> None:
659 supported_names = set(self.connection_characteristics).intersection(
660 opts
661 )
662 if supported_names:
663 characteristics: Mapping[str, Any] = util.immutabledict(
664 (name, opts[name]) for name in supported_names
665 )
666 self._set_connection_characteristics(connection, characteristics)
667
668 def _set_connection_characteristics(self, connection, characteristics):
669 characteristic_values = [
670 (name, self.connection_characteristics[name], value)
671 for name, value in characteristics.items()
672 ]
673
674 if connection.in_transaction():
675 trans_objs = [
676 (name, obj)
677 for name, obj, _ in characteristic_values
678 if obj.transactional
679 ]
680 if trans_objs:
681 raise exc.InvalidRequestError(
682 "This connection has already initialized a SQLAlchemy "
683 "Transaction() object via begin() or autobegin; "
684 "%s may not be altered unless rollback() or commit() "
685 "is called first."
686 % (", ".join(name for name, obj in trans_objs))
687 )
688
689 dbapi_connection = connection.connection.dbapi_connection
690 for _, characteristic, value in characteristic_values:
691 characteristic.set_connection_characteristic(
692 self, connection, dbapi_connection, value
693 )
694 connection.connection._connection_record.finalize_callback.append(
695 functools.partial(self._reset_characteristics, characteristics)
696 )
697
698 def _reset_characteristics(self, characteristics, dbapi_connection):
699 for characteristic_name in characteristics:
700 characteristic = self.connection_characteristics[
701 characteristic_name
702 ]
703 characteristic.reset_characteristic(self, dbapi_connection)
704
705 def do_begin(self, dbapi_connection):
706 pass
707
708 def do_rollback(self, dbapi_connection):
709 dbapi_connection.rollback()
710
711 def do_commit(self, dbapi_connection):
712 dbapi_connection.commit()
713
714 def do_terminate(self, dbapi_connection):
715 self.do_close(dbapi_connection)
716
717 def do_close(self, dbapi_connection):
718 dbapi_connection.close()
719
720 @util.memoized_property
721 def _dialect_specific_select_one(self):
722 return str(expression.select(1).compile(dialect=self))
723
724 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
725 try:
726 return self.do_ping(dbapi_connection)
727 except self.loaded_dbapi.Error as err:
728 is_disconnect = self.is_disconnect(err, dbapi_connection, None)
729
730 if self._has_events:
731 try:
732 Connection._handle_dbapi_exception_noconnection(
733 err,
734 self,
735 is_disconnect=is_disconnect,
736 invalidate_pool_on_disconnect=False,
737 is_pre_ping=True,
738 )
739 except exc.StatementError as new_err:
740 is_disconnect = new_err.connection_invalidated
741
742 if is_disconnect:
743 return False
744 else:
745 raise
746
747 def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
748 cursor = dbapi_connection.cursor()
749 try:
750 cursor.execute(self._dialect_specific_select_one)
751 finally:
752 cursor.close()
753 return True
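
    # this is the round trip issued when pre-ping is enabled on the pool;
    # usage sketch, assuming an existing ``url`` (illustrative only):
    #
    #     engine = create_engine(url, pool_pre_ping=True)
    #     # each connection checkout now invokes do_ping() (via
    #     # _do_ping_w_event()) and transparently recycles connections
    #     # that raise disconnect errors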
754
755 def create_xid(self):
756 """Create a random two-phase transaction ID.
757
758 This id will be passed to do_begin_twophase(), do_rollback_twophase(),
759 do_commit_twophase(). Its format is unspecified.
760 """
761
762 return "_sa_%032x" % random.randint(0, 2**128)
763
764 def do_savepoint(self, connection, name):
765 connection.execute(expression.SavepointClause(name))
766
767 def do_rollback_to_savepoint(self, connection, name):
768 connection.execute(expression.RollbackToSavepointClause(name))
769
770 def do_release_savepoint(self, connection, name):
771 connection.execute(expression.ReleaseSavepointClause(name))
772
773 def _deliver_insertmanyvalues_batches(
774 self,
775 connection,
776 cursor,
777 statement,
778 parameters,
779 generic_setinputsizes,
780 context,
781 ):
782 context = cast(DefaultExecutionContext, context)
783 compiled = cast(SQLCompiler, context.compiled)
784
785 _composite_sentinel_proc: Sequence[
786 Optional[_ResultProcessorType[Any]]
787 ] = ()
788 _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
789 _sentinel_proc_initialized: bool = False
790
791 compiled_parameters = context.compiled_parameters
792
793 imv = compiled._insertmanyvalues
794 assert imv is not None
795
796 is_returning: Final[bool] = bool(compiled.effective_returning)
797 batch_size = context.execution_options.get(
798 "insertmanyvalues_page_size", self.insertmanyvalues_page_size
799 )
800
801 if compiled.schema_translate_map:
802 schema_translate_map = context.execution_options.get(
803 "schema_translate_map", {}
804 )
805 else:
806 schema_translate_map = None
807
808 if is_returning:
809 result: Optional[List[Any]] = []
810 context._insertmanyvalues_rows = result
811
812 sort_by_parameter_order = imv.sort_by_parameter_order
813
814 else:
815 sort_by_parameter_order = False
816 result = None
817
818 for imv_batch in compiled._deliver_insertmanyvalues_batches(
819 statement,
820 parameters,
821 compiled_parameters,
822 generic_setinputsizes,
823 batch_size,
824 sort_by_parameter_order,
825 schema_translate_map,
826 ):
827 yield imv_batch
828
829 if is_returning:
830
831 try:
832 rows = context.fetchall_for_returning(cursor)
833 except BaseException as be:
834 connection._handle_dbapi_exception(
835 be,
836 sql_util._long_statement(imv_batch.replaced_statement),
837 imv_batch.replaced_parameters,
838 None,
839 context,
840 is_sub_exec=True,
841 )
842
843 # I would have thought "is_returning: Final[bool]"
844 # would have assured this but pylance thinks not
845 assert result is not None
846
847 if imv.num_sentinel_columns and not imv_batch.is_downgraded:
848 composite_sentinel = imv.num_sentinel_columns > 1
849 if imv.implicit_sentinel:
850 # for implicit sentinel, which is currently single-col
851 # integer autoincrement, do a simple sort.
852 assert not composite_sentinel
853 result.extend(
854 sorted(rows, key=operator.itemgetter(-1))
855 )
856 continue
857
858 # otherwise, create dictionaries to match up batches
859 # with parameters
860 assert imv.sentinel_param_keys
861 assert imv.sentinel_columns
862
863 _nsc = imv.num_sentinel_columns
864
865 if not _sentinel_proc_initialized:
866 if composite_sentinel:
867 _composite_sentinel_proc = [
868 col.type._cached_result_processor(
869 self, cursor_desc[1]
870 )
871 for col, cursor_desc in zip(
872 imv.sentinel_columns,
873 cursor.description[-_nsc:],
874 )
875 ]
876 else:
877 _scalar_sentinel_proc = (
878 imv.sentinel_columns[0]
879 ).type._cached_result_processor(
880 self, cursor.description[-1][1]
881 )
882 _sentinel_proc_initialized = True
883
884 rows_by_sentinel: Union[
885 Dict[Tuple[Any, ...], Any],
886 Dict[Any, Any],
887 ]
888 if composite_sentinel:
889 rows_by_sentinel = {
890 tuple(
891 (proc(val) if proc else val)
892 for val, proc in zip(
893 row[-_nsc:], _composite_sentinel_proc
894 )
895 ): row
896 for row in rows
897 }
898 elif _scalar_sentinel_proc:
899 rows_by_sentinel = {
900 _scalar_sentinel_proc(row[-1]): row for row in rows
901 }
902 else:
903 rows_by_sentinel = {row[-1]: row for row in rows}
904
905 if len(rows_by_sentinel) != len(imv_batch.batch):
906 # see test_insert_exec.py::
907 # IMVSentinelTest::test_sentinel_incorrect_rowcount
908 # for coverage / demonstration
909 raise exc.InvalidRequestError(
910 f"Sentinel-keyed result set did not produce "
911 f"correct number of rows {len(imv_batch.batch)}; "
912 "produced "
913 f"{len(rows_by_sentinel)}. Please ensure the "
914 "sentinel column is fully unique and populated in "
915 "all cases."
916 )
917
918 try:
919 ordered_rows = [
920 rows_by_sentinel[sentinel_keys]
921 for sentinel_keys in imv_batch.sentinel_values
922 ]
923 except KeyError as ke:
924 # see test_insert_exec.py::
925 # IMVSentinelTest::test_sentinel_cant_match_keys
926 # for coverage / demonstration
927 raise exc.InvalidRequestError(
928 f"Can't match sentinel values in result set to "
929 f"parameter sets; key {ke.args[0]!r} was not "
930 "found. "
931 "There may be a mismatch between the datatype "
932 "passed to the DBAPI driver vs. that which it "
933 "returns in a result row. Ensure the given "
934 "Python value matches the expected result type "
935 "*exactly*, taking care to not rely upon implicit "
936 "conversions which may occur such as when using "
937 "strings in place of UUID or integer values, etc. "
938 ) from ke
939
940 result.extend(ordered_rows)
941
942 else:
943 result.extend(rows)
944
945 def do_executemany(self, cursor, statement, parameters, context=None):
946 cursor.executemany(statement, parameters)
947
948 def do_execute(self, cursor, statement, parameters, context=None):
949 cursor.execute(statement, parameters)
950
951 def do_execute_no_params(self, cursor, statement, context=None):
952 cursor.execute(statement)
953
954 def is_disconnect(
955 self,
956 e: DBAPIModule.Error,
957 connection: Union[
958 pool.PoolProxiedConnection, interfaces.DBAPIConnection, None
959 ],
960 cursor: Optional[interfaces.DBAPICursor],
961 ) -> bool:
962 return False
963
964 @util.memoized_instancemethod
965 def _gen_allowed_isolation_levels(self, dbapi_conn):
966 try:
967 raw_levels = list(self.get_isolation_level_values(dbapi_conn))
968 except NotImplementedError:
969 return None
970 else:
971 normalized_levels = [
972 level.replace("_", " ").upper() for level in raw_levels
973 ]
974 if raw_levels != normalized_levels:
975 raise ValueError(
976 f"Dialect {self.name!r} get_isolation_level_values() "
977 f"method should return names as UPPERCASE using spaces, "
978 f"not underscores; got "
979 f"{sorted(set(raw_levels).difference(normalized_levels))}"
980 )
981 return tuple(normalized_levels)
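
    # dialects advertise the levels they accept by implementing
    # get_isolation_level_values(); the names must already be UPPERCASE
    # with spaces, as enforced above.  Hypothetical override for
    # illustration only:
    #
    #     def get_isolation_level_values(self, dbapi_conn):
    #         return (
    #             "AUTOCOMMIT",
    #             "READ COMMITTED",
    #             "READ UNCOMMITTED",
    #             "REPEATABLE READ",
    #             "SERIALIZABLE",
    #         )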
982
983 def _assert_and_set_isolation_level(self, dbapi_conn, level):
984 level = level.replace("_", " ").upper()
985
986 _allowed_isolation_levels = self._gen_allowed_isolation_levels(
987 dbapi_conn
988 )
989 if (
990 _allowed_isolation_levels
991 and level not in _allowed_isolation_levels
992 ):
993 raise exc.ArgumentError(
994 f"Invalid value {level!r} for isolation_level. "
995 f"Valid isolation levels for {self.name!r} are "
996 f"{', '.join(_allowed_isolation_levels)}"
997 )
998
999 self.set_isolation_level(dbapi_conn, level)
1000
1001 def reset_isolation_level(self, dbapi_conn):
1002 if self._on_connect_isolation_level is not None:
1003 assert (
1004 self._on_connect_isolation_level == "AUTOCOMMIT"
1005 or self._on_connect_isolation_level
1006 == self.default_isolation_level
1007 )
1008 self._assert_and_set_isolation_level(
1009 dbapi_conn, self._on_connect_isolation_level
1010 )
1011 else:
1012 assert self.default_isolation_level is not None
1013 self._assert_and_set_isolation_level(
1014 dbapi_conn,
1015 self.default_isolation_level,
1016 )
1017
1018 def normalize_name(self, name):
1019 if name is None:
1020 return None
1021
1022 name_lower = name.lower()
1023 name_upper = name.upper()
1024
1025 if name_upper == name_lower:
1026 # name has no upper/lower conversion, e.g. non-european characters.
1027 # return unchanged
1028 return name
1029 elif name_upper == name and not (
1030 self.identifier_preparer._requires_quotes
1031 )(name_lower):
1032 # name is all uppercase and doesn't require quoting; normalize
1033 # to all lower case
1034 return name_lower
1035 elif name_lower == name:
1036 # name is all lower case, which if denormalized means we need to
1037 # force quoting on it
1038 return quoted_name(name, quote=True)
1039 else:
            # name is mixed case, which means it will be quoted in SQL when
            # used later; no normalization needed
1042 return name
1043
1044 def denormalize_name(self, name):
1045 if name is None:
1046 return None
1047
1048 name_lower = name.lower()
1049 name_upper = name.upper()
1050
1051 if name_upper == name_lower:
1052 # name has no upper/lower conversion, e.g. non-european characters.
1053 # return unchanged
1054 return name
1055 elif name_lower == name and not (
1056 self.identifier_preparer._requires_quotes
1057 )(name_lower):
1058 name = name_upper
1059 return name
1060
1061 def get_driver_connection(self, connection: DBAPIConnection) -> Any:
1062 return connection
1063
1064 def _overrides_default(self, method):
1065 return (
1066 getattr(type(self), method).__code__
1067 is not getattr(DefaultDialect, method).__code__
1068 )
1069
1070 def _default_multi_reflect(
1071 self,
1072 single_tbl_method,
1073 connection,
1074 kind,
1075 schema,
1076 filter_names,
1077 scope,
1078 **kw,
1079 ):
1080 names_fns = []
1081 temp_names_fns = []
1082 if ObjectKind.TABLE in kind:
1083 names_fns.append(self.get_table_names)
1084 temp_names_fns.append(self.get_temp_table_names)
1085 if ObjectKind.VIEW in kind:
1086 names_fns.append(self.get_view_names)
1087 temp_names_fns.append(self.get_temp_view_names)
1088 if ObjectKind.MATERIALIZED_VIEW in kind:
1089 names_fns.append(self.get_materialized_view_names)
1090 # no temp materialized view at the moment
1091 # temp_names_fns.append(self.get_temp_materialized_view_names)
1092
1093 unreflectable = kw.pop("unreflectable", {})
1094
1095 if (
1096 filter_names
1097 and scope is ObjectScope.ANY
1098 and kind is ObjectKind.ANY
1099 ):
1100 # if names are given and no qualification on type of table
1101 # (i.e. the Table(..., autoload) case), take the names as given,
            # don't run names queries. If a table does not exist,
            # NoSuchTableError is raised and it's skipped
1104
1105 # this also suits the case for mssql where we can reflect
1106 # individual temp tables but there's no temp_names_fn
1107 names = filter_names
1108 else:
1109 names = []
1110 name_kw = {"schema": schema, **kw}
1111 fns = []
1112 if ObjectScope.DEFAULT in scope:
1113 fns.extend(names_fns)
1114 if ObjectScope.TEMPORARY in scope:
1115 fns.extend(temp_names_fns)
1116
1117 for fn in fns:
1118 try:
1119 names.extend(fn(connection, **name_kw))
1120 except NotImplementedError:
1121 pass
1122
1123 if filter_names:
1124 filter_names = set(filter_names)
1125
1126 # iterate over all the tables/views and call the single table method
1127 for table in names:
1128 if not filter_names or table in filter_names:
1129 key = (schema, table)
1130 try:
1131 yield (
1132 key,
1133 single_tbl_method(
1134 connection, table, schema=schema, **kw
1135 ),
1136 )
1137 except exc.UnreflectableTableError as err:
1138 if key not in unreflectable:
1139 unreflectable[key] = err
1140 except exc.NoSuchTableError:
1141 pass
1142
1143 def get_multi_table_options(self, connection, **kw):
1144 return self._default_multi_reflect(
1145 self.get_table_options, connection, **kw
1146 )
1147
1148 def get_multi_columns(self, connection, **kw):
1149 return self._default_multi_reflect(self.get_columns, connection, **kw)
1150
1151 def get_multi_pk_constraint(self, connection, **kw):
1152 return self._default_multi_reflect(
1153 self.get_pk_constraint, connection, **kw
1154 )
1155
1156 def get_multi_foreign_keys(self, connection, **kw):
1157 return self._default_multi_reflect(
1158 self.get_foreign_keys, connection, **kw
1159 )
1160
1161 def get_multi_indexes(self, connection, **kw):
1162 return self._default_multi_reflect(self.get_indexes, connection, **kw)
1163
1164 def get_multi_unique_constraints(self, connection, **kw):
1165 return self._default_multi_reflect(
1166 self.get_unique_constraints, connection, **kw
1167 )
1168
1169 def get_multi_check_constraints(self, connection, **kw):
1170 return self._default_multi_reflect(
1171 self.get_check_constraints, connection, **kw
1172 )
1173
1174 def get_multi_table_comment(self, connection, **kw):
1175 return self._default_multi_reflect(
1176 self.get_table_comment, connection, **kw
1177 )
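
    # the get_multi_* hooks above are what the Inspector drives for bulk
    # reflection; dialects with efficient catalog queries override them,
    # otherwise the per-table fallback in _default_multi_reflect() is used.
    # Usage sketch, assuming an existing ``engine`` (illustrative only):
    #
    #     from sqlalchemy import inspect
    #
    #     insp = inspect(engine)
    #     columns = insp.get_multi_columns(schema=None)
    #     # -> {(schema, table_name): [<column info dicts>], ...}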
1178
1179
1180class StrCompileDialect(DefaultDialect):
1181 statement_compiler = compiler.StrSQLCompiler
1182 ddl_compiler = compiler.DDLCompiler
1183 type_compiler_cls = compiler.StrSQLTypeCompiler
1184 preparer = compiler.IdentifierPreparer
1185
1186 insert_returning = True
1187 update_returning = True
1188 delete_returning = True
1189
1190 supports_statement_cache = True
1191
1192 supports_identity_columns = True
1193
1194 supports_sequences = True
1195 sequences_optional = True
1196 preexecute_autoincrement_sequences = False
1197
1198 supports_native_boolean = True
1199
1200 supports_multivalues_insert = True
1201 supports_simple_order_by_label = True
1202
1203
1204class DefaultExecutionContext(ExecutionContext):
1205 isinsert = False
1206 isupdate = False
1207 isdelete = False
1208 is_crud = False
1209 is_text = False
1210 isddl = False
1211
1212 execute_style: ExecuteStyle = ExecuteStyle.EXECUTE
1213
1214 compiled: Optional[Compiled] = None
1215 result_column_struct: Optional[
1216 Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
1217 ] = None
1218 returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None
1219
1220 execution_options: _ExecuteOptions = util.EMPTY_DICT
1221
1222 cursor_fetch_strategy = _cursor._DEFAULT_FETCH
1223
1224 invoked_statement: Optional[Executable] = None
1225
1226 _is_implicit_returning = False
1227 _is_explicit_returning = False
1228 _is_supplemental_returning = False
1229 _is_server_side = False
1230
1231 _soft_closed = False
1232
1233 _rowcount: Optional[int] = None
1234
1235 # a hook for SQLite's translation of
1236 # result column names
1237 # NOTE: pyhive is using this hook, can't remove it :(
1238 _translate_colname: Optional[Callable[[str], str]] = None
1239
1240 _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
1241 """used by set_input_sizes().
1242
1243 This collection comes from ``ExpandedState.parameter_expansion``.
1244
1245 """
1246
1247 cache_hit = NO_CACHE_KEY
1248
1249 root_connection: Connection
1250 _dbapi_connection: PoolProxiedConnection
1251 dialect: Dialect
1252 unicode_statement: str
1253 cursor: DBAPICursor
1254 compiled_parameters: List[_MutableCoreSingleExecuteParams]
1255 parameters: _DBAPIMultiExecuteParams
1256 extracted_parameters: Optional[Sequence[BindParameter[Any]]]
1257
1258 _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)
1259
1260 _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
1261 _num_sentinel_cols: int = 0
1262
1263 @classmethod
1264 def _init_ddl(
1265 cls,
1266 dialect: Dialect,
1267 connection: Connection,
1268 dbapi_connection: PoolProxiedConnection,
1269 execution_options: _ExecuteOptions,
1270 compiled_ddl: DDLCompiler,
1271 ) -> ExecutionContext:
1272 """Initialize execution context for an ExecutableDDLElement
1273 construct."""
1274
1275 self = cls.__new__(cls)
1276 self.root_connection = connection
1277 self._dbapi_connection = dbapi_connection
1278 self.dialect = connection.dialect
1279
1280 self.compiled = compiled = compiled_ddl
1281 self.isddl = True
1282
1283 self.execution_options = execution_options
1284
1285 self.unicode_statement = str(compiled)
1286 if compiled.schema_translate_map:
1287 schema_translate_map = self.execution_options.get(
1288 "schema_translate_map", {}
1289 )
1290
1291 rst = compiled.preparer._render_schema_translates
1292 self.unicode_statement = rst(
1293 self.unicode_statement, schema_translate_map
1294 )
1295
1296 self.statement = self.unicode_statement
1297
1298 self.cursor = self.create_cursor()
1299 self.compiled_parameters = []
1300
1301 if dialect.positional:
1302 self.parameters = [dialect.execute_sequence_format()]
1303 else:
1304 self.parameters = [self._empty_dict_params]
1305
1306 return self
1307
1308 @classmethod
1309 def _init_compiled(
1310 cls,
1311 dialect: Dialect,
1312 connection: Connection,
1313 dbapi_connection: PoolProxiedConnection,
1314 execution_options: _ExecuteOptions,
1315 compiled: SQLCompiler,
1316 parameters: _CoreMultiExecuteParams,
1317 invoked_statement: Executable,
1318 extracted_parameters: Optional[Sequence[BindParameter[Any]]],
1319 cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
1320 ) -> ExecutionContext:
1321 """Initialize execution context for a Compiled construct."""
1322
1323 self = cls.__new__(cls)
1324 self.root_connection = connection
1325 self._dbapi_connection = dbapi_connection
1326 self.dialect = connection.dialect
1327 self.extracted_parameters = extracted_parameters
1328 self.invoked_statement = invoked_statement
1329 self.compiled = compiled
1330 self.cache_hit = cache_hit
1331
1332 self.execution_options = execution_options
1333
1334 self.result_column_struct = (
1335 compiled._result_columns,
1336 compiled._ordered_columns,
1337 compiled._textual_ordered_columns,
1338 compiled._ad_hoc_textual,
1339 compiled._loose_column_name_matching,
1340 )
1341
1342 self.isinsert = ii = compiled.isinsert
1343 self.isupdate = iu = compiled.isupdate
1344 self.isdelete = id_ = compiled.isdelete
1345 self.is_text = compiled.isplaintext
1346
1347 if ii or iu or id_:
1348 dml_statement = compiled.compile_state.statement # type: ignore
1349 if TYPE_CHECKING:
1350 assert isinstance(dml_statement, UpdateBase)
1351 self.is_crud = True
1352 self._is_explicit_returning = ier = bool(dml_statement._returning)
1353 self._is_implicit_returning = iir = bool(
1354 compiled.implicit_returning
1355 )
1356 if iir and dml_statement._supplemental_returning:
1357 self._is_supplemental_returning = True
1358
            # don't mix implicit and explicit returning
1360 assert not (iir and ier)
1361
1362 if (ier or iir) and compiled.for_executemany:
1363 if ii and not self.dialect.insert_executemany_returning:
1364 raise exc.InvalidRequestError(
1365 f"Dialect {self.dialect.dialect_description} with "
1366 f"current server capabilities does not support "
1367 "INSERT..RETURNING when executemany is used"
1368 )
1369 elif (
1370 ii
1371 and dml_statement._sort_by_parameter_order
1372 and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501
1373 ):
1374 raise exc.InvalidRequestError(
1375 f"Dialect {self.dialect.dialect_description} with "
1376 f"current server capabilities does not support "
1377 "INSERT..RETURNING with deterministic row ordering "
1378 "when executemany is used"
1379 )
1380 elif (
1381 ii
1382 and self.dialect.use_insertmanyvalues
1383 and not compiled._insertmanyvalues
1384 ):
1385 raise exc.InvalidRequestError(
1386 'Statement does not have "insertmanyvalues" '
1387 "enabled, can't use INSERT..RETURNING with "
1388 "executemany in this case."
1389 )
1390 elif iu and not self.dialect.update_executemany_returning:
1391 raise exc.InvalidRequestError(
1392 f"Dialect {self.dialect.dialect_description} with "
1393 f"current server capabilities does not support "
1394 "UPDATE..RETURNING when executemany is used"
1395 )
1396 elif id_ and not self.dialect.delete_executemany_returning:
1397 raise exc.InvalidRequestError(
1398 f"Dialect {self.dialect.dialect_description} with "
1399 f"current server capabilities does not support "
1400 "DELETE..RETURNING when executemany is used"
1401 )
1402
1403 if not parameters:
1404 self.compiled_parameters = [
1405 compiled.construct_params(
1406 extracted_parameters=extracted_parameters,
1407 escape_names=False,
1408 )
1409 ]
1410 else:
1411 self.compiled_parameters = [
1412 compiled.construct_params(
1413 m,
1414 escape_names=False,
1415 _group_number=grp,
1416 extracted_parameters=extracted_parameters,
1417 )
1418 for grp, m in enumerate(parameters)
1419 ]
1420
1421 if len(parameters) > 1:
1422 if self.isinsert and compiled._insertmanyvalues:
1423 self.execute_style = ExecuteStyle.INSERTMANYVALUES
1424
1425 imv = compiled._insertmanyvalues
1426 if imv.sentinel_columns is not None:
1427 self._num_sentinel_cols = imv.num_sentinel_columns
1428 else:
1429 self.execute_style = ExecuteStyle.EXECUTEMANY
1430
1431 self.unicode_statement = compiled.string
1432
1433 self.cursor = self.create_cursor()
1434
1435 if self.compiled.insert_prefetch or self.compiled.update_prefetch:
1436 self._process_execute_defaults()
1437
1438 processors = compiled._bind_processors
1439
1440 flattened_processors: Mapping[
1441 str, _BindProcessorType[Any]
1442 ] = processors # type: ignore[assignment]
1443
1444 if compiled.literal_execute_params or compiled.post_compile_params:
1445 if self.executemany:
1446 raise exc.InvalidRequestError(
1447 "'literal_execute' or 'expanding' parameters can't be "
1448 "used with executemany()"
1449 )
1450
1451 expanded_state = compiled._process_parameters_for_postcompile(
1452 self.compiled_parameters[0]
1453 )
1454
1455 # re-assign self.unicode_statement
1456 self.unicode_statement = expanded_state.statement
1457
1458 self._expanded_parameters = expanded_state.parameter_expansion
1459
1460 flattened_processors = dict(processors) # type: ignore
1461 flattened_processors.update(expanded_state.processors)
1462 positiontup = expanded_state.positiontup
1463 elif compiled.positional:
1464 positiontup = self.compiled.positiontup
1465 else:
1466 positiontup = None
1467
1468 if compiled.schema_translate_map:
1469 schema_translate_map = self.execution_options.get(
1470 "schema_translate_map", {}
1471 )
1472 rst = compiled.preparer._render_schema_translates
1473 self.unicode_statement = rst(
1474 self.unicode_statement, schema_translate_map
1475 )
1476
1477 # final self.unicode_statement is now assigned, encode if needed
1478 # by dialect
1479 self.statement = self.unicode_statement
1480
1481 # Convert the dictionary of bind parameter values
1482 # into a dict or list to be sent to the DBAPI's
1483 # execute() or executemany() method.
1484
1485 if compiled.positional:
1486 core_positional_parameters: MutableSequence[Sequence[Any]] = []
1487 assert positiontup is not None
1488 for compiled_params in self.compiled_parameters:
1489 l_param: List[Any] = [
1490 (
1491 flattened_processors[key](compiled_params[key])
1492 if key in flattened_processors
1493 else compiled_params[key]
1494 )
1495 for key in positiontup
1496 ]
1497 core_positional_parameters.append(
1498 dialect.execute_sequence_format(l_param)
1499 )
1500
1501 self.parameters = core_positional_parameters
1502 else:
1503 core_dict_parameters: MutableSequence[Dict[str, Any]] = []
1504 escaped_names = compiled.escaped_bind_names
1505
1506 # note that currently, "expanded" parameters will be present
1507 # in self.compiled_parameters in their quoted form. This is
1508 # slightly inconsistent with the approach taken as of
1509 # #8056 where self.compiled_parameters is meant to contain unquoted
1510 # param names.
1511 d_param: Dict[str, Any]
1512 for compiled_params in self.compiled_parameters:
1513 if escaped_names:
1514 d_param = {
1515 escaped_names.get(key, key): (
1516 flattened_processors[key](compiled_params[key])
1517 if key in flattened_processors
1518 else compiled_params[key]
1519 )
1520 for key in compiled_params
1521 }
1522 else:
1523 d_param = {
1524 key: (
1525 flattened_processors[key](compiled_params[key])
1526 if key in flattened_processors
1527 else compiled_params[key]
1528 )
1529 for key in compiled_params
1530 }
1531
1532 core_dict_parameters.append(d_param)
1533
1534 self.parameters = core_dict_parameters
1535
1536 return self
1537
1538 @classmethod
1539 def _init_statement(
1540 cls,
1541 dialect: Dialect,
1542 connection: Connection,
1543 dbapi_connection: PoolProxiedConnection,
1544 execution_options: _ExecuteOptions,
1545 statement: str,
1546 parameters: _DBAPIMultiExecuteParams,
1547 ) -> ExecutionContext:
1548 """Initialize execution context for a string SQL statement."""
1549
1550 self = cls.__new__(cls)
1551 self.root_connection = connection
1552 self._dbapi_connection = dbapi_connection
1553 self.dialect = connection.dialect
1554 self.is_text = True
1555
1556 self.execution_options = execution_options
1557
1558 if not parameters:
1559 if self.dialect.positional:
1560 self.parameters = [dialect.execute_sequence_format()]
1561 else:
1562 self.parameters = [self._empty_dict_params]
1563 elif isinstance(parameters[0], dialect.execute_sequence_format):
1564 self.parameters = parameters
1565 elif isinstance(parameters[0], dict):
1566 self.parameters = parameters
1567 else:
1568 self.parameters = [
1569 dialect.execute_sequence_format(p) for p in parameters
1570 ]
1571
1572 if len(parameters) > 1:
1573 self.execute_style = ExecuteStyle.EXECUTEMANY
1574
1575 self.statement = self.unicode_statement = statement
1576
1577 self.cursor = self.create_cursor()
1578 return self
1579
1580 @classmethod
1581 def _init_default(
1582 cls,
1583 dialect: Dialect,
1584 connection: Connection,
1585 dbapi_connection: PoolProxiedConnection,
1586 execution_options: _ExecuteOptions,
1587 ) -> ExecutionContext:
1588 """Initialize execution context for a ColumnDefault construct."""
1589
1590 self = cls.__new__(cls)
1591 self.root_connection = connection
1592 self._dbapi_connection = dbapi_connection
1593 self.dialect = connection.dialect
1594
1595 self.execution_options = execution_options
1596
1597 self.cursor = self.create_cursor()
1598 return self
1599
1600 def _get_cache_stats(self) -> str:
1601 if self.compiled is None:
1602 return "raw sql"
1603
1604 now = perf_counter()
1605
1606 ch = self.cache_hit
1607
1608 gen_time = self.compiled._gen_time
1609 assert gen_time is not None
1610
1611 if ch is NO_CACHE_KEY:
1612 return "no key %.5fs" % (now - gen_time,)
1613 elif ch is CACHE_HIT:
1614 return "cached since %.4gs ago" % (now - gen_time,)
1615 elif ch is CACHE_MISS:
1616 return "generated in %.5fs" % (now - gen_time,)
1617 elif ch is CACHING_DISABLED:
1618 if "_cache_disable_reason" in self.execution_options:
1619 return "caching disabled (%s) %.5fs " % (
1620 self.execution_options["_cache_disable_reason"],
1621 now - gen_time,
1622 )
1623 else:
1624 return "caching disabled %.5fs" % (now - gen_time,)
1625 elif ch is NO_DIALECT_SUPPORT:
1626 return "dialect %s+%s does not support caching %.5fs" % (
1627 self.dialect.name,
1628 self.dialect.driver,
1629 now - gen_time,
1630 )
1631 else:
1632 return "unknown"
1633
1634 @property
1635 def executemany(self): # type: ignore[override]
1636 return self.execute_style in (
1637 ExecuteStyle.EXECUTEMANY,
1638 ExecuteStyle.INSERTMANYVALUES,
1639 )
1640
1641 @util.memoized_property
1642 def identifier_preparer(self):
1643 if self.compiled:
1644 return self.compiled.preparer
1645 elif "schema_translate_map" in self.execution_options:
1646 return self.dialect.identifier_preparer._with_schema_translate(
1647 self.execution_options["schema_translate_map"]
1648 )
1649 else:
1650 return self.dialect.identifier_preparer
1651
1652 @util.memoized_property
1653 def engine(self):
1654 return self.root_connection.engine
1655
1656 @util.memoized_property
1657 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1658 if TYPE_CHECKING:
1659 assert isinstance(self.compiled, SQLCompiler)
1660 return self.compiled.postfetch
1661
1662 @util.memoized_property
1663 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1664 if TYPE_CHECKING:
1665 assert isinstance(self.compiled, SQLCompiler)
1666 if self.isinsert:
1667 return self.compiled.insert_prefetch
1668 elif self.isupdate:
1669 return self.compiled.update_prefetch
1670 else:
1671 return ()
1672
1673 @util.memoized_property
1674 def no_parameters(self):
1675 return self.execution_options.get("no_parameters", False)
1676
1677 def _execute_scalar(
1678 self,
1679 stmt: str,
1680 type_: Optional[TypeEngine[Any]],
1681 parameters: Optional[_DBAPISingleExecuteParams] = None,
1682 ) -> Any:
1683 """Execute a string statement on the current cursor, returning a
1684 scalar result.
1685
1686 Used to fire off sequences, default phrases, and "select lastrowid"
1687 types of statements individually or in the context of a parent INSERT
1688 or UPDATE statement.
1689
1690 """
1691
1692 conn = self.root_connection
1693
1694 if "schema_translate_map" in self.execution_options:
1695 schema_translate_map = self.execution_options.get(
1696 "schema_translate_map", {}
1697 )
1698
1699 rst = self.identifier_preparer._render_schema_translates
1700 stmt = rst(stmt, schema_translate_map)
1701
1702 if not parameters:
1703 if self.dialect.positional:
1704 parameters = self.dialect.execute_sequence_format()
1705 else:
1706 parameters = {}
1707
1708 conn._cursor_execute(self.cursor, stmt, parameters, context=self)
1709 row = self.cursor.fetchone()
1710 if row is not None:
1711 r = row[0]
1712 else:
1713 r = None
1714 if type_ is not None:
1715 # apply type post processors to the result
1716 proc = type_._cached_result_processor(
1717 self.dialect, self.cursor.description[0][1]
1718 )
1719 if proc:
1720 return proc(r)
1721 return r
1722
1723 @util.memoized_property
1724 def connection(self):
1725 return self.root_connection
1726
1727 def _use_server_side_cursor(self):
1728 if not self.dialect.supports_server_side_cursors:
1729 return False
1730
1731 if self.dialect.server_side_cursors:
1732 # this is deprecated
1733 use_server_side = self.execution_options.get(
1734 "stream_results", True
1735 ) and (
1736 self.compiled
1737 and isinstance(self.compiled.statement, expression.Selectable)
1738 or (
1739 (
1740 not self.compiled
1741 or isinstance(
1742 self.compiled.statement, expression.TextClause
1743 )
1744 )
1745 and self.unicode_statement
1746 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
1747 )
1748 )
1749 else:
1750 use_server_side = self.execution_options.get(
1751 "stream_results", False
1752 )
1753
1754 return use_server_side
1755
1756 def create_cursor(self) -> DBAPICursor:
1757 if (
1758 # inlining initial preference checks for SS cursors
1759 self.dialect.supports_server_side_cursors
1760 and (
1761 self.execution_options.get("stream_results", False)
1762 or (
1763 self.dialect.server_side_cursors
1764 and self._use_server_side_cursor()
1765 )
1766 )
1767 ):
1768 self._is_server_side = True
1769 return self.create_server_side_cursor()
1770 else:
1771 self._is_server_side = False
1772 return self.create_default_cursor()
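
    # ``stream_results`` (or ``yield_per``) is the supported way to request
    # the server-side path above; usage sketch assuming an existing ``conn``,
    # a table ``big_table``, and ``text`` imported from sqlalchemy
    # (illustrative only):
    #
    #     result = conn.execution_options(stream_results=True).execute(
    #         text("SELECT * FROM big_table")
    #     )
    #     for row in result.yield_per(1000):
    #         ...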
1773
1774 def fetchall_for_returning(self, cursor):
1775 return cursor.fetchall()
1776
1777 def create_default_cursor(self) -> DBAPICursor:
1778 return self._dbapi_connection.cursor()
1779
1780 def create_server_side_cursor(self) -> DBAPICursor:
1781 raise NotImplementedError()
1782
1783 def pre_exec(self):
1784 pass
1785
1786 def get_out_parameter_values(self, names):
1787 raise NotImplementedError(
1788 "This dialect does not support OUT parameters"
1789 )
1790
1791 def post_exec(self):
1792 pass
1793
1794 def get_result_processor(self, type_, colname, coltype):
1795 """Return a 'result processor' for a given type as present in
1796 cursor.description.
1797
1798 This has a default implementation that dialects can override
1799 for context-sensitive result type handling.
1800
1801 """
1802 return type_._cached_result_processor(self.dialect, coltype)
1803
1804 def get_lastrowid(self):
1805 """return self.cursor.lastrowid, or equivalent, after an INSERT.
1806
1807 This may involve calling special cursor functions, issuing a new SELECT
1808 on the cursor (or a new one), or returning a stored value that was
1809 calculated within post_exec().
1810
1811 This function will only be called for dialects which support "implicit"
1812 primary key generation, keep preexecute_autoincrement_sequences set to
1813 False, and when no explicit id value was bound to the statement.
1814
1815 The function is called once for an INSERT statement that would need to
1816 return the last inserted primary key for those dialects that make use
1817 of the lastrowid concept. In these cases, it is called directly after
1818 :meth:`.ExecutionContext.post_exec`.
1819
1820 """
1821 return self.cursor.lastrowid
1822
1823 def handle_dbapi_exception(self, e):
1824 pass
1825
1826 @util.non_memoized_property
1827 def rowcount(self) -> int:
1828 if self._rowcount is not None:
1829 return self._rowcount
1830 else:
1831 return self.cursor.rowcount
1832
1833 @property
1834 def _has_rowcount(self):
1835 return self._rowcount is not None
1836
1837 def supports_sane_rowcount(self):
1838 return self.dialect.supports_sane_rowcount
1839
1840 def supports_sane_multi_rowcount(self):
1841 return self.dialect.supports_sane_multi_rowcount
1842
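    # _setup_result_proxy() turns the executed cursor into a CursorResult:
    # it selects a fetch strategy (a buffered strategy for server side
    # cursors), applies the "yield_per" execution option, and hands CRUD /
    # textual statements off to _setup_dml_or_text_result() below.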
1843 def _setup_result_proxy(self):
1844 exec_opt = self.execution_options
1845
1846 if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
1847 self._rowcount = self.cursor.rowcount
1848
1849 yp: Optional[Union[int, bool]]
1850 if self.is_crud or self.is_text:
1851 result = self._setup_dml_or_text_result()
1852 yp = False
1853 else:
1854 yp = exec_opt.get("yield_per", None)
1855 sr = self._is_server_side or exec_opt.get("stream_results", False)
1856 strategy = self.cursor_fetch_strategy
1857 if sr and strategy is _cursor._DEFAULT_FETCH:
1858 strategy = _cursor.BufferedRowCursorFetchStrategy(
1859 self.cursor, self.execution_options
1860 )
1861 cursor_description: _DBAPICursorDescription = (
1862 strategy.alternate_cursor_description
1863 or self.cursor.description
1864 )
1865 if cursor_description is None:
1866 strategy = _cursor._NO_CURSOR_DQL
1867
1868 result = _cursor.CursorResult(self, strategy, cursor_description)
1869
1870 compiled = self.compiled
1871
1872 if (
1873 compiled
1874 and not self.isddl
1875 and cast(SQLCompiler, compiled).has_out_parameters
1876 ):
1877 self._setup_out_parameters(result)
1878
1879 self._soft_closed = result._soft_closed
1880
1881 if yp:
1882 result = result.yield_per(yp)
1883
1884 return result
1885
1886 def _setup_out_parameters(self, result):
1887 compiled = cast(SQLCompiler, self.compiled)
1888
1889 out_bindparams = [
1890 (param, name)
1891 for param, name in compiled.bind_names.items()
1892 if param.isoutparam
1893 ]
1894 out_parameters = {}
1895
1896 for bindparam, raw_value in zip(
1897 [param for param, name in out_bindparams],
1898 self.get_out_parameter_values(
1899 [name for param, name in out_bindparams]
1900 ),
1901 ):
1902 type_ = bindparam.type
1903 impl_type = type_.dialect_impl(self.dialect)
1904 dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
1905 result_processor = impl_type.result_processor(
1906 self.dialect, dbapi_type
1907 )
1908 if result_processor is not None:
1909 raw_value = result_processor(raw_value)
1910 out_parameters[bindparam.key] = raw_value
1911
1912 result.out_parameters = out_parameters
1913
1914 def _setup_dml_or_text_result(self):
1915 compiled = cast(SQLCompiler, self.compiled)
1916
1917 strategy: ResultFetchStrategy = self.cursor_fetch_strategy
1918
1919 if self.isinsert:
1920 if (
1921 self.execute_style is ExecuteStyle.INSERTMANYVALUES
1922 and compiled.effective_returning
1923 ):
1924 strategy = _cursor.FullyBufferedCursorFetchStrategy(
1925 self.cursor,
1926 initial_buffer=self._insertmanyvalues_rows,
1927 # maintain alt cursor description if set by the
1928 # dialect, e.g. mssql preserves it
1929 alternate_description=(
1930 strategy.alternate_cursor_description
1931 ),
1932 )
1933
1934 if compiled.postfetch_lastrowid:
1935 self.inserted_primary_key_rows = (
1936 self._setup_ins_pk_from_lastrowid()
1937 )
1938 # else if not self._is_implicit_returning,
1939 # the default inserted_primary_key_rows accessor will
1940 # return an "empty" primary key collection when accessed.
1941
1942 if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
1943 strategy = _cursor.BufferedRowCursorFetchStrategy(
1944 self.cursor, self.execution_options
1945 )
1946
1947 if strategy is _cursor._NO_CURSOR_DML:
1948 cursor_description = None
1949 else:
1950 cursor_description = (
1951 strategy.alternate_cursor_description
1952 or self.cursor.description
1953 )
1954
1955 if cursor_description is None:
1956 strategy = _cursor._NO_CURSOR_DML
1957 elif self._num_sentinel_cols:
1958 assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
            # strip out the sentinel columns from the cursor description;
            # similar logic is applied to the rows themselves in CursorResult
1961 cursor_description = cursor_description[
1962 0 : -self._num_sentinel_cols
1963 ]
1964
1965 result: _cursor.CursorResult[Any] = _cursor.CursorResult(
1966 self, strategy, cursor_description
1967 )
1968
1969 if self.isinsert:
1970 if self._is_implicit_returning:
1971 rows = result.all()
1972
1973 self.returned_default_rows = rows
1974
1975 self.inserted_primary_key_rows = (
1976 self._setup_ins_pk_from_implicit_returning(result, rows)
1977 )
1978
                # check that the cursor metadata is accurate; note that the
                # rows have already all been fetched by result.all() above.
1983 assert result._metadata.returns_rows
1984
1985 # Insert statement has both return_defaults() and
1986 # returning(). rewind the result on the list of rows
1987 # we just used.
1988 if self._is_supplemental_returning:
1989 result._rewind(rows)
1990 else:
1991 result._soft_close()
1992 elif not self._is_explicit_returning:
1993 result._soft_close()
1994
                # we assume here the result does not return any rows.
                # *usually* this is true; however, some dialects, such as
                # MSSQL with pyodbc, need to SELECT a post-fetch function,
                # so it is not necessarily the case.
1999 # assert not result.returns_rows
2000
2001 elif self._is_implicit_returning:
2002 rows = result.all()
2003
2004 if rows:
2005 self.returned_default_rows = rows
2006 self._rowcount = len(rows)
2007
2008 if self._is_supplemental_returning:
2009 result._rewind(rows)
2010 else:
2011 result._soft_close()
2012
2013 # test that it has a cursor metadata that is accurate.
2014 # the rows have all been fetched however.
2015 assert result._metadata.returns_rows
2016
2017 elif not result._metadata.returns_rows:
2018 # no results, get rowcount
2019 # (which requires open cursor on some drivers)
2020 if self._rowcount is None:
2021 self._rowcount = self.cursor.rowcount
2022 result._soft_close()
2023 elif self.isupdate or self.isdelete:
2024 if self._rowcount is None:
2025 self._rowcount = self.cursor.rowcount
2026 return result
2027
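    # inserted_primary_key_rows is produced by one of three strategies:
    # from cursor.lastrowid (_setup_ins_pk_from_lastrowid), from an implicit
    # RETURNING result (_setup_ins_pk_from_implicit_returning), or, when
    # neither applies, from the compiled parameters alone via the memoized
    # default below (_setup_ins_pk_from_empty).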
2028 @util.memoized_property
2029 def inserted_primary_key_rows(self):
2030 # if no specific "get primary key" strategy was set up
2031 # during execution, return a "default" primary key based
2032 # on what's in the compiled_parameters and nothing else.
2033 return self._setup_ins_pk_from_empty()
2034
2035 def _setup_ins_pk_from_lastrowid(self):
2036 getter = cast(
2037 SQLCompiler, self.compiled
2038 )._inserted_primary_key_from_lastrowid_getter
2039 lastrowid = self.get_lastrowid()
2040 return [getter(lastrowid, self.compiled_parameters[0])]
2041
2042 def _setup_ins_pk_from_empty(self):
2043 getter = cast(
2044 SQLCompiler, self.compiled
2045 )._inserted_primary_key_from_lastrowid_getter
2046 return [getter(None, param) for param in self.compiled_parameters]
2047
2048 def _setup_ins_pk_from_implicit_returning(self, result, rows):
2049 if not rows:
2050 return []
2051
2052 getter = cast(
2053 SQLCompiler, self.compiled
2054 )._inserted_primary_key_from_returning_getter
2055 compiled_params = self.compiled_parameters
2056
2057 return [
2058 getter(row, param) for row, param in zip(rows, compiled_params)
2059 ]
2060
2061 def lastrow_has_defaults(self):
2062 return (self.isinsert or self.isupdate) and bool(
2063 cast(SQLCompiler, self.compiled).postfetch
2064 )
2065
2066 def _prepare_set_input_sizes(
2067 self,
2068 ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
2069 """Given a cursor and ClauseParameters, prepare arguments
2070 in order to call the appropriate
2071 style of ``setinputsizes()`` on the cursor, using DB-API types
2072 from the bind parameter's ``TypeEngine`` objects.
2073
        This method is only called by those dialects which set the
        :attr:`.Dialect.bind_typing` attribute to
        :attr:`.BindTyping.SETINPUTSIZES`.  Python-oracledb and cx_Oracle are
        the only DBAPIs that require setinputsizes(); pyodbc offers it as an
        option.

        Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
        for pg8000 and asyncpg, which have since been changed to render
        casts inline instead.
2083
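        The return value is a list of ``(key, dbapi_type, sql_type)`` tuples;
        the dialect's ``do_set_input_sizes()`` hook then translates these
        into the DBAPI's particular form of ``setinputsizes()``.  As a hedged
        sketch only, not the verbatim code of any shipped dialect, a named
        paramstyle dialect might apply them as::

            def do_set_input_sizes(self, cursor, list_of_tuples, context):
                cursor.setinputsizes(
                    **{
                        key: dbtype
                        for key, dbtype, sqltype in list_of_tuples
                        if dbtype is not None
                    }
                )
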
2084 """
2085 if self.isddl or self.is_text:
2086 return None
2087
2088 compiled = cast(SQLCompiler, self.compiled)
2089
2090 inputsizes = compiled._get_set_input_sizes_lookup()
2091
2092 if inputsizes is None:
2093 return None
2094
2095 dialect = self.dialect
2096
2097 # all of the rest of this... cython?
2098
2099 if dialect._has_events:
2100 inputsizes = dict(inputsizes)
2101 dialect.dispatch.do_setinputsizes(
2102 inputsizes, self.cursor, self.statement, self.parameters, self
2103 )
2104
2105 if compiled.escaped_bind_names:
2106 escaped_bind_names = compiled.escaped_bind_names
2107 else:
2108 escaped_bind_names = None
2109
2110 if dialect.positional:
2111 items = [
2112 (key, compiled.binds[key])
2113 for key in compiled.positiontup or ()
2114 ]
2115 else:
2116 items = [
2117 (key, bindparam)
2118 for bindparam, key in compiled.bind_names.items()
2119 ]
2120
2121 generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
2122 for key, bindparam in items:
2123 if bindparam in compiled.literal_execute_params:
2124 continue
2125
2126 if key in self._expanded_parameters:
2127 if is_tuple_type(bindparam.type):
2128 num = len(bindparam.type.types)
2129 dbtypes = inputsizes[bindparam]
2130 generic_inputsizes.extend(
2131 (
2132 (
2133 escaped_bind_names.get(paramname, paramname)
2134 if escaped_bind_names is not None
2135 else paramname
2136 ),
2137 dbtypes[idx % num],
2138 bindparam.type.types[idx % num],
2139 )
2140 for idx, paramname in enumerate(
2141 self._expanded_parameters[key]
2142 )
2143 )
2144 else:
2145 dbtype = inputsizes.get(bindparam, None)
2146 generic_inputsizes.extend(
2147 (
2148 (
2149 escaped_bind_names.get(paramname, paramname)
2150 if escaped_bind_names is not None
2151 else paramname
2152 ),
2153 dbtype,
2154 bindparam.type,
2155 )
2156 for paramname in self._expanded_parameters[key]
2157 )
2158 else:
2159 dbtype = inputsizes.get(bindparam, None)
2160
2161 escaped_name = (
2162 escaped_bind_names.get(key, key)
2163 if escaped_bind_names is not None
2164 else key
2165 )
2166
2167 generic_inputsizes.append(
2168 (escaped_name, dbtype, bindparam.type)
2169 )
2170
2171 return generic_inputsizes
2172
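    # _exec_default() resolves the four kinds of column default objects.  As
    # a hedged illustration (column names are examples only), these Core
    # constructs correspond to the branches below::
    #
    #     Column("id", Integer, Sequence("id_seq"))             # is_sequence
    #     Column("created", DateTime, default=datetime.utcnow)  # is_callable
    #     Column("updated", DateTime, default=func.now())  # is_clause_element
    #     Column("flag", Boolean, default=True)            # plain scalar arg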
2173 def _exec_default(self, column, default, type_):
2174 if default.is_sequence:
2175 return self.fire_sequence(default, type_)
2176 elif default.is_callable:
2177 # this codepath is not normally used as it's inlined
2178 # into _process_execute_defaults
2179 self.current_column = column
2180 return default.arg(self)
2181 elif default.is_clause_element:
2182 return self._exec_default_clause_element(column, default, type_)
2183 else:
2184 # this codepath is not normally used as it's inlined
2185 # into _process_execute_defaults
2186 return default.arg
2187
2188 def _exec_default_clause_element(self, column, default, type_):
2189 # execute a default that's a complete clause element. Here, we have
2190 # to re-implement a miniature version of the compile->parameters->
2191 # cursor.execute() sequence, since we don't want to modify the state
2192 # of the connection / result in progress or create new connection/
2193 # result objects etc.
2194 # .. versionchanged:: 1.4
2195
2196 if not default._arg_is_typed:
2197 default_arg = expression.type_coerce(default.arg, type_)
2198 else:
2199 default_arg = default.arg
2200 compiled = expression.select(default_arg).compile(dialect=self.dialect)
2201 compiled_params = compiled.construct_params()
2202 processors = compiled._bind_processors
2203 if compiled.positional:
2204 parameters = self.dialect.execute_sequence_format(
2205 [
2206 (
2207 processors[key](compiled_params[key]) # type: ignore
2208 if key in processors
2209 else compiled_params[key]
2210 )
2211 for key in compiled.positiontup or ()
2212 ]
2213 )
2214 else:
2215 parameters = {
2216 key: (
2217 processors[key](compiled_params[key]) # type: ignore
2218 if key in processors
2219 else compiled_params[key]
2220 )
2221 for key in compiled_params
2222 }
2223 return self._execute_scalar(
2224 str(compiled), type_, parameters=parameters
2225 )
2226
2227 current_parameters: Optional[_CoreSingleExecuteParams] = None
2228 """A dictionary of parameters applied to the current row.
2229
2230 This attribute is only available in the context of a user-defined default
2231 generation function, e.g. as described at :ref:`context_default_functions`.
2232 It consists of a dictionary which includes entries for each column/value
2233 pair that is to be part of the INSERT or UPDATE statement. The keys of the
2234 dictionary will be the key value of each :class:`_schema.Column`,
2235 which is usually
2236 synonymous with the name.
2237
2238 Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
    does not accommodate the "multi-values" feature of the
2240 :meth:`_expression.Insert.values` method. The
2241 :meth:`.DefaultExecutionContext.get_current_parameters` method should be
2242 preferred.
2243
2244 .. seealso::
2245
2246 :meth:`.DefaultExecutionContext.get_current_parameters`
2247
2248 :ref:`context_default_functions`
2249
2250 """
2251
2252 def get_current_parameters(self, isolate_multiinsert_groups=True):
2253 """Return a dictionary of parameters applied to the current row.
2254
2255 This method can only be used in the context of a user-defined default
2256 generation function, e.g. as described at
2257 :ref:`context_default_functions`. When invoked, a dictionary is
2258 returned which includes entries for each column/value pair that is part
2259 of the INSERT or UPDATE statement. The keys of the dictionary will be
2260 the key value of each :class:`_schema.Column`,
2261 which is usually synonymous
2262 with the name.
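
        For example (the table and column names here are illustrative only),
        a context-sensitive default function might look like::

            def mydefault(ctx):
                return ctx.get_current_parameters()["counter"] + 12

            some_table = Table(
                "some_table",
                metadata,
                Column("counter", Integer),
                Column("counter_plus_twelve", Integer, default=mydefault),
            )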
2263
2264 :param isolate_multiinsert_groups=True: indicates that multi-valued
2265 INSERT constructs created using :meth:`_expression.Insert.values`
2266 should be
2267 handled by returning only the subset of parameters that are local
2268 to the current column default invocation. When ``False``, the
2269 raw parameters of the statement are returned including the
2270 naming convention used in the case of multi-valued INSERT.
2271
2272 .. seealso::
2273
2274 :attr:`.DefaultExecutionContext.current_parameters`
2275
2276 :ref:`context_default_functions`
2277
2278 """
2279 try:
2280 parameters = self.current_parameters
2281 column = self.current_column
2282 except AttributeError:
2283 raise exc.InvalidRequestError(
2284 "get_current_parameters() can only be invoked in the "
2285 "context of a Python side column default function"
2286 )
2287 else:
2288 assert column is not None
2289 assert parameters is not None
2290 compile_state = cast(
2291 "DMLState", cast(SQLCompiler, self.compiled).compile_state
2292 )
2293 assert compile_state is not None
2294 if (
2295 isolate_multiinsert_groups
2296 and dml.isinsert(compile_state)
2297 and compile_state._has_multi_parameters
2298 ):
2299 if column._is_multiparam_column:
2300 index = column.index + 1
2301 d = {column.original.key: parameters[column.key]}
2302 else:
2303 d = {column.key: parameters[column.key]}
2304 index = 0
2305 assert compile_state._dict_parameters is not None
2306 keys = compile_state._dict_parameters.keys()
2307 d.update(
2308 (key, parameters["%s_m%d" % (key, index)]) for key in keys
2309 )
2310 return d
2311 else:
2312 return parameters
2313
2314 def get_insert_default(self, column):
2315 if column.default is None:
2316 return None
2317 else:
2318 return self._exec_default(column, column.default, column.type)
2319
2320 def get_update_default(self, column):
2321 if column.onupdate is None:
2322 return None
2323 else:
2324 return self._exec_default(column, column.onupdate, column.type)
2325
2326 def _process_execute_defaults(self):
2327 compiled = cast(SQLCompiler, self.compiled)
2328
2329 key_getter = compiled._within_exec_param_key_getter
2330
2331 sentinel_counter = 0
2332
2333 if compiled.insert_prefetch:
2334 prefetch_recs = [
2335 (
2336 c,
2337 key_getter(c),
2338 c._default_description_tuple,
2339 self.get_insert_default,
2340 )
2341 for c in compiled.insert_prefetch
2342 ]
2343 elif compiled.update_prefetch:
2344 prefetch_recs = [
2345 (
2346 c,
2347 key_getter(c),
2348 c._onupdate_description_tuple,
2349 self.get_update_default,
2350 )
2351 for c in compiled.update_prefetch
2352 ]
2353 else:
2354 prefetch_recs = []
2355
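        # each prefetch rec is (column, parameter key, description tuple,
        # fallback getter); the description tuple (arg, is_scalar,
        # is_callable, is_sentinel) allows the common cases in the loop
        # below to skip a full _exec_default() call.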
2356 for param in self.compiled_parameters:
2357 self.current_parameters = param
2358
2359 for (
2360 c,
2361 param_key,
2362 (arg, is_scalar, is_callable, is_sentinel),
2363 fallback,
2364 ) in prefetch_recs:
2365 if is_sentinel:
2366 param[param_key] = sentinel_counter
2367 sentinel_counter += 1
2368 elif is_scalar:
2369 param[param_key] = arg
2370 elif is_callable:
2371 self.current_column = c
2372 param[param_key] = arg(self)
2373 else:
2374 val = fallback(c)
2375 if val is not None:
2376 param[param_key] = val
2377
2378 del self.current_parameters
2379
2380
2381DefaultDialect.execution_ctx_cls = DefaultExecutionContext