1# engine/default.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Default implementations of per-dialect sqlalchemy.engine classes.
10
11These are semi-private implementation classes which are only of importance
12to database dialect authors; dialects will usually use the classes here
13as the base class for their own corresponding classes.
14
15"""
16
17from __future__ import annotations
18
19import functools
20import operator
21import random
22import re
23from time import perf_counter
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Dict
29from typing import Final
30from typing import List
31from typing import Literal
32from typing import Mapping
33from typing import MutableMapping
34from typing import MutableSequence
35from typing import Optional
36from typing import Sequence
37from typing import Set
38from typing import Tuple
39from typing import Type
40from typing import TYPE_CHECKING
41from typing import Union
42import weakref
43
44from . import characteristics
45from . import cursor as _cursor
46from . import interfaces
47from .base import Connection
48from .interfaces import CacheStats
49from .interfaces import DBAPICursor
50from .interfaces import Dialect
51from .interfaces import ExecuteStyle
52from .interfaces import ExecutionContext
53from .reflection import ObjectKind
54from .reflection import ObjectScope
55from .. import event
56from .. import exc
57from .. import pool
58from .. import util
59from ..sql import compiler
60from ..sql import dml
61from ..sql import expression
62from ..sql import type_api
63from ..sql import util as sql_util
64from ..sql._typing import is_tuple_type
65from ..sql.base import _NoArg
66from ..sql.compiler import AggregateOrderByStyle
67from ..sql.compiler import DDLCompiler
68from ..sql.compiler import InsertmanyvaluesSentinelOpts
69from ..sql.compiler import SQLCompiler
70from ..sql.elements import quoted_name
71from ..util.typing import TupleAny
72from ..util.typing import Unpack
73
74if typing.TYPE_CHECKING:
75 from .base import Engine
76 from .cursor import ResultFetchStrategy
77 from .interfaces import _CoreMultiExecuteParams
78 from .interfaces import _CoreSingleExecuteParams
79 from .interfaces import _DBAPICursorDescription
80 from .interfaces import _DBAPIMultiExecuteParams
81 from .interfaces import _DBAPISingleExecuteParams
82 from .interfaces import _ExecuteOptions
83 from .interfaces import _MutableCoreSingleExecuteParams
84 from .interfaces import _ParamStyle
85 from .interfaces import ConnectArgsType
86 from .interfaces import DBAPIConnection
87 from .interfaces import DBAPIModule
88 from .interfaces import DBAPIType
89 from .interfaces import IsolationLevel
90 from .row import Row
91 from .url import URL
92 from ..event import _ListenerFnType
93 from ..pool import Pool
94 from ..pool import PoolProxiedConnection
95 from ..sql import Executable
96 from ..sql.compiler import Compiled
97 from ..sql.compiler import Linting
98 from ..sql.compiler import ResultColumnsEntry
99 from ..sql.dml import DMLState
100 from ..sql.dml import UpdateBase
101 from ..sql.elements import BindParameter
102 from ..sql.schema import Column
103 from ..sql.type_api import _BindProcessorType
104 from ..sql.type_api import _ResultProcessorType
105 from ..sql.type_api import TypeEngine
106
107
# Literal SQL only matches this pattern when it starts with SELECT;
# leading whitespace is allowed and the match is case-insensitive.
SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.IGNORECASE | re.UNICODE)


# Expose each CacheStats member under a module-level name, unpacked in
# the order that iterating CacheStats yields them.
(
    CACHE_HIT,
    CACHE_MISS,
    CACHING_DISABLED,
    NO_CACHE_KEY,
    NO_DIALECT_SUPPORT,
) = tuple(CacheStats)
119
120
121class DefaultDialect(Dialect):
122 """Default implementation of Dialect"""
123
    # Compiler / preparer classes used by this dialect.  __init__
    # instantiates ``preparer`` and ``type_compiler_cls`` with the dialect
    # as the only argument.
    statement_compiler = compiler.SQLCompiler
    ddl_compiler = compiler.DDLCompiler
    type_compiler_cls = compiler.GenericTypeCompiler

    preparer = compiler.IdentifierPreparer

    # Capability flags.  These are class-level defaults; concrete dialects
    # are expected to override the ones that differ for their backend.
    supports_alter = True
    supports_comments = False
    supports_constraint_comments = False
    inline_comments = False

    # Read via the *class* ``__dict__`` by ``_supports_statement_cache``, so
    # each subclass has to declare it itself to opt in to caching.
    supports_statement_cache = True

    div_is_floordiv = True

    # __init__ switches this to BindTyping.SETINPUTSIZES when the legacy
    # ``use_setinputsizes`` attribute is present and truthy.
    bind_typing = interfaces.BindTyping.NONE

    include_set_input_sizes: Optional[Set[Any]] = None
    exclude_set_input_sizes: Optional[Set[Any]] = None

    # the first value we'd get for an autoincrement column.
    default_sequence_base = 1

    # most DBAPIs happy with this for execute().
    # not cx_oracle.
    execute_sequence_format = tuple

    supports_schemas = True
    supports_views = True
    supports_sequences = False
    sequences_optional = False
    preexecute_autoincrement_sequences = False
    supports_identity_columns = False
    postfetch_lastrowid = True
    favor_returning_over_lastrowid = False
    insert_null_pk_still_autoincrements = False

    # RETURNING support, per statement type.  ``insert_returning`` also
    # feeds the ``insert_executemany_returning`` computed attributes.
    update_returning = False
    delete_returning = False
    update_returning_multifrom = False
    delete_returning_multifrom = False
    insert_returning = False

    aggregate_order_by_style = AggregateOrderByStyle.INLINE

    cte_follows_insert = False

    supports_native_enum = False
    # may be overridden per-instance via the ``supports_native_boolean``
    # constructor argument.
    supports_native_boolean = False
    supports_native_uuid = False
    returns_native_bytes = False

    non_native_boolean_check_constraint = True

    supports_simple_order_by_label = True

    tuple_in_values = False

    # Maps an execution-option name to the characteristic object that
    # knows how to apply / reset it on a connection; consumed by
    # set_engine_execution_options(), set_connection_execution_options()
    # and _reset_characteristics().
    connection_characteristics = util.immutabledict(
        {
            "isolation_level": characteristics.IsolationLevelCharacteristic(),
            "logging_token": characteristics.LoggingTokenCharacteristic(),
        }
    )

    # Maps an engine configuration key to the callable used to coerce its
    # value.
    # NOTE(review): presumably applied to string-valued configuration such
    # as engine_from_config() input -- the consumer is outside this file.
    engine_config_types: Mapping[str, Any] = util.immutabledict(
        {
            "pool_timeout": util.asint,
            "echo": util.bool_or_str("debug"),
            "echo_pool": util.bool_or_str("debug"),
            "pool_recycle": util.asint,
            "pool_size": util.asint,
            "max_overflow": util.asint,
            "future": util.asbool,
        }
    )

    # True if the DBAPI returns decimal.Decimal for the NUMERIC type.
    # This flag does *not* apply to the FLOAT type.
    supports_native_decimal = False

    name = "default"

    # length at which to truncate
    # any identifier.
    max_identifier_length = 9999
    # set from the ``max_identifier_length`` constructor argument; when
    # truthy, initialize() skips the server-side length check.
    _user_defined_max_identifier_length: Optional[int] = None

    isolation_level: Optional[str] = None

    # sub-categories of max_identifier_length.
    # currently these accommodate for MySQL which allows alias names
    # of 255 but DDL names only of 64.
    max_index_name_length: Optional[int] = None
    max_constraint_name_length: Optional[int] = None

    supports_sane_rowcount = True
    supports_sane_multi_rowcount = True
    # passed to type_api.adapt_type() by type_descriptor().
    colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
    # used by __init__ when neither an explicit paramstyle nor a DBAPI
    # module is available.
    default_paramstyle = "named"

    supports_default_values = False
    """dialect supports INSERT... DEFAULT VALUES syntax"""

    supports_default_metavalue = False
    """dialect supports INSERT... VALUES (DEFAULT) syntax"""

    default_metavalue_token = "DEFAULT"
    """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
    parenthesis."""

    # not sure if this is a real thing but the compiler will deliver it
    # if this is the only flag enabled.
    supports_empty_insert = True
    """dialect supports INSERT () VALUES ()"""

    supports_multivalues_insert = False

    # "insertmanyvalues" settings.  ``use_insertmanyvalues`` and
    # ``insertmanyvalues_page_size`` may be overridden per-instance via the
    # constructor; the page size is also overridable per-execution via the
    # "insertmanyvalues_page_size" execution option.
    use_insertmanyvalues: bool = False

    use_insertmanyvalues_wo_returning: bool = False

    insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
        InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
    )

    insertmanyvalues_page_size: int = 1000
    insertmanyvalues_max_parameters = 32700

    supports_is_distinct_from = True

    # ``supports_server_side_cursors`` is the capability; the constructor
    # refuses ``server_side_cursors=True`` unless it is set.
    supports_server_side_cursors = False

    server_side_cursors = False

    # extra record-level locking features (#4860)
    supports_for_update_of = False

    # populated by initialize() from the first connection.
    server_version_info = None

    default_schema_name: Optional[str] = None

    # indicates symbol names are
    # UPPERCASED if they are case insensitive
    # within the database.
    # if this is True, the methods normalize_name()
    # and denormalize_name() must be provided.
    requires_name_normalize = False

    # selects the async-adapted pool class in get_pool_class().
    is_async = False

    has_terminate = False

    # TODO: this is not to be part of 2.0.  implement rudimentary binary
    # literals for SQLite, PostgreSQL, MySQL only within
    # _Binary.literal_processor
    _legacy_binary_type_literal_encoding = "utf-8"
279
280 @util.deprecated_params(
281 empty_in_strategy=(
282 "1.4",
283 "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
284 "deprecated, and no longer has any effect. All IN expressions "
285 "are now rendered using "
286 'the "expanding parameter" strategy which renders a set of bound'
287 'expressions, or an "empty set" SELECT, at statement execution'
288 "time.",
289 ),
290 server_side_cursors=(
291 "1.4",
292 "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
293 "is deprecated and will be removed in a future release. Please "
294 "use the "
295 ":paramref:`_engine.Connection.execution_options.stream_results` "
296 "parameter.",
297 ),
298 )
299 def __init__(
300 self,
301 paramstyle: Optional[_ParamStyle] = None,
302 isolation_level: Optional[IsolationLevel] = None,
303 dbapi: Optional[DBAPIModule] = None,
304 implicit_returning: Literal[True] = True,
305 supports_native_boolean: Optional[bool] = None,
306 max_identifier_length: Optional[int] = None,
307 label_length: Optional[int] = None,
308 insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
309 use_insertmanyvalues: Optional[bool] = None,
310 # util.deprecated_params decorator cannot render the
311 # Linting.NO_LINTING constant
312 compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore
313 server_side_cursors: bool = False,
314 skip_autocommit_rollback: bool = False,
315 **kwargs: Any,
316 ):
317 if server_side_cursors:
318 if not self.supports_server_side_cursors:
319 raise exc.ArgumentError(
320 "Dialect %s does not support server side cursors" % self
321 )
322 else:
323 self.server_side_cursors = True
324
325 if getattr(self, "use_setinputsizes", False):
326 util.warn_deprecated(
327 "The dialect-level use_setinputsizes attribute is "
328 "deprecated. Please use "
329 "bind_typing = BindTyping.SETINPUTSIZES",
330 "2.0",
331 )
332 self.bind_typing = interfaces.BindTyping.SETINPUTSIZES
333
334 self.positional = False
335 self._ischema = None
336
337 self.dbapi = dbapi
338
339 self.skip_autocommit_rollback = skip_autocommit_rollback
340
341 if paramstyle is not None:
342 self.paramstyle = paramstyle
343 elif self.dbapi is not None:
344 self.paramstyle = self.dbapi.paramstyle
345 else:
346 self.paramstyle = self.default_paramstyle
347 self.positional = self.paramstyle in (
348 "qmark",
349 "format",
350 "numeric",
351 "numeric_dollar",
352 )
353 self.identifier_preparer = self.preparer(self)
354 self._on_connect_isolation_level = isolation_level
355
356 legacy_tt_callable = getattr(self, "type_compiler", None)
357 if legacy_tt_callable is not None:
358 tt_callable = cast(
359 Type[compiler.GenericTypeCompiler],
360 self.type_compiler,
361 )
362 else:
363 tt_callable = self.type_compiler_cls
364
365 self.type_compiler_instance = self.type_compiler = tt_callable(self)
366
367 if supports_native_boolean is not None:
368 self.supports_native_boolean = supports_native_boolean
369
370 self._user_defined_max_identifier_length = max_identifier_length
371 if self._user_defined_max_identifier_length:
372 self.max_identifier_length = (
373 self._user_defined_max_identifier_length
374 )
375 self.label_length = label_length
376 self.compiler_linting = compiler_linting
377
378 if use_insertmanyvalues is not None:
379 self.use_insertmanyvalues = use_insertmanyvalues
380
381 if insertmanyvalues_page_size is not _NoArg.NO_ARG:
382 self.insertmanyvalues_page_size = insertmanyvalues_page_size
383
384 @property
385 @util.deprecated(
386 "2.0",
387 "full_returning is deprecated, please use insert_returning, "
388 "update_returning, delete_returning",
389 )
390 def full_returning(self):
391 return (
392 self.insert_returning
393 and self.update_returning
394 and self.delete_returning
395 )
396
397 @util.memoized_property
398 def insert_executemany_returning(self):
399 """Default implementation for insert_executemany_returning, if not
400 otherwise overridden by the specific dialect.
401
402 The default dialect determines "insert_executemany_returning" is
403 available if the dialect in use has opted into using the
404 "use_insertmanyvalues" feature. If they haven't opted into that, then
405 this attribute is False, unless the dialect in question overrides this
406 and provides some other implementation (such as the Oracle Database
407 dialects).
408
409 """
410 return self.insert_returning and self.use_insertmanyvalues
411
412 @util.memoized_property
413 def insert_executemany_returning_sort_by_parameter_order(self):
414 """Default implementation for
415 insert_executemany_returning_deterministic_order, if not otherwise
416 overridden by the specific dialect.
417
418 The default dialect determines "insert_executemany_returning" can have
419 deterministic order only if the dialect in use has opted into using the
420 "use_insertmanyvalues" feature, which implements deterministic ordering
421 using client side sentinel columns only by default. The
422 "insertmanyvalues" feature also features alternate forms that can
423 use server-generated PK values as "sentinels", but those are only
424 used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel`
425 bitflag enables those alternate SQL forms, which are disabled
426 by default.
427
428 If the dialect in use hasn't opted into that, then this attribute is
429 False, unless the dialect in question overrides this and provides some
430 other implementation (such as the Oracle Database dialects).
431
432 """
433 return self.insert_returning and self.use_insertmanyvalues
434
    # Plain class-level constants, unlike the computed
    # ``insert_executemany_returning`` attributes defined just above.
    # NOTE(review): presumably these flag executemany + RETURNING support
    # for UPDATE / DELETE -- confirm against the Dialect interface.
    update_executemany_returning = False
    delete_executemany_returning = False
437
438 @util.memoized_property
439 def loaded_dbapi(self) -> DBAPIModule:
440 if self.dbapi is None:
441 raise exc.InvalidRequestError(
442 f"Dialect {self} does not have a Python DBAPI established "
443 "and cannot be used for actual database interaction"
444 )
445 return self.dbapi
446
    @util.memoized_property
    def _bind_typing_render_casts(self):
        """True when ``bind_typing`` is ``BindTyping.RENDER_CASTS``."""
        return self.bind_typing is interfaces.BindTyping.RENDER_CASTS
450
451 def _ensure_has_table_connection(self, arg: Connection) -> None:
452 if not isinstance(arg, Connection):
453 raise exc.ArgumentError(
454 "The argument passed to Dialect.has_table() should be a "
455 "%s, got %s. "
456 "Additionally, the Dialect.has_table() method is for "
457 "internal dialect "
458 "use only; please use "
459 "``inspect(some_engine).has_table(<tablename>>)`` "
460 "for public API use." % (Connection, type(arg))
461 )
462
463 @util.memoized_property
464 def _supports_statement_cache(self):
465 ssc = self.__class__.__dict__.get("supports_statement_cache", None)
466 if ssc is None:
467 util.warn(
468 "Dialect %s:%s will not make use of SQL compilation caching "
469 "as it does not set the 'supports_statement_cache' attribute "
470 "to ``True``. This can have "
471 "significant performance implications including some "
472 "performance degradations in comparison to prior SQLAlchemy "
473 "versions. Dialect maintainers should seek to set this "
474 "attribute to True after appropriate development and testing "
475 "for SQLAlchemy 1.4 caching support. Alternatively, this "
476 "attribute may be set to False which will disable this "
477 "warning." % (self.name, self.driver),
478 code="cprf",
479 )
480
481 return bool(ssc)
482
    @util.memoized_property
    def _type_memos(self):
        """Return a new, empty :class:`weakref.WeakKeyDictionary`.

        NOTE(review): presumably used as a per-dialect cache keyed by type
        objects; the consumers are outside this block -- confirm.
        """
        return weakref.WeakKeyDictionary()
486
    @property
    def dialect_description(self):  # type: ignore[override]
        """Return ``name`` and ``driver`` joined as ``"<name>+<driver>"``."""
        return self.name + "+" + self.driver
490
    @property
    def supports_sane_rowcount_returning(self):
        """True if this dialect supports sane rowcount even if RETURNING is
        in use.

        For dialects that don't support RETURNING, this is synonymous with
        ``supports_sane_rowcount``.

        This default implementation returns ``supports_sane_rowcount``
        unchanged.

        """
        return self.supports_sane_rowcount
501
502 @classmethod
503 def get_pool_class(cls, url: URL) -> Type[Pool]:
504 default: Type[pool.Pool]
505 if cls.is_async:
506 default = pool.AsyncAdaptedQueuePool
507 else:
508 default = pool.QueuePool
509
510 return getattr(cls, "poolclass", default)
511
    def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
        # instance-level entry point; simply delegates to the
        # get_pool_class() classmethod with the same URL.
        return self.get_pool_class(url)
514
515 @classmethod
516 def load_provisioning(cls):
517 package = ".".join(cls.__module__.split(".")[0:-1])
518 try:
519 __import__(package + ".provision")
520 except ImportError:
521 pass
522
523 def _builtin_onconnect(self) -> Optional[_ListenerFnType]:
524 if self._on_connect_isolation_level is not None:
525
526 def builtin_connect(dbapi_conn, conn_rec):
527 self._assert_and_set_isolation_level(
528 dbapi_conn, self._on_connect_isolation_level
529 )
530
531 return builtin_connect
532 else:
533 return None
534
535 def initialize(self, connection: Connection) -> None:
536 try:
537 self.server_version_info = self._get_server_version_info(
538 connection
539 )
540 except NotImplementedError:
541 self.server_version_info = None
542 try:
543 self.default_schema_name = self._get_default_schema_name(
544 connection
545 )
546 except NotImplementedError:
547 self.default_schema_name = None
548
549 try:
550 self.default_isolation_level = self.get_default_isolation_level(
551 connection.connection.dbapi_connection
552 )
553 except NotImplementedError:
554 self.default_isolation_level = None
555
556 if not self._user_defined_max_identifier_length:
557 max_ident_length = self._check_max_identifier_length(connection)
558 if max_ident_length:
559 self.max_identifier_length = max_ident_length
560
561 if (
562 self.label_length
563 and self.label_length > self.max_identifier_length
564 ):
565 raise exc.ArgumentError(
566 "Label length of %d is greater than this dialect's"
567 " maximum identifier length of %d"
568 % (self.label_length, self.max_identifier_length)
569 )
570
    def on_connect(self) -> Optional[Callable[[Any], None]]:
        # inherits the docstring from interfaces.Dialect.on_connect
        # (no docstring is defined here so that the inherited one applies).
        # The default dialect supplies no on-connect callable.
        return None
574
    def _check_max_identifier_length(self, connection):
        """Perform a connection / server version specific check to determine
        the max_identifier_length.

        If the dialect's class level max_identifier_length should be used,
        can return None.

        This default implementation always returns None; the result is
        consumed by :meth:`initialize`, which ignores falsy values.

        """
        return None
584
    def get_default_isolation_level(self, dbapi_conn):
        """Given a DBAPI connection, return its isolation level, or
        a default isolation level if one cannot be retrieved.

        May be overridden by subclasses in order to provide a
        "fallback" isolation level for databases that cannot reliably
        retrieve the actual isolation level.

        By default, calls the :meth:`_engine.Dialect.get_isolation_level`
        method, propagating any exceptions raised.

        """
        return self.get_isolation_level(dbapi_conn)
598
    def type_descriptor(self, typeobj):
        """Provide a database-specific :class:`.TypeEngine` object, given
        the generic object which comes from the types module.

        This method looks for a dictionary called
        ``colspecs`` as a class or instance-level variable,
        and passes on to :func:`_types.adapt_type`.

        :param typeobj: the generic type object to adapt.
        :return: whatever ``adapt_type(typeobj, self.colspecs)`` returns.

        """
        return type_api.adapt_type(typeobj, self.colspecs)
609
610 def has_index(self, connection, table_name, index_name, schema=None, **kw):
611 if not self.has_table(connection, table_name, schema=schema, **kw):
612 return False
613 for idx in self.get_indexes(
614 connection, table_name, schema=schema, **kw
615 ):
616 if idx["name"] == index_name:
617 return True
618 else:
619 return False
620
    def has_schema(
        self, connection: Connection, schema_name: str, **kw: Any
    ) -> bool:
        # generic implementation: fetch all schema names via
        # get_schema_names() and test membership.
        return schema_name in self.get_schema_names(connection, **kw)
625
626 def validate_identifier(self, ident: str) -> None:
627 if len(ident) > self.max_identifier_length:
628 raise exc.IdentifierError(
629 "Identifier '%s' exceeds maximum length of %d characters"
630 % (ident, self.max_identifier_length)
631 )
632
    def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection:
        # inherits the docstring from interfaces.Dialect.connect
        # Arguments are passed straight through to the DBAPI module's
        # connect(); loaded_dbapi raises if no DBAPI was established.
        return self.loaded_dbapi.connect(*cargs, **cparams)  # type: ignore[no-any-return]  # NOQA: E501
636
    def create_connect_args(self, url: URL) -> ConnectArgsType:
        # inherits the docstring from interfaces.Dialect.create_connect_args
        # Keyword arguments start from the URL's translated connect args;
        # entries from the URL query are then layered on top, so a query
        # key overrides a translated key of the same name.
        opts = url.translate_connect_args()
        opts.update(url.query)
        # no positional arguments are produced by the default dialect
        return ([], opts)
642
643 def set_engine_execution_options(
644 self, engine: Engine, opts: Mapping[str, Any]
645 ) -> None:
646 supported_names = set(self.connection_characteristics).intersection(
647 opts
648 )
649 if supported_names:
650 characteristics: Mapping[str, Any] = util.immutabledict(
651 (name, opts[name]) for name in supported_names
652 )
653
654 @event.listens_for(engine, "engine_connect")
655 def set_connection_characteristics(connection):
656 self._set_connection_characteristics(
657 connection, characteristics
658 )
659
660 def set_connection_execution_options(
661 self, connection: Connection, opts: Mapping[str, Any]
662 ) -> None:
663 supported_names = set(self.connection_characteristics).intersection(
664 opts
665 )
666 if supported_names:
667 characteristics: Mapping[str, Any] = util.immutabledict(
668 (name, opts[name]) for name in supported_names
669 )
670 self._set_connection_characteristics(connection, characteristics)
671
672 def _set_connection_characteristics(self, connection, characteristics):
673 characteristic_values = [
674 (name, self.connection_characteristics[name], value)
675 for name, value in characteristics.items()
676 ]
677
678 if connection.in_transaction():
679 trans_objs = [
680 (name, obj)
681 for name, obj, _ in characteristic_values
682 if obj.transactional
683 ]
684 if trans_objs:
685 raise exc.InvalidRequestError(
686 "This connection has already initialized a SQLAlchemy "
687 "Transaction() object via begin() or autobegin; "
688 "%s may not be altered unless rollback() or commit() "
689 "is called first."
690 % (", ".join(name for name, obj in trans_objs))
691 )
692
693 dbapi_connection = connection.connection.dbapi_connection
694 for _, characteristic, value in characteristic_values:
695 characteristic.set_connection_characteristic(
696 self, connection, dbapi_connection, value
697 )
698 connection.connection._connection_record.finalize_callback.append(
699 functools.partial(self._reset_characteristics, characteristics)
700 )
701
702 def _reset_characteristics(self, characteristics, dbapi_connection):
703 for characteristic_name in characteristics:
704 characteristic = self.connection_characteristics[
705 characteristic_name
706 ]
707 characteristic.reset_characteristic(self, dbapi_connection)
708
    def do_begin(self, dbapi_connection):
        # the default dialect does nothing to begin a transaction
        pass
711
712 def do_rollback(self, dbapi_connection):
713 if self.skip_autocommit_rollback and self.detect_autocommit_setting(
714 dbapi_connection
715 ):
716 return
717 dbapi_connection.rollback()
718
    def do_commit(self, dbapi_connection):
        # delegate directly to the DBAPI connection's commit()
        dbapi_connection.commit()
721
    def do_terminate(self, dbapi_connection):
        # the default dialect has no separate terminate step
        # (``has_terminate`` is False at the class level), so this is the
        # same as a regular close.
        self.do_close(dbapi_connection)
724
    def do_close(self, dbapi_connection):
        # delegate directly to the DBAPI connection's close()
        dbapi_connection.close()
727
    @util.memoized_property
    def _dialect_specific_select_one(self):
        """The string form of ``select(1)`` compiled for this dialect.

        Used by :meth:`do_ping` as the statement to execute.
        """
        return str(expression.select(1).compile(dialect=self))
731
    def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
        """Run :meth:`do_ping`, routing DBAPI errors through error handling.

        :param dbapi_connection: the DBAPI connection to ping.
        :return: the result of :meth:`do_ping` on success, or ``False`` if
         the ping raised a DBAPI ``Error`` that is considered a disconnect.
        :raises: the original DBAPI error is re-raised if it is not
         considered a disconnect.
        """
        try:
            return self.do_ping(dbapi_connection)
        except self.loaded_dbapi.Error as err:
            # first opinion comes from the dialect itself
            is_disconnect = self.is_disconnect(err, dbapi_connection, None)

            if self._has_events:
                try:
                    Connection._handle_dbapi_exception_noconnection(
                        err,
                        self,
                        is_disconnect=is_disconnect,
                        invalidate_pool_on_disconnect=False,
                        is_pre_ping=True,
                    )
                except exc.StatementError as new_err:
                    # the handler reports its verdict on the raised
                    # StatementError; that verdict replaces the dialect's
                    is_disconnect = new_err.connection_invalidated

            if is_disconnect:
                return False
            else:
                # bare raise: re-raises the DBAPI error being handled
                raise
754
    def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
        # Execute the dialect's "SELECT 1" on a fresh cursor.  Any error
        # from execute() propagates to the caller; the cursor is closed
        # either way.  Returns True when the statement ran without error.
        cursor = dbapi_connection.cursor()
        try:
            cursor.execute(self._dialect_specific_select_one)
        finally:
            cursor.close()
        return True
762
763 def create_xid(self):
764 """Create a random two-phase transaction ID.
765
766 This id will be passed to do_begin_twophase(), do_rollback_twophase(),
767 do_commit_twophase(). Its format is unspecified.
768 """
769
770 return "_sa_%032x" % random.randint(0, 2**128)
771
    def do_savepoint(self, connection, name):
        # executes a SavepointClause construct for ``name`` on the
        # connection
        connection.execute(expression.SavepointClause(name))
774
    def do_rollback_to_savepoint(self, connection, name):
        # executes a RollbackToSavepointClause construct for ``name`` on
        # the connection
        connection.execute(expression.RollbackToSavepointClause(name))
777
    def do_release_savepoint(self, connection, name):
        # executes a ReleaseSavepointClause construct for ``name`` on the
        # connection
        connection.execute(expression.ReleaseSavepointClause(name))
780
    def _deliver_insertmanyvalues_batches(
        self,
        connection,
        cursor,
        statement,
        parameters,
        generic_setinputsizes,
        context,
    ):
        """Generator yielding "insertmanyvalues" batches for execution.

        Batches are produced by the compiled statement's own
        ``_deliver_insertmanyvalues_batches()`` and yielded one at a time.
        When the statement has RETURNING, the rows for each batch are
        fetched from ``cursor`` once the generator is resumed, and are
        accumulated on ``context._insertmanyvalues_rows``; if sentinel
        columns are in use, the rows of each batch are first re-ordered to
        line up with the batch's parameter sets.

        NOTE(review): presumably the consumer executes each yielded batch
        on ``cursor`` before resuming the generator -- confirm in caller.

        :raises InvalidRequestError: if the sentinel-keyed rows of a batch
         do not match the batch's parameter sets in number or in value.
        """
        context = cast(DefaultExecutionContext, context)
        compiled = cast(SQLCompiler, context.compiled)

        # sentinel result processors are looked up lazily, once, on the
        # first batch that needs them (see _sentinel_proc_initialized)
        _composite_sentinel_proc: Sequence[
            Optional[_ResultProcessorType[Any]]
        ] = ()
        _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
        _sentinel_proc_initialized: bool = False

        compiled_parameters = context.compiled_parameters

        imv = compiled._insertmanyvalues
        assert imv is not None

        is_returning: Final[bool] = bool(compiled.effective_returning)
        # per-execution option wins over the dialect-level page size
        batch_size = context.execution_options.get(
            "insertmanyvalues_page_size", self.insertmanyvalues_page_size
        )

        if compiled.schema_translate_map:
            schema_translate_map = context.execution_options.get(
                "schema_translate_map", {}
            )
        else:
            schema_translate_map = None

        if is_returning:
            # the same list object is shared with the context, so rows
            # appended below are visible there
            result: Optional[List[Any]] = []
            context._insertmanyvalues_rows = result

            sort_by_parameter_order = imv.sort_by_parameter_order

        else:
            sort_by_parameter_order = False
            result = None

        for imv_batch in compiled._deliver_insertmanyvalues_batches(
            statement,
            parameters,
            compiled_parameters,
            generic_setinputsizes,
            batch_size,
            sort_by_parameter_order,
            schema_translate_map,
        ):
            yield imv_batch

            if is_returning:

                try:
                    rows = context.fetchall_for_returning(cursor)
                except BaseException as be:
                    # NOTE(review): ``rows`` is used below, so this call is
                    # presumably expected to always raise -- confirm.
                    connection._handle_dbapi_exception(
                        be,
                        sql_util._long_statement(imv_batch.replaced_statement),
                        imv_batch.replaced_parameters,
                        None,
                        context,
                        is_sub_exec=True,
                    )

                # I would have thought "is_returning: Final[bool]"
                # would have assured this but pylance thinks not
                assert result is not None

                if imv.num_sentinel_columns and not imv_batch.is_downgraded:
                    composite_sentinel = imv.num_sentinel_columns > 1
                    if imv.implicit_sentinel:
                        # for implicit sentinel, which is currently single-col
                        # integer autoincrement, do a simple sort.
                        assert not composite_sentinel
                        result.extend(
                            sorted(rows, key=operator.itemgetter(-1))
                        )
                        continue

                    # otherwise, create dictionaries to match up batches
                    # with parameters
                    assert imv.sentinel_param_keys
                    assert imv.sentinel_columns

                    _nsc = imv.num_sentinel_columns

                    if not _sentinel_proc_initialized:
                        # sentinel columns are the trailing columns of the
                        # cursor description; index 1 of each description
                        # entry is handed to the result processor lookup
                        if composite_sentinel:
                            _composite_sentinel_proc = [
                                col.type._cached_result_processor(
                                    self, cursor_desc[1]
                                )
                                for col, cursor_desc in zip(
                                    imv.sentinel_columns,
                                    cursor.description[-_nsc:],
                                )
                            ]
                        else:
                            _scalar_sentinel_proc = (
                                imv.sentinel_columns[0]
                            ).type._cached_result_processor(
                                self, cursor.description[-1][1]
                            )
                        _sentinel_proc_initialized = True

                    rows_by_sentinel: Union[
                        Dict[Tuple[Any, ...], Any],
                        Dict[Any, Any],
                    ]

                    # key each row by its (processed) sentinel value(s),
                    # taken from the trailing column(s) of the row
                    if composite_sentinel:
                        rows_by_sentinel = {
                            tuple(
                                (proc(val) if proc else val)
                                for val, proc in zip(
                                    row[-_nsc:], _composite_sentinel_proc
                                )
                            ): row
                            for row in rows
                        }
                    elif _scalar_sentinel_proc:
                        rows_by_sentinel = {
                            _scalar_sentinel_proc(row[-1]): row for row in rows
                        }
                    else:
                        rows_by_sentinel = {row[-1]: row for row in rows}

                    # duplicate sentinel values collapse in the dict, and
                    # missing rows shrink it; either way the counts differ
                    if len(rows_by_sentinel) != len(imv_batch.batch):
                        # see test_insert_exec.py::
                        # IMVSentinelTest::test_sentinel_incorrect_rowcount
                        # for coverage / demonstration
                        raise exc.InvalidRequestError(
                            f"Sentinel-keyed result set did not produce "
                            f"correct number of rows {len(imv_batch.batch)}; "
                            "produced "
                            f"{len(rows_by_sentinel)}. Please ensure the "
                            "sentinel column is fully unique and populated in "
                            "all cases."
                        )

                    try:
                        # emit rows in the order of the batch's parameter
                        # sets
                        ordered_rows = [
                            rows_by_sentinel[sentinel_keys]
                            for sentinel_keys in imv_batch.sentinel_values
                        ]
                    except KeyError as ke:
                        # see test_insert_exec.py::
                        # IMVSentinelTest::test_sentinel_cant_match_keys
                        # for coverage / demonstration
                        raise exc.InvalidRequestError(
                            f"Can't match sentinel values in result set to "
                            f"parameter sets; key {ke.args[0]!r} was not "
                            "found. "
                            "There may be a mismatch between the datatype "
                            "passed to the DBAPI driver vs. that which it "
                            "returns in a result row. Ensure the given "
                            "Python value matches the expected result type "
                            "*exactly*, taking care to not rely upon implicit "
                            "conversions which may occur such as when using "
                            "strings in place of UUID or integer values, etc. "
                        ) from ke

                    result.extend(ordered_rows)

                else:
                    # no sentinel handling for this batch; keep the rows
                    # in the order the cursor delivered them
                    result.extend(rows)
953
    def do_executemany(self, cursor, statement, parameters, context=None):
        # pass straight through to cursor.executemany(); ``context`` is
        # not used by the default implementation
        cursor.executemany(statement, parameters)
956
    def do_execute(self, cursor, statement, parameters, context=None):
        # pass straight through to cursor.execute(); ``context`` is not
        # used by the default implementation
        cursor.execute(statement, parameters)
959
    def do_execute_no_params(self, cursor, statement, context=None):
        # execute the statement without passing a parameters argument at
        # all; ``context`` is not used by the default implementation
        cursor.execute(statement)
962
    def is_disconnect(
        self,
        e: DBAPIModule.Error,
        connection: Union[
            pool.PoolProxiedConnection, interfaces.DBAPIConnection, None
        ],
        cursor: Optional[interfaces.DBAPICursor],
    ) -> bool:
        # the default dialect never classifies an error as a disconnect;
        # none of the arguments are inspected
        return False
972
973 @util.memoized_instancemethod
974 def _gen_allowed_isolation_levels(self, dbapi_conn):
975 try:
976 raw_levels = list(self.get_isolation_level_values(dbapi_conn))
977 except NotImplementedError:
978 return None
979 else:
980 normalized_levels = [
981 level.replace("_", " ").upper() for level in raw_levels
982 ]
983 if raw_levels != normalized_levels:
984 raise ValueError(
985 f"Dialect {self.name!r} get_isolation_level_values() "
986 f"method should return names as UPPERCASE using spaces, "
987 f"not underscores; got "
988 f"{sorted(set(raw_levels).difference(normalized_levels))}"
989 )
990 return tuple(normalized_levels)
991
992 def _assert_and_set_isolation_level(self, dbapi_conn, level):
993 level = level.replace("_", " ").upper()
994
995 _allowed_isolation_levels = self._gen_allowed_isolation_levels(
996 dbapi_conn
997 )
998 if (
999 _allowed_isolation_levels
1000 and level not in _allowed_isolation_levels
1001 ):
1002 raise exc.ArgumentError(
1003 f"Invalid value {level!r} for isolation_level. "
1004 f"Valid isolation levels for {self.name!r} are "
1005 f"{', '.join(_allowed_isolation_levels)}"
1006 )
1007
1008 self.set_isolation_level(dbapi_conn, level)
1009
def reset_isolation_level(self, dbapi_conn):
    """Put the connection back on its baseline isolation level.

    The baseline is ``_on_connect_isolation_level`` when that is set,
    otherwise ``default_isolation_level``.

    """
    on_connect = self._on_connect_isolation_level

    if on_connect is None:
        # nothing configured at connect time; use the dialect default
        assert self.default_isolation_level is not None
        self._assert_and_set_isolation_level(
            dbapi_conn, self.default_isolation_level
        )
        return

    assert (
        on_connect == "AUTOCOMMIT"
        or on_connect == self.default_isolation_level
    )
    self._assert_and_set_isolation_level(dbapi_conn, on_connect)
1026
def normalize_name(self, name):
    """Convert an identifier name to its normalized form.

    ``None`` and names with no upper/lower distinction are returned
    unchanged.  An all-uppercase name whose lowercase form does not
    require quoting is lowercased.  An all-lowercase name is returned as
    a ``quoted_name`` with ``quote=True``.  Mixed-case names are returned
    unchanged.

    """
    if name is None:
        return None

    lowered, uppered = name.lower(), name.upper()

    if uppered == lowered:
        # no case conversion is possible for this name, e.g.
        # non-european characters; hand it back untouched
        return name

    if uppered == name and not (
        self.identifier_preparer._requires_quotes
    )(lowered):
        # entirely uppercase and the lowercase form doesn't require
        # quoting; normalize to all lower case
        return lowered

    if lowered == name:
        # entirely lowercase, which if denormalized means quoting
        # has to be forced on it
        return quoted_name(name, quote=True)

    # mixed case; it will be quoted in SQL when used later, so no
    # normalization is applied
    return name
1052
def denormalize_name(self, name):
    """Convert a normalized identifier name to its uppercase form.

    ``None`` and names with no upper/lower distinction are returned
    unchanged.  An all-lowercase name that does not require quoting is
    uppercased.  Every other name is returned unchanged.

    """
    if name is None:
        return None

    lowered = name.lower()
    uppered = name.upper()

    if uppered == lowered:
        # no case conversion is possible for this name, e.g.
        # non-european characters; hand it back untouched
        return name

    if lowered == name and not (
        self.identifier_preparer._requires_quotes
    )(lowered):
        return uppered

    return name
1069
def get_driver_connection(self, connection: DBAPIConnection) -> Any:
    """Return the driver-level connection for a DBAPI connection.

    This default implementation returns the object it was given.

    """
    driver_connection = connection
    return driver_connection
1072
def _overrides_default(self, method):
    """Tell whether this dialect's class replaces a ``DefaultDialect`` method.

    Compares the ``__code__`` object of the named attribute on
    ``type(self)`` with that of the same attribute on ``DefaultDialect``.

    :param method: name of the method to look up on both classes.
    :return: ``True`` when the two code objects are not the same object.

    """
    own_impl = getattr(type(self), method)
    base_impl = getattr(DefaultDialect, method)
    return own_impl.__code__ is not base_impl.__code__
1078
def _default_multi_reflect(
    self,
    single_tbl_method,
    connection,
    kind,
    schema,
    filter_names,
    scope,
    **kw,
):
    """Generic multi-object reflection built on a single-table method.

    This is a generator.  It determines a list of object names, then
    calls ``single_tbl_method(connection, name, schema=schema, **kw)``
    for each one and yields ``((schema, name), result)`` pairs.

    :param single_tbl_method: callable invoked once per object name.
    :param connection: passed through to the names functions and to
     ``single_tbl_method``.
    :param kind: ``ObjectKind`` flags selecting which names functions
     (tables, views, materialized views) are consulted.
    :param schema: schema name; also the first element of each yielded key.
    :param filter_names: optional collection of names to restrict to.
    :param scope: ``ObjectScope`` flags selecting default and/or
     temporary names functions.
    :param kw: extra keyword arguments.  An ``unreflectable`` entry, if
     present, is removed and used as a dictionary collecting
     ``UnreflectableTableError`` instances keyed by ``(schema, name)``.

    """
    names_fns = []
    temp_names_fns = []
    if ObjectKind.TABLE in kind:
        names_fns.append(self.get_table_names)
        temp_names_fns.append(self.get_temp_table_names)
    if ObjectKind.VIEW in kind:
        names_fns.append(self.get_view_names)
        temp_names_fns.append(self.get_temp_view_names)
    if ObjectKind.MATERIALIZED_VIEW in kind:
        names_fns.append(self.get_materialized_view_names)
        # no temp materialized view at the moment
        # temp_names_fns.append(self.get_temp_materialized_view_names)

    # popped so that it is not forwarded to the names functions or to
    # single_tbl_method via **kw
    unreflectable = kw.pop("unreflectable", {})

    if (
        filter_names
        and scope is ObjectScope.ANY
        and kind is ObjectKind.ANY
    ):
        # if names are given and no qualification on type of table
        # (i.e. the Table(..., autoload) case), take the names as given,
        # don't run names queries. If a table does not exist
        # NoSuchTableError is raised and it's skipped

        # this also suits the case for mssql where we can reflect
        # individual temp tables but there's no temp_names_fn
        names = filter_names
    else:
        names = []
        name_kw = {"schema": schema, **kw}
        fns = []
        if ObjectScope.DEFAULT in scope:
            fns.extend(names_fns)
        if ObjectScope.TEMPORARY in scope:
            fns.extend(temp_names_fns)

        for fn in fns:
            try:
                names.extend(fn(connection, **name_kw))
            except NotImplementedError:
                # a names function that is not implemented contributes
                # no names
                pass

    if filter_names:
        # a set is used for the membership tests in the loop below
        filter_names = set(filter_names)

    # iterate over all the tables/views and call the single table method
    for table in names:
        if not filter_names or table in filter_names:
            key = (schema, table)
            try:
                yield (
                    key,
                    single_tbl_method(
                        connection, table, schema=schema, **kw
                    ),
                )
            except exc.UnreflectableTableError as err:
                # record only the first error seen for a given key
                if key not in unreflectable:
                    unreflectable[key] = err
            except exc.NoSuchTableError:
                # the object is skipped; nothing is yielded for it
                pass
1151
def get_multi_table_options(self, connection, **kw):
    """Reflect table options for multiple objects.

    Runs ``get_table_options`` per object through
    ``_default_multi_reflect()``; ``kw`` is forwarded as given.

    """
    per_table = self.get_table_options
    return self._default_multi_reflect(per_table, connection, **kw)
1156
def get_multi_columns(self, connection, **kw):
    """Reflect columns for multiple objects.

    Runs ``get_columns`` per object through
    ``_default_multi_reflect()``; ``kw`` is forwarded as given.

    """
    per_table = self.get_columns
    return self._default_multi_reflect(per_table, connection, **kw)
1159
def get_multi_pk_constraint(self, connection, **kw):
    """Reflect primary key constraints for multiple objects.

    Runs ``get_pk_constraint`` per object through
    ``_default_multi_reflect()``; ``kw`` is forwarded as given.

    """
    per_table = self.get_pk_constraint
    return self._default_multi_reflect(per_table, connection, **kw)
1164
def get_multi_foreign_keys(self, connection, **kw):
    """Reflect foreign keys for multiple objects.

    Runs ``get_foreign_keys`` per object through
    ``_default_multi_reflect()``; ``kw`` is forwarded as given.

    """
    per_table = self.get_foreign_keys
    return self._default_multi_reflect(per_table, connection, **kw)
1169
def get_multi_indexes(self, connection, **kw):
    """Reflect indexes for multiple objects.

    Runs ``get_indexes`` per object through
    ``_default_multi_reflect()``; ``kw`` is forwarded as given.

    """
    per_table = self.get_indexes
    return self._default_multi_reflect(per_table, connection, **kw)
1172
def get_multi_unique_constraints(self, connection, **kw):
    """Reflect unique constraints for multiple objects.

    Runs ``get_unique_constraints`` per object through
    ``_default_multi_reflect()``; ``kw`` is forwarded as given.

    """
    per_table = self.get_unique_constraints
    return self._default_multi_reflect(per_table, connection, **kw)
1177
def get_multi_check_constraints(self, connection, **kw):
    """Reflect check constraints for multiple objects.

    Runs ``get_check_constraints`` per object through
    ``_default_multi_reflect()``; ``kw`` is forwarded as given.

    """
    per_table = self.get_check_constraints
    return self._default_multi_reflect(per_table, connection, **kw)
1182
def get_multi_table_comment(self, connection, **kw):
    """Reflect table comments for multiple objects.

    Runs ``get_table_comment`` per object through
    ``_default_multi_reflect()``; ``kw`` is forwarded as given.

    """
    per_table = self.get_table_comment
    return self._default_multi_reflect(per_table, connection, **kw)
1187
1188
class StrCompileDialect(DefaultDialect):
    """A ``DefaultDialect`` configured with the ``Str*`` compiler classes.

    Only class-level settings are overridden here: the compiler and
    preparer classes plus a set of feature flags.

    """

    # compiler / preparer classes
    statement_compiler = compiler.StrSQLCompiler
    ddl_compiler = compiler.DDLCompiler
    type_compiler_cls = compiler.StrSQLTypeCompiler
    preparer = compiler.IdentifierPreparer

    # RETURNING is flagged as available for all three DML statement kinds
    insert_returning = update_returning = delete_returning = True

    supports_statement_cache = True

    supports_identity_columns = True

    # sequences are flagged as supported and optional; no pre-execution
    # of autoincrement sequences
    supports_sequences = sequences_optional = True
    preexecute_autoincrement_sequences = False

    supports_native_boolean = True

    supports_multivalues_insert = supports_simple_order_by_label = True
1211
1212
class DefaultExecutionContext(ExecutionContext):
    # statement-kind flags.  These class-level defaults are all False;
    # the _init_* constructors below set the relevant ones per instance.
    isinsert = False
    isupdate = False
    isdelete = False
    is_crud = False
    is_text = False
    isddl = False

    # how the statement will be invoked; switched to EXECUTEMANY or
    # INSERTMANYVALUES by the constructors when more than one parameter
    # set is present
    execute_style: ExecuteStyle = ExecuteStyle.EXECUTE

    # the compiled construct being executed; stays None for contexts
    # created by _init_statement() and _init_default()
    compiled: Optional[Compiled] = None
    # populated by _init_compiled() from attributes of the compiled object
    result_column_struct: Optional[
        Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
    ] = None
    # rows fetched for implicit RETURNING; assigned in
    # _setup_dml_or_text_result()
    returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None

    execution_options: _ExecuteOptions = util.EMPTY_DICT

    # fetch strategy used when building the CursorResult
    cursor_fetch_strategy = _cursor._DEFAULT_FETCH

    invoked_statement: Optional[Executable] = None

    # RETURNING-related flags, set by _init_compiled() for DML statements
    _is_implicit_returning = False
    _is_explicit_returning = False
    _is_supplemental_returning = False
    # set by create_cursor() according to the kind of cursor created
    _is_server_side = False

    # copied from the result object in _setup_result_proxy()
    _soft_closed = False

    # locally stored rowcount; when None, the ``rowcount`` accessor reads
    # cursor.rowcount instead
    _rowcount: Optional[int] = None

    # a hook for SQLite's translation of
    # result column names
    # NOTE: pyhive is using this hook, can't remove it :(
    _translate_colname: Optional[
        Callable[[str], Tuple[str, Optional[str]]]
    ] = None

    _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
    """used by set_input_sizes().

    This collection comes from ``ExpandedState.parameter_expansion``.

    """

    cache_hit = NO_CACHE_KEY

    # attributes assigned per instance by the _init_* constructors
    root_connection: Connection
    _dbapi_connection: PoolProxiedConnection
    dialect: Dialect
    unicode_statement: str
    cursor: DBAPICursor
    compiled_parameters: List[_MutableCoreSingleExecuteParams]
    parameters: _DBAPIMultiExecuteParams
    extracted_parameters: Optional[Sequence[BindParameter[Any]]]

    # shared empty mapping used as the single parameter set when a
    # non-positional statement has no parameters
    _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)

    # passed as the initial buffer of the fully buffered fetch strategy
    # for INSERTMANYVALUES with RETURNING
    _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
    # number of sentinel columns, taken from the compiled insertmanyvalues
    # state in _init_compiled()
    _num_sentinel_cols: int = 0
1273
@classmethod
def _init_ddl(
    cls,
    dialect: Dialect,
    connection: Connection,
    dbapi_connection: PoolProxiedConnection,
    execution_options: _ExecuteOptions,
    compiled_ddl: DDLCompiler,
) -> ExecutionContext:
    """Initialize execution context for an ExecutableDDLElement
    construct.

    The statement text is the string form of ``compiled_ddl``, with
    schema translation rendered when the compiled object has a
    ``schema_translate_map``.  A cursor is created and a single empty
    parameter set is assigned.

    """

    # the instance is built without invoking __init__
    self = cls.__new__(cls)
    self.root_connection = connection
    self._dbapi_connection = dbapi_connection
    self.dialect = connection.dialect

    self.compiled = compiled = compiled_ddl
    self.isddl = True

    self.execution_options = execution_options

    self.unicode_statement = str(compiled)
    if compiled.schema_translate_map:
        schema_translate_map = self.execution_options.get(
            "schema_translate_map", {}
        )

        rst = compiled.preparer._render_schema_translates
        self.unicode_statement = rst(
            self.unicode_statement, schema_translate_map
        )

    self.statement = self.unicode_statement

    # create_cursor() reads self.dialect and self.execution_options,
    # both assigned above
    self.cursor = self.create_cursor()
    self.compiled_parameters = []

    # one empty parameter set, in the form matching the paramstyle
    if dialect.positional:
        self.parameters = [dialect.execute_sequence_format()]
    else:
        self.parameters = [self._empty_dict_params]

    return self
1318
@classmethod
def _init_compiled(
    cls,
    dialect: Dialect,
    connection: Connection,
    dbapi_connection: PoolProxiedConnection,
    execution_options: _ExecuteOptions,
    compiled: SQLCompiler,
    parameters: _CoreMultiExecuteParams,
    invoked_statement: Executable,
    extracted_parameters: Optional[Sequence[BindParameter[Any]]],
    cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
    param_dict: _CoreSingleExecuteParams | None = None,
) -> ExecutionContext:
    """Initialize execution context for a Compiled construct.

    In order, this method: copies statement-kind flags from
    ``compiled``; validates RETURNING combined with executemany against
    the dialect's capability flags; builds ``compiled_parameters`` and
    selects the execute style; creates the cursor; runs execute-time
    defaults when the statement has prefetch columns; applies
    post-compile parameter expansion and schema translation to the
    statement string; and finally converts the compiled parameters into
    the positional or named form stored in ``self.parameters``.

    :raises InvalidRequestError: for unsupported RETURNING/executemany
     combinations, or when 'literal_execute' / 'expanding' parameters
     are used together with executemany.

    """

    # the instance is built without invoking __init__
    self = cls.__new__(cls)
    self.root_connection = connection
    self._dbapi_connection = dbapi_connection
    self.dialect = connection.dialect
    self.extracted_parameters = extracted_parameters
    self.invoked_statement = invoked_statement
    self.compiled = compiled
    self.cache_hit = cache_hit

    self.execution_options = execution_options

    self.result_column_struct = (
        compiled._result_columns,
        compiled._ordered_columns,
        compiled._textual_ordered_columns,
        compiled._ad_hoc_textual,
        compiled._loose_column_name_matching,
    )

    # short local aliases for the three DML flags, used in the checks
    # below
    self.isinsert = ii = compiled.isinsert
    self.isupdate = iu = compiled.isupdate
    self.isdelete = id_ = compiled.isdelete
    self.is_text = compiled.isplaintext

    if ii or iu or id_:
        dml_statement = compiled.compile_state.statement  # type: ignore
        if TYPE_CHECKING:
            assert isinstance(dml_statement, UpdateBase)
        self.is_crud = True
        self._is_explicit_returning = ier = bool(dml_statement._returning)
        self._is_implicit_returning = iir = bool(
            compiled.implicit_returning
        )
        if iir and dml_statement._supplemental_returning:
            self._is_supplemental_returning = True

        # dont mix implicit and explicit returning
        assert not (iir and ier)

        # RETURNING combined with executemany is only allowed when the
        # dialect's capability flags say so
        if (ier or iir) and compiled.for_executemany:
            if ii and not self.dialect.insert_executemany_returning:
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "INSERT..RETURNING when executemany is used"
                )
            elif (
                ii
                and dml_statement._sort_by_parameter_order
                and not self.dialect.insert_executemany_returning_sort_by_parameter_order  # noqa: E501
            ):
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "INSERT..RETURNING with deterministic row ordering "
                    "when executemany is used"
                )
            elif (
                ii
                and self.dialect.use_insertmanyvalues
                and not compiled._insertmanyvalues
            ):
                raise exc.InvalidRequestError(
                    'Statement does not have "insertmanyvalues" '
                    "enabled, can't use INSERT..RETURNING with "
                    "executemany in this case."
                )
            elif iu and not self.dialect.update_executemany_returning:
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "UPDATE..RETURNING when executemany is used"
                )
            elif id_ and not self.dialect.delete_executemany_returning:
                raise exc.InvalidRequestError(
                    f"Dialect {self.dialect.dialect_description} with "
                    f"current server capabilities does not support "
                    "DELETE..RETURNING when executemany is used"
                )

    if not parameters:
        # no parameter sets given: a single set is constructed from the
        # compiled statement alone
        self.compiled_parameters = [
            compiled.construct_params(
                extracted_parameters=extracted_parameters,
                escape_names=False,
                _collected_params=param_dict,
            )
        ]
    else:
        # one constructed set per given parameter set; the position in
        # the list is passed along as the group number
        self.compiled_parameters = [
            compiled.construct_params(
                m,
                escape_names=False,
                _group_number=grp,
                extracted_parameters=extracted_parameters,
                _collected_params=param_dict,
            )
            for grp, m in enumerate(parameters)
        ]

        # more than one parameter set switches away from plain EXECUTE
        if len(parameters) > 1:
            if self.isinsert and compiled._insertmanyvalues:
                self.execute_style = ExecuteStyle.INSERTMANYVALUES

                imv = compiled._insertmanyvalues
                if imv.sentinel_columns is not None:
                    self._num_sentinel_cols = imv.num_sentinel_columns
            else:
                self.execute_style = ExecuteStyle.EXECUTEMANY

    self.unicode_statement = compiled.string

    self.cursor = self.create_cursor()

    if self.compiled.insert_prefetch or self.compiled.update_prefetch:
        self._process_execute_defaults()

    processors = compiled._bind_processors

    flattened_processors: Mapping[
        str, _BindProcessorType[Any]
    ] = processors  # type: ignore[assignment]

    if compiled.literal_execute_params or compiled.post_compile_params:
        if self.executemany:
            raise exc.InvalidRequestError(
                "'literal_execute' or 'expanding' parameters can't be "
                "used with executemany()"
            )

        # only the first (and, given the check above, only) parameter
        # set is passed to the post-compile step
        expanded_state = compiled._process_parameters_for_postcompile(
            self.compiled_parameters[0]
        )

        # re-assign self.unicode_statement
        self.unicode_statement = expanded_state.statement

        self._expanded_parameters = expanded_state.parameter_expansion

        # copy before updating so that compiled._bind_processors itself
        # is not modified
        flattened_processors = dict(processors)  # type: ignore
        flattened_processors.update(expanded_state.processors)
        positiontup = expanded_state.positiontup
    elif compiled.positional:
        positiontup = self.compiled.positiontup
    else:
        positiontup = None

    if compiled.schema_translate_map:
        schema_translate_map = self.execution_options.get(
            "schema_translate_map", {}
        )
        rst = compiled.preparer._render_schema_translates
        self.unicode_statement = rst(
            self.unicode_statement, schema_translate_map
        )

    # final self.unicode_statement is now assigned, encode if needed
    # by dialect
    self.statement = self.unicode_statement

    # Convert the dictionary of bind parameter values
    # into a dict or list to be sent to the DBAPI's
    # execute() or executemany() method.

    if compiled.positional:
        core_positional_parameters: MutableSequence[Sequence[Any]] = []
        assert positiontup is not None
        for compiled_params in self.compiled_parameters:
            # values are emitted in positiontup order, each passed
            # through its bind processor when one is registered
            l_param: List[Any] = [
                (
                    flattened_processors[key](compiled_params[key])
                    if key in flattened_processors
                    else compiled_params[key]
                )
                for key in positiontup
            ]
            core_positional_parameters.append(
                dialect.execute_sequence_format(l_param)
            )

        self.parameters = core_positional_parameters
    else:
        core_dict_parameters: MutableSequence[Dict[str, Any]] = []
        escaped_names = compiled.escaped_bind_names

        # note that currently, "expanded" parameters will be present
        # in self.compiled_parameters in their quoted form. This is
        # slightly inconsistent with the approach taken as of
        # #8056 where self.compiled_parameters is meant to contain unquoted
        # param names.
        d_param: Dict[str, Any]
        for compiled_params in self.compiled_parameters:
            if escaped_names:
                # keys are replaced by their escaped form where one
                # exists; processors are still looked up by original key
                d_param = {
                    escaped_names.get(key, key): (
                        flattened_processors[key](compiled_params[key])
                        if key in flattened_processors
                        else compiled_params[key]
                    )
                    for key in compiled_params
                }
            else:
                d_param = {
                    key: (
                        flattened_processors[key](compiled_params[key])
                        if key in flattened_processors
                        else compiled_params[key]
                    )
                    for key in compiled_params
                }

            core_dict_parameters.append(d_param)

        self.parameters = core_dict_parameters

    return self
1551
@classmethod
def _init_statement(
    cls,
    dialect: Dialect,
    connection: Connection,
    dbapi_connection: PoolProxiedConnection,
    execution_options: _ExecuteOptions,
    statement: str,
    parameters: _DBAPIMultiExecuteParams,
) -> ExecutionContext:
    """Initialize execution context for a string SQL statement.

    The statement string is used as given.  ``parameters`` is stored
    unchanged when its first element is already an instance of the
    dialect's ``execute_sequence_format`` or a ``dict``; otherwise each
    element is converted with ``execute_sequence_format``.  An empty
    ``parameters`` produces a single empty parameter set.

    """

    # the instance is built without invoking __init__
    self = cls.__new__(cls)
    self.root_connection = connection
    self._dbapi_connection = dbapi_connection
    self.dialect = connection.dialect
    self.is_text = True

    self.execution_options = execution_options

    if not parameters:
        # one empty parameter set, in the form matching the paramstyle
        if self.dialect.positional:
            self.parameters = [dialect.execute_sequence_format()]
        else:
            self.parameters = [self._empty_dict_params]
    elif isinstance(parameters[0], dialect.execute_sequence_format):
        # only the first element is inspected to decide the form
        self.parameters = parameters
    elif isinstance(parameters[0], dict):
        self.parameters = parameters
    else:
        self.parameters = [
            dialect.execute_sequence_format(p) for p in parameters
        ]

    # more than one parameter set means executemany
    if len(parameters) > 1:
        self.execute_style = ExecuteStyle.EXECUTEMANY

    self.statement = self.unicode_statement = statement

    self.cursor = self.create_cursor()
    return self
1593
@classmethod
def _init_default(
    cls,
    dialect: Dialect,
    connection: Connection,
    dbapi_connection: PoolProxiedConnection,
    execution_options: _ExecuteOptions,
) -> ExecutionContext:
    """Initialize execution context for a ColumnDefault construct.

    Only the connection references, the dialect (taken from
    ``connection``), the execution options and a new cursor are set up.

    """

    # built without invoking __init__
    ctx = cls.__new__(cls)

    ctx.root_connection, ctx._dbapi_connection = (
        connection,
        dbapi_connection,
    )
    ctx.dialect = connection.dialect
    ctx.execution_options = execution_options

    # cursor creation reads the dialect and options assigned above
    ctx.cursor = ctx.create_cursor()
    return ctx
1613
def _get_cache_stats(self) -> str:
    """Return a short description of the compiled-cache status.

    Returns ``"raw sql"`` when there is no compiled object.  Otherwise
    the message is chosen by ``self.cache_hit`` and includes the time
    elapsed between the compiled object's ``_gen_time`` and now.

    """
    if self.compiled is None:
        return "raw sql"

    now = perf_counter()
    ch = self.cache_hit

    gen_time = self.compiled._gen_time
    assert gen_time is not None
    elapsed = now - gen_time

    if ch is NO_CACHE_KEY:
        return "no key %.5fs" % (elapsed,)

    if ch is CACHE_HIT:
        return "cached since %.4gs ago" % (elapsed,)

    if ch is CACHE_MISS:
        return "generated in %.5fs" % (elapsed,)

    if ch is CACHING_DISABLED:
        if "_cache_disable_reason" not in self.execution_options:
            return "caching disabled %.5fs" % (elapsed,)
        return "caching disabled (%s) %.5fs " % (
            self.execution_options["_cache_disable_reason"],
            elapsed,
        )

    if ch is NO_DIALECT_SUPPORT:
        return "dialect %s+%s does not support caching %.5fs" % (
            self.dialect.name,
            self.dialect.driver,
            elapsed,
        )

    return "unknown"
1647
@property
def executemany(self):  # type: ignore[override]
    """True when the execute style is EXECUTEMANY or INSERTMANYVALUES."""
    many_styles = (
        ExecuteStyle.EXECUTEMANY,
        ExecuteStyle.INSERTMANYVALUES,
    )
    return self.execute_style in many_styles
1654
@util.memoized_property
def identifier_preparer(self):
    """The identifier preparer in effect for this execution.

    Uses the compiled object's preparer when there is one.  Otherwise
    uses the dialect's preparer, adapted with the
    ``schema_translate_map`` execution option when that option is set.

    """
    if self.compiled:
        return self.compiled.preparer

    if "schema_translate_map" not in self.execution_options:
        return self.dialect.identifier_preparer

    return self.dialect.identifier_preparer._with_schema_translate(
        self.execution_options["schema_translate_map"]
    )
1665
@util.memoized_property
def engine(self):
    """The ``engine`` attribute of the root connection."""
    root = self.root_connection
    return root.engine
1669
@util.memoized_property
def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """The ``postfetch`` collection of the compiled statement."""
    compiled = self.compiled
    if TYPE_CHECKING:
        assert isinstance(compiled, SQLCompiler)
    return compiled.postfetch
1675
@util.memoized_property
def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """The prefetch collection matching the statement kind.

    ``insert_prefetch`` for an INSERT, ``update_prefetch`` for an
    UPDATE, and an empty tuple for anything else.

    """
    if TYPE_CHECKING:
        assert isinstance(self.compiled, SQLCompiler)

    if self.isinsert:
        return self.compiled.insert_prefetch

    if self.isupdate:
        return self.compiled.update_prefetch

    return ()
1686
@util.memoized_property
def no_parameters(self):
    """Value of the ``no_parameters`` execution option, default False."""
    options = self.execution_options
    return options.get("no_parameters", False)
1690
def _execute_scalar(
    self,
    stmt: str,
    type_: Optional[TypeEngine[Any]],
    parameters: Optional[_DBAPISingleExecuteParams] = None,
) -> Any:
    """Run a string statement on this context's cursor and return the
    first column of the first row.

    Used to fire off sequences, default phrases, and "select lastrowid"
    types of statements individually or in the context of a parent INSERT
    or UPDATE statement.

    :param stmt: the SQL string; schema translation is rendered into it
     when the ``schema_translate_map`` execution option is present.
    :param type_: optional type whose result processor, if any, is
     applied to the fetched value.
    :param parameters: a single parameter set; an empty one in the form
     matching the dialect's paramstyle is used when not given.
    :return: the (possibly processed) scalar, or ``None`` when no row
     was fetched and no processor was applied.

    """

    root = self.root_connection

    if "schema_translate_map" in self.execution_options:
        translate_map = self.execution_options.get(
            "schema_translate_map", {}
        )
        render = self.identifier_preparer._render_schema_translates
        stmt = render(stmt, translate_map)

    if not parameters:
        parameters = (
            self.dialect.execute_sequence_format()
            if self.dialect.positional
            else {}
        )

    root._cursor_execute(self.cursor, stmt, parameters, context=self)

    first_row = self.cursor.fetchone()
    value = first_row[0] if first_row is not None else None

    if type_ is None:
        return value

    # apply type post processors to the result
    processor = type_._cached_result_processor(
        self.dialect, self.cursor.description[0][1]
    )
    return processor(value) if processor else value
1736
@util.memoized_property
def connection(self):
    """The root connection this context was created with."""
    root = self.root_connection
    return root
1740
def _use_server_side_cursor(self):
    """Decide whether a server side cursor should be used.

    Returns ``False`` when the dialect does not support server side
    cursors.  When the dialect's ``server_side_cursors`` flag is off, the
    ``stream_results`` execution option (default ``False``) is returned.
    When the flag is on, ``stream_results`` defaults to ``True`` and is
    combined with a check on the kind of statement being executed.

    The return value is used for its truthiness; it is not necessarily a
    ``bool``.

    """
    dialect = self.dialect

    if not dialect.supports_server_side_cursors:
        return False

    if not dialect.server_side_cursors:
        return self.execution_options.get("stream_results", False)

    # this is deprecated
    return self.execution_options.get("stream_results", True) and (
        self.compiled
        and isinstance(self.compiled.statement, expression.Selectable)
        or (
            (
                not self.compiled
                or isinstance(
                    self.compiled.statement, expression.TextClause
                )
            )
            and self.unicode_statement
            and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
        )
    )
1769
def create_cursor(self) -> DBAPICursor:
    """Create the cursor for this execution and record its kind.

    A server side cursor is created when the dialect supports them and
    either the ``stream_results`` execution option is set, or the
    dialect's ``server_side_cursors`` flag is on and
    ``_use_server_side_cursor()`` agrees.  Otherwise a default cursor is
    created.  ``_is_server_side`` is set accordingly.

    """
    # inlining initial preference checks for SS cursors
    wants_server_side = self.dialect.supports_server_side_cursors and (
        self.execution_options.get("stream_results", False)
        or (
            self.dialect.server_side_cursors
            and self._use_server_side_cursor()
        )
    )

    if not wants_server_side:
        self._is_server_side = False
        return self.create_default_cursor()

    self._is_server_side = True
    return self.create_server_side_cursor()
1787
def fetchall_for_returning(self, cursor):
    """Fetch all rows from ``cursor`` via ``cursor.fetchall()``."""
    rows = cursor.fetchall()
    return rows
1790
def create_default_cursor(self) -> DBAPICursor:
    """Create a cursor by calling ``cursor()`` on the DBAPI connection."""
    dbapi_connection = self._dbapi_connection
    return dbapi_connection.cursor()
1793
def create_server_side_cursor(self) -> DBAPICursor:
    """Create a server side cursor.

    Not implemented in this base class.

    :raises NotImplementedError: always.

    """
    raise NotImplementedError()
1796
def pre_exec(self):
    """Hook with no behavior in this base implementation."""
    return None
1799
def get_out_parameter_values(self, names):
    """Return values for the OUT parameters with the given names.

    Not implemented in this base class.

    :raises NotImplementedError: always.

    """
    message = "This dialect does not support OUT parameters"
    raise NotImplementedError(message)
1804
def post_exec(self):
    """Hook with no behavior in this base implementation."""
    return None
1807
def get_result_processor(
    self, type_: TypeEngine[Any], colname: str, coltype: DBAPIType
) -> Optional[_ResultProcessorType[Any]]:
    """Return a 'result processor' for a given type as present in
    cursor.description.

    The default looks the processor up on ``type_`` using this context's
    dialect and ``coltype``; ``colname`` is not consulted.  Dialects can
    override this for context-sensitive result type handling.

    """
    lookup = type_._cached_result_processor
    return lookup(self.dialect, coltype)
1819
def get_lastrowid(self) -> int:
    """return self.cursor.lastrowid, or equivalent, after an INSERT.

    This may involve calling special cursor functions, issuing a new SELECT
    on the cursor (or a new one), or returning a stored value that was
    calculated within post_exec().

    This function will only be called for dialects which support "implicit"
    primary key generation, keep preexecute_autoincrement_sequences set to
    False, and when no explicit id value was bound to the statement.

    The function is called once for an INSERT statement that would need to
    return the last inserted primary key for those dialects that make use
    of the lastrowid concept. In these cases, it is called directly after
    :meth:`.ExecutionContext.post_exec`.

    This default implementation reads ``lastrowid`` from the context's
    cursor.

    """
    active_cursor = self.cursor
    return active_cursor.lastrowid
1838
def handle_dbapi_exception(self, e):
    """Hook receiving an exception; does nothing in this base class."""
    return None
1841
@util.non_memoized_property
def rowcount(self) -> int:
    """The stored ``_rowcount`` if set, otherwise ``cursor.rowcount``."""
    if self._rowcount is None:
        return self.cursor.rowcount
    return self._rowcount
1848
1849 @property
1850 def _has_rowcount(self):
1851 return self._rowcount is not None
1852
def supports_sane_rowcount(self):
    """Return the dialect's ``supports_sane_rowcount`` setting."""
    dialect = self.dialect
    return dialect.supports_sane_rowcount
1855
def supports_sane_multi_rowcount(self):
    """Return the dialect's ``supports_sane_multi_rowcount`` setting."""
    dialect = self.dialect
    return dialect.supports_sane_multi_rowcount
1858
def _setup_result_proxy(self):
    """Build and return the result object for the executed statement.

    CRUD and textual statements are delegated to
    ``_setup_dml_or_text_result()``.  Other statements get a
    ``CursorResult`` built here, with a buffered fetch strategy when
    results are being streamed.  OUT parameters are set up when the
    compiled statement has them, and the ``yield_per`` execution option
    is applied for the non-CRUD, non-text case.

    """
    exec_opt = self.execution_options

    # capture cursor.rowcount now if the caller asked for it to be
    # preserved and nothing has stored one yet
    if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
        self._rowcount = self.cursor.rowcount

    yp: Optional[Union[int, bool]]
    if self.is_crud or self.is_text:
        result = self._setup_dml_or_text_result()
        # yield_per is not applied on this path
        yp = False
    else:
        yp = exec_opt.get("yield_per", None)
        sr = self._is_server_side or exec_opt.get("stream_results", False)
        strategy = self.cursor_fetch_strategy
        # only the default strategy is swapped for the buffered one; a
        # strategy set elsewhere is left in place
        if sr and strategy is _cursor._DEFAULT_FETCH:
            strategy = _cursor.BufferedRowCursorFetchStrategy(
                self.cursor, self.execution_options
            )
        cursor_description: _DBAPICursorDescription = (
            strategy.alternate_cursor_description
            or self.cursor.description
        )
        if cursor_description is None:
            strategy = _cursor._NO_CURSOR_DQL

        result = _cursor.CursorResult(self, strategy, cursor_description)

    compiled = self.compiled

    if (
        compiled
        and not self.isddl
        and cast(SQLCompiler, compiled).has_out_parameters
    ):
        self._setup_out_parameters(result)

    # mirror the result's soft-closed state onto the context
    self._soft_closed = result._soft_closed

    if yp:
        result = result.yield_per(yp)

    return result
1901
def _setup_out_parameters(self, result):
    """Collect OUT parameter values and attach them to ``result``.

    Bind parameters flagged ``isoutparam`` are taken from the compiled
    statement's ``bind_names``; their raw values come from
    ``get_out_parameter_values()`` and are passed through the result
    processor of each parameter's dialect-level type, when there is one.
    The resulting dictionary, keyed by bind parameter key, is assigned to
    ``result.out_parameters``.

    """
    compiled = cast(SQLCompiler, self.compiled)

    out_params = []
    out_names = []
    for param, name in compiled.bind_names.items():
        if param.isoutparam:
            out_params.append(param)
            out_names.append(name)

    raw_values = self.get_out_parameter_values(out_names)

    collected = {}
    for bindparam, raw_value in zip(out_params, raw_values):
        impl_type = bindparam.type.dialect_impl(self.dialect)
        dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
        processor = impl_type.result_processor(self.dialect, dbapi_type)
        if processor is not None:
            raw_value = processor(raw_value)
        collected[bindparam.key] = raw_value

    result.out_parameters = collected
1929
def _setup_dml_or_text_result(self):
    """Build and return the ``CursorResult`` for a CRUD or text statement.

    Chooses the fetch strategy and cursor description, creates the
    result, then post-processes it according to the statement kind:
    inserted primary keys and implicit RETURNING rows are collected, the
    rowcount is stored where applicable, and the result is soft-closed
    when no rows remain to be delivered to the caller.

    """
    compiled = cast(SQLCompiler, self.compiled)

    strategy: ResultFetchStrategy = self.cursor_fetch_strategy

    if self.isinsert:
        if (
            self.execute_style is ExecuteStyle.INSERTMANYVALUES
            and compiled.effective_returning
        ):
            # rows for this case are supplied via
            # self._insertmanyvalues_rows as the initial buffer
            strategy = _cursor.FullyBufferedCursorFetchStrategy(
                self.cursor,
                initial_buffer=self._insertmanyvalues_rows,
                # maintain alt cursor description if set by the
                # dialect, e.g. mssql preserves it
                alternate_description=(
                    strategy.alternate_cursor_description
                ),
            )

        if compiled.postfetch_lastrowid:
            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_lastrowid()
            )
        # else if not self._is_implicit_returning,
        # the default inserted_primary_key_rows accessor will
        # return an "empty" primary key collection when accessed.

    # only the default strategy is swapped for the buffered one
    if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
        strategy = _cursor.BufferedRowCursorFetchStrategy(
            self.cursor, self.execution_options
        )

    if strategy is _cursor._NO_CURSOR_DML:
        cursor_description = None
    else:
        cursor_description = (
            strategy.alternate_cursor_description
            or self.cursor.description
        )

    if cursor_description is None:
        strategy = _cursor._NO_CURSOR_DML
    elif self._num_sentinel_cols:
        assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
        # the sentinel columns are handled in CursorResult._init_metadata
        # using essentially _reduce

    result: _cursor.CursorResult[Any] = _cursor.CursorResult(
        self, strategy, cursor_description
    )

    if self.isinsert:
        if self._is_implicit_returning:
            # all rows are consumed here to compute the primary keys
            rows = result.all()

            self.returned_default_rows = rows

            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_implicit_returning(result, rows)
            )

            # test that it has a cursor metadata that is accurate. the
            # first row will have been fetched and current assumptions
            # are that the result has only one row, until executemany()
            # support is added here.
            assert result._metadata.returns_rows

            # Insert statement has both return_defaults() and
            # returning(). rewind the result on the list of rows
            # we just used.
            if self._is_supplemental_returning:
                result._rewind(rows)
            else:
                result._soft_close()
        elif not self._is_explicit_returning:
            result._soft_close()

            # we assume here the result does not return any rows.
            # *usually*, this will be true. However, some dialects
            # such as that of MSSQL/pyodbc need to SELECT a post fetch
            # function so this is not necessarily true.
            # assert not result.returns_rows

    elif self._is_implicit_returning:
        rows = result.all()

        if rows:
            self.returned_default_rows = rows
        # the rowcount is taken from the number of rows returned
        self._rowcount = len(rows)

        if self._is_supplemental_returning:
            result._rewind(rows)
        else:
            result._soft_close()

        # test that it has a cursor metadata that is accurate.
        # the rows have all been fetched however.
        assert result._metadata.returns_rows

    elif not result._metadata.returns_rows:
        # no results, get rowcount
        # (which requires open cursor on some drivers)
        if self._rowcount is None:
            self._rowcount = self.cursor.rowcount
        result._soft_close()
    elif self.isupdate or self.isdelete:
        # rows are returned, so the result stays open; the rowcount is
        # still stored if nothing has stored one yet
        if self._rowcount is None:
            self._rowcount = self.cursor.rowcount
    return result
2040
@util.memoized_property
def inserted_primary_key_rows(self):
    """Fallback primary key rows for an INSERT.

    Used when no specific "get primary key" strategy assigned this
    attribute during execution; the rows are then derived from what's in
    the compiled_parameters and nothing else.

    """
    default_rows = self._setup_ins_pk_from_empty()
    return default_rows
2047
def _setup_ins_pk_from_lastrowid(self):
    """Build primary key rows from ``get_lastrowid()``.

    Returns a one-element list, produced by the compiled statement's
    lastrowid getter from the lastrowid value and the first set of
    compiled parameters.

    """
    compiled = cast(SQLCompiler, self.compiled)
    build_row = compiled._inserted_primary_key_from_lastrowid_getter

    last_id = self.get_lastrowid()
    first_params = self.compiled_parameters[0]
    return [build_row(last_id, first_params)]
2054
2055 def _setup_ins_pk_from_empty(self):
2056 getter = cast(
2057 SQLCompiler, self.compiled
2058 )._inserted_primary_key_from_lastrowid_getter
2059 return [getter(None, param) for param in self.compiled_parameters]
2060
2061 def _setup_ins_pk_from_implicit_returning(self, result, rows):
2062 if not rows:
2063 return []
2064
2065 getter = cast(
2066 SQLCompiler, self.compiled
2067 )._inserted_primary_key_from_returning_getter
2068 compiled_params = self.compiled_parameters
2069
2070 return [
2071 getter(row, param) for row, param in zip(rows, compiled_params)
2072 ]
2073
2074 def lastrow_has_defaults(self) -> bool:
2075 return (self.isinsert or self.isupdate) and bool(
2076 cast(SQLCompiler, self.compiled).postfetch
2077 )
2078
2079 def _prepare_set_input_sizes(
2080 self,
2081 ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
2082 """Given a cursor and ClauseParameters, prepare arguments
2083 in order to call the appropriate
2084 style of ``setinputsizes()`` on the cursor, using DB-API types
2085 from the bind parameter's ``TypeEngine`` objects.
2086
2087 This method only called by those dialects which set the
2088 :attr:`.Dialect.bind_typing` attribute to
2089 :attr:`.BindTyping.SETINPUTSIZES`. Python-oracledb and cx_Oracle are
2090 the only DBAPIs that requires setinputsizes(); pyodbc offers it as an
2091 option.
2092
2093 Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
2094 for pg8000 and asyncpg, which has been changed to inline rendering
2095 of casts.
2096
2097 """
2098 if self.isddl or self.is_text:
2099 return None
2100
2101 compiled = cast(SQLCompiler, self.compiled)
2102
2103 inputsizes = compiled._get_set_input_sizes_lookup()
2104
2105 if inputsizes is None:
2106 return None
2107
2108 dialect = self.dialect
2109
2110 # all of the rest of this... cython?
2111
2112 if dialect._has_events:
2113 inputsizes = dict(inputsizes)
2114 dialect.dispatch.do_setinputsizes(
2115 inputsizes, self.cursor, self.statement, self.parameters, self
2116 )
2117
2118 if compiled.escaped_bind_names:
2119 escaped_bind_names = compiled.escaped_bind_names
2120 else:
2121 escaped_bind_names = None
2122
2123 if dialect.positional:
2124 items = [
2125 (key, compiled.binds[key])
2126 for key in compiled.positiontup or ()
2127 ]
2128 else:
2129 items = [
2130 (key, bindparam)
2131 for bindparam, key in compiled.bind_names.items()
2132 ]
2133
2134 generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
2135 for key, bindparam in items:
2136 if bindparam in compiled.literal_execute_params:
2137 continue
2138
2139 if key in self._expanded_parameters:
2140 if is_tuple_type(bindparam.type):
2141 num = len(bindparam.type.types)
2142 dbtypes = inputsizes[bindparam]
2143 generic_inputsizes.extend(
2144 (
2145 (
2146 escaped_bind_names.get(paramname, paramname)
2147 if escaped_bind_names is not None
2148 else paramname
2149 ),
2150 dbtypes[idx % num],
2151 bindparam.type.types[idx % num],
2152 )
2153 for idx, paramname in enumerate(
2154 self._expanded_parameters[key]
2155 )
2156 )
2157 else:
2158 dbtype = inputsizes.get(bindparam, None)
2159 generic_inputsizes.extend(
2160 (
2161 (
2162 escaped_bind_names.get(paramname, paramname)
2163 if escaped_bind_names is not None
2164 else paramname
2165 ),
2166 dbtype,
2167 bindparam.type,
2168 )
2169 for paramname in self._expanded_parameters[key]
2170 )
2171 else:
2172 dbtype = inputsizes.get(bindparam, None)
2173
2174 escaped_name = (
2175 escaped_bind_names.get(key, key)
2176 if escaped_bind_names is not None
2177 else key
2178 )
2179
2180 generic_inputsizes.append(
2181 (escaped_name, dbtype, bindparam.type)
2182 )
2183
2184 return generic_inputsizes
2185
2186 def _exec_default(self, column, default, type_):
2187 if default.is_sequence:
2188 return self.fire_sequence(default, type_)
2189 elif default.is_callable:
2190 # this codepath is not normally used as it's inlined
2191 # into _process_execute_defaults
2192 self.current_column = column
2193 return default.arg(self)
2194 elif default.is_clause_element:
2195 return self._exec_default_clause_element(column, default, type_)
2196 else:
2197 # this codepath is not normally used as it's inlined
2198 # into _process_execute_defaults
2199 return default.arg
2200
2201 def _exec_default_clause_element(self, column, default, type_):
2202 # execute a default that's a complete clause element. Here, we have
2203 # to re-implement a miniature version of the compile->parameters->
2204 # cursor.execute() sequence, since we don't want to modify the state
2205 # of the connection / result in progress or create new connection/
2206 # result objects etc.
2207 # .. versionchanged:: 1.4
2208
2209 if not default._arg_is_typed:
2210 default_arg = expression.type_coerce(default.arg, type_)
2211 else:
2212 default_arg = default.arg
2213 compiled = expression.select(default_arg).compile(dialect=self.dialect)
2214 compiled_params = compiled.construct_params()
2215 processors = compiled._bind_processors
2216 if compiled.positional:
2217 parameters = self.dialect.execute_sequence_format(
2218 [
2219 (
2220 processors[key](compiled_params[key]) # type: ignore
2221 if key in processors
2222 else compiled_params[key]
2223 )
2224 for key in compiled.positiontup or ()
2225 ]
2226 )
2227 else:
2228 parameters = {
2229 key: (
2230 processors[key](compiled_params[key]) # type: ignore
2231 if key in processors
2232 else compiled_params[key]
2233 )
2234 for key in compiled_params
2235 }
2236 return self._execute_scalar(
2237 str(compiled), type_, parameters=parameters
2238 )
2239
    # Class-level default of ``None``.  ``_process_execute_defaults()``
    # assigns the parameter dictionary of the row being processed to the
    # instance while defaults run and deletes that instance attribute
    # afterwards, which leaves this class-level value in effect again.
    current_parameters: Optional[_CoreSingleExecuteParams] = None
    """A dictionary of parameters applied to the current row.

    This attribute is only available in the context of a user-defined default
    generation function, e.g. as described at :ref:`context_default_functions`.
    It consists of a dictionary which includes entries for each column/value
    pair that is to be part of the INSERT or UPDATE statement. The keys of the
    dictionary will be the key value of each :class:`_schema.Column`,
    which is usually
    synonymous with the name.

    Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
    does not accommodate for the "multi-values" feature of the
    :meth:`_expression.Insert.values` method. The
    :meth:`.DefaultExecutionContext.get_current_parameters` method should be
    preferred.

    .. seealso::

        :meth:`.DefaultExecutionContext.get_current_parameters`

        :ref:`context_default_functions`

    """
2264
2265 def get_current_parameters(self, isolate_multiinsert_groups=True):
2266 """Return a dictionary of parameters applied to the current row.
2267
2268 This method can only be used in the context of a user-defined default
2269 generation function, e.g. as described at
2270 :ref:`context_default_functions`. When invoked, a dictionary is
2271 returned which includes entries for each column/value pair that is part
2272 of the INSERT or UPDATE statement. The keys of the dictionary will be
2273 the key value of each :class:`_schema.Column`,
2274 which is usually synonymous
2275 with the name.
2276
2277 :param isolate_multiinsert_groups=True: indicates that multi-valued
2278 INSERT constructs created using :meth:`_expression.Insert.values`
2279 should be
2280 handled by returning only the subset of parameters that are local
2281 to the current column default invocation. When ``False``, the
2282 raw parameters of the statement are returned including the
2283 naming convention used in the case of multi-valued INSERT.
2284
2285 .. seealso::
2286
2287 :attr:`.DefaultExecutionContext.current_parameters`
2288
2289 :ref:`context_default_functions`
2290
2291 """
2292 try:
2293 parameters = self.current_parameters
2294 column = self.current_column
2295 except AttributeError:
2296 raise exc.InvalidRequestError(
2297 "get_current_parameters() can only be invoked in the "
2298 "context of a Python side column default function"
2299 )
2300 else:
2301 assert column is not None
2302 assert parameters is not None
2303 compile_state = cast(
2304 "DMLState", cast(SQLCompiler, self.compiled).compile_state
2305 )
2306 assert compile_state is not None
2307 if (
2308 isolate_multiinsert_groups
2309 and dml.isinsert(compile_state)
2310 and compile_state._has_multi_parameters
2311 ):
2312 if column._is_multiparam_column:
2313 index = column.index + 1
2314 d = {column.original.key: parameters[column.key]}
2315 else:
2316 d = {column.key: parameters[column.key]}
2317 index = 0
2318 assert compile_state._dict_parameters is not None
2319 keys = compile_state._dict_parameters.keys()
2320 d.update(
2321 (key, parameters["%s_m%d" % (key, index)]) for key in keys
2322 )
2323 return d
2324 else:
2325 return parameters
2326
2327 def get_insert_default(self, column):
2328 if column.default is None:
2329 return None
2330 else:
2331 return self._exec_default(column, column.default, column.type)
2332
2333 def get_update_default(self, column):
2334 if column.onupdate is None:
2335 return None
2336 else:
2337 return self._exec_default(column, column.onupdate, column.type)
2338
2339 def _process_execute_defaults(self):
2340 compiled = cast(SQLCompiler, self.compiled)
2341
2342 key_getter = compiled._within_exec_param_key_getter
2343
2344 sentinel_counter = 0
2345
2346 if compiled.insert_prefetch:
2347 prefetch_recs = [
2348 (
2349 c,
2350 key_getter(c),
2351 c._default_description_tuple,
2352 self.get_insert_default,
2353 )
2354 for c in compiled.insert_prefetch
2355 ]
2356 elif compiled.update_prefetch:
2357 prefetch_recs = [
2358 (
2359 c,
2360 key_getter(c),
2361 c._onupdate_description_tuple,
2362 self.get_update_default,
2363 )
2364 for c in compiled.update_prefetch
2365 ]
2366 else:
2367 prefetch_recs = []
2368
2369 for param in self.compiled_parameters:
2370 self.current_parameters = param
2371
2372 for (
2373 c,
2374 param_key,
2375 (arg, is_scalar, is_callable, is_sentinel),
2376 fallback,
2377 ) in prefetch_recs:
2378 if is_sentinel:
2379 param[param_key] = sentinel_counter
2380 sentinel_counter += 1
2381 elif is_scalar:
2382 param[param_key] = arg
2383 elif is_callable:
2384 self.current_column = c
2385 param[param_key] = arg(self)
2386 else:
2387 val = fallback(c)
2388 if val is not None:
2389 param[param_key] = val
2390
2391 del self.current_parameters
2392
2393
# Assign DefaultExecutionContext as the ``execution_ctx_cls`` of
# DefaultDialect; this is done here at module level.
DefaultDialect.execution_ctx_cls = DefaultExecutionContext