1# engine/default.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Default implementations of per-dialect sqlalchemy.engine classes.
10
11These are semi-private implementation classes which are only of importance
12to database dialect authors; dialects will usually use the classes here
13as the base class for their own corresponding classes.
14
15"""
16
17from __future__ import annotations
18
19import functools
20import operator
21import random
22import re
23from time import perf_counter
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Dict
29from typing import Final
30from typing import List
31from typing import Literal
32from typing import Mapping
33from typing import MutableMapping
34from typing import MutableSequence
35from typing import Optional
36from typing import Sequence
37from typing import Set
38from typing import Tuple
39from typing import Type
40from typing import TYPE_CHECKING
41from typing import Union
42import weakref
43
44from . import characteristics
45from . import cursor as _cursor
46from . import interfaces
47from .base import Connection
48from .interfaces import CacheStats
49from .interfaces import DBAPICursor
50from .interfaces import Dialect
51from .interfaces import ExecuteStyle
52from .interfaces import ExecutionContext
53from .reflection import ObjectKind
54from .reflection import ObjectScope
55from .. import event
56from .. import exc
57from .. import pool
58from .. import util
59from ..sql import compiler
60from ..sql import dml
61from ..sql import expression
62from ..sql import type_api
63from ..sql import util as sql_util
64from ..sql._typing import is_tuple_type
65from ..sql.base import _NoArg
66from ..sql.compiler import AggregateOrderByStyle
67from ..sql.compiler import DDLCompiler
68from ..sql.compiler import InsertmanyvaluesSentinelOpts
69from ..sql.compiler import SQLCompiler
70from ..sql.elements import quoted_name
71from ..util.typing import TupleAny
72from ..util.typing import Unpack
73
74if typing.TYPE_CHECKING:
75 from types import ModuleType
76
77 from .base import Engine
78 from .cursor import ResultFetchStrategy
79 from .interfaces import _CoreMultiExecuteParams
80 from .interfaces import _CoreSingleExecuteParams
81 from .interfaces import _DBAPICursorDescription
82 from .interfaces import _DBAPIMultiExecuteParams
83 from .interfaces import _DBAPISingleExecuteParams
84 from .interfaces import _ExecuteOptions
85 from .interfaces import _MutableCoreSingleExecuteParams
86 from .interfaces import _ParamStyle
87 from .interfaces import ConnectArgsType
88 from .interfaces import DBAPIConnection
89 from .interfaces import DBAPIModule
90 from .interfaces import DBAPIType
91 from .interfaces import IsolationLevel
92 from .row import Row
93 from .url import URL
94 from ..event import _ListenerFnType
95 from ..pool import Pool
96 from ..pool import PoolProxiedConnection
97 from ..sql import Executable
98 from ..sql.compiler import Compiled
99 from ..sql.compiler import Linting
100 from ..sql.compiler import ResultColumnsEntry
101 from ..sql.dml import DMLState
102 from ..sql.dml import UpdateBase
103 from ..sql.elements import BindParameter
104 from ..sql.schema import Column
105 from ..sql.type_api import _BindProcessorType
106 from ..sql.type_api import _ResultProcessorType
107 from ..sql.type_api import TypeEngine
108
109
110# When we're handed literal SQL, ensure it's a SELECT query
111SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)
112
113
114(
115 CACHE_HIT,
116 CACHE_MISS,
117 CACHING_DISABLED,
118 NO_CACHE_KEY,
119 NO_DIALECT_SUPPORT,
120) = list(CacheStats)
121
122
123class DefaultDialect(Dialect):
124 """Default implementation of Dialect"""
125
126 statement_compiler = compiler.SQLCompiler
127 ddl_compiler = compiler.DDLCompiler
128 type_compiler_cls = compiler.GenericTypeCompiler
129
130 preparer = compiler.IdentifierPreparer
131 supports_alter = True
132 supports_comments = False
133 supports_constraint_comments = False
134 inline_comments = False
135 supports_statement_cache = True
136
137 div_is_floordiv = True
138
139 bind_typing = interfaces.BindTyping.NONE
140
141 include_set_input_sizes: Optional[Set[Any]] = None
142 exclude_set_input_sizes: Optional[Set[Any]] = None
143
144 # the first value we'd get for an autoincrement column.
145 default_sequence_base = 1
146
147 # most DBAPIs happy with this for execute().
148 # not cx_oracle.
149 execute_sequence_format = tuple
150
151 supports_schemas = True
152 supports_views = True
153 supports_sequences = False
154 sequences_optional = False
155 preexecute_autoincrement_sequences = False
156 supports_identity_columns = False
157 postfetch_lastrowid = True
158 favor_returning_over_lastrowid = False
159 insert_null_pk_still_autoincrements = False
160 update_returning = False
161 delete_returning = False
162 update_returning_multifrom = False
163 delete_returning_multifrom = False
164 insert_returning = False
165
166 aggregate_order_by_style = AggregateOrderByStyle.INLINE
167
168 cte_follows_insert = False
169
170 supports_native_enum = False
171 supports_native_boolean = False
172 supports_native_uuid = False
173 returns_native_bytes = False
174
175 non_native_boolean_check_constraint = True
176
177 supports_simple_order_by_label = True
178
179 tuple_in_values = False
180
181 connection_characteristics = util.immutabledict(
182 {
183 "isolation_level": characteristics.IsolationLevelCharacteristic(),
184 "logging_token": characteristics.LoggingTokenCharacteristic(),
185 }
186 )
187
188 engine_config_types: Mapping[str, Any] = util.immutabledict(
189 {
190 "pool_timeout": util.asint,
191 "echo": util.bool_or_str("debug"),
192 "echo_pool": util.bool_or_str("debug"),
193 "pool_recycle": util.asint,
194 "pool_size": util.asint,
195 "max_overflow": util.asint,
196 "future": util.asbool,
197 }
198 )
199
200 # if the NUMERIC type
201 # returns decimal.Decimal.
202 # *not* the FLOAT type however.
203 supports_native_decimal = False
204
205 name = "default"
206
207 # length at which to truncate
208 # any identifier.
209 max_identifier_length = 9999
210 _user_defined_max_identifier_length: Optional[int] = None
211
212 isolation_level: Optional[str] = None
213
214 # sub-categories of max_identifier_length.
215 # currently these accommodate for MySQL which allows alias names
216 # of 255 but DDL names only of 64.
217 max_index_name_length: Optional[int] = None
218 max_constraint_name_length: Optional[int] = None
219
220 supports_sane_rowcount = True
221 supports_sane_multi_rowcount = True
222 colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
223 default_paramstyle = "named"
224
225 supports_default_values = False
226 """dialect supports INSERT... DEFAULT VALUES syntax"""
227
228 supports_default_metavalue = False
229 """dialect supports INSERT... VALUES (DEFAULT) syntax"""
230
231 default_metavalue_token = "DEFAULT"
232 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
233 parenthesis."""
234
235 # not sure if this is a real thing but the compiler will deliver it
236 # if this is the only flag enabled.
237 supports_empty_insert = True
238 """dialect supports INSERT () VALUES ()"""
239
240 supports_multivalues_insert = False
241
242 use_insertmanyvalues: bool = False
243
244 use_insertmanyvalues_wo_returning: bool = False
245
246 insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
247 InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
248 )
249
250 insertmanyvalues_page_size: int = 1000
251 insertmanyvalues_max_parameters = 32700
252
253 supports_is_distinct_from = True
254
255 supports_server_side_cursors = False
256
257 server_side_cursors = False
258
259 # extra record-level locking features (#4860)
260 supports_for_update_of = False
261
262 server_version_info = None
263
264 default_schema_name: Optional[str] = None
265
266 # indicates symbol names are
267 # UPPERCASED if they are case insensitive
268 # within the database.
269 # if this is True, the methods normalize_name()
270 # and denormalize_name() must be provided.
271 requires_name_normalize = False
272
273 is_async = False
274
275 has_terminate = False
276
277 # TODO: this is not to be part of 2.0. implement rudimentary binary
278 # literals for SQLite, PostgreSQL, MySQL only within
279 # _Binary.literal_processor
280 _legacy_binary_type_literal_encoding = "utf-8"
281
282 @util.deprecated_params(
283 empty_in_strategy=(
284 "1.4",
285 "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
286 "deprecated, and no longer has any effect. All IN expressions "
287 "are now rendered using "
288 'the "expanding parameter" strategy which renders a set of bound'
289 'expressions, or an "empty set" SELECT, at statement execution'
290 "time.",
291 ),
292 server_side_cursors=(
293 "1.4",
294 "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
295 "is deprecated and will be removed in a future release. Please "
296 "use the "
297 ":paramref:`_engine.Connection.execution_options.stream_results` "
298 "parameter.",
299 ),
300 )
301 def __init__(
302 self,
303 paramstyle: Optional[_ParamStyle] = None,
304 isolation_level: Optional[IsolationLevel] = None,
305 dbapi: Optional[ModuleType] = None,
306 implicit_returning: Literal[True] = True,
307 supports_native_boolean: Optional[bool] = None,
308 max_identifier_length: Optional[int] = None,
309 label_length: Optional[int] = None,
310 insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
311 use_insertmanyvalues: Optional[bool] = None,
312 # util.deprecated_params decorator cannot render the
313 # Linting.NO_LINTING constant
314 compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore
315 server_side_cursors: bool = False,
316 skip_autocommit_rollback: bool = False,
317 **kwargs: Any,
318 ):
319 if server_side_cursors:
320 if not self.supports_server_side_cursors:
321 raise exc.ArgumentError(
322 "Dialect %s does not support server side cursors" % self
323 )
324 else:
325 self.server_side_cursors = True
326
327 if getattr(self, "use_setinputsizes", False):
328 util.warn_deprecated(
329 "The dialect-level use_setinputsizes attribute is "
330 "deprecated. Please use "
331 "bind_typing = BindTyping.SETINPUTSIZES",
332 "2.0",
333 )
334 self.bind_typing = interfaces.BindTyping.SETINPUTSIZES
335
336 self.positional = False
337 self._ischema = None
338
339 self.dbapi = dbapi
340
341 self.skip_autocommit_rollback = skip_autocommit_rollback
342
343 if paramstyle is not None:
344 self.paramstyle = paramstyle
345 elif self.dbapi is not None:
346 self.paramstyle = self.dbapi.paramstyle
347 else:
348 self.paramstyle = self.default_paramstyle
349 self.positional = self.paramstyle in (
350 "qmark",
351 "format",
352 "numeric",
353 "numeric_dollar",
354 )
355 self.identifier_preparer = self.preparer(self)
356 self._on_connect_isolation_level = isolation_level
357
358 legacy_tt_callable = getattr(self, "type_compiler", None)
359 if legacy_tt_callable is not None:
360 tt_callable = cast(
361 Type[compiler.GenericTypeCompiler],
362 self.type_compiler,
363 )
364 else:
365 tt_callable = self.type_compiler_cls
366
367 self.type_compiler_instance = self.type_compiler = tt_callable(self)
368
369 if supports_native_boolean is not None:
370 self.supports_native_boolean = supports_native_boolean
371
372 self._user_defined_max_identifier_length = max_identifier_length
373 if self._user_defined_max_identifier_length:
374 self.max_identifier_length = (
375 self._user_defined_max_identifier_length
376 )
377 self.label_length = label_length
378 self.compiler_linting = compiler_linting
379
380 if use_insertmanyvalues is not None:
381 self.use_insertmanyvalues = use_insertmanyvalues
382
383 if insertmanyvalues_page_size is not _NoArg.NO_ARG:
384 self.insertmanyvalues_page_size = insertmanyvalues_page_size
385
386 @property
387 @util.deprecated(
388 "2.0",
389 "full_returning is deprecated, please use insert_returning, "
390 "update_returning, delete_returning",
391 )
392 def full_returning(self):
393 return (
394 self.insert_returning
395 and self.update_returning
396 and self.delete_returning
397 )
398
399 @util.memoized_property
400 def insert_executemany_returning(self):
401 """Default implementation for insert_executemany_returning, if not
402 otherwise overridden by the specific dialect.
403
404 The default dialect determines "insert_executemany_returning" is
405 available if the dialect in use has opted into using the
406 "use_insertmanyvalues" feature. If they haven't opted into that, then
407 this attribute is False, unless the dialect in question overrides this
408 and provides some other implementation (such as the Oracle Database
409 dialects).
410
411 """
412 return self.insert_returning and self.use_insertmanyvalues
413
414 @util.memoized_property
415 def insert_executemany_returning_sort_by_parameter_order(self):
416 """Default implementation for
417 insert_executemany_returning_deterministic_order, if not otherwise
418 overridden by the specific dialect.
419
420 The default dialect determines "insert_executemany_returning" can have
421 deterministic order only if the dialect in use has opted into using the
422 "use_insertmanyvalues" feature, which implements deterministic ordering
423 using client side sentinel columns only by default. The
424 "insertmanyvalues" feature also features alternate forms that can
425 use server-generated PK values as "sentinels", but those are only
426 used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel`
427 bitflag enables those alternate SQL forms, which are disabled
428 by default.
429
430 If the dialect in use hasn't opted into that, then this attribute is
431 False, unless the dialect in question overrides this and provides some
432 other implementation (such as the Oracle Database dialects).
433
434 """
435 return self.insert_returning and self.use_insertmanyvalues
436
437 update_executemany_returning = False
438 delete_executemany_returning = False
439
440 @util.memoized_property
441 def loaded_dbapi(self) -> DBAPIModule:
442 if self.dbapi is None:
443 raise exc.InvalidRequestError(
444 f"Dialect {self} does not have a Python DBAPI established "
445 "and cannot be used for actual database interaction"
446 )
447 return self.dbapi
448
449 @util.memoized_property
450 def _bind_typing_render_casts(self):
451 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS
452
453 def _ensure_has_table_connection(self, arg: Connection) -> None:
454 if not isinstance(arg, Connection):
455 raise exc.ArgumentError(
456 "The argument passed to Dialect.has_table() should be a "
457 "%s, got %s. "
458 "Additionally, the Dialect.has_table() method is for "
459 "internal dialect "
460 "use only; please use "
461 "``inspect(some_engine).has_table(<tablename>>)`` "
462 "for public API use." % (Connection, type(arg))
463 )
464
465 @util.memoized_property
466 def _supports_statement_cache(self):
467 ssc = self.__class__.__dict__.get("supports_statement_cache", None)
468 if ssc is None:
469 util.warn(
470 "Dialect %s:%s will not make use of SQL compilation caching "
471 "as it does not set the 'supports_statement_cache' attribute "
472 "to ``True``. This can have "
473 "significant performance implications including some "
474 "performance degradations in comparison to prior SQLAlchemy "
475 "versions. Dialect maintainers should seek to set this "
476 "attribute to True after appropriate development and testing "
477 "for SQLAlchemy 1.4 caching support. Alternatively, this "
478 "attribute may be set to False which will disable this "
479 "warning." % (self.name, self.driver),
480 code="cprf",
481 )
482
483 return bool(ssc)
484
485 @util.memoized_property
486 def _type_memos(self):
487 return weakref.WeakKeyDictionary()
488
489 @property
490 def dialect_description(self): # type: ignore[override]
491 return self.name + "+" + self.driver
492
493 @property
494 def supports_sane_rowcount_returning(self):
495 """True if this dialect supports sane rowcount even if RETURNING is
496 in use.
497
498 For dialects that don't support RETURNING, this is synonymous with
499 ``supports_sane_rowcount``.
500
501 """
502 return self.supports_sane_rowcount
503
504 @classmethod
505 def get_pool_class(cls, url: URL) -> Type[Pool]:
506 default: Type[pool.Pool]
507 if cls.is_async:
508 default = pool.AsyncAdaptedQueuePool
509 else:
510 default = pool.QueuePool
511
512 return getattr(cls, "poolclass", default)
513
514 def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
515 return self.get_pool_class(url)
516
517 @classmethod
518 def load_provisioning(cls):
519 package = ".".join(cls.__module__.split(".")[0:-1])
520 try:
521 __import__(package + ".provision")
522 except ImportError:
523 pass
524
525 def _builtin_onconnect(self) -> Optional[_ListenerFnType]:
526 if self._on_connect_isolation_level is not None:
527
528 def builtin_connect(dbapi_conn, conn_rec):
529 self._assert_and_set_isolation_level(
530 dbapi_conn, self._on_connect_isolation_level
531 )
532
533 return builtin_connect
534 else:
535 return None
536
537 def initialize(self, connection: Connection) -> None:
538 try:
539 self.server_version_info = self._get_server_version_info(
540 connection
541 )
542 except NotImplementedError:
543 self.server_version_info = None
544 try:
545 self.default_schema_name = self._get_default_schema_name(
546 connection
547 )
548 except NotImplementedError:
549 self.default_schema_name = None
550
551 try:
552 self.default_isolation_level = self.get_default_isolation_level(
553 connection.connection.dbapi_connection
554 )
555 except NotImplementedError:
556 self.default_isolation_level = None
557
558 if not self._user_defined_max_identifier_length:
559 max_ident_length = self._check_max_identifier_length(connection)
560 if max_ident_length:
561 self.max_identifier_length = max_ident_length
562
563 if (
564 self.label_length
565 and self.label_length > self.max_identifier_length
566 ):
567 raise exc.ArgumentError(
568 "Label length of %d is greater than this dialect's"
569 " maximum identifier length of %d"
570 % (self.label_length, self.max_identifier_length)
571 )
572
573 def on_connect(self) -> Optional[Callable[[Any], None]]:
574 # inherits the docstring from interfaces.Dialect.on_connect
575 return None
576
577 def _check_max_identifier_length(self, connection):
578 """Perform a connection / server version specific check to determine
579 the max_identifier_length.
580
581 If the dialect's class level max_identifier_length should be used,
582 can return None.
583
584 """
585 return None
586
587 def get_default_isolation_level(self, dbapi_conn):
588 """Given a DBAPI connection, return its isolation level, or
589 a default isolation level if one cannot be retrieved.
590
591 May be overridden by subclasses in order to provide a
592 "fallback" isolation level for databases that cannot reliably
593 retrieve the actual isolation level.
594
595 By default, calls the :meth:`_engine.Interfaces.get_isolation_level`
596 method, propagating any exceptions raised.
597
598 """
599 return self.get_isolation_level(dbapi_conn)
600
601 def type_descriptor(self, typeobj):
602 """Provide a database-specific :class:`.TypeEngine` object, given
603 the generic object which comes from the types module.
604
605 This method looks for a dictionary called
606 ``colspecs`` as a class or instance-level variable,
607 and passes on to :func:`_types.adapt_type`.
608
609 """
610 return type_api.adapt_type(typeobj, self.colspecs)
611
612 def has_index(self, connection, table_name, index_name, schema=None, **kw):
613 if not self.has_table(connection, table_name, schema=schema, **kw):
614 return False
615 for idx in self.get_indexes(
616 connection, table_name, schema=schema, **kw
617 ):
618 if idx["name"] == index_name:
619 return True
620 else:
621 return False
622
623 def has_schema(
624 self, connection: Connection, schema_name: str, **kw: Any
625 ) -> bool:
626 return schema_name in self.get_schema_names(connection, **kw)
627
628 def validate_identifier(self, ident: str) -> None:
629 if len(ident) > self.max_identifier_length:
630 raise exc.IdentifierError(
631 "Identifier '%s' exceeds maximum length of %d characters"
632 % (ident, self.max_identifier_length)
633 )
634
635 def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection:
636 # inherits the docstring from interfaces.Dialect.connect
637 return self.loaded_dbapi.connect(*cargs, **cparams) # type: ignore[no-any-return] # NOQA: E501
638
639 def create_connect_args(self, url: URL) -> ConnectArgsType:
640 # inherits the docstring from interfaces.Dialect.create_connect_args
641 opts = url.translate_connect_args()
642 opts.update(url.query)
643 return ([], opts)
644
645 def set_engine_execution_options(
646 self, engine: Engine, opts: Mapping[str, Any]
647 ) -> None:
648 supported_names = set(self.connection_characteristics).intersection(
649 opts
650 )
651 if supported_names:
652 characteristics: Mapping[str, Any] = util.immutabledict(
653 (name, opts[name]) for name in supported_names
654 )
655
656 @event.listens_for(engine, "engine_connect")
657 def set_connection_characteristics(connection):
658 self._set_connection_characteristics(
659 connection, characteristics
660 )
661
662 def set_connection_execution_options(
663 self, connection: Connection, opts: Mapping[str, Any]
664 ) -> None:
665 supported_names = set(self.connection_characteristics).intersection(
666 opts
667 )
668 if supported_names:
669 characteristics: Mapping[str, Any] = util.immutabledict(
670 (name, opts[name]) for name in supported_names
671 )
672 self._set_connection_characteristics(connection, characteristics)
673
674 def _set_connection_characteristics(self, connection, characteristics):
675 characteristic_values = [
676 (name, self.connection_characteristics[name], value)
677 for name, value in characteristics.items()
678 ]
679
680 if connection.in_transaction():
681 trans_objs = [
682 (name, obj)
683 for name, obj, _ in characteristic_values
684 if obj.transactional
685 ]
686 if trans_objs:
687 raise exc.InvalidRequestError(
688 "This connection has already initialized a SQLAlchemy "
689 "Transaction() object via begin() or autobegin; "
690 "%s may not be altered unless rollback() or commit() "
691 "is called first."
692 % (", ".join(name for name, obj in trans_objs))
693 )
694
695 dbapi_connection = connection.connection.dbapi_connection
696 for _, characteristic, value in characteristic_values:
697 characteristic.set_connection_characteristic(
698 self, connection, dbapi_connection, value
699 )
700 connection.connection._connection_record.finalize_callback.append(
701 functools.partial(self._reset_characteristics, characteristics)
702 )
703
704 def _reset_characteristics(self, characteristics, dbapi_connection):
705 for characteristic_name in characteristics:
706 characteristic = self.connection_characteristics[
707 characteristic_name
708 ]
709 characteristic.reset_characteristic(self, dbapi_connection)
710
711 def do_begin(self, dbapi_connection):
712 pass
713
714 def do_rollback(self, dbapi_connection):
715 if self.skip_autocommit_rollback and self.detect_autocommit_setting(
716 dbapi_connection
717 ):
718 return
719 dbapi_connection.rollback()
720
721 def do_commit(self, dbapi_connection):
722 dbapi_connection.commit()
723
724 def do_terminate(self, dbapi_connection):
725 self.do_close(dbapi_connection)
726
727 def do_close(self, dbapi_connection):
728 dbapi_connection.close()
729
730 @util.memoized_property
731 def _dialect_specific_select_one(self):
732 return str(expression.select(1).compile(dialect=self))
733
734 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
735 try:
736 return self.do_ping(dbapi_connection)
737 except self.loaded_dbapi.Error as err:
738 is_disconnect = self.is_disconnect(err, dbapi_connection, None)
739
740 if self._has_events:
741 try:
742 Connection._handle_dbapi_exception_noconnection(
743 err,
744 self,
745 is_disconnect=is_disconnect,
746 invalidate_pool_on_disconnect=False,
747 is_pre_ping=True,
748 )
749 except exc.StatementError as new_err:
750 is_disconnect = new_err.connection_invalidated
751
752 if is_disconnect:
753 return False
754 else:
755 raise
756
757 def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
758 cursor = dbapi_connection.cursor()
759 try:
760 cursor.execute(self._dialect_specific_select_one)
761 finally:
762 cursor.close()
763 return True
764
765 def create_xid(self):
766 """Create a random two-phase transaction ID.
767
768 This id will be passed to do_begin_twophase(), do_rollback_twophase(),
769 do_commit_twophase(). Its format is unspecified.
770 """
771
772 return "_sa_%032x" % random.randint(0, 2**128)
773
774 def do_savepoint(self, connection, name):
775 connection.execute(expression.SavepointClause(name))
776
777 def do_rollback_to_savepoint(self, connection, name):
778 connection.execute(expression.RollbackToSavepointClause(name))
779
780 def do_release_savepoint(self, connection, name):
781 connection.execute(expression.ReleaseSavepointClause(name))
782
783 def _deliver_insertmanyvalues_batches(
784 self,
785 connection,
786 cursor,
787 statement,
788 parameters,
789 generic_setinputsizes,
790 context,
791 ):
792 context = cast(DefaultExecutionContext, context)
793 compiled = cast(SQLCompiler, context.compiled)
794
795 _composite_sentinel_proc: Sequence[
796 Optional[_ResultProcessorType[Any]]
797 ] = ()
798 _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
799 _sentinel_proc_initialized: bool = False
800
801 compiled_parameters = context.compiled_parameters
802
803 imv = compiled._insertmanyvalues
804 assert imv is not None
805
806 is_returning: Final[bool] = bool(compiled.effective_returning)
807 batch_size = context.execution_options.get(
808 "insertmanyvalues_page_size", self.insertmanyvalues_page_size
809 )
810
811 if compiled.schema_translate_map:
812 schema_translate_map = context.execution_options.get(
813 "schema_translate_map", {}
814 )
815 else:
816 schema_translate_map = None
817
818 if is_returning:
819 result: Optional[List[Any]] = []
820 context._insertmanyvalues_rows = result
821
822 sort_by_parameter_order = imv.sort_by_parameter_order
823
824 else:
825 sort_by_parameter_order = False
826 result = None
827
828 for imv_batch in compiled._deliver_insertmanyvalues_batches(
829 statement,
830 parameters,
831 compiled_parameters,
832 generic_setinputsizes,
833 batch_size,
834 sort_by_parameter_order,
835 schema_translate_map,
836 ):
837 yield imv_batch
838
839 if is_returning:
840
841 try:
842 rows = context.fetchall_for_returning(cursor)
843 except BaseException as be:
844 connection._handle_dbapi_exception(
845 be,
846 sql_util._long_statement(imv_batch.replaced_statement),
847 imv_batch.replaced_parameters,
848 None,
849 context,
850 is_sub_exec=True,
851 )
852
853 # I would have thought "is_returning: Final[bool]"
854 # would have assured this but pylance thinks not
855 assert result is not None
856
857 if imv.num_sentinel_columns and not imv_batch.is_downgraded:
858 composite_sentinel = imv.num_sentinel_columns > 1
859 if imv.implicit_sentinel:
860 # for implicit sentinel, which is currently single-col
861 # integer autoincrement, do a simple sort.
862 assert not composite_sentinel
863 result.extend(
864 sorted(rows, key=operator.itemgetter(-1))
865 )
866 continue
867
868 # otherwise, create dictionaries to match up batches
869 # with parameters
870 assert imv.sentinel_param_keys
871 assert imv.sentinel_columns
872
873 _nsc = imv.num_sentinel_columns
874
875 if not _sentinel_proc_initialized:
876 if composite_sentinel:
877 _composite_sentinel_proc = [
878 col.type._cached_result_processor(
879 self, cursor_desc[1]
880 )
881 for col, cursor_desc in zip(
882 imv.sentinel_columns,
883 cursor.description[-_nsc:],
884 )
885 ]
886 else:
887 _scalar_sentinel_proc = (
888 imv.sentinel_columns[0]
889 ).type._cached_result_processor(
890 self, cursor.description[-1][1]
891 )
892 _sentinel_proc_initialized = True
893
894 rows_by_sentinel: Union[
895 Dict[Tuple[Any, ...], Any],
896 Dict[Any, Any],
897 ]
898 if composite_sentinel:
899 rows_by_sentinel = {
900 tuple(
901 (proc(val) if proc else val)
902 for val, proc in zip(
903 row[-_nsc:], _composite_sentinel_proc
904 )
905 ): row
906 for row in rows
907 }
908 elif _scalar_sentinel_proc:
909 rows_by_sentinel = {
910 _scalar_sentinel_proc(row[-1]): row for row in rows
911 }
912 else:
913 rows_by_sentinel = {row[-1]: row for row in rows}
914
915 if len(rows_by_sentinel) != len(imv_batch.batch):
916 # see test_insert_exec.py::
917 # IMVSentinelTest::test_sentinel_incorrect_rowcount
918 # for coverage / demonstration
919 raise exc.InvalidRequestError(
920 f"Sentinel-keyed result set did not produce "
921 f"correct number of rows {len(imv_batch.batch)}; "
922 "produced "
923 f"{len(rows_by_sentinel)}. Please ensure the "
924 "sentinel column is fully unique and populated in "
925 "all cases."
926 )
927
928 try:
929 ordered_rows = [
930 rows_by_sentinel[sentinel_keys]
931 for sentinel_keys in imv_batch.sentinel_values
932 ]
933 except KeyError as ke:
934 # see test_insert_exec.py::
935 # IMVSentinelTest::test_sentinel_cant_match_keys
936 # for coverage / demonstration
937 raise exc.InvalidRequestError(
938 f"Can't match sentinel values in result set to "
939 f"parameter sets; key {ke.args[0]!r} was not "
940 "found. "
941 "There may be a mismatch between the datatype "
942 "passed to the DBAPI driver vs. that which it "
943 "returns in a result row. Ensure the given "
944 "Python value matches the expected result type "
945 "*exactly*, taking care to not rely upon implicit "
946 "conversions which may occur such as when using "
947 "strings in place of UUID or integer values, etc. "
948 ) from ke
949
950 result.extend(ordered_rows)
951
952 else:
953 result.extend(rows)
954
955 def do_executemany(self, cursor, statement, parameters, context=None):
956 cursor.executemany(statement, parameters)
957
958 def do_execute(self, cursor, statement, parameters, context=None):
959 cursor.execute(statement, parameters)
960
961 def do_execute_no_params(self, cursor, statement, context=None):
962 cursor.execute(statement)
963
964 def is_disconnect(
965 self,
966 e: DBAPIModule.Error,
967 connection: Union[
968 pool.PoolProxiedConnection, interfaces.DBAPIConnection, None
969 ],
970 cursor: Optional[interfaces.DBAPICursor],
971 ) -> bool:
972 return False
973
974 @util.memoized_instancemethod
975 def _gen_allowed_isolation_levels(self, dbapi_conn):
976 try:
977 raw_levels = list(self.get_isolation_level_values(dbapi_conn))
978 except NotImplementedError:
979 return None
980 else:
981 normalized_levels = [
982 level.replace("_", " ").upper() for level in raw_levels
983 ]
984 if raw_levels != normalized_levels:
985 raise ValueError(
986 f"Dialect {self.name!r} get_isolation_level_values() "
987 f"method should return names as UPPERCASE using spaces, "
988 f"not underscores; got "
989 f"{sorted(set(raw_levels).difference(normalized_levels))}"
990 )
991 return tuple(normalized_levels)
992
993 def _assert_and_set_isolation_level(self, dbapi_conn, level):
994 level = level.replace("_", " ").upper()
995
996 _allowed_isolation_levels = self._gen_allowed_isolation_levels(
997 dbapi_conn
998 )
999 if (
1000 _allowed_isolation_levels
1001 and level not in _allowed_isolation_levels
1002 ):
1003 raise exc.ArgumentError(
1004 f"Invalid value {level!r} for isolation_level. "
1005 f"Valid isolation levels for {self.name!r} are "
1006 f"{', '.join(_allowed_isolation_levels)}"
1007 )
1008
1009 self.set_isolation_level(dbapi_conn, level)
1010
1011 def reset_isolation_level(self, dbapi_conn):
1012 if self._on_connect_isolation_level is not None:
1013 assert (
1014 self._on_connect_isolation_level == "AUTOCOMMIT"
1015 or self._on_connect_isolation_level
1016 == self.default_isolation_level
1017 )
1018 self._assert_and_set_isolation_level(
1019 dbapi_conn, self._on_connect_isolation_level
1020 )
1021 else:
1022 assert self.default_isolation_level is not None
1023 self._assert_and_set_isolation_level(
1024 dbapi_conn,
1025 self.default_isolation_level,
1026 )
1027
1028 def normalize_name(self, name):
1029 if name is None:
1030 return None
1031
1032 name_lower = name.lower()
1033 name_upper = name.upper()
1034
1035 if name_upper == name_lower:
1036 # name has no upper/lower conversion, e.g. non-european characters.
1037 # return unchanged
1038 return name
1039 elif name_upper == name and not (
1040 self.identifier_preparer._requires_quotes
1041 )(name_lower):
1042 # name is all uppercase and doesn't require quoting; normalize
1043 # to all lower case
1044 return name_lower
1045 elif name_lower == name:
1046 # name is all lower case, which if denormalized means we need to
1047 # force quoting on it
1048 return quoted_name(name, quote=True)
1049 else:
1050 # name is mixed case, means it will be quoted in SQL when used
1051 # later, no normalizes
1052 return name
1053
1054 def denormalize_name(self, name):
1055 if name is None:
1056 return None
1057
1058 name_lower = name.lower()
1059 name_upper = name.upper()
1060
1061 if name_upper == name_lower:
1062 # name has no upper/lower conversion, e.g. non-european characters.
1063 # return unchanged
1064 return name
1065 elif name_lower == name and not (
1066 self.identifier_preparer._requires_quotes
1067 )(name_lower):
1068 name = name_upper
1069 return name
1070
1071 def get_driver_connection(self, connection: DBAPIConnection) -> Any:
1072 return connection
1073
1074 def _overrides_default(self, method):
1075 return (
1076 getattr(type(self), method).__code__
1077 is not getattr(DefaultDialect, method).__code__
1078 )
1079
1080 def _default_multi_reflect(
1081 self,
1082 single_tbl_method,
1083 connection,
1084 kind,
1085 schema,
1086 filter_names,
1087 scope,
1088 **kw,
1089 ):
1090 names_fns = []
1091 temp_names_fns = []
1092 if ObjectKind.TABLE in kind:
1093 names_fns.append(self.get_table_names)
1094 temp_names_fns.append(self.get_temp_table_names)
1095 if ObjectKind.VIEW in kind:
1096 names_fns.append(self.get_view_names)
1097 temp_names_fns.append(self.get_temp_view_names)
1098 if ObjectKind.MATERIALIZED_VIEW in kind:
1099 names_fns.append(self.get_materialized_view_names)
1100 # no temp materialized view at the moment
1101 # temp_names_fns.append(self.get_temp_materialized_view_names)
1102
1103 unreflectable = kw.pop("unreflectable", {})
1104
1105 if (
1106 filter_names
1107 and scope is ObjectScope.ANY
1108 and kind is ObjectKind.ANY
1109 ):
1110 # if names are given and no qualification on type of table
1111 # (i.e. the Table(..., autoload) case), take the names as given,
1112 # don't run names queries. If a table does not exit
1113 # NoSuchTableError is raised and it's skipped
1114
1115 # this also suits the case for mssql where we can reflect
1116 # individual temp tables but there's no temp_names_fn
1117 names = filter_names
1118 else:
1119 names = []
1120 name_kw = {"schema": schema, **kw}
1121 fns = []
1122 if ObjectScope.DEFAULT in scope:
1123 fns.extend(names_fns)
1124 if ObjectScope.TEMPORARY in scope:
1125 fns.extend(temp_names_fns)
1126
1127 for fn in fns:
1128 try:
1129 names.extend(fn(connection, **name_kw))
1130 except NotImplementedError:
1131 pass
1132
1133 if filter_names:
1134 filter_names = set(filter_names)
1135
1136 # iterate over all the tables/views and call the single table method
1137 for table in names:
1138 if not filter_names or table in filter_names:
1139 key = (schema, table)
1140 try:
1141 yield (
1142 key,
1143 single_tbl_method(
1144 connection, table, schema=schema, **kw
1145 ),
1146 )
1147 except exc.UnreflectableTableError as err:
1148 if key not in unreflectable:
1149 unreflectable[key] = err
1150 except exc.NoSuchTableError:
1151 pass
1152
1153 def get_multi_table_options(self, connection, **kw):
1154 return self._default_multi_reflect(
1155 self.get_table_options, connection, **kw
1156 )
1157
1158 def get_multi_columns(self, connection, **kw):
1159 return self._default_multi_reflect(self.get_columns, connection, **kw)
1160
1161 def get_multi_pk_constraint(self, connection, **kw):
1162 return self._default_multi_reflect(
1163 self.get_pk_constraint, connection, **kw
1164 )
1165
1166 def get_multi_foreign_keys(self, connection, **kw):
1167 return self._default_multi_reflect(
1168 self.get_foreign_keys, connection, **kw
1169 )
1170
1171 def get_multi_indexes(self, connection, **kw):
1172 return self._default_multi_reflect(self.get_indexes, connection, **kw)
1173
1174 def get_multi_unique_constraints(self, connection, **kw):
1175 return self._default_multi_reflect(
1176 self.get_unique_constraints, connection, **kw
1177 )
1178
1179 def get_multi_check_constraints(self, connection, **kw):
1180 return self._default_multi_reflect(
1181 self.get_check_constraints, connection, **kw
1182 )
1183
1184 def get_multi_table_comment(self, connection, **kw):
1185 return self._default_multi_reflect(
1186 self.get_table_comment, connection, **kw
1187 )
1188
1189
1190class StrCompileDialect(DefaultDialect):
1191 statement_compiler = compiler.StrSQLCompiler
1192 ddl_compiler = compiler.DDLCompiler
1193 type_compiler_cls = compiler.StrSQLTypeCompiler
1194 preparer = compiler.IdentifierPreparer
1195
1196 insert_returning = True
1197 update_returning = True
1198 delete_returning = True
1199
1200 supports_statement_cache = True
1201
1202 supports_identity_columns = True
1203
1204 supports_sequences = True
1205 sequences_optional = True
1206 preexecute_autoincrement_sequences = False
1207
1208 supports_native_boolean = True
1209
1210 supports_multivalues_insert = True
1211 supports_simple_order_by_label = True
1212
1213
1214class DefaultExecutionContext(ExecutionContext):
1215 isinsert = False
1216 isupdate = False
1217 isdelete = False
1218 is_crud = False
1219 is_text = False
1220 isddl = False
1221
1222 execute_style: ExecuteStyle = ExecuteStyle.EXECUTE
1223
1224 compiled: Optional[Compiled] = None
1225 result_column_struct: Optional[
1226 Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
1227 ] = None
1228 returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None
1229
1230 execution_options: _ExecuteOptions = util.EMPTY_DICT
1231
1232 cursor_fetch_strategy = _cursor._DEFAULT_FETCH
1233
1234 invoked_statement: Optional[Executable] = None
1235
1236 _is_implicit_returning = False
1237 _is_explicit_returning = False
1238 _is_supplemental_returning = False
1239 _is_server_side = False
1240
1241 _soft_closed = False
1242
1243 _rowcount: Optional[int] = None
1244
1245 # a hook for SQLite's translation of
1246 # result column names
1247 # NOTE: pyhive is using this hook, can't remove it :(
1248 _translate_colname: Optional[
1249 Callable[[str], Tuple[str, Optional[str]]]
1250 ] = None
1251
1252 _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
1253 """used by set_input_sizes().
1254
1255 This collection comes from ``ExpandedState.parameter_expansion``.
1256
1257 """
1258
1259 cache_hit = NO_CACHE_KEY
1260
1261 root_connection: Connection
1262 _dbapi_connection: PoolProxiedConnection
1263 dialect: Dialect
1264 unicode_statement: str
1265 cursor: DBAPICursor
1266 compiled_parameters: List[_MutableCoreSingleExecuteParams]
1267 parameters: _DBAPIMultiExecuteParams
1268 extracted_parameters: Optional[Sequence[BindParameter[Any]]]
1269
1270 _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)
1271
1272 _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
1273 _num_sentinel_cols: int = 0
1274
1275 @classmethod
1276 def _init_ddl(
1277 cls,
1278 dialect: Dialect,
1279 connection: Connection,
1280 dbapi_connection: PoolProxiedConnection,
1281 execution_options: _ExecuteOptions,
1282 compiled_ddl: DDLCompiler,
1283 ) -> ExecutionContext:
1284 """Initialize execution context for an ExecutableDDLElement
1285 construct."""
1286
1287 self = cls.__new__(cls)
1288 self.root_connection = connection
1289 self._dbapi_connection = dbapi_connection
1290 self.dialect = connection.dialect
1291
1292 self.compiled = compiled = compiled_ddl
1293 self.isddl = True
1294
1295 self.execution_options = execution_options
1296
1297 self.unicode_statement = str(compiled)
1298 if compiled.schema_translate_map:
1299 schema_translate_map = self.execution_options.get(
1300 "schema_translate_map", {}
1301 )
1302
1303 rst = compiled.preparer._render_schema_translates
1304 self.unicode_statement = rst(
1305 self.unicode_statement, schema_translate_map
1306 )
1307
1308 self.statement = self.unicode_statement
1309
1310 self.cursor = self.create_cursor()
1311 self.compiled_parameters = []
1312
1313 if dialect.positional:
1314 self.parameters = [dialect.execute_sequence_format()]
1315 else:
1316 self.parameters = [self._empty_dict_params]
1317
1318 return self
1319
1320 @classmethod
1321 def _init_compiled(
1322 cls,
1323 dialect: Dialect,
1324 connection: Connection,
1325 dbapi_connection: PoolProxiedConnection,
1326 execution_options: _ExecuteOptions,
1327 compiled: SQLCompiler,
1328 parameters: _CoreMultiExecuteParams,
1329 invoked_statement: Executable,
1330 extracted_parameters: Optional[Sequence[BindParameter[Any]]],
1331 cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
1332 ) -> ExecutionContext:
1333 """Initialize execution context for a Compiled construct."""
1334
1335 self = cls.__new__(cls)
1336 self.root_connection = connection
1337 self._dbapi_connection = dbapi_connection
1338 self.dialect = connection.dialect
1339 self.extracted_parameters = extracted_parameters
1340 self.invoked_statement = invoked_statement
1341 self.compiled = compiled
1342 self.cache_hit = cache_hit
1343
1344 self.execution_options = execution_options
1345
1346 self.result_column_struct = (
1347 compiled._result_columns,
1348 compiled._ordered_columns,
1349 compiled._textual_ordered_columns,
1350 compiled._ad_hoc_textual,
1351 compiled._loose_column_name_matching,
1352 )
1353
1354 self.isinsert = ii = compiled.isinsert
1355 self.isupdate = iu = compiled.isupdate
1356 self.isdelete = id_ = compiled.isdelete
1357 self.is_text = compiled.isplaintext
1358
1359 if ii or iu or id_:
1360 dml_statement = compiled.compile_state.statement # type: ignore
1361 if TYPE_CHECKING:
1362 assert isinstance(dml_statement, UpdateBase)
1363 self.is_crud = True
1364 self._is_explicit_returning = ier = bool(dml_statement._returning)
1365 self._is_implicit_returning = iir = bool(
1366 compiled.implicit_returning
1367 )
1368 if iir and dml_statement._supplemental_returning:
1369 self._is_supplemental_returning = True
1370
1371 # dont mix implicit and explicit returning
1372 assert not (iir and ier)
1373
1374 if (ier or iir) and compiled.for_executemany:
1375 if ii and not self.dialect.insert_executemany_returning:
1376 raise exc.InvalidRequestError(
1377 f"Dialect {self.dialect.dialect_description} with "
1378 f"current server capabilities does not support "
1379 "INSERT..RETURNING when executemany is used"
1380 )
1381 elif (
1382 ii
1383 and dml_statement._sort_by_parameter_order
1384 and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501
1385 ):
1386 raise exc.InvalidRequestError(
1387 f"Dialect {self.dialect.dialect_description} with "
1388 f"current server capabilities does not support "
1389 "INSERT..RETURNING with deterministic row ordering "
1390 "when executemany is used"
1391 )
1392 elif (
1393 ii
1394 and self.dialect.use_insertmanyvalues
1395 and not compiled._insertmanyvalues
1396 ):
1397 raise exc.InvalidRequestError(
1398 'Statement does not have "insertmanyvalues" '
1399 "enabled, can't use INSERT..RETURNING with "
1400 "executemany in this case."
1401 )
1402 elif iu and not self.dialect.update_executemany_returning:
1403 raise exc.InvalidRequestError(
1404 f"Dialect {self.dialect.dialect_description} with "
1405 f"current server capabilities does not support "
1406 "UPDATE..RETURNING when executemany is used"
1407 )
1408 elif id_ and not self.dialect.delete_executemany_returning:
1409 raise exc.InvalidRequestError(
1410 f"Dialect {self.dialect.dialect_description} with "
1411 f"current server capabilities does not support "
1412 "DELETE..RETURNING when executemany is used"
1413 )
1414
1415 if not parameters:
1416 self.compiled_parameters = [
1417 compiled.construct_params(
1418 extracted_parameters=extracted_parameters,
1419 escape_names=False,
1420 )
1421 ]
1422 else:
1423 self.compiled_parameters = [
1424 compiled.construct_params(
1425 m,
1426 escape_names=False,
1427 _group_number=grp,
1428 extracted_parameters=extracted_parameters,
1429 )
1430 for grp, m in enumerate(parameters)
1431 ]
1432
1433 if len(parameters) > 1:
1434 if self.isinsert and compiled._insertmanyvalues:
1435 self.execute_style = ExecuteStyle.INSERTMANYVALUES
1436
1437 imv = compiled._insertmanyvalues
1438 if imv.sentinel_columns is not None:
1439 self._num_sentinel_cols = imv.num_sentinel_columns
1440 else:
1441 self.execute_style = ExecuteStyle.EXECUTEMANY
1442
1443 self.unicode_statement = compiled.string
1444
1445 self.cursor = self.create_cursor()
1446
1447 if self.compiled.insert_prefetch or self.compiled.update_prefetch:
1448 self._process_execute_defaults()
1449
1450 processors = compiled._bind_processors
1451
1452 flattened_processors: Mapping[
1453 str, _BindProcessorType[Any]
1454 ] = processors # type: ignore[assignment]
1455
1456 if compiled.literal_execute_params or compiled.post_compile_params:
1457 if self.executemany:
1458 raise exc.InvalidRequestError(
1459 "'literal_execute' or 'expanding' parameters can't be "
1460 "used with executemany()"
1461 )
1462
1463 expanded_state = compiled._process_parameters_for_postcompile(
1464 self.compiled_parameters[0]
1465 )
1466
1467 # re-assign self.unicode_statement
1468 self.unicode_statement = expanded_state.statement
1469
1470 self._expanded_parameters = expanded_state.parameter_expansion
1471
1472 flattened_processors = dict(processors) # type: ignore
1473 flattened_processors.update(expanded_state.processors)
1474 positiontup = expanded_state.positiontup
1475 elif compiled.positional:
1476 positiontup = self.compiled.positiontup
1477 else:
1478 positiontup = None
1479
1480 if compiled.schema_translate_map:
1481 schema_translate_map = self.execution_options.get(
1482 "schema_translate_map", {}
1483 )
1484 rst = compiled.preparer._render_schema_translates
1485 self.unicode_statement = rst(
1486 self.unicode_statement, schema_translate_map
1487 )
1488
1489 # final self.unicode_statement is now assigned, encode if needed
1490 # by dialect
1491 self.statement = self.unicode_statement
1492
1493 # Convert the dictionary of bind parameter values
1494 # into a dict or list to be sent to the DBAPI's
1495 # execute() or executemany() method.
1496
1497 if compiled.positional:
1498 core_positional_parameters: MutableSequence[Sequence[Any]] = []
1499 assert positiontup is not None
1500 for compiled_params in self.compiled_parameters:
1501 l_param: List[Any] = [
1502 (
1503 flattened_processors[key](compiled_params[key])
1504 if key in flattened_processors
1505 else compiled_params[key]
1506 )
1507 for key in positiontup
1508 ]
1509 core_positional_parameters.append(
1510 dialect.execute_sequence_format(l_param)
1511 )
1512
1513 self.parameters = core_positional_parameters
1514 else:
1515 core_dict_parameters: MutableSequence[Dict[str, Any]] = []
1516 escaped_names = compiled.escaped_bind_names
1517
1518 # note that currently, "expanded" parameters will be present
1519 # in self.compiled_parameters in their quoted form. This is
1520 # slightly inconsistent with the approach taken as of
1521 # #8056 where self.compiled_parameters is meant to contain unquoted
1522 # param names.
1523 d_param: Dict[str, Any]
1524 for compiled_params in self.compiled_parameters:
1525 if escaped_names:
1526 d_param = {
1527 escaped_names.get(key, key): (
1528 flattened_processors[key](compiled_params[key])
1529 if key in flattened_processors
1530 else compiled_params[key]
1531 )
1532 for key in compiled_params
1533 }
1534 else:
1535 d_param = {
1536 key: (
1537 flattened_processors[key](compiled_params[key])
1538 if key in flattened_processors
1539 else compiled_params[key]
1540 )
1541 for key in compiled_params
1542 }
1543
1544 core_dict_parameters.append(d_param)
1545
1546 self.parameters = core_dict_parameters
1547
1548 return self
1549
1550 @classmethod
1551 def _init_statement(
1552 cls,
1553 dialect: Dialect,
1554 connection: Connection,
1555 dbapi_connection: PoolProxiedConnection,
1556 execution_options: _ExecuteOptions,
1557 statement: str,
1558 parameters: _DBAPIMultiExecuteParams,
1559 ) -> ExecutionContext:
1560 """Initialize execution context for a string SQL statement."""
1561
1562 self = cls.__new__(cls)
1563 self.root_connection = connection
1564 self._dbapi_connection = dbapi_connection
1565 self.dialect = connection.dialect
1566 self.is_text = True
1567
1568 self.execution_options = execution_options
1569
1570 if not parameters:
1571 if self.dialect.positional:
1572 self.parameters = [dialect.execute_sequence_format()]
1573 else:
1574 self.parameters = [self._empty_dict_params]
1575 elif isinstance(parameters[0], dialect.execute_sequence_format):
1576 self.parameters = parameters
1577 elif isinstance(parameters[0], dict):
1578 self.parameters = parameters
1579 else:
1580 self.parameters = [
1581 dialect.execute_sequence_format(p) for p in parameters
1582 ]
1583
1584 if len(parameters) > 1:
1585 self.execute_style = ExecuteStyle.EXECUTEMANY
1586
1587 self.statement = self.unicode_statement = statement
1588
1589 self.cursor = self.create_cursor()
1590 return self
1591
1592 @classmethod
1593 def _init_default(
1594 cls,
1595 dialect: Dialect,
1596 connection: Connection,
1597 dbapi_connection: PoolProxiedConnection,
1598 execution_options: _ExecuteOptions,
1599 ) -> ExecutionContext:
1600 """Initialize execution context for a ColumnDefault construct."""
1601
1602 self = cls.__new__(cls)
1603 self.root_connection = connection
1604 self._dbapi_connection = dbapi_connection
1605 self.dialect = connection.dialect
1606
1607 self.execution_options = execution_options
1608
1609 self.cursor = self.create_cursor()
1610 return self
1611
1612 def _get_cache_stats(self) -> str:
1613 if self.compiled is None:
1614 return "raw sql"
1615
1616 now = perf_counter()
1617
1618 ch = self.cache_hit
1619
1620 gen_time = self.compiled._gen_time
1621 assert gen_time is not None
1622
1623 if ch is NO_CACHE_KEY:
1624 return "no key %.5fs" % (now - gen_time,)
1625 elif ch is CACHE_HIT:
1626 return "cached since %.4gs ago" % (now - gen_time,)
1627 elif ch is CACHE_MISS:
1628 return "generated in %.5fs" % (now - gen_time,)
1629 elif ch is CACHING_DISABLED:
1630 if "_cache_disable_reason" in self.execution_options:
1631 return "caching disabled (%s) %.5fs " % (
1632 self.execution_options["_cache_disable_reason"],
1633 now - gen_time,
1634 )
1635 else:
1636 return "caching disabled %.5fs" % (now - gen_time,)
1637 elif ch is NO_DIALECT_SUPPORT:
1638 return "dialect %s+%s does not support caching %.5fs" % (
1639 self.dialect.name,
1640 self.dialect.driver,
1641 now - gen_time,
1642 )
1643 else:
1644 return "unknown"
1645
1646 @property
1647 def executemany(self): # type: ignore[override]
1648 return self.execute_style in (
1649 ExecuteStyle.EXECUTEMANY,
1650 ExecuteStyle.INSERTMANYVALUES,
1651 )
1652
1653 @util.memoized_property
1654 def identifier_preparer(self):
1655 if self.compiled:
1656 return self.compiled.preparer
1657 elif "schema_translate_map" in self.execution_options:
1658 return self.dialect.identifier_preparer._with_schema_translate(
1659 self.execution_options["schema_translate_map"]
1660 )
1661 else:
1662 return self.dialect.identifier_preparer
1663
1664 @util.memoized_property
1665 def engine(self):
1666 return self.root_connection.engine
1667
1668 @util.memoized_property
1669 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1670 if TYPE_CHECKING:
1671 assert isinstance(self.compiled, SQLCompiler)
1672 return self.compiled.postfetch
1673
1674 @util.memoized_property
1675 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1676 if TYPE_CHECKING:
1677 assert isinstance(self.compiled, SQLCompiler)
1678 if self.isinsert:
1679 return self.compiled.insert_prefetch
1680 elif self.isupdate:
1681 return self.compiled.update_prefetch
1682 else:
1683 return ()
1684
1685 @util.memoized_property
1686 def no_parameters(self):
1687 return self.execution_options.get("no_parameters", False)
1688
1689 def _execute_scalar(
1690 self,
1691 stmt: str,
1692 type_: Optional[TypeEngine[Any]],
1693 parameters: Optional[_DBAPISingleExecuteParams] = None,
1694 ) -> Any:
1695 """Execute a string statement on the current cursor, returning a
1696 scalar result.
1697
1698 Used to fire off sequences, default phrases, and "select lastrowid"
1699 types of statements individually or in the context of a parent INSERT
1700 or UPDATE statement.
1701
1702 """
1703
1704 conn = self.root_connection
1705
1706 if "schema_translate_map" in self.execution_options:
1707 schema_translate_map = self.execution_options.get(
1708 "schema_translate_map", {}
1709 )
1710
1711 rst = self.identifier_preparer._render_schema_translates
1712 stmt = rst(stmt, schema_translate_map)
1713
1714 if not parameters:
1715 if self.dialect.positional:
1716 parameters = self.dialect.execute_sequence_format()
1717 else:
1718 parameters = {}
1719
1720 conn._cursor_execute(self.cursor, stmt, parameters, context=self)
1721 row = self.cursor.fetchone()
1722 if row is not None:
1723 r = row[0]
1724 else:
1725 r = None
1726 if type_ is not None:
1727 # apply type post processors to the result
1728 proc = type_._cached_result_processor(
1729 self.dialect, self.cursor.description[0][1]
1730 )
1731 if proc:
1732 return proc(r)
1733 return r
1734
1735 @util.memoized_property
1736 def connection(self):
1737 return self.root_connection
1738
1739 def _use_server_side_cursor(self):
1740 if not self.dialect.supports_server_side_cursors:
1741 return False
1742
1743 if self.dialect.server_side_cursors:
1744 # this is deprecated
1745 use_server_side = self.execution_options.get(
1746 "stream_results", True
1747 ) and (
1748 self.compiled
1749 and isinstance(self.compiled.statement, expression.Selectable)
1750 or (
1751 (
1752 not self.compiled
1753 or isinstance(
1754 self.compiled.statement, expression.TextClause
1755 )
1756 )
1757 and self.unicode_statement
1758 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
1759 )
1760 )
1761 else:
1762 use_server_side = self.execution_options.get(
1763 "stream_results", False
1764 )
1765
1766 return use_server_side
1767
1768 def create_cursor(self) -> DBAPICursor:
1769 if (
1770 # inlining initial preference checks for SS cursors
1771 self.dialect.supports_server_side_cursors
1772 and (
1773 self.execution_options.get("stream_results", False)
1774 or (
1775 self.dialect.server_side_cursors
1776 and self._use_server_side_cursor()
1777 )
1778 )
1779 ):
1780 self._is_server_side = True
1781 return self.create_server_side_cursor()
1782 else:
1783 self._is_server_side = False
1784 return self.create_default_cursor()
1785
1786 def fetchall_for_returning(self, cursor):
1787 return cursor.fetchall()
1788
1789 def create_default_cursor(self) -> DBAPICursor:
1790 return self._dbapi_connection.cursor()
1791
1792 def create_server_side_cursor(self) -> DBAPICursor:
1793 raise NotImplementedError()
1794
1795 def pre_exec(self):
1796 pass
1797
1798 def get_out_parameter_values(self, names):
1799 raise NotImplementedError(
1800 "This dialect does not support OUT parameters"
1801 )
1802
1803 def post_exec(self):
1804 pass
1805
1806 def get_result_processor(
1807 self, type_: TypeEngine[Any], colname: str, coltype: DBAPIType
1808 ) -> Optional[_ResultProcessorType[Any]]:
1809 """Return a 'result processor' for a given type as present in
1810 cursor.description.
1811
1812 This has a default implementation that dialects can override
1813 for context-sensitive result type handling.
1814
1815 """
1816 return type_._cached_result_processor(self.dialect, coltype)
1817
1818 def get_lastrowid(self) -> int:
1819 """return self.cursor.lastrowid, or equivalent, after an INSERT.
1820
1821 This may involve calling special cursor functions, issuing a new SELECT
1822 on the cursor (or a new one), or returning a stored value that was
1823 calculated within post_exec().
1824
1825 This function will only be called for dialects which support "implicit"
1826 primary key generation, keep preexecute_autoincrement_sequences set to
1827 False, and when no explicit id value was bound to the statement.
1828
1829 The function is called once for an INSERT statement that would need to
1830 return the last inserted primary key for those dialects that make use
1831 of the lastrowid concept. In these cases, it is called directly after
1832 :meth:`.ExecutionContext.post_exec`.
1833
1834 """
1835 return self.cursor.lastrowid
1836
1837 def handle_dbapi_exception(self, e):
1838 pass
1839
1840 @util.non_memoized_property
1841 def rowcount(self) -> int:
1842 if self._rowcount is not None:
1843 return self._rowcount
1844 else:
1845 return self.cursor.rowcount
1846
1847 @property
1848 def _has_rowcount(self):
1849 return self._rowcount is not None
1850
1851 def supports_sane_rowcount(self):
1852 return self.dialect.supports_sane_rowcount
1853
1854 def supports_sane_multi_rowcount(self):
1855 return self.dialect.supports_sane_multi_rowcount
1856
1857 def _setup_result_proxy(self):
1858 exec_opt = self.execution_options
1859
1860 if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
1861 self._rowcount = self.cursor.rowcount
1862
1863 yp: Optional[Union[int, bool]]
1864 if self.is_crud or self.is_text:
1865 result = self._setup_dml_or_text_result()
1866 yp = False
1867 else:
1868 yp = exec_opt.get("yield_per", None)
1869 sr = self._is_server_side or exec_opt.get("stream_results", False)
1870 strategy = self.cursor_fetch_strategy
1871 if sr and strategy is _cursor._DEFAULT_FETCH:
1872 strategy = _cursor.BufferedRowCursorFetchStrategy(
1873 self.cursor, self.execution_options
1874 )
1875 cursor_description: _DBAPICursorDescription = (
1876 strategy.alternate_cursor_description
1877 or self.cursor.description
1878 )
1879 if cursor_description is None:
1880 strategy = _cursor._NO_CURSOR_DQL
1881
1882 result = _cursor.CursorResult(self, strategy, cursor_description)
1883
1884 compiled = self.compiled
1885
1886 if (
1887 compiled
1888 and not self.isddl
1889 and cast(SQLCompiler, compiled).has_out_parameters
1890 ):
1891 self._setup_out_parameters(result)
1892
1893 self._soft_closed = result._soft_closed
1894
1895 if yp:
1896 result = result.yield_per(yp)
1897
1898 return result
1899
1900 def _setup_out_parameters(self, result):
1901 compiled = cast(SQLCompiler, self.compiled)
1902
1903 out_bindparams = [
1904 (param, name)
1905 for param, name in compiled.bind_names.items()
1906 if param.isoutparam
1907 ]
1908 out_parameters = {}
1909
1910 for bindparam, raw_value in zip(
1911 [param for param, name in out_bindparams],
1912 self.get_out_parameter_values(
1913 [name for param, name in out_bindparams]
1914 ),
1915 ):
1916 type_ = bindparam.type
1917 impl_type = type_.dialect_impl(self.dialect)
1918 dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
1919 result_processor = impl_type.result_processor(
1920 self.dialect, dbapi_type
1921 )
1922 if result_processor is not None:
1923 raw_value = result_processor(raw_value)
1924 out_parameters[bindparam.key] = raw_value
1925
1926 result.out_parameters = out_parameters
1927
1928 def _setup_dml_or_text_result(self):
1929 compiled = cast(SQLCompiler, self.compiled)
1930
1931 strategy: ResultFetchStrategy = self.cursor_fetch_strategy
1932
1933 if self.isinsert:
1934 if (
1935 self.execute_style is ExecuteStyle.INSERTMANYVALUES
1936 and compiled.effective_returning
1937 ):
1938 strategy = _cursor.FullyBufferedCursorFetchStrategy(
1939 self.cursor,
1940 initial_buffer=self._insertmanyvalues_rows,
1941 # maintain alt cursor description if set by the
1942 # dialect, e.g. mssql preserves it
1943 alternate_description=(
1944 strategy.alternate_cursor_description
1945 ),
1946 )
1947
1948 if compiled.postfetch_lastrowid:
1949 self.inserted_primary_key_rows = (
1950 self._setup_ins_pk_from_lastrowid()
1951 )
1952 # else if not self._is_implicit_returning,
1953 # the default inserted_primary_key_rows accessor will
1954 # return an "empty" primary key collection when accessed.
1955
1956 if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
1957 strategy = _cursor.BufferedRowCursorFetchStrategy(
1958 self.cursor, self.execution_options
1959 )
1960
1961 if strategy is _cursor._NO_CURSOR_DML:
1962 cursor_description = None
1963 else:
1964 cursor_description = (
1965 strategy.alternate_cursor_description
1966 or self.cursor.description
1967 )
1968
1969 if cursor_description is None:
1970 strategy = _cursor._NO_CURSOR_DML
1971 elif self._num_sentinel_cols:
1972 assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
1973 # the sentinel columns are handled in CursorResult._init_metadata
1974 # using essentially _reduce
1975
1976 result: _cursor.CursorResult[Any] = _cursor.CursorResult(
1977 self, strategy, cursor_description
1978 )
1979
1980 if self.isinsert:
1981 if self._is_implicit_returning:
1982 rows = result.all()
1983
1984 self.returned_default_rows = rows
1985
1986 self.inserted_primary_key_rows = (
1987 self._setup_ins_pk_from_implicit_returning(result, rows)
1988 )
1989
1990 # test that it has a cursor metadata that is accurate. the
1991 # first row will have been fetched and current assumptions
1992 # are that the result has only one row, until executemany()
1993 # support is added here.
1994 assert result._metadata.returns_rows
1995
1996 # Insert statement has both return_defaults() and
1997 # returning(). rewind the result on the list of rows
1998 # we just used.
1999 if self._is_supplemental_returning:
2000 result._rewind(rows)
2001 else:
2002 result._soft_close()
2003 elif not self._is_explicit_returning:
2004 result._soft_close()
2005
2006 # we assume here the result does not return any rows.
2007 # *usually*, this will be true. However, some dialects
2008 # such as that of MSSQL/pyodbc need to SELECT a post fetch
2009 # function so this is not necessarily true.
2010 # assert not result.returns_rows
2011
2012 elif self._is_implicit_returning:
2013 rows = result.all()
2014
2015 if rows:
2016 self.returned_default_rows = rows
2017 self._rowcount = len(rows)
2018
2019 if self._is_supplemental_returning:
2020 result._rewind(rows)
2021 else:
2022 result._soft_close()
2023
2024 # test that it has a cursor metadata that is accurate.
2025 # the rows have all been fetched however.
2026 assert result._metadata.returns_rows
2027
2028 elif not result._metadata.returns_rows:
2029 # no results, get rowcount
2030 # (which requires open cursor on some drivers)
2031 if self._rowcount is None:
2032 self._rowcount = self.cursor.rowcount
2033 result._soft_close()
2034 elif self.isupdate or self.isdelete:
2035 if self._rowcount is None:
2036 self._rowcount = self.cursor.rowcount
2037 return result
2038
2039 @util.memoized_property
2040 def inserted_primary_key_rows(self):
2041 # if no specific "get primary key" strategy was set up
2042 # during execution, return a "default" primary key based
2043 # on what's in the compiled_parameters and nothing else.
2044 return self._setup_ins_pk_from_empty()
2045
2046 def _setup_ins_pk_from_lastrowid(self):
2047 getter = cast(
2048 SQLCompiler, self.compiled
2049 )._inserted_primary_key_from_lastrowid_getter
2050 lastrowid = self.get_lastrowid()
2051 return [getter(lastrowid, self.compiled_parameters[0])]
2052
2053 def _setup_ins_pk_from_empty(self):
2054 getter = cast(
2055 SQLCompiler, self.compiled
2056 )._inserted_primary_key_from_lastrowid_getter
2057 return [getter(None, param) for param in self.compiled_parameters]
2058
2059 def _setup_ins_pk_from_implicit_returning(self, result, rows):
2060 if not rows:
2061 return []
2062
2063 getter = cast(
2064 SQLCompiler, self.compiled
2065 )._inserted_primary_key_from_returning_getter
2066 compiled_params = self.compiled_parameters
2067
2068 return [
2069 getter(row, param) for row, param in zip(rows, compiled_params)
2070 ]
2071
2072 def lastrow_has_defaults(self) -> bool:
2073 return (self.isinsert or self.isupdate) and bool(
2074 cast(SQLCompiler, self.compiled).postfetch
2075 )
2076
2077 def _prepare_set_input_sizes(
2078 self,
2079 ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
2080 """Given a cursor and ClauseParameters, prepare arguments
2081 in order to call the appropriate
2082 style of ``setinputsizes()`` on the cursor, using DB-API types
2083 from the bind parameter's ``TypeEngine`` objects.
2084
2085 This method only called by those dialects which set the
2086 :attr:`.Dialect.bind_typing` attribute to
2087 :attr:`.BindTyping.SETINPUTSIZES`. Python-oracledb and cx_Oracle are
2088 the only DBAPIs that requires setinputsizes(); pyodbc offers it as an
2089 option.
2090
2091 Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
2092 for pg8000 and asyncpg, which has been changed to inline rendering
2093 of casts.
2094
2095 """
2096 if self.isddl or self.is_text:
2097 return None
2098
2099 compiled = cast(SQLCompiler, self.compiled)
2100
2101 inputsizes = compiled._get_set_input_sizes_lookup()
2102
2103 if inputsizes is None:
2104 return None
2105
2106 dialect = self.dialect
2107
2108 # all of the rest of this... cython?
2109
2110 if dialect._has_events:
2111 inputsizes = dict(inputsizes)
2112 dialect.dispatch.do_setinputsizes(
2113 inputsizes, self.cursor, self.statement, self.parameters, self
2114 )
2115
2116 if compiled.escaped_bind_names:
2117 escaped_bind_names = compiled.escaped_bind_names
2118 else:
2119 escaped_bind_names = None
2120
2121 if dialect.positional:
2122 items = [
2123 (key, compiled.binds[key])
2124 for key in compiled.positiontup or ()
2125 ]
2126 else:
2127 items = [
2128 (key, bindparam)
2129 for bindparam, key in compiled.bind_names.items()
2130 ]
2131
2132 generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
2133 for key, bindparam in items:
2134 if bindparam in compiled.literal_execute_params:
2135 continue
2136
2137 if key in self._expanded_parameters:
2138 if is_tuple_type(bindparam.type):
2139 num = len(bindparam.type.types)
2140 dbtypes = inputsizes[bindparam]
2141 generic_inputsizes.extend(
2142 (
2143 (
2144 escaped_bind_names.get(paramname, paramname)
2145 if escaped_bind_names is not None
2146 else paramname
2147 ),
2148 dbtypes[idx % num],
2149 bindparam.type.types[idx % num],
2150 )
2151 for idx, paramname in enumerate(
2152 self._expanded_parameters[key]
2153 )
2154 )
2155 else:
2156 dbtype = inputsizes.get(bindparam, None)
2157 generic_inputsizes.extend(
2158 (
2159 (
2160 escaped_bind_names.get(paramname, paramname)
2161 if escaped_bind_names is not None
2162 else paramname
2163 ),
2164 dbtype,
2165 bindparam.type,
2166 )
2167 for paramname in self._expanded_parameters[key]
2168 )
2169 else:
2170 dbtype = inputsizes.get(bindparam, None)
2171
2172 escaped_name = (
2173 escaped_bind_names.get(key, key)
2174 if escaped_bind_names is not None
2175 else key
2176 )
2177
2178 generic_inputsizes.append(
2179 (escaped_name, dbtype, bindparam.type)
2180 )
2181
2182 return generic_inputsizes
2183
2184 def _exec_default(self, column, default, type_):
2185 if default.is_sequence:
2186 return self.fire_sequence(default, type_)
2187 elif default.is_callable:
2188 # this codepath is not normally used as it's inlined
2189 # into _process_execute_defaults
2190 self.current_column = column
2191 return default.arg(self)
2192 elif default.is_clause_element:
2193 return self._exec_default_clause_element(column, default, type_)
2194 else:
2195 # this codepath is not normally used as it's inlined
2196 # into _process_execute_defaults
2197 return default.arg
2198
2199 def _exec_default_clause_element(self, column, default, type_):
2200 # execute a default that's a complete clause element. Here, we have
2201 # to re-implement a miniature version of the compile->parameters->
2202 # cursor.execute() sequence, since we don't want to modify the state
2203 # of the connection / result in progress or create new connection/
2204 # result objects etc.
2205 # .. versionchanged:: 1.4
2206
2207 if not default._arg_is_typed:
2208 default_arg = expression.type_coerce(default.arg, type_)
2209 else:
2210 default_arg = default.arg
2211 compiled = expression.select(default_arg).compile(dialect=self.dialect)
2212 compiled_params = compiled.construct_params()
2213 processors = compiled._bind_processors
2214 if compiled.positional:
2215 parameters = self.dialect.execute_sequence_format(
2216 [
2217 (
2218 processors[key](compiled_params[key]) # type: ignore
2219 if key in processors
2220 else compiled_params[key]
2221 )
2222 for key in compiled.positiontup or ()
2223 ]
2224 )
2225 else:
2226 parameters = {
2227 key: (
2228 processors[key](compiled_params[key]) # type: ignore
2229 if key in processors
2230 else compiled_params[key]
2231 )
2232 for key in compiled_params
2233 }
2234 return self._execute_scalar(
2235 str(compiled), type_, parameters=parameters
2236 )
2237
2238 current_parameters: Optional[_CoreSingleExecuteParams] = None
2239 """A dictionary of parameters applied to the current row.
2240
2241 This attribute is only available in the context of a user-defined default
2242 generation function, e.g. as described at :ref:`context_default_functions`.
2243 It consists of a dictionary which includes entries for each column/value
2244 pair that is to be part of the INSERT or UPDATE statement. The keys of the
2245 dictionary will be the key value of each :class:`_schema.Column`,
2246 which is usually
2247 synonymous with the name.
2248
2249 Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
2250 does not accommodate for the "multi-values" feature of the
2251 :meth:`_expression.Insert.values` method. The
2252 :meth:`.DefaultExecutionContext.get_current_parameters` method should be
2253 preferred.
2254
2255 .. seealso::
2256
2257 :meth:`.DefaultExecutionContext.get_current_parameters`
2258
2259 :ref:`context_default_functions`
2260
2261 """
2262
2263 def get_current_parameters(self, isolate_multiinsert_groups=True):
2264 """Return a dictionary of parameters applied to the current row.
2265
2266 This method can only be used in the context of a user-defined default
2267 generation function, e.g. as described at
2268 :ref:`context_default_functions`. When invoked, a dictionary is
2269 returned which includes entries for each column/value pair that is part
2270 of the INSERT or UPDATE statement. The keys of the dictionary will be
2271 the key value of each :class:`_schema.Column`,
2272 which is usually synonymous
2273 with the name.
2274
2275 :param isolate_multiinsert_groups=True: indicates that multi-valued
2276 INSERT constructs created using :meth:`_expression.Insert.values`
2277 should be
2278 handled by returning only the subset of parameters that are local
2279 to the current column default invocation. When ``False``, the
2280 raw parameters of the statement are returned including the
2281 naming convention used in the case of multi-valued INSERT.
2282
2283 .. seealso::
2284
2285 :attr:`.DefaultExecutionContext.current_parameters`
2286
2287 :ref:`context_default_functions`
2288
2289 """
2290 try:
2291 parameters = self.current_parameters
2292 column = self.current_column
2293 except AttributeError:
2294 raise exc.InvalidRequestError(
2295 "get_current_parameters() can only be invoked in the "
2296 "context of a Python side column default function"
2297 )
2298 else:
2299 assert column is not None
2300 assert parameters is not None
2301 compile_state = cast(
2302 "DMLState", cast(SQLCompiler, self.compiled).compile_state
2303 )
2304 assert compile_state is not None
2305 if (
2306 isolate_multiinsert_groups
2307 and dml.isinsert(compile_state)
2308 and compile_state._has_multi_parameters
2309 ):
2310 if column._is_multiparam_column:
2311 index = column.index + 1
2312 d = {column.original.key: parameters[column.key]}
2313 else:
2314 d = {column.key: parameters[column.key]}
2315 index = 0
2316 assert compile_state._dict_parameters is not None
2317 keys = compile_state._dict_parameters.keys()
2318 d.update(
2319 (key, parameters["%s_m%d" % (key, index)]) for key in keys
2320 )
2321 return d
2322 else:
2323 return parameters
2324
2325 def get_insert_default(self, column):
2326 if column.default is None:
2327 return None
2328 else:
2329 return self._exec_default(column, column.default, column.type)
2330
2331 def get_update_default(self, column):
2332 if column.onupdate is None:
2333 return None
2334 else:
2335 return self._exec_default(column, column.onupdate, column.type)
2336
2337 def _process_execute_defaults(self):
2338 compiled = cast(SQLCompiler, self.compiled)
2339
2340 key_getter = compiled._within_exec_param_key_getter
2341
2342 sentinel_counter = 0
2343
2344 if compiled.insert_prefetch:
2345 prefetch_recs = [
2346 (
2347 c,
2348 key_getter(c),
2349 c._default_description_tuple,
2350 self.get_insert_default,
2351 )
2352 for c in compiled.insert_prefetch
2353 ]
2354 elif compiled.update_prefetch:
2355 prefetch_recs = [
2356 (
2357 c,
2358 key_getter(c),
2359 c._onupdate_description_tuple,
2360 self.get_update_default,
2361 )
2362 for c in compiled.update_prefetch
2363 ]
2364 else:
2365 prefetch_recs = []
2366
2367 for param in self.compiled_parameters:
2368 self.current_parameters = param
2369
2370 for (
2371 c,
2372 param_key,
2373 (arg, is_scalar, is_callable, is_sentinel),
2374 fallback,
2375 ) in prefetch_recs:
2376 if is_sentinel:
2377 param[param_key] = sentinel_counter
2378 sentinel_counter += 1
2379 elif is_scalar:
2380 param[param_key] = arg
2381 elif is_callable:
2382 self.current_column = c
2383 param[param_key] = arg(self)
2384 else:
2385 val = fallback(c)
2386 if val is not None:
2387 param[param_key] = val
2388
2389 del self.current_parameters
2390
2391
2392DefaultDialect.execution_ctx_cls = DefaultExecutionContext