1# engine/default.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Default implementations of per-dialect sqlalchemy.engine classes.
10
11These are semi-private implementation classes which are only of importance
12to database dialect authors; dialects will usually use the classes here
13as the base class for their own corresponding classes.
14
15"""
16
17from __future__ import annotations
18
19import functools
20import operator
21import random
22import re
23from time import perf_counter
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Dict
29from typing import List
30from typing import Mapping
31from typing import MutableMapping
32from typing import MutableSequence
33from typing import Optional
34from typing import Sequence
35from typing import Set
36from typing import Tuple
37from typing import Type
38from typing import TYPE_CHECKING
39from typing import Union
40import weakref
41
42from . import characteristics
43from . import cursor as _cursor
44from . import interfaces
45from .base import Connection
46from .interfaces import CacheStats
47from .interfaces import DBAPICursor
48from .interfaces import Dialect
49from .interfaces import ExecuteStyle
50from .interfaces import ExecutionContext
51from .reflection import ObjectKind
52from .reflection import ObjectScope
53from .. import event
54from .. import exc
55from .. import pool
56from .. import util
57from ..sql import compiler
58from ..sql import dml
59from ..sql import expression
60from ..sql import type_api
61from ..sql import util as sql_util
62from ..sql._typing import is_tuple_type
63from ..sql.base import _NoArg
64from ..sql.compiler import DDLCompiler
65from ..sql.compiler import InsertmanyvaluesSentinelOpts
66from ..sql.compiler import SQLCompiler
67from ..sql.elements import quoted_name
68from ..util.typing import Final
69from ..util.typing import Literal
70
71if typing.TYPE_CHECKING:
72 from types import ModuleType
73
74 from .base import Engine
75 from .cursor import ResultFetchStrategy
76 from .interfaces import _CoreMultiExecuteParams
77 from .interfaces import _CoreSingleExecuteParams
78 from .interfaces import _DBAPICursorDescription
79 from .interfaces import _DBAPIMultiExecuteParams
80 from .interfaces import _DBAPISingleExecuteParams
81 from .interfaces import _ExecuteOptions
82 from .interfaces import _MutableCoreSingleExecuteParams
83 from .interfaces import _ParamStyle
84 from .interfaces import ConnectArgsType
85 from .interfaces import DBAPIConnection
86 from .interfaces import DBAPIModule
87 from .interfaces import DBAPIType
88 from .interfaces import IsolationLevel
89 from .row import Row
90 from .url import URL
91 from ..event import _ListenerFnType
92 from ..pool import Pool
93 from ..pool import PoolProxiedConnection
94 from ..sql import Executable
95 from ..sql.compiler import Compiled
96 from ..sql.compiler import Linting
97 from ..sql.compiler import ResultColumnsEntry
98 from ..sql.dml import DMLState
99 from ..sql.dml import UpdateBase
100 from ..sql.elements import BindParameter
101 from ..sql.schema import Column
102 from ..sql.type_api import _BindProcessorType
103 from ..sql.type_api import _ResultProcessorType
104 from ..sql.type_api import TypeEngine
105
106
107# When we're handed literal SQL, ensure it's a SELECT query
108SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)
109
110
111(
112 CACHE_HIT,
113 CACHE_MISS,
114 CACHING_DISABLED,
115 NO_CACHE_KEY,
116 NO_DIALECT_SUPPORT,
117) = list(CacheStats)
118
119
120class DefaultDialect(Dialect):
121 """Default implementation of Dialect"""
122
123 statement_compiler = compiler.SQLCompiler
124 ddl_compiler = compiler.DDLCompiler
125 type_compiler_cls = compiler.GenericTypeCompiler
126
127 preparer = compiler.IdentifierPreparer
128 supports_alter = True
129 supports_comments = False
130 supports_constraint_comments = False
131 inline_comments = False
132 supports_statement_cache = True
133
134 div_is_floordiv = True
135
136 bind_typing = interfaces.BindTyping.NONE
137
138 include_set_input_sizes: Optional[Set[Any]] = None
139 exclude_set_input_sizes: Optional[Set[Any]] = None
140
141 # the first value we'd get for an autoincrement column.
142 default_sequence_base = 1
143
144 # most DBAPIs happy with this for execute().
145 # not cx_oracle.
146 execute_sequence_format = tuple
147
148 supports_schemas = True
149 supports_views = True
150 supports_sequences = False
151 sequences_optional = False
152 preexecute_autoincrement_sequences = False
153 supports_identity_columns = False
154 postfetch_lastrowid = True
155 favor_returning_over_lastrowid = False
156 insert_null_pk_still_autoincrements = False
157 update_returning = False
158 delete_returning = False
159 update_returning_multifrom = False
160 delete_returning_multifrom = False
161 insert_returning = False
162
163 cte_follows_insert = False
164
165 supports_native_enum = False
166 supports_native_boolean = False
167 supports_native_uuid = False
168 returns_native_bytes = False
169
170 non_native_boolean_check_constraint = True
171
172 supports_simple_order_by_label = True
173
174 tuple_in_values = False
175
176 connection_characteristics = util.immutabledict(
177 {
178 "isolation_level": characteristics.IsolationLevelCharacteristic(),
179 "logging_token": characteristics.LoggingTokenCharacteristic(),
180 }
181 )
182
183 engine_config_types: Mapping[str, Any] = util.immutabledict(
184 {
185 "pool_timeout": util.asint,
186 "echo": util.bool_or_str("debug"),
187 "echo_pool": util.bool_or_str("debug"),
188 "pool_recycle": util.asint,
189 "pool_size": util.asint,
190 "max_overflow": util.asint,
191 "future": util.asbool,
192 }
193 )
194
195 # if the NUMERIC type
196 # returns decimal.Decimal.
197 # *not* the FLOAT type however.
198 supports_native_decimal = False
199
200 name = "default"
201
202 # length at which to truncate
203 # any identifier.
204 max_identifier_length = 9999
205 _user_defined_max_identifier_length: Optional[int] = None
206
207 isolation_level: Optional[str] = None
208
209 # sub-categories of max_identifier_length.
210 # currently these accommodate for MySQL which allows alias names
211 # of 255 but DDL names only of 64.
212 max_index_name_length: Optional[int] = None
213 max_constraint_name_length: Optional[int] = None
214
215 supports_sane_rowcount = True
216 supports_sane_multi_rowcount = True
217 colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
218 default_paramstyle = "named"
219
220 supports_default_values = False
221 """dialect supports INSERT... DEFAULT VALUES syntax"""
222
223 supports_default_metavalue = False
224 """dialect supports INSERT... VALUES (DEFAULT) syntax"""
225
226 default_metavalue_token = "DEFAULT"
227 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
228 parenthesis."""
229
230 # not sure if this is a real thing but the compiler will deliver it
231 # if this is the only flag enabled.
232 supports_empty_insert = True
233 """dialect supports INSERT () VALUES ()"""
234
235 supports_multivalues_insert = False
236
237 use_insertmanyvalues: bool = False
238
239 use_insertmanyvalues_wo_returning: bool = False
240
241 insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
242 InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
243 )
244
245 insertmanyvalues_page_size: int = 1000
246 insertmanyvalues_max_parameters = 32700
247
248 supports_is_distinct_from = True
249
250 supports_server_side_cursors = False
251
252 server_side_cursors = False
253
254 # extra record-level locking features (#4860)
255 supports_for_update_of = False
256
257 server_version_info = None
258
259 default_schema_name: Optional[str] = None
260
261 # indicates symbol names are
262 # UPPERCASED if they are case insensitive
263 # within the database.
264 # if this is True, the methods normalize_name()
265 # and denormalize_name() must be provided.
266 requires_name_normalize = False
267
268 is_async = False
269
270 has_terminate = False
271
272 # TODO: this is not to be part of 2.0. implement rudimentary binary
273 # literals for SQLite, PostgreSQL, MySQL only within
274 # _Binary.literal_processor
275 _legacy_binary_type_literal_encoding = "utf-8"
276
277 @util.deprecated_params(
278 empty_in_strategy=(
279 "1.4",
280 "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
281 "deprecated, and no longer has any effect. All IN expressions "
282 "are now rendered using "
283 'the "expanding parameter" strategy which renders a set of bound'
284 'expressions, or an "empty set" SELECT, at statement execution'
285 "time.",
286 ),
287 server_side_cursors=(
288 "1.4",
289 "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
290 "is deprecated and will be removed in a future release. Please "
291 "use the "
292 ":paramref:`_engine.Connection.execution_options.stream_results` "
293 "parameter.",
294 ),
295 )
296 def __init__(
297 self,
298 paramstyle: Optional[_ParamStyle] = None,
299 isolation_level: Optional[IsolationLevel] = None,
300 dbapi: Optional[ModuleType] = None,
301 implicit_returning: Literal[True] = True,
302 supports_native_boolean: Optional[bool] = None,
303 max_identifier_length: Optional[int] = None,
304 label_length: Optional[int] = None,
305 insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
306 use_insertmanyvalues: Optional[bool] = None,
307 # util.deprecated_params decorator cannot render the
308 # Linting.NO_LINTING constant
309 compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore
310 server_side_cursors: bool = False,
311 skip_autocommit_rollback: bool = False,
312 **kwargs: Any,
313 ):
314 if server_side_cursors:
315 if not self.supports_server_side_cursors:
316 raise exc.ArgumentError(
317 "Dialect %s does not support server side cursors" % self
318 )
319 else:
320 self.server_side_cursors = True
321
322 if getattr(self, "use_setinputsizes", False):
323 util.warn_deprecated(
324 "The dialect-level use_setinputsizes attribute is "
325 "deprecated. Please use "
326 "bind_typing = BindTyping.SETINPUTSIZES",
327 "2.0",
328 )
329 self.bind_typing = interfaces.BindTyping.SETINPUTSIZES
330
331 self.positional = False
332 self._ischema = None
333
334 self.dbapi = dbapi
335
336 self.skip_autocommit_rollback = skip_autocommit_rollback
337
338 if paramstyle is not None:
339 self.paramstyle = paramstyle
340 elif self.dbapi is not None:
341 self.paramstyle = self.dbapi.paramstyle
342 else:
343 self.paramstyle = self.default_paramstyle
344 self.positional = self.paramstyle in (
345 "qmark",
346 "format",
347 "numeric",
348 "numeric_dollar",
349 )
350 self.identifier_preparer = self.preparer(self)
351 self._on_connect_isolation_level = isolation_level
352
353 legacy_tt_callable = getattr(self, "type_compiler", None)
354 if legacy_tt_callable is not None:
355 tt_callable = cast(
356 Type[compiler.GenericTypeCompiler],
357 self.type_compiler,
358 )
359 else:
360 tt_callable = self.type_compiler_cls
361
362 self.type_compiler_instance = self.type_compiler = tt_callable(self)
363
364 if supports_native_boolean is not None:
365 self.supports_native_boolean = supports_native_boolean
366
367 self._user_defined_max_identifier_length = max_identifier_length
368 if self._user_defined_max_identifier_length:
369 self.max_identifier_length = (
370 self._user_defined_max_identifier_length
371 )
372 self.label_length = label_length
373 self.compiler_linting = compiler_linting
374
375 if use_insertmanyvalues is not None:
376 self.use_insertmanyvalues = use_insertmanyvalues
377
378 if insertmanyvalues_page_size is not _NoArg.NO_ARG:
379 self.insertmanyvalues_page_size = insertmanyvalues_page_size
380
381 @property
382 @util.deprecated(
383 "2.0",
384 "full_returning is deprecated, please use insert_returning, "
385 "update_returning, delete_returning",
386 )
387 def full_returning(self):
388 return (
389 self.insert_returning
390 and self.update_returning
391 and self.delete_returning
392 )
393
394 @util.memoized_property
395 def insert_executemany_returning(self):
396 """Default implementation for insert_executemany_returning, if not
397 otherwise overridden by the specific dialect.
398
399 The default dialect determines "insert_executemany_returning" is
400 available if the dialect in use has opted into using the
401 "use_insertmanyvalues" feature. If they haven't opted into that, then
402 this attribute is False, unless the dialect in question overrides this
403 and provides some other implementation (such as the Oracle Database
404 dialects).
405
406 """
407 return self.insert_returning and self.use_insertmanyvalues
408
409 @util.memoized_property
410 def insert_executemany_returning_sort_by_parameter_order(self):
411 """Default implementation for
412 insert_executemany_returning_deterministic_order, if not otherwise
413 overridden by the specific dialect.
414
415 The default dialect determines "insert_executemany_returning" can have
416 deterministic order only if the dialect in use has opted into using the
417 "use_insertmanyvalues" feature, which implements deterministic ordering
418 using client side sentinel columns only by default. The
419 "insertmanyvalues" feature also features alternate forms that can
420 use server-generated PK values as "sentinels", but those are only
421 used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel`
422 bitflag enables those alternate SQL forms, which are disabled
423 by default.
424
425 If the dialect in use hasn't opted into that, then this attribute is
426 False, unless the dialect in question overrides this and provides some
427 other implementation (such as the Oracle Database dialects).
428
429 """
430 return self.insert_returning and self.use_insertmanyvalues
431
432 update_executemany_returning = False
433 delete_executemany_returning = False
434
435 @util.memoized_property
436 def loaded_dbapi(self) -> DBAPIModule:
437 if self.dbapi is None:
438 raise exc.InvalidRequestError(
439 f"Dialect {self} does not have a Python DBAPI established "
440 "and cannot be used for actual database interaction"
441 )
442 return self.dbapi
443
444 @util.memoized_property
445 def _bind_typing_render_casts(self):
446 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS
447
448 def _ensure_has_table_connection(self, arg: Connection) -> None:
449 if not isinstance(arg, Connection):
450 raise exc.ArgumentError(
451 "The argument passed to Dialect.has_table() should be a "
452 "%s, got %s. "
453 "Additionally, the Dialect.has_table() method is for "
454 "internal dialect "
455 "use only; please use "
456 "``inspect(some_engine).has_table(<tablename>>)`` "
457 "for public API use." % (Connection, type(arg))
458 )
459
460 @util.memoized_property
461 def _supports_statement_cache(self):
462 ssc = self.__class__.__dict__.get("supports_statement_cache", None)
463 if ssc is None:
464 util.warn(
465 "Dialect %s:%s will not make use of SQL compilation caching "
466 "as it does not set the 'supports_statement_cache' attribute "
467 "to ``True``. This can have "
468 "significant performance implications including some "
469 "performance degradations in comparison to prior SQLAlchemy "
470 "versions. Dialect maintainers should seek to set this "
471 "attribute to True after appropriate development and testing "
472 "for SQLAlchemy 1.4 caching support. Alternatively, this "
473 "attribute may be set to False which will disable this "
474 "warning." % (self.name, self.driver),
475 code="cprf",
476 )
477
478 return bool(ssc)
479
480 @util.memoized_property
481 def _type_memos(self):
482 return weakref.WeakKeyDictionary()
483
484 @property
485 def dialect_description(self): # type: ignore[override]
486 return self.name + "+" + self.driver
487
488 @property
489 def supports_sane_rowcount_returning(self):
490 """True if this dialect supports sane rowcount even if RETURNING is
491 in use.
492
493 For dialects that don't support RETURNING, this is synonymous with
494 ``supports_sane_rowcount``.
495
496 """
497 return self.supports_sane_rowcount
498
499 @classmethod
500 def get_pool_class(cls, url: URL) -> Type[Pool]:
501 return getattr(cls, "poolclass", pool.QueuePool)
502
503 def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
504 return self.get_pool_class(url)
505
506 @classmethod
507 def load_provisioning(cls):
508 package = ".".join(cls.__module__.split(".")[0:-1])
509 try:
510 __import__(package + ".provision")
511 except ImportError:
512 pass
513
514 def _builtin_onconnect(self) -> Optional[_ListenerFnType]:
515 if self._on_connect_isolation_level is not None:
516
517 def builtin_connect(dbapi_conn, conn_rec):
518 self._assert_and_set_isolation_level(
519 dbapi_conn, self._on_connect_isolation_level
520 )
521
522 return builtin_connect
523 else:
524 return None
525
526 def initialize(self, connection: Connection) -> None:
527 try:
528 self.server_version_info = self._get_server_version_info(
529 connection
530 )
531 except NotImplementedError:
532 self.server_version_info = None
533 try:
534 self.default_schema_name = self._get_default_schema_name(
535 connection
536 )
537 except NotImplementedError:
538 self.default_schema_name = None
539
540 try:
541 self.default_isolation_level = self.get_default_isolation_level(
542 connection.connection.dbapi_connection
543 )
544 except NotImplementedError:
545 self.default_isolation_level = None
546
547 if not self._user_defined_max_identifier_length:
548 max_ident_length = self._check_max_identifier_length(connection)
549 if max_ident_length:
550 self.max_identifier_length = max_ident_length
551
552 if (
553 self.label_length
554 and self.label_length > self.max_identifier_length
555 ):
556 raise exc.ArgumentError(
557 "Label length of %d is greater than this dialect's"
558 " maximum identifier length of %d"
559 % (self.label_length, self.max_identifier_length)
560 )
561
562 def on_connect(self) -> Optional[Callable[[Any], None]]:
563 # inherits the docstring from interfaces.Dialect.on_connect
564 return None
565
566 def _check_max_identifier_length(self, connection):
567 """Perform a connection / server version specific check to determine
568 the max_identifier_length.
569
570 If the dialect's class level max_identifier_length should be used,
571 can return None.
572
573 .. versionadded:: 1.3.9
574
575 """
576 return None
577
578 def get_default_isolation_level(self, dbapi_conn):
579 """Given a DBAPI connection, return its isolation level, or
580 a default isolation level if one cannot be retrieved.
581
582 May be overridden by subclasses in order to provide a
583 "fallback" isolation level for databases that cannot reliably
584 retrieve the actual isolation level.
585
586 By default, calls the :meth:`_engine.Interfaces.get_isolation_level`
587 method, propagating any exceptions raised.
588
589 .. versionadded:: 1.3.22
590
591 """
592 return self.get_isolation_level(dbapi_conn)
593
594 def type_descriptor(self, typeobj):
595 """Provide a database-specific :class:`.TypeEngine` object, given
596 the generic object which comes from the types module.
597
598 This method looks for a dictionary called
599 ``colspecs`` as a class or instance-level variable,
600 and passes on to :func:`_types.adapt_type`.
601
602 """
603 return type_api.adapt_type(typeobj, self.colspecs)
604
605 def has_index(self, connection, table_name, index_name, schema=None, **kw):
606 if not self.has_table(connection, table_name, schema=schema, **kw):
607 return False
608 for idx in self.get_indexes(
609 connection, table_name, schema=schema, **kw
610 ):
611 if idx["name"] == index_name:
612 return True
613 else:
614 return False
615
616 def has_schema(
617 self, connection: Connection, schema_name: str, **kw: Any
618 ) -> bool:
619 return schema_name in self.get_schema_names(connection, **kw)
620
621 def validate_identifier(self, ident: str) -> None:
622 if len(ident) > self.max_identifier_length:
623 raise exc.IdentifierError(
624 "Identifier '%s' exceeds maximum length of %d characters"
625 % (ident, self.max_identifier_length)
626 )
627
628 def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection:
629 # inherits the docstring from interfaces.Dialect.connect
630 return self.loaded_dbapi.connect(*cargs, **cparams) # type: ignore[no-any-return] # NOQA: E501
631
632 def create_connect_args(self, url: URL) -> ConnectArgsType:
633 # inherits the docstring from interfaces.Dialect.create_connect_args
634 opts = url.translate_connect_args()
635 opts.update(url.query)
636 return ([], opts)
637
638 def set_engine_execution_options(
639 self, engine: Engine, opts: Mapping[str, Any]
640 ) -> None:
641 supported_names = set(self.connection_characteristics).intersection(
642 opts
643 )
644 if supported_names:
645 characteristics: Mapping[str, Any] = util.immutabledict(
646 (name, opts[name]) for name in supported_names
647 )
648
649 @event.listens_for(engine, "engine_connect")
650 def set_connection_characteristics(connection):
651 self._set_connection_characteristics(
652 connection, characteristics
653 )
654
655 def set_connection_execution_options(
656 self, connection: Connection, opts: Mapping[str, Any]
657 ) -> None:
658 supported_names = set(self.connection_characteristics).intersection(
659 opts
660 )
661 if supported_names:
662 characteristics: Mapping[str, Any] = util.immutabledict(
663 (name, opts[name]) for name in supported_names
664 )
665 self._set_connection_characteristics(connection, characteristics)
666
667 def _set_connection_characteristics(self, connection, characteristics):
668 characteristic_values = [
669 (name, self.connection_characteristics[name], value)
670 for name, value in characteristics.items()
671 ]
672
673 if connection.in_transaction():
674 trans_objs = [
675 (name, obj)
676 for name, obj, _ in characteristic_values
677 if obj.transactional
678 ]
679 if trans_objs:
680 raise exc.InvalidRequestError(
681 "This connection has already initialized a SQLAlchemy "
682 "Transaction() object via begin() or autobegin; "
683 "%s may not be altered unless rollback() or commit() "
684 "is called first."
685 % (", ".join(name for name, obj in trans_objs))
686 )
687
688 dbapi_connection = connection.connection.dbapi_connection
689 for _, characteristic, value in characteristic_values:
690 characteristic.set_connection_characteristic(
691 self, connection, dbapi_connection, value
692 )
693 connection.connection._connection_record.finalize_callback.append(
694 functools.partial(self._reset_characteristics, characteristics)
695 )
696
697 def _reset_characteristics(self, characteristics, dbapi_connection):
698 for characteristic_name in characteristics:
699 characteristic = self.connection_characteristics[
700 characteristic_name
701 ]
702 characteristic.reset_characteristic(self, dbapi_connection)
703
704 def do_begin(self, dbapi_connection):
705 pass
706
707 def do_rollback(self, dbapi_connection):
708 if self.skip_autocommit_rollback and self.detect_autocommit_setting(
709 dbapi_connection
710 ):
711 return
712 dbapi_connection.rollback()
713
714 def do_commit(self, dbapi_connection):
715 dbapi_connection.commit()
716
717 def do_terminate(self, dbapi_connection):
718 self.do_close(dbapi_connection)
719
720 def do_close(self, dbapi_connection):
721 dbapi_connection.close()
722
723 @util.memoized_property
724 def _dialect_specific_select_one(self):
725 return str(expression.select(1).compile(dialect=self))
726
727 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
728 try:
729 return self.do_ping(dbapi_connection)
730 except self.loaded_dbapi.Error as err:
731 is_disconnect = self.is_disconnect(err, dbapi_connection, None)
732
733 if self._has_events:
734 try:
735 Connection._handle_dbapi_exception_noconnection(
736 err,
737 self,
738 is_disconnect=is_disconnect,
739 invalidate_pool_on_disconnect=False,
740 is_pre_ping=True,
741 )
742 except exc.StatementError as new_err:
743 is_disconnect = new_err.connection_invalidated
744
745 if is_disconnect:
746 return False
747 else:
748 raise
749
750 def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
751 cursor = dbapi_connection.cursor()
752 try:
753 cursor.execute(self._dialect_specific_select_one)
754 finally:
755 cursor.close()
756 return True
757
758 def create_xid(self):
759 """Create a random two-phase transaction ID.
760
761 This id will be passed to do_begin_twophase(), do_rollback_twophase(),
762 do_commit_twophase(). Its format is unspecified.
763 """
764
765 return "_sa_%032x" % random.randint(0, 2**128)
766
767 def do_savepoint(self, connection, name):
768 connection.execute(expression.SavepointClause(name))
769
770 def do_rollback_to_savepoint(self, connection, name):
771 connection.execute(expression.RollbackToSavepointClause(name))
772
773 def do_release_savepoint(self, connection, name):
774 connection.execute(expression.ReleaseSavepointClause(name))
775
776 def _deliver_insertmanyvalues_batches(
777 self,
778 connection,
779 cursor,
780 statement,
781 parameters,
782 generic_setinputsizes,
783 context,
784 ):
785 context = cast(DefaultExecutionContext, context)
786 compiled = cast(SQLCompiler, context.compiled)
787
788 _composite_sentinel_proc: Sequence[
789 Optional[_ResultProcessorType[Any]]
790 ] = ()
791 _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
792 _sentinel_proc_initialized: bool = False
793
794 compiled_parameters = context.compiled_parameters
795
796 imv = compiled._insertmanyvalues
797 assert imv is not None
798
799 is_returning: Final[bool] = bool(compiled.effective_returning)
800 batch_size = context.execution_options.get(
801 "insertmanyvalues_page_size", self.insertmanyvalues_page_size
802 )
803
804 if compiled.schema_translate_map:
805 schema_translate_map = context.execution_options.get(
806 "schema_translate_map", {}
807 )
808 else:
809 schema_translate_map = None
810
811 if is_returning:
812 result: Optional[List[Any]] = []
813 context._insertmanyvalues_rows = result
814
815 sort_by_parameter_order = imv.sort_by_parameter_order
816
817 else:
818 sort_by_parameter_order = False
819 result = None
820
821 for imv_batch in compiled._deliver_insertmanyvalues_batches(
822 statement,
823 parameters,
824 compiled_parameters,
825 generic_setinputsizes,
826 batch_size,
827 sort_by_parameter_order,
828 schema_translate_map,
829 ):
830 yield imv_batch
831
832 if is_returning:
833
834 try:
835 rows = context.fetchall_for_returning(cursor)
836 except BaseException as be:
837 connection._handle_dbapi_exception(
838 be,
839 sql_util._long_statement(imv_batch.replaced_statement),
840 imv_batch.replaced_parameters,
841 None,
842 context,
843 is_sub_exec=True,
844 )
845
846 # I would have thought "is_returning: Final[bool]"
847 # would have assured this but pylance thinks not
848 assert result is not None
849
850 if imv.num_sentinel_columns and not imv_batch.is_downgraded:
851 composite_sentinel = imv.num_sentinel_columns > 1
852 if imv.implicit_sentinel:
853 # for implicit sentinel, which is currently single-col
854 # integer autoincrement, do a simple sort.
855 assert not composite_sentinel
856 result.extend(
857 sorted(rows, key=operator.itemgetter(-1))
858 )
859 continue
860
861 # otherwise, create dictionaries to match up batches
862 # with parameters
863 assert imv.sentinel_param_keys
864 assert imv.sentinel_columns
865
866 _nsc = imv.num_sentinel_columns
867
868 if not _sentinel_proc_initialized:
869 if composite_sentinel:
870 _composite_sentinel_proc = [
871 col.type._cached_result_processor(
872 self, cursor_desc[1]
873 )
874 for col, cursor_desc in zip(
875 imv.sentinel_columns,
876 cursor.description[-_nsc:],
877 )
878 ]
879 else:
880 _scalar_sentinel_proc = (
881 imv.sentinel_columns[0]
882 ).type._cached_result_processor(
883 self, cursor.description[-1][1]
884 )
885 _sentinel_proc_initialized = True
886
887 rows_by_sentinel: Union[
888 Dict[Tuple[Any, ...], Any],
889 Dict[Any, Any],
890 ]
891 if composite_sentinel:
892 rows_by_sentinel = {
893 tuple(
894 (proc(val) if proc else val)
895 for val, proc in zip(
896 row[-_nsc:], _composite_sentinel_proc
897 )
898 ): row
899 for row in rows
900 }
901 elif _scalar_sentinel_proc:
902 rows_by_sentinel = {
903 _scalar_sentinel_proc(row[-1]): row for row in rows
904 }
905 else:
906 rows_by_sentinel = {row[-1]: row for row in rows}
907
908 if len(rows_by_sentinel) != len(imv_batch.batch):
909 # see test_insert_exec.py::
910 # IMVSentinelTest::test_sentinel_incorrect_rowcount
911 # for coverage / demonstration
912 raise exc.InvalidRequestError(
913 f"Sentinel-keyed result set did not produce "
914 f"correct number of rows {len(imv_batch.batch)}; "
915 "produced "
916 f"{len(rows_by_sentinel)}. Please ensure the "
917 "sentinel column is fully unique and populated in "
918 "all cases."
919 )
920
921 try:
922 ordered_rows = [
923 rows_by_sentinel[sentinel_keys]
924 for sentinel_keys in imv_batch.sentinel_values
925 ]
926 except KeyError as ke:
927 # see test_insert_exec.py::
928 # IMVSentinelTest::test_sentinel_cant_match_keys
929 # for coverage / demonstration
930 raise exc.InvalidRequestError(
931 f"Can't match sentinel values in result set to "
932 f"parameter sets; key {ke.args[0]!r} was not "
933 "found. "
934 "There may be a mismatch between the datatype "
935 "passed to the DBAPI driver vs. that which it "
936 "returns in a result row. Ensure the given "
937 "Python value matches the expected result type "
938 "*exactly*, taking care to not rely upon implicit "
939 "conversions which may occur such as when using "
940 "strings in place of UUID or integer values, etc. "
941 ) from ke
942
943 result.extend(ordered_rows)
944
945 else:
946 result.extend(rows)
947
948 def do_executemany(self, cursor, statement, parameters, context=None):
949 cursor.executemany(statement, parameters)
950
951 def do_execute(self, cursor, statement, parameters, context=None):
952 cursor.execute(statement, parameters)
953
954 def do_execute_no_params(self, cursor, statement, context=None):
955 cursor.execute(statement)
956
957 def is_disconnect(
958 self,
959 e: DBAPIModule.Error,
960 connection: Union[
961 pool.PoolProxiedConnection, interfaces.DBAPIConnection, None
962 ],
963 cursor: Optional[interfaces.DBAPICursor],
964 ) -> bool:
965 return False
966
967 @util.memoized_instancemethod
968 def _gen_allowed_isolation_levels(self, dbapi_conn):
969 try:
970 raw_levels = list(self.get_isolation_level_values(dbapi_conn))
971 except NotImplementedError:
972 return None
973 else:
974 normalized_levels = [
975 level.replace("_", " ").upper() for level in raw_levels
976 ]
977 if raw_levels != normalized_levels:
978 raise ValueError(
979 f"Dialect {self.name!r} get_isolation_level_values() "
980 f"method should return names as UPPERCASE using spaces, "
981 f"not underscores; got "
982 f"{sorted(set(raw_levels).difference(normalized_levels))}"
983 )
984 return tuple(normalized_levels)
985
986 def _assert_and_set_isolation_level(self, dbapi_conn, level):
987 level = level.replace("_", " ").upper()
988
989 _allowed_isolation_levels = self._gen_allowed_isolation_levels(
990 dbapi_conn
991 )
992 if (
993 _allowed_isolation_levels
994 and level not in _allowed_isolation_levels
995 ):
996 raise exc.ArgumentError(
997 f"Invalid value {level!r} for isolation_level. "
998 f"Valid isolation levels for {self.name!r} are "
999 f"{', '.join(_allowed_isolation_levels)}"
1000 )
1001
1002 self.set_isolation_level(dbapi_conn, level)
1003
1004 def reset_isolation_level(self, dbapi_conn):
1005 if self._on_connect_isolation_level is not None:
1006 assert (
1007 self._on_connect_isolation_level == "AUTOCOMMIT"
1008 or self._on_connect_isolation_level
1009 == self.default_isolation_level
1010 )
1011 self._assert_and_set_isolation_level(
1012 dbapi_conn, self._on_connect_isolation_level
1013 )
1014 else:
1015 assert self.default_isolation_level is not None
1016 self._assert_and_set_isolation_level(
1017 dbapi_conn,
1018 self.default_isolation_level,
1019 )
1020
1021 def normalize_name(self, name):
1022 if name is None:
1023 return None
1024
1025 name_lower = name.lower()
1026 name_upper = name.upper()
1027
1028 if name_upper == name_lower:
1029 # name has no upper/lower conversion, e.g. non-european characters.
1030 # return unchanged
1031 return name
1032 elif name_upper == name and not (
1033 self.identifier_preparer._requires_quotes
1034 )(name_lower):
1035 # name is all uppercase and doesn't require quoting; normalize
1036 # to all lower case
1037 return name_lower
1038 elif name_lower == name:
1039 # name is all lower case, which if denormalized means we need to
1040 # force quoting on it
1041 return quoted_name(name, quote=True)
1042 else:
1043 # name is mixed case, means it will be quoted in SQL when used
1044 # later, no normalizes
1045 return name
1046
1047 def denormalize_name(self, name):
1048 if name is None:
1049 return None
1050
1051 name_lower = name.lower()
1052 name_upper = name.upper()
1053
1054 if name_upper == name_lower:
1055 # name has no upper/lower conversion, e.g. non-european characters.
1056 # return unchanged
1057 return name
1058 elif name_lower == name and not (
1059 self.identifier_preparer._requires_quotes
1060 )(name_lower):
1061 name = name_upper
1062 return name
1063
1064 def get_driver_connection(self, connection: DBAPIConnection) -> Any:
1065 return connection
1066
1067 def _overrides_default(self, method):
1068 return (
1069 getattr(type(self), method).__code__
1070 is not getattr(DefaultDialect, method).__code__
1071 )
1072
1073 def _default_multi_reflect(
1074 self,
1075 single_tbl_method,
1076 connection,
1077 kind,
1078 schema,
1079 filter_names,
1080 scope,
1081 **kw,
1082 ):
1083 names_fns = []
1084 temp_names_fns = []
1085 if ObjectKind.TABLE in kind:
1086 names_fns.append(self.get_table_names)
1087 temp_names_fns.append(self.get_temp_table_names)
1088 if ObjectKind.VIEW in kind:
1089 names_fns.append(self.get_view_names)
1090 temp_names_fns.append(self.get_temp_view_names)
1091 if ObjectKind.MATERIALIZED_VIEW in kind:
1092 names_fns.append(self.get_materialized_view_names)
1093 # no temp materialized view at the moment
1094 # temp_names_fns.append(self.get_temp_materialized_view_names)
1095
1096 unreflectable = kw.pop("unreflectable", {})
1097
1098 if (
1099 filter_names
1100 and scope is ObjectScope.ANY
1101 and kind is ObjectKind.ANY
1102 ):
1103 # if names are given and no qualification on type of table
1104 # (i.e. the Table(..., autoload) case), take the names as given,
1105 # don't run names queries. If a table does not exit
1106 # NoSuchTableError is raised and it's skipped
1107
1108 # this also suits the case for mssql where we can reflect
1109 # individual temp tables but there's no temp_names_fn
1110 names = filter_names
1111 else:
1112 names = []
1113 name_kw = {"schema": schema, **kw}
1114 fns = []
1115 if ObjectScope.DEFAULT in scope:
1116 fns.extend(names_fns)
1117 if ObjectScope.TEMPORARY in scope:
1118 fns.extend(temp_names_fns)
1119
1120 for fn in fns:
1121 try:
1122 names.extend(fn(connection, **name_kw))
1123 except NotImplementedError:
1124 pass
1125
1126 if filter_names:
1127 filter_names = set(filter_names)
1128
1129 # iterate over all the tables/views and call the single table method
1130 for table in names:
1131 if not filter_names or table in filter_names:
1132 key = (schema, table)
1133 try:
1134 yield (
1135 key,
1136 single_tbl_method(
1137 connection, table, schema=schema, **kw
1138 ),
1139 )
1140 except exc.UnreflectableTableError as err:
1141 if key not in unreflectable:
1142 unreflectable[key] = err
1143 except exc.NoSuchTableError:
1144 pass
1145
1146 def get_multi_table_options(self, connection, **kw):
1147 return self._default_multi_reflect(
1148 self.get_table_options, connection, **kw
1149 )
1150
1151 def get_multi_columns(self, connection, **kw):
1152 return self._default_multi_reflect(self.get_columns, connection, **kw)
1153
1154 def get_multi_pk_constraint(self, connection, **kw):
1155 return self._default_multi_reflect(
1156 self.get_pk_constraint, connection, **kw
1157 )
1158
1159 def get_multi_foreign_keys(self, connection, **kw):
1160 return self._default_multi_reflect(
1161 self.get_foreign_keys, connection, **kw
1162 )
1163
1164 def get_multi_indexes(self, connection, **kw):
1165 return self._default_multi_reflect(self.get_indexes, connection, **kw)
1166
1167 def get_multi_unique_constraints(self, connection, **kw):
1168 return self._default_multi_reflect(
1169 self.get_unique_constraints, connection, **kw
1170 )
1171
1172 def get_multi_check_constraints(self, connection, **kw):
1173 return self._default_multi_reflect(
1174 self.get_check_constraints, connection, **kw
1175 )
1176
1177 def get_multi_table_comment(self, connection, **kw):
1178 return self._default_multi_reflect(
1179 self.get_table_comment, connection, **kw
1180 )
1181
1182
1183class StrCompileDialect(DefaultDialect):
1184 statement_compiler = compiler.StrSQLCompiler
1185 ddl_compiler = compiler.DDLCompiler
1186 type_compiler_cls = compiler.StrSQLTypeCompiler
1187 preparer = compiler.IdentifierPreparer
1188
1189 insert_returning = True
1190 update_returning = True
1191 delete_returning = True
1192
1193 supports_statement_cache = True
1194
1195 supports_identity_columns = True
1196
1197 supports_sequences = True
1198 sequences_optional = True
1199 preexecute_autoincrement_sequences = False
1200
1201 supports_native_boolean = True
1202
1203 supports_multivalues_insert = True
1204 supports_simple_order_by_label = True
1205
1206
1207class DefaultExecutionContext(ExecutionContext):
1208 isinsert = False
1209 isupdate = False
1210 isdelete = False
1211 is_crud = False
1212 is_text = False
1213 isddl = False
1214
1215 execute_style: ExecuteStyle = ExecuteStyle.EXECUTE
1216
1217 compiled: Optional[Compiled] = None
1218 result_column_struct: Optional[
1219 Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
1220 ] = None
1221 returned_default_rows: Optional[Sequence[Row[Any]]] = None
1222
1223 execution_options: _ExecuteOptions = util.EMPTY_DICT
1224
1225 cursor_fetch_strategy = _cursor._DEFAULT_FETCH
1226
1227 invoked_statement: Optional[Executable] = None
1228
1229 _is_implicit_returning = False
1230 _is_explicit_returning = False
1231 _is_supplemental_returning = False
1232 _is_server_side = False
1233
1234 _soft_closed = False
1235
1236 _rowcount: Optional[int] = None
1237
1238 # a hook for SQLite's translation of
1239 # result column names
1240 # NOTE: pyhive is using this hook, can't remove it :(
1241 _translate_colname: Optional[
1242 Callable[[str], Tuple[str, Optional[str]]]
1243 ] = None
1244
1245 _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
1246 """used by set_input_sizes().
1247
1248 This collection comes from ``ExpandedState.parameter_expansion``.
1249
1250 """
1251
1252 cache_hit = NO_CACHE_KEY
1253
1254 root_connection: Connection
1255 _dbapi_connection: PoolProxiedConnection
1256 dialect: Dialect
1257 unicode_statement: str
1258 cursor: DBAPICursor
1259 compiled_parameters: List[_MutableCoreSingleExecuteParams]
1260 parameters: _DBAPIMultiExecuteParams
1261 extracted_parameters: Optional[Sequence[BindParameter[Any]]]
1262
1263 _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)
1264
1265 _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
1266 _num_sentinel_cols: int = 0
1267
1268 @classmethod
1269 def _init_ddl(
1270 cls,
1271 dialect: Dialect,
1272 connection: Connection,
1273 dbapi_connection: PoolProxiedConnection,
1274 execution_options: _ExecuteOptions,
1275 compiled_ddl: DDLCompiler,
1276 ) -> ExecutionContext:
1277 """Initialize execution context for an ExecutableDDLElement
1278 construct."""
1279
1280 self = cls.__new__(cls)
1281 self.root_connection = connection
1282 self._dbapi_connection = dbapi_connection
1283 self.dialect = connection.dialect
1284
1285 self.compiled = compiled = compiled_ddl
1286 self.isddl = True
1287
1288 self.execution_options = execution_options
1289
1290 self.unicode_statement = str(compiled)
1291 if compiled.schema_translate_map:
1292 schema_translate_map = self.execution_options.get(
1293 "schema_translate_map", {}
1294 )
1295
1296 rst = compiled.preparer._render_schema_translates
1297 self.unicode_statement = rst(
1298 self.unicode_statement, schema_translate_map
1299 )
1300
1301 self.statement = self.unicode_statement
1302
1303 self.cursor = self.create_cursor()
1304 self.compiled_parameters = []
1305
1306 if dialect.positional:
1307 self.parameters = [dialect.execute_sequence_format()]
1308 else:
1309 self.parameters = [self._empty_dict_params]
1310
1311 return self
1312
1313 @classmethod
1314 def _init_compiled(
1315 cls,
1316 dialect: Dialect,
1317 connection: Connection,
1318 dbapi_connection: PoolProxiedConnection,
1319 execution_options: _ExecuteOptions,
1320 compiled: SQLCompiler,
1321 parameters: _CoreMultiExecuteParams,
1322 invoked_statement: Executable,
1323 extracted_parameters: Optional[Sequence[BindParameter[Any]]],
1324 cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
1325 ) -> ExecutionContext:
1326 """Initialize execution context for a Compiled construct."""
1327
1328 self = cls.__new__(cls)
1329 self.root_connection = connection
1330 self._dbapi_connection = dbapi_connection
1331 self.dialect = connection.dialect
1332 self.extracted_parameters = extracted_parameters
1333 self.invoked_statement = invoked_statement
1334 self.compiled = compiled
1335 self.cache_hit = cache_hit
1336
1337 self.execution_options = execution_options
1338
1339 self.result_column_struct = (
1340 compiled._result_columns,
1341 compiled._ordered_columns,
1342 compiled._textual_ordered_columns,
1343 compiled._ad_hoc_textual,
1344 compiled._loose_column_name_matching,
1345 )
1346
1347 self.isinsert = ii = compiled.isinsert
1348 self.isupdate = iu = compiled.isupdate
1349 self.isdelete = id_ = compiled.isdelete
1350 self.is_text = compiled.isplaintext
1351
1352 if ii or iu or id_:
1353 dml_statement = compiled.compile_state.statement # type: ignore
1354 if TYPE_CHECKING:
1355 assert isinstance(dml_statement, UpdateBase)
1356 self.is_crud = True
1357 self._is_explicit_returning = ier = bool(dml_statement._returning)
1358 self._is_implicit_returning = iir = bool(
1359 compiled.implicit_returning
1360 )
1361 if iir and dml_statement._supplemental_returning:
1362 self._is_supplemental_returning = True
1363
1364 # dont mix implicit and explicit returning
1365 assert not (iir and ier)
1366
1367 if (ier or iir) and compiled.for_executemany:
1368 if ii and not self.dialect.insert_executemany_returning:
1369 raise exc.InvalidRequestError(
1370 f"Dialect {self.dialect.dialect_description} with "
1371 f"current server capabilities does not support "
1372 "INSERT..RETURNING when executemany is used"
1373 )
1374 elif (
1375 ii
1376 and dml_statement._sort_by_parameter_order
1377 and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501
1378 ):
1379 raise exc.InvalidRequestError(
1380 f"Dialect {self.dialect.dialect_description} with "
1381 f"current server capabilities does not support "
1382 "INSERT..RETURNING with deterministic row ordering "
1383 "when executemany is used"
1384 )
1385 elif (
1386 ii
1387 and self.dialect.use_insertmanyvalues
1388 and not compiled._insertmanyvalues
1389 ):
1390 raise exc.InvalidRequestError(
1391 'Statement does not have "insertmanyvalues" '
1392 "enabled, can't use INSERT..RETURNING with "
1393 "executemany in this case."
1394 )
1395 elif iu and not self.dialect.update_executemany_returning:
1396 raise exc.InvalidRequestError(
1397 f"Dialect {self.dialect.dialect_description} with "
1398 f"current server capabilities does not support "
1399 "UPDATE..RETURNING when executemany is used"
1400 )
1401 elif id_ and not self.dialect.delete_executemany_returning:
1402 raise exc.InvalidRequestError(
1403 f"Dialect {self.dialect.dialect_description} with "
1404 f"current server capabilities does not support "
1405 "DELETE..RETURNING when executemany is used"
1406 )
1407
1408 if not parameters:
1409 self.compiled_parameters = [
1410 compiled.construct_params(
1411 extracted_parameters=extracted_parameters,
1412 escape_names=False,
1413 )
1414 ]
1415 else:
1416 self.compiled_parameters = [
1417 compiled.construct_params(
1418 m,
1419 escape_names=False,
1420 _group_number=grp,
1421 extracted_parameters=extracted_parameters,
1422 )
1423 for grp, m in enumerate(parameters)
1424 ]
1425
1426 if len(parameters) > 1:
1427 if self.isinsert and compiled._insertmanyvalues:
1428 self.execute_style = ExecuteStyle.INSERTMANYVALUES
1429
1430 imv = compiled._insertmanyvalues
1431 if imv.sentinel_columns is not None:
1432 self._num_sentinel_cols = imv.num_sentinel_columns
1433 else:
1434 self.execute_style = ExecuteStyle.EXECUTEMANY
1435
1436 self.unicode_statement = compiled.string
1437
1438 self.cursor = self.create_cursor()
1439
1440 if self.compiled.insert_prefetch or self.compiled.update_prefetch:
1441 self._process_execute_defaults()
1442
1443 processors = compiled._bind_processors
1444
1445 flattened_processors: Mapping[
1446 str, _BindProcessorType[Any]
1447 ] = processors # type: ignore[assignment]
1448
1449 if compiled.literal_execute_params or compiled.post_compile_params:
1450 if self.executemany:
1451 raise exc.InvalidRequestError(
1452 "'literal_execute' or 'expanding' parameters can't be "
1453 "used with executemany()"
1454 )
1455
1456 expanded_state = compiled._process_parameters_for_postcompile(
1457 self.compiled_parameters[0]
1458 )
1459
1460 # re-assign self.unicode_statement
1461 self.unicode_statement = expanded_state.statement
1462
1463 self._expanded_parameters = expanded_state.parameter_expansion
1464
1465 flattened_processors = dict(processors) # type: ignore
1466 flattened_processors.update(expanded_state.processors)
1467 positiontup = expanded_state.positiontup
1468 elif compiled.positional:
1469 positiontup = self.compiled.positiontup
1470 else:
1471 positiontup = None
1472
1473 if compiled.schema_translate_map:
1474 schema_translate_map = self.execution_options.get(
1475 "schema_translate_map", {}
1476 )
1477 rst = compiled.preparer._render_schema_translates
1478 self.unicode_statement = rst(
1479 self.unicode_statement, schema_translate_map
1480 )
1481
1482 # final self.unicode_statement is now assigned, encode if needed
1483 # by dialect
1484 self.statement = self.unicode_statement
1485
1486 # Convert the dictionary of bind parameter values
1487 # into a dict or list to be sent to the DBAPI's
1488 # execute() or executemany() method.
1489
1490 if compiled.positional:
1491 core_positional_parameters: MutableSequence[Sequence[Any]] = []
1492 assert positiontup is not None
1493 for compiled_params in self.compiled_parameters:
1494 l_param: List[Any] = [
1495 (
1496 flattened_processors[key](compiled_params[key])
1497 if key in flattened_processors
1498 else compiled_params[key]
1499 )
1500 for key in positiontup
1501 ]
1502 core_positional_parameters.append(
1503 dialect.execute_sequence_format(l_param)
1504 )
1505
1506 self.parameters = core_positional_parameters
1507 else:
1508 core_dict_parameters: MutableSequence[Dict[str, Any]] = []
1509 escaped_names = compiled.escaped_bind_names
1510
1511 # note that currently, "expanded" parameters will be present
1512 # in self.compiled_parameters in their quoted form. This is
1513 # slightly inconsistent with the approach taken as of
1514 # #8056 where self.compiled_parameters is meant to contain unquoted
1515 # param names.
1516 d_param: Dict[str, Any]
1517 for compiled_params in self.compiled_parameters:
1518 if escaped_names:
1519 d_param = {
1520 escaped_names.get(key, key): (
1521 flattened_processors[key](compiled_params[key])
1522 if key in flattened_processors
1523 else compiled_params[key]
1524 )
1525 for key in compiled_params
1526 }
1527 else:
1528 d_param = {
1529 key: (
1530 flattened_processors[key](compiled_params[key])
1531 if key in flattened_processors
1532 else compiled_params[key]
1533 )
1534 for key in compiled_params
1535 }
1536
1537 core_dict_parameters.append(d_param)
1538
1539 self.parameters = core_dict_parameters
1540
1541 return self
1542
1543 @classmethod
1544 def _init_statement(
1545 cls,
1546 dialect: Dialect,
1547 connection: Connection,
1548 dbapi_connection: PoolProxiedConnection,
1549 execution_options: _ExecuteOptions,
1550 statement: str,
1551 parameters: _DBAPIMultiExecuteParams,
1552 ) -> ExecutionContext:
1553 """Initialize execution context for a string SQL statement."""
1554
1555 self = cls.__new__(cls)
1556 self.root_connection = connection
1557 self._dbapi_connection = dbapi_connection
1558 self.dialect = connection.dialect
1559 self.is_text = True
1560
1561 self.execution_options = execution_options
1562
1563 if not parameters:
1564 if self.dialect.positional:
1565 self.parameters = [dialect.execute_sequence_format()]
1566 else:
1567 self.parameters = [self._empty_dict_params]
1568 elif isinstance(parameters[0], dialect.execute_sequence_format):
1569 self.parameters = parameters
1570 elif isinstance(parameters[0], dict):
1571 self.parameters = parameters
1572 else:
1573 self.parameters = [
1574 dialect.execute_sequence_format(p) for p in parameters
1575 ]
1576
1577 if len(parameters) > 1:
1578 self.execute_style = ExecuteStyle.EXECUTEMANY
1579
1580 self.statement = self.unicode_statement = statement
1581
1582 self.cursor = self.create_cursor()
1583 return self
1584
1585 @classmethod
1586 def _init_default(
1587 cls,
1588 dialect: Dialect,
1589 connection: Connection,
1590 dbapi_connection: PoolProxiedConnection,
1591 execution_options: _ExecuteOptions,
1592 ) -> ExecutionContext:
1593 """Initialize execution context for a ColumnDefault construct."""
1594
1595 self = cls.__new__(cls)
1596 self.root_connection = connection
1597 self._dbapi_connection = dbapi_connection
1598 self.dialect = connection.dialect
1599
1600 self.execution_options = execution_options
1601
1602 self.cursor = self.create_cursor()
1603 return self
1604
1605 def _get_cache_stats(self) -> str:
1606 if self.compiled is None:
1607 return "raw sql"
1608
1609 now = perf_counter()
1610
1611 ch = self.cache_hit
1612
1613 gen_time = self.compiled._gen_time
1614 assert gen_time is not None
1615
1616 if ch is NO_CACHE_KEY:
1617 return "no key %.5fs" % (now - gen_time,)
1618 elif ch is CACHE_HIT:
1619 return "cached since %.4gs ago" % (now - gen_time,)
1620 elif ch is CACHE_MISS:
1621 return "generated in %.5fs" % (now - gen_time,)
1622 elif ch is CACHING_DISABLED:
1623 if "_cache_disable_reason" in self.execution_options:
1624 return "caching disabled (%s) %.5fs " % (
1625 self.execution_options["_cache_disable_reason"],
1626 now - gen_time,
1627 )
1628 else:
1629 return "caching disabled %.5fs" % (now - gen_time,)
1630 elif ch is NO_DIALECT_SUPPORT:
1631 return "dialect %s+%s does not support caching %.5fs" % (
1632 self.dialect.name,
1633 self.dialect.driver,
1634 now - gen_time,
1635 )
1636 else:
1637 return "unknown"
1638
1639 @property
1640 def executemany(self): # type: ignore[override]
1641 return self.execute_style in (
1642 ExecuteStyle.EXECUTEMANY,
1643 ExecuteStyle.INSERTMANYVALUES,
1644 )
1645
1646 @util.memoized_property
1647 def identifier_preparer(self):
1648 if self.compiled:
1649 return self.compiled.preparer
1650 elif "schema_translate_map" in self.execution_options:
1651 return self.dialect.identifier_preparer._with_schema_translate(
1652 self.execution_options["schema_translate_map"]
1653 )
1654 else:
1655 return self.dialect.identifier_preparer
1656
1657 @util.memoized_property
1658 def engine(self):
1659 return self.root_connection.engine
1660
1661 @util.memoized_property
1662 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1663 if TYPE_CHECKING:
1664 assert isinstance(self.compiled, SQLCompiler)
1665 return self.compiled.postfetch
1666
1667 @util.memoized_property
1668 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1669 if TYPE_CHECKING:
1670 assert isinstance(self.compiled, SQLCompiler)
1671 if self.isinsert:
1672 return self.compiled.insert_prefetch
1673 elif self.isupdate:
1674 return self.compiled.update_prefetch
1675 else:
1676 return ()
1677
1678 @util.memoized_property
1679 def no_parameters(self):
1680 return self.execution_options.get("no_parameters", False)
1681
1682 def _execute_scalar(
1683 self,
1684 stmt: str,
1685 type_: Optional[TypeEngine[Any]],
1686 parameters: Optional[_DBAPISingleExecuteParams] = None,
1687 ) -> Any:
1688 """Execute a string statement on the current cursor, returning a
1689 scalar result.
1690
1691 Used to fire off sequences, default phrases, and "select lastrowid"
1692 types of statements individually or in the context of a parent INSERT
1693 or UPDATE statement.
1694
1695 """
1696
1697 conn = self.root_connection
1698
1699 if "schema_translate_map" in self.execution_options:
1700 schema_translate_map = self.execution_options.get(
1701 "schema_translate_map", {}
1702 )
1703
1704 rst = self.identifier_preparer._render_schema_translates
1705 stmt = rst(stmt, schema_translate_map)
1706
1707 if not parameters:
1708 if self.dialect.positional:
1709 parameters = self.dialect.execute_sequence_format()
1710 else:
1711 parameters = {}
1712
1713 conn._cursor_execute(self.cursor, stmt, parameters, context=self)
1714 row = self.cursor.fetchone()
1715 if row is not None:
1716 r = row[0]
1717 else:
1718 r = None
1719 if type_ is not None:
1720 # apply type post processors to the result
1721 proc = type_._cached_result_processor(
1722 self.dialect, self.cursor.description[0][1]
1723 )
1724 if proc:
1725 return proc(r)
1726 return r
1727
1728 @util.memoized_property
1729 def connection(self):
1730 return self.root_connection
1731
1732 def _use_server_side_cursor(self):
1733 if not self.dialect.supports_server_side_cursors:
1734 return False
1735
1736 if self.dialect.server_side_cursors:
1737 # this is deprecated
1738 use_server_side = self.execution_options.get(
1739 "stream_results", True
1740 ) and (
1741 self.compiled
1742 and isinstance(self.compiled.statement, expression.Selectable)
1743 or (
1744 (
1745 not self.compiled
1746 or isinstance(
1747 self.compiled.statement, expression.TextClause
1748 )
1749 )
1750 and self.unicode_statement
1751 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
1752 )
1753 )
1754 else:
1755 use_server_side = self.execution_options.get(
1756 "stream_results", False
1757 )
1758
1759 return use_server_side
1760
1761 def create_cursor(self) -> DBAPICursor:
1762 if (
1763 # inlining initial preference checks for SS cursors
1764 self.dialect.supports_server_side_cursors
1765 and (
1766 self.execution_options.get("stream_results", False)
1767 or (
1768 self.dialect.server_side_cursors
1769 and self._use_server_side_cursor()
1770 )
1771 )
1772 ):
1773 self._is_server_side = True
1774 return self.create_server_side_cursor()
1775 else:
1776 self._is_server_side = False
1777 return self.create_default_cursor()
1778
1779 def fetchall_for_returning(self, cursor):
1780 return cursor.fetchall()
1781
1782 def create_default_cursor(self) -> DBAPICursor:
1783 return self._dbapi_connection.cursor()
1784
1785 def create_server_side_cursor(self) -> DBAPICursor:
1786 raise NotImplementedError()
1787
1788 def pre_exec(self):
1789 pass
1790
1791 def get_out_parameter_values(self, names):
1792 raise NotImplementedError(
1793 "This dialect does not support OUT parameters"
1794 )
1795
1796 def post_exec(self):
1797 pass
1798
1799 def get_result_processor(
1800 self, type_: TypeEngine[Any], colname: str, coltype: DBAPIType
1801 ) -> Optional[_ResultProcessorType[Any]]:
1802 """Return a 'result processor' for a given type as present in
1803 cursor.description.
1804
1805 This has a default implementation that dialects can override
1806 for context-sensitive result type handling.
1807
1808 """
1809 return type_._cached_result_processor(self.dialect, coltype)
1810
1811 def get_lastrowid(self) -> int:
1812 """return self.cursor.lastrowid, or equivalent, after an INSERT.
1813
1814 This may involve calling special cursor functions, issuing a new SELECT
1815 on the cursor (or a new one), or returning a stored value that was
1816 calculated within post_exec().
1817
1818 This function will only be called for dialects which support "implicit"
1819 primary key generation, keep preexecute_autoincrement_sequences set to
1820 False, and when no explicit id value was bound to the statement.
1821
1822 The function is called once for an INSERT statement that would need to
1823 return the last inserted primary key for those dialects that make use
1824 of the lastrowid concept. In these cases, it is called directly after
1825 :meth:`.ExecutionContext.post_exec`.
1826
1827 """
1828 return self.cursor.lastrowid
1829
1830 def handle_dbapi_exception(self, e):
1831 pass
1832
1833 @util.non_memoized_property
1834 def rowcount(self) -> int:
1835 if self._rowcount is not None:
1836 return self._rowcount
1837 else:
1838 return self.cursor.rowcount
1839
1840 @property
1841 def _has_rowcount(self):
1842 return self._rowcount is not None
1843
1844 def supports_sane_rowcount(self):
1845 return self.dialect.supports_sane_rowcount
1846
1847 def supports_sane_multi_rowcount(self):
1848 return self.dialect.supports_sane_multi_rowcount
1849
1850 def _setup_result_proxy(self):
1851 exec_opt = self.execution_options
1852
1853 if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
1854 self._rowcount = self.cursor.rowcount
1855
1856 yp: Optional[Union[int, bool]]
1857 if self.is_crud or self.is_text:
1858 result = self._setup_dml_or_text_result()
1859 yp = False
1860 else:
1861 yp = exec_opt.get("yield_per", None)
1862 sr = self._is_server_side or exec_opt.get("stream_results", False)
1863 strategy = self.cursor_fetch_strategy
1864 if sr and strategy is _cursor._DEFAULT_FETCH:
1865 strategy = _cursor.BufferedRowCursorFetchStrategy(
1866 self.cursor, self.execution_options
1867 )
1868 cursor_description: _DBAPICursorDescription = (
1869 strategy.alternate_cursor_description
1870 or self.cursor.description
1871 )
1872 if cursor_description is None:
1873 strategy = _cursor._NO_CURSOR_DQL
1874
1875 result = _cursor.CursorResult(self, strategy, cursor_description)
1876
1877 compiled = self.compiled
1878
1879 if (
1880 compiled
1881 and not self.isddl
1882 and cast(SQLCompiler, compiled).has_out_parameters
1883 ):
1884 self._setup_out_parameters(result)
1885
1886 self._soft_closed = result._soft_closed
1887
1888 if yp:
1889 result = result.yield_per(yp)
1890
1891 return result
1892
1893 def _setup_out_parameters(self, result):
1894 compiled = cast(SQLCompiler, self.compiled)
1895
1896 out_bindparams = [
1897 (param, name)
1898 for param, name in compiled.bind_names.items()
1899 if param.isoutparam
1900 ]
1901 out_parameters = {}
1902
1903 for bindparam, raw_value in zip(
1904 [param for param, name in out_bindparams],
1905 self.get_out_parameter_values(
1906 [name for param, name in out_bindparams]
1907 ),
1908 ):
1909 type_ = bindparam.type
1910 impl_type = type_.dialect_impl(self.dialect)
1911 dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
1912 result_processor = impl_type.result_processor(
1913 self.dialect, dbapi_type
1914 )
1915 if result_processor is not None:
1916 raw_value = result_processor(raw_value)
1917 out_parameters[bindparam.key] = raw_value
1918
1919 result.out_parameters = out_parameters
1920
1921 def _setup_dml_or_text_result(self):
1922 compiled = cast(SQLCompiler, self.compiled)
1923
1924 strategy: ResultFetchStrategy = self.cursor_fetch_strategy
1925
1926 if self.isinsert:
1927 if (
1928 self.execute_style is ExecuteStyle.INSERTMANYVALUES
1929 and compiled.effective_returning
1930 ):
1931 strategy = _cursor.FullyBufferedCursorFetchStrategy(
1932 self.cursor,
1933 initial_buffer=self._insertmanyvalues_rows,
1934 # maintain alt cursor description if set by the
1935 # dialect, e.g. mssql preserves it
1936 alternate_description=(
1937 strategy.alternate_cursor_description
1938 ),
1939 )
1940
1941 if compiled.postfetch_lastrowid:
1942 self.inserted_primary_key_rows = (
1943 self._setup_ins_pk_from_lastrowid()
1944 )
1945 # else if not self._is_implicit_returning,
1946 # the default inserted_primary_key_rows accessor will
1947 # return an "empty" primary key collection when accessed.
1948
1949 if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
1950 strategy = _cursor.BufferedRowCursorFetchStrategy(
1951 self.cursor, self.execution_options
1952 )
1953
1954 if strategy is _cursor._NO_CURSOR_DML:
1955 cursor_description = None
1956 else:
1957 cursor_description = (
1958 strategy.alternate_cursor_description
1959 or self.cursor.description
1960 )
1961
1962 if cursor_description is None:
1963 strategy = _cursor._NO_CURSOR_DML
1964 elif self._num_sentinel_cols:
1965 assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
1966 # strip out the sentinel columns from cursor description
1967 # a similar logic is done to the rows only in CursorResult
1968 cursor_description = cursor_description[
1969 0 : -self._num_sentinel_cols
1970 ]
1971
1972 result: _cursor.CursorResult[Any] = _cursor.CursorResult(
1973 self, strategy, cursor_description
1974 )
1975
1976 if self.isinsert:
1977 if self._is_implicit_returning:
1978 rows = result.all()
1979
1980 self.returned_default_rows = rows
1981
1982 self.inserted_primary_key_rows = (
1983 self._setup_ins_pk_from_implicit_returning(result, rows)
1984 )
1985
1986 # test that it has a cursor metadata that is accurate. the
1987 # first row will have been fetched and current assumptions
1988 # are that the result has only one row, until executemany()
1989 # support is added here.
1990 assert result._metadata.returns_rows
1991
1992 # Insert statement has both return_defaults() and
1993 # returning(). rewind the result on the list of rows
1994 # we just used.
1995 if self._is_supplemental_returning:
1996 result._rewind(rows)
1997 else:
1998 result._soft_close()
1999 elif not self._is_explicit_returning:
2000 result._soft_close()
2001
2002 # we assume here the result does not return any rows.
2003 # *usually*, this will be true. However, some dialects
2004 # such as that of MSSQL/pyodbc need to SELECT a post fetch
2005 # function so this is not necessarily true.
2006 # assert not result.returns_rows
2007
2008 elif self._is_implicit_returning:
2009 rows = result.all()
2010
2011 if rows:
2012 self.returned_default_rows = rows
2013 self._rowcount = len(rows)
2014
2015 if self._is_supplemental_returning:
2016 result._rewind(rows)
2017 else:
2018 result._soft_close()
2019
2020 # test that it has a cursor metadata that is accurate.
2021 # the rows have all been fetched however.
2022 assert result._metadata.returns_rows
2023
2024 elif not result._metadata.returns_rows:
2025 # no results, get rowcount
2026 # (which requires open cursor on some drivers)
2027 if self._rowcount is None:
2028 self._rowcount = self.cursor.rowcount
2029 result._soft_close()
2030 elif self.isupdate or self.isdelete:
2031 if self._rowcount is None:
2032 self._rowcount = self.cursor.rowcount
2033 return result
2034
2035 @util.memoized_property
2036 def inserted_primary_key_rows(self):
2037 # if no specific "get primary key" strategy was set up
2038 # during execution, return a "default" primary key based
2039 # on what's in the compiled_parameters and nothing else.
2040 return self._setup_ins_pk_from_empty()
2041
2042 def _setup_ins_pk_from_lastrowid(self):
2043 getter = cast(
2044 SQLCompiler, self.compiled
2045 )._inserted_primary_key_from_lastrowid_getter
2046 lastrowid = self.get_lastrowid()
2047 return [getter(lastrowid, self.compiled_parameters[0])]
2048
2049 def _setup_ins_pk_from_empty(self):
2050 getter = cast(
2051 SQLCompiler, self.compiled
2052 )._inserted_primary_key_from_lastrowid_getter
2053 return [getter(None, param) for param in self.compiled_parameters]
2054
2055 def _setup_ins_pk_from_implicit_returning(self, result, rows):
2056 if not rows:
2057 return []
2058
2059 getter = cast(
2060 SQLCompiler, self.compiled
2061 )._inserted_primary_key_from_returning_getter
2062 compiled_params = self.compiled_parameters
2063
2064 return [
2065 getter(row, param) for row, param in zip(rows, compiled_params)
2066 ]
2067
2068 def lastrow_has_defaults(self) -> bool:
2069 return (self.isinsert or self.isupdate) and bool(
2070 cast(SQLCompiler, self.compiled).postfetch
2071 )
2072
2073 def _prepare_set_input_sizes(
2074 self,
2075 ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
2076 """Given a cursor and ClauseParameters, prepare arguments
2077 in order to call the appropriate
2078 style of ``setinputsizes()`` on the cursor, using DB-API types
2079 from the bind parameter's ``TypeEngine`` objects.
2080
2081 This method only called by those dialects which set the
2082 :attr:`.Dialect.bind_typing` attribute to
2083 :attr:`.BindTyping.SETINPUTSIZES`. Python-oracledb and cx_Oracle are
2084 the only DBAPIs that requires setinputsizes(); pyodbc offers it as an
2085 option.
2086
2087 Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
2088 for pg8000 and asyncpg, which has been changed to inline rendering
2089 of casts.
2090
2091 """
2092 if self.isddl or self.is_text:
2093 return None
2094
2095 compiled = cast(SQLCompiler, self.compiled)
2096
2097 inputsizes = compiled._get_set_input_sizes_lookup()
2098
2099 if inputsizes is None:
2100 return None
2101
2102 dialect = self.dialect
2103
2104 # all of the rest of this... cython?
2105
2106 if dialect._has_events:
2107 inputsizes = dict(inputsizes)
2108 dialect.dispatch.do_setinputsizes(
2109 inputsizes, self.cursor, self.statement, self.parameters, self
2110 )
2111
2112 if compiled.escaped_bind_names:
2113 escaped_bind_names = compiled.escaped_bind_names
2114 else:
2115 escaped_bind_names = None
2116
2117 if dialect.positional:
2118 items = [
2119 (key, compiled.binds[key])
2120 for key in compiled.positiontup or ()
2121 ]
2122 else:
2123 items = [
2124 (key, bindparam)
2125 for bindparam, key in compiled.bind_names.items()
2126 ]
2127
2128 generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
2129 for key, bindparam in items:
2130 if bindparam in compiled.literal_execute_params:
2131 continue
2132
2133 if key in self._expanded_parameters:
2134 if is_tuple_type(bindparam.type):
2135 num = len(bindparam.type.types)
2136 dbtypes = inputsizes[bindparam]
2137 generic_inputsizes.extend(
2138 (
2139 (
2140 escaped_bind_names.get(paramname, paramname)
2141 if escaped_bind_names is not None
2142 else paramname
2143 ),
2144 dbtypes[idx % num],
2145 bindparam.type.types[idx % num],
2146 )
2147 for idx, paramname in enumerate(
2148 self._expanded_parameters[key]
2149 )
2150 )
2151 else:
2152 dbtype = inputsizes.get(bindparam, None)
2153 generic_inputsizes.extend(
2154 (
2155 (
2156 escaped_bind_names.get(paramname, paramname)
2157 if escaped_bind_names is not None
2158 else paramname
2159 ),
2160 dbtype,
2161 bindparam.type,
2162 )
2163 for paramname in self._expanded_parameters[key]
2164 )
2165 else:
2166 dbtype = inputsizes.get(bindparam, None)
2167
2168 escaped_name = (
2169 escaped_bind_names.get(key, key)
2170 if escaped_bind_names is not None
2171 else key
2172 )
2173
2174 generic_inputsizes.append(
2175 (escaped_name, dbtype, bindparam.type)
2176 )
2177
2178 return generic_inputsizes
2179
2180 def _exec_default(self, column, default, type_):
2181 if default.is_sequence:
2182 return self.fire_sequence(default, type_)
2183 elif default.is_callable:
2184 # this codepath is not normally used as it's inlined
2185 # into _process_execute_defaults
2186 self.current_column = column
2187 return default.arg(self)
2188 elif default.is_clause_element:
2189 return self._exec_default_clause_element(column, default, type_)
2190 else:
2191 # this codepath is not normally used as it's inlined
2192 # into _process_execute_defaults
2193 return default.arg
2194
2195 def _exec_default_clause_element(self, column, default, type_):
2196 # execute a default that's a complete clause element. Here, we have
2197 # to re-implement a miniature version of the compile->parameters->
2198 # cursor.execute() sequence, since we don't want to modify the state
2199 # of the connection / result in progress or create new connection/
2200 # result objects etc.
2201 # .. versionchanged:: 1.4
2202
2203 if not default._arg_is_typed:
2204 default_arg = expression.type_coerce(default.arg, type_)
2205 else:
2206 default_arg = default.arg
2207 compiled = expression.select(default_arg).compile(dialect=self.dialect)
2208 compiled_params = compiled.construct_params()
2209 processors = compiled._bind_processors
2210 if compiled.positional:
2211 parameters = self.dialect.execute_sequence_format(
2212 [
2213 (
2214 processors[key](compiled_params[key]) # type: ignore
2215 if key in processors
2216 else compiled_params[key]
2217 )
2218 for key in compiled.positiontup or ()
2219 ]
2220 )
2221 else:
2222 parameters = {
2223 key: (
2224 processors[key](compiled_params[key]) # type: ignore
2225 if key in processors
2226 else compiled_params[key]
2227 )
2228 for key in compiled_params
2229 }
2230 return self._execute_scalar(
2231 str(compiled), type_, parameters=parameters
2232 )
2233
2234 current_parameters: Optional[_CoreSingleExecuteParams] = None
2235 """A dictionary of parameters applied to the current row.
2236
2237 This attribute is only available in the context of a user-defined default
2238 generation function, e.g. as described at :ref:`context_default_functions`.
2239 It consists of a dictionary which includes entries for each column/value
2240 pair that is to be part of the INSERT or UPDATE statement. The keys of the
2241 dictionary will be the key value of each :class:`_schema.Column`,
2242 which is usually
2243 synonymous with the name.
2244
2245 Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
2246 does not accommodate for the "multi-values" feature of the
2247 :meth:`_expression.Insert.values` method. The
2248 :meth:`.DefaultExecutionContext.get_current_parameters` method should be
2249 preferred.
2250
2251 .. seealso::
2252
2253 :meth:`.DefaultExecutionContext.get_current_parameters`
2254
2255 :ref:`context_default_functions`
2256
2257 """
2258
2259 def get_current_parameters(self, isolate_multiinsert_groups=True):
2260 """Return a dictionary of parameters applied to the current row.
2261
2262 This method can only be used in the context of a user-defined default
2263 generation function, e.g. as described at
2264 :ref:`context_default_functions`. When invoked, a dictionary is
2265 returned which includes entries for each column/value pair that is part
2266 of the INSERT or UPDATE statement. The keys of the dictionary will be
2267 the key value of each :class:`_schema.Column`,
2268 which is usually synonymous
2269 with the name.
2270
2271 :param isolate_multiinsert_groups=True: indicates that multi-valued
2272 INSERT constructs created using :meth:`_expression.Insert.values`
2273 should be
2274 handled by returning only the subset of parameters that are local
2275 to the current column default invocation. When ``False``, the
2276 raw parameters of the statement are returned including the
2277 naming convention used in the case of multi-valued INSERT.
2278
2279 .. versionadded:: 1.2 added
2280 :meth:`.DefaultExecutionContext.get_current_parameters`
2281 which provides more functionality over the existing
2282 :attr:`.DefaultExecutionContext.current_parameters`
2283 attribute.
2284
2285 .. seealso::
2286
2287 :attr:`.DefaultExecutionContext.current_parameters`
2288
2289 :ref:`context_default_functions`
2290
2291 """
2292 try:
2293 parameters = self.current_parameters
2294 column = self.current_column
2295 except AttributeError:
2296 raise exc.InvalidRequestError(
2297 "get_current_parameters() can only be invoked in the "
2298 "context of a Python side column default function"
2299 )
2300 else:
2301 assert column is not None
2302 assert parameters is not None
2303 compile_state = cast(
2304 "DMLState", cast(SQLCompiler, self.compiled).compile_state
2305 )
2306 assert compile_state is not None
2307 if (
2308 isolate_multiinsert_groups
2309 and dml.isinsert(compile_state)
2310 and compile_state._has_multi_parameters
2311 ):
2312 if column._is_multiparam_column:
2313 index = column.index + 1
2314 d = {column.original.key: parameters[column.key]}
2315 else:
2316 d = {column.key: parameters[column.key]}
2317 index = 0
2318 assert compile_state._dict_parameters is not None
2319 keys = compile_state._dict_parameters.keys()
2320 d.update(
2321 (key, parameters["%s_m%d" % (key, index)]) for key in keys
2322 )
2323 return d
2324 else:
2325 return parameters
2326
2327 def get_insert_default(self, column):
2328 if column.default is None:
2329 return None
2330 else:
2331 return self._exec_default(column, column.default, column.type)
2332
2333 def get_update_default(self, column):
2334 if column.onupdate is None:
2335 return None
2336 else:
2337 return self._exec_default(column, column.onupdate, column.type)
2338
2339 def _process_execute_defaults(self):
2340 compiled = cast(SQLCompiler, self.compiled)
2341
2342 key_getter = compiled._within_exec_param_key_getter
2343
2344 sentinel_counter = 0
2345
2346 if compiled.insert_prefetch:
2347 prefetch_recs = [
2348 (
2349 c,
2350 key_getter(c),
2351 c._default_description_tuple,
2352 self.get_insert_default,
2353 )
2354 for c in compiled.insert_prefetch
2355 ]
2356 elif compiled.update_prefetch:
2357 prefetch_recs = [
2358 (
2359 c,
2360 key_getter(c),
2361 c._onupdate_description_tuple,
2362 self.get_update_default,
2363 )
2364 for c in compiled.update_prefetch
2365 ]
2366 else:
2367 prefetch_recs = []
2368
2369 for param in self.compiled_parameters:
2370 self.current_parameters = param
2371
2372 for (
2373 c,
2374 param_key,
2375 (arg, is_scalar, is_callable, is_sentinel),
2376 fallback,
2377 ) in prefetch_recs:
2378 if is_sentinel:
2379 param[param_key] = sentinel_counter
2380 sentinel_counter += 1
2381 elif is_scalar:
2382 param[param_key] = arg
2383 elif is_callable:
2384 self.current_column = c
2385 param[param_key] = arg(self)
2386 else:
2387 val = fallback(c)
2388 if val is not None:
2389 param[param_key] = val
2390
2391 del self.current_parameters
2392
2393
2394DefaultDialect.execution_ctx_cls = DefaultExecutionContext