1# engine/default.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Default implementations of per-dialect sqlalchemy.engine classes.
10
11These are semi-private implementation classes which are only of importance
12to database dialect authors; dialects will usually use the classes here
13as the base class for their own corresponding classes.
14
15"""
16
17from __future__ import annotations
18
19import functools
20import operator
21import random
22import re
23from time import perf_counter
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Dict
29from typing import Final
30from typing import List
31from typing import Mapping
32from typing import MutableMapping
33from typing import MutableSequence
34from typing import Optional
35from typing import Sequence
36from typing import Set
37from typing import Tuple
38from typing import Type
39from typing import TYPE_CHECKING
40from typing import Union
41import weakref
42
43from . import characteristics
44from . import cursor as _cursor
45from . import interfaces
46from .base import Connection
47from .interfaces import CacheStats
48from .interfaces import DBAPICursor
49from .interfaces import Dialect
50from .interfaces import ExecuteStyle
51from .interfaces import ExecutionContext
52from .reflection import ObjectKind
53from .reflection import ObjectScope
54from .. import event
55from .. import exc
56from .. import pool
57from .. import util
58from ..sql import compiler
59from ..sql import dml
60from ..sql import expression
61from ..sql import type_api
62from ..sql import util as sql_util
63from ..sql._typing import is_tuple_type
64from ..sql.base import _NoArg
65from ..sql.compiler import DDLCompiler
66from ..sql.compiler import InsertmanyvaluesSentinelOpts
67from ..sql.compiler import SQLCompiler
68from ..sql.elements import quoted_name
69from ..util.typing import Literal
70from ..util.typing import TupleAny
71from ..util.typing import Unpack
72
73
74if typing.TYPE_CHECKING:
75 from types import ModuleType
76
77 from .base import Engine
78 from .cursor import ResultFetchStrategy
79 from .interfaces import _CoreMultiExecuteParams
80 from .interfaces import _CoreSingleExecuteParams
81 from .interfaces import _DBAPICursorDescription
82 from .interfaces import _DBAPIMultiExecuteParams
83 from .interfaces import _ExecuteOptions
84 from .interfaces import _MutableCoreSingleExecuteParams
85 from .interfaces import _ParamStyle
86 from .interfaces import DBAPIConnection
87 from .interfaces import IsolationLevel
88 from .row import Row
89 from .url import URL
90 from ..event import _ListenerFnType
91 from ..pool import Pool
92 from ..pool import PoolProxiedConnection
93 from ..sql import Executable
94 from ..sql.compiler import Compiled
95 from ..sql.compiler import Linting
96 from ..sql.compiler import ResultColumnsEntry
97 from ..sql.dml import DMLState
98 from ..sql.dml import UpdateBase
99 from ..sql.elements import BindParameter
100 from ..sql.schema import Column
101 from ..sql.type_api import _BindProcessorType
102 from ..sql.type_api import _ResultProcessorType
103 from ..sql.type_api import TypeEngine
104
# When we're handed literal SQL, ensure it's a SELECT query.
# Matched case-insensitively, allowing leading whitespace before SELECT.
SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)
107
108
109(
110 CACHE_HIT,
111 CACHE_MISS,
112 CACHING_DISABLED,
113 NO_CACHE_KEY,
114 NO_DIALECT_SUPPORT,
115) = list(CacheStats)
116
117
class DefaultDialect(Dialect):
    """Default implementation of Dialect"""

    # compiler / preparer classes used to render SQL statements, DDL,
    # types and identifiers for this dialect
    statement_compiler = compiler.SQLCompiler
    ddl_compiler = compiler.DDLCompiler
    type_compiler_cls = compiler.GenericTypeCompiler

    preparer = compiler.IdentifierPreparer
    supports_alter = True
    supports_comments = False
    supports_constraint_comments = False
    inline_comments = False
    supports_statement_cache = True

    div_is_floordiv = True

    bind_typing = interfaces.BindTyping.NONE

    # optional include/exclude filters for setinputsizes()-style
    # bound-parameter type handling
    include_set_input_sizes: Optional[Set[Any]] = None
    exclude_set_input_sizes: Optional[Set[Any]] = None

    # the first value we'd get for an autoincrement column.
    default_sequence_base = 1

    # most DBAPIs happy with this for execute().
    # not cx_oracle.
    execute_sequence_format = tuple

    supports_schemas = True
    supports_views = True
    supports_sequences = False
    sequences_optional = False
    preexecute_autoincrement_sequences = False
    supports_identity_columns = False
    postfetch_lastrowid = True
    favor_returning_over_lastrowid = False
    insert_null_pk_still_autoincrements = False
    update_returning = False
    delete_returning = False
    update_returning_multifrom = False
    delete_returning_multifrom = False
    insert_returning = False

    cte_follows_insert = False

    supports_native_enum = False
    supports_native_boolean = False
    supports_native_uuid = False
    returns_native_bytes = False

    non_native_boolean_check_constraint = True

    supports_simple_order_by_label = True

    tuple_in_values = False

    # per-connection settings (e.g. isolation level, logging token) that
    # may be established / reset via execution options
    connection_characteristics = util.immutabledict(
        {
            "isolation_level": characteristics.IsolationLevelCharacteristic(),
            "logging_token": characteristics.LoggingTokenCharacteristic(),
        }
    )

    # coercion functions applied to create_engine() configuration values
    # that may arrive as strings (e.g. from a config file)
    engine_config_types: Mapping[str, Any] = util.immutabledict(
        {
            "pool_timeout": util.asint,
            "echo": util.bool_or_str("debug"),
            "echo_pool": util.bool_or_str("debug"),
            "pool_recycle": util.asint,
            "pool_size": util.asint,
            "max_overflow": util.asint,
            "future": util.asbool,
        }
    )

    # if the NUMERIC type
    # returns decimal.Decimal.
    # *not* the FLOAT type however.
    supports_native_decimal = False

    name = "default"

    # length at which to truncate
    # any identifier.
    max_identifier_length = 9999
    _user_defined_max_identifier_length: Optional[int] = None

    isolation_level: Optional[str] = None

    # sub-categories of max_identifier_length.
    # currently these accommodate for MySQL which allows alias names
    # of 255 but DDL names only of 64.
    max_index_name_length: Optional[int] = None
    max_constraint_name_length: Optional[int] = None

    supports_sane_rowcount = True
    supports_sane_multi_rowcount = True
    colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
    default_paramstyle = "named"

    supports_default_values = False
    """dialect supports INSERT... DEFAULT VALUES syntax"""

    supports_default_metavalue = False
    """dialect supports INSERT... VALUES (DEFAULT) syntax"""

    default_metavalue_token = "DEFAULT"
    """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
    parenthesis."""

    # not sure if this is a real thing but the compiler will deliver it
    # if this is the only flag enabled.
    supports_empty_insert = True
    """dialect supports INSERT () VALUES ()"""

    supports_multivalues_insert = False

    # "insertmanyvalues" feature toggles / tuning: batched multi-row
    # INSERT with optional RETURNING support
    use_insertmanyvalues: bool = False

    use_insertmanyvalues_wo_returning: bool = False

    insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
        InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
    )

    insertmanyvalues_page_size: int = 1000
    insertmanyvalues_max_parameters = 32700

    supports_is_distinct_from = True

    supports_server_side_cursors = False

    server_side_cursors = False

    # extra record-level locking features (#4860)
    supports_for_update_of = False

    server_version_info = None

    default_schema_name: Optional[str] = None

    # indicates symbol names are
    # UPPERCASEd if they are case insensitive
    # within the database.
    # if this is True, the methods normalize_name()
    # and denormalize_name() must be provided.
    requires_name_normalize = False

    is_async = False

    has_terminate = False

    # TODO: this is not to be part of 2.0. implement rudimentary binary
    # literals for SQLite, PostgreSQL, MySQL only within
    # _Binary.literal_processor
    _legacy_binary_type_literal_encoding = "utf-8"
274
    @util.deprecated_params(
        empty_in_strategy=(
            "1.4",
            "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
            "deprecated, and no longer has any effect. All IN expressions "
            "are now rendered using "
            'the "expanding parameter" strategy which renders a set of bound'
            'expressions, or an "empty set" SELECT, at statement execution'
            "time.",
        ),
        server_side_cursors=(
            "1.4",
            "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
            "is deprecated and will be removed in a future release. Please "
            "use the "
            ":paramref:`_engine.Connection.execution_options.stream_results` "
            "parameter.",
        ),
    )
    def __init__(
        self,
        paramstyle: Optional[_ParamStyle] = None,
        isolation_level: Optional[IsolationLevel] = None,
        dbapi: Optional[ModuleType] = None,
        implicit_returning: Literal[True] = True,
        supports_native_boolean: Optional[bool] = None,
        max_identifier_length: Optional[int] = None,
        label_length: Optional[int] = None,
        insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
        use_insertmanyvalues: Optional[bool] = None,
        # util.deprecated_params decorator cannot render the
        # Linting.NO_LINTING constant
        compiler_linting: Linting = int(compiler.NO_LINTING),  # type: ignore
        server_side_cursors: bool = False,
        **kwargs: Any,
    ):
        # legacy create_engine(server_side_cursors=True) flag is only
        # honored when the dialect actually supports server side cursors
        if server_side_cursors:
            if not self.supports_server_side_cursors:
                raise exc.ArgumentError(
                    "Dialect %s does not support server side cursors" % self
                )
            else:
                self.server_side_cursors = True

        # legacy dialect-level use_setinputsizes flag maps onto the
        # modern bind_typing enum
        if getattr(self, "use_setinputsizes", False):
            util.warn_deprecated(
                "The dialect-level use_setinputsizes attribute is "
                "deprecated. Please use "
                "bind_typing = BindTyping.SETINPUTSIZES",
                "2.0",
            )
            self.bind_typing = interfaces.BindTyping.SETINPUTSIZES

        self.positional = False
        self._ischema = None

        self.dbapi = dbapi

        # paramstyle resolution: explicit argument, then the DBAPI's
        # declared paramstyle, then the class-level default
        if paramstyle is not None:
            self.paramstyle = paramstyle
        elif self.dbapi is not None:
            self.paramstyle = self.dbapi.paramstyle
        else:
            self.paramstyle = self.default_paramstyle
        # positional paramstyles pass parameters as a sequence rather
        # than a mapping
        self.positional = self.paramstyle in (
            "qmark",
            "format",
            "numeric",
            "numeric_dollar",
        )
        self.identifier_preparer = self.preparer(self)
        self._on_connect_isolation_level = isolation_level

        # honor a legacy "type_compiler" class attribute if a dialect
        # set one; otherwise use type_compiler_cls
        legacy_tt_callable = getattr(self, "type_compiler", None)
        if legacy_tt_callable is not None:
            tt_callable = cast(
                Type[compiler.GenericTypeCompiler],
                self.type_compiler,
            )
        else:
            tt_callable = self.type_compiler_cls

        # "type_compiler" is kept as a synonym for the instance for
        # backwards compatibility
        self.type_compiler_instance = self.type_compiler = tt_callable(self)

        if supports_native_boolean is not None:
            self.supports_native_boolean = supports_native_boolean

        # a user-specified max_identifier_length takes precedence over
        # any server-detected value (see initialize())
        self._user_defined_max_identifier_length = max_identifier_length
        if self._user_defined_max_identifier_length:
            self.max_identifier_length = (
                self._user_defined_max_identifier_length
            )
        self.label_length = label_length
        self.compiler_linting = compiler_linting

        if use_insertmanyvalues is not None:
            self.use_insertmanyvalues = use_insertmanyvalues

        if insertmanyvalues_page_size is not _NoArg.NO_ARG:
            self.insertmanyvalues_page_size = insertmanyvalues_page_size
375
376 @property
377 @util.deprecated(
378 "2.0",
379 "full_returning is deprecated, please use insert_returning, "
380 "update_returning, delete_returning",
381 )
382 def full_returning(self):
383 return (
384 self.insert_returning
385 and self.update_returning
386 and self.delete_returning
387 )
388
389 @util.memoized_property
390 def insert_executemany_returning(self):
391 """Default implementation for insert_executemany_returning, if not
392 otherwise overridden by the specific dialect.
393
394 The default dialect determines "insert_executemany_returning" is
395 available if the dialect in use has opted into using the
396 "use_insertmanyvalues" feature. If they haven't opted into that, then
397 this attribute is False, unless the dialect in question overrides this
398 and provides some other implementation (such as the Oracle dialect).
399
400 """
401 return self.insert_returning and self.use_insertmanyvalues
402
403 @util.memoized_property
404 def insert_executemany_returning_sort_by_parameter_order(self):
405 """Default implementation for
406 insert_executemany_returning_deterministic_order, if not otherwise
407 overridden by the specific dialect.
408
409 The default dialect determines "insert_executemany_returning" can have
410 deterministic order only if the dialect in use has opted into using the
411 "use_insertmanyvalues" feature, which implements deterministic ordering
412 using client side sentinel columns only by default. The
413 "insertmanyvalues" feature also features alternate forms that can
414 use server-generated PK values as "sentinels", but those are only
415 used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel`
416 bitflag enables those alternate SQL forms, which are disabled
417 by default.
418
419 If the dialect in use hasn't opted into that, then this attribute is
420 False, unless the dialect in question overrides this and provides some
421 other implementation (such as the Oracle dialect).
422
423 """
424 return self.insert_returning and self.use_insertmanyvalues
425
    # executemany() with RETURNING is not assumed for UPDATE / DELETE
    update_executemany_returning = False
    delete_executemany_returning = False
428
429 @util.memoized_property
430 def loaded_dbapi(self) -> ModuleType:
431 if self.dbapi is None:
432 raise exc.InvalidRequestError(
433 f"Dialect {self} does not have a Python DBAPI established "
434 "and cannot be used for actual database interaction"
435 )
436 return self.dbapi
437
438 @util.memoized_property
439 def _bind_typing_render_casts(self):
440 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS
441
442 def _ensure_has_table_connection(self, arg):
443 if not isinstance(arg, Connection):
444 raise exc.ArgumentError(
445 "The argument passed to Dialect.has_table() should be a "
446 "%s, got %s. "
447 "Additionally, the Dialect.has_table() method is for "
448 "internal dialect "
449 "use only; please use "
450 "``inspect(some_engine).has_table(<tablename>>)`` "
451 "for public API use." % (Connection, type(arg))
452 )
453
    @util.memoized_property
    def _supports_statement_cache(self):
        # only honor supports_statement_cache when it is set directly on
        # this dialect class (not inherited from a superclass), so that
        # third-party dialect authors are warned until they opt in or out
        ssc = self.__class__.__dict__.get("supports_statement_cache", None)
        if ssc is None:
            util.warn(
                "Dialect %s:%s will not make use of SQL compilation caching "
                "as it does not set the 'supports_statement_cache' attribute "
                "to ``True``. This can have "
                "significant performance implications including some "
                "performance degradations in comparison to prior SQLAlchemy "
                "versions. Dialect maintainers should seek to set this "
                "attribute to True after appropriate development and testing "
                "for SQLAlchemy 1.4 caching support. Alternatively, this "
                "attribute may be set to False which will disable this "
                "warning." % (self.name, self.driver),
                code="cprf",
            )

        return bool(ssc)
473
    @util.memoized_property
    def _type_memos(self):
        # per-dialect cache of type-related memoizations, keyed weakly so
        # that entries disappear when the type object is garbage collected
        return weakref.WeakKeyDictionary()
477
478 @property
479 def dialect_description(self):
480 return self.name + "+" + self.driver
481
    @property
    def supports_sane_rowcount_returning(self):
        """True if this dialect supports sane rowcount even if RETURNING is
        in use.

        For dialects that don't support RETURNING, this is synonymous with
        ``supports_sane_rowcount``.

        """
        # default: no distinction made between RETURNING / non-RETURNING
        return self.supports_sane_rowcount
492
493 @classmethod
494 def get_pool_class(cls, url: URL) -> Type[Pool]:
495 default: Type[pool.Pool]
496 if cls.is_async:
497 default = pool.AsyncAdaptedQueuePool
498 else:
499 default = pool.QueuePool
500
501 return getattr(cls, "poolclass", default)
502
    def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
        # instance-level hook; defaults to the classmethod get_pool_class()
        return self.get_pool_class(url)
505
506 @classmethod
507 def load_provisioning(cls):
508 package = ".".join(cls.__module__.split(".")[0:-1])
509 try:
510 __import__(package + ".provision")
511 except ImportError:
512 pass
513
514 def _builtin_onconnect(self) -> Optional[_ListenerFnType]:
515 if self._on_connect_isolation_level is not None:
516
517 def builtin_connect(dbapi_conn, conn_rec):
518 self._assert_and_set_isolation_level(
519 dbapi_conn, self._on_connect_isolation_level
520 )
521
522 return builtin_connect
523 else:
524 return None
525
    def initialize(self, connection):
        """First-connect initialization: collect server version, default
        schema name and default isolation level from the connection, then
        reconcile identifier-length settings with server-reported limits.
        """
        # each probe is optional; dialects that don't implement it simply
        # leave the corresponding attribute as None
        try:
            self.server_version_info = self._get_server_version_info(
                connection
            )
        except NotImplementedError:
            self.server_version_info = None
        try:
            self.default_schema_name = self._get_default_schema_name(
                connection
            )
        except NotImplementedError:
            self.default_schema_name = None

        try:
            self.default_isolation_level = self.get_default_isolation_level(
                connection.connection.dbapi_connection
            )
        except NotImplementedError:
            self.default_isolation_level = None

        # a server-derived limit overrides the class-level default, but
        # never a user-specified max_identifier_length
        if not self._user_defined_max_identifier_length:
            max_ident_length = self._check_max_identifier_length(connection)
            if max_ident_length:
                self.max_identifier_length = max_ident_length

        if (
            self.label_length
            and self.label_length > self.max_identifier_length
        ):
            raise exc.ArgumentError(
                "Label length of %d is greater than this dialect's"
                " maximum identifier length of %d"
                % (self.label_length, self.max_identifier_length)
            )
561
    def on_connect(self):
        # inherits the docstring from interfaces.Dialect.on_connect
        # no per-connection setup by default
        return None
565
    def _check_max_identifier_length(self, connection):
        """Perform a connection / server version specific check to determine
        the max_identifier_length.

        If the dialect's class level max_identifier_length should be used,
        can return None.

        .. versionadded:: 1.3.9

        """
        # default: no server-specific limit detection
        return None
577
    def get_default_isolation_level(self, dbapi_conn):
        """Given a DBAPI connection, return its isolation level, or
        a default isolation level if one cannot be retrieved.

        May be overridden by subclasses in order to provide a
        "fallback" isolation level for databases that cannot reliably
        retrieve the actual isolation level.

        By default, calls the :meth:`_engine.Interfaces.get_isolation_level`
        method, propagating any exceptions raised.

        .. versionadded:: 1.3.22

        """
        return self.get_isolation_level(dbapi_conn)
593
    def type_descriptor(self, typeobj):
        """Provide a database-specific :class:`.TypeEngine` object, given
        the generic object which comes from the types module.

        This method looks for a dictionary called
        ``colspecs`` as a class or instance-level variable,
        and passes on to :func:`_types.adapt_type`.

        """
        return type_api.adapt_type(typeobj, self.colspecs)
604
605 def has_index(self, connection, table_name, index_name, schema=None, **kw):
606 if not self.has_table(connection, table_name, schema=schema, **kw):
607 return False
608 for idx in self.get_indexes(
609 connection, table_name, schema=schema, **kw
610 ):
611 if idx["name"] == index_name:
612 return True
613 else:
614 return False
615
616 def has_schema(
617 self, connection: Connection, schema_name: str, **kw: Any
618 ) -> bool:
619 return schema_name in self.get_schema_names(connection, **kw)
620
621 def validate_identifier(self, ident):
622 if len(ident) > self.max_identifier_length:
623 raise exc.IdentifierError(
624 "Identifier '%s' exceeds maximum length of %d characters"
625 % (ident, self.max_identifier_length)
626 )
627
    def connect(self, *cargs, **cparams):
        # inherits the docstring from interfaces.Dialect.connect
        # pass-through to the DBAPI module's connect() function
        return self.loaded_dbapi.connect(*cargs, **cparams)
631
    def create_connect_args(self, url):
        # inherits the docstring from interfaces.Dialect.create_connect_args
        # default: no positional args; keyword args derived from the URL's
        # standard components plus its query string
        opts = url.translate_connect_args()
        opts.update(url.query)
        return ([], opts)
637
    def set_engine_execution_options(
        self, engine: Engine, opts: Mapping[str, Any]
    ) -> None:
        # pick out only those options that name a known connection
        # characteristic (e.g. isolation_level, logging_token)
        supported_names = set(self.connection_characteristics).intersection(
            opts
        )
        if supported_names:
            characteristics: Mapping[str, Any] = util.immutabledict(
                (name, opts[name]) for name in supported_names
            )

            # apply the captured characteristics to every connection
            # checked out from this engine
            @event.listens_for(engine, "engine_connect")
            def set_connection_characteristics(connection):
                self._set_connection_characteristics(
                    connection, characteristics
                )
654
655 def set_connection_execution_options(
656 self, connection: Connection, opts: Mapping[str, Any]
657 ) -> None:
658 supported_names = set(self.connection_characteristics).intersection(
659 opts
660 )
661 if supported_names:
662 characteristics: Mapping[str, Any] = util.immutabledict(
663 (name, opts[name]) for name in supported_names
664 )
665 self._set_connection_characteristics(connection, characteristics)
666
    def _set_connection_characteristics(self, connection, characteristics):
        # resolve each requested characteristic name to its implementation
        # object alongside the requested value
        characteristic_values = [
            (name, self.connection_characteristics[name], value)
            for name, value in characteristics.items()
        ]

        # transactional characteristics may not be changed mid-transaction
        if connection.in_transaction():
            trans_objs = [
                (name, obj)
                for name, obj, _ in characteristic_values
                if obj.transactional
            ]
            if trans_objs:
                raise exc.InvalidRequestError(
                    "This connection has already initialized a SQLAlchemy "
                    "Transaction() object via begin() or autobegin; "
                    "%s may not be altered unless rollback() or commit() "
                    "is called first."
                    % (", ".join(name for name, obj in trans_objs))
                )

        dbapi_connection = connection.connection.dbapi_connection
        for _, characteristic, value in characteristic_values:
            characteristic.set_connection_characteristic(
                self, connection, dbapi_connection, value
            )
        # arrange for the characteristics to be reset when the underlying
        # connection is returned to the pool
        connection.connection._connection_record.finalize_callback.append(
            functools.partial(self._reset_characteristics, characteristics)
        )
696
697 def _reset_characteristics(self, characteristics, dbapi_connection):
698 for characteristic_name in characteristics:
699 characteristic = self.connection_characteristics[
700 characteristic_name
701 ]
702 characteristic.reset_characteristic(self, dbapi_connection)
703
    def do_begin(self, dbapi_connection):
        # DBAPI connections begin transactions implicitly; no-op by default
        pass
706
    def do_rollback(self, dbapi_connection):
        # delegate directly to the DBAPI connection
        dbapi_connection.rollback()
709
    def do_commit(self, dbapi_connection):
        # delegate directly to the DBAPI connection
        dbapi_connection.commit()
712
    def do_terminate(self, dbapi_connection):
        # default "terminate" is an ordinary close; dialects with a
        # dedicated terminate API set has_terminate and override this
        self.do_close(dbapi_connection)
715
    def do_close(self, dbapi_connection):
        # delegate directly to the DBAPI connection
        dbapi_connection.close()
718
    @util.memoized_property
    def _dialect_specific_select_one(self):
        # "SELECT 1" as rendered by this dialect's compiler; used by
        # do_ping() as a liveness check statement
        return str(expression.select(1).compile(dialect=self))
722
    def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
        """Run do_ping(), giving handle_error event listeners a chance to
        reinterpret DBAPI errors; return False for disconnect errors,
        re-raise everything else.
        """
        try:
            return self.do_ping(dbapi_connection)
        except self.loaded_dbapi.Error as err:
            is_disconnect = self.is_disconnect(err, dbapi_connection, None)

            if self._has_events:
                try:
                    # allow handle_error listeners to examine / translate
                    # the error; they may change the disconnect status
                    Connection._handle_dbapi_exception_noconnection(
                        err,
                        self,
                        is_disconnect=is_disconnect,
                        invalidate_pool_on_disconnect=False,
                        is_pre_ping=True,
                    )
                except exc.StatementError as new_err:
                    is_disconnect = new_err.connection_invalidated

            if is_disconnect:
                return False
            else:
                raise
745
746 def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
747 cursor = None
748
749 cursor = dbapi_connection.cursor()
750 try:
751 cursor.execute(self._dialect_specific_select_one)
752 finally:
753 cursor.close()
754 return True
755
756 def create_xid(self):
757 """Create a random two-phase transaction ID.
758
759 This id will be passed to do_begin_twophase(), do_rollback_twophase(),
760 do_commit_twophase(). Its format is unspecified.
761 """
762
763 return "_sa_%032x" % random.randint(0, 2**128)
764
    def do_savepoint(self, connection, name):
        # emit SAVEPOINT <name> via the compiler
        connection.execute(expression.SavepointClause(name))
767
    def do_rollback_to_savepoint(self, connection, name):
        # emit ROLLBACK TO SAVEPOINT <name> via the compiler
        connection.execute(expression.RollbackToSavepointClause(name))
770
    def do_release_savepoint(self, connection, name):
        # emit RELEASE SAVEPOINT <name> via the compiler
        connection.execute(expression.ReleaseSavepointClause(name))
773
    def _deliver_insertmanyvalues_batches(
        self,
        connection,
        cursor,
        statement,
        parameters,
        generic_setinputsizes,
        context,
    ):
        """Generator that splits an "insertmanyvalues" execution into
        batches, yielding each batch for the caller to execute.

        For RETURNING statements, rows are fetched after each yielded
        batch and accumulated on ``context._insertmanyvalues_rows``;
        when sentinel columns are in use, each batch's rows are
        re-ordered to match the order of the supplied parameter sets.
        """
        context = cast(DefaultExecutionContext, context)
        compiled = cast(SQLCompiler, context.compiled)

        # result processors for sentinel column values; lazily built from
        # the first batch's cursor.description
        _composite_sentinel_proc: Sequence[
            Optional[_ResultProcessorType[Any]]
        ] = ()
        _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
        _sentinel_proc_initialized: bool = False

        compiled_parameters = context.compiled_parameters

        imv = compiled._insertmanyvalues
        assert imv is not None

        is_returning: Final[bool] = bool(compiled.effective_returning)
        # per-execution page size override, else the dialect default
        batch_size = context.execution_options.get(
            "insertmanyvalues_page_size", self.insertmanyvalues_page_size
        )

        if compiled.schema_translate_map:
            schema_translate_map = context.execution_options.get(
                "schema_translate_map", {}
            )
        else:
            schema_translate_map = None

        if is_returning:
            result: Optional[List[Any]] = []
            context._insertmanyvalues_rows = result

            sort_by_parameter_order = imv.sort_by_parameter_order

        else:
            sort_by_parameter_order = False
            result = None

        for imv_batch in compiled._deliver_insertmanyvalues_batches(
            statement,
            parameters,
            compiled_parameters,
            generic_setinputsizes,
            batch_size,
            sort_by_parameter_order,
            schema_translate_map,
        ):
            # the caller executes the batch, then control returns here
            yield imv_batch

            if is_returning:

                try:
                    rows = context.fetchall_for_returning(cursor)
                except BaseException as be:
                    connection._handle_dbapi_exception(
                        be,
                        sql_util._long_statement(imv_batch.replaced_statement),
                        imv_batch.replaced_parameters,
                        None,
                        context,
                        is_sub_exec=True,
                    )

                # I would have thought "is_returning: Final[bool]"
                # would have assured this but pylance thinks not
                assert result is not None

                if imv.num_sentinel_columns and not imv_batch.is_downgraded:
                    composite_sentinel = imv.num_sentinel_columns > 1
                    if imv.implicit_sentinel:
                        # for implicit sentinel, which is currently single-col
                        # integer autoincrement, do a simple sort.
                        assert not composite_sentinel
                        result.extend(
                            sorted(rows, key=operator.itemgetter(-1))
                        )
                        continue

                    # otherwise, create dictionaries to match up batches
                    # with parameters
                    assert imv.sentinel_param_keys
                    assert imv.sentinel_columns

                    _nsc = imv.num_sentinel_columns

                    # build sentinel-value result processors once, from the
                    # trailing cursor.description entries
                    if not _sentinel_proc_initialized:
                        if composite_sentinel:
                            _composite_sentinel_proc = [
                                col.type._cached_result_processor(
                                    self, cursor_desc[1]
                                )
                                for col, cursor_desc in zip(
                                    imv.sentinel_columns,
                                    cursor.description[-_nsc:],
                                )
                            ]
                        else:
                            _scalar_sentinel_proc = (
                                imv.sentinel_columns[0]
                            ).type._cached_result_processor(
                                self, cursor.description[-1][1]
                            )
                        _sentinel_proc_initialized = True

                    # index each returned row by its (processed) sentinel
                    # value(s), taken from the trailing row columns
                    rows_by_sentinel: Union[
                        Dict[Tuple[Any, ...], Any],
                        Dict[Any, Any],
                    ]
                    if composite_sentinel:
                        rows_by_sentinel = {
                            tuple(
                                (proc(val) if proc else val)
                                for val, proc in zip(
                                    row[-_nsc:], _composite_sentinel_proc
                                )
                            ): row
                            for row in rows
                        }
                    elif _scalar_sentinel_proc:
                        rows_by_sentinel = {
                            _scalar_sentinel_proc(row[-1]): row for row in rows
                        }
                    else:
                        rows_by_sentinel = {row[-1]: row for row in rows}

                    if len(rows_by_sentinel) != len(imv_batch.batch):
                        # see test_insert_exec.py::
                        # IMVSentinelTest::test_sentinel_incorrect_rowcount
                        # for coverage / demonstration
                        raise exc.InvalidRequestError(
                            f"Sentinel-keyed result set did not produce "
                            f"correct number of rows {len(imv_batch.batch)}; "
                            "produced "
                            f"{len(rows_by_sentinel)}. Please ensure the "
                            "sentinel column is fully unique and populated in "
                            "all cases."
                        )

                    try:
                        ordered_rows = [
                            rows_by_sentinel[sentinel_keys]
                            for sentinel_keys in imv_batch.sentinel_values
                        ]
                    except KeyError as ke:
                        # see test_insert_exec.py::
                        # IMVSentinelTest::test_sentinel_cant_match_keys
                        # for coverage / demonstration
                        raise exc.InvalidRequestError(
                            f"Can't match sentinel values in result set to "
                            f"parameter sets; key {ke.args[0]!r} was not "
                            "found. "
                            "There may be a mismatch between the datatype "
                            "passed to the DBAPI driver vs. that which it "
                            "returns in a result row. Ensure the given "
                            "Python value matches the expected result type "
                            "*exactly*, taking care to not rely upon implicit "
                            "conversions which may occur such as when using "
                            "strings in place of UUID or integer values, etc. "
                        ) from ke

                    result.extend(ordered_rows)

                else:
                    # no sentinel handling: append rows as fetched
                    result.extend(rows)
945
    def do_executemany(self, cursor, statement, parameters, context=None):
        # delegate to DBAPI cursor.executemany()
        cursor.executemany(statement, parameters)
948
    def do_execute(self, cursor, statement, parameters, context=None):
        # delegate to DBAPI cursor.execute()
        cursor.execute(statement, parameters)
951
    def do_execute_no_params(self, cursor, statement, context=None):
        # execute with no parameter collection at all; some DBAPIs treat
        # a missing parameter argument differently from an empty one
        cursor.execute(statement)
954
    def is_disconnect(self, e, connection, cursor):
        # default: never interpret an error as a disconnect; dialects
        # override with driver-specific detection
        return False
957
958 @util.memoized_instancemethod
959 def _gen_allowed_isolation_levels(self, dbapi_conn):
960 try:
961 raw_levels = list(self.get_isolation_level_values(dbapi_conn))
962 except NotImplementedError:
963 return None
964 else:
965 normalized_levels = [
966 level.replace("_", " ").upper() for level in raw_levels
967 ]
968 if raw_levels != normalized_levels:
969 raise ValueError(
970 f"Dialect {self.name!r} get_isolation_level_values() "
971 f"method should return names as UPPERCASE using spaces, "
972 f"not underscores; got "
973 f"{sorted(set(raw_levels).difference(normalized_levels))}"
974 )
975 return tuple(normalized_levels)
976
    def _assert_and_set_isolation_level(self, dbapi_conn, level):
        # normalize user input to the UPPERCASE-with-spaces convention
        level = level.replace("_", " ").upper()

        _allowed_isolation_levels = self._gen_allowed_isolation_levels(
            dbapi_conn
        )
        # only validate when the dialect reports its supported levels
        if (
            _allowed_isolation_levels
            and level not in _allowed_isolation_levels
        ):
            raise exc.ArgumentError(
                f"Invalid value {level!r} for isolation_level. "
                f"Valid isolation levels for {self.name!r} are "
                f"{', '.join(_allowed_isolation_levels)}"
            )

        self.set_isolation_level(dbapi_conn, level)
994
    def reset_isolation_level(self, dbapi_conn):
        # restore the connection's isolation level when it is returned to
        # the pool: either the level configured at create_engine() time,
        # or the server default captured during initialize()
        if self._on_connect_isolation_level is not None:
            assert (
                self._on_connect_isolation_level == "AUTOCOMMIT"
                or self._on_connect_isolation_level
                == self.default_isolation_level
            )
            self._assert_and_set_isolation_level(
                dbapi_conn, self._on_connect_isolation_level
            )
        else:
            assert self.default_isolation_level is not None
            self._assert_and_set_isolation_level(
                dbapi_conn,
                self.default_isolation_level,
            )
1011
    def normalize_name(self, name):
        """Convert a backend-native name to SQLAlchemy's
        case-insensitive-as-lowercase convention; used by dialects that set
        ``requires_name_normalize``.
        """
        if name is None:
            return None

        name_lower = name.lower()
        name_upper = name.upper()

        if name_upper == name_lower:
            # name has no upper/lower conversion, e.g. non-european characters.
            # return unchanged
            return name
        elif name_upper == name and not (
            self.identifier_preparer._requires_quotes
        )(name_lower):
            # name is all uppercase and doesn't require quoting; normalize
            # to all lower case
            return name_lower
        elif name_lower == name:
            # name is all lower case, which if denormalized means we need to
            # force quoting on it
            return quoted_name(name, quote=True)
        else:
            # name is mixed case, means it will be quoted in SQL when used
            # later, no normalizes
            return name
1037
1038 def denormalize_name(self, name):
1039 if name is None:
1040 return None
1041
1042 name_lower = name.lower()
1043 name_upper = name.upper()
1044
1045 if name_upper == name_lower:
1046 # name has no upper/lower conversion, e.g. non-european characters.
1047 # return unchanged
1048 return name
1049 elif name_lower == name and not (
1050 self.identifier_preparer._requires_quotes
1051 )(name_lower):
1052 name = name_upper
1053 return name
1054
    def get_driver_connection(self, connection):
        """Return the connection object as-is; default passthrough that
        dialects wrapping the raw driver connection may override."""
        return connection
1057
1058 def _overrides_default(self, method):
1059 return (
1060 getattr(type(self), method).__code__
1061 is not getattr(DefaultDialect, method).__code__
1062 )
1063
    def _default_multi_reflect(
        self,
        single_tbl_method,
        connection,
        kind,
        schema,
        filter_names,
        scope,
        **kw,
    ):
        """Generic implementation behind the ``get_multi_*`` reflection
        methods: determine the list of table/view names matching *kind*,
        *scope* and *filter_names*, then call *single_tbl_method* once
        per name, yielding ``((schema, name), result)`` pairs.

        Errors for unreflectable tables are recorded in the
        ``unreflectable`` dict passed through ``kw``; names raising
        ``NoSuchTableError`` are silently skipped.
        """
        # name-listing functions to consult, selected by requested kind
        names_fns = []
        temp_names_fns = []
        if ObjectKind.TABLE in kind:
            names_fns.append(self.get_table_names)
            temp_names_fns.append(self.get_temp_table_names)
        if ObjectKind.VIEW in kind:
            names_fns.append(self.get_view_names)
            temp_names_fns.append(self.get_temp_view_names)
        if ObjectKind.MATERIALIZED_VIEW in kind:
            names_fns.append(self.get_materialized_view_names)
            # no temp materialized view at the moment
            # temp_names_fns.append(self.get_temp_materialized_view_names)

        unreflectable = kw.pop("unreflectable", {})

        if (
            filter_names
            and scope is ObjectScope.ANY
            and kind is ObjectKind.ANY
        ):
            # if names are given and no qualification on type of table
            # (i.e. the Table(..., autoload) case), take the names as given,
            # don't run names queries. If a table does not exist
            # NoSuchTableError is raised and it's skipped

            # this also suits the case for mssql where we can reflect
            # individual temp tables but there's no temp_names_fn
            names = filter_names
        else:
            names = []
            name_kw = {"schema": schema, **kw}
            fns = []
            if ObjectScope.DEFAULT in scope:
                fns.extend(names_fns)
            if ObjectScope.TEMPORARY in scope:
                fns.extend(temp_names_fns)

            for fn in fns:
                try:
                    names.extend(fn(connection, **name_kw))
                except NotImplementedError:
                    # dialect may not implement a particular names query
                    pass

        if filter_names:
            # convert to set for O(1) membership checks below
            filter_names = set(filter_names)

        # iterate over all the tables/views and call the single table method
        for table in names:
            if not filter_names or table in filter_names:
                key = (schema, table)
                try:
                    yield (
                        key,
                        single_tbl_method(
                            connection, table, schema=schema, **kw
                        ),
                    )
                except exc.UnreflectableTableError as err:
                    # record only the first error per table
                    if key not in unreflectable:
                        unreflectable[key] = err
                except exc.NoSuchTableError:
                    pass
1136
    def get_multi_table_options(self, connection, **kw):
        """Default multi-table form; fans out to
        :meth:`.get_table_options` once per table."""
        return self._default_multi_reflect(
            self.get_table_options, connection, **kw
        )
1141
    def get_multi_columns(self, connection, **kw):
        """Default multi-table form; fans out to :meth:`.get_columns`
        once per table."""
        return self._default_multi_reflect(self.get_columns, connection, **kw)
1144
    def get_multi_pk_constraint(self, connection, **kw):
        """Default multi-table form; fans out to
        :meth:`.get_pk_constraint` once per table."""
        return self._default_multi_reflect(
            self.get_pk_constraint, connection, **kw
        )
1149
    def get_multi_foreign_keys(self, connection, **kw):
        """Default multi-table form; fans out to
        :meth:`.get_foreign_keys` once per table."""
        return self._default_multi_reflect(
            self.get_foreign_keys, connection, **kw
        )
1154
    def get_multi_indexes(self, connection, **kw):
        """Default multi-table form; fans out to :meth:`.get_indexes`
        once per table."""
        return self._default_multi_reflect(self.get_indexes, connection, **kw)
1157
    def get_multi_unique_constraints(self, connection, **kw):
        """Default multi-table form; fans out to
        :meth:`.get_unique_constraints` once per table."""
        return self._default_multi_reflect(
            self.get_unique_constraints, connection, **kw
        )
1162
    def get_multi_check_constraints(self, connection, **kw):
        """Default multi-table form; fans out to
        :meth:`.get_check_constraints` once per table."""
        return self._default_multi_reflect(
            self.get_check_constraints, connection, **kw
        )
1167
    def get_multi_table_comment(self, connection, **kw):
        """Default multi-table form; fans out to
        :meth:`.get_table_comment` once per table."""
        return self._default_multi_reflect(
            self.get_table_comment, connection, **kw
        )
1172
1173
class StrCompileDialect(DefaultDialect):
    """Dialect whose compilers emit generic SQL strings
    (``StrSQLCompiler`` / ``StrSQLTypeCompiler``) with no database
    backend; feature flags are set permissively so that most constructs
    will render."""

    statement_compiler = compiler.StrSQLCompiler
    ddl_compiler = compiler.DDLCompiler
    type_compiler_cls = compiler.StrSQLTypeCompiler
    preparer = compiler.IdentifierPreparer

    # advertise RETURNING for all DML forms so such clauses render
    insert_returning = True
    update_returning = True
    delete_returning = True

    supports_statement_cache = True

    supports_identity_columns = True

    supports_sequences = True
    sequences_optional = True
    preexecute_autoincrement_sequences = False

    supports_native_boolean = True

    supports_multivalues_insert = True
    supports_simple_order_by_label = True
1197
class DefaultExecutionContext(ExecutionContext):
    """Default implementation of :class:`.ExecutionContext`, holding
    per-statement-execution state; instances are built via the
    ``_init_*`` classmethod constructors rather than ``__init__``."""

    # statement classification flags, set by the _init_* constructors
    isinsert = False
    isupdate = False
    isdelete = False
    is_crud = False
    is_text = False
    isddl = False

    execute_style: ExecuteStyle = ExecuteStyle.EXECUTE

    compiled: Optional[Compiled] = None
    result_column_struct: Optional[
        Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
    ] = None
    returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None

    execution_options: _ExecuteOptions = util.EMPTY_DICT

    cursor_fetch_strategy = _cursor._DEFAULT_FETCH

    invoked_statement: Optional[Executable] = None

    # RETURNING / server-side-cursor bookkeeping flags
    _is_implicit_returning = False
    _is_explicit_returning = False
    _is_supplemental_returning = False
    _is_server_side = False

    _soft_closed = False

    # eagerly-captured rowcount; None means consult cursor.rowcount
    # on demand (see the ``rowcount`` property)
    _rowcount: Optional[int] = None

    # a hook for SQLite's translation of
    # result column names
    # NOTE: pyhive is using this hook, can't remove it :(
    _translate_colname: Optional[Callable[[str], str]] = None

    _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
    """used by set_input_sizes().

    This collection comes from ``ExpandedState.parameter_expansion``.

    """

    cache_hit = NO_CACHE_KEY

    # instance attributes assigned by the _init_* constructors
    root_connection: Connection
    _dbapi_connection: PoolProxiedConnection
    dialect: Dialect
    unicode_statement: str
    cursor: DBAPICursor
    compiled_parameters: List[_MutableCoreSingleExecuteParams]
    parameters: _DBAPIMultiExecuteParams
    extracted_parameters: Optional[Sequence[BindParameter[Any]]]

    _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)

    # state for the "insertmanyvalues" execution style
    _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
    _num_sentinel_cols: int = 0
1256
    @classmethod
    def _init_ddl(
        cls,
        dialect: Dialect,
        connection: Connection,
        dbapi_connection: PoolProxiedConnection,
        execution_options: _ExecuteOptions,
        compiled_ddl: DDLCompiler,
    ) -> ExecutionContext:
        """Initialize execution context for an ExecutableDDLElement
        construct."""

        self = cls.__new__(cls)
        self.root_connection = connection
        self._dbapi_connection = dbapi_connection
        self.dialect = connection.dialect

        self.compiled = compiled = compiled_ddl
        self.isddl = True

        self.execution_options = execution_options

        self.unicode_statement = str(compiled)
        if compiled.schema_translate_map:
            # apply schema name translations to the rendered DDL string
            schema_translate_map = self.execution_options.get(
                "schema_translate_map", {}
            )

            rst = compiled.preparer._render_schema_translates
            self.unicode_statement = rst(
                self.unicode_statement, schema_translate_map
            )

        self.statement = self.unicode_statement

        self.cursor = self.create_cursor()
        # DDL carries no bind parameters; supply a single empty
        # parameter set in the form the DBAPI expects
        self.compiled_parameters = []

        if dialect.positional:
            self.parameters = [dialect.execute_sequence_format()]
        else:
            self.parameters = [self._empty_dict_params]

        return self
1301
    @classmethod
    def _init_compiled(
        cls,
        dialect: Dialect,
        connection: Connection,
        dbapi_connection: PoolProxiedConnection,
        execution_options: _ExecuteOptions,
        compiled: SQLCompiler,
        parameters: _CoreMultiExecuteParams,
        invoked_statement: Executable,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]],
        cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
    ) -> ExecutionContext:
        """Initialize execution context for a Compiled construct.

        Classifies the statement, validates RETURNING + executemany
        combinations, constructs ``compiled_parameters``, determines the
        execute style, renders the final statement string, and converts
        bind parameters into the positional or named form expected by
        the DBAPI.
        """

        self = cls.__new__(cls)
        self.root_connection = connection
        self._dbapi_connection = dbapi_connection
        self.dialect = connection.dialect
        self.extracted_parameters = extracted_parameters
        self.invoked_statement = invoked_statement
        self.compiled = compiled
        self.cache_hit = cache_hit

        self.execution_options = execution_options

        self.result_column_struct = (
            compiled._result_columns,
            compiled._ordered_columns,
            compiled._textual_ordered_columns,
            compiled._ad_hoc_textual,
            compiled._loose_column_name_matching,
        )

        # classification flags; also bound to short locals for the
        # RETURNING capability checks below
        self.isinsert = ii = compiled.isinsert
        self.isupdate = iu = compiled.isupdate
        self.isdelete = id_ = compiled.isdelete
        self.is_text = compiled.isplaintext

        if ii or iu or id_:
            dml_statement = compiled.compile_state.statement  # type: ignore
            if TYPE_CHECKING:
                assert isinstance(dml_statement, UpdateBase)
            self.is_crud = True
            self._is_explicit_returning = ier = bool(dml_statement._returning)
            self._is_implicit_returning = iir = bool(
                compiled.implicit_returning
            )
            if iir and dml_statement._supplemental_returning:
                self._is_supplemental_returning = True

            # dont mix implicit and explicit returning
            assert not (iir and ier)

            # raise up front for RETURNING + executemany combinations
            # the dialect/server cannot support
            if (ier or iir) and compiled.for_executemany:
                if ii and not self.dialect.insert_executemany_returning:
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "INSERT..RETURNING when executemany is used"
                    )
                elif (
                    ii
                    and dml_statement._sort_by_parameter_order
                    and not self.dialect.insert_executemany_returning_sort_by_parameter_order  # noqa: E501
                ):
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "INSERT..RETURNING with deterministic row ordering "
                        "when executemany is used"
                    )
                elif (
                    ii
                    and self.dialect.use_insertmanyvalues
                    and not compiled._insertmanyvalues
                ):
                    raise exc.InvalidRequestError(
                        'Statement does not have "insertmanyvalues" '
                        "enabled, can't use INSERT..RETURNING with "
                        "executemany in this case."
                    )
                elif iu and not self.dialect.update_executemany_returning:
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "UPDATE..RETURNING when executemany is used"
                    )
                elif id_ and not self.dialect.delete_executemany_returning:
                    raise exc.InvalidRequestError(
                        f"Dialect {self.dialect.dialect_description} with "
                        f"current server capabilities does not support "
                        "DELETE..RETURNING when executemany is used"
                    )

        if not parameters:
            self.compiled_parameters = [
                compiled.construct_params(
                    extracted_parameters=extracted_parameters,
                    escape_names=False,
                )
            ]
        else:
            self.compiled_parameters = [
                compiled.construct_params(
                    m,
                    escape_names=False,
                    _group_number=grp,
                    extracted_parameters=extracted_parameters,
                )
                for grp, m in enumerate(parameters)
            ]

            # multiple parameter sets imply executemany-style execution;
            # prefer "insertmanyvalues" when the compiled INSERT allows
            if len(parameters) > 1:
                if self.isinsert and compiled._insertmanyvalues:
                    self.execute_style = ExecuteStyle.INSERTMANYVALUES

                    imv = compiled._insertmanyvalues
                    if imv.sentinel_columns is not None:
                        self._num_sentinel_cols = imv.num_sentinel_columns
                else:
                    self.execute_style = ExecuteStyle.EXECUTEMANY

        self.unicode_statement = compiled.string

        self.cursor = self.create_cursor()

        if self.compiled.insert_prefetch or self.compiled.update_prefetch:
            self._process_execute_defaults()

        processors = compiled._bind_processors

        flattened_processors: Mapping[
            str, _BindProcessorType[Any]
        ] = processors  # type: ignore[assignment]

        if compiled.literal_execute_params or compiled.post_compile_params:
            if self.executemany:
                raise exc.InvalidRequestError(
                    "'literal_execute' or 'expanding' parameters can't be "
                    "used with executemany()"
                )

            expanded_state = compiled._process_parameters_for_postcompile(
                self.compiled_parameters[0]
            )

            # re-assign self.unicode_statement
            self.unicode_statement = expanded_state.statement

            self._expanded_parameters = expanded_state.parameter_expansion

            # copy before update so the compiled object's collection is
            # not mutated
            flattened_processors = dict(processors)  # type: ignore
            flattened_processors.update(expanded_state.processors)
            positiontup = expanded_state.positiontup
        elif compiled.positional:
            positiontup = self.compiled.positiontup
        else:
            positiontup = None

        if compiled.schema_translate_map:
            schema_translate_map = self.execution_options.get(
                "schema_translate_map", {}
            )
            rst = compiled.preparer._render_schema_translates
            self.unicode_statement = rst(
                self.unicode_statement, schema_translate_map
            )

        # final self.unicode_statement is now assigned, encode if needed
        # by dialect
        self.statement = self.unicode_statement

        # Convert the dictionary of bind parameter values
        # into a dict or list to be sent to the DBAPI's
        # execute() or executemany() method.

        if compiled.positional:
            core_positional_parameters: MutableSequence[Sequence[Any]] = []
            assert positiontup is not None
            for compiled_params in self.compiled_parameters:
                l_param: List[Any] = [
                    (
                        flattened_processors[key](compiled_params[key])
                        if key in flattened_processors
                        else compiled_params[key]
                    )
                    for key in positiontup
                ]
                core_positional_parameters.append(
                    dialect.execute_sequence_format(l_param)
                )

            self.parameters = core_positional_parameters
        else:
            core_dict_parameters: MutableSequence[Dict[str, Any]] = []
            escaped_names = compiled.escaped_bind_names

            # note that currently, "expanded" parameters will be present
            # in self.compiled_parameters in their quoted form. This is
            # slightly inconsistent with the approach taken as of
            # #8056 where self.compiled_parameters is meant to contain unquoted
            # param names.
            d_param: Dict[str, Any]
            for compiled_params in self.compiled_parameters:
                if escaped_names:
                    d_param = {
                        escaped_names.get(key, key): (
                            flattened_processors[key](compiled_params[key])
                            if key in flattened_processors
                            else compiled_params[key]
                        )
                        for key in compiled_params
                    }
                else:
                    d_param = {
                        key: (
                            flattened_processors[key](compiled_params[key])
                            if key in flattened_processors
                            else compiled_params[key]
                        )
                        for key in compiled_params
                    }

                core_dict_parameters.append(d_param)

            self.parameters = core_dict_parameters

        return self
1531
1532 @classmethod
1533 def _init_statement(
1534 cls,
1535 dialect: Dialect,
1536 connection: Connection,
1537 dbapi_connection: PoolProxiedConnection,
1538 execution_options: _ExecuteOptions,
1539 statement: str,
1540 parameters: _DBAPIMultiExecuteParams,
1541 ) -> ExecutionContext:
1542 """Initialize execution context for a string SQL statement."""
1543
1544 self = cls.__new__(cls)
1545 self.root_connection = connection
1546 self._dbapi_connection = dbapi_connection
1547 self.dialect = connection.dialect
1548 self.is_text = True
1549
1550 self.execution_options = execution_options
1551
1552 if not parameters:
1553 if self.dialect.positional:
1554 self.parameters = [dialect.execute_sequence_format()]
1555 else:
1556 self.parameters = [self._empty_dict_params]
1557 elif isinstance(parameters[0], dialect.execute_sequence_format):
1558 self.parameters = parameters
1559 elif isinstance(parameters[0], dict):
1560 self.parameters = parameters
1561 else:
1562 self.parameters = [
1563 dialect.execute_sequence_format(p) for p in parameters
1564 ]
1565
1566 if len(parameters) > 1:
1567 self.execute_style = ExecuteStyle.EXECUTEMANY
1568
1569 self.statement = self.unicode_statement = statement
1570
1571 self.cursor = self.create_cursor()
1572 return self
1573
1574 @classmethod
1575 def _init_default(
1576 cls,
1577 dialect: Dialect,
1578 connection: Connection,
1579 dbapi_connection: PoolProxiedConnection,
1580 execution_options: _ExecuteOptions,
1581 ) -> ExecutionContext:
1582 """Initialize execution context for a ColumnDefault construct."""
1583
1584 self = cls.__new__(cls)
1585 self.root_connection = connection
1586 self._dbapi_connection = dbapi_connection
1587 self.dialect = connection.dialect
1588
1589 self.execution_options = execution_options
1590
1591 self.cursor = self.create_cursor()
1592 return self
1593
1594 def _get_cache_stats(self) -> str:
1595 if self.compiled is None:
1596 return "raw sql"
1597
1598 now = perf_counter()
1599
1600 ch = self.cache_hit
1601
1602 gen_time = self.compiled._gen_time
1603 assert gen_time is not None
1604
1605 if ch is NO_CACHE_KEY:
1606 return "no key %.5fs" % (now - gen_time,)
1607 elif ch is CACHE_HIT:
1608 return "cached since %.4gs ago" % (now - gen_time,)
1609 elif ch is CACHE_MISS:
1610 return "generated in %.5fs" % (now - gen_time,)
1611 elif ch is CACHING_DISABLED:
1612 if "_cache_disable_reason" in self.execution_options:
1613 return "caching disabled (%s) %.5fs " % (
1614 self.execution_options["_cache_disable_reason"],
1615 now - gen_time,
1616 )
1617 else:
1618 return "caching disabled %.5fs" % (now - gen_time,)
1619 elif ch is NO_DIALECT_SUPPORT:
1620 return "dialect %s+%s does not support caching %.5fs" % (
1621 self.dialect.name,
1622 self.dialect.driver,
1623 now - gen_time,
1624 )
1625 else:
1626 return "unknown"
1627
1628 @property
1629 def executemany(self):
1630 return self.execute_style in (
1631 ExecuteStyle.EXECUTEMANY,
1632 ExecuteStyle.INSERTMANYVALUES,
1633 )
1634
1635 @util.memoized_property
1636 def identifier_preparer(self):
1637 if self.compiled:
1638 return self.compiled.preparer
1639 elif "schema_translate_map" in self.execution_options:
1640 return self.dialect.identifier_preparer._with_schema_translate(
1641 self.execution_options["schema_translate_map"]
1642 )
1643 else:
1644 return self.dialect.identifier_preparer
1645
    @util.memoized_property
    def engine(self):
        """The Engine associated with the root connection."""
        return self.root_connection.engine
1649
    @util.memoized_property
    def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
        """Columns to be post-fetched, delegating to the compiled
        statement's ``postfetch`` collection."""
        if TYPE_CHECKING:
            assert isinstance(self.compiled, SQLCompiler)
        return self.compiled.postfetch
1655
1656 @util.memoized_property
1657 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1658 if TYPE_CHECKING:
1659 assert isinstance(self.compiled, SQLCompiler)
1660 if self.isinsert:
1661 return self.compiled.insert_prefetch
1662 elif self.isupdate:
1663 return self.compiled.update_prefetch
1664 else:
1665 return ()
1666
    @util.memoized_property
    def no_parameters(self):
        """True if the ``no_parameters`` execution option was set."""
        return self.execution_options.get("no_parameters", False)
1670
1671 def _execute_scalar(self, stmt, type_, parameters=None):
1672 """Execute a string statement on the current cursor, returning a
1673 scalar result.
1674
1675 Used to fire off sequences, default phrases, and "select lastrowid"
1676 types of statements individually or in the context of a parent INSERT
1677 or UPDATE statement.
1678
1679 """
1680
1681 conn = self.root_connection
1682
1683 if "schema_translate_map" in self.execution_options:
1684 schema_translate_map = self.execution_options.get(
1685 "schema_translate_map", {}
1686 )
1687
1688 rst = self.identifier_preparer._render_schema_translates
1689 stmt = rst(stmt, schema_translate_map)
1690
1691 if not parameters:
1692 if self.dialect.positional:
1693 parameters = self.dialect.execute_sequence_format()
1694 else:
1695 parameters = {}
1696
1697 conn._cursor_execute(self.cursor, stmt, parameters, context=self)
1698 row = self.cursor.fetchone()
1699 if row is not None:
1700 r = row[0]
1701 else:
1702 r = None
1703 if type_ is not None:
1704 # apply type post processors to the result
1705 proc = type_._cached_result_processor(
1706 self.dialect, self.cursor.description[0][1]
1707 )
1708 if proc:
1709 return proc(r)
1710 return r
1711
    @util.memoized_property
    def connection(self):
        """The :class:`.Connection` in use for this execution context."""
        return self.root_connection
1715
    def _use_server_side_cursor(self):
        """Determine whether a server side cursor should be used for
        this statement, honoring the deprecated ``server_side_cursors``
        dialect flag and the ``stream_results`` execution option."""
        if not self.dialect.supports_server_side_cursors:
            return False

        if self.dialect.server_side_cursors:
            # this is deprecated
            # stream_results defaults to True on this path; the statement
            # must additionally look SELECT-like: either a compiled
            # Selectable, or uncompiled/TextClause text matching
            # SERVER_SIDE_CURSOR_RE (note ``and`` binds tighter than
            # ``or`` in the expression below)
            use_server_side = self.execution_options.get(
                "stream_results", True
            ) and (
                self.compiled
                and isinstance(self.compiled.statement, expression.Selectable)
                or (
                    (
                        not self.compiled
                        or isinstance(
                            self.compiled.statement, expression.TextClause
                        )
                    )
                    and self.unicode_statement
                    and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
                )
            )
        else:
            # modern path: opt-in via stream_results only
            use_server_side = self.execution_options.get(
                "stream_results", False
            )

        return use_server_side
1744
1745 def create_cursor(self):
1746 if (
1747 # inlining initial preference checks for SS cursors
1748 self.dialect.supports_server_side_cursors
1749 and (
1750 self.execution_options.get("stream_results", False)
1751 or (
1752 self.dialect.server_side_cursors
1753 and self._use_server_side_cursor()
1754 )
1755 )
1756 ):
1757 self._is_server_side = True
1758 return self.create_server_side_cursor()
1759 else:
1760 self._is_server_side = False
1761 return self.create_default_cursor()
1762
    def fetchall_for_returning(self, cursor):
        """Fetch all rows from the given cursor; default implementation
        of the hook named for RETURNING result retrieval."""
        return cursor.fetchall()
1765
    def create_default_cursor(self):
        """Return a plain cursor from the pooled DBAPI connection."""
        return self._dbapi_connection.cursor()
1768
    def create_server_side_cursor(self):
        """Create a server side cursor; must be implemented by dialects
        that advertise ``supports_server_side_cursors``."""
        raise NotImplementedError()
1771
    def pre_exec(self):
        """Hook invoked before statement execution; default is a
        no-op."""
        pass
1774
    def get_out_parameter_values(self, names):
        """Return values for the named OUT parameters; the base
        implementation raises, as OUT parameter support is
        dialect-specific."""
        raise NotImplementedError(
            "This dialect does not support OUT parameters"
        )
1779
    def post_exec(self):
        """Hook invoked after statement execution; default is a
        no-op."""
        pass
1782
    def get_result_processor(self, type_, colname, coltype):
        """Return a 'result processor' for a given type as present in
        cursor.description.

        This has a default implementation that dialects can override
        for context-sensitive result type handling.

        :param type_: the column's TypeEngine
        :param colname: column name from cursor.description (unused by
         the default implementation)
        :param coltype: DBAPI type code from cursor.description

        """
        return type_._cached_result_processor(self.dialect, coltype)
1792
    def get_lastrowid(self):
        """return self.cursor.lastrowid, or equivalent, after an INSERT.

        This may involve calling special cursor functions, issuing a new SELECT
        on the cursor (or a new one), or returning a stored value that was
        calculated within post_exec().

        This function will only be called for dialects which support "implicit"
        primary key generation, keep preexecute_autoincrement_sequences set to
        False, and when no explicit id value was bound to the statement.

        The function is called once for an INSERT statement that would need to
        return the last inserted primary key for those dialects that make use
        of the lastrowid concept.  In these cases, it is called directly after
        :meth:`.ExecutionContext.post_exec`.

        """
        # default: delegate to the DBAPI cursor's lastrowid attribute
        return self.cursor.lastrowid
1811
    def handle_dbapi_exception(self, e):
        """Receive a DBAPI exception which occurred during execution;
        default implementation takes no action."""
        pass
1814
1815 @util.non_memoized_property
1816 def rowcount(self) -> int:
1817 if self._rowcount is not None:
1818 return self._rowcount
1819 else:
1820 return self.cursor.rowcount
1821
    @property
    def _has_rowcount(self):
        """True when a rowcount has been captured into
        ``self._rowcount``."""
        return self._rowcount is not None
1825
    def supports_sane_rowcount(self):
        """Delegate to the dialect's ``supports_sane_rowcount`` flag."""
        return self.dialect.supports_sane_rowcount
1828
    def supports_sane_multi_rowcount(self):
        """Delegate to the dialect's ``supports_sane_multi_rowcount``
        flag."""
        return self.dialect.supports_sane_multi_rowcount
1831
    def _setup_result_proxy(self):
        """Construct the :class:`.CursorResult` for this execution,
        choosing the fetch strategy and applying the ``yield_per`` /
        ``stream_results`` options and OUT parameter retrieval."""
        exec_opt = self.execution_options

        if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
            # capture rowcount now, before any fetching takes place
            self._rowcount = self.cursor.rowcount

        if self.is_crud or self.is_text:
            result = self._setup_dml_or_text_result()
            yp = sr = False
        else:
            yp = exec_opt.get("yield_per", None)
            sr = self._is_server_side or exec_opt.get("stream_results", False)
            strategy = self.cursor_fetch_strategy
            if sr and strategy is _cursor._DEFAULT_FETCH:
                # streaming requested; buffer rows incrementally
                strategy = _cursor.BufferedRowCursorFetchStrategy(
                    self.cursor, self.execution_options
                )
            cursor_description: _DBAPICursorDescription = (
                strategy.alternate_cursor_description
                or self.cursor.description
            )
            if cursor_description is None:
                # no result set is present on the cursor
                strategy = _cursor._NO_CURSOR_DQL

            result = _cursor.CursorResult(self, strategy, cursor_description)

        compiled = self.compiled

        if (
            compiled
            and not self.isddl
            and cast(SQLCompiler, compiled).has_out_parameters
        ):
            self._setup_out_parameters(result)

        self._soft_closed = result._soft_closed

        if yp:
            result = result.yield_per(yp)

        return result
1873
1874 def _setup_out_parameters(self, result):
1875 compiled = cast(SQLCompiler, self.compiled)
1876
1877 out_bindparams = [
1878 (param, name)
1879 for param, name in compiled.bind_names.items()
1880 if param.isoutparam
1881 ]
1882 out_parameters = {}
1883
1884 for bindparam, raw_value in zip(
1885 [param for param, name in out_bindparams],
1886 self.get_out_parameter_values(
1887 [name for param, name in out_bindparams]
1888 ),
1889 ):
1890 type_ = bindparam.type
1891 impl_type = type_.dialect_impl(self.dialect)
1892 dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
1893 result_processor = impl_type.result_processor(
1894 self.dialect, dbapi_type
1895 )
1896 if result_processor is not None:
1897 raw_value = result_processor(raw_value)
1898 out_parameters[bindparam.key] = raw_value
1899
1900 result.out_parameters = out_parameters
1901
    def _setup_dml_or_text_result(self):
        """Construct the CursorResult for INSERT/UPDATE/DELETE or
        textual statements, wiring up implicit RETURNING handling,
        lastrowid-based primary key retrieval, sentinel column
        stripping, and rowcount capture."""
        compiled = cast(SQLCompiler, self.compiled)

        strategy: ResultFetchStrategy = self.cursor_fetch_strategy

        if self.isinsert:
            if (
                self.execute_style is ExecuteStyle.INSERTMANYVALUES
                and compiled.effective_returning
            ):
                # RETURNING rows were accumulated across the individual
                # batches into _insertmanyvalues_rows; serve from there
                strategy = _cursor.FullyBufferedCursorFetchStrategy(
                    self.cursor,
                    initial_buffer=self._insertmanyvalues_rows,
                    # maintain alt cursor description if set by the
                    # dialect, e.g. mssql preserves it
                    alternate_description=(
                        strategy.alternate_cursor_description
                    ),
                )

            if compiled.postfetch_lastrowid:
                self.inserted_primary_key_rows = (
                    self._setup_ins_pk_from_lastrowid()
                )
            # else if not self._is_implicit_returning,
            # the default inserted_primary_key_rows accessor will
            # return an "empty" primary key collection when accessed.

        if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
            strategy = _cursor.BufferedRowCursorFetchStrategy(
                self.cursor, self.execution_options
            )

        if strategy is _cursor._NO_CURSOR_DML:
            cursor_description = None
        else:
            cursor_description = (
                strategy.alternate_cursor_description
                or self.cursor.description
            )

        if cursor_description is None:
            # cursor has no result set
            strategy = _cursor._NO_CURSOR_DML
        elif self._num_sentinel_cols:
            assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
            # strip out the sentinel columns from cursor description
            # a similar logic is done to the rows only in CursorResult
            cursor_description = cursor_description[
                0 : -self._num_sentinel_cols
            ]

        result: _cursor.CursorResult[Any] = _cursor.CursorResult(
            self, strategy, cursor_description
        )

        if self.isinsert:
            if self._is_implicit_returning:
                rows = result.all()

                self.returned_default_rows = rows

                self.inserted_primary_key_rows = (
                    self._setup_ins_pk_from_implicit_returning(result, rows)
                )

                # test that it has a cursor metadata that is accurate. the
                # first row will have been fetched and current assumptions
                # are that the result has only one row, until executemany()
                # support is added here.
                assert result._metadata.returns_rows

                # Insert statement has both return_defaults() and
                # returning().  rewind the result on the list of rows
                # we just used.
                if self._is_supplemental_returning:
                    result._rewind(rows)
                else:
                    result._soft_close()
            elif not self._is_explicit_returning:
                result._soft_close()

                # we assume here the result does not return any rows.
                # *usually*, this will be true.  However, some dialects
                # such as that of MSSQL/pyodbc need to SELECT a post fetch
                # function so this is not necessarily true.
                # assert not result.returns_rows

        elif self._is_implicit_returning:
            rows = result.all()

            if rows:
                self.returned_default_rows = rows
            self._rowcount = len(rows)

            if self._is_supplemental_returning:
                result._rewind(rows)
            else:
                result._soft_close()

            # test that it has a cursor metadata that is accurate.
            # the rows have all been fetched however.
            assert result._metadata.returns_rows

        elif not result._metadata.returns_rows:
            # no results, get rowcount
            # (which requires open cursor on some drivers)
            if self._rowcount is None:
                self._rowcount = self.cursor.rowcount
            result._soft_close()
        elif self.isupdate or self.isdelete:
            if self._rowcount is None:
                self._rowcount = self.cursor.rowcount
        return result
2015
    @util.memoized_property
    def inserted_primary_key_rows(self):
        """Memoized default for inserted primary key values, used when
        no lastrowid/RETURNING strategy assigned a value during
        execution."""
        # if no specific "get primary key" strategy was set up
        # during execution, return a "default" primary key based
        # on what's in the compiled_parameters and nothing else.
        return self._setup_ins_pk_from_empty()
2022
2023 def _setup_ins_pk_from_lastrowid(self):
2024 getter = cast(
2025 SQLCompiler, self.compiled
2026 )._inserted_primary_key_from_lastrowid_getter
2027 lastrowid = self.get_lastrowid()
2028 return [getter(lastrowid, self.compiled_parameters[0])]
2029
2030 def _setup_ins_pk_from_empty(self):
2031 getter = cast(
2032 SQLCompiler, self.compiled
2033 )._inserted_primary_key_from_lastrowid_getter
2034 return [getter(None, param) for param in self.compiled_parameters]
2035
2036 def _setup_ins_pk_from_implicit_returning(self, result, rows):
2037 if not rows:
2038 return []
2039
2040 getter = cast(
2041 SQLCompiler, self.compiled
2042 )._inserted_primary_key_from_returning_getter
2043 compiled_params = self.compiled_parameters
2044
2045 return [
2046 getter(row, param) for row, param in zip(rows, compiled_params)
2047 ]
2048
2049 def lastrow_has_defaults(self):
2050 return (self.isinsert or self.isupdate) and bool(
2051 cast(SQLCompiler, self.compiled).postfetch
2052 )
2053
    def _prepare_set_input_sizes(
        self,
    ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
        """Given a cursor and ClauseParameters, prepare arguments
        in order to call the appropriate
        style of ``setinputsizes()`` on the cursor, using DB-API types
        from the bind parameter's ``TypeEngine`` objects.

        This method only called by those dialects which set
        the :attr:`.Dialect.bind_typing` attribute to
        :attr:`.BindTyping.SETINPUTSIZES`. cx_Oracle is the only DBAPI
        that requires setinputsizes(), pyodbc offers it as an option.

        Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
        for pg8000 and asyncpg, which has been changed to inline rendering
        of casts.

        Returns a list of ``(paramname, dbtype, sqltype)`` tuples, or
        ``None`` when setinputsizes() does not apply to this statement.

        """
        # DDL and textual statements carry no compiled bind metadata
        if self.isddl or self.is_text:
            return None

        compiled = cast(SQLCompiler, self.compiled)

        inputsizes = compiled._get_set_input_sizes_lookup()

        if inputsizes is None:
            return None

        dialect = self.dialect

        # all of the rest of this... cython?

        if dialect._has_events:
            # copy before dispatching so that event handlers may mutate
            # the mapping without affecting the compiled object's version
            inputsizes = dict(inputsizes)
            dialect.dispatch.do_setinputsizes(
                inputsizes, self.cursor, self.statement, self.parameters, self
            )

        if compiled.escaped_bind_names:
            escaped_bind_names = compiled.escaped_bind_names
        else:
            escaped_bind_names = None

        if dialect.positional:
            # positional dialects: ordering follows compiled.positiontup
            items = [
                (key, compiled.binds[key])
                for key in compiled.positiontup or ()
            ]
        else:
            items = [
                (key, bindparam)
                for bindparam, key in compiled.bind_names.items()
            ]

        generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
        for key, bindparam in items:
            if bindparam in compiled.literal_execute_params:
                # rendered inline in the SQL text; no bound parameter
                continue

            if key in self._expanded_parameters:
                # "expanding" parameter: emit one entry per expanded name
                if is_tuple_type(bindparam.type):
                    # tuple type: cycle through the member types so each
                    # expanded name gets its positionally-matching type
                    num = len(bindparam.type.types)
                    dbtypes = inputsizes[bindparam]
                    generic_inputsizes.extend(
                        (
                            (
                                escaped_bind_names.get(paramname, paramname)
                                if escaped_bind_names is not None
                                else paramname
                            ),
                            dbtypes[idx % num],
                            bindparam.type.types[idx % num],
                        )
                        for idx, paramname in enumerate(
                            self._expanded_parameters[key]
                        )
                    )
                else:
                    # scalar expanding parameter: same type for every name
                    dbtype = inputsizes.get(bindparam, None)
                    generic_inputsizes.extend(
                        (
                            (
                                escaped_bind_names.get(paramname, paramname)
                                if escaped_bind_names is not None
                                else paramname
                            ),
                            dbtype,
                            bindparam.type,
                        )
                        for paramname in self._expanded_parameters[key]
                    )
            else:
                # ordinary parameter: single entry, possibly escaped name
                dbtype = inputsizes.get(bindparam, None)

                escaped_name = (
                    escaped_bind_names.get(key, key)
                    if escaped_bind_names is not None
                    else key
                )

                generic_inputsizes.append(
                    (escaped_name, dbtype, bindparam.type)
                )

        return generic_inputsizes
2159
2160 def _exec_default(self, column, default, type_):
2161 if default.is_sequence:
2162 return self.fire_sequence(default, type_)
2163 elif default.is_callable:
2164 # this codepath is not normally used as it's inlined
2165 # into _process_execute_defaults
2166 self.current_column = column
2167 return default.arg(self)
2168 elif default.is_clause_element:
2169 return self._exec_default_clause_element(column, default, type_)
2170 else:
2171 # this codepath is not normally used as it's inlined
2172 # into _process_execute_defaults
2173 return default.arg
2174
2175 def _exec_default_clause_element(self, column, default, type_):
2176 # execute a default that's a complete clause element. Here, we have
2177 # to re-implement a miniature version of the compile->parameters->
2178 # cursor.execute() sequence, since we don't want to modify the state
2179 # of the connection / result in progress or create new connection/
2180 # result objects etc.
2181 # .. versionchanged:: 1.4
2182
2183 if not default._arg_is_typed:
2184 default_arg = expression.type_coerce(default.arg, type_)
2185 else:
2186 default_arg = default.arg
2187 compiled = expression.select(default_arg).compile(dialect=self.dialect)
2188 compiled_params = compiled.construct_params()
2189 processors = compiled._bind_processors
2190 if compiled.positional:
2191 parameters = self.dialect.execute_sequence_format(
2192 [
2193 (
2194 processors[key](compiled_params[key]) # type: ignore
2195 if key in processors
2196 else compiled_params[key]
2197 )
2198 for key in compiled.positiontup or ()
2199 ]
2200 )
2201 else:
2202 parameters = {
2203 key: (
2204 processors[key](compiled_params[key]) # type: ignore
2205 if key in processors
2206 else compiled_params[key]
2207 )
2208 for key in compiled_params
2209 }
2210 return self._execute_scalar(
2211 str(compiled), type_, parameters=parameters
2212 )
2213
    # set per-row by _process_execute_defaults() while Python-side column
    # defaults are being evaluated, then deleted; None at all other times
    current_parameters: Optional[_CoreSingleExecuteParams] = None
    """A dictionary of parameters applied to the current row.

    This attribute is only available in the context of a user-defined default
    generation function, e.g. as described at :ref:`context_default_functions`.
    It consists of a dictionary which includes entries for each column/value
    pair that is to be part of the INSERT or UPDATE statement. The keys of the
    dictionary will be the key value of each :class:`_schema.Column`,
    which is usually
    synonymous with the name.

    Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
    does not accommodate for the "multi-values" feature of the
    :meth:`_expression.Insert.values` method. The
    :meth:`.DefaultExecutionContext.get_current_parameters` method should be
    preferred.

    .. seealso::

        :meth:`.DefaultExecutionContext.get_current_parameters`

        :ref:`context_default_functions`

    """
2238
    def get_current_parameters(self, isolate_multiinsert_groups=True):
        """Return a dictionary of parameters applied to the current row.

        This method can only be used in the context of a user-defined default
        generation function, e.g. as described at
        :ref:`context_default_functions`. When invoked, a dictionary is
        returned which includes entries for each column/value pair that is part
        of the INSERT or UPDATE statement. The keys of the dictionary will be
        the key value of each :class:`_schema.Column`,
        which is usually synonymous
        with the name.

        :param isolate_multiinsert_groups=True: indicates that multi-valued
         INSERT constructs created using :meth:`_expression.Insert.values`
         should be
         handled by returning only the subset of parameters that are local
         to the current column default invocation. When ``False``, the
         raw parameters of the statement are returned including the
         naming convention used in the case of multi-valued INSERT.

        :raises: :class:`.InvalidRequestError` when invoked outside of a
         Python-side column default function.

        .. versionadded:: 1.2 added
           :meth:`.DefaultExecutionContext.get_current_parameters`
           which provides more functionality over the existing
           :attr:`.DefaultExecutionContext.current_parameters`
           attribute.

        .. seealso::

            :attr:`.DefaultExecutionContext.current_parameters`

            :ref:`context_default_functions`

        """
        try:
            # current_parameters / current_column are only set while
            # _process_execute_defaults() is evaluating column defaults
            parameters = self.current_parameters
            column = self.current_column
        except AttributeError:
            raise exc.InvalidRequestError(
                "get_current_parameters() can only be invoked in the "
                "context of a Python side column default function"
            )
        else:
            assert column is not None
            assert parameters is not None
            compile_state = cast(
                "DMLState", cast(SQLCompiler, self.compiled).compile_state
            )
            assert compile_state is not None
            if (
                isolate_multiinsert_groups
                and dml.isinsert(compile_state)
                and compile_state._has_multi_parameters
            ):
                # multi-values INSERT: parameters are flattened using a
                # "<key>_m<index>" naming convention; reconstruct the
                # subset that belongs to this column's values() group
                if column._is_multiparam_column:
                    index = column.index + 1
                    d = {column.original.key: parameters[column.key]}
                else:
                    # the first values() group uses index 0
                    d = {column.key: parameters[column.key]}
                    index = 0
                assert compile_state._dict_parameters is not None
                keys = compile_state._dict_parameters.keys()
                d.update(
                    (key, parameters["%s_m%d" % (key, index)]) for key in keys
                )
                return d
            else:
                return parameters
2306
2307 def get_insert_default(self, column):
2308 if column.default is None:
2309 return None
2310 else:
2311 return self._exec_default(column, column.default, column.type)
2312
2313 def get_update_default(self, column):
2314 if column.onupdate is None:
2315 return None
2316 else:
2317 return self._exec_default(column, column.onupdate, column.type)
2318
    def _process_execute_defaults(self):
        """Evaluate prefetched column defaults, filling in each
        dictionary of ``self.compiled_parameters`` in place.

        Handles INSERT defaults or UPDATE onupdates depending on which
        prefetch collection the compiled statement carries.
        """
        compiled = cast(SQLCompiler, self.compiled)

        key_getter = compiled._within_exec_param_key_getter

        # counter handed out to "sentinel" columns; incremented once per
        # sentinel value assigned across all rows
        sentinel_counter = 0

        # build (column, param key, description tuple, fallback fn)
        # records once, then apply them to every parameter set
        if compiled.insert_prefetch:
            prefetch_recs = [
                (
                    c,
                    key_getter(c),
                    c._default_description_tuple,
                    self.get_insert_default,
                )
                for c in compiled.insert_prefetch
            ]
        elif compiled.update_prefetch:
            prefetch_recs = [
                (
                    c,
                    key_getter(c),
                    c._onupdate_description_tuple,
                    self.get_update_default,
                )
                for c in compiled.update_prefetch
            ]
        else:
            prefetch_recs = []

        for param in self.compiled_parameters:
            # expose this row's parameters to user-defined default
            # callables via get_current_parameters()
            self.current_parameters = param

            for (
                c,
                param_key,
                (arg, is_scalar, is_callable, is_sentinel),
                fallback,
            ) in prefetch_recs:
                if is_sentinel:
                    param[param_key] = sentinel_counter
                    sentinel_counter += 1
                elif is_scalar:
                    # inlined fast path for plain scalar defaults
                    param[param_key] = arg
                elif is_callable:
                    # inlined fast path for Python callable defaults;
                    # mirrors the is_callable branch of _exec_default
                    self.current_column = c
                    param[param_key] = arg(self)
                else:
                    # remaining cases (e.g. sequence / clause element) go
                    # through get_insert_default / get_update_default
                    val = fallback(c)
                    if val is not None:
                        param[param_key] = val

        del self.current_parameters
2372
2373
# wire the default ExecutionContext implementation onto DefaultDialect
DefaultDialect.execution_ctx_cls = DefaultExecutionContext