1# engine/default.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Default implementations of per-dialect sqlalchemy.engine classes.
10
11These are semi-private implementation classes which are only of importance
12to database dialect authors; dialects will usually use the classes here
13as the base class for their own corresponding classes.
14
15"""
16
17from __future__ import annotations
18
19import functools
20import operator
21import random
22import re
23from time import perf_counter
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Dict
29from typing import Final
30from typing import List
31from typing import Literal
32from typing import Mapping
33from typing import MutableMapping
34from typing import MutableSequence
35from typing import Optional
36from typing import Sequence
37from typing import Set
38from typing import Tuple
39from typing import Type
40from typing import TYPE_CHECKING
41from typing import Union
42import weakref
43
44from . import characteristics
45from . import cursor as _cursor
46from . import interfaces
47from .base import Connection
48from .interfaces import CacheStats
49from .interfaces import DBAPICursor
50from .interfaces import Dialect
51from .interfaces import ExecuteStyle
52from .interfaces import ExecutionContext
53from .reflection import ObjectKind
54from .reflection import ObjectScope
55from .. import event
56from .. import exc
57from .. import pool
58from .. import util
59from ..sql import compiler
60from ..sql import dml
61from ..sql import expression
62from ..sql import type_api
63from ..sql import util as sql_util
64from ..sql._typing import is_tuple_type
65from ..sql.base import _NoArg
66from ..sql.compiler import AggregateOrderByStyle
67from ..sql.compiler import DDLCompiler
68from ..sql.compiler import InsertmanyvaluesSentinelOpts
69from ..sql.compiler import SQLCompiler
70from ..sql.elements import quoted_name
71from ..util.typing import TupleAny
72from ..util.typing import Unpack
73
74if typing.TYPE_CHECKING:
75 from .base import Engine
76 from .cursor import ResultFetchStrategy
77 from .interfaces import _CoreMultiExecuteParams
78 from .interfaces import _CoreSingleExecuteParams
79 from .interfaces import _DBAPICursorDescription
80 from .interfaces import _DBAPIMultiExecuteParams
81 from .interfaces import _DBAPISingleExecuteParams
82 from .interfaces import _ExecuteOptions
83 from .interfaces import _MutableCoreSingleExecuteParams
84 from .interfaces import _ParamStyle
85 from .interfaces import ConnectArgsType
86 from .interfaces import DBAPIConnection
87 from .interfaces import DBAPIModule
88 from .interfaces import DBAPIType
89 from .interfaces import IsolationLevel
90 from .row import Row
91 from .url import URL
92 from ..event import _ListenerFnType
93 from ..pool import Pool
94 from ..pool import PoolProxiedConnection
95 from ..sql import Executable
96 from ..sql.compiler import Compiled
97 from ..sql.compiler import Linting
98 from ..sql.compiler import ResultColumnsEntry
99 from ..sql.dml import DMLState
100 from ..sql.dml import UpdateBase
101 from ..sql.elements import BindParameter
102 from ..sql.schema import Column
103 from ..sql.sqltypes import _JSON_VALUE
104 from ..sql.type_api import _BindProcessorType
105 from ..sql.type_api import _ResultProcessorType
106 from ..sql.type_api import TypeEngine
107
108
# When we're handed literal SQL, ensure it's a SELECT query.  The pattern
# allows leading whitespace and matches "SELECT" case-insensitively.
SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)


# Unpack the members produced by iterating CacheStats into module-level
# names; the assignment relies on iteration yielding exactly these five
# members in this order.
(
    CACHE_HIT,
    CACHE_MISS,
    CACHING_DISABLED,
    NO_CACHE_KEY,
    NO_DIALECT_SUPPORT,
) = list(CacheStats)
120
121
122class DefaultDialect(Dialect):
123 """Default implementation of Dialect"""
124
# compiler classes for SQL statements, DDL and type rendering;
# type_compiler_cls is instantiated in __init__() unless a legacy
# "type_compiler" attribute is present.
statement_compiler = compiler.SQLCompiler
ddl_compiler = compiler.DDLCompiler
type_compiler_cls = compiler.GenericTypeCompiler

# class instantiated in __init__() to produce self.identifier_preparer
preparer = compiler.IdentifierPreparer
supports_alter = True
supports_comments = False
supports_constraint_comments = False
inline_comments = False
supports_statement_cache = True

div_is_floordiv = True

bind_typing = interfaces.BindTyping.NONE

include_set_input_sizes: Optional[Set[Any]] = None
exclude_set_input_sizes: Optional[Set[Any]] = None

# the first value we'd get for an autoincrement column.
default_sequence_base = 1

# most DBAPIs are happy with this for execute();
# cx_oracle is not.
execute_sequence_format = tuple

supports_schemas = True
supports_views = True
supports_sequences = False
sequences_optional = False
preexecute_autoincrement_sequences = False
supports_identity_columns = False
postfetch_lastrowid = True
favor_returning_over_lastrowid = False
insert_null_pk_still_autoincrements = False
update_returning = False
delete_returning = False
update_returning_multifrom = False
delete_returning_multifrom = False
insert_returning = False

aggregate_order_by_style = AggregateOrderByStyle.INLINE

cte_follows_insert = False

supports_native_enum = False
supports_native_boolean = False
supports_native_uuid = False
returns_native_bytes = False

supports_native_json_serialization = False
supports_native_json_deserialization = False
dialect_injects_custom_json_deserializer = False
_json_serializer: Callable[[_JSON_VALUE], str] | None = None

_json_deserializer: Callable[[str], _JSON_VALUE] | None = None

non_native_boolean_check_constraint = True

supports_simple_order_by_label = True

tuple_in_values = False

# maps execution option names to the characteristic objects that apply
# and reset them on a connection (see _set_connection_characteristics()
# and _reset_characteristics()).
connection_characteristics = util.immutabledict(
    {
        "isolation_level": characteristics.IsolationLevelCharacteristic(),
        "logging_token": characteristics.LoggingTokenCharacteristic(),
    }
)

# maps engine configuration keys to the callable used to coerce
# their values.
engine_config_types: Mapping[str, Any] = util.immutabledict(
    {
        "pool_timeout": util.asint,
        "echo": util.bool_or_str("debug"),
        "echo_pool": util.bool_or_str("debug"),
        "pool_recycle": util.asint,
        "pool_size": util.asint,
        "max_overflow": util.asint,
        "future": util.asbool,
    }
)

# whether the NUMERIC type
# returns decimal.Decimal.
# this does *not* apply to the FLOAT type.
supports_native_decimal = False

name = "default"

# length at which to truncate
# any identifier.
max_identifier_length = 9999
# value passed as max_identifier_length to __init__(), if any; when set,
# initialize() does not consult _check_max_identifier_length().
_user_defined_max_identifier_length: Optional[int] = None

isolation_level: Optional[str] = None

# sub-categories of max_identifier_length.
# currently these accommodate MySQL, which allows alias names
# of 255 but DDL names only of 64.
max_index_name_length: Optional[int] = None
max_constraint_name_length: Optional[int] = None

supports_sane_rowcount = True
supports_sane_multi_rowcount = True
colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
# paramstyle used by __init__() when neither an explicit paramstyle nor
# a DBAPI module is given.
default_paramstyle = "named"

supports_default_values = False
"""dialect supports INSERT... DEFAULT VALUES syntax"""

supports_default_metavalue = False
"""dialect supports INSERT... VALUES (DEFAULT) syntax"""

default_metavalue_token = "DEFAULT"
"""for INSERT... VALUES (DEFAULT) syntax, the token to put in the
parenthesis."""

# not sure if this is a real thing but the compiler will deliver it
# if this is the only flag enabled.
supports_empty_insert = True
"""dialect supports INSERT () VALUES ()"""

supports_multivalues_insert = False

use_insertmanyvalues: bool = False

use_insertmanyvalues_wo_returning: bool = False

insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
    InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
)

# default batch size for "insertmanyvalues"; may be overridden by the
# "insertmanyvalues_page_size" execution option.
insertmanyvalues_page_size: int = 1000
insertmanyvalues_max_parameters = 32700

supports_is_distinct_from = True

supports_server_side_cursors = False

server_side_cursors = False

# extra record-level locking features (#4860)
supports_for_update_of = False

server_version_info = None

default_schema_name: Optional[str] = None

# indicates symbol names are
# UPPERCASED if they are case insensitive
# within the database.
# if this is True, the methods normalize_name()
# and denormalize_name() must be provided.
requires_name_normalize = False

is_async = False

has_terminate = False

# TODO: this is not to be part of 2.0.  implement rudimentary binary
# literals for SQLite, PostgreSQL, MySQL only within
# _Binary.literal_processor
_legacy_binary_type_literal_encoding = "utf-8"
287
@util.deprecated_params(
    empty_in_strategy=(
        "1.4",
        "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
        "deprecated, and no longer has any effect. All IN expressions "
        "are now rendered using "
        'the "expanding parameter" strategy which renders a set of bound '
        'expressions, or an "empty set" SELECT, at statement execution '
        "time.",
    ),
    server_side_cursors=(
        "1.4",
        "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
        "is deprecated and will be removed in a future release. Please "
        "use the "
        ":paramref:`_engine.Connection.execution_options.stream_results` "
        "parameter.",
    ),
)
def __init__(
    self,
    paramstyle: Optional[_ParamStyle] = None,
    isolation_level: Optional[IsolationLevel] = None,
    dbapi: Optional[DBAPIModule] = None,
    implicit_returning: Literal[True] = True,
    supports_native_boolean: Optional[bool] = None,
    max_identifier_length: Optional[int] = None,
    label_length: Optional[int] = None,
    insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
    use_insertmanyvalues: Optional[bool] = None,
    # util.deprecated_params decorator cannot render the
    # Linting.NO_LINTING constant
    compiler_linting: Linting = int(compiler.NO_LINTING),  # type: ignore
    server_side_cursors: bool = False,
    skip_autocommit_rollback: bool = False,
    **kwargs: Any,
):
    # Configure instance-level dialect state from the given arguments.
    #
    # Arguments left at None / NO_ARG leave the corresponding class-level
    # default in place.  ``implicit_returning`` and ``**kwargs`` are
    # accepted but not consulted in this method.
    #
    # Raises ArgumentError if server_side_cursors is requested on a
    # dialect whose supports_server_side_cursors flag is false.
    if server_side_cursors:
        if not self.supports_server_side_cursors:
            raise exc.ArgumentError(
                "Dialect %s does not support server side cursors" % self
            )
        else:
            self.server_side_cursors = True

    # legacy "use_setinputsizes" attribute is translated to the
    # bind_typing setting, with a deprecation warning
    if getattr(self, "use_setinputsizes", False):
        util.warn_deprecated(
            "The dialect-level use_setinputsizes attribute is "
            "deprecated. Please use "
            "bind_typing = BindTyping.SETINPUTSIZES",
            "2.0",
        )
        self.bind_typing = interfaces.BindTyping.SETINPUTSIZES

    self.positional = False
    self._ischema = None

    self.dbapi = dbapi

    self.skip_autocommit_rollback = skip_autocommit_rollback

    # paramstyle precedence: explicit argument, then the DBAPI module's
    # own paramstyle, then the class-level default
    if paramstyle is not None:
        self.paramstyle = paramstyle
    elif self.dbapi is not None:
        self.paramstyle = self.dbapi.paramstyle
    else:
        self.paramstyle = self.default_paramstyle
    self.positional = self.paramstyle in (
        "qmark",
        "format",
        "numeric",
        "numeric_dollar",
    )
    self.identifier_preparer = self.preparer(self)
    self._on_connect_isolation_level = isolation_level

    # a "type_compiler" attribute already present on the dialect takes
    # precedence over type_compiler_cls
    legacy_tt_callable = getattr(self, "type_compiler", None)
    if legacy_tt_callable is not None:
        tt_callable = cast(
            Type[compiler.GenericTypeCompiler],
            self.type_compiler,
        )
    else:
        tt_callable = self.type_compiler_cls

    self.type_compiler_instance = self.type_compiler = tt_callable(self)

    if supports_native_boolean is not None:
        self.supports_native_boolean = supports_native_boolean

    # a truthy user-supplied length overrides the class-level default
    self._user_defined_max_identifier_length = max_identifier_length
    if self._user_defined_max_identifier_length:
        self.max_identifier_length = (
            self._user_defined_max_identifier_length
        )
    self.label_length = label_length
    self.compiler_linting = compiler_linting

    if use_insertmanyvalues is not None:
        self.use_insertmanyvalues = use_insertmanyvalues

    if insertmanyvalues_page_size is not _NoArg.NO_ARG:
        self.insertmanyvalues_page_size = insertmanyvalues_page_size
391
@property
@util.deprecated(
    "2.0",
    "full_returning is deprecated, please use insert_returning, "
    "update_returning, delete_returning",
)
def full_returning(self):
    # combined flag: the result of and-ing the insert, update and delete
    # RETURNING flags, evaluated in that order with short-circuiting
    insert_and_update = self.insert_returning and self.update_returning
    return insert_and_update and self.delete_returning
404
@util.memoized_property
def insert_executemany_returning(self):
    """Default implementation for insert_executemany_returning, if not
    otherwise overridden by the specific dialect.

    The default dialect reports "insert_executemany_returning" as
    available when the dialect has both ``insert_returning`` and
    ``use_insertmanyvalues`` enabled.  Otherwise the attribute is falsy,
    unless the dialect in question overrides it and provides some other
    implementation (such as the Oracle Database dialects).

    """
    supports_returning = self.insert_returning
    return supports_returning and self.use_insertmanyvalues
419
@util.memoized_property
def insert_executemany_returning_sort_by_parameter_order(self):
    """Default implementation for
    insert_executemany_returning_sort_by_parameter_order, if not otherwise
    overridden by the specific dialect.

    The default dialect determines "insert_executemany_returning" can have
    deterministic order only if the dialect in use has opted into using the
    "use_insertmanyvalues" feature, which implements deterministic ordering
    using client side sentinel columns only by default.  The
    "insertmanyvalues" feature also features alternate forms that can
    use server-generated PK values as "sentinels", but those are only
    used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel`
    bitflag enables those alternate SQL forms, which are disabled
    by default.

    If the dialect in use hasn't opted into that, then this attribute is
    falsy, unless the dialect in question overrides this and provides some
    other implementation (such as the Oracle Database dialects).

    """
    returning_flag = self.insert_returning
    if not returning_flag:
        # mirror "a and b": hand back the falsy left operand itself
        return returning_flag
    return self.use_insertmanyvalues
442
# class-level defaults; unlike the two insert_executemany_returning*
# attributes above these are plain flags, not computed properties
update_executemany_returning = False
delete_executemany_returning = False
445
@util.memoized_property
def loaded_dbapi(self) -> DBAPIModule:
    # return self.dbapi, raising InvalidRequestError when the dialect
    # was constructed without a DBAPI module
    dbapi_module = self.dbapi
    if dbapi_module is not None:
        return dbapi_module

    raise exc.InvalidRequestError(
        f"Dialect {self} does not have a Python DBAPI established "
        "and cannot be used for actual database interaction"
    )
454
@util.memoized_property
def _bind_typing_render_casts(self):
    """True if this dialect's ``bind_typing`` is
    ``BindTyping.RENDER_CASTS`` (identity comparison)."""
    return self.bind_typing is interfaces.BindTyping.RENDER_CASTS
458
def _ensure_has_table_connection(self, arg: Connection) -> None:
    """Verify that the argument given to ``has_table()`` is a
    :class:`_engine.Connection`.

    :param arg: the object passed as the connection argument.
    :raises ArgumentError: if ``arg`` is not a ``Connection`` instance.

    """
    if isinstance(arg, Connection):
        return

    raise exc.ArgumentError(
        "The argument passed to Dialect.has_table() should be a "
        "%s, got %s. "
        "Additionally, the Dialect.has_table() method is for "
        "internal dialect "
        "use only; please use "
        "``inspect(some_engine).has_table(<tablename>)`` "
        "for public API use." % (Connection, type(arg))
    )
470
@util.memoized_property
def _supports_statement_cache(self):
    """Return the boolean value of ``supports_statement_cache`` as set
    directly on this dialect's own class.

    Only the class ``__dict__`` is consulted, so a value inherited from
    a base class does not count; in that case a warning is emitted and
    False is returned.

    """
    declared = self.__class__.__dict__.get("supports_statement_cache")
    if declared is not None:
        return bool(declared)

    util.warn(
        "Dialect %s:%s will not make use of SQL compilation caching "
        "as it does not set the 'supports_statement_cache' attribute "
        "to ``True``. This can have "
        "significant performance implications including some "
        "performance degradations in comparison to prior SQLAlchemy "
        "versions. Dialect maintainers should seek to set this "
        "attribute to True after appropriate development and testing "
        "for SQLAlchemy 1.4 caching support. Alternatively, this "
        "attribute may be set to False which will disable this "
        "warning." % (self.name, self.driver),
        code="cprf",
    )
    # the attribute is not declared on the class itself
    return False
490
@util.memoized_property
def _type_memos(self):
    """A new, empty ``weakref.WeakKeyDictionary``."""
    return weakref.WeakKeyDictionary()
494
@property
def dialect_description(self):  # type: ignore[override]
    # "<name>+<driver>" for this dialect
    return "+".join((self.name, self.driver))
498
@property
def supports_sane_rowcount_returning(self):
    """True if this dialect supports sane rowcount even if RETURNING is
    in use.

    For dialects that don't support RETURNING, this is synonymous with
    ``supports_sane_rowcount``.

    This default implementation returns ``supports_sane_rowcount``
    unconditionally.

    """
    return self.supports_sane_rowcount
509
@classmethod
def get_pool_class(cls, url: URL) -> Type[Pool]:
    # a "poolclass" attribute on the dialect class wins; otherwise fall
    # back to the async-adapted or plain queue pool based on cls.is_async
    fallback: Type[pool.Pool] = (
        pool.AsyncAdaptedQueuePool if cls.is_async else pool.QueuePool
    )
    return getattr(cls, "poolclass", fallback)
519
def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
    # instance-level hook; by default delegates to the get_pool_class()
    # classmethod
    return self.get_pool_class(url)
522
@classmethod
def load_provisioning(cls):
    # try to import the "provision" module from the package containing
    # this dialect's module; an ImportError is deliberately ignored
    parent_package = cls.__module__.rpartition(".")[0]
    provision_module = parent_package + ".provision"
    try:
        __import__(provision_module)
    except ImportError:
        pass
530
531 def _builtin_onconnect(self) -> Optional[_ListenerFnType]:
532 if self._on_connect_isolation_level is not None:
533
534 def builtin_connect(dbapi_conn, conn_rec):
535 self._assert_and_set_isolation_level(
536 dbapi_conn, self._on_connect_isolation_level
537 )
538
539 return builtin_connect
540 else:
541 return None
542
def initialize(self, connection: Connection) -> None:
    # Populate server_version_info, default_schema_name and
    # default_isolation_level from the given connection; each is set to
    # None when the corresponding hook raises NotImplementedError.
    # Then settle max_identifier_length and validate label_length
    # against it.
    try:
        version_info = self._get_server_version_info(connection)
    except NotImplementedError:
        version_info = None
    self.server_version_info = version_info

    try:
        schema_name = self._get_default_schema_name(connection)
    except NotImplementedError:
        schema_name = None
    self.default_schema_name = schema_name

    try:
        iso_level = self.get_default_isolation_level(
            connection.connection.dbapi_connection
        )
    except NotImplementedError:
        iso_level = None
    self.default_isolation_level = iso_level

    # only consult the connection-specific check when the user did not
    # supply an explicit max_identifier_length
    if not self._user_defined_max_identifier_length:
        detected_length = self._check_max_identifier_length(connection)
        if detected_length:
            self.max_identifier_length = detected_length

    label_length = self.label_length
    if label_length and label_length > self.max_identifier_length:
        raise exc.ArgumentError(
            "Label length of %d is greater than this dialect's"
            " maximum identifier length of %d"
            % (self.label_length, self.max_identifier_length)
        )
578
def on_connect(self) -> Optional[Callable[[Any], None]]:
    # inherits the docstring from interfaces.Dialect.on_connect
    # the default dialect supplies no on-connect callable
    return None
582
def _check_max_identifier_length(self, connection):
    """Perform a connection / server version specific check to determine
    the max_identifier_length.

    If the dialect's class level max_identifier_length should be used,
    can return None.

    This default implementation always returns None.

    """
    return None
592
def get_default_isolation_level(self, dbapi_conn):
    """Given a DBAPI connection, return its isolation level, or
    a default isolation level if one cannot be retrieved.

    May be overridden by subclasses in order to provide a
    "fallback" isolation level for databases that cannot reliably
    retrieve the actual isolation level.

    By default, calls the :meth:`_engine.Dialect.get_isolation_level`
    method, propagating any exceptions raised.

    """
    return self.get_isolation_level(dbapi_conn)
606
def type_descriptor(self, typeobj):
    """Provide a database-specific :class:`.TypeEngine` object, given
    the generic object which comes from the types module.

    This method looks for a dictionary called
    ``colspecs`` as a class or instance-level variable,
    and passes it, along with ``typeobj``, on to
    :func:`_types.adapt_type`.

    """
    return type_api.adapt_type(typeobj, self.colspecs)
617
def has_index(self, connection, table_name, index_name, schema=None, **kw):
    # False when the table itself is absent; otherwise True if any
    # entry from get_indexes() carries the requested name
    if not self.has_table(connection, table_name, schema=schema, **kw):
        return False

    table_indexes = self.get_indexes(
        connection, table_name, schema=schema, **kw
    )
    return any(entry["name"] == index_name for entry in table_indexes)
628
def has_schema(
    self, connection: Connection, schema_name: str, **kw: Any
) -> bool:
    # membership test against whatever get_schema_names() reports
    return schema_name in self.get_schema_names(connection, **kw)
633
def validate_identifier(self, ident: str) -> None:
    # raise IdentifierError when ident is longer than
    # max_identifier_length; an identifier of exactly that length passes
    if len(ident) <= self.max_identifier_length:
        return

    raise exc.IdentifierError(
        "Identifier '%s' exceeds maximum length of %d characters"
        % (ident, self.max_identifier_length)
    )
640
def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection:
    # inherits the docstring from interfaces.Dialect.connect
    # loaded_dbapi raises InvalidRequestError if no DBAPI module is set
    return self.loaded_dbapi.connect(*cargs, **cparams)  # type: ignore[no-any-return]  # NOQA: E501
644
def create_connect_args(self, url: URL) -> ConnectArgsType:
    # inherits the docstring from interfaces.Dialect.create_connect_args
    # keyword arguments come from the URL's translated connect args,
    # with the URL query entries merged in afterwards; no positional
    # arguments are produced
    connect_kwargs = url.translate_connect_args()
    connect_kwargs.update(url.query)
    positional_args: List[Any] = []
    return (positional_args, connect_kwargs)
650
def set_engine_execution_options(
    self, engine: Engine, opts: Mapping[str, Any]
) -> None:
    # pick out the options that correspond to connection
    # characteristics; nothing to do when there are none
    matched_names = set(self.connection_characteristics).intersection(opts)
    if not matched_names:
        return

    characteristics: Mapping[str, Any] = util.immutabledict(
        (name, opts[name]) for name in matched_names
    )

    # apply the selected characteristics on each "engine_connect" event
    # of the given engine
    @event.listens_for(engine, "engine_connect")
    def set_connection_characteristics(connection):
        self._set_connection_characteristics(connection, characteristics)
667
def set_connection_execution_options(
    self, connection: Connection, opts: Mapping[str, Any]
) -> None:
    # pick out the options that correspond to connection
    # characteristics and apply them to the connection right away
    matched_names = set(self.connection_characteristics).intersection(opts)
    if not matched_names:
        return

    characteristics: Mapping[str, Any] = util.immutabledict(
        (name, opts[name]) for name in matched_names
    )
    self._set_connection_characteristics(connection, characteristics)
679
680 def _set_connection_characteristics(self, connection, characteristics):
681 characteristic_values = [
682 (name, self.connection_characteristics[name], value)
683 for name, value in characteristics.items()
684 ]
685
686 if connection.in_transaction():
687 trans_objs = [
688 (name, obj)
689 for name, obj, _ in characteristic_values
690 if obj.transactional
691 ]
692 if trans_objs:
693 raise exc.InvalidRequestError(
694 "This connection has already initialized a SQLAlchemy "
695 "Transaction() object via begin() or autobegin; "
696 "%s may not be altered unless rollback() or commit() "
697 "is called first."
698 % (", ".join(name for name, obj in trans_objs))
699 )
700
701 dbapi_connection = connection.connection.dbapi_connection
702 for _, characteristic, value in characteristic_values:
703 characteristic.set_connection_characteristic(
704 self, connection, dbapi_connection, value
705 )
706 connection.connection._connection_record.finalize_callback.append(
707 functools.partial(self._reset_characteristics, characteristics)
708 )
709
710 def _reset_characteristics(self, characteristics, dbapi_connection):
711 for characteristic_name in characteristics:
712 characteristic = self.connection_characteristics[
713 characteristic_name
714 ]
715 characteristic.reset_characteristic(self, dbapi_connection)
716
def do_begin(self, dbapi_connection):
    # no-op in the default dialect
    pass
719
def do_rollback(self, dbapi_connection):
    # the rollback is skipped only when skip_autocommit_rollback is
    # enabled AND the connection is detected to be in autocommit; the
    # detection call is not made when the option is off
    skip_rollback = (
        self.skip_autocommit_rollback
        and self.detect_autocommit_setting(dbapi_connection)
    )
    if not skip_rollback:
        dbapi_connection.rollback()
726
def do_commit(self, dbapi_connection):
    # delegate directly to the DBAPI connection's commit()
    dbapi_connection.commit()
729
def do_terminate(self, dbapi_connection):
    # the default dialect has no distinct terminate step; same as
    # do_close()
    self.do_close(dbapi_connection)
732
def do_close(self, dbapi_connection):
    # delegate directly to the DBAPI connection's close()
    dbapi_connection.close()
735
@util.memoized_property
def _dialect_specific_select_one(self):
    """The string form of ``select(1)`` compiled against this dialect;
    executed by :meth:`do_ping`."""
    return str(expression.select(1).compile(dialect=self))
739
def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
    """Run :meth:`do_ping`, interpreting DBAPI errors as possible
    disconnects.

    Returns the result of ``do_ping()`` when it succeeds.  When it raises
    the DBAPI module's ``Error``, returns False if the error is judged to
    be a disconnect and re-raises the original error otherwise.

    """
    try:
        return self.do_ping(dbapi_connection)
    except self.loaded_dbapi.Error as err:
        # initial verdict comes from the dialect itself
        is_disconnect = self.is_disconnect(err, dbapi_connection, None)

        if self._has_events:
            # route the error through the no-connection exception
            # handler; if that raises a StatementError, its
            # connection_invalidated flag replaces the initial verdict
            try:
                Connection._handle_dbapi_exception_noconnection(
                    err,
                    self,
                    is_disconnect=is_disconnect,
                    invalidate_pool_on_disconnect=False,
                    is_pre_ping=True,
                )
            except exc.StatementError as new_err:
                is_disconnect = new_err.connection_invalidated

        if is_disconnect:
            return False
        else:
            # not a disconnect: propagate the original DBAPI error
            raise
762
def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
    # execute the dialect's "select 1" statement on a fresh cursor; the
    # cursor is closed whether or not the execute succeeds
    ping_cursor = dbapi_connection.cursor()
    try:
        ping_cursor.execute(self._dialect_specific_select_one)
        return True
    finally:
        ping_cursor.close()
770
def create_xid(self):
    """Create a random two-phase transaction ID.

    This id will be passed to do_begin_twophase(), do_rollback_twophase(),
    do_commit_twophase().  Its format is unspecified.

    The current implementation returns the prefix ``_sa_`` followed by
    exactly 32 lowercase hexadecimal digits.
    """

    # getrandbits(128) is always < 2**128, so the value never needs
    # more than the 32 hex digits of the zero-padded format
    return "_sa_%032x" % random.getrandbits(128)
779
def do_savepoint(self, connection, name):
    # execute a SavepointClause construct for the given name
    connection.execute(expression.SavepointClause(name))
782
def do_rollback_to_savepoint(self, connection, name):
    # execute a RollbackToSavepointClause construct for the given name
    connection.execute(expression.RollbackToSavepointClause(name))
785
def do_release_savepoint(self, connection, name):
    # execute a ReleaseSavepointClause construct for the given name
    connection.execute(expression.ReleaseSavepointClause(name))
788
def _deliver_insertmanyvalues_batches(
    self,
    connection,
    cursor,
    statement,
    parameters,
    generic_setinputsizes,
    context,
):
    # Generator: yields each batch produced by the compiled statement's
    # own _deliver_insertmanyvalues_batches().  When the statement has
    # RETURNING, then after the consumer resumes the generator the rows
    # for that batch are fetched from the cursor and accumulated into
    # context._insertmanyvalues_rows, re-ordered to match parameter order
    # when sentinel columns are in use.
    # NOTE(review): presumably the consumer executes the yielded batch on
    # ``cursor`` before resuming - confirm against the caller.
    context = cast(DefaultExecutionContext, context)
    compiled = cast(SQLCompiler, context.compiled)

    # result processors for sentinel values; set up lazily on the first
    # batch that needs them (see _sentinel_proc_initialized below)
    _composite_sentinel_proc: Sequence[
        Optional[_ResultProcessorType[Any]]
    ] = ()
    _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
    _sentinel_proc_initialized: bool = False

    compiled_parameters = context.compiled_parameters

    imv = compiled._insertmanyvalues
    assert imv is not None

    is_returning: Final[bool] = bool(compiled.effective_returning)
    # the per-execution option overrides the dialect-level page size
    batch_size = context.execution_options.get(
        "insertmanyvalues_page_size", self.insertmanyvalues_page_size
    )

    if compiled.schema_translate_map:
        schema_translate_map = context.execution_options.get(
            "schema_translate_map", {}
        )
    else:
        schema_translate_map = None

    if is_returning:
        # the same list object is shared with the context, so rows
        # appended below are visible through
        # context._insertmanyvalues_rows
        result: Optional[List[Any]] = []
        context._insertmanyvalues_rows = result

        sort_by_parameter_order = imv.sort_by_parameter_order

    else:
        sort_by_parameter_order = False
        result = None

    for imv_batch in compiled._deliver_insertmanyvalues_batches(
        statement,
        parameters,
        compiled_parameters,
        generic_setinputsizes,
        batch_size,
        sort_by_parameter_order,
        schema_translate_map,
    ):
        yield imv_batch

        if is_returning:

            try:
                rows = context.fetchall_for_returning(cursor)
            except BaseException as be:
                # NOTE(review): ``rows`` is used unconditionally below,
                # so this call is presumably expected to always raise -
                # confirm
                connection._handle_dbapi_exception(
                    be,
                    sql_util._long_statement(imv_batch.replaced_statement),
                    imv_batch.replaced_parameters,
                    None,
                    context,
                    is_sub_exec=True,
                )

            # I would have thought "is_returning: Final[bool]"
            # would have assured this but pylance thinks not
            assert result is not None

            if imv.num_sentinel_columns and not imv_batch.is_downgraded:
                composite_sentinel = imv.num_sentinel_columns > 1
                if imv.implicit_sentinel:
                    # for implicit sentinel, which is currently single-col
                    # integer autoincrement, do a simple sort on the last
                    # column of each row.
                    assert not composite_sentinel
                    result.extend(
                        sorted(rows, key=operator.itemgetter(-1))
                    )
                    continue

                # otherwise, create dictionaries to match up batches
                # with parameters
                assert imv.sentinel_param_keys
                assert imv.sentinel_columns

                _nsc = imv.num_sentinel_columns

                if not _sentinel_proc_initialized:
                    # build result processors from the trailing cursor
                    # description entries, one per sentinel column
                    if composite_sentinel:
                        _composite_sentinel_proc = [
                            col.type._cached_result_processor(
                                self, cursor_desc[1]
                            )
                            for col, cursor_desc in zip(
                                imv.sentinel_columns,
                                cursor.description[-_nsc:],
                            )
                        ]
                    else:
                        _scalar_sentinel_proc = (
                            imv.sentinel_columns[0]
                        ).type._cached_result_processor(
                            self, cursor.description[-1][1]
                        )
                    _sentinel_proc_initialized = True

                rows_by_sentinel: Union[
                    Dict[Tuple[Any, ...], Any],
                    Dict[Any, Any],
                ]

                # key each row by its (processed) sentinel value(s),
                # taken from the trailing column(s) of the row
                if composite_sentinel:
                    rows_by_sentinel = {
                        tuple(
                            (proc(val) if proc else val)
                            for val, proc in zip(
                                row[-_nsc:], _composite_sentinel_proc
                            )
                        ): row
                        for row in rows
                    }
                elif _scalar_sentinel_proc:
                    rows_by_sentinel = {
                        _scalar_sentinel_proc(row[-1]): row for row in rows
                    }
                else:
                    rows_by_sentinel = {row[-1]: row for row in rows}

                # duplicate sentinel values collapse into one dictionary
                # key, which shows up here as a count mismatch
                if len(rows_by_sentinel) != len(imv_batch.batch):
                    # see test_insert_exec.py::
                    # IMVSentinelTest::test_sentinel_incorrect_rowcount
                    # for coverage / demonstration
                    raise exc.InvalidRequestError(
                        f"Sentinel-keyed result set did not produce "
                        f"correct number of rows {len(imv_batch.batch)}; "
                        "produced "
                        f"{len(rows_by_sentinel)}. Please ensure the "
                        "sentinel column is fully unique and populated in "
                        "all cases."
                    )

                try:
                    # emit rows in the order of the batch's sentinel
                    # values
                    ordered_rows = [
                        rows_by_sentinel[sentinel_keys]
                        for sentinel_keys in imv_batch.sentinel_values
                    ]
                except KeyError as ke:
                    # see test_insert_exec.py::
                    # IMVSentinelTest::test_sentinel_cant_match_keys
                    # for coverage / demonstration
                    raise exc.InvalidRequestError(
                        f"Can't match sentinel values in result set to "
                        f"parameter sets; key {ke.args[0]!r} was not "
                        "found. "
                        "There may be a mismatch between the datatype "
                        "passed to the DBAPI driver vs. that which it "
                        "returns in a result row. Ensure the given "
                        "Python value matches the expected result type "
                        "*exactly*, taking care to not rely upon implicit "
                        "conversions which may occur such as when using "
                        "strings in place of UUID or integer values, etc. "
                    ) from ke

                result.extend(ordered_rows)

            else:
                # no sentinel handling for this batch: keep the rows in
                # the order the cursor returned them
                result.extend(rows)
961
def do_executemany(self, cursor, statement, parameters, context=None):
    # delegate directly to cursor.executemany(); context is unused here
    cursor.executemany(statement, parameters)
964
def do_execute(self, cursor, statement, parameters, context=None):
    # delegate directly to cursor.execute(); context is unused here
    cursor.execute(statement, parameters)
967
def do_execute_no_params(self, cursor, statement, context=None):
    # execute the statement with no parameters argument passed to the
    # cursor at all; context is unused here
    cursor.execute(statement)
970
def is_disconnect(
    self,
    e: DBAPIModule.Error,
    connection: Union[
        pool.PoolProxiedConnection, interfaces.DBAPIConnection, None
    ],
    cursor: Optional[interfaces.DBAPICursor],
) -> bool:
    # the default dialect never classifies an error as a disconnect
    return False
980
@util.memoized_instancemethod
def _gen_allowed_isolation_levels(self, dbapi_conn):
    """Return the dialect's isolation level names as a tuple, or None if
    ``get_isolation_level_values()`` raises NotImplementedError.

    :raises ValueError: if any name reported by the dialect is not
     already in normalized form, i.e. uppercase with spaces in place of
     underscores.

    """
    try:
        reported = list(self.get_isolation_level_values(dbapi_conn))
    except NotImplementedError:
        return None

    expected = [name.replace("_", " ").upper() for name in reported]
    if reported == expected:
        return tuple(expected)

    raise ValueError(
        f"Dialect {self.name!r} get_isolation_level_values() "
        f"method should return names as UPPERCASE using spaces, "
        f"not underscores; got "
        f"{sorted(set(reported).difference(expected))}"
    )
999
1000 def _assert_and_set_isolation_level(self, dbapi_conn, level):
1001 level = level.replace("_", " ").upper()
1002
1003 _allowed_isolation_levels = self._gen_allowed_isolation_levels(
1004 dbapi_conn
1005 )
1006 if (
1007 _allowed_isolation_levels
1008 and level not in _allowed_isolation_levels
1009 ):
1010 raise exc.ArgumentError(
1011 f"Invalid value {level!r} for isolation_level. "
1012 f"Valid isolation levels for {self.name!r} are "
1013 f"{', '.join(_allowed_isolation_levels)}"
1014 )
1015
1016 self.set_isolation_level(dbapi_conn, level)
1017
1018 def reset_isolation_level(self, dbapi_conn):
1019 if self._on_connect_isolation_level is not None:
1020 assert (
1021 self._on_connect_isolation_level == "AUTOCOMMIT"
1022 or self._on_connect_isolation_level
1023 == self.default_isolation_level
1024 )
1025 self._assert_and_set_isolation_level(
1026 dbapi_conn, self._on_connect_isolation_level
1027 )
1028 else:
1029 assert self.default_isolation_level is not None
1030 self._assert_and_set_isolation_level(
1031 dbapi_conn,
1032 self.default_isolation_level,
1033 )
1034
1035 def normalize_name(self, name):
1036 if name is None:
1037 return None
1038
1039 name_lower = name.lower()
1040 name_upper = name.upper()
1041
1042 if name_upper == name_lower:
1043 # name has no upper/lower conversion, e.g. non-european characters.
1044 # return unchanged
1045 return name
1046 elif name_upper == name and not (
1047 self.identifier_preparer._requires_quotes
1048 )(name_lower):
1049 # name is all uppercase and doesn't require quoting; normalize
1050 # to all lower case
1051 return name_lower
1052 elif name_lower == name:
1053 # name is all lower case, which if denormalized means we need to
1054 # force quoting on it
1055 return quoted_name(name, quote=True)
1056 else:
1057 # name is mixed case, means it will be quoted in SQL when used
1058 # later, no normalizes
1059 return name
1060
1061 def denormalize_name(self, name):
1062 if name is None:
1063 return None
1064
1065 name_lower = name.lower()
1066 name_upper = name.upper()
1067
1068 if name_upper == name_lower:
1069 # name has no upper/lower conversion, e.g. non-european characters.
1070 # return unchanged
1071 return name
1072 elif name_lower == name and not (
1073 self.identifier_preparer._requires_quotes
1074 )(name_lower):
1075 name = name_upper
1076 return name
1077
    def get_driver_connection(self, connection: DBAPIConnection) -> Any:
        """Return the driver-level connection for ``connection``.

        This default implementation returns ``connection`` itself unchanged.
        """
        return connection
1080
1081 def _overrides_default(self, method):
1082 return (
1083 getattr(type(self), method).__code__
1084 is not getattr(DefaultDialect, method).__code__
1085 )
1086
    def _default_multi_reflect(
        self,
        single_tbl_method,
        connection,
        kind,
        schema,
        filter_names,
        scope,
        **kw,
    ):
        """Generator that emulates a "multi" reflection method by calling a
        single-table reflection method once per object.

        Yields ``((schema, name), value)`` tuples where ``value`` is the
        return value of ``single_tbl_method`` for that object.

        :param single_tbl_method: callable invoked as
         ``single_tbl_method(connection, name, schema=schema, **kw)``.
        :param connection: passed through to the names functions and to
         ``single_tbl_method``.
        :param kind: ``ObjectKind`` value; selects which of the table / view /
         materialized view names functions are consulted.
        :param schema: schema name passed to the names functions and to
         ``single_tbl_method``; also the first element of each yielded key.
        :param filter_names: optional collection of names; when truthy, only
         objects with these names are reflected.
        :param scope: ``ObjectScope`` value; selects default and/or temporary
         names functions.
        :param kw: extra keyword arguments.  The ``"unreflectable"`` key is
         popped and used as a dictionary collecting the first
         ``UnreflectableTableError`` seen per key; the remaining keywords are
         passed to the names functions and to ``single_tbl_method``.

        As this is a generator, none of the above runs until it is iterated.
        """
        names_fns = []
        temp_names_fns = []
        if ObjectKind.TABLE in kind:
            names_fns.append(self.get_table_names)
            temp_names_fns.append(self.get_temp_table_names)
        if ObjectKind.VIEW in kind:
            names_fns.append(self.get_view_names)
            temp_names_fns.append(self.get_temp_view_names)
        if ObjectKind.MATERIALIZED_VIEW in kind:
            names_fns.append(self.get_materialized_view_names)
            # no temp materialized view at the moment
            # temp_names_fns.append(self.get_temp_materialized_view_names)

        # popped before name_kw is built below, so "unreflectable" is not
        # forwarded to the names functions or to single_tbl_method
        unreflectable = kw.pop("unreflectable", {})

        if (
            filter_names
            and scope is ObjectScope.ANY
            and kind is ObjectKind.ANY
        ):
            # if names are given and no qualification on type of table
            # (i.e. the Table(..., autoload) case), take the names as given,
            # don't run names queries. If a table does not exist
            # NoSuchTableError is raised and it's skipped

            # this also suits the case for mssql where we can reflect
            # individual temp tables but there's no temp_names_fn
            names = filter_names
        else:
            names = []
            name_kw = {"schema": schema, **kw}
            fns = []
            if ObjectScope.DEFAULT in scope:
                fns.extend(names_fns)
            if ObjectScope.TEMPORARY in scope:
                fns.extend(temp_names_fns)

            for fn in fns:
                try:
                    names.extend(fn(connection, **name_kw))
                except NotImplementedError:
                    # a names function the dialect doesn't implement
                    # contributes no names
                    pass

        if filter_names:
            filter_names = set(filter_names)

        # iterate over all the tables/views and call the single table method
        for table in names:
            if not filter_names or table in filter_names:
                key = (schema, table)
                try:
                    yield (
                        key,
                        single_tbl_method(
                            connection, table, schema=schema, **kw
                        ),
                    )
                except exc.UnreflectableTableError as err:
                    # keep only the first error recorded for a given key
                    if key not in unreflectable:
                        unreflectable[key] = err
                except exc.NoSuchTableError:
                    pass
1159
    def get_multi_table_options(self, connection, **kw):
        """Reflect table options for multiple objects.

        Delegates to ``_default_multi_reflect()``, which calls
        ``get_table_options()`` once per object; ``kw`` is passed through.
        """
        return self._default_multi_reflect(
            self.get_table_options, connection, **kw
        )
1164
    def get_multi_columns(self, connection, **kw):
        """Reflect columns for multiple objects.

        Delegates to ``_default_multi_reflect()``, which calls
        ``get_columns()`` once per object; ``kw`` is passed through.
        """
        return self._default_multi_reflect(self.get_columns, connection, **kw)
1167
    def get_multi_pk_constraint(self, connection, **kw):
        """Reflect primary key constraints for multiple objects.

        Delegates to ``_default_multi_reflect()``, which calls
        ``get_pk_constraint()`` once per object; ``kw`` is passed through.
        """
        return self._default_multi_reflect(
            self.get_pk_constraint, connection, **kw
        )
1172
    def get_multi_foreign_keys(self, connection, **kw):
        """Reflect foreign keys for multiple objects.

        Delegates to ``_default_multi_reflect()``, which calls
        ``get_foreign_keys()`` once per object; ``kw`` is passed through.
        """
        return self._default_multi_reflect(
            self.get_foreign_keys, connection, **kw
        )
1177
    def get_multi_indexes(self, connection, **kw):
        """Reflect indexes for multiple objects.

        Delegates to ``_default_multi_reflect()``, which calls
        ``get_indexes()`` once per object; ``kw`` is passed through.
        """
        return self._default_multi_reflect(self.get_indexes, connection, **kw)
1180
    def get_multi_unique_constraints(self, connection, **kw):
        """Reflect unique constraints for multiple objects.

        Delegates to ``_default_multi_reflect()``, which calls
        ``get_unique_constraints()`` once per object; ``kw`` is passed
        through.
        """
        return self._default_multi_reflect(
            self.get_unique_constraints, connection, **kw
        )
1185
    def get_multi_check_constraints(self, connection, **kw):
        """Reflect check constraints for multiple objects.

        Delegates to ``_default_multi_reflect()``, which calls
        ``get_check_constraints()`` once per object; ``kw`` is passed
        through.
        """
        return self._default_multi_reflect(
            self.get_check_constraints, connection, **kw
        )
1190
    def get_multi_table_comment(self, connection, **kw):
        """Reflect table comments for multiple objects.

        Delegates to ``_default_multi_reflect()``, which calls
        ``get_table_comment()`` once per object; ``kw`` is passed through.
        """
        return self._default_multi_reflect(
            self.get_table_comment, connection, **kw
        )
1195
1196
class StrCompileDialect(DefaultDialect):
    """:class:`.DefaultDialect` subclass wired to the ``Str*`` compiler
    classes with a broad set of feature flags switched on.

    NOTE(review): presumably this is the dialect used to stringify
    statements when no database-specific dialect is available -- confirm
    against callers.
    """

    # compiler and identifier preparer classes used by this dialect
    statement_compiler = compiler.StrSQLCompiler
    ddl_compiler = compiler.DDLCompiler
    type_compiler_cls = compiler.StrSQLTypeCompiler
    preparer = compiler.IdentifierPreparer

    # RETURNING is reported as available for all three DML statement types
    insert_returning = True
    update_returning = True
    delete_returning = True

    supports_statement_cache = True

    supports_identity_columns = True

    # sequences are supported but optional, and are not pre-executed
    supports_sequences = True
    sequences_optional = True
    preexecute_autoincrement_sequences = False

    supports_native_boolean = True

    supports_multivalues_insert = True
    supports_simple_order_by_label = True
1219
1220
1221class DefaultExecutionContext(ExecutionContext):
    # statement-type flags; these class-level defaults are overwritten
    # per instance by the _init_*() constructors
    isinsert = False
    isupdate = False
    isdelete = False
    is_crud = False
    is_text = False
    isddl = False

    # switched to EXECUTEMANY / INSERTMANYVALUES by the constructors when
    # more than one parameter set is present
    execute_style: ExecuteStyle = ExecuteStyle.EXECUTE

    compiled: Optional[Compiled] = None
    # populated by _init_compiled() from the compiled object's result
    # column attributes
    result_column_struct: Optional[
        Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
    ] = None
    # rows collected from implicit RETURNING, set in
    # _setup_dml_or_text_result()
    returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None

    execution_options: _ExecuteOptions = util.EMPTY_DICT

    cursor_fetch_strategy = _cursor._DEFAULT_FETCH

    invoked_statement: Optional[Executable] = None

    # RETURNING-related flags assigned in _init_compiled() for DML
    _is_implicit_returning = False
    _is_explicit_returning = False
    _is_supplemental_returning = False
    # assigned by create_cursor()
    _is_server_side = False

    # copied from the result object in _setup_result_proxy()
    _soft_closed = False

    # when not None, the rowcount property returns this value instead of
    # reading cursor.rowcount
    _rowcount: Optional[int] = None

    # a hook for SQLite's translation of
    # result column names
    # NOTE: pyhive is using this hook, can't remove it :(
    _translate_colname: Optional[
        Callable[[str], Tuple[str, Optional[str]]]
    ] = None

    _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
    """used by set_input_sizes().

    This collection comes from ``ExpandedState.parameter_expansion``.

    """

    # compared by identity against the cache-state constants in
    # _get_cache_stats()
    cache_hit = NO_CACHE_KEY

    # instance attributes assigned by the _init_*() constructors
    root_connection: Connection
    _dbapi_connection: PoolProxiedConnection
    dialect: Dialect
    unicode_statement: str
    cursor: DBAPICursor
    compiled_parameters: List[_MutableCoreSingleExecuteParams]
    parameters: _DBAPIMultiExecuteParams
    extracted_parameters: Optional[Sequence[BindParameter[Any]]]

    # shared empty mapping used as the single parameter set when no
    # parameters are given and the dialect is not positional
    _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)

    # passed as the initial buffer of the fully buffered fetch strategy for
    # INSERTMANYVALUES with RETURNING (see _setup_dml_or_text_result())
    _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
    # set from the insertmanyvalues sentinel column count in
    # _init_compiled()
    _num_sentinel_cols: int = 0
1281
1282 @classmethod
1283 def _init_ddl(
1284 cls,
1285 dialect: Dialect,
1286 connection: Connection,
1287 dbapi_connection: PoolProxiedConnection,
1288 execution_options: _ExecuteOptions,
1289 compiled_ddl: DDLCompiler,
1290 ) -> ExecutionContext:
1291 """Initialize execution context for an ExecutableDDLElement
1292 construct."""
1293
1294 self = cls.__new__(cls)
1295 self.root_connection = connection
1296 self._dbapi_connection = dbapi_connection
1297 self.dialect = connection.dialect
1298
1299 self.compiled = compiled = compiled_ddl
1300 self.isddl = True
1301
1302 self.execution_options = execution_options
1303
1304 self.unicode_statement = str(compiled)
1305 if compiled.schema_translate_map:
1306 schema_translate_map = self.execution_options.get(
1307 "schema_translate_map", {}
1308 )
1309
1310 rst = compiled.preparer._render_schema_translates
1311 self.unicode_statement = rst(
1312 self.unicode_statement, schema_translate_map
1313 )
1314
1315 self.statement = self.unicode_statement
1316
1317 self.cursor = self.create_cursor()
1318 self.compiled_parameters = []
1319
1320 if dialect.positional:
1321 self.parameters = [dialect.execute_sequence_format()]
1322 else:
1323 self.parameters = [self._empty_dict_params]
1324
1325 return self
1326
    @classmethod
    def _init_compiled(
        cls,
        dialect: Dialect,
        connection: Connection,
        dbapi_connection: PoolProxiedConnection,
        execution_options: _ExecuteOptions,
        compiled: SQLCompiler,
        parameters: _CoreMultiExecuteParams,
        invoked_statement: Executable,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]],
        cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
        param_dict: _CoreSingleExecuteParams | None = None,
    ) -> ExecutionContext:
        """Initialize execution context for a Compiled construct.

        The steps, in order:

        1. copy statement-type flags and result column structure from
           ``compiled``;
        2. for DML, record the RETURNING flavor in use and reject
           RETURNING + executemany combinations the dialect does not
           support;
        3. build ``compiled_parameters``, one dictionary per parameter set,
           and select the execute style;
        4. create the cursor and run execute-time defaults when prefetch
           columns are present;
        5. apply "post compile" / "literal execute" parameter processing
           and schema translation to the statement string;
        6. run bind processors and produce ``self.parameters`` in
           positional or dictionary form.

        :raises InvalidRequestError: for an unsupported RETURNING +
         executemany combination, or when literal-execute / expanding
         parameters are combined with executemany.
        """

        self = cls.__new__(cls)
        self.root_connection = connection
        self._dbapi_connection = dbapi_connection
        self.dialect = connection.dialect
        self.extracted_parameters = extracted_parameters
        self.invoked_statement = invoked_statement
        self.compiled = compiled
        self.cache_hit = cache_hit

        self.execution_options = execution_options

        self.result_column_struct = (
            compiled._result_columns,
            compiled._ordered_columns,
            compiled._textual_ordered_columns,
            compiled._ad_hoc_textual,
            compiled._loose_column_name_matching,
        )

        self.isinsert = ii = compiled.isinsert
        self.isupdate = iu = compiled.isupdate
        self.isdelete = id_ = compiled.isdelete
        self.is_text = compiled.isplaintext

        if ii or iu or id_:
            dml_statement = compiled.compile_state.statement  # type: ignore
            if TYPE_CHECKING:
                assert isinstance(dml_statement, UpdateBase)
            self.is_crud = True
            self._is_explicit_returning = ier = bool(dml_statement._returning)
            self._is_implicit_returning = iir = bool(
                compiled.implicit_returning
            )
            if iir and dml_statement._supplemental_returning:
                self._is_supplemental_returning = True

            # dont mix implicit and explicit returning
            assert not (iir and ier)

            # RETURNING combined with executemany is only allowed when the
            # dialect reports support for it for this statement type
            if (ier or iir) and compiled.for_executemany:
                if ii and not self.dialect.insert_executemany_returning:
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "INSERT..RETURNING when executemany is used"
                    )
                elif (
                    ii
                    and dml_statement._sort_by_parameter_order
                    and not self.dialect.insert_executemany_returning_sort_by_parameter_order  # noqa: E501
                ):
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "INSERT..RETURNING with deterministic row ordering "
                        "when executemany is used"
                    )
                elif (
                    ii
                    and self.dialect.use_insertmanyvalues
                    and not compiled._insertmanyvalues
                ):
                    raise exc.InvalidRequestError(
                        'Statement does not have "insertmanyvalues" '
                        "enabled, can't use INSERT..RETURNING with "
                        "executemany in this case."
                    )
                elif iu and not self.dialect.update_executemany_returning:
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "UPDATE..RETURNING when executemany is used"
                    )
                elif id_ and not self.dialect.delete_executemany_returning:
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "DELETE..RETURNING when executemany is used"
                    )

        if not parameters:
            # no parameter sets given: a single set is built from the
            # compiled statement alone
            self.compiled_parameters = [
                compiled.construct_params(
                    extracted_parameters=extracted_parameters,
                    escape_names=False,
                    _collected_params=param_dict,
                )
            ]
        else:
            self.compiled_parameters = [
                compiled.construct_params(
                    m,
                    escape_names=False,
                    _group_number=grp,
                    extracted_parameters=extracted_parameters,
                    _collected_params=param_dict,
                )
                for grp, m in enumerate(parameters)
            ]

            # more than one parameter set selects a "many" execute style
            if len(parameters) > 1:
                if self.isinsert and compiled._insertmanyvalues:
                    self.execute_style = ExecuteStyle.INSERTMANYVALUES

                    imv = compiled._insertmanyvalues
                    if imv.sentinel_columns is not None:
                        self._num_sentinel_cols = imv.num_sentinel_columns
                else:
                    self.execute_style = ExecuteStyle.EXECUTEMANY

        self.unicode_statement = compiled.string

        self.cursor = self.create_cursor()

        if self.compiled.insert_prefetch or self.compiled.update_prefetch:
            self._process_execute_defaults()

        processors = compiled._bind_processors

        flattened_processors: Mapping[
            str, _BindProcessorType[Any]
        ] = processors  # type: ignore[assignment]

        if compiled.literal_execute_params or compiled.post_compile_params:
            if self.executemany:
                raise exc.InvalidRequestError(
                    "'literal_execute' or 'expanding' parameters can't be "
                    "used with executemany()"
                )

            expanded_state = compiled._process_parameters_for_postcompile(
                self.compiled_parameters[0]
            )

            # re-assign self.unicode_statement
            self.unicode_statement = expanded_state.statement

            self._expanded_parameters = expanded_state.parameter_expansion

            # copy before updating so that the compiled object's own
            # processors collection is not modified
            flattened_processors = dict(processors)  # type: ignore
            flattened_processors.update(expanded_state.processors)
            positiontup = expanded_state.positiontup
        elif compiled.positional:
            positiontup = self.compiled.positiontup
        else:
            positiontup = None

        if compiled.schema_translate_map:
            schema_translate_map = self.execution_options.get(
                "schema_translate_map", {}
            )
            rst = compiled.preparer._render_schema_translates
            self.unicode_statement = rst(
                self.unicode_statement, schema_translate_map
            )

        # final self.unicode_statement is now assigned, encode if needed
        # by dialect
        self.statement = self.unicode_statement

        # Convert the dictionary of bind parameter values
        # into a dict or list to be sent to the DBAPI's
        # execute() or executemany() method.

        if compiled.positional:
            core_positional_parameters: MutableSequence[Sequence[Any]] = []
            assert positiontup is not None
            for compiled_params in self.compiled_parameters:
                # values are ordered by positiontup; a bind processor is
                # applied where one exists for the key
                l_param: List[Any] = [
                    (
                        flattened_processors[key](compiled_params[key])
                        if key in flattened_processors
                        else compiled_params[key]
                    )
                    for key in positiontup
                ]
                core_positional_parameters.append(
                    dialect.execute_sequence_format(l_param)
                )

            self.parameters = core_positional_parameters
        else:
            core_dict_parameters: MutableSequence[Dict[str, Any]] = []
            escaped_names = compiled.escaped_bind_names

            # note that currently, "expanded" parameters will be present
            # in self.compiled_parameters in their quoted form. This is
            # slightly inconsistent with the approach taken as of
            # #8056 where self.compiled_parameters is meant to contain unquoted
            # param names.
            d_param: Dict[str, Any]
            for compiled_params in self.compiled_parameters:
                if escaped_names:
                    # keys are replaced by their escaped form where one
                    # exists
                    d_param = {
                        escaped_names.get(key, key): (
                            flattened_processors[key](compiled_params[key])
                            if key in flattened_processors
                            else compiled_params[key]
                        )
                        for key in compiled_params
                    }
                else:
                    d_param = {
                        key: (
                            flattened_processors[key](compiled_params[key])
                            if key in flattened_processors
                            else compiled_params[key]
                        )
                        for key in compiled_params
                    }

                core_dict_parameters.append(d_param)

            self.parameters = core_dict_parameters

        return self
1559
1560 @classmethod
1561 def _init_statement(
1562 cls,
1563 dialect: Dialect,
1564 connection: Connection,
1565 dbapi_connection: PoolProxiedConnection,
1566 execution_options: _ExecuteOptions,
1567 statement: str,
1568 parameters: _DBAPIMultiExecuteParams,
1569 ) -> ExecutionContext:
1570 """Initialize execution context for a string SQL statement."""
1571
1572 self = cls.__new__(cls)
1573 self.root_connection = connection
1574 self._dbapi_connection = dbapi_connection
1575 self.dialect = connection.dialect
1576 self.is_text = True
1577
1578 self.execution_options = execution_options
1579
1580 if not parameters:
1581 if self.dialect.positional:
1582 self.parameters = [dialect.execute_sequence_format()]
1583 else:
1584 self.parameters = [self._empty_dict_params]
1585 elif isinstance(parameters[0], dialect.execute_sequence_format):
1586 self.parameters = parameters
1587 elif isinstance(parameters[0], dict):
1588 self.parameters = parameters
1589 else:
1590 self.parameters = [
1591 dialect.execute_sequence_format(p) for p in parameters
1592 ]
1593
1594 if len(parameters) > 1:
1595 self.execute_style = ExecuteStyle.EXECUTEMANY
1596
1597 self.statement = self.unicode_statement = statement
1598
1599 self.cursor = self.create_cursor()
1600 return self
1601
1602 @classmethod
1603 def _init_default(
1604 cls,
1605 dialect: Dialect,
1606 connection: Connection,
1607 dbapi_connection: PoolProxiedConnection,
1608 execution_options: _ExecuteOptions,
1609 ) -> ExecutionContext:
1610 """Initialize execution context for a ColumnDefault construct."""
1611
1612 self = cls.__new__(cls)
1613 self.root_connection = connection
1614 self._dbapi_connection = dbapi_connection
1615 self.dialect = connection.dialect
1616
1617 self.execution_options = execution_options
1618
1619 self.cursor = self.create_cursor()
1620 return self
1621
1622 def _get_cache_stats(self) -> str:
1623 if self.compiled is None:
1624 return "raw sql"
1625
1626 now = perf_counter()
1627
1628 ch = self.cache_hit
1629
1630 gen_time = self.compiled._gen_time
1631 assert gen_time is not None
1632
1633 if ch is NO_CACHE_KEY:
1634 return "no key %.5fs" % (now - gen_time,)
1635 elif ch is CACHE_HIT:
1636 return "cached since %.4gs ago" % (now - gen_time,)
1637 elif ch is CACHE_MISS:
1638 return "generated in %.5fs" % (now - gen_time,)
1639 elif ch is CACHING_DISABLED:
1640 if "_cache_disable_reason" in self.execution_options:
1641 return "caching disabled (%s) %.5fs " % (
1642 self.execution_options["_cache_disable_reason"],
1643 now - gen_time,
1644 )
1645 else:
1646 return "caching disabled %.5fs" % (now - gen_time,)
1647 elif ch is NO_DIALECT_SUPPORT:
1648 return "dialect %s+%s does not support caching %.5fs" % (
1649 self.dialect.name,
1650 self.dialect.driver,
1651 now - gen_time,
1652 )
1653 else:
1654 return "unknown"
1655
    @property
    def executemany(self):  # type: ignore[override]
        """True if the execute style is EXECUTEMANY or INSERTMANYVALUES."""
        return self.execute_style in (
            ExecuteStyle.EXECUTEMANY,
            ExecuteStyle.INSERTMANYVALUES,
        )
1662
1663 @util.memoized_property
1664 def identifier_preparer(self):
1665 if self.compiled:
1666 return self.compiled.preparer
1667 elif "schema_translate_map" in self.execution_options:
1668 return self.dialect.identifier_preparer._with_schema_translate(
1669 self.execution_options["schema_translate_map"]
1670 )
1671 else:
1672 return self.dialect.identifier_preparer
1673
    @util.memoized_property
    def engine(self):
        """The ``engine`` attribute of ``self.root_connection``."""
        return self.root_connection.engine
1677
    @util.memoized_property
    def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
        """The ``postfetch`` collection of the compiled statement."""
        if TYPE_CHECKING:
            # narrows the type for static analysis only
            assert isinstance(self.compiled, SQLCompiler)
        return self.compiled.postfetch
1683
1684 @util.memoized_property
1685 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1686 if TYPE_CHECKING:
1687 assert isinstance(self.compiled, SQLCompiler)
1688 if self.isinsert:
1689 return self.compiled.insert_prefetch
1690 elif self.isupdate:
1691 return self.compiled.update_prefetch
1692 else:
1693 return ()
1694
    @util.memoized_property
    def no_parameters(self):
        """Value of the ``no_parameters`` execution option, default False."""
        return self.execution_options.get("no_parameters", False)
1698
1699 def _execute_scalar(
1700 self,
1701 stmt: str,
1702 type_: Optional[TypeEngine[Any]],
1703 parameters: Optional[_DBAPISingleExecuteParams] = None,
1704 ) -> Any:
1705 """Execute a string statement on the current cursor, returning a
1706 scalar result.
1707
1708 Used to fire off sequences, default phrases, and "select lastrowid"
1709 types of statements individually or in the context of a parent INSERT
1710 or UPDATE statement.
1711
1712 """
1713
1714 conn = self.root_connection
1715
1716 if "schema_translate_map" in self.execution_options:
1717 schema_translate_map = self.execution_options.get(
1718 "schema_translate_map", {}
1719 )
1720
1721 rst = self.identifier_preparer._render_schema_translates
1722 stmt = rst(stmt, schema_translate_map)
1723
1724 if not parameters:
1725 if self.dialect.positional:
1726 parameters = self.dialect.execute_sequence_format()
1727 else:
1728 parameters = {}
1729
1730 conn._cursor_execute(self.cursor, stmt, parameters, context=self)
1731 row = self.cursor.fetchone()
1732 if row is not None:
1733 r = row[0]
1734 else:
1735 r = None
1736 if type_ is not None:
1737 # apply type post processors to the result
1738 proc = type_._cached_result_processor(
1739 self.dialect, self.cursor.description[0][1]
1740 )
1741 if proc:
1742 return proc(r)
1743 return r
1744
    @util.memoized_property
    def connection(self):
        """The connection for this context; same as ``root_connection``."""
        return self.root_connection
1748
    def _use_server_side_cursor(self):
        """Return a truthy value if a server side cursor should be used.

        Returns ``False`` when the dialect does not support server side
        cursors.  Otherwise:

        * with the deprecated dialect-level ``server_side_cursors`` flag
          set, the ``stream_results`` option defaults to ``True`` and the
          statement must additionally be either a compiled ``Selectable``,
          or a ``TextClause`` / non-compiled statement whose string matches
          ``SERVER_SIDE_CURSOR_RE``;
        * without that flag, the ``stream_results`` execution option
          (default ``False``) is returned.

        Note that the deprecated path returns the raw result of an
        ``and`` / ``or`` chain, so the value is not necessarily a ``bool``.
        """
        if not self.dialect.supports_server_side_cursors:
            return False

        if self.dialect.server_side_cursors:
            # this is deprecated
            use_server_side = self.execution_options.get(
                "stream_results", True
            ) and (
                self.compiled
                and isinstance(self.compiled.statement, expression.Selectable)
                or (
                    (
                        not self.compiled
                        or isinstance(
                            self.compiled.statement, expression.TextClause
                        )
                    )
                    and self.unicode_statement
                    and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
                )
            )
        else:
            use_server_side = self.execution_options.get(
                "stream_results", False
            )

        return use_server_side
1777
1778 def create_cursor(self) -> DBAPICursor:
1779 if (
1780 # inlining initial preference checks for SS cursors
1781 self.dialect.supports_server_side_cursors
1782 and (
1783 self.execution_options.get("stream_results", False)
1784 or (
1785 self.dialect.server_side_cursors
1786 and self._use_server_side_cursor()
1787 )
1788 )
1789 ):
1790 self._is_server_side = True
1791 return self.create_server_side_cursor()
1792 else:
1793 self._is_server_side = False
1794 return self.create_default_cursor()
1795
    def fetchall_for_returning(self, cursor):
        """Return ``cursor.fetchall()``.

        NOTE(review): presumably a hook so dialects can customize how
        RETURNING rows are fetched -- confirm against callers.
        """
        return cursor.fetchall()
1798
    def create_default_cursor(self) -> DBAPICursor:
        """Return a new cursor from ``self._dbapi_connection.cursor()``."""
        return self._dbapi_connection.cursor()
1801
    def create_server_side_cursor(self) -> DBAPICursor:
        """Return a new server side cursor.

        Not implemented here; this base implementation always raises
        ``NotImplementedError``.
        """
        raise NotImplementedError()
1804
    def pre_exec(self):
        """Hook with no behavior in this base implementation.

        NOTE(review): by its name, presumably invoked before statement
        execution -- confirm against callers.
        """
        pass
1807
    def get_out_parameter_values(self, names):
        """Return OUT parameter values for the given parameter names.

        Not supported here; this base implementation always raises
        ``NotImplementedError``.
        """
        raise NotImplementedError(
            "This dialect does not support OUT parameters"
        )
1812
    def post_exec(self):
        """Hook with no behavior in this base implementation.

        NOTE(review): by its name, presumably invoked after statement
        execution -- confirm against callers.
        """
        pass
1815
    def get_result_processor(
        self, type_: TypeEngine[Any], colname: str, coltype: DBAPIType
    ) -> Optional[_ResultProcessorType[Any]]:
        """Return a 'result processor' for a given type as present in
        cursor.description.

        This has a default implementation that dialects can override
        for context-sensitive result type handling.

        :param type_: the type whose cached result processor is looked up.
        :param colname: column name; not used by this default
         implementation.
        :param coltype: DBAPI type, passed along with ``self.dialect`` to
         ``type_._cached_result_processor()``.

        """
        return type_._cached_result_processor(self.dialect, coltype)
1827
    def get_lastrowid(self) -> int:
        """return self.cursor.lastrowid, or equivalent, after an INSERT.

        This may involve calling special cursor functions, issuing a new SELECT
        on the cursor (or a new one), or returning a stored value that was
        calculated within post_exec().

        This function will only be called for dialects which support "implicit"
        primary key generation, keep preexecute_autoincrement_sequences set to
        False, and when no explicit id value was bound to the statement.

        The function is called once for an INSERT statement that would need to
        return the last inserted primary key for those dialects that make use
        of the lastrowid concept. In these cases, it is called directly after
        :meth:`.ExecutionContext.post_exec`.

        This default implementation returns ``self.cursor.lastrowid``
        directly.

        """
        return self.cursor.lastrowid
1846
    def handle_dbapi_exception(self, e):
        """Hook receiving an exception ``e``; does nothing in this base
        implementation.
        """
        pass
1849
1850 @util.non_memoized_property
1851 def rowcount(self) -> int:
1852 if self._rowcount is not None:
1853 return self._rowcount
1854 else:
1855 return self.cursor.rowcount
1856
    @property
    def _has_rowcount(self):
        """True if a rowcount has been stored in ``self._rowcount``."""
        return self._rowcount is not None
1860
    def supports_sane_rowcount(self):
        """Return the dialect's ``supports_sane_rowcount`` flag."""
        return self.dialect.supports_sane_rowcount
1863
    def supports_sane_multi_rowcount(self):
        """Return the dialect's ``supports_sane_multi_rowcount`` flag."""
        return self.dialect.supports_sane_multi_rowcount
1866
    def _setup_result_proxy(self):
        """Build and return the result object for this execution.

        CRUD and textual statements are delegated to
        ``_setup_dml_or_text_result()``; everything else gets a
        ``CursorResult`` built here, using a buffered fetch strategy when
        the cursor is server side or ``stream_results`` is set.  OUT
        parameters are collected when the compiled statement has them, and
        the ``yield_per`` execution option is applied to non-CRUD,
        non-text results.
        """
        exec_opt = self.execution_options

        # capture the rowcount up front when the caller asked for it to be
        # preserved
        if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
            self._rowcount = self.cursor.rowcount

        yp: Optional[Union[int, bool]]
        if self.is_crud or self.is_text:
            result = self._setup_dml_or_text_result()
            yp = False
        else:
            yp = exec_opt.get("yield_per", None)
            sr = self._is_server_side or exec_opt.get("stream_results", False)
            strategy = self.cursor_fetch_strategy
            if sr and strategy is _cursor._DEFAULT_FETCH:
                strategy = _cursor.BufferedRowCursorFetchStrategy(
                    self.cursor, self.execution_options
                )
            cursor_description: _DBAPICursorDescription = (
                strategy.alternate_cursor_description
                or self.cursor.description
            )
            # no description available: use the "no cursor" strategy
            if cursor_description is None:
                strategy = _cursor._NO_CURSOR_DQL

            result = _cursor.CursorResult(self, strategy, cursor_description)

        compiled = self.compiled

        if (
            compiled
            and not self.isddl
            and cast(SQLCompiler, compiled).has_out_parameters
        ):
            self._setup_out_parameters(result)

        self._soft_closed = result._soft_closed

        if yp:
            result = result.yield_per(yp)

        return result
1909
1910 def _setup_out_parameters(self, result):
1911 compiled = cast(SQLCompiler, self.compiled)
1912
1913 out_bindparams = [
1914 (param, name)
1915 for param, name in compiled.bind_names.items()
1916 if param.isoutparam
1917 ]
1918 out_parameters = {}
1919
1920 for bindparam, raw_value in zip(
1921 [param for param, name in out_bindparams],
1922 self.get_out_parameter_values(
1923 [name for param, name in out_bindparams]
1924 ),
1925 ):
1926 type_ = bindparam.type
1927 impl_type = type_.dialect_impl(self.dialect)
1928 dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
1929 result_processor = impl_type.result_processor(
1930 self.dialect, dbapi_type
1931 )
1932 if result_processor is not None:
1933 raw_value = result_processor(raw_value)
1934 out_parameters[bindparam.key] = raw_value
1935
1936 result.out_parameters = out_parameters
1937
    def _setup_dml_or_text_result(self):
        """Build and return the ``CursorResult`` for a CRUD or textual
        statement.

        Chooses the fetch strategy and cursor description, then performs
        the post-processing appropriate to the statement type: inserted
        primary key collection for INSERT, consumption of implicit
        RETURNING rows, and capture of ``cursor.rowcount`` into
        ``self._rowcount`` where it has not been stored already.
        """
        compiled = cast(SQLCompiler, self.compiled)

        strategy: ResultFetchStrategy = self.cursor_fetch_strategy

        if self.isinsert:
            if (
                self.execute_style is ExecuteStyle.INSERTMANYVALUES
                and compiled.effective_returning
            ):
                # the buffered strategy starts out with the rows already
                # stored in self._insertmanyvalues_rows
                strategy = _cursor.FullyBufferedCursorFetchStrategy(
                    self.cursor,
                    initial_buffer=self._insertmanyvalues_rows,
                    # maintain alt cursor description if set by the
                    # dialect, e.g. mssql preserves it
                    alternate_description=(
                        strategy.alternate_cursor_description
                    ),
                )

            if compiled.postfetch_lastrowid:
                self.inserted_primary_key_rows = (
                    self._setup_ins_pk_from_lastrowid()
                )
            # else if not self._is_implicit_returning,
            # the default inserted_primary_key_rows accessor will
            # return an "empty" primary key collection when accessed.

        if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
            strategy = _cursor.BufferedRowCursorFetchStrategy(
                self.cursor, self.execution_options
            )

        if strategy is _cursor._NO_CURSOR_DML:
            cursor_description = None
        else:
            cursor_description = (
                strategy.alternate_cursor_description
                or self.cursor.description
            )

        if cursor_description is None:
            strategy = _cursor._NO_CURSOR_DML
        elif self._num_sentinel_cols:
            assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
            # the sentinel columns are handled in CursorResult._init_metadata
            # using essentially _reduce

        result: _cursor.CursorResult[Any] = _cursor.CursorResult(
            self, strategy, cursor_description
        )

        if self.isinsert:
            if self._is_implicit_returning:
                # consume all RETURNING rows now; they supply both the
                # returned defaults and the inserted primary keys
                rows = result.all()

                self.returned_default_rows = rows

                self.inserted_primary_key_rows = (
                    self._setup_ins_pk_from_implicit_returning(result, rows)
                )

                # test that it has a cursor metadata that is accurate. the
                # first row will have been fetched and current assumptions
                # are that the result has only one row, until executemany()
                # support is added here.
                assert result._metadata.returns_rows

                # Insert statement has both return_defaults() and
                # returning().  rewind the result on the list of rows
                # we just used.
                if self._is_supplemental_returning:
                    result._rewind(rows)
                else:
                    result._soft_close()
            elif not self._is_explicit_returning:
                result._soft_close()

                # we assume here the result does not return any rows.
                # *usually*, this will be true.  However, some dialects
                # such as that of MSSQL/pyodbc need to SELECT a post fetch
                # function so this is not necessarily true.
                # assert not result.returns_rows

        elif self._is_implicit_returning:
            rows = result.all()

            if rows:
                self.returned_default_rows = rows
            # the number of RETURNING rows is used as the rowcount
            self._rowcount = len(rows)

            if self._is_supplemental_returning:
                result._rewind(rows)
            else:
                result._soft_close()

            # test that it has a cursor metadata that is accurate.
            # the rows have all been fetched however.
            assert result._metadata.returns_rows

        elif not result._metadata.returns_rows:
            # no results, get rowcount
            # (which requires open cursor on some drivers)
            if self._rowcount is None:
                self._rowcount = self.cursor.rowcount
            result._soft_close()
        elif self.isupdate or self.isdelete:
            if self._rowcount is None:
                self._rowcount = self.cursor.rowcount
        return result
2048
    @util.memoized_property
    def inserted_primary_key_rows(self):
        """Inserted primary key rows; this fallback derives them from
        ``compiled_parameters`` alone via ``_setup_ins_pk_from_empty()``.
        """
        # if no specific "get primary key" strategy was set up
        # during execution, return a "default" primary key based
        # on what's in the compiled_parameters and nothing else.
        return self._setup_ins_pk_from_empty()
2055
def _setup_ins_pk_from_lastrowid(self):
    """Return a one-element list holding the primary key built from
    ``self.get_lastrowid()`` and the first set of compiled parameters.

    """
    compiled = cast(SQLCompiler, self.compiled)
    build_pk = compiled._inserted_primary_key_from_lastrowid_getter
    row_id = self.get_lastrowid()
    first_params = self.compiled_parameters[0]
    return [build_pk(row_id, first_params)]
2062
def _setup_ins_pk_from_empty(self):
    """Return one primary key per set of compiled parameters, invoking
    the "lastrowid" getter with ``None`` in place of a lastrowid value.

    """
    compiled = cast(SQLCompiler, self.compiled)
    build_pk = compiled._inserted_primary_key_from_lastrowid_getter
    pk_rows = []
    for params in self.compiled_parameters:
        pk_rows.append(build_pk(None, params))
    return pk_rows
2068
2069 def _setup_ins_pk_from_implicit_returning(self, result, rows):
2070 if not rows:
2071 return []
2072
2073 getter = cast(
2074 SQLCompiler, self.compiled
2075 )._inserted_primary_key_from_returning_getter
2076 compiled_params = self.compiled_parameters
2077
2078 return [
2079 getter(row, param) for row, param in zip(rows, compiled_params)
2080 ]
2081
def lastrow_has_defaults(self) -> bool:
    """Report whether this is an INSERT or UPDATE whose compiled form
    has a non-empty ``postfetch`` collection.

    """
    is_write = self.isinsert or self.isupdate
    if not is_write:
        # not an INSERT / UPDATE; the compiled object is not consulted
        return is_write
    return bool(cast(SQLCompiler, self.compiled).postfetch)
2086
2087 def _prepare_set_input_sizes(
2088 self,
2089 ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
2090 """Given a cursor and ClauseParameters, prepare arguments
2091 in order to call the appropriate
2092 style of ``setinputsizes()`` on the cursor, using DB-API types
2093 from the bind parameter's ``TypeEngine`` objects.
2094
2095 This method only called by those dialects which set the
2096 :attr:`.Dialect.bind_typing` attribute to
2097 :attr:`.BindTyping.SETINPUTSIZES`. Python-oracledb and cx_Oracle are
2098 the only DBAPIs that requires setinputsizes(); pyodbc offers it as an
2099 option.
2100
2101 Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
2102 for pg8000 and asyncpg, which has been changed to inline rendering
2103 of casts.
2104
2105 """
2106 if self.isddl or self.is_text:
2107 return None
2108
2109 compiled = cast(SQLCompiler, self.compiled)
2110
2111 inputsizes = compiled._get_set_input_sizes_lookup()
2112
2113 if inputsizes is None:
2114 return None
2115
2116 dialect = self.dialect
2117
2118 # all of the rest of this... cython?
2119
2120 if dialect._has_events:
2121 inputsizes = dict(inputsizes)
2122 dialect.dispatch.do_setinputsizes(
2123 inputsizes, self.cursor, self.statement, self.parameters, self
2124 )
2125
2126 if compiled.escaped_bind_names:
2127 escaped_bind_names = compiled.escaped_bind_names
2128 else:
2129 escaped_bind_names = None
2130
2131 if dialect.positional:
2132 items = [
2133 (key, compiled.binds[key])
2134 for key in compiled.positiontup or ()
2135 ]
2136 else:
2137 items = [
2138 (key, bindparam)
2139 for bindparam, key in compiled.bind_names.items()
2140 ]
2141
2142 generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
2143 for key, bindparam in items:
2144 if bindparam in compiled.literal_execute_params:
2145 continue
2146
2147 if key in self._expanded_parameters:
2148 if is_tuple_type(bindparam.type):
2149 num = len(bindparam.type.types)
2150 dbtypes = inputsizes[bindparam]
2151 generic_inputsizes.extend(
2152 (
2153 (
2154 escaped_bind_names.get(paramname, paramname)
2155 if escaped_bind_names is not None
2156 else paramname
2157 ),
2158 dbtypes[idx % num],
2159 bindparam.type.types[idx % num],
2160 )
2161 for idx, paramname in enumerate(
2162 self._expanded_parameters[key]
2163 )
2164 )
2165 else:
2166 dbtype = inputsizes.get(bindparam, None)
2167 generic_inputsizes.extend(
2168 (
2169 (
2170 escaped_bind_names.get(paramname, paramname)
2171 if escaped_bind_names is not None
2172 else paramname
2173 ),
2174 dbtype,
2175 bindparam.type,
2176 )
2177 for paramname in self._expanded_parameters[key]
2178 )
2179 else:
2180 dbtype = inputsizes.get(bindparam, None)
2181
2182 escaped_name = (
2183 escaped_bind_names.get(key, key)
2184 if escaped_bind_names is not None
2185 else key
2186 )
2187
2188 generic_inputsizes.append(
2189 (escaped_name, dbtype, bindparam.type)
2190 )
2191
2192 return generic_inputsizes
2193
2194 def _exec_default(self, column, default, type_):
2195 if default.is_sequence:
2196 return self.fire_sequence(default, type_)
2197 elif default.is_callable:
2198 # this codepath is not normally used as it's inlined
2199 # into _process_execute_defaults
2200 self.current_column = column
2201 return default.arg(self)
2202 elif default.is_clause_element:
2203 return self._exec_default_clause_element(column, default, type_)
2204 else:
2205 # this codepath is not normally used as it's inlined
2206 # into _process_execute_defaults
2207 return default.arg
2208
def _exec_default_clause_element(self, column, default, type_):
    """Execute a default that is a complete clause element and return
    its scalar result.

    A miniature version of the compile -> parameters ->
    cursor.execute() sequence is re-implemented here, since the state
    of the connection / result in progress should not be modified and
    no new connection / result objects should be created.

    """
    if default._arg_is_typed:
        default_arg = default.arg
    else:
        default_arg = expression.type_coerce(default.arg, type_)

    select_stmt = expression.select(default_arg)
    compiled = select_stmt.compile(dialect=self.dialect)
    compiled_params = compiled.construct_params()
    processors = compiled._bind_processors

    def _bound_value(key):
        # run the value through its bind processor when one exists
        if key in processors:
            return processors[key](compiled_params[key])  # type: ignore
        return compiled_params[key]

    if compiled.positional:
        ordered_values = [
            _bound_value(key) for key in compiled.positiontup or ()
        ]
        parameters = self.dialect.execute_sequence_format(ordered_values)
    else:
        parameters = {key: _bound_value(key) for key in compiled_params}

    return self._execute_scalar(str(compiled), type_, parameters=parameters)
2247
2248 current_parameters: Optional[_CoreSingleExecuteParams] = None
2249 """A dictionary of parameters applied to the current row.
2250
2251 This attribute is only available in the context of a user-defined default
2252 generation function, e.g. as described at :ref:`context_default_functions`.
2253 It consists of a dictionary which includes entries for each column/value
2254 pair that is to be part of the INSERT or UPDATE statement. The keys of the
2255 dictionary will be the key value of each :class:`_schema.Column`,
2256 which is usually
2257 synonymous with the name.
2258
    Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
    does not accommodate the "multi-values" feature of the
    :meth:`_expression.Insert.values` method. The
    :meth:`.DefaultExecutionContext.get_current_parameters` method should be
    preferred.
2264
2265 .. seealso::
2266
2267 :meth:`.DefaultExecutionContext.get_current_parameters`
2268
2269 :ref:`context_default_functions`
2270
2271 """
2272
def get_current_parameters(self, isolate_multiinsert_groups=True):
    """Return a dictionary of parameters applied to the current row.

    This method can only be used in the context of a user-defined default
    generation function, e.g. as described at
    :ref:`context_default_functions`. When invoked, a dictionary is
    returned which includes entries for each column/value pair that is part
    of the INSERT or UPDATE statement. The keys of the dictionary will be
    the key value of each :class:`_schema.Column`,
    which is usually synonymous
    with the name.

    :param isolate_multiinsert_groups=True: indicates that multi-valued
     INSERT constructs created using :meth:`_expression.Insert.values`
     should be
     handled by returning only the subset of parameters that are local
     to the current column default invocation.   When ``False``, the
     raw parameters of the statement are returned including the
     naming convention used in the case of multi-valued INSERT.

    :raises InvalidRequestError: if no current column / current
     parameters are established on this context, i.e. the method is
     invoked outside of a Python side column default function.

    .. seealso::

        :attr:`.DefaultExecutionContext.current_parameters`

        :ref:`context_default_functions`

    """
    try:
        parameters = self.current_parameters
        column = self.current_column
    except AttributeError:
        raise exc.InvalidRequestError(
            "get_current_parameters() can only be invoked in the "
            "context of a Python side column default function"
        )

    # ``current_parameters`` has a class-level value of None, and
    # _process_execute_defaults() deletes the per-instance value when it
    # finishes while leaving ``current_column`` in place.  A call made
    # after that point therefore gets here without an AttributeError;
    # raise the same error explicitly rather than relying on ``assert``,
    # which is removed when Python runs with -O.
    if parameters is None or column is None:
        raise exc.InvalidRequestError(
            "get_current_parameters() can only be invoked in the "
            "context of a Python side column default function"
        )

    compile_state = cast(
        "DMLState", cast(SQLCompiler, self.compiled).compile_state
    )
    assert compile_state is not None

    if not (
        isolate_multiinsert_groups
        and dml.isinsert(compile_state)
        and compile_state._has_multi_parameters
    ):
        # raw parameters of the statement, as-is
        return parameters

    if column._is_multiparam_column:
        index = column.index + 1
        d = {column.original.key: parameters[column.key]}
    else:
        d = {column.key: parameters[column.key]}
        index = 0

    assert compile_state._dict_parameters is not None
    keys = compile_state._dict_parameters.keys()
    # parameters of a multi-values group are stored as "<key>_m<index>"
    d.update((key, parameters["%s_m%d" % (key, index)]) for key in keys)
    return d
2334
def get_insert_default(self, column):
    """Return the value generated by ``column.default``, or ``None``
    when the column has no default.

    """
    default = column.default
    if default is not None:
        return self._exec_default(column, default, column.type)
    return None
2340
def get_update_default(self, column):
    """Return the value generated by ``column.onupdate``, or ``None``
    when the column has no onupdate default.

    """
    onupdate = column.onupdate
    if onupdate is not None:
        return self._exec_default(column, onupdate, column.type)
    return None
2346
def _process_execute_defaults(self):
    """Populate every set of compiled parameters with values for the
    prefetch columns of the compiled statement.

    Insert prefetch columns take precedence; update prefetch columns
    are used only when there are no insert prefetch columns.

    """
    compiled = cast(SQLCompiler, self.compiled)
    key_getter = compiled._within_exec_param_key_getter

    # one record per prefetch column:
    # (column, parameter key, default description, fallback getter)
    prefetch_recs = []
    if compiled.insert_prefetch:
        for col in compiled.insert_prefetch:
            prefetch_recs.append(
                (
                    col,
                    key_getter(col),
                    col._default_description_tuple,
                    self.get_insert_default,
                )
            )
    elif compiled.update_prefetch:
        for col in compiled.update_prefetch:
            prefetch_recs.append(
                (
                    col,
                    key_getter(col),
                    col._onupdate_description_tuple,
                    self.get_update_default,
                )
            )

    # sentinel values keep counting up across all parameter sets
    next_sentinel = 0

    for param in self.compiled_parameters:
        self.current_parameters = param

        for col, param_key, description, fallback in prefetch_recs:
            arg, is_scalar, is_callable, is_sentinel = description

            if is_sentinel:
                param[param_key] = next_sentinel
                next_sentinel += 1
            elif is_scalar:
                param[param_key] = arg
            elif is_callable:
                # inlined form of the callable branch of _exec_default
                self.current_column = col
                param[param_key] = arg(self)
            else:
                value = fallback(col)
                if value is not None:
                    param[param_key] = value

    del self.current_parameters
2400
2401
# Assign DefaultExecutionContext as the ``execution_ctx_cls`` attribute of
# DefaultDialect; done at module level after both names are available.
DefaultDialect.execution_ctx_cls = DefaultExecutionContext