1# engine/default.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Default implementations of per-dialect sqlalchemy.engine classes.
10
11These are semi-private implementation classes which are only of importance
12to database dialect authors; dialects will usually use the classes here
13as the base class for their own corresponding classes.
14
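A third-party dialect will typically derive from :class:`.DefaultDialect`;
a minimal and purely illustrative sketch (the class, driver, and DBAPI
module names here are hypothetical)::

    from sqlalchemy.engine.default import DefaultDialect

    class MyDBDialect(DefaultDialect):
        name = "mydb"
        driver = "mydriver"
        supports_statement_cache = True

        @classmethod
        def import_dbapi(cls):
            import mydb_driver  # hypothetical DBAPI module
            return mydb_driver
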
15"""
16
17from __future__ import annotations
18
19import functools
20import operator
21import random
22import re
23from time import perf_counter
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Dict
29from typing import Final
30from typing import List
31from typing import Literal
32from typing import Mapping
33from typing import MutableMapping
34from typing import MutableSequence
35from typing import Optional
36from typing import Sequence
37from typing import Set
38from typing import Tuple
39from typing import Type
40from typing import TYPE_CHECKING
41from typing import Union
42import weakref
43
44from . import characteristics
45from . import cursor as _cursor
46from . import interfaces
47from .base import Connection
48from .interfaces import CacheStats
49from .interfaces import DBAPICursor
50from .interfaces import Dialect
51from .interfaces import ExecuteStyle
52from .interfaces import ExecutionContext
53from .reflection import ObjectKind
54from .reflection import ObjectScope
55from .. import event
56from .. import exc
57from .. import pool
58from .. import util
59from ..sql import compiler
60from ..sql import dml
61from ..sql import expression
62from ..sql import type_api
63from ..sql import util as sql_util
64from ..sql._typing import is_tuple_type
65from ..sql.base import _NoArg
66from ..sql.compiler import AggregateOrderByStyle
67from ..sql.compiler import DDLCompiler
68from ..sql.compiler import InsertmanyvaluesSentinelOpts
69from ..sql.compiler import SQLCompiler
70from ..sql.elements import quoted_name
71from ..util.typing import TupleAny
72from ..util.typing import Unpack
73
74if typing.TYPE_CHECKING:
75 from .base import Engine
76 from .cursor import ResultFetchStrategy
77 from .interfaces import _CoreMultiExecuteParams
78 from .interfaces import _CoreSingleExecuteParams
79 from .interfaces import _DBAPICursorDescription
80 from .interfaces import _DBAPIMultiExecuteParams
81 from .interfaces import _DBAPISingleExecuteParams
82 from .interfaces import _ExecuteOptions
83 from .interfaces import _MutableCoreSingleExecuteParams
84 from .interfaces import _ParamStyle
85 from .interfaces import ConnectArgsType
86 from .interfaces import DBAPIConnection
87 from .interfaces import DBAPIModule
88 from .interfaces import DBAPIType
89 from .interfaces import IsolationLevel
90 from .row import Row
91 from .url import URL
92 from ..event import _ListenerFnType
93 from ..pool import Pool
94 from ..pool import PoolProxiedConnection
95 from ..sql import Executable
96 from ..sql.compiler import Compiled
97 from ..sql.compiler import Linting
98 from ..sql.compiler import ResultColumnsEntry
99 from ..sql.dml import DMLState
100 from ..sql.dml import UpdateBase
101 from ..sql.elements import BindParameter
102 from ..sql.schema import Column
103 from ..sql.type_api import _BindProcessorType
104 from ..sql.type_api import _ResultProcessorType
105 from ..sql.type_api import TypeEngine
106
107
108# When we're handed literal SQL, ensure it's a SELECT query
109SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)
110
111
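# module-level shorthand for the members of CacheStats, unpacked in enum
# definition order; DefaultExecutionContext._get_cache_stats() compares
# its ``cache_hit`` attribute against these values.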
112(
113 CACHE_HIT,
114 CACHE_MISS,
115 CACHING_DISABLED,
116 NO_CACHE_KEY,
117 NO_DIALECT_SUPPORT,
118) = list(CacheStats)
119
120
121class DefaultDialect(Dialect):
122 """Default implementation of Dialect"""
123
124 statement_compiler = compiler.SQLCompiler
125 ddl_compiler = compiler.DDLCompiler
126 type_compiler_cls = compiler.GenericTypeCompiler
127
128 preparer = compiler.IdentifierPreparer
129 supports_alter = True
130 supports_comments = False
131 supports_constraint_comments = False
132 inline_comments = False
133 supports_statement_cache = True
134
135 div_is_floordiv = True
136
137 bind_typing = interfaces.BindTyping.NONE
138
139 include_set_input_sizes: Optional[Set[Any]] = None
140 exclude_set_input_sizes: Optional[Set[Any]] = None
141
142 # the first value we'd get for an autoincrement column.
143 default_sequence_base = 1
144
    # most DBAPIs are happy with this for execute();
    # not cx_oracle, however.
147 execute_sequence_format = tuple
148
149 supports_schemas = True
150 supports_views = True
151 supports_sequences = False
152 sequences_optional = False
153 preexecute_autoincrement_sequences = False
154 supports_identity_columns = False
155 postfetch_lastrowid = True
156 favor_returning_over_lastrowid = False
157 insert_null_pk_still_autoincrements = False
158 update_returning = False
159 delete_returning = False
160 update_returning_multifrom = False
161 delete_returning_multifrom = False
162 insert_returning = False
163
164 aggregate_order_by_style = AggregateOrderByStyle.INLINE
165
166 cte_follows_insert = False
167
168 supports_native_enum = False
169 supports_native_boolean = False
170 supports_native_uuid = False
171 returns_native_bytes = False
172
173 non_native_boolean_check_constraint = True
174
175 supports_simple_order_by_label = True
176
177 tuple_in_values = False
178
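    # execution options with these names are intercepted by
    # set_engine_execution_options() / set_connection_execution_options()
    # and applied through the characteristic objects below, e.g.
    # (illustrative)::
    #
    #     with engine.connect() as conn:
    #         conn = conn.execution_options(isolation_level="AUTOCOMMIT")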
179 connection_characteristics = util.immutabledict(
180 {
181 "isolation_level": characteristics.IsolationLevelCharacteristic(),
182 "logging_token": characteristics.LoggingTokenCharacteristic(),
183 }
184 )
185
186 engine_config_types: Mapping[str, Any] = util.immutabledict(
187 {
188 "pool_timeout": util.asint,
189 "echo": util.bool_or_str("debug"),
190 "echo_pool": util.bool_or_str("debug"),
191 "pool_recycle": util.asint,
192 "pool_size": util.asint,
193 "max_overflow": util.asint,
194 "future": util.asbool,
195 }
196 )
197
    # True if the generic NUMERIC type returns decimal.Decimal values
    # natively; this does *not* apply to the FLOAT type.
201 supports_native_decimal = False
202
203 name = "default"
204
205 # length at which to truncate
206 # any identifier.
207 max_identifier_length = 9999
208 _user_defined_max_identifier_length: Optional[int] = None
209
210 isolation_level: Optional[str] = None
211
212 # sub-categories of max_identifier_length.
    # currently these accommodate MySQL, which allows alias names of 255
    # characters but DDL names of only 64.
215 max_index_name_length: Optional[int] = None
216 max_constraint_name_length: Optional[int] = None
217
218 supports_sane_rowcount = True
219 supports_sane_multi_rowcount = True
220 colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
221 default_paramstyle = "named"
222
223 supports_default_values = False
224 """dialect supports INSERT... DEFAULT VALUES syntax"""
225
226 supports_default_metavalue = False
227 """dialect supports INSERT... VALUES (DEFAULT) syntax"""
228
229 default_metavalue_token = "DEFAULT"
230 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
231 parenthesis."""
232
233 # not sure if this is a real thing but the compiler will deliver it
234 # if this is the only flag enabled.
235 supports_empty_insert = True
236 """dialect supports INSERT () VALUES ()"""
237
238 supports_multivalues_insert = False
239
240 use_insertmanyvalues: bool = False
241
242 use_insertmanyvalues_wo_returning: bool = False
243
244 insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
245 InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
246 )
247
248 insertmanyvalues_page_size: int = 1000
249 insertmanyvalues_max_parameters = 32700
250
251 supports_is_distinct_from = True
252
253 supports_server_side_cursors = False
254
255 server_side_cursors = False
256
257 # extra record-level locking features (#4860)
258 supports_for_update_of = False
259
260 server_version_info = None
261
262 default_schema_name: Optional[str] = None
263
264 # indicates symbol names are
265 # UPPERCASED if they are case insensitive
266 # within the database.
267 # if this is True, the methods normalize_name()
268 # and denormalize_name() must be provided.
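    # e.g. Oracle Database stores case-insensitive identifiers as UPPERCASE;
    # dialects for such databases set this to True so that, roughly,
    # normalize_name("SOME_TABLE") returns "some_table" and
    # denormalize_name("some_table") returns "SOME_TABLE".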
269 requires_name_normalize = False
270
271 is_async = False
272
273 has_terminate = False
274
275 # TODO: this is not to be part of 2.0. implement rudimentary binary
276 # literals for SQLite, PostgreSQL, MySQL only within
277 # _Binary.literal_processor
278 _legacy_binary_type_literal_encoding = "utf-8"
279
280 @util.deprecated_params(
281 empty_in_strategy=(
282 "1.4",
283 "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
284 "deprecated, and no longer has any effect. All IN expressions "
285 "are now rendered using "
            'the "expanding parameter" strategy which renders a set of bound '
            'expressions, or an "empty set" SELECT, at statement execution '
288 "time.",
289 ),
290 server_side_cursors=(
291 "1.4",
292 "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
293 "is deprecated and will be removed in a future release. Please "
294 "use the "
295 ":paramref:`_engine.Connection.execution_options.stream_results` "
296 "parameter.",
297 ),
298 )
299 def __init__(
300 self,
301 paramstyle: Optional[_ParamStyle] = None,
302 isolation_level: Optional[IsolationLevel] = None,
303 dbapi: Optional[DBAPIModule] = None,
304 implicit_returning: Literal[True] = True,
305 supports_native_boolean: Optional[bool] = None,
306 max_identifier_length: Optional[int] = None,
307 label_length: Optional[int] = None,
308 insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
309 use_insertmanyvalues: Optional[bool] = None,
310 # util.deprecated_params decorator cannot render the
311 # Linting.NO_LINTING constant
312 compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore
313 server_side_cursors: bool = False,
314 skip_autocommit_rollback: bool = False,
315 **kwargs: Any,
316 ):
317 if server_side_cursors:
318 if not self.supports_server_side_cursors:
319 raise exc.ArgumentError(
320 "Dialect %s does not support server side cursors" % self
321 )
322 else:
323 self.server_side_cursors = True
324
325 if getattr(self, "use_setinputsizes", False):
326 util.warn_deprecated(
327 "The dialect-level use_setinputsizes attribute is "
328 "deprecated. Please use "
329 "bind_typing = BindTyping.SETINPUTSIZES",
330 "2.0",
331 )
332 self.bind_typing = interfaces.BindTyping.SETINPUTSIZES
333
334 self.positional = False
335 self._ischema = None
336
337 self.dbapi = dbapi
338
339 self.skip_autocommit_rollback = skip_autocommit_rollback
340
341 if paramstyle is not None:
342 self.paramstyle = paramstyle
343 elif self.dbapi is not None:
344 self.paramstyle = self.dbapi.paramstyle
345 else:
346 self.paramstyle = self.default_paramstyle
347 self.positional = self.paramstyle in (
348 "qmark",
349 "format",
350 "numeric",
351 "numeric_dollar",
352 )
353 self.identifier_preparer = self.preparer(self)
354 self._on_connect_isolation_level = isolation_level
355
356 legacy_tt_callable = getattr(self, "type_compiler", None)
357 if legacy_tt_callable is not None:
358 tt_callable = cast(
359 Type[compiler.GenericTypeCompiler],
360 self.type_compiler,
361 )
362 else:
363 tt_callable = self.type_compiler_cls
364
365 self.type_compiler_instance = self.type_compiler = tt_callable(self)
366
367 if supports_native_boolean is not None:
368 self.supports_native_boolean = supports_native_boolean
369
370 self._user_defined_max_identifier_length = max_identifier_length
371 if self._user_defined_max_identifier_length:
372 self.max_identifier_length = (
373 self._user_defined_max_identifier_length
374 )
375 self.label_length = label_length
376 self.compiler_linting = compiler_linting
377
378 if use_insertmanyvalues is not None:
379 self.use_insertmanyvalues = use_insertmanyvalues
380
381 if insertmanyvalues_page_size is not _NoArg.NO_ARG:
382 self.insertmanyvalues_page_size = insertmanyvalues_page_size
383
384 @property
385 @util.deprecated(
386 "2.0",
387 "full_returning is deprecated, please use insert_returning, "
388 "update_returning, delete_returning",
389 )
390 def full_returning(self):
391 return (
392 self.insert_returning
393 and self.update_returning
394 and self.delete_returning
395 )
396
397 @util.memoized_property
398 def insert_executemany_returning(self):
399 """Default implementation for insert_executemany_returning, if not
400 otherwise overridden by the specific dialect.
401
        The default implementation considers "insert_executemany_returning"
        to be available if the dialect in use has opted into using the
        "use_insertmanyvalues" feature. If it hasn't, this attribute is
        False, unless the dialect in question overrides this and provides
        some other implementation (such as the Oracle Database dialects).
408
409 """
410 return self.insert_returning and self.use_insertmanyvalues
411
412 @util.memoized_property
413 def insert_executemany_returning_sort_by_parameter_order(self):
        """Default implementation for
        insert_executemany_returning_sort_by_parameter_order, if not
        otherwise overridden by the specific dialect.

        The default implementation considers that
        "insert_executemany_returning" can have deterministic ordering only
        if the dialect in use has opted into using the "use_insertmanyvalues"
        feature, which by default implements deterministic ordering using
        client side sentinel columns only. The "insertmanyvalues" feature
        also provides alternate forms that can use server-generated PK values
        as "sentinels", but those are only used if the
        :attr:`.Dialect.insertmanyvalues_implicit_sentinel` bitflag enables
        those alternate SQL forms, which are disabled by default.
427
428 If the dialect in use hasn't opted into that, then this attribute is
429 False, unless the dialect in question overrides this and provides some
430 other implementation (such as the Oracle Database dialects).
431
432 """
433 return self.insert_returning and self.use_insertmanyvalues
434
435 update_executemany_returning = False
436 delete_executemany_returning = False
437
438 @util.memoized_property
439 def loaded_dbapi(self) -> DBAPIModule:
440 if self.dbapi is None:
441 raise exc.InvalidRequestError(
442 f"Dialect {self} does not have a Python DBAPI established "
443 "and cannot be used for actual database interaction"
444 )
445 return self.dbapi
446
447 @util.memoized_property
448 def _bind_typing_render_casts(self):
449 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS
450
451 def _ensure_has_table_connection(self, arg: Connection) -> None:
452 if not isinstance(arg, Connection):
453 raise exc.ArgumentError(
454 "The argument passed to Dialect.has_table() should be a "
455 "%s, got %s. "
456 "Additionally, the Dialect.has_table() method is for "
457 "internal dialect "
458 "use only; please use "
                "``inspect(some_engine).has_table(<tablename>)`` "
460 "for public API use." % (Connection, type(arg))
461 )
462
463 @util.memoized_property
464 def _supports_statement_cache(self):
465 ssc = self.__class__.__dict__.get("supports_statement_cache", None)
466 if ssc is None:
467 util.warn(
468 "Dialect %s:%s will not make use of SQL compilation caching "
469 "as it does not set the 'supports_statement_cache' attribute "
470 "to ``True``. This can have "
471 "significant performance implications including some "
472 "performance degradations in comparison to prior SQLAlchemy "
473 "versions. Dialect maintainers should seek to set this "
474 "attribute to True after appropriate development and testing "
475 "for SQLAlchemy 1.4 caching support. Alternatively, this "
476 "attribute may be set to False which will disable this "
477 "warning." % (self.name, self.driver),
478 code="cprf",
479 )
480
481 return bool(ssc)
482
483 @util.memoized_property
484 def _type_memos(self):
485 return weakref.WeakKeyDictionary()
486
487 @property
488 def dialect_description(self): # type: ignore[override]
489 return self.name + "+" + self.driver
490
491 @property
492 def supports_sane_rowcount_returning(self):
493 """True if this dialect supports sane rowcount even if RETURNING is
494 in use.
495
496 For dialects that don't support RETURNING, this is synonymous with
497 ``supports_sane_rowcount``.
498
499 """
500 return self.supports_sane_rowcount
501
502 @classmethod
503 def get_pool_class(cls, url: URL) -> Type[Pool]:
504 default: Type[pool.Pool]
505 if cls.is_async:
506 default = pool.AsyncAdaptedQueuePool
507 else:
508 default = pool.QueuePool
509
510 return getattr(cls, "poolclass", default)
511
512 def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
513 return self.get_pool_class(url)
514
515 @classmethod
516 def load_provisioning(cls):
517 package = ".".join(cls.__module__.split(".")[0:-1])
518 try:
519 __import__(package + ".provision")
520 except ImportError:
521 pass
522
523 def _builtin_onconnect(self) -> Optional[_ListenerFnType]:
524 if self._on_connect_isolation_level is not None:
525
526 def builtin_connect(dbapi_conn, conn_rec):
527 self._assert_and_set_isolation_level(
528 dbapi_conn, self._on_connect_isolation_level
529 )
530
531 return builtin_connect
532 else:
533 return None
534
535 def initialize(self, connection: Connection) -> None:
536 try:
537 self.server_version_info = self._get_server_version_info(
538 connection
539 )
540 except NotImplementedError:
541 self.server_version_info = None
542 try:
543 self.default_schema_name = self._get_default_schema_name(
544 connection
545 )
546 except NotImplementedError:
547 self.default_schema_name = None
548
549 try:
550 self.default_isolation_level = self.get_default_isolation_level(
551 connection.connection.dbapi_connection
552 )
553 except NotImplementedError:
554 self.default_isolation_level = None
555
556 if not self._user_defined_max_identifier_length:
557 max_ident_length = self._check_max_identifier_length(connection)
558 if max_ident_length:
559 self.max_identifier_length = max_ident_length
560
561 if (
562 self.label_length
563 and self.label_length > self.max_identifier_length
564 ):
565 raise exc.ArgumentError(
566 "Label length of %d is greater than this dialect's"
567 " maximum identifier length of %d"
568 % (self.label_length, self.max_identifier_length)
569 )
570
571 def on_connect(self) -> Optional[Callable[[Any], None]]:
572 # inherits the docstring from interfaces.Dialect.on_connect
573 return None
574
575 def _check_max_identifier_length(self, connection):
576 """Perform a connection / server version specific check to determine
577 the max_identifier_length.
578
        May return None to indicate that the dialect's class-level
        max_identifier_length should be used.
581
582 """
583 return None
584
585 def get_default_isolation_level(self, dbapi_conn):
586 """Given a DBAPI connection, return its isolation level, or
587 a default isolation level if one cannot be retrieved.
588
589 May be overridden by subclasses in order to provide a
590 "fallback" isolation level for databases that cannot reliably
591 retrieve the actual isolation level.
592
        By default, calls the :meth:`_engine.Dialect.get_isolation_level`
594 method, propagating any exceptions raised.
595
596 """
597 return self.get_isolation_level(dbapi_conn)
598
599 def type_descriptor(self, typeobj):
600 """Provide a database-specific :class:`.TypeEngine` object, given
601 the generic object which comes from the types module.
602
603 This method looks for a dictionary called
604 ``colspecs`` as a class or instance-level variable,
605 and passes on to :func:`_types.adapt_type`.
606
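        For example, a dialect that supplies its own ``Numeric``
        implementation might do (an illustrative sketch; the class names
        are hypothetical)::

            from sqlalchemy import types as sqltypes

            class MyNumeric(sqltypes.Numeric):
                pass

            class MyDialect(DefaultDialect):
                colspecs = {sqltypes.Numeric: MyNumeric}

        so that ``type_descriptor(Numeric())`` returns a ``MyNumeric``
        instance.
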
607 """
608 return type_api.adapt_type(typeobj, self.colspecs)
609
610 def has_index(self, connection, table_name, index_name, schema=None, **kw):
611 if not self.has_table(connection, table_name, schema=schema, **kw):
612 return False
613 for idx in self.get_indexes(
614 connection, table_name, schema=schema, **kw
615 ):
616 if idx["name"] == index_name:
617 return True
618 else:
619 return False
620
621 def has_schema(
622 self, connection: Connection, schema_name: str, **kw: Any
623 ) -> bool:
624 return schema_name in self.get_schema_names(connection, **kw)
625
626 def validate_identifier(self, ident: str) -> None:
627 if len(ident) > self.max_identifier_length:
628 raise exc.IdentifierError(
629 "Identifier '%s' exceeds maximum length of %d characters"
630 % (ident, self.max_identifier_length)
631 )
632
633 def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection:
634 # inherits the docstring from interfaces.Dialect.connect
635 return self.loaded_dbapi.connect(*cargs, **cparams) # type: ignore[no-any-return] # NOQA: E501
636
637 def create_connect_args(self, url: URL) -> ConnectArgsType:
638 # inherits the docstring from interfaces.Dialect.create_connect_args
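        # the default implementation passes the URL's fields and query
        # string through as keyword arguments; e.g. a hypothetical URL
        # "somedialect://user:pass@hostname:5432/dbname?foo=bar" would
        # produce no positional arguments and
        # {"username": "user", "password": "pass", "host": "hostname",
        #  "port": 5432, "database": "dbname", "foo": "bar"}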
639 opts = url.translate_connect_args()
640 opts.update(url.query)
641 return ([], opts)
642
643 def set_engine_execution_options(
644 self, engine: Engine, opts: Mapping[str, Any]
645 ) -> None:
646 supported_names = set(self.connection_characteristics).intersection(
647 opts
648 )
649 if supported_names:
650 characteristics: Mapping[str, Any] = util.immutabledict(
651 (name, opts[name]) for name in supported_names
652 )
653
654 @event.listens_for(engine, "engine_connect")
655 def set_connection_characteristics(connection):
656 self._set_connection_characteristics(
657 connection, characteristics
658 )
659
660 def set_connection_execution_options(
661 self, connection: Connection, opts: Mapping[str, Any]
662 ) -> None:
663 supported_names = set(self.connection_characteristics).intersection(
664 opts
665 )
666 if supported_names:
667 characteristics: Mapping[str, Any] = util.immutabledict(
668 (name, opts[name]) for name in supported_names
669 )
670 self._set_connection_characteristics(connection, characteristics)
671
672 def _set_connection_characteristics(self, connection, characteristics):
673 characteristic_values = [
674 (name, self.connection_characteristics[name], value)
675 for name, value in characteristics.items()
676 ]
677
678 if connection.in_transaction():
679 trans_objs = [
680 (name, obj)
681 for name, obj, _ in characteristic_values
682 if obj.transactional
683 ]
684 if trans_objs:
685 raise exc.InvalidRequestError(
686 "This connection has already initialized a SQLAlchemy "
687 "Transaction() object via begin() or autobegin; "
688 "%s may not be altered unless rollback() or commit() "
689 "is called first."
690 % (", ".join(name for name, obj in trans_objs))
691 )
692
693 dbapi_connection = connection.connection.dbapi_connection
694 for _, characteristic, value in characteristic_values:
695 characteristic.set_connection_characteristic(
696 self, connection, dbapi_connection, value
697 )
698 connection.connection._connection_record.finalize_callback.append(
699 functools.partial(self._reset_characteristics, characteristics)
700 )
701
702 def _reset_characteristics(self, characteristics, dbapi_connection):
703 for characteristic_name in characteristics:
704 characteristic = self.connection_characteristics[
705 characteristic_name
706 ]
707 characteristic.reset_characteristic(self, dbapi_connection)
708
709 def do_begin(self, dbapi_connection):
710 pass
711
712 def do_rollback(self, dbapi_connection):
713 if self.skip_autocommit_rollback and self.detect_autocommit_setting(
714 dbapi_connection
715 ):
716 return
717 dbapi_connection.rollback()
718
719 def do_commit(self, dbapi_connection):
720 dbapi_connection.commit()
721
722 def do_terminate(self, dbapi_connection):
723 self.do_close(dbapi_connection)
724
725 def do_close(self, dbapi_connection):
726 dbapi_connection.close()
727
728 @util.memoized_property
729 def _dialect_specific_select_one(self):
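        # the string form of "SELECT 1" as rendered by this dialect's
        # compiler; used as the default ping statement by do_ping()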
730 return str(expression.select(1).compile(dialect=self))
731
732 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
733 try:
734 return self.do_ping(dbapi_connection)
735 except self.loaded_dbapi.Error as err:
736 is_disconnect = self.is_disconnect(err, dbapi_connection, None)
737
738 if self._has_events:
739 try:
740 Connection._handle_dbapi_exception_noconnection(
741 err,
742 self,
743 is_disconnect=is_disconnect,
744 invalidate_pool_on_disconnect=False,
745 is_pre_ping=True,
746 )
747 except exc.StatementError as new_err:
748 is_disconnect = new_err.connection_invalidated
749
750 if is_disconnect:
751 return False
752 else:
753 raise
754
755 def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
756 cursor = dbapi_connection.cursor()
757 try:
758 cursor.execute(self._dialect_specific_select_one)
759 finally:
760 cursor.close()
761 return True
762
763 def create_xid(self):
764 """Create a random two-phase transaction ID.
765
766 This id will be passed to do_begin_twophase(), do_rollback_twophase(),
767 do_commit_twophase(). Its format is unspecified.
768 """
769
770 return "_sa_%032x" % random.randint(0, 2**128)
771
772 def do_savepoint(self, connection, name):
773 connection.execute(expression.SavepointClause(name))
774
775 def do_rollback_to_savepoint(self, connection, name):
776 connection.execute(expression.RollbackToSavepointClause(name))
777
778 def do_release_savepoint(self, connection, name):
779 connection.execute(expression.ReleaseSavepointClause(name))
780
781 def _deliver_insertmanyvalues_batches(
782 self,
783 connection,
784 cursor,
785 statement,
786 parameters,
787 generic_setinputsizes,
788 context,
789 ):
790 context = cast(DefaultExecutionContext, context)
791 compiled = cast(SQLCompiler, context.compiled)
792
793 _composite_sentinel_proc: Sequence[
794 Optional[_ResultProcessorType[Any]]
795 ] = ()
796 _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
797 _sentinel_proc_initialized: bool = False
798
799 compiled_parameters = context.compiled_parameters
800
801 imv = compiled._insertmanyvalues
802 assert imv is not None
803
804 is_returning: Final[bool] = bool(compiled.effective_returning)
805 batch_size = context.execution_options.get(
806 "insertmanyvalues_page_size", self.insertmanyvalues_page_size
807 )
808
809 if compiled.schema_translate_map:
810 schema_translate_map = context.execution_options.get(
811 "schema_translate_map", {}
812 )
813 else:
814 schema_translate_map = None
815
816 if is_returning:
817 result: Optional[List[Any]] = []
818 context._insertmanyvalues_rows = result
819
820 sort_by_parameter_order = imv.sort_by_parameter_order
821
822 else:
823 sort_by_parameter_order = False
824 result = None
825
826 for imv_batch in compiled._deliver_insertmanyvalues_batches(
827 statement,
828 parameters,
829 compiled_parameters,
830 generic_setinputsizes,
831 batch_size,
832 sort_by_parameter_order,
833 schema_translate_map,
834 ):
835 yield imv_batch
836
837 if is_returning:
838
839 try:
840 rows = context.fetchall_for_returning(cursor)
841 except BaseException as be:
842 connection._handle_dbapi_exception(
843 be,
844 sql_util._long_statement(imv_batch.replaced_statement),
845 imv_batch.replaced_parameters,
846 None,
847 context,
848 is_sub_exec=True,
849 )
850
851 # I would have thought "is_returning: Final[bool]"
852 # would have assured this but pylance thinks not
853 assert result is not None
854
855 if imv.num_sentinel_columns and not imv_batch.is_downgraded:
856 composite_sentinel = imv.num_sentinel_columns > 1
857 if imv.implicit_sentinel:
858 # for implicit sentinel, which is currently single-col
859 # integer autoincrement, do a simple sort.
860 assert not composite_sentinel
861 result.extend(
862 sorted(rows, key=operator.itemgetter(-1))
863 )
864 continue
865
866 # otherwise, create dictionaries to match up batches
867 # with parameters
868 assert imv.sentinel_param_keys
869 assert imv.sentinel_columns
870
871 _nsc = imv.num_sentinel_columns
872
873 if not _sentinel_proc_initialized:
874 if composite_sentinel:
875 _composite_sentinel_proc = [
876 col.type._cached_result_processor(
877 self, cursor_desc[1]
878 )
879 for col, cursor_desc in zip(
880 imv.sentinel_columns,
881 cursor.description[-_nsc:],
882 )
883 ]
884 else:
885 _scalar_sentinel_proc = (
886 imv.sentinel_columns[0]
887 ).type._cached_result_processor(
888 self, cursor.description[-1][1]
889 )
890 _sentinel_proc_initialized = True
891
892 rows_by_sentinel: Union[
893 Dict[Tuple[Any, ...], Any],
894 Dict[Any, Any],
895 ]
896 if composite_sentinel:
897 rows_by_sentinel = {
898 tuple(
899 (proc(val) if proc else val)
900 for val, proc in zip(
901 row[-_nsc:], _composite_sentinel_proc
902 )
903 ): row
904 for row in rows
905 }
906 elif _scalar_sentinel_proc:
907 rows_by_sentinel = {
908 _scalar_sentinel_proc(row[-1]): row for row in rows
909 }
910 else:
911 rows_by_sentinel = {row[-1]: row for row in rows}
912
913 if len(rows_by_sentinel) != len(imv_batch.batch):
914 # see test_insert_exec.py::
915 # IMVSentinelTest::test_sentinel_incorrect_rowcount
916 # for coverage / demonstration
917 raise exc.InvalidRequestError(
918 f"Sentinel-keyed result set did not produce "
919 f"correct number of rows {len(imv_batch.batch)}; "
920 "produced "
921 f"{len(rows_by_sentinel)}. Please ensure the "
922 "sentinel column is fully unique and populated in "
923 "all cases."
924 )
925
926 try:
927 ordered_rows = [
928 rows_by_sentinel[sentinel_keys]
929 for sentinel_keys in imv_batch.sentinel_values
930 ]
931 except KeyError as ke:
932 # see test_insert_exec.py::
933 # IMVSentinelTest::test_sentinel_cant_match_keys
934 # for coverage / demonstration
935 raise exc.InvalidRequestError(
936 f"Can't match sentinel values in result set to "
937 f"parameter sets; key {ke.args[0]!r} was not "
938 "found. "
939 "There may be a mismatch between the datatype "
940 "passed to the DBAPI driver vs. that which it "
941 "returns in a result row. Ensure the given "
942 "Python value matches the expected result type "
943 "*exactly*, taking care to not rely upon implicit "
944 "conversions which may occur such as when using "
945 "strings in place of UUID or integer values, etc. "
946 ) from ke
947
948 result.extend(ordered_rows)
949
950 else:
951 result.extend(rows)
952
953 def do_executemany(self, cursor, statement, parameters, context=None):
954 cursor.executemany(statement, parameters)
955
956 def do_execute(self, cursor, statement, parameters, context=None):
957 cursor.execute(statement, parameters)
958
959 def do_execute_no_params(self, cursor, statement, context=None):
960 cursor.execute(statement)
961
962 def is_disconnect(
963 self,
964 e: DBAPIModule.Error,
965 connection: Union[
966 pool.PoolProxiedConnection, interfaces.DBAPIConnection, None
967 ],
968 cursor: Optional[interfaces.DBAPICursor],
969 ) -> bool:
970 return False
971
972 @util.memoized_instancemethod
973 def _gen_allowed_isolation_levels(self, dbapi_conn):
974 try:
975 raw_levels = list(self.get_isolation_level_values(dbapi_conn))
976 except NotImplementedError:
977 return None
978 else:
979 normalized_levels = [
980 level.replace("_", " ").upper() for level in raw_levels
981 ]
982 if raw_levels != normalized_levels:
983 raise ValueError(
984 f"Dialect {self.name!r} get_isolation_level_values() "
985 f"method should return names as UPPERCASE using spaces, "
986 f"not underscores; got "
987 f"{sorted(set(raw_levels).difference(normalized_levels))}"
988 )
989 return tuple(normalized_levels)
990
991 def _assert_and_set_isolation_level(self, dbapi_conn, level):
992 level = level.replace("_", " ").upper()
993
994 _allowed_isolation_levels = self._gen_allowed_isolation_levels(
995 dbapi_conn
996 )
997 if (
998 _allowed_isolation_levels
999 and level not in _allowed_isolation_levels
1000 ):
1001 raise exc.ArgumentError(
1002 f"Invalid value {level!r} for isolation_level. "
1003 f"Valid isolation levels for {self.name!r} are "
1004 f"{', '.join(_allowed_isolation_levels)}"
1005 )
1006
1007 self.set_isolation_level(dbapi_conn, level)
1008
1009 def reset_isolation_level(self, dbapi_conn):
1010 if self._on_connect_isolation_level is not None:
1011 assert (
1012 self._on_connect_isolation_level == "AUTOCOMMIT"
1013 or self._on_connect_isolation_level
1014 == self.default_isolation_level
1015 )
1016 self._assert_and_set_isolation_level(
1017 dbapi_conn, self._on_connect_isolation_level
1018 )
1019 else:
1020 assert self.default_isolation_level is not None
1021 self._assert_and_set_isolation_level(
1022 dbapi_conn,
1023 self.default_isolation_level,
1024 )
1025
1026 def normalize_name(self, name):
1027 if name is None:
1028 return None
1029
1030 name_lower = name.lower()
1031 name_upper = name.upper()
1032
1033 if name_upper == name_lower:
            # name has no upper/lower conversion, e.g. non-European characters.
1035 # return unchanged
1036 return name
1037 elif name_upper == name and not (
1038 self.identifier_preparer._requires_quotes
1039 )(name_lower):
1040 # name is all uppercase and doesn't require quoting; normalize
1041 # to all lower case
1042 return name_lower
1043 elif name_lower == name:
1044 # name is all lower case, which if denormalized means we need to
1045 # force quoting on it
1046 return quoted_name(name, quote=True)
1047 else:
            # name is mixed case, which means it will be quoted in SQL when
            # used later; no normalization is performed
1050 return name
1051
1052 def denormalize_name(self, name):
1053 if name is None:
1054 return None
1055
1056 name_lower = name.lower()
1057 name_upper = name.upper()
1058
1059 if name_upper == name_lower:
            # name has no upper/lower conversion, e.g. non-European characters.
1061 # return unchanged
1062 return name
1063 elif name_lower == name and not (
1064 self.identifier_preparer._requires_quotes
1065 )(name_lower):
1066 name = name_upper
1067 return name
1068
1069 def get_driver_connection(self, connection: DBAPIConnection) -> Any:
1070 return connection
1071
1072 def _overrides_default(self, method):
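        # a method counts as overridden if the function found on this
        # dialect's class is not the same code object as the one defined
        # on DefaultDialect itself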
1073 return (
1074 getattr(type(self), method).__code__
1075 is not getattr(DefaultDialect, method).__code__
1076 )
1077
1078 def _default_multi_reflect(
1079 self,
1080 single_tbl_method,
1081 connection,
1082 kind,
1083 schema,
1084 filter_names,
1085 scope,
1086 **kw,
1087 ):
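        """Fallback implementation for the ``get_multi_*`` reflection
        methods.

        Collects the relevant table/view names for the given ``kind`` and
        ``scope`` (or uses ``filter_names`` directly when possible), then
        invokes the given single-table reflection method once per name,
        yielding ``((schema, name), result)`` pairs.
        """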
1088 names_fns = []
1089 temp_names_fns = []
1090 if ObjectKind.TABLE in kind:
1091 names_fns.append(self.get_table_names)
1092 temp_names_fns.append(self.get_temp_table_names)
1093 if ObjectKind.VIEW in kind:
1094 names_fns.append(self.get_view_names)
1095 temp_names_fns.append(self.get_temp_view_names)
1096 if ObjectKind.MATERIALIZED_VIEW in kind:
1097 names_fns.append(self.get_materialized_view_names)
1098 # no temp materialized view at the moment
1099 # temp_names_fns.append(self.get_temp_materialized_view_names)
1100
1101 unreflectable = kw.pop("unreflectable", {})
1102
1103 if (
1104 filter_names
1105 and scope is ObjectScope.ANY
1106 and kind is ObjectKind.ANY
1107 ):
1108 # if names are given and no qualification on type of table
1109 # (i.e. the Table(..., autoload) case), take the names as given,
            # don't run names queries. If a table does not exist,
            # NoSuchTableError is raised and it's skipped
1112
1113 # this also suits the case for mssql where we can reflect
1114 # individual temp tables but there's no temp_names_fn
1115 names = filter_names
1116 else:
1117 names = []
1118 name_kw = {"schema": schema, **kw}
1119 fns = []
1120 if ObjectScope.DEFAULT in scope:
1121 fns.extend(names_fns)
1122 if ObjectScope.TEMPORARY in scope:
1123 fns.extend(temp_names_fns)
1124
1125 for fn in fns:
1126 try:
1127 names.extend(fn(connection, **name_kw))
1128 except NotImplementedError:
1129 pass
1130
1131 if filter_names:
1132 filter_names = set(filter_names)
1133
1134 # iterate over all the tables/views and call the single table method
1135 for table in names:
1136 if not filter_names or table in filter_names:
1137 key = (schema, table)
1138 try:
1139 yield (
1140 key,
1141 single_tbl_method(
1142 connection, table, schema=schema, **kw
1143 ),
1144 )
1145 except exc.UnreflectableTableError as err:
1146 if key not in unreflectable:
1147 unreflectable[key] = err
1148 except exc.NoSuchTableError:
1149 pass
1150
1151 def get_multi_table_options(self, connection, **kw):
1152 return self._default_multi_reflect(
1153 self.get_table_options, connection, **kw
1154 )
1155
1156 def get_multi_columns(self, connection, **kw):
1157 return self._default_multi_reflect(self.get_columns, connection, **kw)
1158
1159 def get_multi_pk_constraint(self, connection, **kw):
1160 return self._default_multi_reflect(
1161 self.get_pk_constraint, connection, **kw
1162 )
1163
1164 def get_multi_foreign_keys(self, connection, **kw):
1165 return self._default_multi_reflect(
1166 self.get_foreign_keys, connection, **kw
1167 )
1168
1169 def get_multi_indexes(self, connection, **kw):
1170 return self._default_multi_reflect(self.get_indexes, connection, **kw)
1171
1172 def get_multi_unique_constraints(self, connection, **kw):
1173 return self._default_multi_reflect(
1174 self.get_unique_constraints, connection, **kw
1175 )
1176
1177 def get_multi_check_constraints(self, connection, **kw):
1178 return self._default_multi_reflect(
1179 self.get_check_constraints, connection, **kw
1180 )
1181
1182 def get_multi_table_comment(self, connection, **kw):
1183 return self._default_multi_reflect(
1184 self.get_table_comment, connection, **kw
1185 )
1186
1187
1188class StrCompileDialect(DefaultDialect):
1189 statement_compiler = compiler.StrSQLCompiler
1190 ddl_compiler = compiler.DDLCompiler
1191 type_compiler_cls = compiler.StrSQLTypeCompiler
1192 preparer = compiler.IdentifierPreparer
1193
1194 insert_returning = True
1195 update_returning = True
1196 delete_returning = True
1197
1198 supports_statement_cache = True
1199
1200 supports_identity_columns = True
1201
1202 supports_sequences = True
1203 sequences_optional = True
1204 preexecute_autoincrement_sequences = False
1205
1206 supports_native_boolean = True
1207
1208 supports_multivalues_insert = True
1209 supports_simple_order_by_label = True
1210
1211
1212class DefaultExecutionContext(ExecutionContext):
1213 isinsert = False
1214 isupdate = False
1215 isdelete = False
1216 is_crud = False
1217 is_text = False
1218 isddl = False
1219
1220 execute_style: ExecuteStyle = ExecuteStyle.EXECUTE
1221
1222 compiled: Optional[Compiled] = None
1223 result_column_struct: Optional[
1224 Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
1225 ] = None
1226 returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None
1227
1228 execution_options: _ExecuteOptions = util.EMPTY_DICT
1229
1230 cursor_fetch_strategy = _cursor._DEFAULT_FETCH
1231
1232 invoked_statement: Optional[Executable] = None
1233
1234 _is_implicit_returning = False
1235 _is_explicit_returning = False
1236 _is_supplemental_returning = False
1237 _is_server_side = False
1238
1239 _soft_closed = False
1240
1241 _rowcount: Optional[int] = None
1242
1243 # a hook for SQLite's translation of
1244 # result column names
1245 # NOTE: pyhive is using this hook, can't remove it :(
1246 _translate_colname: Optional[
1247 Callable[[str], Tuple[str, Optional[str]]]
1248 ] = None
1249
1250 _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
1251 """used by set_input_sizes().
1252
1253 This collection comes from ``ExpandedState.parameter_expansion``.
1254
1255 """
1256
1257 cache_hit = NO_CACHE_KEY
1258
1259 root_connection: Connection
1260 _dbapi_connection: PoolProxiedConnection
1261 dialect: Dialect
1262 unicode_statement: str
1263 cursor: DBAPICursor
1264 compiled_parameters: List[_MutableCoreSingleExecuteParams]
1265 parameters: _DBAPIMultiExecuteParams
1266 extracted_parameters: Optional[Sequence[BindParameter[Any]]]
1267
1268 _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)
1269
1270 _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
1271 _num_sentinel_cols: int = 0
1272
1273 @classmethod
1274 def _init_ddl(
1275 cls,
1276 dialect: Dialect,
1277 connection: Connection,
1278 dbapi_connection: PoolProxiedConnection,
1279 execution_options: _ExecuteOptions,
1280 compiled_ddl: DDLCompiler,
1281 ) -> ExecutionContext:
1282 """Initialize execution context for an ExecutableDDLElement
1283 construct."""
1284
1285 self = cls.__new__(cls)
1286 self.root_connection = connection
1287 self._dbapi_connection = dbapi_connection
1288 self.dialect = connection.dialect
1289
1290 self.compiled = compiled = compiled_ddl
1291 self.isddl = True
1292
1293 self.execution_options = execution_options
1294
1295 self.unicode_statement = str(compiled)
1296 if compiled.schema_translate_map:
1297 schema_translate_map = self.execution_options.get(
1298 "schema_translate_map", {}
1299 )
1300
1301 rst = compiled.preparer._render_schema_translates
1302 self.unicode_statement = rst(
1303 self.unicode_statement, schema_translate_map
1304 )
1305
1306 self.statement = self.unicode_statement
1307
1308 self.cursor = self.create_cursor()
1309 self.compiled_parameters = []
1310
1311 if dialect.positional:
1312 self.parameters = [dialect.execute_sequence_format()]
1313 else:
1314 self.parameters = [self._empty_dict_params]
1315
1316 return self
1317
1318 @classmethod
1319 def _init_compiled(
1320 cls,
1321 dialect: Dialect,
1322 connection: Connection,
1323 dbapi_connection: PoolProxiedConnection,
1324 execution_options: _ExecuteOptions,
1325 compiled: SQLCompiler,
1326 parameters: _CoreMultiExecuteParams,
1327 invoked_statement: Executable,
1328 extracted_parameters: Optional[Sequence[BindParameter[Any]]],
1329 cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
1330 param_dict: _CoreSingleExecuteParams | None = None,
1331 ) -> ExecutionContext:
1332 """Initialize execution context for a Compiled construct."""
1333
1334 self = cls.__new__(cls)
1335 self.root_connection = connection
1336 self._dbapi_connection = dbapi_connection
1337 self.dialect = connection.dialect
1338 self.extracted_parameters = extracted_parameters
1339 self.invoked_statement = invoked_statement
1340 self.compiled = compiled
1341 self.cache_hit = cache_hit
1342
1343 self.execution_options = execution_options
1344
1345 self.result_column_struct = (
1346 compiled._result_columns,
1347 compiled._ordered_columns,
1348 compiled._textual_ordered_columns,
1349 compiled._ad_hoc_textual,
1350 compiled._loose_column_name_matching,
1351 )
1352
1353 self.isinsert = ii = compiled.isinsert
1354 self.isupdate = iu = compiled.isupdate
1355 self.isdelete = id_ = compiled.isdelete
1356 self.is_text = compiled.isplaintext
1357
1358 if ii or iu or id_:
1359 dml_statement = compiled.compile_state.statement # type: ignore
1360 if TYPE_CHECKING:
1361 assert isinstance(dml_statement, UpdateBase)
1362 self.is_crud = True
1363 self._is_explicit_returning = ier = bool(dml_statement._returning)
1364 self._is_implicit_returning = iir = bool(
1365 compiled.implicit_returning
1366 )
1367 if iir and dml_statement._supplemental_returning:
1368 self._is_supplemental_returning = True
1369
            # don't mix implicit and explicit returning
1371 assert not (iir and ier)
1372
1373 if (ier or iir) and compiled.for_executemany:
1374 if ii and not self.dialect.insert_executemany_returning:
1375 raise exc.InvalidRequestError(
1376 f"Dialect {self.dialect.dialect_description} with "
1377 f"current server capabilities does not support "
1378 "INSERT..RETURNING when executemany is used"
1379 )
1380 elif (
1381 ii
1382 and dml_statement._sort_by_parameter_order
1383 and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501
1384 ):
1385 raise exc.InvalidRequestError(
1386 f"Dialect {self.dialect.dialect_description} with "
1387 f"current server capabilities does not support "
1388 "INSERT..RETURNING with deterministic row ordering "
1389 "when executemany is used"
1390 )
1391 elif (
1392 ii
1393 and self.dialect.use_insertmanyvalues
1394 and not compiled._insertmanyvalues
1395 ):
1396 raise exc.InvalidRequestError(
1397 'Statement does not have "insertmanyvalues" '
1398 "enabled, can't use INSERT..RETURNING with "
1399 "executemany in this case."
1400 )
1401 elif iu and not self.dialect.update_executemany_returning:
1402 raise exc.InvalidRequestError(
1403 f"Dialect {self.dialect.dialect_description} with "
1404 f"current server capabilities does not support "
1405 "UPDATE..RETURNING when executemany is used"
1406 )
1407 elif id_ and not self.dialect.delete_executemany_returning:
1408 raise exc.InvalidRequestError(
1409 f"Dialect {self.dialect.dialect_description} with "
1410 f"current server capabilities does not support "
1411 "DELETE..RETURNING when executemany is used"
1412 )
1413
1414 if not parameters:
1415 self.compiled_parameters = [
1416 compiled.construct_params(
1417 extracted_parameters=extracted_parameters,
1418 escape_names=False,
1419 _collected_params=param_dict,
1420 )
1421 ]
1422 else:
1423 self.compiled_parameters = [
1424 compiled.construct_params(
1425 m,
1426 escape_names=False,
1427 _group_number=grp,
1428 extracted_parameters=extracted_parameters,
1429 _collected_params=param_dict,
1430 )
1431 for grp, m in enumerate(parameters)
1432 ]
1433
1434 if len(parameters) > 1:
1435 if self.isinsert and compiled._insertmanyvalues:
1436 self.execute_style = ExecuteStyle.INSERTMANYVALUES
1437
1438 imv = compiled._insertmanyvalues
1439 if imv.sentinel_columns is not None:
1440 self._num_sentinel_cols = imv.num_sentinel_columns
1441 else:
1442 self.execute_style = ExecuteStyle.EXECUTEMANY
1443
1444 self.unicode_statement = compiled.string
1445
1446 self.cursor = self.create_cursor()
1447
1448 if self.compiled.insert_prefetch or self.compiled.update_prefetch:
1449 self._process_execute_defaults()
1450
1451 processors = compiled._bind_processors
1452
1453 flattened_processors: Mapping[
1454 str, _BindProcessorType[Any]
1455 ] = processors # type: ignore[assignment]
1456
1457 if compiled.literal_execute_params or compiled.post_compile_params:
1458 if self.executemany:
1459 raise exc.InvalidRequestError(
1460 "'literal_execute' or 'expanding' parameters can't be "
1461 "used with executemany()"
1462 )
1463
1464 expanded_state = compiled._process_parameters_for_postcompile(
1465 self.compiled_parameters[0]
1466 )
1467
1468 # re-assign self.unicode_statement
1469 self.unicode_statement = expanded_state.statement
1470
1471 self._expanded_parameters = expanded_state.parameter_expansion
1472
1473 flattened_processors = dict(processors) # type: ignore
1474 flattened_processors.update(expanded_state.processors)
1475 positiontup = expanded_state.positiontup
1476 elif compiled.positional:
1477 positiontup = self.compiled.positiontup
1478 else:
1479 positiontup = None
1480
1481 if compiled.schema_translate_map:
1482 schema_translate_map = self.execution_options.get(
1483 "schema_translate_map", {}
1484 )
1485 rst = compiled.preparer._render_schema_translates
1486 self.unicode_statement = rst(
1487 self.unicode_statement, schema_translate_map
1488 )
1489
1490 # final self.unicode_statement is now assigned, encode if needed
1491 # by dialect
1492 self.statement = self.unicode_statement
1493
1494 # Convert the dictionary of bind parameter values
1495 # into a dict or list to be sent to the DBAPI's
1496 # execute() or executemany() method.
1497
1498 if compiled.positional:
1499 core_positional_parameters: MutableSequence[Sequence[Any]] = []
1500 assert positiontup is not None
1501 for compiled_params in self.compiled_parameters:
1502 l_param: List[Any] = [
1503 (
1504 flattened_processors[key](compiled_params[key])
1505 if key in flattened_processors
1506 else compiled_params[key]
1507 )
1508 for key in positiontup
1509 ]
1510 core_positional_parameters.append(
1511 dialect.execute_sequence_format(l_param)
1512 )
1513
1514 self.parameters = core_positional_parameters
1515 else:
1516 core_dict_parameters: MutableSequence[Dict[str, Any]] = []
1517 escaped_names = compiled.escaped_bind_names
1518
1519 # note that currently, "expanded" parameters will be present
1520 # in self.compiled_parameters in their quoted form. This is
1521 # slightly inconsistent with the approach taken as of
1522 # #8056 where self.compiled_parameters is meant to contain unquoted
1523 # param names.
1524 d_param: Dict[str, Any]
1525 for compiled_params in self.compiled_parameters:
1526 if escaped_names:
1527 d_param = {
1528 escaped_names.get(key, key): (
1529 flattened_processors[key](compiled_params[key])
1530 if key in flattened_processors
1531 else compiled_params[key]
1532 )
1533 for key in compiled_params
1534 }
1535 else:
1536 d_param = {
1537 key: (
1538 flattened_processors[key](compiled_params[key])
1539 if key in flattened_processors
1540 else compiled_params[key]
1541 )
1542 for key in compiled_params
1543 }
1544
1545 core_dict_parameters.append(d_param)
1546
1547 self.parameters = core_dict_parameters
1548
1549 return self
1550
1551 @classmethod
1552 def _init_statement(
1553 cls,
1554 dialect: Dialect,
1555 connection: Connection,
1556 dbapi_connection: PoolProxiedConnection,
1557 execution_options: _ExecuteOptions,
1558 statement: str,
1559 parameters: _DBAPIMultiExecuteParams,
1560 ) -> ExecutionContext:
1561 """Initialize execution context for a string SQL statement."""
1562
1563 self = cls.__new__(cls)
1564 self.root_connection = connection
1565 self._dbapi_connection = dbapi_connection
1566 self.dialect = connection.dialect
1567 self.is_text = True
1568
1569 self.execution_options = execution_options
1570
1571 if not parameters:
1572 if self.dialect.positional:
1573 self.parameters = [dialect.execute_sequence_format()]
1574 else:
1575 self.parameters = [self._empty_dict_params]
1576 elif isinstance(parameters[0], dialect.execute_sequence_format):
1577 self.parameters = parameters
1578 elif isinstance(parameters[0], dict):
1579 self.parameters = parameters
1580 else:
1581 self.parameters = [
1582 dialect.execute_sequence_format(p) for p in parameters
1583 ]
1584
1585 if len(parameters) > 1:
1586 self.execute_style = ExecuteStyle.EXECUTEMANY
1587
1588 self.statement = self.unicode_statement = statement
1589
1590 self.cursor = self.create_cursor()
1591 return self
1592
1593 @classmethod
1594 def _init_default(
1595 cls,
1596 dialect: Dialect,
1597 connection: Connection,
1598 dbapi_connection: PoolProxiedConnection,
1599 execution_options: _ExecuteOptions,
1600 ) -> ExecutionContext:
1601 """Initialize execution context for a ColumnDefault construct."""
1602
1603 self = cls.__new__(cls)
1604 self.root_connection = connection
1605 self._dbapi_connection = dbapi_connection
1606 self.dialect = connection.dialect
1607
1608 self.execution_options = execution_options
1609
1610 self.cursor = self.create_cursor()
1611 return self
1612
1613 def _get_cache_stats(self) -> str:
1614 if self.compiled is None:
1615 return "raw sql"
1616
1617 now = perf_counter()
1618
1619 ch = self.cache_hit
1620
1621 gen_time = self.compiled._gen_time
1622 assert gen_time is not None
1623
1624 if ch is NO_CACHE_KEY:
1625 return "no key %.5fs" % (now - gen_time,)
1626 elif ch is CACHE_HIT:
1627 return "cached since %.4gs ago" % (now - gen_time,)
1628 elif ch is CACHE_MISS:
1629 return "generated in %.5fs" % (now - gen_time,)
1630 elif ch is CACHING_DISABLED:
1631 if "_cache_disable_reason" in self.execution_options:
1632 return "caching disabled (%s) %.5fs " % (
1633 self.execution_options["_cache_disable_reason"],
1634 now - gen_time,
1635 )
1636 else:
1637 return "caching disabled %.5fs" % (now - gen_time,)
1638 elif ch is NO_DIALECT_SUPPORT:
1639 return "dialect %s+%s does not support caching %.5fs" % (
1640 self.dialect.name,
1641 self.dialect.driver,
1642 now - gen_time,
1643 )
1644 else:
1645 return "unknown"
1646
1647 @property
1648 def executemany(self): # type: ignore[override]
1649 return self.execute_style in (
1650 ExecuteStyle.EXECUTEMANY,
1651 ExecuteStyle.INSERTMANYVALUES,
1652 )
1653
1654 @util.memoized_property
1655 def identifier_preparer(self):
1656 if self.compiled:
1657 return self.compiled.preparer
1658 elif "schema_translate_map" in self.execution_options:
1659 return self.dialect.identifier_preparer._with_schema_translate(
1660 self.execution_options["schema_translate_map"]
1661 )
1662 else:
1663 return self.dialect.identifier_preparer
1664
1665 @util.memoized_property
1666 def engine(self):
1667 return self.root_connection.engine
1668
1669 @util.memoized_property
1670 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1671 if TYPE_CHECKING:
1672 assert isinstance(self.compiled, SQLCompiler)
1673 return self.compiled.postfetch
1674
1675 @util.memoized_property
1676 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1677 if TYPE_CHECKING:
1678 assert isinstance(self.compiled, SQLCompiler)
1679 if self.isinsert:
1680 return self.compiled.insert_prefetch
1681 elif self.isupdate:
1682 return self.compiled.update_prefetch
1683 else:
1684 return ()
1685
1686 @util.memoized_property
1687 def no_parameters(self):
1688 return self.execution_options.get("no_parameters", False)
1689
1690 def _execute_scalar(
1691 self,
1692 stmt: str,
1693 type_: Optional[TypeEngine[Any]],
1694 parameters: Optional[_DBAPISingleExecuteParams] = None,
1695 ) -> Any:
1696 """Execute a string statement on the current cursor, returning a
1697 scalar result.
1698
1699 Used to fire off sequences, default phrases, and "select lastrowid"
1700 types of statements individually or in the context of a parent INSERT
1701 or UPDATE statement.
1702
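        E.g., a dialect might fire off a sequence during pre-execution with
        something along the lines of (an illustrative sketch; the SQL and
        sequence name are hypothetical)::

            next_id = self._execute_scalar(
                "select nextval('some_sequence')", sqltypes.Integer()
            )
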
1703 """
1704
1705 conn = self.root_connection
1706
1707 if "schema_translate_map" in self.execution_options:
1708 schema_translate_map = self.execution_options.get(
1709 "schema_translate_map", {}
1710 )
1711
1712 rst = self.identifier_preparer._render_schema_translates
1713 stmt = rst(stmt, schema_translate_map)
1714
1715 if not parameters:
1716 if self.dialect.positional:
1717 parameters = self.dialect.execute_sequence_format()
1718 else:
1719 parameters = {}
1720
1721 conn._cursor_execute(self.cursor, stmt, parameters, context=self)
1722 row = self.cursor.fetchone()
1723 if row is not None:
1724 r = row[0]
1725 else:
1726 r = None
1727 if type_ is not None:
1728 # apply type post processors to the result
1729 proc = type_._cached_result_processor(
1730 self.dialect, self.cursor.description[0][1]
1731 )
1732 if proc:
1733 return proc(r)
1734 return r
1735
1736 @util.memoized_property
1737 def connection(self):
1738 return self.root_connection
1739
1740 def _use_server_side_cursor(self):
1741 if not self.dialect.supports_server_side_cursors:
1742 return False
1743
1744 if self.dialect.server_side_cursors:
1745 # this is deprecated
1746 use_server_side = self.execution_options.get(
1747 "stream_results", True
1748 ) and (
1749 self.compiled
1750 and isinstance(self.compiled.statement, expression.Selectable)
1751 or (
1752 (
1753 not self.compiled
1754 or isinstance(
1755 self.compiled.statement, expression.TextClause
1756 )
1757 )
1758 and self.unicode_statement
1759 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
1760 )
1761 )
1762 else:
1763 use_server_side = self.execution_options.get(
1764 "stream_results", False
1765 )
1766
1767 return use_server_side
1768
1769 def create_cursor(self) -> DBAPICursor:
1770 if (
1771 # inlining initial preference checks for SS cursors
1772 self.dialect.supports_server_side_cursors
1773 and (
1774 self.execution_options.get("stream_results", False)
1775 or (
1776 self.dialect.server_side_cursors
1777 and self._use_server_side_cursor()
1778 )
1779 )
1780 ):
1781 self._is_server_side = True
1782 return self.create_server_side_cursor()
1783 else:
1784 self._is_server_side = False
1785 return self.create_default_cursor()
1786
1787 def fetchall_for_returning(self, cursor):
1788 return cursor.fetchall()
1789
1790 def create_default_cursor(self) -> DBAPICursor:
1791 return self._dbapi_connection.cursor()
1792
1793 def create_server_side_cursor(self) -> DBAPICursor:
1794 raise NotImplementedError()
1795
1796 def pre_exec(self):
1797 pass
1798
1799 def get_out_parameter_values(self, names):
1800 raise NotImplementedError(
1801 "This dialect does not support OUT parameters"
1802 )
1803
1804 def post_exec(self):
1805 pass
1806
1807 def get_result_processor(
1808 self, type_: TypeEngine[Any], colname: str, coltype: DBAPIType
1809 ) -> Optional[_ResultProcessorType[Any]]:
1810 """Return a 'result processor' for a given type as present in
1811 cursor.description.
1812
1813 This has a default implementation that dialects can override
1814 for context-sensitive result type handling.
1815
        """
        return type_._cached_result_processor(self.dialect, coltype)

    def get_lastrowid(self) -> int:
        """return self.cursor.lastrowid, or equivalent, after an INSERT.

        This may involve calling special cursor functions, issuing a new SELECT
        on the cursor (or a new one), or returning a stored value that was
        calculated within post_exec().

        This function will only be called for dialects which support "implicit"
        primary key generation, keep preexecute_autoincrement_sequences set to
        False, and when no explicit id value was bound to the statement.

        The function is called once for an INSERT statement that would need to
        return the last inserted primary key for those dialects that make use
        of the lastrowid concept. In these cases, it is called directly after
        :meth:`.ExecutionContext.post_exec`.

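        As an illustration only (not the approach taken by any particular
        dialect here), a dialect whose driver does not populate
        ``cursor.lastrowid`` could override this hook to run a
        driver-specific query, e.g.::

            def get_lastrowid(self):
                # hypothetical sketch; the SQL function is database-specific
                self.cursor.execute("SELECT LAST_INSERT_ID()")
                return self.cursor.fetchone()[0]
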
        """
        return self.cursor.lastrowid

    def handle_dbapi_exception(self, e):
        pass

    @util.non_memoized_property
    def rowcount(self) -> int:
        if self._rowcount is not None:
            return self._rowcount
        else:
            return self.cursor.rowcount

    @property
    def _has_rowcount(self):
        return self._rowcount is not None

    def supports_sane_rowcount(self):
        return self.dialect.supports_sane_rowcount

    def supports_sane_multi_rowcount(self):
        return self.dialect.supports_sane_multi_rowcount

    def _setup_result_proxy(self):
        exec_opt = self.execution_options

        if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
            self._rowcount = self.cursor.rowcount

        yp: Optional[Union[int, bool]]
        if self.is_crud or self.is_text:
            result = self._setup_dml_or_text_result()
            yp = False
        else:
            yp = exec_opt.get("yield_per", None)
            sr = self._is_server_side or exec_opt.get("stream_results", False)
            strategy = self.cursor_fetch_strategy
            if sr and strategy is _cursor._DEFAULT_FETCH:
                strategy = _cursor.BufferedRowCursorFetchStrategy(
                    self.cursor, self.execution_options
                )
            cursor_description: _DBAPICursorDescription = (
                strategy.alternate_cursor_description
                or self.cursor.description
            )
            if cursor_description is None:
                strategy = _cursor._NO_CURSOR_DQL

            result = _cursor.CursorResult(self, strategy, cursor_description)

        compiled = self.compiled

        if (
            compiled
            and not self.isddl
            and cast(SQLCompiler, compiled).has_out_parameters
        ):
            self._setup_out_parameters(result)

        self._soft_closed = result._soft_closed

        if yp:
            result = result.yield_per(yp)

        return result

    def _setup_out_parameters(self, result):
        compiled = cast(SQLCompiler, self.compiled)

        out_bindparams = [
            (param, name)
            for param, name in compiled.bind_names.items()
            if param.isoutparam
        ]
        out_parameters = {}

        for bindparam, raw_value in zip(
            [param for param, name in out_bindparams],
            self.get_out_parameter_values(
                [name for param, name in out_bindparams]
            ),
        ):
            type_ = bindparam.type
            impl_type = type_.dialect_impl(self.dialect)
            dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
            result_processor = impl_type.result_processor(
                self.dialect, dbapi_type
            )
            if result_processor is not None:
                raw_value = result_processor(raw_value)
            out_parameters[bindparam.key] = raw_value

        result.out_parameters = out_parameters

    def _setup_dml_or_text_result(self):
        compiled = cast(SQLCompiler, self.compiled)

        strategy: ResultFetchStrategy = self.cursor_fetch_strategy

        if self.isinsert:
            if (
                self.execute_style is ExecuteStyle.INSERTMANYVALUES
                and compiled.effective_returning
            ):
                strategy = _cursor.FullyBufferedCursorFetchStrategy(
                    self.cursor,
                    initial_buffer=self._insertmanyvalues_rows,
                    # maintain alt cursor description if set by the
                    # dialect, e.g. mssql preserves it
                    alternate_description=(
                        strategy.alternate_cursor_description
                    ),
                )

            if compiled.postfetch_lastrowid:
                self.inserted_primary_key_rows = (
                    self._setup_ins_pk_from_lastrowid()
                )
            # else if not self._is_implicit_returning,
            # the default inserted_primary_key_rows accessor will
            # return an "empty" primary key collection when accessed.

        if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
            strategy = _cursor.BufferedRowCursorFetchStrategy(
                self.cursor, self.execution_options
            )

        if strategy is _cursor._NO_CURSOR_DML:
            cursor_description = None
        else:
            cursor_description = (
                strategy.alternate_cursor_description
                or self.cursor.description
            )

        if cursor_description is None:
            strategy = _cursor._NO_CURSOR_DML
        elif self._num_sentinel_cols:
            assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
            # the sentinel columns are handled in CursorResult._init_metadata
            # using essentially _reduce

        result: _cursor.CursorResult[Any] = _cursor.CursorResult(
            self, strategy, cursor_description
        )

        if self.isinsert:
            if self._is_implicit_returning:
                rows = result.all()

                self.returned_default_rows = rows

                self.inserted_primary_key_rows = (
                    self._setup_ins_pk_from_implicit_returning(result, rows)
                )

                # test that it has a cursor metadata that is accurate. the
                # first row will have been fetched and current assumptions
                # are that the result has only one row, until executemany()
                # support is added here.
                assert result._metadata.returns_rows

                # Insert statement has both return_defaults() and
                # returning(). rewind the result on the list of rows
                # we just used.
                if self._is_supplemental_returning:
                    result._rewind(rows)
                else:
                    result._soft_close()
            elif not self._is_explicit_returning:
                result._soft_close()

                # we assume here the result does not return any rows.
                # *usually*, this will be true. However, some dialects
                # such as that of MSSQL/pyodbc need to SELECT a post fetch
                # function so this is not necessarily true.
                # assert not result.returns_rows

        elif self._is_implicit_returning:
            rows = result.all()

            if rows:
                self.returned_default_rows = rows
            self._rowcount = len(rows)

            if self._is_supplemental_returning:
                result._rewind(rows)
            else:
                result._soft_close()

            # test that it has a cursor metadata that is accurate.
            # the rows have all been fetched however.
            assert result._metadata.returns_rows

        elif not result._metadata.returns_rows:
            # no results, get rowcount
            # (which requires open cursor on some drivers)
            if self._rowcount is None:
                self._rowcount = self.cursor.rowcount
            result._soft_close()
        elif self.isupdate or self.isdelete:
            if self._rowcount is None:
                self._rowcount = self.cursor.rowcount
        return result

    @util.memoized_property
    def inserted_primary_key_rows(self):
        # if no specific "get primary key" strategy was set up
        # during execution, return a "default" primary key based
        # on what's in the compiled_parameters and nothing else.
        return self._setup_ins_pk_from_empty()

    def _setup_ins_pk_from_lastrowid(self):
        getter = cast(
            SQLCompiler, self.compiled
        )._inserted_primary_key_from_lastrowid_getter
        lastrowid = self.get_lastrowid()
        return [getter(lastrowid, self.compiled_parameters[0])]

    def _setup_ins_pk_from_empty(self):
        getter = cast(
            SQLCompiler, self.compiled
        )._inserted_primary_key_from_lastrowid_getter
        return [getter(None, param) for param in self.compiled_parameters]

    def _setup_ins_pk_from_implicit_returning(self, result, rows):
        if not rows:
            return []

        getter = cast(
            SQLCompiler, self.compiled
        )._inserted_primary_key_from_returning_getter
        compiled_params = self.compiled_parameters

        return [
            getter(row, param) for row, param in zip(rows, compiled_params)
        ]

    def lastrow_has_defaults(self) -> bool:
        return (self.isinsert or self.isupdate) and bool(
            cast(SQLCompiler, self.compiled).postfetch
        )

    def _prepare_set_input_sizes(
        self,
    ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
        """Given a cursor and ClauseParameters, prepare arguments
        in order to call the appropriate
        style of ``setinputsizes()`` on the cursor, using DB-API types
        from the bind parameter's ``TypeEngine`` objects.

        This method is only called by those dialects which set the
        :attr:`.Dialect.bind_typing` attribute to
        :attr:`.BindTyping.SETINPUTSIZES`. Python-oracledb and cx_Oracle are
        the only DBAPIs that require setinputsizes(); pyodbc offers it as an
        option.

        Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
        for pg8000 and asyncpg, which have since been changed to use inline
        rendering of casts instead.

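        A dialect opts in to this hook by setting, for example (a sketch of
        the opt-in only; the dialect class itself is hypothetical)::

            from sqlalchemy.engine import interfaces
            from sqlalchemy.engine.default import DefaultDialect

            class MyDialect(DefaultDialect):
                bind_typing = interfaces.BindTyping.SETINPUTSIZES
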
        """
        if self.isddl or self.is_text:
            return None

        compiled = cast(SQLCompiler, self.compiled)

        inputsizes = compiled._get_set_input_sizes_lookup()

        if inputsizes is None:
            return None

        dialect = self.dialect

        # all of the rest of this... cython?

        if dialect._has_events:
            inputsizes = dict(inputsizes)
            dialect.dispatch.do_setinputsizes(
                inputsizes, self.cursor, self.statement, self.parameters, self
            )

        if compiled.escaped_bind_names:
            escaped_bind_names = compiled.escaped_bind_names
        else:
            escaped_bind_names = None

        if dialect.positional:
            items = [
                (key, compiled.binds[key])
                for key in compiled.positiontup or ()
            ]
        else:
            items = [
                (key, bindparam)
                for bindparam, key in compiled.bind_names.items()
            ]

        generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
        for key, bindparam in items:
            if bindparam in compiled.literal_execute_params:
                continue

            if key in self._expanded_parameters:
                if is_tuple_type(bindparam.type):
                    num = len(bindparam.type.types)
                    dbtypes = inputsizes[bindparam]
                    generic_inputsizes.extend(
                        (
                            (
                                escaped_bind_names.get(paramname, paramname)
                                if escaped_bind_names is not None
                                else paramname
                            ),
                            dbtypes[idx % num],
                            bindparam.type.types[idx % num],
                        )
                        for idx, paramname in enumerate(
                            self._expanded_parameters[key]
                        )
                    )
                else:
                    dbtype = inputsizes.get(bindparam, None)
                    generic_inputsizes.extend(
                        (
                            (
                                escaped_bind_names.get(paramname, paramname)
                                if escaped_bind_names is not None
                                else paramname
                            ),
                            dbtype,
                            bindparam.type,
                        )
                        for paramname in self._expanded_parameters[key]
                    )
            else:
                dbtype = inputsizes.get(bindparam, None)

                escaped_name = (
                    escaped_bind_names.get(key, key)
                    if escaped_bind_names is not None
                    else key
                )

                generic_inputsizes.append(
                    (escaped_name, dbtype, bindparam.type)
                )

        return generic_inputsizes

    def _exec_default(self, column, default, type_):
        if default.is_sequence:
            return self.fire_sequence(default, type_)
        elif default.is_callable:
            # this codepath is not normally used as it's inlined
            # into _process_execute_defaults
            self.current_column = column
            return default.arg(self)
        elif default.is_clause_element:
            return self._exec_default_clause_element(column, default, type_)
        else:
            # this codepath is not normally used as it's inlined
            # into _process_execute_defaults
            return default.arg

    def _exec_default_clause_element(self, column, default, type_):
        # execute a default that's a complete clause element. Here, we have
        # to re-implement a miniature version of the compile->parameters->
        # cursor.execute() sequence, since we don't want to modify the state
        # of the connection / result in progress or create new connection/
        # result objects etc.
        # .. versionchanged:: 1.4
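        # For example (illustrative only), a column default given as a clause
        # element such as Column("created", DateTime, default=func.now())
        # arrives here; it is compiled into a standalone SELECT of that
        # expression and then run through _execute_scalar() below.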

        if not default._arg_is_typed:
            default_arg = expression.type_coerce(default.arg, type_)
        else:
            default_arg = default.arg
        compiled = expression.select(default_arg).compile(dialect=self.dialect)
        compiled_params = compiled.construct_params()
        processors = compiled._bind_processors
        if compiled.positional:
            parameters = self.dialect.execute_sequence_format(
                [
                    (
                        processors[key](compiled_params[key])  # type: ignore
                        if key in processors
                        else compiled_params[key]
                    )
                    for key in compiled.positiontup or ()
                ]
            )
        else:
            parameters = {
                key: (
                    processors[key](compiled_params[key])  # type: ignore
                    if key in processors
                    else compiled_params[key]
                )
                for key in compiled_params
            }
        return self._execute_scalar(
            str(compiled), type_, parameters=parameters
        )

    current_parameters: Optional[_CoreSingleExecuteParams] = None
    """A dictionary of parameters applied to the current row.

    This attribute is only available in the context of a user-defined default
    generation function, e.g. as described at :ref:`context_default_functions`.
    It consists of a dictionary which includes entries for each column/value
    pair that is to be part of the INSERT or UPDATE statement. The keys of the
    dictionary will be the key value of each :class:`_schema.Column`,
    which is usually
    synonymous with the name.

    Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
    does not accommodate the "multi-values" feature of the
    :meth:`_expression.Insert.values` method. The
    :meth:`.DefaultExecutionContext.get_current_parameters` method should be
    preferred.

    .. seealso::

        :meth:`.DefaultExecutionContext.get_current_parameters`

        :ref:`context_default_functions`

    """

    def get_current_parameters(self, isolate_multiinsert_groups=True):
        """Return a dictionary of parameters applied to the current row.

        This method can only be used in the context of a user-defined default
        generation function, e.g. as described at
        :ref:`context_default_functions`. When invoked, a dictionary is
        returned which includes entries for each column/value pair that is part
        of the INSERT or UPDATE statement. The keys of the dictionary will be
        the key value of each :class:`_schema.Column`,
        which is usually synonymous
        with the name.

        :param isolate_multiinsert_groups=True: indicates that multi-valued
         INSERT constructs created using :meth:`_expression.Insert.values`
         should be
         handled by returning only the subset of parameters that are local
         to the current column default invocation. When ``False``, the
         raw parameters of the statement are returned including the
         naming convention used in the case of multi-valued INSERT.

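        For example, a Python-side default function might read sibling values
        from the row being inserted (a minimal sketch; the column and function
        names are illustrative)::

            def calculated_default(context):
                params = context.get_current_parameters()
                return params["base_price"] * params["quantity"]

            total = Column(Numeric, default=calculated_default)
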
        .. seealso::

            :attr:`.DefaultExecutionContext.current_parameters`

            :ref:`context_default_functions`

        """
        try:
            parameters = self.current_parameters
            column = self.current_column
        except AttributeError:
            raise exc.InvalidRequestError(
                "get_current_parameters() can only be invoked in the "
                "context of a Python side column default function"
            )
        else:
            assert column is not None
            assert parameters is not None
            compile_state = cast(
                "DMLState", cast(SQLCompiler, self.compiled).compile_state
            )
            assert compile_state is not None
            if (
                isolate_multiinsert_groups
                and dml.isinsert(compile_state)
                and compile_state._has_multi_parameters
            ):
                if column._is_multiparam_column:
                    index = column.index + 1
                    d = {column.original.key: parameters[column.key]}
                else:
                    d = {column.key: parameters[column.key]}
                    index = 0
                assert compile_state._dict_parameters is not None
                keys = compile_state._dict_parameters.keys()
                d.update(
                    (key, parameters["%s_m%d" % (key, index)]) for key in keys
                )
                return d
            else:
                return parameters

    def get_insert_default(self, column):
        if column.default is None:
            return None
        else:
            return self._exec_default(column, column.default, column.type)

    def get_update_default(self, column):
        if column.onupdate is None:
            return None
        else:
            return self._exec_default(column, column.onupdate, column.type)

    def _process_execute_defaults(self):
        compiled = cast(SQLCompiler, self.compiled)

        key_getter = compiled._within_exec_param_key_getter

        sentinel_counter = 0

        if compiled.insert_prefetch:
            prefetch_recs = [
                (
                    c,
                    key_getter(c),
                    c._default_description_tuple,
                    self.get_insert_default,
                )
                for c in compiled.insert_prefetch
            ]
        elif compiled.update_prefetch:
            prefetch_recs = [
                (
                    c,
                    key_getter(c),
                    c._onupdate_description_tuple,
                    self.get_update_default,
                )
                for c in compiled.update_prefetch
            ]
        else:
            prefetch_recs = []

        for param in self.compiled_parameters:
            self.current_parameters = param

            for (
                c,
                param_key,
                (arg, is_scalar, is_callable, is_sentinel),
                fallback,
            ) in prefetch_recs:
                if is_sentinel:
                    param[param_key] = sentinel_counter
                    sentinel_counter += 1
                elif is_scalar:
                    param[param_key] = arg
                elif is_callable:
                    self.current_column = c
                    param[param_key] = arg(self)
                else:
                    val = fallback(c)
                    if val is not None:
                        param[param_key] = val

        del self.current_parameters


DefaultDialect.execution_ctx_cls = DefaultExecutionContext