1# engine/default.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Default implementations of per-dialect sqlalchemy.engine classes.
10
11These are semi-private implementation classes which are only of importance
12to database dialect authors; dialects will usually use the classes here
13as the base class for their own corresponding classes.
14
15"""
16
17from __future__ import annotations
18
19import functools
20import operator
21import random
22import re
23from time import perf_counter
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Dict
29from typing import List
30from typing import Mapping
31from typing import MutableMapping
32from typing import MutableSequence
33from typing import Optional
34from typing import Sequence
35from typing import Set
36from typing import Tuple
37from typing import Type
38from typing import TYPE_CHECKING
39from typing import Union
40import weakref
41
42from . import characteristics
43from . import cursor as _cursor
44from . import interfaces
45from .base import Connection
46from .interfaces import CacheStats
47from .interfaces import DBAPICursor
48from .interfaces import Dialect
49from .interfaces import ExecuteStyle
50from .interfaces import ExecutionContext
51from .reflection import ObjectKind
52from .reflection import ObjectScope
53from .. import event
54from .. import exc
55from .. import pool
56from .. import util
57from ..sql import compiler
58from ..sql import dml
59from ..sql import expression
60from ..sql import type_api
61from ..sql import util as sql_util
62from ..sql._typing import is_tuple_type
63from ..sql.base import _NoArg
64from ..sql.compiler import DDLCompiler
65from ..sql.compiler import InsertmanyvaluesSentinelOpts
66from ..sql.compiler import SQLCompiler
67from ..sql.elements import quoted_name
68from ..util.typing import Final
69from ..util.typing import Literal
70
71if typing.TYPE_CHECKING:
72 from types import ModuleType
73
74 from .base import Engine
75 from .cursor import ResultFetchStrategy
76 from .interfaces import _CoreMultiExecuteParams
77 from .interfaces import _CoreSingleExecuteParams
78 from .interfaces import _DBAPICursorDescription
79 from .interfaces import _DBAPIMultiExecuteParams
80 from .interfaces import _DBAPISingleExecuteParams
81 from .interfaces import _ExecuteOptions
82 from .interfaces import _MutableCoreSingleExecuteParams
83 from .interfaces import _ParamStyle
84 from .interfaces import ConnectArgsType
85 from .interfaces import DBAPIConnection
86 from .interfaces import IsolationLevel
87 from .row import Row
88 from .url import URL
89 from ..event import _ListenerFnType
90 from ..pool import Pool
91 from ..pool import PoolProxiedConnection
92 from ..sql import Executable
93 from ..sql.compiler import Compiled
94 from ..sql.compiler import Linting
95 from ..sql.compiler import ResultColumnsEntry
96 from ..sql.dml import DMLState
97 from ..sql.dml import UpdateBase
98 from ..sql.elements import BindParameter
99 from ..sql.schema import Column
100 from ..sql.type_api import _BindProcessorType
101 from ..sql.type_api import _ResultProcessorType
102 from ..sql.type_api import TypeEngine
103
104
105# When we're handed literal SQL, ensure it's a SELECT query
106SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)
107
108
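# module-level aliases for the CacheStats enum members; these are referenced
# by DefaultExecutionContext._get_cache_stats() when reporting cache status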
109(
110 CACHE_HIT,
111 CACHE_MISS,
112 CACHING_DISABLED,
113 NO_CACHE_KEY,
114 NO_DIALECT_SUPPORT,
115) = list(CacheStats)
116
117
118class DefaultDialect(Dialect):
119 """Default implementation of Dialect"""
120
121 statement_compiler = compiler.SQLCompiler
122 ddl_compiler = compiler.DDLCompiler
123 type_compiler_cls = compiler.GenericTypeCompiler
124
125 preparer = compiler.IdentifierPreparer
126 supports_alter = True
127 supports_comments = False
128 supports_constraint_comments = False
129 inline_comments = False
130 supports_statement_cache = True
131
132 div_is_floordiv = True
133
134 bind_typing = interfaces.BindTyping.NONE
135
136 include_set_input_sizes: Optional[Set[Any]] = None
137 exclude_set_input_sizes: Optional[Set[Any]] = None
138
139 # the first value we'd get for an autoincrement column.
140 default_sequence_base = 1
141
142 # most DBAPIs happy with this for execute().
143 # not cx_oracle.
144 execute_sequence_format = tuple
145
146 supports_schemas = True
147 supports_views = True
148 supports_sequences = False
149 sequences_optional = False
150 preexecute_autoincrement_sequences = False
151 supports_identity_columns = False
152 postfetch_lastrowid = True
153 favor_returning_over_lastrowid = False
154 insert_null_pk_still_autoincrements = False
155 update_returning = False
156 delete_returning = False
157 update_returning_multifrom = False
158 delete_returning_multifrom = False
159 insert_returning = False
160
161 cte_follows_insert = False
162
163 supports_native_enum = False
164 supports_native_boolean = False
165 supports_native_uuid = False
166 returns_native_bytes = False
167
168 non_native_boolean_check_constraint = True
169
170 supports_simple_order_by_label = True
171
172 tuple_in_values = False
173
174 connection_characteristics = util.immutabledict(
175 {
176 "isolation_level": characteristics.IsolationLevelCharacteristic(),
177 "logging_token": characteristics.LoggingTokenCharacteristic(),
178 }
179 )
180
181 engine_config_types: Mapping[str, Any] = util.immutabledict(
182 {
183 "pool_timeout": util.asint,
184 "echo": util.bool_or_str("debug"),
185 "echo_pool": util.bool_or_str("debug"),
186 "pool_recycle": util.asint,
187 "pool_size": util.asint,
188 "max_overflow": util.asint,
189 "future": util.asbool,
190 }
191 )
192
    # True if the NUMERIC type
    # returns decimal.Decimal;
    # *not* the FLOAT type, however.
196 supports_native_decimal = False
197
198 name = "default"
199
200 # length at which to truncate
201 # any identifier.
202 max_identifier_length = 9999
203 _user_defined_max_identifier_length: Optional[int] = None
204
205 isolation_level: Optional[str] = None
206
207 # sub-categories of max_identifier_length.
    # currently these accommodate MySQL, which allows alias names
    # of 255 characters but DDL names of only 64.
210 max_index_name_length: Optional[int] = None
211 max_constraint_name_length: Optional[int] = None
212
213 supports_sane_rowcount = True
214 supports_sane_multi_rowcount = True
215 colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
216 default_paramstyle = "named"
217
218 supports_default_values = False
219 """dialect supports INSERT... DEFAULT VALUES syntax"""
220
221 supports_default_metavalue = False
222 """dialect supports INSERT... VALUES (DEFAULT) syntax"""
223
224 default_metavalue_token = "DEFAULT"
225 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
226 parenthesis."""
227
228 # not sure if this is a real thing but the compiler will deliver it
229 # if this is the only flag enabled.
230 supports_empty_insert = True
231 """dialect supports INSERT () VALUES ()"""
232
233 supports_multivalues_insert = False
234
235 use_insertmanyvalues: bool = False
236
237 use_insertmanyvalues_wo_returning: bool = False
238
239 insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
240 InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
241 )
242
243 insertmanyvalues_page_size: int = 1000
244 insertmanyvalues_max_parameters = 32700
245
246 supports_is_distinct_from = True
247
248 supports_server_side_cursors = False
249
250 server_side_cursors = False
251
252 # extra record-level locking features (#4860)
253 supports_for_update_of = False
254
255 server_version_info = None
256
257 default_schema_name: Optional[str] = None
258
259 # indicates symbol names are
260 # UPPERCASED if they are case insensitive
261 # within the database.
262 # if this is True, the methods normalize_name()
263 # and denormalize_name() must be provided.
264 requires_name_normalize = False
265
266 is_async = False
267
268 has_terminate = False
269
270 # TODO: this is not to be part of 2.0. implement rudimentary binary
271 # literals for SQLite, PostgreSQL, MySQL only within
272 # _Binary.literal_processor
273 _legacy_binary_type_literal_encoding = "utf-8"
274
275 @util.deprecated_params(
276 empty_in_strategy=(
277 "1.4",
278 "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
279 "deprecated, and no longer has any effect. All IN expressions "
280 "are now rendered using "
            'the "expanding parameter" strategy which renders a set of bound '
            'expressions, or an "empty set" SELECT, at statement execution '
283 "time.",
284 ),
285 server_side_cursors=(
286 "1.4",
287 "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
288 "is deprecated and will be removed in a future release. Please "
289 "use the "
290 ":paramref:`_engine.Connection.execution_options.stream_results` "
291 "parameter.",
292 ),
293 )
294 def __init__(
295 self,
296 paramstyle: Optional[_ParamStyle] = None,
297 isolation_level: Optional[IsolationLevel] = None,
298 dbapi: Optional[ModuleType] = None,
299 implicit_returning: Literal[True] = True,
300 supports_native_boolean: Optional[bool] = None,
301 max_identifier_length: Optional[int] = None,
302 label_length: Optional[int] = None,
303 insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
304 use_insertmanyvalues: Optional[bool] = None,
305 # util.deprecated_params decorator cannot render the
306 # Linting.NO_LINTING constant
307 compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore
308 server_side_cursors: bool = False,
309 **kwargs: Any,
310 ):
311 if server_side_cursors:
312 if not self.supports_server_side_cursors:
313 raise exc.ArgumentError(
314 "Dialect %s does not support server side cursors" % self
315 )
316 else:
317 self.server_side_cursors = True
318
319 if getattr(self, "use_setinputsizes", False):
320 util.warn_deprecated(
321 "The dialect-level use_setinputsizes attribute is "
322 "deprecated. Please use "
323 "bind_typing = BindTyping.SETINPUTSIZES",
324 "2.0",
325 )
326 self.bind_typing = interfaces.BindTyping.SETINPUTSIZES
327
328 self.positional = False
329 self._ischema = None
330
331 self.dbapi = dbapi
332
333 if paramstyle is not None:
334 self.paramstyle = paramstyle
335 elif self.dbapi is not None:
336 self.paramstyle = self.dbapi.paramstyle
337 else:
338 self.paramstyle = self.default_paramstyle
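        # paramstyles in the following set deliver parameters to the DBAPI
        # as a positional sequence; "named" and "pyformat" use a mapping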
339 self.positional = self.paramstyle in (
340 "qmark",
341 "format",
342 "numeric",
343 "numeric_dollar",
344 )
345 self.identifier_preparer = self.preparer(self)
346 self._on_connect_isolation_level = isolation_level
347
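        # legacy dialects may assign a type compiler class directly to a
        # class-level "type_compiler" attribute; honor that here, otherwise
        # use the modern type_compiler_cls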
348 legacy_tt_callable = getattr(self, "type_compiler", None)
349 if legacy_tt_callable is not None:
350 tt_callable = cast(
351 Type[compiler.GenericTypeCompiler],
352 self.type_compiler,
353 )
354 else:
355 tt_callable = self.type_compiler_cls
356
357 self.type_compiler_instance = self.type_compiler = tt_callable(self)
358
359 if supports_native_boolean is not None:
360 self.supports_native_boolean = supports_native_boolean
361
362 self._user_defined_max_identifier_length = max_identifier_length
363 if self._user_defined_max_identifier_length:
364 self.max_identifier_length = (
365 self._user_defined_max_identifier_length
366 )
367 self.label_length = label_length
368 self.compiler_linting = compiler_linting
369
370 if use_insertmanyvalues is not None:
371 self.use_insertmanyvalues = use_insertmanyvalues
372
373 if insertmanyvalues_page_size is not _NoArg.NO_ARG:
374 self.insertmanyvalues_page_size = insertmanyvalues_page_size
375
376 @property
377 @util.deprecated(
378 "2.0",
379 "full_returning is deprecated, please use insert_returning, "
380 "update_returning, delete_returning",
381 )
382 def full_returning(self):
383 return (
384 self.insert_returning
385 and self.update_returning
386 and self.delete_returning
387 )
388
389 @util.memoized_property
390 def insert_executemany_returning(self):
391 """Default implementation for insert_executemany_returning, if not
392 otherwise overridden by the specific dialect.
393
394 The default dialect determines "insert_executemany_returning" is
395 available if the dialect in use has opted into using the
        "use_insertmanyvalues" feature. If it hasn't opted into that, then
397 this attribute is False, unless the dialect in question overrides this
398 and provides some other implementation (such as the Oracle Database
399 dialects).
400
401 """
402 return self.insert_returning and self.use_insertmanyvalues
403
404 @util.memoized_property
405 def insert_executemany_returning_sort_by_parameter_order(self):
406 """Default implementation for
        insert_executemany_returning_sort_by_parameter_order, if not otherwise
408 overridden by the specific dialect.
409
410 The default dialect determines "insert_executemany_returning" can have
411 deterministic order only if the dialect in use has opted into using the
412 "use_insertmanyvalues" feature, which implements deterministic ordering
413 using client side sentinel columns only by default. The
        "insertmanyvalues" feature also includes alternate forms that can
415 use server-generated PK values as "sentinels", but those are only
416 used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel`
417 bitflag enables those alternate SQL forms, which are disabled
418 by default.
419
420 If the dialect in use hasn't opted into that, then this attribute is
421 False, unless the dialect in question overrides this and provides some
422 other implementation (such as the Oracle Database dialects).
423
424 """
425 return self.insert_returning and self.use_insertmanyvalues
426
427 update_executemany_returning = False
428 delete_executemany_returning = False
429
430 @util.memoized_property
431 def loaded_dbapi(self) -> ModuleType:
432 if self.dbapi is None:
433 raise exc.InvalidRequestError(
434 f"Dialect {self} does not have a Python DBAPI established "
435 "and cannot be used for actual database interaction"
436 )
437 return self.dbapi
438
439 @util.memoized_property
440 def _bind_typing_render_casts(self):
441 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS
442
443 def _ensure_has_table_connection(self, arg: Connection) -> None:
444 if not isinstance(arg, Connection):
445 raise exc.ArgumentError(
446 "The argument passed to Dialect.has_table() should be a "
447 "%s, got %s. "
448 "Additionally, the Dialect.has_table() method is for "
449 "internal dialect "
450 "use only; please use "
                "``inspect(some_engine).has_table(<tablename>)`` "
452 "for public API use." % (Connection, type(arg))
453 )
454
455 @util.memoized_property
456 def _supports_statement_cache(self):
457 ssc = self.__class__.__dict__.get("supports_statement_cache", None)
458 if ssc is None:
459 util.warn(
460 "Dialect %s:%s will not make use of SQL compilation caching "
461 "as it does not set the 'supports_statement_cache' attribute "
462 "to ``True``. This can have "
463 "significant performance implications including some "
464 "performance degradations in comparison to prior SQLAlchemy "
465 "versions. Dialect maintainers should seek to set this "
466 "attribute to True after appropriate development and testing "
467 "for SQLAlchemy 1.4 caching support. Alternatively, this "
468 "attribute may be set to False which will disable this "
469 "warning." % (self.name, self.driver),
470 code="cprf",
471 )
472
473 return bool(ssc)
474
475 @util.memoized_property
476 def _type_memos(self):
477 return weakref.WeakKeyDictionary()
478
479 @property
480 def dialect_description(self):
481 return self.name + "+" + self.driver
482
483 @property
484 def supports_sane_rowcount_returning(self):
485 """True if this dialect supports sane rowcount even if RETURNING is
486 in use.
487
488 For dialects that don't support RETURNING, this is synonymous with
489 ``supports_sane_rowcount``.
490
491 """
492 return self.supports_sane_rowcount
493
494 @classmethod
495 def get_pool_class(cls, url: URL) -> Type[Pool]:
496 return getattr(cls, "poolclass", pool.QueuePool)
497
498 def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
499 return self.get_pool_class(url)
500
501 @classmethod
502 def load_provisioning(cls):
503 package = ".".join(cls.__module__.split(".")[0:-1])
504 try:
505 __import__(package + ".provision")
506 except ImportError:
507 pass
508
509 def _builtin_onconnect(self) -> Optional[_ListenerFnType]:
510 if self._on_connect_isolation_level is not None:
511
512 def builtin_connect(dbapi_conn, conn_rec):
513 self._assert_and_set_isolation_level(
514 dbapi_conn, self._on_connect_isolation_level
515 )
516
517 return builtin_connect
518 else:
519 return None
520
521 def initialize(self, connection: Connection) -> None:
522 try:
523 self.server_version_info = self._get_server_version_info(
524 connection
525 )
526 except NotImplementedError:
527 self.server_version_info = None
528 try:
529 self.default_schema_name = self._get_default_schema_name(
530 connection
531 )
532 except NotImplementedError:
533 self.default_schema_name = None
534
535 try:
536 self.default_isolation_level = self.get_default_isolation_level(
537 connection.connection.dbapi_connection
538 )
539 except NotImplementedError:
540 self.default_isolation_level = None
541
542 if not self._user_defined_max_identifier_length:
543 max_ident_length = self._check_max_identifier_length(connection)
544 if max_ident_length:
545 self.max_identifier_length = max_ident_length
546
547 if (
548 self.label_length
549 and self.label_length > self.max_identifier_length
550 ):
551 raise exc.ArgumentError(
552 "Label length of %d is greater than this dialect's"
553 " maximum identifier length of %d"
554 % (self.label_length, self.max_identifier_length)
555 )
556
557 def on_connect(self) -> Optional[Callable[[Any], Any]]:
558 # inherits the docstring from interfaces.Dialect.on_connect
559 return None
560
561 def _check_max_identifier_length(self, connection):
562 """Perform a connection / server version specific check to determine
563 the max_identifier_length.
564
565 If the dialect's class level max_identifier_length should be used,
566 can return None.
567
568 .. versionadded:: 1.3.9
569
570 """
571 return None
572
573 def get_default_isolation_level(self, dbapi_conn):
574 """Given a DBAPI connection, return its isolation level, or
575 a default isolation level if one cannot be retrieved.
576
577 May be overridden by subclasses in order to provide a
578 "fallback" isolation level for databases that cannot reliably
579 retrieve the actual isolation level.
580
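        E.g., an overriding dialect could fall back to a hardcoded level
        when introspection fails (an illustrative sketch only; the level
        name shown is arbitrary)::

            def get_default_isolation_level(self, dbapi_conn):
                try:
                    return self.get_isolation_level(dbapi_conn)
                except self.loaded_dbapi.Error:
                    return "READ COMMITTED"
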
        By default, calls the :meth:`_engine.Dialect.get_isolation_level`
582 method, propagating any exceptions raised.
583
584 .. versionadded:: 1.3.22
585
586 """
587 return self.get_isolation_level(dbapi_conn)
588
589 def type_descriptor(self, typeobj):
590 """Provide a database-specific :class:`.TypeEngine` object, given
591 the generic object which comes from the types module.
592
593 This method looks for a dictionary called
594 ``colspecs`` as a class or instance-level variable,
595 and passes on to :func:`_types.adapt_type`.
596
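        E.g., an illustrative sketch, where ``MyNumeric`` is a hypothetical
        dialect-specific subclass of :class:`.Numeric` and ``sqltypes``
        refers to ``sqlalchemy.types``::

            class MyDialect(DefaultDialect):
                colspecs = {sqltypes.Numeric: MyNumeric}

            # MyDialect().type_descriptor(sqltypes.Numeric()) then returns
            # an instance of MyNumeric, adapted from the generic type
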
597 """
598 return type_api.adapt_type(typeobj, self.colspecs)
599
600 def has_index(self, connection, table_name, index_name, schema=None, **kw):
601 if not self.has_table(connection, table_name, schema=schema, **kw):
602 return False
603 for idx in self.get_indexes(
604 connection, table_name, schema=schema, **kw
605 ):
606 if idx["name"] == index_name:
607 return True
608 else:
609 return False
610
611 def has_schema(
612 self, connection: Connection, schema_name: str, **kw: Any
613 ) -> bool:
614 return schema_name in self.get_schema_names(connection, **kw)
615
616 def validate_identifier(self, ident: str) -> None:
617 if len(ident) > self.max_identifier_length:
618 raise exc.IdentifierError(
619 "Identifier '%s' exceeds maximum length of %d characters"
620 % (ident, self.max_identifier_length)
621 )
622
623 def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection:
624 # inherits the docstring from interfaces.Dialect.connect
625 return self.loaded_dbapi.connect(*cargs, **cparams) # type: ignore[no-any-return] # NOQA: E501
626
627 def create_connect_args(self, url: URL) -> ConnectArgsType:
628 # inherits the docstring from interfaces.Dialect.create_connect_args
629 opts = url.translate_connect_args()
630 opts.update(url.query)
631 return ([], opts)
632
633 def set_engine_execution_options(
634 self, engine: Engine, opts: Mapping[str, Any]
635 ) -> None:
636 supported_names = set(self.connection_characteristics).intersection(
637 opts
638 )
639 if supported_names:
640 characteristics: Mapping[str, Any] = util.immutabledict(
641 (name, opts[name]) for name in supported_names
642 )
643
644 @event.listens_for(engine, "engine_connect")
645 def set_connection_characteristics(connection):
646 self._set_connection_characteristics(
647 connection, characteristics
648 )
649
650 def set_connection_execution_options(
651 self, connection: Connection, opts: Mapping[str, Any]
652 ) -> None:
653 supported_names = set(self.connection_characteristics).intersection(
654 opts
655 )
656 if supported_names:
657 characteristics: Mapping[str, Any] = util.immutabledict(
658 (name, opts[name]) for name in supported_names
659 )
660 self._set_connection_characteristics(connection, characteristics)
661
662 def _set_connection_characteristics(self, connection, characteristics):
663 characteristic_values = [
664 (name, self.connection_characteristics[name], value)
665 for name, value in characteristics.items()
666 ]
667
668 if connection.in_transaction():
669 trans_objs = [
670 (name, obj)
671 for name, obj, _ in characteristic_values
672 if obj.transactional
673 ]
674 if trans_objs:
675 raise exc.InvalidRequestError(
676 "This connection has already initialized a SQLAlchemy "
677 "Transaction() object via begin() or autobegin; "
678 "%s may not be altered unless rollback() or commit() "
679 "is called first."
680 % (", ".join(name for name, obj in trans_objs))
681 )
682
683 dbapi_connection = connection.connection.dbapi_connection
684 for _, characteristic, value in characteristic_values:
685 characteristic.set_connection_characteristic(
686 self, connection, dbapi_connection, value
687 )
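        # arrange for the characteristic(s) to be restored to their
        # default when the DBAPI connection is returned to the pool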
688 connection.connection._connection_record.finalize_callback.append(
689 functools.partial(self._reset_characteristics, characteristics)
690 )
691
692 def _reset_characteristics(self, characteristics, dbapi_connection):
693 for characteristic_name in characteristics:
694 characteristic = self.connection_characteristics[
695 characteristic_name
696 ]
697 characteristic.reset_characteristic(self, dbapi_connection)
698
699 def do_begin(self, dbapi_connection):
700 pass
701
702 def do_rollback(self, dbapi_connection):
703 dbapi_connection.rollback()
704
705 def do_commit(self, dbapi_connection):
706 dbapi_connection.commit()
707
708 def do_terminate(self, dbapi_connection):
709 self.do_close(dbapi_connection)
710
711 def do_close(self, dbapi_connection):
712 dbapi_connection.close()
713
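    # a "SELECT 1" statement compiled against this dialect's syntax,
    # used by the default do_ping() implementation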
714 @util.memoized_property
715 def _dialect_specific_select_one(self):
716 return str(expression.select(1).compile(dialect=self))
717
718 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
719 try:
720 return self.do_ping(dbapi_connection)
721 except self.loaded_dbapi.Error as err:
722 is_disconnect = self.is_disconnect(err, dbapi_connection, None)
723
724 if self._has_events:
725 try:
726 Connection._handle_dbapi_exception_noconnection(
727 err,
728 self,
729 is_disconnect=is_disconnect,
730 invalidate_pool_on_disconnect=False,
731 is_pre_ping=True,
732 )
733 except exc.StatementError as new_err:
734 is_disconnect = new_err.connection_invalidated
735
736 if is_disconnect:
737 return False
738 else:
739 raise
740
741 def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
742 cursor = dbapi_connection.cursor()
743 try:
744 cursor.execute(self._dialect_specific_select_one)
745 finally:
746 cursor.close()
747 return True
748
749 def create_xid(self):
750 """Create a random two-phase transaction ID.
751
752 This id will be passed to do_begin_twophase(), do_rollback_twophase(),
753 do_commit_twophase(). Its format is unspecified.
754 """
755
756 return "_sa_%032x" % random.randint(0, 2**128)
757
758 def do_savepoint(self, connection, name):
759 connection.execute(expression.SavepointClause(name))
760
761 def do_rollback_to_savepoint(self, connection, name):
762 connection.execute(expression.RollbackToSavepointClause(name))
763
764 def do_release_savepoint(self, connection, name):
765 connection.execute(expression.ReleaseSavepointClause(name))
766
767 def _deliver_insertmanyvalues_batches(
768 self,
769 connection,
770 cursor,
771 statement,
772 parameters,
773 generic_setinputsizes,
774 context,
775 ):
776 context = cast(DefaultExecutionContext, context)
777 compiled = cast(SQLCompiler, context.compiled)
778
779 _composite_sentinel_proc: Sequence[
780 Optional[_ResultProcessorType[Any]]
781 ] = ()
782 _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
783 _sentinel_proc_initialized: bool = False
784
785 compiled_parameters = context.compiled_parameters
786
787 imv = compiled._insertmanyvalues
788 assert imv is not None
789
790 is_returning: Final[bool] = bool(compiled.effective_returning)
791 batch_size = context.execution_options.get(
792 "insertmanyvalues_page_size", self.insertmanyvalues_page_size
793 )
794
795 if compiled.schema_translate_map:
796 schema_translate_map = context.execution_options.get(
797 "schema_translate_map", {}
798 )
799 else:
800 schema_translate_map = None
801
802 if is_returning:
803 result: Optional[List[Any]] = []
804 context._insertmanyvalues_rows = result
805
806 sort_by_parameter_order = imv.sort_by_parameter_order
807
808 else:
809 sort_by_parameter_order = False
810 result = None
811
812 for imv_batch in compiled._deliver_insertmanyvalues_batches(
813 statement,
814 parameters,
815 compiled_parameters,
816 generic_setinputsizes,
817 batch_size,
818 sort_by_parameter_order,
819 schema_translate_map,
820 ):
821 yield imv_batch
822
823 if is_returning:
824
825 try:
826 rows = context.fetchall_for_returning(cursor)
827 except BaseException as be:
828 connection._handle_dbapi_exception(
829 be,
830 sql_util._long_statement(imv_batch.replaced_statement),
831 imv_batch.replaced_parameters,
832 None,
833 context,
834 is_sub_exec=True,
835 )
836
837 # I would have thought "is_returning: Final[bool]"
838 # would have assured this but pylance thinks not
839 assert result is not None
840
841 if imv.num_sentinel_columns and not imv_batch.is_downgraded:
842 composite_sentinel = imv.num_sentinel_columns > 1
843 if imv.implicit_sentinel:
844 # for implicit sentinel, which is currently single-col
845 # integer autoincrement, do a simple sort.
846 assert not composite_sentinel
847 result.extend(
848 sorted(rows, key=operator.itemgetter(-1))
849 )
850 continue
851
852 # otherwise, create dictionaries to match up batches
853 # with parameters
854 assert imv.sentinel_param_keys
855 assert imv.sentinel_columns
856
857 _nsc = imv.num_sentinel_columns
858
859 if not _sentinel_proc_initialized:
860 if composite_sentinel:
861 _composite_sentinel_proc = [
862 col.type._cached_result_processor(
863 self, cursor_desc[1]
864 )
865 for col, cursor_desc in zip(
866 imv.sentinel_columns,
867 cursor.description[-_nsc:],
868 )
869 ]
870 else:
871 _scalar_sentinel_proc = (
872 imv.sentinel_columns[0]
873 ).type._cached_result_processor(
874 self, cursor.description[-1][1]
875 )
876 _sentinel_proc_initialized = True
877
878 rows_by_sentinel: Union[
879 Dict[Tuple[Any, ...], Any],
880 Dict[Any, Any],
881 ]
882 if composite_sentinel:
883 rows_by_sentinel = {
884 tuple(
885 (proc(val) if proc else val)
886 for val, proc in zip(
887 row[-_nsc:], _composite_sentinel_proc
888 )
889 ): row
890 for row in rows
891 }
892 elif _scalar_sentinel_proc:
893 rows_by_sentinel = {
894 _scalar_sentinel_proc(row[-1]): row for row in rows
895 }
896 else:
897 rows_by_sentinel = {row[-1]: row for row in rows}
898
899 if len(rows_by_sentinel) != len(imv_batch.batch):
900 # see test_insert_exec.py::
901 # IMVSentinelTest::test_sentinel_incorrect_rowcount
902 # for coverage / demonstration
903 raise exc.InvalidRequestError(
904 f"Sentinel-keyed result set did not produce "
905 f"correct number of rows {len(imv_batch.batch)}; "
906 "produced "
907 f"{len(rows_by_sentinel)}. Please ensure the "
908 "sentinel column is fully unique and populated in "
909 "all cases."
910 )
911
912 try:
913 ordered_rows = [
914 rows_by_sentinel[sentinel_keys]
915 for sentinel_keys in imv_batch.sentinel_values
916 ]
917 except KeyError as ke:
918 # see test_insert_exec.py::
919 # IMVSentinelTest::test_sentinel_cant_match_keys
920 # for coverage / demonstration
921 raise exc.InvalidRequestError(
922 f"Can't match sentinel values in result set to "
923 f"parameter sets; key {ke.args[0]!r} was not "
924 "found. "
925 "There may be a mismatch between the datatype "
926 "passed to the DBAPI driver vs. that which it "
927 "returns in a result row. Ensure the given "
928 "Python value matches the expected result type "
929 "*exactly*, taking care to not rely upon implicit "
930 "conversions which may occur such as when using "
931 "strings in place of UUID or integer values, etc. "
932 ) from ke
933
934 result.extend(ordered_rows)
935
936 else:
937 result.extend(rows)
938
939 def do_executemany(self, cursor, statement, parameters, context=None):
940 cursor.executemany(statement, parameters)
941
942 def do_execute(self, cursor, statement, parameters, context=None):
943 cursor.execute(statement, parameters)
944
945 def do_execute_no_params(self, cursor, statement, context=None):
946 cursor.execute(statement)
947
948 def is_disconnect(
949 self,
950 e: Exception,
951 connection: Union[
952 pool.PoolProxiedConnection, interfaces.DBAPIConnection, None
953 ],
954 cursor: Optional[interfaces.DBAPICursor],
955 ) -> bool:
956 return False
957
958 @util.memoized_instancemethod
959 def _gen_allowed_isolation_levels(self, dbapi_conn):
960 try:
961 raw_levels = list(self.get_isolation_level_values(dbapi_conn))
962 except NotImplementedError:
963 return None
964 else:
965 normalized_levels = [
966 level.replace("_", " ").upper() for level in raw_levels
967 ]
968 if raw_levels != normalized_levels:
969 raise ValueError(
970 f"Dialect {self.name!r} get_isolation_level_values() "
971 f"method should return names as UPPERCASE using spaces, "
972 f"not underscores; got "
973 f"{sorted(set(raw_levels).difference(normalized_levels))}"
974 )
975 return tuple(normalized_levels)
976
977 def _assert_and_set_isolation_level(self, dbapi_conn, level):
978 level = level.replace("_", " ").upper()
979
980 _allowed_isolation_levels = self._gen_allowed_isolation_levels(
981 dbapi_conn
982 )
983 if (
984 _allowed_isolation_levels
985 and level not in _allowed_isolation_levels
986 ):
987 raise exc.ArgumentError(
988 f"Invalid value {level!r} for isolation_level. "
989 f"Valid isolation levels for {self.name!r} are "
990 f"{', '.join(_allowed_isolation_levels)}"
991 )
992
993 self.set_isolation_level(dbapi_conn, level)
994
995 def reset_isolation_level(self, dbapi_conn):
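        # restore either the isolation level that was configured at connect
        # time via create_engine(isolation_level=...), or the server default
        # level captured during initialize()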
996 if self._on_connect_isolation_level is not None:
997 assert (
998 self._on_connect_isolation_level == "AUTOCOMMIT"
999 or self._on_connect_isolation_level
1000 == self.default_isolation_level
1001 )
1002 self._assert_and_set_isolation_level(
1003 dbapi_conn, self._on_connect_isolation_level
1004 )
1005 else:
1006 assert self.default_isolation_level is not None
1007 self._assert_and_set_isolation_level(
1008 dbapi_conn,
1009 self.default_isolation_level,
1010 )
1011
1012 def normalize_name(self, name):
1013 if name is None:
1014 return None
1015
1016 name_lower = name.lower()
1017 name_upper = name.upper()
1018
1019 if name_upper == name_lower:
1020 # name has no upper/lower conversion, e.g. non-european characters.
1021 # return unchanged
1022 return name
1023 elif name_upper == name and not (
1024 self.identifier_preparer._requires_quotes
1025 )(name_lower):
1026 # name is all uppercase and doesn't require quoting; normalize
1027 # to all lower case
1028 return name_lower
1029 elif name_lower == name:
1030 # name is all lower case, which if denormalized means we need to
1031 # force quoting on it
1032 return quoted_name(name, quote=True)
1033 else:
1034 # name is mixed case, means it will be quoted in SQL when used
            # later; no normalization is applied
1036 return name
1037
1038 def denormalize_name(self, name):
1039 if name is None:
1040 return None
1041
1042 name_lower = name.lower()
1043 name_upper = name.upper()
1044
1045 if name_upper == name_lower:
1046 # name has no upper/lower conversion, e.g. non-european characters.
1047 # return unchanged
1048 return name
1049 elif name_lower == name and not (
1050 self.identifier_preparer._requires_quotes
1051 )(name_lower):
1052 name = name_upper
1053 return name
1054
1055 def get_driver_connection(self, connection):
1056 return connection
1057
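    # detect whether a dialect subclass overrides the named method,
    # by comparing code objects against DefaultDialect's version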
1058 def _overrides_default(self, method):
1059 return (
1060 getattr(type(self), method).__code__
1061 is not getattr(DefaultDialect, method).__code__
1062 )
1063
1064 def _default_multi_reflect(
1065 self,
1066 single_tbl_method,
1067 connection,
1068 kind,
1069 schema,
1070 filter_names,
1071 scope,
1072 **kw,
1073 ):
1074 names_fns = []
1075 temp_names_fns = []
1076 if ObjectKind.TABLE in kind:
1077 names_fns.append(self.get_table_names)
1078 temp_names_fns.append(self.get_temp_table_names)
1079 if ObjectKind.VIEW in kind:
1080 names_fns.append(self.get_view_names)
1081 temp_names_fns.append(self.get_temp_view_names)
1082 if ObjectKind.MATERIALIZED_VIEW in kind:
1083 names_fns.append(self.get_materialized_view_names)
1084 # no temp materialized view at the moment
1085 # temp_names_fns.append(self.get_temp_materialized_view_names)
1086
1087 unreflectable = kw.pop("unreflectable", {})
1088
1089 if (
1090 filter_names
1091 and scope is ObjectScope.ANY
1092 and kind is ObjectKind.ANY
1093 ):
1094 # if names are given and no qualification on type of table
1095 # (i.e. the Table(..., autoload) case), take the names as given,
            # don't run names queries. If a table does not exist,
            # NoSuchTableError is raised and it's skipped
1098
1099 # this also suits the case for mssql where we can reflect
1100 # individual temp tables but there's no temp_names_fn
1101 names = filter_names
1102 else:
1103 names = []
1104 name_kw = {"schema": schema, **kw}
1105 fns = []
1106 if ObjectScope.DEFAULT in scope:
1107 fns.extend(names_fns)
1108 if ObjectScope.TEMPORARY in scope:
1109 fns.extend(temp_names_fns)
1110
1111 for fn in fns:
1112 try:
1113 names.extend(fn(connection, **name_kw))
1114 except NotImplementedError:
1115 pass
1116
1117 if filter_names:
1118 filter_names = set(filter_names)
1119
1120 # iterate over all the tables/views and call the single table method
1121 for table in names:
1122 if not filter_names or table in filter_names:
1123 key = (schema, table)
1124 try:
1125 yield (
1126 key,
1127 single_tbl_method(
1128 connection, table, schema=schema, **kw
1129 ),
1130 )
1131 except exc.UnreflectableTableError as err:
1132 if key not in unreflectable:
1133 unreflectable[key] = err
1134 except exc.NoSuchTableError:
1135 pass
1136
1137 def get_multi_table_options(self, connection, **kw):
1138 return self._default_multi_reflect(
1139 self.get_table_options, connection, **kw
1140 )
1141
1142 def get_multi_columns(self, connection, **kw):
1143 return self._default_multi_reflect(self.get_columns, connection, **kw)
1144
1145 def get_multi_pk_constraint(self, connection, **kw):
1146 return self._default_multi_reflect(
1147 self.get_pk_constraint, connection, **kw
1148 )
1149
1150 def get_multi_foreign_keys(self, connection, **kw):
1151 return self._default_multi_reflect(
1152 self.get_foreign_keys, connection, **kw
1153 )
1154
1155 def get_multi_indexes(self, connection, **kw):
1156 return self._default_multi_reflect(self.get_indexes, connection, **kw)
1157
1158 def get_multi_unique_constraints(self, connection, **kw):
1159 return self._default_multi_reflect(
1160 self.get_unique_constraints, connection, **kw
1161 )
1162
1163 def get_multi_check_constraints(self, connection, **kw):
1164 return self._default_multi_reflect(
1165 self.get_check_constraints, connection, **kw
1166 )
1167
1168 def get_multi_table_comment(self, connection, **kw):
1169 return self._default_multi_reflect(
1170 self.get_table_comment, connection, **kw
1171 )
1172
1173
1174class StrCompileDialect(DefaultDialect):
1175 statement_compiler = compiler.StrSQLCompiler
1176 ddl_compiler = compiler.DDLCompiler
1177 type_compiler_cls = compiler.StrSQLTypeCompiler
1178 preparer = compiler.IdentifierPreparer
1179
1180 insert_returning = True
1181 update_returning = True
1182 delete_returning = True
1183
1184 supports_statement_cache = True
1185
1186 supports_identity_columns = True
1187
1188 supports_sequences = True
1189 sequences_optional = True
1190 preexecute_autoincrement_sequences = False
1191
1192 supports_native_boolean = True
1193
1194 supports_multivalues_insert = True
1195 supports_simple_order_by_label = True
1196
1197
1198class DefaultExecutionContext(ExecutionContext):
1199 isinsert = False
1200 isupdate = False
1201 isdelete = False
1202 is_crud = False
1203 is_text = False
1204 isddl = False
1205
1206 execute_style: ExecuteStyle = ExecuteStyle.EXECUTE
1207
1208 compiled: Optional[Compiled] = None
1209 result_column_struct: Optional[
1210 Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
1211 ] = None
1212 returned_default_rows: Optional[Sequence[Row[Any]]] = None
1213
1214 execution_options: _ExecuteOptions = util.EMPTY_DICT
1215
1216 cursor_fetch_strategy = _cursor._DEFAULT_FETCH
1217
1218 invoked_statement: Optional[Executable] = None
1219
1220 _is_implicit_returning = False
1221 _is_explicit_returning = False
1222 _is_supplemental_returning = False
1223 _is_server_side = False
1224
1225 _soft_closed = False
1226
1227 _rowcount: Optional[int] = None
1228
1229 # a hook for SQLite's translation of
1230 # result column names
1231 # NOTE: pyhive is using this hook, can't remove it :(
1232 _translate_colname: Optional[Callable[[str], str]] = None
1233
1234 _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
1235 """used by set_input_sizes().
1236
1237 This collection comes from ``ExpandedState.parameter_expansion``.
1238
1239 """
1240
1241 cache_hit = NO_CACHE_KEY
1242
1243 root_connection: Connection
1244 _dbapi_connection: PoolProxiedConnection
1245 dialect: Dialect
1246 unicode_statement: str
1247 cursor: DBAPICursor
1248 compiled_parameters: List[_MutableCoreSingleExecuteParams]
1249 parameters: _DBAPIMultiExecuteParams
1250 extracted_parameters: Optional[Sequence[BindParameter[Any]]]
1251
1252 _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)
1253
1254 _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
1255 _num_sentinel_cols: int = 0
1256
1257 @classmethod
1258 def _init_ddl(
1259 cls,
1260 dialect: Dialect,
1261 connection: Connection,
1262 dbapi_connection: PoolProxiedConnection,
1263 execution_options: _ExecuteOptions,
1264 compiled_ddl: DDLCompiler,
1265 ) -> ExecutionContext:
1266 """Initialize execution context for an ExecutableDDLElement
1267 construct."""
1268
1269 self = cls.__new__(cls)
1270 self.root_connection = connection
1271 self._dbapi_connection = dbapi_connection
1272 self.dialect = connection.dialect
1273
1274 self.compiled = compiled = compiled_ddl
1275 self.isddl = True
1276
1277 self.execution_options = execution_options
1278
1279 self.unicode_statement = str(compiled)
1280 if compiled.schema_translate_map:
1281 schema_translate_map = self.execution_options.get(
1282 "schema_translate_map", {}
1283 )
1284
1285 rst = compiled.preparer._render_schema_translates
1286 self.unicode_statement = rst(
1287 self.unicode_statement, schema_translate_map
1288 )
1289
1290 self.statement = self.unicode_statement
1291
1292 self.cursor = self.create_cursor()
1293 self.compiled_parameters = []
1294
1295 if dialect.positional:
1296 self.parameters = [dialect.execute_sequence_format()]
1297 else:
1298 self.parameters = [self._empty_dict_params]
1299
1300 return self
1301
1302 @classmethod
1303 def _init_compiled(
1304 cls,
1305 dialect: Dialect,
1306 connection: Connection,
1307 dbapi_connection: PoolProxiedConnection,
1308 execution_options: _ExecuteOptions,
1309 compiled: SQLCompiler,
1310 parameters: _CoreMultiExecuteParams,
1311 invoked_statement: Executable,
1312 extracted_parameters: Optional[Sequence[BindParameter[Any]]],
1313 cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
1314 ) -> ExecutionContext:
1315 """Initialize execution context for a Compiled construct."""
1316
1317 self = cls.__new__(cls)
1318 self.root_connection = connection
1319 self._dbapi_connection = dbapi_connection
1320 self.dialect = connection.dialect
1321 self.extracted_parameters = extracted_parameters
1322 self.invoked_statement = invoked_statement
1323 self.compiled = compiled
1324 self.cache_hit = cache_hit
1325
1326 self.execution_options = execution_options
1327
1328 self.result_column_struct = (
1329 compiled._result_columns,
1330 compiled._ordered_columns,
1331 compiled._textual_ordered_columns,
1332 compiled._ad_hoc_textual,
1333 compiled._loose_column_name_matching,
1334 )
1335
1336 self.isinsert = ii = compiled.isinsert
1337 self.isupdate = iu = compiled.isupdate
1338 self.isdelete = id_ = compiled.isdelete
1339 self.is_text = compiled.isplaintext
1340
1341 if ii or iu or id_:
1342 dml_statement = compiled.compile_state.statement # type: ignore
1343 if TYPE_CHECKING:
1344 assert isinstance(dml_statement, UpdateBase)
1345 self.is_crud = True
1346 self._is_explicit_returning = ier = bool(dml_statement._returning)
1347 self._is_implicit_returning = iir = bool(
1348 compiled.implicit_returning
1349 )
1350 if iir and dml_statement._supplemental_returning:
1351 self._is_supplemental_returning = True
1352
            # don't mix implicit and explicit returning
1354 assert not (iir and ier)
1355
1356 if (ier or iir) and compiled.for_executemany:
1357 if ii and not self.dialect.insert_executemany_returning:
1358 raise exc.InvalidRequestError(
1359 f"Dialect {self.dialect.dialect_description} with "
1360 f"current server capabilities does not support "
1361 "INSERT..RETURNING when executemany is used"
1362 )
1363 elif (
1364 ii
1365 and dml_statement._sort_by_parameter_order
1366 and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501
1367 ):
1368 raise exc.InvalidRequestError(
1369 f"Dialect {self.dialect.dialect_description} with "
1370 f"current server capabilities does not support "
1371 "INSERT..RETURNING with deterministic row ordering "
1372 "when executemany is used"
1373 )
1374 elif (
1375 ii
1376 and self.dialect.use_insertmanyvalues
1377 and not compiled._insertmanyvalues
1378 ):
1379 raise exc.InvalidRequestError(
1380 'Statement does not have "insertmanyvalues" '
1381 "enabled, can't use INSERT..RETURNING with "
1382 "executemany in this case."
1383 )
1384 elif iu and not self.dialect.update_executemany_returning:
1385 raise exc.InvalidRequestError(
1386 f"Dialect {self.dialect.dialect_description} with "
1387 f"current server capabilities does not support "
1388 "UPDATE..RETURNING when executemany is used"
1389 )
1390 elif id_ and not self.dialect.delete_executemany_returning:
1391 raise exc.InvalidRequestError(
1392 f"Dialect {self.dialect.dialect_description} with "
1393 f"current server capabilities does not support "
1394 "DELETE..RETURNING when executemany is used"
1395 )
1396
1397 if not parameters:
1398 self.compiled_parameters = [
1399 compiled.construct_params(
1400 extracted_parameters=extracted_parameters,
1401 escape_names=False,
1402 )
1403 ]
1404 else:
1405 self.compiled_parameters = [
1406 compiled.construct_params(
1407 m,
1408 escape_names=False,
1409 _group_number=grp,
1410 extracted_parameters=extracted_parameters,
1411 )
1412 for grp, m in enumerate(parameters)
1413 ]
1414
1415 if len(parameters) > 1:
1416 if self.isinsert and compiled._insertmanyvalues:
1417 self.execute_style = ExecuteStyle.INSERTMANYVALUES
1418
1419 imv = compiled._insertmanyvalues
1420 if imv.sentinel_columns is not None:
1421 self._num_sentinel_cols = imv.num_sentinel_columns
1422 else:
1423 self.execute_style = ExecuteStyle.EXECUTEMANY
1424
1425 self.unicode_statement = compiled.string
1426
1427 self.cursor = self.create_cursor()
1428
1429 if self.compiled.insert_prefetch or self.compiled.update_prefetch:
1430 self._process_execute_defaults()
1431
1432 processors = compiled._bind_processors
1433
1434 flattened_processors: Mapping[
1435 str, _BindProcessorType[Any]
1436 ] = processors # type: ignore[assignment]
1437
1438 if compiled.literal_execute_params or compiled.post_compile_params:
1439 if self.executemany:
1440 raise exc.InvalidRequestError(
1441 "'literal_execute' or 'expanding' parameters can't be "
1442 "used with executemany()"
1443 )
1444
1445 expanded_state = compiled._process_parameters_for_postcompile(
1446 self.compiled_parameters[0]
1447 )
1448
1449 # re-assign self.unicode_statement
1450 self.unicode_statement = expanded_state.statement
1451
1452 self._expanded_parameters = expanded_state.parameter_expansion
1453
1454 flattened_processors = dict(processors) # type: ignore
1455 flattened_processors.update(expanded_state.processors)
1456 positiontup = expanded_state.positiontup
1457 elif compiled.positional:
1458 positiontup = self.compiled.positiontup
1459 else:
1460 positiontup = None
1461
1462 if compiled.schema_translate_map:
1463 schema_translate_map = self.execution_options.get(
1464 "schema_translate_map", {}
1465 )
1466 rst = compiled.preparer._render_schema_translates
1467 self.unicode_statement = rst(
1468 self.unicode_statement, schema_translate_map
1469 )
1470
1471 # final self.unicode_statement is now assigned, encode if needed
1472 # by dialect
1473 self.statement = self.unicode_statement
1474
1475 # Convert the dictionary of bind parameter values
1476 # into a dict or list to be sent to the DBAPI's
1477 # execute() or executemany() method.
1478
1479 if compiled.positional:
1480 core_positional_parameters: MutableSequence[Sequence[Any]] = []
1481 assert positiontup is not None
1482 for compiled_params in self.compiled_parameters:
1483 l_param: List[Any] = [
1484 (
1485 flattened_processors[key](compiled_params[key])
1486 if key in flattened_processors
1487 else compiled_params[key]
1488 )
1489 for key in positiontup
1490 ]
1491 core_positional_parameters.append(
1492 dialect.execute_sequence_format(l_param)
1493 )
1494
1495 self.parameters = core_positional_parameters
1496 else:
1497 core_dict_parameters: MutableSequence[Dict[str, Any]] = []
1498 escaped_names = compiled.escaped_bind_names
1499
1500 # note that currently, "expanded" parameters will be present
1501 # in self.compiled_parameters in their quoted form. This is
1502 # slightly inconsistent with the approach taken as of
1503 # #8056 where self.compiled_parameters is meant to contain unquoted
1504 # param names.
1505 d_param: Dict[str, Any]
1506 for compiled_params in self.compiled_parameters:
1507 if escaped_names:
1508 d_param = {
1509 escaped_names.get(key, key): (
1510 flattened_processors[key](compiled_params[key])
1511 if key in flattened_processors
1512 else compiled_params[key]
1513 )
1514 for key in compiled_params
1515 }
1516 else:
1517 d_param = {
1518 key: (
1519 flattened_processors[key](compiled_params[key])
1520 if key in flattened_processors
1521 else compiled_params[key]
1522 )
1523 for key in compiled_params
1524 }
1525
1526 core_dict_parameters.append(d_param)
1527
1528 self.parameters = core_dict_parameters
1529
1530 return self
1531
1532 @classmethod
1533 def _init_statement(
1534 cls,
1535 dialect: Dialect,
1536 connection: Connection,
1537 dbapi_connection: PoolProxiedConnection,
1538 execution_options: _ExecuteOptions,
1539 statement: str,
1540 parameters: _DBAPIMultiExecuteParams,
1541 ) -> ExecutionContext:
1542 """Initialize execution context for a string SQL statement."""
1543
1544 self = cls.__new__(cls)
1545 self.root_connection = connection
1546 self._dbapi_connection = dbapi_connection
1547 self.dialect = connection.dialect
1548 self.is_text = True
1549
1550 self.execution_options = execution_options
1551
1552 if not parameters:
1553 if self.dialect.positional:
1554 self.parameters = [dialect.execute_sequence_format()]
1555 else:
1556 self.parameters = [self._empty_dict_params]
1557 elif isinstance(parameters[0], dialect.execute_sequence_format):
1558 self.parameters = parameters
1559 elif isinstance(parameters[0], dict):
1560 self.parameters = parameters
1561 else:
1562 self.parameters = [
1563 dialect.execute_sequence_format(p) for p in parameters
1564 ]
1565
1566 if len(parameters) > 1:
1567 self.execute_style = ExecuteStyle.EXECUTEMANY
1568
1569 self.statement = self.unicode_statement = statement
1570
1571 self.cursor = self.create_cursor()
1572 return self
1573
1574 @classmethod
1575 def _init_default(
1576 cls,
1577 dialect: Dialect,
1578 connection: Connection,
1579 dbapi_connection: PoolProxiedConnection,
1580 execution_options: _ExecuteOptions,
1581 ) -> ExecutionContext:
1582 """Initialize execution context for a ColumnDefault construct."""
1583
1584 self = cls.__new__(cls)
1585 self.root_connection = connection
1586 self._dbapi_connection = dbapi_connection
1587 self.dialect = connection.dialect
1588
1589 self.execution_options = execution_options
1590
1591 self.cursor = self.create_cursor()
1592 return self
1593
1594 def _get_cache_stats(self) -> str:
1595 if self.compiled is None:
1596 return "raw sql"
1597
1598 now = perf_counter()
1599
1600 ch = self.cache_hit
1601
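        # _gen_time is a perf_counter() timestamp captured when the
        # Compiled object was created; (now - gen_time) is its age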
1602 gen_time = self.compiled._gen_time
1603 assert gen_time is not None
1604
1605 if ch is NO_CACHE_KEY:
1606 return "no key %.5fs" % (now - gen_time,)
1607 elif ch is CACHE_HIT:
1608 return "cached since %.4gs ago" % (now - gen_time,)
1609 elif ch is CACHE_MISS:
1610 return "generated in %.5fs" % (now - gen_time,)
1611 elif ch is CACHING_DISABLED:
1612 if "_cache_disable_reason" in self.execution_options:
1613 return "caching disabled (%s) %.5fs " % (
1614 self.execution_options["_cache_disable_reason"],
1615 now - gen_time,
1616 )
1617 else:
1618 return "caching disabled %.5fs" % (now - gen_time,)
1619 elif ch is NO_DIALECT_SUPPORT:
1620 return "dialect %s+%s does not support caching %.5fs" % (
1621 self.dialect.name,
1622 self.dialect.driver,
1623 now - gen_time,
1624 )
1625 else:
1626 return "unknown"
1627
1628 @property
1629 def executemany(self):
1630 return self.execute_style in (
1631 ExecuteStyle.EXECUTEMANY,
1632 ExecuteStyle.INSERTMANYVALUES,
1633 )
1634
1635 @util.memoized_property
1636 def identifier_preparer(self):
1637 if self.compiled:
1638 return self.compiled.preparer
1639 elif "schema_translate_map" in self.execution_options:
1640 return self.dialect.identifier_preparer._with_schema_translate(
1641 self.execution_options["schema_translate_map"]
1642 )
1643 else:
1644 return self.dialect.identifier_preparer
1645
1646 @util.memoized_property
1647 def engine(self):
1648 return self.root_connection.engine
1649
1650 @util.memoized_property
1651 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1652 if TYPE_CHECKING:
1653 assert isinstance(self.compiled, SQLCompiler)
1654 return self.compiled.postfetch
1655
1656 @util.memoized_property
1657 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1658 if TYPE_CHECKING:
1659 assert isinstance(self.compiled, SQLCompiler)
1660 if self.isinsert:
1661 return self.compiled.insert_prefetch
1662 elif self.isupdate:
1663 return self.compiled.update_prefetch
1664 else:
1665 return ()
1666
1667 @util.memoized_property
1668 def no_parameters(self):
1669 return self.execution_options.get("no_parameters", False)
1670
1671 def _execute_scalar(
1672 self,
1673 stmt: str,
1674 type_: Optional[TypeEngine[Any]],
1675 parameters: Optional[_DBAPISingleExecuteParams] = None,
1676 ) -> Any:
1677 """Execute a string statement on the current cursor, returning a
1678 scalar result.
1679
1680 Used to fire off sequences, default phrases, and "select lastrowid"
1681 types of statements individually or in the context of a parent INSERT
1682 or UPDATE statement.
1683
1684 """
1685
1686 conn = self.root_connection
1687
1688 if "schema_translate_map" in self.execution_options:
1689 schema_translate_map = self.execution_options.get(
1690 "schema_translate_map", {}
1691 )
1692
1693 rst = self.identifier_preparer._render_schema_translates
1694 stmt = rst(stmt, schema_translate_map)
1695
1696 if not parameters:
1697 if self.dialect.positional:
1698 parameters = self.dialect.execute_sequence_format()
1699 else:
1700 parameters = {}
1701
1702 conn._cursor_execute(self.cursor, stmt, parameters, context=self)
1703 row = self.cursor.fetchone()
1704 if row is not None:
1705 r = row[0]
1706 else:
1707 r = None
1708 if type_ is not None:
1709 # apply type post processors to the result
1710 proc = type_._cached_result_processor(
1711 self.dialect, self.cursor.description[0][1]
1712 )
1713 if proc:
1714 return proc(r)
1715 return r
1716
1717 @util.memoized_property
1718 def connection(self):
1719 return self.root_connection
1720
1721 def _use_server_side_cursor(self):
1722 if not self.dialect.supports_server_side_cursors:
1723 return False
1724
1725 if self.dialect.server_side_cursors:
1726 # this is deprecated
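            # when the legacy dialect-wide flag is in effect, stream_results
            # defaults to True and a server side cursor is used only for
            # SELECT-like statements (Selectable constructs, or textual SQL
            # matching SERVER_SIDE_CURSOR_RE)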
1727 use_server_side = self.execution_options.get(
1728 "stream_results", True
1729 ) and (
1730 self.compiled
1731 and isinstance(self.compiled.statement, expression.Selectable)
1732 or (
1733 (
1734 not self.compiled
1735 or isinstance(
1736 self.compiled.statement, expression.TextClause
1737 )
1738 )
1739 and self.unicode_statement
1740 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
1741 )
1742 )
1743 else:
1744 use_server_side = self.execution_options.get(
1745 "stream_results", False
1746 )
1747
1748 return use_server_side
1749
1750 def create_cursor(self) -> DBAPICursor:
1751 if (
1752 # inlining initial preference checks for SS cursors
1753 self.dialect.supports_server_side_cursors
1754 and (
1755 self.execution_options.get("stream_results", False)
1756 or (
1757 self.dialect.server_side_cursors
1758 and self._use_server_side_cursor()
1759 )
1760 )
1761 ):
1762 self._is_server_side = True
1763 return self.create_server_side_cursor()
1764 else:
1765 self._is_server_side = False
1766 return self.create_default_cursor()
1767
1768 def fetchall_for_returning(self, cursor):
1769 return cursor.fetchall()
1770
1771 def create_default_cursor(self) -> DBAPICursor:
1772 return self._dbapi_connection.cursor()
1773
1774 def create_server_side_cursor(self) -> DBAPICursor:
1775 raise NotImplementedError()
1776
1777 def pre_exec(self):
1778 pass
1779
1780 def get_out_parameter_values(self, names):
1781 raise NotImplementedError(
1782 "This dialect does not support OUT parameters"
1783 )
1784
1785 def post_exec(self):
1786 pass
1787
1788 def get_result_processor(self, type_, colname, coltype):
1789 """Return a 'result processor' for a given type as present in
1790 cursor.description.
1791
1792 This has a default implementation that dialects can override
1793 for context-sensitive result type handling.
1794
1795 """
1796 return type_._cached_result_processor(self.dialect, coltype)
1797
1798 def get_lastrowid(self):
1799 """return self.cursor.lastrowid, or equivalent, after an INSERT.
1800
1801 This may involve calling special cursor functions, issuing a new SELECT
1802 on the cursor (or a new one), or returning a stored value that was
1803 calculated within post_exec().
1804
1805 This function will only be called for dialects which support "implicit"
1806 primary key generation, keep preexecute_autoincrement_sequences set to
1807 False, and when no explicit id value was bound to the statement.
1808
1809 The function is called once for an INSERT statement that would need to
1810 return the last inserted primary key for those dialects that make use
1811 of the lastrowid concept. In these cases, it is called directly after
1812 :meth:`.ExecutionContext.post_exec`.
1813
1814 """
1815 return self.cursor.lastrowid
1816
1817 def handle_dbapi_exception(self, e):
1818 pass
1819
1820 @util.non_memoized_property
1821 def rowcount(self) -> int:
1822 if self._rowcount is not None:
1823 return self._rowcount
1824 else:
1825 return self.cursor.rowcount
1826
1827 @property
1828 def _has_rowcount(self):
1829 return self._rowcount is not None
1830
1831 def supports_sane_rowcount(self):
1832 return self.dialect.supports_sane_rowcount
1833
1834 def supports_sane_multi_rowcount(self):
1835 return self.dialect.supports_sane_multi_rowcount
1836
1837 def _setup_result_proxy(self):
1838 exec_opt = self.execution_options
1839
1840 if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
1841 self._rowcount = self.cursor.rowcount
1842
1843 if self.is_crud or self.is_text:
1844 result = self._setup_dml_or_text_result()
1845 yp = False
1846 else:
1847 yp = exec_opt.get("yield_per", None)
1848 sr = self._is_server_side or exec_opt.get("stream_results", False)
1849 strategy = self.cursor_fetch_strategy
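            # when streaming, replace the default fetch strategy with a
            # buffered one, so that rows are fetched from the (typically
            # server side) cursor in batches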
1850 if sr and strategy is _cursor._DEFAULT_FETCH:
1851 strategy = _cursor.BufferedRowCursorFetchStrategy(
1852 self.cursor, self.execution_options
1853 )
1854 cursor_description: _DBAPICursorDescription = (
1855 strategy.alternate_cursor_description
1856 or self.cursor.description
1857 )
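            # a cursor.description of None means the statement returns no
            # rows; use the "no cursor" DQL strategy in that case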
1858 if cursor_description is None:
1859 strategy = _cursor._NO_CURSOR_DQL
1860
1861 result = _cursor.CursorResult(self, strategy, cursor_description)
1862
1863 compiled = self.compiled
1864
1865 if (
1866 compiled
1867 and not self.isddl
1868 and cast(SQLCompiler, compiled).has_out_parameters
1869 ):
1870 self._setup_out_parameters(result)
1871
1872 self._soft_closed = result._soft_closed
1873
1874 if yp:
1875 result = result.yield_per(yp)
1876
1877 return result
1878
1879 def _setup_out_parameters(self, result):
1880 compiled = cast(SQLCompiler, self.compiled)
1881
1882 out_bindparams = [
1883 (param, name)
1884 for param, name in compiled.bind_names.items()
1885 if param.isoutparam
1886 ]
1887 out_parameters = {}
1888
1889 for bindparam, raw_value in zip(
1890 [param for param, name in out_bindparams],
1891 self.get_out_parameter_values(
1892 [name for param, name in out_bindparams]
1893 ),
1894 ):
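            # run each raw OUT parameter value through the bind type's
            # result processor, if one is present, before exposing it on
            # the result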
1895 type_ = bindparam.type
1896 impl_type = type_.dialect_impl(self.dialect)
1897 dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
1898 result_processor = impl_type.result_processor(
1899 self.dialect, dbapi_type
1900 )
1901 if result_processor is not None:
1902 raw_value = result_processor(raw_value)
1903 out_parameters[bindparam.key] = raw_value
1904
1905 result.out_parameters = out_parameters
1906
1907 def _setup_dml_or_text_result(self):
1908 compiled = cast(SQLCompiler, self.compiled)
1909
1910 strategy: ResultFetchStrategy = self.cursor_fetch_strategy
1911
1912 if self.isinsert:
1913 if (
1914 self.execute_style is ExecuteStyle.INSERTMANYVALUES
1915 and compiled.effective_returning
1916 ):
1917 strategy = _cursor.FullyBufferedCursorFetchStrategy(
1918 self.cursor,
1919 initial_buffer=self._insertmanyvalues_rows,
1920 # maintain alt cursor description if set by the
1921 # dialect, e.g. mssql preserves it
1922 alternate_description=(
1923 strategy.alternate_cursor_description
1924 ),
1925 )
1926
1927 if compiled.postfetch_lastrowid:
1928 self.inserted_primary_key_rows = (
1929 self._setup_ins_pk_from_lastrowid()
1930 )
1931 # else if not self._is_implicit_returning,
1932 # the default inserted_primary_key_rows accessor will
1933 # return an "empty" primary key collection when accessed.
1934
1935 if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
1936 strategy = _cursor.BufferedRowCursorFetchStrategy(
1937 self.cursor, self.execution_options
1938 )
1939
1940 if strategy is _cursor._NO_CURSOR_DML:
1941 cursor_description = None
1942 else:
1943 cursor_description = (
1944 strategy.alternate_cursor_description
1945 or self.cursor.description
1946 )
1947
1948 if cursor_description is None:
1949 strategy = _cursor._NO_CURSOR_DML
1950 elif self._num_sentinel_cols:
1951 assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
            # strip out the sentinel columns from the cursor description;
            # similar logic is applied to the rows themselves in CursorResult
1954 cursor_description = cursor_description[
1955 0 : -self._num_sentinel_cols
1956 ]
1957
1958 result: _cursor.CursorResult[Any] = _cursor.CursorResult(
1959 self, strategy, cursor_description
1960 )
1961
1962 if self.isinsert:
1963 if self._is_implicit_returning:
1964 rows = result.all()
1965
1966 self.returned_default_rows = rows
1967
1968 self.inserted_primary_key_rows = (
1969 self._setup_ins_pk_from_implicit_returning(result, rows)
1970 )
1971
                # check that the result has cursor metadata that is
                # accurate.  the first row will have been fetched, and the
                # current assumption is that the result has only one row,
                # until executemany() support is added here.
1976 assert result._metadata.returns_rows
1977
1978 # Insert statement has both return_defaults() and
1979 # returning(). rewind the result on the list of rows
1980 # we just used.
1981 if self._is_supplemental_returning:
1982 result._rewind(rows)
1983 else:
1984 result._soft_close()
1985 elif not self._is_explicit_returning:
1986 result._soft_close()
1987
                # we assume here the result does not return any rows.
                # *usually*, this will be true.  However, some dialects
                # such as that of MSSQL/pyodbc need to SELECT a post-fetch
                # function (e.g. to retrieve the last inserted id), so this
                # is not necessarily true.
                # assert not result.returns_rows
1993
1994 elif self._is_implicit_returning:
1995 rows = result.all()
1996
1997 if rows:
1998 self.returned_default_rows = rows
1999 self._rowcount = len(rows)
2000
2001 if self._is_supplemental_returning:
2002 result._rewind(rows)
2003 else:
2004 result._soft_close()
2005
            # check that the result has cursor metadata that is
            # accurate; the rows have all been fetched at this point.
2008 assert result._metadata.returns_rows
2009
2010 elif not result._metadata.returns_rows:
2011 # no results, get rowcount
2012 # (which requires open cursor on some drivers)
2013 if self._rowcount is None:
2014 self._rowcount = self.cursor.rowcount
2015 result._soft_close()
2016 elif self.isupdate or self.isdelete:
2017 if self._rowcount is None:
2018 self._rowcount = self.cursor.rowcount
2019 return result
2020
2021 @util.memoized_property
2022 def inserted_primary_key_rows(self):
2023 # if no specific "get primary key" strategy was set up
2024 # during execution, return a "default" primary key based
2025 # on what's in the compiled_parameters and nothing else.
2026 return self._setup_ins_pk_from_empty()
2027
2028 def _setup_ins_pk_from_lastrowid(self):
2029 getter = cast(
2030 SQLCompiler, self.compiled
2031 )._inserted_primary_key_from_lastrowid_getter
2032 lastrowid = self.get_lastrowid()
2033 return [getter(lastrowid, self.compiled_parameters[0])]
2034
2035 def _setup_ins_pk_from_empty(self):
2036 getter = cast(
2037 SQLCompiler, self.compiled
2038 )._inserted_primary_key_from_lastrowid_getter
2039 return [getter(None, param) for param in self.compiled_parameters]
2040
2041 def _setup_ins_pk_from_implicit_returning(self, result, rows):
2042 if not rows:
2043 return []
2044
2045 getter = cast(
2046 SQLCompiler, self.compiled
2047 )._inserted_primary_key_from_returning_getter
2048 compiled_params = self.compiled_parameters
2049
2050 return [
2051 getter(row, param) for row, param in zip(rows, compiled_params)
2052 ]
2053
2054 def lastrow_has_defaults(self):
2055 return (self.isinsert or self.isupdate) and bool(
2056 cast(SQLCompiler, self.compiled).postfetch
2057 )
2058
2059 def _prepare_set_input_sizes(
2060 self,
2061 ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
2062 """Given a cursor and ClauseParameters, prepare arguments
2063 in order to call the appropriate
2064 style of ``setinputsizes()`` on the cursor, using DB-API types
2065 from the bind parameter's ``TypeEngine`` objects.
2066
        This method is only called by those dialects which set the
        :attr:`.Dialect.bind_typing` attribute to
        :attr:`.BindTyping.SETINPUTSIZES`. Python-oracledb and cx_Oracle are
        the only DBAPIs that require setinputsizes(); pyodbc offers it as an
        option.

        Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
        for pg8000 and asyncpg, which have since been changed to use inline
        rendering of casts.
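
        The return value is a list of ``(paramname, dbapitype, sqltype)``
        tuples, one per bound parameter, which is then passed along to the
        dialect's ``do_set_input_sizes()`` hook in order to invoke
        ``setinputsizes()`` on the cursor.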
2076
2077 """
2078 if self.isddl or self.is_text:
2079 return None
2080
2081 compiled = cast(SQLCompiler, self.compiled)
2082
2083 inputsizes = compiled._get_set_input_sizes_lookup()
2084
2085 if inputsizes is None:
2086 return None
2087
2088 dialect = self.dialect
2089
        # TODO: all of the rest of this could potentially be ported to cython
2091
2092 if dialect._has_events:
2093 inputsizes = dict(inputsizes)
2094 dialect.dispatch.do_setinputsizes(
2095 inputsizes, self.cursor, self.statement, self.parameters, self
2096 )
2097
2098 if compiled.escaped_bind_names:
2099 escaped_bind_names = compiled.escaped_bind_names
2100 else:
2101 escaped_bind_names = None
2102
2103 if dialect.positional:
2104 items = [
2105 (key, compiled.binds[key])
2106 for key in compiled.positiontup or ()
2107 ]
2108 else:
2109 items = [
2110 (key, bindparam)
2111 for bindparam, key in compiled.bind_names.items()
2112 ]
2113
2114 generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
2115 for key, bindparam in items:
2116 if bindparam in compiled.literal_execute_params:
2117 continue
2118
2119 if key in self._expanded_parameters:
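                # an "expanding" parameter (e.g. an IN clause) has been
                # rewritten into multiple per-element parameter names;
                # emit an inputsizes entry for each generated name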
2120 if is_tuple_type(bindparam.type):
2121 num = len(bindparam.type.types)
2122 dbtypes = inputsizes[bindparam]
2123 generic_inputsizes.extend(
2124 (
2125 (
2126 escaped_bind_names.get(paramname, paramname)
2127 if escaped_bind_names is not None
2128 else paramname
2129 ),
2130 dbtypes[idx % num],
2131 bindparam.type.types[idx % num],
2132 )
2133 for idx, paramname in enumerate(
2134 self._expanded_parameters[key]
2135 )
2136 )
2137 else:
2138 dbtype = inputsizes.get(bindparam, None)
2139 generic_inputsizes.extend(
2140 (
2141 (
2142 escaped_bind_names.get(paramname, paramname)
2143 if escaped_bind_names is not None
2144 else paramname
2145 ),
2146 dbtype,
2147 bindparam.type,
2148 )
2149 for paramname in self._expanded_parameters[key]
2150 )
2151 else:
2152 dbtype = inputsizes.get(bindparam, None)
2153
2154 escaped_name = (
2155 escaped_bind_names.get(key, key)
2156 if escaped_bind_names is not None
2157 else key
2158 )
2159
2160 generic_inputsizes.append(
2161 (escaped_name, dbtype, bindparam.type)
2162 )
2163
2164 return generic_inputsizes
2165
2166 def _exec_default(self, column, default, type_):
2167 if default.is_sequence:
2168 return self.fire_sequence(default, type_)
2169 elif default.is_callable:
2170 # this codepath is not normally used as it's inlined
2171 # into _process_execute_defaults
2172 self.current_column = column
2173 return default.arg(self)
2174 elif default.is_clause_element:
2175 return self._exec_default_clause_element(column, default, type_)
2176 else:
2177 # this codepath is not normally used as it's inlined
2178 # into _process_execute_defaults
2179 return default.arg
2180
2181 def _exec_default_clause_element(self, column, default, type_):
2182 # execute a default that's a complete clause element. Here, we have
2183 # to re-implement a miniature version of the compile->parameters->
2184 # cursor.execute() sequence, since we don't want to modify the state
2185 # of the connection / result in progress or create new connection/
2186 # result objects etc.
2187 # .. versionchanged:: 1.4
2188
2189 if not default._arg_is_typed:
2190 default_arg = expression.type_coerce(default.arg, type_)
2191 else:
2192 default_arg = default.arg
2193 compiled = expression.select(default_arg).compile(dialect=self.dialect)
2194 compiled_params = compiled.construct_params()
2195 processors = compiled._bind_processors
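        # assemble positional or named parameters in the style expected by
        # the DBAPI, applying bind processors; this mirrors in miniature
        # what the normal statement execution path does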
2196 if compiled.positional:
2197 parameters = self.dialect.execute_sequence_format(
2198 [
2199 (
2200 processors[key](compiled_params[key]) # type: ignore
2201 if key in processors
2202 else compiled_params[key]
2203 )
2204 for key in compiled.positiontup or ()
2205 ]
2206 )
2207 else:
2208 parameters = {
2209 key: (
2210 processors[key](compiled_params[key]) # type: ignore
2211 if key in processors
2212 else compiled_params[key]
2213 )
2214 for key in compiled_params
2215 }
2216 return self._execute_scalar(
2217 str(compiled), type_, parameters=parameters
2218 )
2219
2220 current_parameters: Optional[_CoreSingleExecuteParams] = None
2221 """A dictionary of parameters applied to the current row.
2222
2223 This attribute is only available in the context of a user-defined default
2224 generation function, e.g. as described at :ref:`context_default_functions`.
2225 It consists of a dictionary which includes entries for each column/value
2226 pair that is to be part of the INSERT or UPDATE statement. The keys of the
2227 dictionary will be the key value of each :class:`_schema.Column`,
2228 which is usually
2229 synonymous with the name.
2230
2231 Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
2232 does not accommodate for the "multi-values" feature of the
2233 :meth:`_expression.Insert.values` method. The
2234 :meth:`.DefaultExecutionContext.get_current_parameters` method should be
2235 preferred.
2236
2237 .. seealso::
2238
2239 :meth:`.DefaultExecutionContext.get_current_parameters`
2240
2241 :ref:`context_default_functions`
2242
2243 """
2244
2245 def get_current_parameters(self, isolate_multiinsert_groups=True):
2246 """Return a dictionary of parameters applied to the current row.
2247
2248 This method can only be used in the context of a user-defined default
2249 generation function, e.g. as described at
2250 :ref:`context_default_functions`. When invoked, a dictionary is
2251 returned which includes entries for each column/value pair that is part
2252 of the INSERT or UPDATE statement. The keys of the dictionary will be
2253 the key value of each :class:`_schema.Column`,
2254 which is usually synonymous
2255 with the name.
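
        E.g., a context-sensitive default function might look like the
        following (an illustrative sketch; the function and column names
        are arbitrary)::

            def calc_total(context):
                params = context.get_current_parameters()
                return params["unit_price"] * params["quantity"]

        Such a function would typically be assigned as a Python-side column
        default, e.g. ``Column("total", Numeric(10, 2), default=calc_total)``.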
2256
2257 :param isolate_multiinsert_groups=True: indicates that multi-valued
2258 INSERT constructs created using :meth:`_expression.Insert.values`
2259 should be
2260 handled by returning only the subset of parameters that are local
2261 to the current column default invocation. When ``False``, the
2262 raw parameters of the statement are returned including the
2263 naming convention used in the case of multi-valued INSERT.
2264
2265 .. versionadded:: 1.2 added
2266 :meth:`.DefaultExecutionContext.get_current_parameters`
2267 which provides more functionality over the existing
2268 :attr:`.DefaultExecutionContext.current_parameters`
2269 attribute.
2270
2271 .. seealso::
2272
2273 :attr:`.DefaultExecutionContext.current_parameters`
2274
2275 :ref:`context_default_functions`
2276
2277 """
2278 try:
2279 parameters = self.current_parameters
2280 column = self.current_column
2281 except AttributeError:
2282 raise exc.InvalidRequestError(
2283 "get_current_parameters() can only be invoked in the "
2284 "context of a Python side column default function"
2285 )
2286 else:
2287 assert column is not None
2288 assert parameters is not None
2289 compile_state = cast(
2290 "DMLState", cast(SQLCompiler, self.compiled).compile_state
2291 )
2292 assert compile_state is not None
2293 if (
2294 isolate_multiinsert_groups
2295 and dml.isinsert(compile_state)
2296 and compile_state._has_multi_parameters
2297 ):
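                # multi-values INSERT: each values() group has its
                # parameters keyed "<name>_m<N>"; extract only the group
                # local to the current default invocation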
2298 if column._is_multiparam_column:
2299 index = column.index + 1
2300 d = {column.original.key: parameters[column.key]}
2301 else:
2302 d = {column.key: parameters[column.key]}
2303 index = 0
2304 assert compile_state._dict_parameters is not None
2305 keys = compile_state._dict_parameters.keys()
2306 d.update(
2307 (key, parameters["%s_m%d" % (key, index)]) for key in keys
2308 )
2309 return d
2310 else:
2311 return parameters
2312
2313 def get_insert_default(self, column):
2314 if column.default is None:
2315 return None
2316 else:
2317 return self._exec_default(column, column.default, column.type)
2318
2319 def get_update_default(self, column):
2320 if column.onupdate is None:
2321 return None
2322 else:
2323 return self._exec_default(column, column.onupdate, column.type)
2324
2325 def _process_execute_defaults(self):
2326 compiled = cast(SQLCompiler, self.compiled)
2327
2328 key_getter = compiled._within_exec_param_key_getter
2329
2330 sentinel_counter = 0
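        # insertmanyvalues sentinel columns are populated with an
        # increasing per-row integer, used downstream to correlate
        # RETURNING rows with their parameter sets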
2331
2332 if compiled.insert_prefetch:
2333 prefetch_recs = [
2334 (
2335 c,
2336 key_getter(c),
2337 c._default_description_tuple,
2338 self.get_insert_default,
2339 )
2340 for c in compiled.insert_prefetch
2341 ]
2342 elif compiled.update_prefetch:
2343 prefetch_recs = [
2344 (
2345 c,
2346 key_getter(c),
2347 c._onupdate_description_tuple,
2348 self.get_update_default,
2349 )
2350 for c in compiled.update_prefetch
2351 ]
2352 else:
2353 prefetch_recs = []
2354
2355 for param in self.compiled_parameters:
2356 self.current_parameters = param
2357
2358 for (
2359 c,
2360 param_key,
2361 (arg, is_scalar, is_callable, is_sentinel),
2362 fallback,
2363 ) in prefetch_recs:
2364 if is_sentinel:
2365 param[param_key] = sentinel_counter
2366 sentinel_counter += 1
2367 elif is_scalar:
2368 param[param_key] = arg
2369 elif is_callable:
2370 self.current_column = c
2371 param[param_key] = arg(self)
2372 else:
2373 val = fallback(c)
2374 if val is not None:
2375 param[param_key] = val
2376
2377 del self.current_parameters
2378
2379
2380DefaultDialect.execution_ctx_cls = DefaultExecutionContext