1# engine/default.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Default implementations of per-dialect sqlalchemy.engine classes.
10
11These are semi-private implementation classes which are only of importance
12to database dialect authors; dialects will usually use the classes here
13as the base class for their own corresponding classes.
14
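A third-party dialect will typically subclass ``DefaultDialect`` and
override only what its backend requires; a minimal sketch, using purely
illustrative names::

    from sqlalchemy.engine import default


    class AcmeDBDialect(default.DefaultDialect):
        name = "acmedb"
        driver = "acmedriver"
        supports_statement_cache = True
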
15"""
16
17from __future__ import annotations
18
19import functools
20import operator
21import random
22import re
23from time import perf_counter
24import typing
25from typing import Any
26from typing import Callable
27from typing import cast
28from typing import Dict
29from typing import Final
30from typing import List
31from typing import Literal
32from typing import Mapping
33from typing import MutableMapping
34from typing import MutableSequence
35from typing import Optional
36from typing import Sequence
37from typing import Set
38from typing import Tuple
39from typing import Type
40from typing import TYPE_CHECKING
41from typing import Union
42import weakref
43
44from . import characteristics
45from . import cursor as _cursor
46from . import interfaces
47from .base import Connection
48from .interfaces import CacheStats
49from .interfaces import DBAPICursor
50from .interfaces import Dialect
51from .interfaces import ExecuteStyle
52from .interfaces import ExecutionContext
53from .reflection import ObjectKind
54from .reflection import ObjectScope
55from .. import event
56from .. import exc
57from .. import pool
58from .. import util
59from ..sql import compiler
60from ..sql import dml
61from ..sql import expression
62from ..sql import type_api
63from ..sql import util as sql_util
64from ..sql._typing import is_tuple_type
65from ..sql.base import _NoArg
66from ..sql.compiler import AggregateOrderByStyle
67from ..sql.compiler import DDLCompiler
68from ..sql.compiler import InsertmanyvaluesSentinelOpts
69from ..sql.compiler import SQLCompiler
70from ..sql.elements import quoted_name
71from ..util.typing import TupleAny
72from ..util.typing import Unpack
73
74if typing.TYPE_CHECKING:
75 from types import ModuleType
76
77 from .base import Engine
78 from .cursor import ResultFetchStrategy
79 from .interfaces import _CoreMultiExecuteParams
80 from .interfaces import _CoreSingleExecuteParams
81 from .interfaces import _DBAPICursorDescription
82 from .interfaces import _DBAPIMultiExecuteParams
83 from .interfaces import _DBAPISingleExecuteParams
84 from .interfaces import _ExecuteOptions
85 from .interfaces import _MutableCoreSingleExecuteParams
86 from .interfaces import _ParamStyle
87 from .interfaces import ConnectArgsType
88 from .interfaces import DBAPIConnection
89 from .interfaces import DBAPIModule
90 from .interfaces import DBAPIType
91 from .interfaces import IsolationLevel
92 from .row import Row
93 from .url import URL
94 from ..event import _ListenerFnType
95 from ..pool import Pool
96 from ..pool import PoolProxiedConnection
97 from ..sql import Executable
98 from ..sql.compiler import Compiled
99 from ..sql.compiler import Linting
100 from ..sql.compiler import ResultColumnsEntry
101 from ..sql.dml import DMLState
102 from ..sql.dml import UpdateBase
103 from ..sql.elements import BindParameter
104 from ..sql.schema import Column
105 from ..sql.type_api import _BindProcessorType
106 from ..sql.type_api import _ResultProcessorType
107 from ..sql.type_api import TypeEngine
108
109
110# When we're handed literal SQL, ensure it's a SELECT query
111SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)
112
113
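# these constants are consulted by
# DefaultExecutionContext._get_cache_stats() below, which produces the
# cache-status portion of SQL logging output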
114(
115 CACHE_HIT,
116 CACHE_MISS,
117 CACHING_DISABLED,
118 NO_CACHE_KEY,
119 NO_DIALECT_SUPPORT,
120) = list(CacheStats)
121
122
123class DefaultDialect(Dialect):
124 """Default implementation of Dialect"""
125
126 statement_compiler = compiler.SQLCompiler
127 ddl_compiler = compiler.DDLCompiler
128 type_compiler_cls = compiler.GenericTypeCompiler
129
130 preparer = compiler.IdentifierPreparer
131 supports_alter = True
132 supports_comments = False
133 supports_constraint_comments = False
134 inline_comments = False
135 supports_statement_cache = True
136
137 div_is_floordiv = True
138
139 bind_typing = interfaces.BindTyping.NONE
140
141 include_set_input_sizes: Optional[Set[Any]] = None
142 exclude_set_input_sizes: Optional[Set[Any]] = None
143
144 # the first value we'd get for an autoincrement column.
145 default_sequence_base = 1
146
    # most DBAPIs are happy with this for execute();
    # not cx_oracle, however.
149 execute_sequence_format = tuple
150
151 supports_schemas = True
152 supports_views = True
153 supports_sequences = False
154 sequences_optional = False
155 preexecute_autoincrement_sequences = False
156 supports_identity_columns = False
157 postfetch_lastrowid = True
158 favor_returning_over_lastrowid = False
159 insert_null_pk_still_autoincrements = False
160 update_returning = False
161 delete_returning = False
162 update_returning_multifrom = False
163 delete_returning_multifrom = False
164 insert_returning = False
165
166 aggregate_order_by_style = AggregateOrderByStyle.INLINE
167
168 cte_follows_insert = False
169
170 supports_native_enum = False
171 supports_native_boolean = False
172 supports_native_uuid = False
173 returns_native_bytes = False
174
175 non_native_boolean_check_constraint = True
176
177 supports_simple_order_by_label = True
178
179 tuple_in_values = False
180
181 connection_characteristics = util.immutabledict(
182 {
183 "isolation_level": characteristics.IsolationLevelCharacteristic(),
184 "logging_token": characteristics.LoggingTokenCharacteristic(),
185 }
186 )
187
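    # coercion functions for engine-wide configuration values that may
    # arrive as strings, e.g. from file-based configuration such as
    # that consumed by engine_from_config()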
188 engine_config_types: Mapping[str, Any] = util.immutabledict(
189 {
190 "pool_timeout": util.asint,
191 "echo": util.bool_or_str("debug"),
192 "echo_pool": util.bool_or_str("debug"),
193 "pool_recycle": util.asint,
194 "pool_size": util.asint,
195 "max_overflow": util.asint,
196 "future": util.asbool,
197 }
198 )
199
    # True if the NUMERIC type
    # returns decimal.Decimal;
    # *not* the FLOAT type, however.
203 supports_native_decimal = False
204
205 name = "default"
206
207 # length at which to truncate
208 # any identifier.
209 max_identifier_length = 9999
210 _user_defined_max_identifier_length: Optional[int] = None
211
212 isolation_level: Optional[str] = None
213
214 # sub-categories of max_identifier_length.
    # currently these accommodate MySQL, which allows alias names
    # of 255 characters but DDL names of only 64.
217 max_index_name_length: Optional[int] = None
218 max_constraint_name_length: Optional[int] = None
219
220 supports_sane_rowcount = True
221 supports_sane_multi_rowcount = True
222 colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
223 default_paramstyle = "named"
224
225 supports_default_values = False
226 """dialect supports INSERT... DEFAULT VALUES syntax"""
227
228 supports_default_metavalue = False
229 """dialect supports INSERT... VALUES (DEFAULT) syntax"""
230
231 default_metavalue_token = "DEFAULT"
232 """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
    parentheses."""
234
235 # not sure if this is a real thing but the compiler will deliver it
236 # if this is the only flag enabled.
237 supports_empty_insert = True
238 """dialect supports INSERT () VALUES ()"""
239
240 supports_multivalues_insert = False
241
242 use_insertmanyvalues: bool = False
243
244 use_insertmanyvalues_wo_returning: bool = False
245
246 insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
247 InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
248 )
249
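    # insertmanyvalues_page_size is the number of parameter sets rendered
    # into a single INSERT statement per "insertmanyvalues" batch;
    # insertmanyvalues_max_parameters caps the total number of bound
    # parameters in one statement, whichever limit is reached first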
250 insertmanyvalues_page_size: int = 1000
251 insertmanyvalues_max_parameters = 32700
252
253 supports_is_distinct_from = True
254
255 supports_server_side_cursors = False
256
257 server_side_cursors = False
258
259 # extra record-level locking features (#4860)
260 supports_for_update_of = False
261
262 server_version_info = None
263
264 default_schema_name: Optional[str] = None
265
266 # indicates symbol names are
267 # UPPERCASED if they are case insensitive
268 # within the database.
269 # if this is True, the methods normalize_name()
270 # and denormalize_name() must be provided.
271 requires_name_normalize = False
272
273 is_async = False
274
275 has_terminate = False
276
277 # TODO: this is not to be part of 2.0. implement rudimentary binary
278 # literals for SQLite, PostgreSQL, MySQL only within
279 # _Binary.literal_processor
280 _legacy_binary_type_literal_encoding = "utf-8"
281
282 @util.deprecated_params(
283 empty_in_strategy=(
284 "1.4",
285 "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
286 "deprecated, and no longer has any effect. All IN expressions "
287 "are now rendered using "
288 'the "expanding parameter" strategy which renders a set of bound'
289 'expressions, or an "empty set" SELECT, at statement execution'
290 "time.",
291 ),
292 server_side_cursors=(
293 "1.4",
294 "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
295 "is deprecated and will be removed in a future release. Please "
296 "use the "
297 ":paramref:`_engine.Connection.execution_options.stream_results` "
298 "parameter.",
299 ),
300 )
301 def __init__(
302 self,
303 paramstyle: Optional[_ParamStyle] = None,
304 isolation_level: Optional[IsolationLevel] = None,
305 dbapi: Optional[ModuleType] = None,
306 implicit_returning: Literal[True] = True,
307 supports_native_boolean: Optional[bool] = None,
308 max_identifier_length: Optional[int] = None,
309 label_length: Optional[int] = None,
310 insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
311 use_insertmanyvalues: Optional[bool] = None,
312 # util.deprecated_params decorator cannot render the
313 # Linting.NO_LINTING constant
314 compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore
315 server_side_cursors: bool = False,
316 skip_autocommit_rollback: bool = False,
317 **kwargs: Any,
318 ):
319 if server_side_cursors:
320 if not self.supports_server_side_cursors:
321 raise exc.ArgumentError(
322 "Dialect %s does not support server side cursors" % self
323 )
324 else:
325 self.server_side_cursors = True
326
327 if getattr(self, "use_setinputsizes", False):
328 util.warn_deprecated(
329 "The dialect-level use_setinputsizes attribute is "
330 "deprecated. Please use "
331 "bind_typing = BindTyping.SETINPUTSIZES",
332 "2.0",
333 )
334 self.bind_typing = interfaces.BindTyping.SETINPUTSIZES
335
336 self.positional = False
337 self._ischema = None
338
339 self.dbapi = dbapi
340
341 self.skip_autocommit_rollback = skip_autocommit_rollback
342
343 if paramstyle is not None:
344 self.paramstyle = paramstyle
345 elif self.dbapi is not None:
346 self.paramstyle = self.dbapi.paramstyle
347 else:
348 self.paramstyle = self.default_paramstyle
349 self.positional = self.paramstyle in (
350 "qmark",
351 "format",
352 "numeric",
353 "numeric_dollar",
354 )
355 self.identifier_preparer = self.preparer(self)
356 self._on_connect_isolation_level = isolation_level
357
358 legacy_tt_callable = getattr(self, "type_compiler", None)
359 if legacy_tt_callable is not None:
360 tt_callable = cast(
361 Type[compiler.GenericTypeCompiler],
362 self.type_compiler,
363 )
364 else:
365 tt_callable = self.type_compiler_cls
366
367 self.type_compiler_instance = self.type_compiler = tt_callable(self)
368
369 if supports_native_boolean is not None:
370 self.supports_native_boolean = supports_native_boolean
371
372 self._user_defined_max_identifier_length = max_identifier_length
373 if self._user_defined_max_identifier_length:
374 self.max_identifier_length = (
375 self._user_defined_max_identifier_length
376 )
377 self.label_length = label_length
378 self.compiler_linting = compiler_linting
379
380 if use_insertmanyvalues is not None:
381 self.use_insertmanyvalues = use_insertmanyvalues
382
383 if insertmanyvalues_page_size is not _NoArg.NO_ARG:
384 self.insertmanyvalues_page_size = insertmanyvalues_page_size
385
386 @property
387 @util.deprecated(
388 "2.0",
389 "full_returning is deprecated, please use insert_returning, "
390 "update_returning, delete_returning",
391 )
392 def full_returning(self):
393 return (
394 self.insert_returning
395 and self.update_returning
396 and self.delete_returning
397 )
398
399 @util.memoized_property
400 def insert_executemany_returning(self):
401 """Default implementation for insert_executemany_returning, if not
402 otherwise overridden by the specific dialect.
403
        The default dialect determines that "insert_executemany_returning"
        is available if the dialect in use has opted into using the
        "use_insertmanyvalues" feature. If it hasn't opted into that, then
        this attribute is False, unless the dialect in question overrides
        this and provides some other implementation (such as the Oracle
        Database dialects).
410
411 """
412 return self.insert_returning and self.use_insertmanyvalues
413
414 @util.memoized_property
415 def insert_executemany_returning_sort_by_parameter_order(self):
416 """Default implementation for
        insert_executemany_returning_sort_by_parameter_order, if not
        otherwise overridden by the specific dialect.
419
        The default dialect determines that "insert_executemany_returning"
        can have deterministic ordering only if the dialect in use has opted
        into using the "use_insertmanyvalues" feature, which by default
        implements deterministic ordering using client side sentinel columns
        only. The "insertmanyvalues" feature also offers alternate forms
        that can use server-generated PK values as "sentinels", but those
        are only used if the
        :attr:`.Dialect.insertmanyvalues_implicit_sentinel` bitflag enables
        those alternate SQL forms, which are disabled by default.
429
430 If the dialect in use hasn't opted into that, then this attribute is
431 False, unless the dialect in question overrides this and provides some
432 other implementation (such as the Oracle Database dialects).
433
434 """
435 return self.insert_returning and self.use_insertmanyvalues
436
437 update_executemany_returning = False
438 delete_executemany_returning = False
439
440 @util.memoized_property
441 def loaded_dbapi(self) -> DBAPIModule:
442 if self.dbapi is None:
443 raise exc.InvalidRequestError(
444 f"Dialect {self} does not have a Python DBAPI established "
445 "and cannot be used for actual database interaction"
446 )
447 return self.dbapi
448
449 @util.memoized_property
450 def _bind_typing_render_casts(self):
451 return self.bind_typing is interfaces.BindTyping.RENDER_CASTS
452
453 def _ensure_has_table_connection(self, arg: Connection) -> None:
454 if not isinstance(arg, Connection):
455 raise exc.ArgumentError(
456 "The argument passed to Dialect.has_table() should be a "
457 "%s, got %s. "
458 "Additionally, the Dialect.has_table() method is for "
459 "internal dialect "
460 "use only; please use "
461 "``inspect(some_engine).has_table(<tablename>>)`` "
462 "for public API use." % (Connection, type(arg))
463 )
464
465 @util.memoized_property
466 def _supports_statement_cache(self):
467 ssc = self.__class__.__dict__.get("supports_statement_cache", None)
468 if ssc is None:
469 util.warn(
470 "Dialect %s:%s will not make use of SQL compilation caching "
471 "as it does not set the 'supports_statement_cache' attribute "
472 "to ``True``. This can have "
473 "significant performance implications including some "
474 "performance degradations in comparison to prior SQLAlchemy "
475 "versions. Dialect maintainers should seek to set this "
476 "attribute to True after appropriate development and testing "
477 "for SQLAlchemy 1.4 caching support. Alternatively, this "
478 "attribute may be set to False which will disable this "
479 "warning." % (self.name, self.driver),
480 code="cprf",
481 )
482
483 return bool(ssc)
484
485 @util.memoized_property
486 def _type_memos(self):
487 return weakref.WeakKeyDictionary()
488
489 @property
490 def dialect_description(self): # type: ignore[override]
491 return self.name + "+" + self.driver
492
493 @property
494 def supports_sane_rowcount_returning(self):
495 """True if this dialect supports sane rowcount even if RETURNING is
496 in use.
497
498 For dialects that don't support RETURNING, this is synonymous with
499 ``supports_sane_rowcount``.
500
501 """
502 return self.supports_sane_rowcount
503
504 @classmethod
505 def get_pool_class(cls, url: URL) -> Type[Pool]:
506 default: Type[pool.Pool]
507 if cls.is_async:
508 default = pool.AsyncAdaptedQueuePool
509 else:
510 default = pool.QueuePool
511
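        # an individual dialect may establish a "poolclass" attribute to
        # override these defaults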
512 return getattr(cls, "poolclass", default)
513
514 def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
515 return self.get_pool_class(url)
516
517 @classmethod
518 def load_provisioning(cls):
519 package = ".".join(cls.__module__.split(".")[0:-1])
520 try:
521 __import__(package + ".provision")
522 except ImportError:
523 pass
524
525 def _builtin_onconnect(self) -> Optional[_ListenerFnType]:
526 if self._on_connect_isolation_level is not None:
527
528 def builtin_connect(dbapi_conn, conn_rec):
529 self._assert_and_set_isolation_level(
530 dbapi_conn, self._on_connect_isolation_level
531 )
532
533 return builtin_connect
534 else:
535 return None
536
537 def initialize(self, connection: Connection) -> None:
538 try:
539 self.server_version_info = self._get_server_version_info(
540 connection
541 )
542 except NotImplementedError:
543 self.server_version_info = None
544 try:
545 self.default_schema_name = self._get_default_schema_name(
546 connection
547 )
548 except NotImplementedError:
549 self.default_schema_name = None
550
551 try:
552 self.default_isolation_level = self.get_default_isolation_level(
553 connection.connection.dbapi_connection
554 )
555 except NotImplementedError:
556 self.default_isolation_level = None
557
558 if not self._user_defined_max_identifier_length:
559 max_ident_length = self._check_max_identifier_length(connection)
560 if max_ident_length:
561 self.max_identifier_length = max_ident_length
562
563 if (
564 self.label_length
565 and self.label_length > self.max_identifier_length
566 ):
567 raise exc.ArgumentError(
568 "Label length of %d is greater than this dialect's"
569 " maximum identifier length of %d"
570 % (self.label_length, self.max_identifier_length)
571 )
572
573 def on_connect(self) -> Optional[Callable[[Any], None]]:
574 # inherits the docstring from interfaces.Dialect.on_connect
575 return None
576
577 def _check_max_identifier_length(self, connection):
578 """Perform a connection / server version specific check to determine
579 the max_identifier_length.
580
        If the dialect's class-level max_identifier_length should be used,
        this method can return None.
583
584 """
585 return None
586
587 def get_default_isolation_level(self, dbapi_conn):
588 """Given a DBAPI connection, return its isolation level, or
589 a default isolation level if one cannot be retrieved.
590
591 May be overridden by subclasses in order to provide a
592 "fallback" isolation level for databases that cannot reliably
593 retrieve the actual isolation level.
594
        By default, calls the :meth:`_engine.Dialect.get_isolation_level`
596 method, propagating any exceptions raised.
597
598 """
599 return self.get_isolation_level(dbapi_conn)
600
601 def type_descriptor(self, typeobj):
602 """Provide a database-specific :class:`.TypeEngine` object, given
603 the generic object which comes from the types module.
604
        This method looks for a dictionary called
        ``colspecs`` as a class- or instance-level variable,
        and passes the given type on to :func:`_types.adapt_type`.
608
609 """
610 return type_api.adapt_type(typeobj, self.colspecs)
611
612 def has_index(self, connection, table_name, index_name, schema=None, **kw):
613 if not self.has_table(connection, table_name, schema=schema, **kw):
614 return False
615 for idx in self.get_indexes(
616 connection, table_name, schema=schema, **kw
617 ):
618 if idx["name"] == index_name:
619 return True
620 else:
621 return False
622
623 def has_schema(
624 self, connection: Connection, schema_name: str, **kw: Any
625 ) -> bool:
626 return schema_name in self.get_schema_names(connection, **kw)
627
628 def validate_identifier(self, ident: str) -> None:
629 if len(ident) > self.max_identifier_length:
630 raise exc.IdentifierError(
631 "Identifier '%s' exceeds maximum length of %d characters"
632 % (ident, self.max_identifier_length)
633 )
634
635 def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection:
636 # inherits the docstring from interfaces.Dialect.connect
637 return self.loaded_dbapi.connect(*cargs, **cparams) # type: ignore[no-any-return] # NOQA: E501
638
639 def create_connect_args(self, url: URL) -> ConnectArgsType:
640 # inherits the docstring from interfaces.Dialect.create_connect_args
641 opts = url.translate_connect_args()
642 opts.update(url.query)
643 return ([], opts)
644
645 def set_engine_execution_options(
646 self, engine: Engine, opts: Mapping[str, Any]
647 ) -> None:
648 supported_names = set(self.connection_characteristics).intersection(
649 opts
650 )
651 if supported_names:
652 characteristics: Mapping[str, Any] = util.immutabledict(
653 (name, opts[name]) for name in supported_names
654 )
655
656 @event.listens_for(engine, "engine_connect")
657 def set_connection_characteristics(connection):
658 self._set_connection_characteristics(
659 connection, characteristics
660 )
661
662 def set_connection_execution_options(
663 self, connection: Connection, opts: Mapping[str, Any]
664 ) -> None:
665 supported_names = set(self.connection_characteristics).intersection(
666 opts
667 )
668 if supported_names:
669 characteristics: Mapping[str, Any] = util.immutabledict(
670 (name, opts[name]) for name in supported_names
671 )
672 self._set_connection_characteristics(connection, characteristics)
673
674 def _set_connection_characteristics(self, connection, characteristics):
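        # apply execution-option-driven characteristics (e.g.
        # isolation_level, logging_token) to the given connection;
        # transactional characteristics may not be changed while a
        # transaction is in progress, and a reset callback is registered
        # so that the characteristics are restored when the connection is
        # returned to the pool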
675 characteristic_values = [
676 (name, self.connection_characteristics[name], value)
677 for name, value in characteristics.items()
678 ]
679
680 if connection.in_transaction():
681 trans_objs = [
682 (name, obj)
683 for name, obj, _ in characteristic_values
684 if obj.transactional
685 ]
686 if trans_objs:
687 raise exc.InvalidRequestError(
688 "This connection has already initialized a SQLAlchemy "
689 "Transaction() object via begin() or autobegin; "
690 "%s may not be altered unless rollback() or commit() "
691 "is called first."
692 % (", ".join(name for name, obj in trans_objs))
693 )
694
695 dbapi_connection = connection.connection.dbapi_connection
696 for _, characteristic, value in characteristic_values:
697 characteristic.set_connection_characteristic(
698 self, connection, dbapi_connection, value
699 )
700 connection.connection._connection_record.finalize_callback.append(
701 functools.partial(self._reset_characteristics, characteristics)
702 )
703
704 def _reset_characteristics(self, characteristics, dbapi_connection):
705 for characteristic_name in characteristics:
706 characteristic = self.connection_characteristics[
707 characteristic_name
708 ]
709 characteristic.reset_characteristic(self, dbapi_connection)
710
711 def do_begin(self, dbapi_connection):
712 pass
713
714 def do_rollback(self, dbapi_connection):
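        # when skip_autocommit_rollback is enabled, the rollback is
        # skipped if the DBAPI connection is detected to be in autocommit
        # mode, where there is no transaction in progress to roll back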
715 if self.skip_autocommit_rollback and self.detect_autocommit_setting(
716 dbapi_connection
717 ):
718 return
719 dbapi_connection.rollback()
720
721 def do_commit(self, dbapi_connection):
722 dbapi_connection.commit()
723
724 def do_terminate(self, dbapi_connection):
725 self.do_close(dbapi_connection)
726
727 def do_close(self, dbapi_connection):
728 dbapi_connection.close()
729
730 @util.memoized_property
731 def _dialect_specific_select_one(self):
732 return str(expression.select(1).compile(dialect=self))
733
734 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
735 try:
736 return self.do_ping(dbapi_connection)
737 except self.loaded_dbapi.Error as err:
738 is_disconnect = self.is_disconnect(err, dbapi_connection, None)
739
740 if self._has_events:
741 try:
742 Connection._handle_dbapi_exception_noconnection(
743 err,
744 self,
745 is_disconnect=is_disconnect,
746 invalidate_pool_on_disconnect=False,
747 is_pre_ping=True,
748 )
749 except exc.StatementError as new_err:
750 is_disconnect = new_err.connection_invalidated
751
752 if is_disconnect:
753 return False
754 else:
755 raise
756
757 def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
758 cursor = dbapi_connection.cursor()
759 try:
760 cursor.execute(self._dialect_specific_select_one)
761 finally:
762 cursor.close()
763 return True
764
765 def create_xid(self):
766 """Create a random two-phase transaction ID.
767
768 This id will be passed to do_begin_twophase(), do_rollback_twophase(),
769 do_commit_twophase(). Its format is unspecified.
770 """
771
772 return "_sa_%032x" % random.randint(0, 2**128)
773
774 def do_savepoint(self, connection, name):
775 connection.execute(expression.SavepointClause(name))
776
777 def do_rollback_to_savepoint(self, connection, name):
778 connection.execute(expression.RollbackToSavepointClause(name))
779
780 def do_release_savepoint(self, connection, name):
781 connection.execute(expression.ReleaseSavepointClause(name))
782
783 def _deliver_insertmanyvalues_batches(
784 self,
785 connection,
786 cursor,
787 statement,
788 parameters,
789 generic_setinputsizes,
790 context,
791 ):
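        # generator used for "insertmanyvalues" executions; each yielded
        # batch is executed against the cursor by the caller, after which
        # control returns here so that RETURNING rows, if any, can be
        # fetched and, when sort_by_parameter_order is in effect,
        # re-ordered to correspond to the original parameter sets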
792 context = cast(DefaultExecutionContext, context)
793 compiled = cast(SQLCompiler, context.compiled)
794
795 _composite_sentinel_proc: Sequence[
796 Optional[_ResultProcessorType[Any]]
797 ] = ()
798 _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
799 _sentinel_proc_initialized: bool = False
800
801 compiled_parameters = context.compiled_parameters
802
803 imv = compiled._insertmanyvalues
804 assert imv is not None
805
806 is_returning: Final[bool] = bool(compiled.effective_returning)
807 batch_size = context.execution_options.get(
808 "insertmanyvalues_page_size", self.insertmanyvalues_page_size
809 )
810
811 if compiled.schema_translate_map:
812 schema_translate_map = context.execution_options.get(
813 "schema_translate_map", {}
814 )
815 else:
816 schema_translate_map = None
817
818 if is_returning:
819 result: Optional[List[Any]] = []
820 context._insertmanyvalues_rows = result
821
822 sort_by_parameter_order = imv.sort_by_parameter_order
823
824 else:
825 sort_by_parameter_order = False
826 result = None
827
828 for imv_batch in compiled._deliver_insertmanyvalues_batches(
829 statement,
830 parameters,
831 compiled_parameters,
832 generic_setinputsizes,
833 batch_size,
834 sort_by_parameter_order,
835 schema_translate_map,
836 ):
837 yield imv_batch
838
839 if is_returning:
840
841 try:
842 rows = context.fetchall_for_returning(cursor)
843 except BaseException as be:
844 connection._handle_dbapi_exception(
845 be,
846 sql_util._long_statement(imv_batch.replaced_statement),
847 imv_batch.replaced_parameters,
848 None,
849 context,
850 is_sub_exec=True,
851 )
852
853 # I would have thought "is_returning: Final[bool]"
854 # would have assured this but pylance thinks not
855 assert result is not None
856
857 if imv.num_sentinel_columns and not imv_batch.is_downgraded:
858 composite_sentinel = imv.num_sentinel_columns > 1
859 if imv.implicit_sentinel:
860 # for implicit sentinel, which is currently single-col
861 # integer autoincrement, do a simple sort.
862 assert not composite_sentinel
863 result.extend(
864 sorted(rows, key=operator.itemgetter(-1))
865 )
866 continue
867
868 # otherwise, create dictionaries to match up batches
869 # with parameters
870 assert imv.sentinel_param_keys
871 assert imv.sentinel_columns
872
873 _nsc = imv.num_sentinel_columns
874
875 if not _sentinel_proc_initialized:
876 if composite_sentinel:
877 _composite_sentinel_proc = [
878 col.type._cached_result_processor(
879 self, cursor_desc[1]
880 )
881 for col, cursor_desc in zip(
882 imv.sentinel_columns,
883 cursor.description[-_nsc:],
884 )
885 ]
886 else:
887 _scalar_sentinel_proc = (
888 imv.sentinel_columns[0]
889 ).type._cached_result_processor(
890 self, cursor.description[-1][1]
891 )
892 _sentinel_proc_initialized = True
893
894 rows_by_sentinel: Union[
895 Dict[Tuple[Any, ...], Any],
896 Dict[Any, Any],
897 ]
898 if composite_sentinel:
899 rows_by_sentinel = {
900 tuple(
901 (proc(val) if proc else val)
902 for val, proc in zip(
903 row[-_nsc:], _composite_sentinel_proc
904 )
905 ): row
906 for row in rows
907 }
908 elif _scalar_sentinel_proc:
909 rows_by_sentinel = {
910 _scalar_sentinel_proc(row[-1]): row for row in rows
911 }
912 else:
913 rows_by_sentinel = {row[-1]: row for row in rows}
914
915 if len(rows_by_sentinel) != len(imv_batch.batch):
916 # see test_insert_exec.py::
917 # IMVSentinelTest::test_sentinel_incorrect_rowcount
918 # for coverage / demonstration
919 raise exc.InvalidRequestError(
920 f"Sentinel-keyed result set did not produce "
921 f"correct number of rows {len(imv_batch.batch)}; "
922 "produced "
923 f"{len(rows_by_sentinel)}. Please ensure the "
924 "sentinel column is fully unique and populated in "
925 "all cases."
926 )
927
928 try:
929 ordered_rows = [
930 rows_by_sentinel[sentinel_keys]
931 for sentinel_keys in imv_batch.sentinel_values
932 ]
933 except KeyError as ke:
934 # see test_insert_exec.py::
935 # IMVSentinelTest::test_sentinel_cant_match_keys
936 # for coverage / demonstration
937 raise exc.InvalidRequestError(
938 f"Can't match sentinel values in result set to "
939 f"parameter sets; key {ke.args[0]!r} was not "
940 "found. "
941 "There may be a mismatch between the datatype "
942 "passed to the DBAPI driver vs. that which it "
943 "returns in a result row. Ensure the given "
944 "Python value matches the expected result type "
945 "*exactly*, taking care to not rely upon implicit "
946 "conversions which may occur such as when using "
947 "strings in place of UUID or integer values, etc. "
948 ) from ke
949
950 result.extend(ordered_rows)
951
952 else:
953 result.extend(rows)
954
955 def do_executemany(self, cursor, statement, parameters, context=None):
956 cursor.executemany(statement, parameters)
957
958 def do_execute(self, cursor, statement, parameters, context=None):
959 cursor.execute(statement, parameters)
960
961 def do_execute_no_params(self, cursor, statement, context=None):
962 cursor.execute(statement)
963
964 def is_disconnect(
965 self,
966 e: DBAPIModule.Error,
967 connection: Union[
968 pool.PoolProxiedConnection, interfaces.DBAPIConnection, None
969 ],
970 cursor: Optional[interfaces.DBAPICursor],
971 ) -> bool:
972 return False
973
974 @util.memoized_instancemethod
975 def _gen_allowed_isolation_levels(self, dbapi_conn):
976 try:
977 raw_levels = list(self.get_isolation_level_values(dbapi_conn))
978 except NotImplementedError:
979 return None
980 else:
981 normalized_levels = [
982 level.replace("_", " ").upper() for level in raw_levels
983 ]
984 if raw_levels != normalized_levels:
985 raise ValueError(
986 f"Dialect {self.name!r} get_isolation_level_values() "
987 f"method should return names as UPPERCASE using spaces, "
988 f"not underscores; got "
989 f"{sorted(set(raw_levels).difference(normalized_levels))}"
990 )
991 return tuple(normalized_levels)
992
993 def _assert_and_set_isolation_level(self, dbapi_conn, level):
994 level = level.replace("_", " ").upper()
995
996 _allowed_isolation_levels = self._gen_allowed_isolation_levels(
997 dbapi_conn
998 )
999 if (
1000 _allowed_isolation_levels
1001 and level not in _allowed_isolation_levels
1002 ):
1003 raise exc.ArgumentError(
1004 f"Invalid value {level!r} for isolation_level. "
1005 f"Valid isolation levels for {self.name!r} are "
1006 f"{', '.join(_allowed_isolation_levels)}"
1007 )
1008
1009 self.set_isolation_level(dbapi_conn, level)
1010
1011 def reset_isolation_level(self, dbapi_conn):
1012 if self._on_connect_isolation_level is not None:
1013 assert (
1014 self._on_connect_isolation_level == "AUTOCOMMIT"
1015 or self._on_connect_isolation_level
1016 == self.default_isolation_level
1017 )
1018 self._assert_and_set_isolation_level(
1019 dbapi_conn, self._on_connect_isolation_level
1020 )
1021 else:
1022 assert self.default_isolation_level is not None
1023 self._assert_and_set_isolation_level(
1024 dbapi_conn,
1025 self.default_isolation_level,
1026 )
1027
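    # normalize_name() / denormalize_name() come into play when
    # requires_name_normalize is True, i.e. for backends that store
    # case-insensitive identifiers as UPPERCASE; they translate between
    # that convention and SQLAlchemy's all-lowercase "case insensitive"
    # convention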
1028 def normalize_name(self, name):
1029 if name is None:
1030 return None
1031
1032 name_lower = name.lower()
1033 name_upper = name.upper()
1034
1035 if name_upper == name_lower:
            # name has no upper/lower conversion, e.g. non-European characters.
1037 # return unchanged
1038 return name
1039 elif name_upper == name and not (
1040 self.identifier_preparer._requires_quotes
1041 )(name_lower):
1042 # name is all uppercase and doesn't require quoting; normalize
1043 # to all lower case
1044 return name_lower
1045 elif name_lower == name:
1046 # name is all lower case, which if denormalized means we need to
1047 # force quoting on it
1048 return quoted_name(name, quote=True)
1049 else:
            # name is mixed case, which means it will be quoted in SQL when
            # used later; no normalization is applied
1052 return name
1053
1054 def denormalize_name(self, name):
1055 if name is None:
1056 return None
1057
1058 name_lower = name.lower()
1059 name_upper = name.upper()
1060
1061 if name_upper == name_lower:
            # name has no upper/lower conversion, e.g. non-European characters.
1063 # return unchanged
1064 return name
1065 elif name_lower == name and not (
1066 self.identifier_preparer._requires_quotes
1067 )(name_lower):
1068 name = name_upper
1069 return name
1070
1071 def get_driver_connection(self, connection: DBAPIConnection) -> Any:
1072 return connection
1073
1074 def _overrides_default(self, method):
1075 return (
1076 getattr(type(self), method).__code__
1077 is not getattr(DefaultDialect, method).__code__
1078 )
1079
1080 def _default_multi_reflect(
1081 self,
1082 single_tbl_method,
1083 connection,
1084 kind,
1085 schema,
1086 filter_names,
1087 scope,
1088 **kw,
1089 ):
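        # generic implementation for the get_multi_* reflection methods:
        # gather candidate object names for the requested ObjectKind /
        # ObjectScope, then invoke the single-table reflection method for
        # each name, yielding ((schema, name), result) pairs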
1090 names_fns = []
1091 temp_names_fns = []
1092 if ObjectKind.TABLE in kind:
1093 names_fns.append(self.get_table_names)
1094 temp_names_fns.append(self.get_temp_table_names)
1095 if ObjectKind.VIEW in kind:
1096 names_fns.append(self.get_view_names)
1097 temp_names_fns.append(self.get_temp_view_names)
1098 if ObjectKind.MATERIALIZED_VIEW in kind:
1099 names_fns.append(self.get_materialized_view_names)
1100 # no temp materialized view at the moment
1101 # temp_names_fns.append(self.get_temp_materialized_view_names)
1102
1103 unreflectable = kw.pop("unreflectable", {})
1104
1105 if (
1106 filter_names
1107 and scope is ObjectScope.ANY
1108 and kind is ObjectKind.ANY
1109 ):
            # if names are given and there is no qualification on the type
            # of table (i.e. the Table(..., autoload) case), take the names
            # as given and don't run the names queries.  If a table does
            # not exist, NoSuchTableError is raised and it's skipped
1114
1115 # this also suits the case for mssql where we can reflect
1116 # individual temp tables but there's no temp_names_fn
1117 names = filter_names
1118 else:
1119 names = []
1120 name_kw = {"schema": schema, **kw}
1121 fns = []
1122 if ObjectScope.DEFAULT in scope:
1123 fns.extend(names_fns)
1124 if ObjectScope.TEMPORARY in scope:
1125 fns.extend(temp_names_fns)
1126
1127 for fn in fns:
1128 try:
1129 names.extend(fn(connection, **name_kw))
1130 except NotImplementedError:
1131 pass
1132
1133 if filter_names:
1134 filter_names = set(filter_names)
1135
1136 # iterate over all the tables/views and call the single table method
1137 for table in names:
1138 if not filter_names or table in filter_names:
1139 key = (schema, table)
1140 try:
1141 yield (
1142 key,
1143 single_tbl_method(
1144 connection, table, schema=schema, **kw
1145 ),
1146 )
1147 except exc.UnreflectableTableError as err:
1148 if key not in unreflectable:
1149 unreflectable[key] = err
1150 except exc.NoSuchTableError:
1151 pass
1152
1153 def get_multi_table_options(self, connection, **kw):
1154 return self._default_multi_reflect(
1155 self.get_table_options, connection, **kw
1156 )
1157
1158 def get_multi_columns(self, connection, **kw):
1159 return self._default_multi_reflect(self.get_columns, connection, **kw)
1160
1161 def get_multi_pk_constraint(self, connection, **kw):
1162 return self._default_multi_reflect(
1163 self.get_pk_constraint, connection, **kw
1164 )
1165
1166 def get_multi_foreign_keys(self, connection, **kw):
1167 return self._default_multi_reflect(
1168 self.get_foreign_keys, connection, **kw
1169 )
1170
1171 def get_multi_indexes(self, connection, **kw):
1172 return self._default_multi_reflect(self.get_indexes, connection, **kw)
1173
1174 def get_multi_unique_constraints(self, connection, **kw):
1175 return self._default_multi_reflect(
1176 self.get_unique_constraints, connection, **kw
1177 )
1178
1179 def get_multi_check_constraints(self, connection, **kw):
1180 return self._default_multi_reflect(
1181 self.get_check_constraints, connection, **kw
1182 )
1183
1184 def get_multi_table_comment(self, connection, **kw):
1185 return self._default_multi_reflect(
1186 self.get_table_comment, connection, **kw
1187 )
1188
1189
1190class StrCompileDialect(DefaultDialect):
1191 statement_compiler = compiler.StrSQLCompiler
1192 ddl_compiler = compiler.DDLCompiler
1193 type_compiler_cls = compiler.StrSQLTypeCompiler
1194 preparer = compiler.IdentifierPreparer
1195
1196 insert_returning = True
1197 update_returning = True
1198 delete_returning = True
1199
1200 supports_statement_cache = True
1201
1202 supports_identity_columns = True
1203
1204 supports_sequences = True
1205 sequences_optional = True
1206 preexecute_autoincrement_sequences = False
1207
1208 supports_native_boolean = True
1209
1210 supports_multivalues_insert = True
1211 supports_simple_order_by_label = True
1212
1213
1214class DefaultExecutionContext(ExecutionContext):
1215 isinsert = False
1216 isupdate = False
1217 isdelete = False
1218 is_crud = False
1219 is_text = False
1220 isddl = False
1221
1222 execute_style: ExecuteStyle = ExecuteStyle.EXECUTE
1223
1224 compiled: Optional[Compiled] = None
1225 result_column_struct: Optional[
1226 Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
1227 ] = None
1228 returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None
1229
1230 execution_options: _ExecuteOptions = util.EMPTY_DICT
1231
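    # the default fetch strategy may be replaced during execution, e.g.
    # with a buffering strategy for server side cursors / stream_results,
    # or a fully-buffered strategy for "insertmanyvalues" RETURNING rows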
1232 cursor_fetch_strategy = _cursor._DEFAULT_FETCH
1233
1234 invoked_statement: Optional[Executable] = None
1235
1236 _is_implicit_returning = False
1237 _is_explicit_returning = False
1238 _is_supplemental_returning = False
1239 _is_server_side = False
1240
1241 _soft_closed = False
1242
1243 _rowcount: Optional[int] = None
1244
1245 # a hook for SQLite's translation of
1246 # result column names
1247 # NOTE: pyhive is using this hook, can't remove it :(
1248 _translate_colname: Optional[
1249 Callable[[str], Tuple[str, Optional[str]]]
1250 ] = None
1251
1252 _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
1253 """used by set_input_sizes().
1254
1255 This collection comes from ``ExpandedState.parameter_expansion``.
1256
1257 """
1258
1259 cache_hit = NO_CACHE_KEY
1260
1261 root_connection: Connection
1262 _dbapi_connection: PoolProxiedConnection
1263 dialect: Dialect
1264 unicode_statement: str
1265 cursor: DBAPICursor
1266 compiled_parameters: List[_MutableCoreSingleExecuteParams]
1267 parameters: _DBAPIMultiExecuteParams
1268 extracted_parameters: Optional[Sequence[BindParameter[Any]]]
1269
1270 _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)
1271
1272 _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
1273 _num_sentinel_cols: int = 0
1274
1275 @classmethod
1276 def _init_ddl(
1277 cls,
1278 dialect: Dialect,
1279 connection: Connection,
1280 dbapi_connection: PoolProxiedConnection,
1281 execution_options: _ExecuteOptions,
1282 compiled_ddl: DDLCompiler,
1283 ) -> ExecutionContext:
1284 """Initialize execution context for an ExecutableDDLElement
1285 construct."""
1286
1287 self = cls.__new__(cls)
1288 self.root_connection = connection
1289 self._dbapi_connection = dbapi_connection
1290 self.dialect = connection.dialect
1291
1292 self.compiled = compiled = compiled_ddl
1293 self.isddl = True
1294
1295 self.execution_options = execution_options
1296
1297 self.unicode_statement = str(compiled)
1298 if compiled.schema_translate_map:
1299 schema_translate_map = self.execution_options.get(
1300 "schema_translate_map", {}
1301 )
1302
1303 rst = compiled.preparer._render_schema_translates
1304 self.unicode_statement = rst(
1305 self.unicode_statement, schema_translate_map
1306 )
1307
1308 self.statement = self.unicode_statement
1309
1310 self.cursor = self.create_cursor()
1311 self.compiled_parameters = []
1312
1313 if dialect.positional:
1314 self.parameters = [dialect.execute_sequence_format()]
1315 else:
1316 self.parameters = [self._empty_dict_params]
1317
1318 return self
1319
1320 @classmethod
1321 def _init_compiled(
1322 cls,
1323 dialect: Dialect,
1324 connection: Connection,
1325 dbapi_connection: PoolProxiedConnection,
1326 execution_options: _ExecuteOptions,
1327 compiled: SQLCompiler,
1328 parameters: _CoreMultiExecuteParams,
1329 invoked_statement: Executable,
1330 extracted_parameters: Optional[Sequence[BindParameter[Any]]],
1331 cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
1332 param_dict: _CoreSingleExecuteParams | None = None,
1333 ) -> ExecutionContext:
1334 """Initialize execution context for a Compiled construct."""
1335
1336 self = cls.__new__(cls)
1337 self.root_connection = connection
1338 self._dbapi_connection = dbapi_connection
1339 self.dialect = connection.dialect
1340 self.extracted_parameters = extracted_parameters
1341 self.invoked_statement = invoked_statement
1342 self.compiled = compiled
1343 self.cache_hit = cache_hit
1344
1345 self.execution_options = execution_options
1346
1347 self.result_column_struct = (
1348 compiled._result_columns,
1349 compiled._ordered_columns,
1350 compiled._textual_ordered_columns,
1351 compiled._ad_hoc_textual,
1352 compiled._loose_column_name_matching,
1353 )
1354
1355 self.isinsert = ii = compiled.isinsert
1356 self.isupdate = iu = compiled.isupdate
1357 self.isdelete = id_ = compiled.isdelete
1358 self.is_text = compiled.isplaintext
1359
1360 if ii or iu or id_:
1361 dml_statement = compiled.compile_state.statement # type: ignore
1362 if TYPE_CHECKING:
1363 assert isinstance(dml_statement, UpdateBase)
1364 self.is_crud = True
1365 self._is_explicit_returning = ier = bool(dml_statement._returning)
1366 self._is_implicit_returning = iir = bool(
1367 compiled.implicit_returning
1368 )
1369 if iir and dml_statement._supplemental_returning:
1370 self._is_supplemental_returning = True
1371
            # don't mix implicit and explicit returning
1373 assert not (iir and ier)
1374
1375 if (ier or iir) and compiled.for_executemany:
1376 if ii and not self.dialect.insert_executemany_returning:
1377 raise exc.InvalidRequestError(
1378 f"Dialect {self.dialect.dialect_description} with "
1379 f"current server capabilities does not support "
1380 "INSERT..RETURNING when executemany is used"
1381 )
1382 elif (
1383 ii
1384 and dml_statement._sort_by_parameter_order
1385 and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501
1386 ):
1387 raise exc.InvalidRequestError(
1388 f"Dialect {self.dialect.dialect_description} with "
1389 f"current server capabilities does not support "
1390 "INSERT..RETURNING with deterministic row ordering "
1391 "when executemany is used"
1392 )
1393 elif (
1394 ii
1395 and self.dialect.use_insertmanyvalues
1396 and not compiled._insertmanyvalues
1397 ):
1398 raise exc.InvalidRequestError(
1399 'Statement does not have "insertmanyvalues" '
1400 "enabled, can't use INSERT..RETURNING with "
1401 "executemany in this case."
1402 )
1403 elif iu and not self.dialect.update_executemany_returning:
1404 raise exc.InvalidRequestError(
1405 f"Dialect {self.dialect.dialect_description} with "
1406 f"current server capabilities does not support "
1407 "UPDATE..RETURNING when executemany is used"
1408 )
1409 elif id_ and not self.dialect.delete_executemany_returning:
1410 raise exc.InvalidRequestError(
1411 f"Dialect {self.dialect.dialect_description} with "
1412 f"current server capabilities does not support "
1413 "DELETE..RETURNING when executemany is used"
1414 )
1415
1416 if not parameters:
1417 self.compiled_parameters = [
1418 compiled.construct_params(
1419 extracted_parameters=extracted_parameters,
1420 escape_names=False,
1421 _collected_params=param_dict,
1422 )
1423 ]
1424 else:
1425 self.compiled_parameters = [
1426 compiled.construct_params(
1427 m,
1428 escape_names=False,
1429 _group_number=grp,
1430 extracted_parameters=extracted_parameters,
1431 _collected_params=param_dict,
1432 )
1433 for grp, m in enumerate(parameters)
1434 ]
1435
1436 if len(parameters) > 1:
1437 if self.isinsert and compiled._insertmanyvalues:
1438 self.execute_style = ExecuteStyle.INSERTMANYVALUES
1439
1440 imv = compiled._insertmanyvalues
1441 if imv.sentinel_columns is not None:
1442 self._num_sentinel_cols = imv.num_sentinel_columns
1443 else:
1444 self.execute_style = ExecuteStyle.EXECUTEMANY
1445
1446 self.unicode_statement = compiled.string
1447
1448 self.cursor = self.create_cursor()
1449
1450 if self.compiled.insert_prefetch or self.compiled.update_prefetch:
1451 self._process_execute_defaults()
1452
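        # bind processors perform per-type conversion of parameter values
        # (e.g. Decimal to string, date formatting) before the values are
        # passed to the DBAPI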
1453 processors = compiled._bind_processors
1454
1455 flattened_processors: Mapping[
1456 str, _BindProcessorType[Any]
1457 ] = processors # type: ignore[assignment]
1458
1459 if compiled.literal_execute_params or compiled.post_compile_params:
1460 if self.executemany:
1461 raise exc.InvalidRequestError(
1462 "'literal_execute' or 'expanding' parameters can't be "
1463 "used with executemany()"
1464 )
1465
1466 expanded_state = compiled._process_parameters_for_postcompile(
1467 self.compiled_parameters[0]
1468 )
1469
1470 # re-assign self.unicode_statement
1471 self.unicode_statement = expanded_state.statement
1472
1473 self._expanded_parameters = expanded_state.parameter_expansion
1474
1475 flattened_processors = dict(processors) # type: ignore
1476 flattened_processors.update(expanded_state.processors)
1477 positiontup = expanded_state.positiontup
1478 elif compiled.positional:
1479 positiontup = self.compiled.positiontup
1480 else:
1481 positiontup = None
1482
1483 if compiled.schema_translate_map:
1484 schema_translate_map = self.execution_options.get(
1485 "schema_translate_map", {}
1486 )
1487 rst = compiled.preparer._render_schema_translates
1488 self.unicode_statement = rst(
1489 self.unicode_statement, schema_translate_map
1490 )
1491
1492 # final self.unicode_statement is now assigned, encode if needed
1493 # by dialect
1494 self.statement = self.unicode_statement
1495
1496 # Convert the dictionary of bind parameter values
1497 # into a dict or list to be sent to the DBAPI's
1498 # execute() or executemany() method.
1499
1500 if compiled.positional:
1501 core_positional_parameters: MutableSequence[Sequence[Any]] = []
1502 assert positiontup is not None
1503 for compiled_params in self.compiled_parameters:
1504 l_param: List[Any] = [
1505 (
1506 flattened_processors[key](compiled_params[key])
1507 if key in flattened_processors
1508 else compiled_params[key]
1509 )
1510 for key in positiontup
1511 ]
1512 core_positional_parameters.append(
1513 dialect.execute_sequence_format(l_param)
1514 )
1515
1516 self.parameters = core_positional_parameters
1517 else:
1518 core_dict_parameters: MutableSequence[Dict[str, Any]] = []
1519 escaped_names = compiled.escaped_bind_names
1520
1521 # note that currently, "expanded" parameters will be present
1522 # in self.compiled_parameters in their quoted form. This is
1523 # slightly inconsistent with the approach taken as of
1524 # #8056 where self.compiled_parameters is meant to contain unquoted
1525 # param names.
1526 d_param: Dict[str, Any]
1527 for compiled_params in self.compiled_parameters:
1528 if escaped_names:
1529 d_param = {
1530 escaped_names.get(key, key): (
1531 flattened_processors[key](compiled_params[key])
1532 if key in flattened_processors
1533 else compiled_params[key]
1534 )
1535 for key in compiled_params
1536 }
1537 else:
1538 d_param = {
1539 key: (
1540 flattened_processors[key](compiled_params[key])
1541 if key in flattened_processors
1542 else compiled_params[key]
1543 )
1544 for key in compiled_params
1545 }
1546
1547 core_dict_parameters.append(d_param)
1548
1549 self.parameters = core_dict_parameters
1550
1551 return self
1552
1553 @classmethod
1554 def _init_statement(
1555 cls,
1556 dialect: Dialect,
1557 connection: Connection,
1558 dbapi_connection: PoolProxiedConnection,
1559 execution_options: _ExecuteOptions,
1560 statement: str,
1561 parameters: _DBAPIMultiExecuteParams,
1562 ) -> ExecutionContext:
1563 """Initialize execution context for a string SQL statement."""
1564
1565 self = cls.__new__(cls)
1566 self.root_connection = connection
1567 self._dbapi_connection = dbapi_connection
1568 self.dialect = connection.dialect
1569 self.is_text = True
1570
1571 self.execution_options = execution_options
1572
1573 if not parameters:
1574 if self.dialect.positional:
1575 self.parameters = [dialect.execute_sequence_format()]
1576 else:
1577 self.parameters = [self._empty_dict_params]
1578 elif isinstance(parameters[0], dialect.execute_sequence_format):
1579 self.parameters = parameters
1580 elif isinstance(parameters[0], dict):
1581 self.parameters = parameters
1582 else:
1583 self.parameters = [
1584 dialect.execute_sequence_format(p) for p in parameters
1585 ]
1586
1587 if len(parameters) > 1:
1588 self.execute_style = ExecuteStyle.EXECUTEMANY
1589
1590 self.statement = self.unicode_statement = statement
1591
1592 self.cursor = self.create_cursor()
1593 return self
1594
1595 @classmethod
1596 def _init_default(
1597 cls,
1598 dialect: Dialect,
1599 connection: Connection,
1600 dbapi_connection: PoolProxiedConnection,
1601 execution_options: _ExecuteOptions,
1602 ) -> ExecutionContext:
1603 """Initialize execution context for a ColumnDefault construct."""
1604
1605 self = cls.__new__(cls)
1606 self.root_connection = connection
1607 self._dbapi_connection = dbapi_connection
1608 self.dialect = connection.dialect
1609
1610 self.execution_options = execution_options
1611
1612 self.cursor = self.create_cursor()
1613 return self
1614
1615 def _get_cache_stats(self) -> str:
1616 if self.compiled is None:
1617 return "raw sql"
1618
1619 now = perf_counter()
1620
1621 ch = self.cache_hit
1622
1623 gen_time = self.compiled._gen_time
1624 assert gen_time is not None
1625
1626 if ch is NO_CACHE_KEY:
1627 return "no key %.5fs" % (now - gen_time,)
1628 elif ch is CACHE_HIT:
1629 return "cached since %.4gs ago" % (now - gen_time,)
1630 elif ch is CACHE_MISS:
1631 return "generated in %.5fs" % (now - gen_time,)
1632 elif ch is CACHING_DISABLED:
1633 if "_cache_disable_reason" in self.execution_options:
1634 return "caching disabled (%s) %.5fs " % (
1635 self.execution_options["_cache_disable_reason"],
1636 now - gen_time,
1637 )
1638 else:
1639 return "caching disabled %.5fs" % (now - gen_time,)
1640 elif ch is NO_DIALECT_SUPPORT:
1641 return "dialect %s+%s does not support caching %.5fs" % (
1642 self.dialect.name,
1643 self.dialect.driver,
1644 now - gen_time,
1645 )
1646 else:
1647 return "unknown"
1648
1649 @property
1650 def executemany(self): # type: ignore[override]
1651 return self.execute_style in (
1652 ExecuteStyle.EXECUTEMANY,
1653 ExecuteStyle.INSERTMANYVALUES,
1654 )
1655
1656 @util.memoized_property
1657 def identifier_preparer(self):
1658 if self.compiled:
1659 return self.compiled.preparer
1660 elif "schema_translate_map" in self.execution_options:
1661 return self.dialect.identifier_preparer._with_schema_translate(
1662 self.execution_options["schema_translate_map"]
1663 )
1664 else:
1665 return self.dialect.identifier_preparer
1666
1667 @util.memoized_property
1668 def engine(self):
1669 return self.root_connection.engine
1670
1671 @util.memoized_property
1672 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1673 if TYPE_CHECKING:
1674 assert isinstance(self.compiled, SQLCompiler)
1675 return self.compiled.postfetch
1676
1677 @util.memoized_property
1678 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
1679 if TYPE_CHECKING:
1680 assert isinstance(self.compiled, SQLCompiler)
1681 if self.isinsert:
1682 return self.compiled.insert_prefetch
1683 elif self.isupdate:
1684 return self.compiled.update_prefetch
1685 else:
1686 return ()
1687
1688 @util.memoized_property
1689 def no_parameters(self):
1690 return self.execution_options.get("no_parameters", False)
1691
1692 def _execute_scalar(
1693 self,
1694 stmt: str,
1695 type_: Optional[TypeEngine[Any]],
1696 parameters: Optional[_DBAPISingleExecuteParams] = None,
1697 ) -> Any:
1698 """Execute a string statement on the current cursor, returning a
1699 scalar result.
1700
1701 Used to fire off sequences, default phrases, and "select lastrowid"
1702 types of statements individually or in the context of a parent INSERT
1703 or UPDATE statement.
1704
1705 """
1706
1707 conn = self.root_connection
1708
1709 if "schema_translate_map" in self.execution_options:
1710 schema_translate_map = self.execution_options.get(
1711 "schema_translate_map", {}
1712 )
1713
1714 rst = self.identifier_preparer._render_schema_translates
1715 stmt = rst(stmt, schema_translate_map)
1716
1717 if not parameters:
1718 if self.dialect.positional:
1719 parameters = self.dialect.execute_sequence_format()
1720 else:
1721 parameters = {}
1722
1723 conn._cursor_execute(self.cursor, stmt, parameters, context=self)
1724 row = self.cursor.fetchone()
1725 if row is not None:
1726 r = row[0]
1727 else:
1728 r = None
1729 if type_ is not None:
1730 # apply type post processors to the result
1731 proc = type_._cached_result_processor(
1732 self.dialect, self.cursor.description[0][1]
1733 )
1734 if proc:
1735 return proc(r)
1736 return r
1737
1738 @util.memoized_property
1739 def connection(self):
1740 return self.root_connection
1741
1742 def _use_server_side_cursor(self):
1743 if not self.dialect.supports_server_side_cursors:
1744 return False
1745
1746 if self.dialect.server_side_cursors:
1747 # this is deprecated
1748 use_server_side = self.execution_options.get(
1749 "stream_results", True
1750 ) and (
1751 self.compiled
1752 and isinstance(self.compiled.statement, expression.Selectable)
1753 or (
1754 (
1755 not self.compiled
1756 or isinstance(
1757 self.compiled.statement, expression.TextClause
1758 )
1759 )
1760 and self.unicode_statement
1761 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
1762 )
1763 )
1764 else:
1765 use_server_side = self.execution_options.get(
1766 "stream_results", False
1767 )
1768
1769 return use_server_side
1770
1771 def create_cursor(self) -> DBAPICursor:
1772 if (
1773 # inlining initial preference checks for SS cursors
1774 self.dialect.supports_server_side_cursors
1775 and (
1776 self.execution_options.get("stream_results", False)
1777 or (
1778 self.dialect.server_side_cursors
1779 and self._use_server_side_cursor()
1780 )
1781 )
1782 ):
1783 self._is_server_side = True
1784 return self.create_server_side_cursor()
1785 else:
1786 self._is_server_side = False
1787 return self.create_default_cursor()
1788
1789 def fetchall_for_returning(self, cursor):
1790 return cursor.fetchall()
1791
1792 def create_default_cursor(self) -> DBAPICursor:
1793 return self._dbapi_connection.cursor()
1794
1795 def create_server_side_cursor(self) -> DBAPICursor:
1796 raise NotImplementedError()
1797
1798 def pre_exec(self):
1799 pass
1800
1801 def get_out_parameter_values(self, names):
1802 raise NotImplementedError(
1803 "This dialect does not support OUT parameters"
1804 )
1805
1806 def post_exec(self):
1807 pass
1808
1809 def get_result_processor(
1810 self, type_: TypeEngine[Any], colname: str, coltype: DBAPIType
1811 ) -> Optional[_ResultProcessorType[Any]]:
1812 """Return a 'result processor' for a given type as present in
1813 cursor.description.
1814
1815 This has a default implementation that dialects can override
1816 for context-sensitive result type handling.
1817
1818 """
1819 return type_._cached_result_processor(self.dialect, coltype)
1820
1821 def get_lastrowid(self) -> int:
1822 """return self.cursor.lastrowid, or equivalent, after an INSERT.
1823
1824 This may involve calling special cursor functions, issuing a new SELECT
1825 on the cursor (or a new one), or returning a stored value that was
1826 calculated within post_exec().
1827
        This function will only be called for dialects which support
        "implicit" primary key generation, which keep
        preexecute_autoincrement_sequences set to False, and when no
        explicit id value was bound to the statement.
1831
1832 The function is called once for an INSERT statement that would need to
1833 return the last inserted primary key for those dialects that make use
1834 of the lastrowid concept. In these cases, it is called directly after
1835 :meth:`.ExecutionContext.post_exec`.
1836
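        A dialect whose DBAPI does not supply ``cursor.lastrowid`` directly
        would typically override this method; a hypothetical sketch (the
        follow-up statement shown is illustrative only)::

            class MyExecutionContext(DefaultExecutionContext):
                def get_lastrowid(self):
                    # example only: some drivers require a follow-up query
                    self.cursor.execute("SELECT LAST_INSERT_ID()")
                    return self.cursor.fetchone()[0]
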
1837 """
1838 return self.cursor.lastrowid
1839
1840 def handle_dbapi_exception(self, e):
1841 pass
1842
1843 @util.non_memoized_property
1844 def rowcount(self) -> int:
1845 if self._rowcount is not None:
1846 return self._rowcount
1847 else:
1848 return self.cursor.rowcount
1849
1850 @property
1851 def _has_rowcount(self):
1852 return self._rowcount is not None
1853
1854 def supports_sane_rowcount(self):
1855 return self.dialect.supports_sane_rowcount
1856
1857 def supports_sane_multi_rowcount(self):
1858 return self.dialect.supports_sane_multi_rowcount
1859
1860 def _setup_result_proxy(self):
1861 exec_opt = self.execution_options
1862
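        # note (illustrative): "preserve_rowcount" is enabled from user code
        # via conn.execution_options(preserve_rowcount=True); when set and
        # the rowcount has not already been captured, read cursor.rowcount
        # up front, before fetching can invalidate it on some drivers.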
1863 if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
1864 self._rowcount = self.cursor.rowcount
1865
1866 yp: Optional[Union[int, bool]]
1867 if self.is_crud or self.is_text:
1868 result = self._setup_dml_or_text_result()
1869 yp = False
1870 else:
1871 yp = exec_opt.get("yield_per", None)
1872 sr = self._is_server_side or exec_opt.get("stream_results", False)
1873 strategy = self.cursor_fetch_strategy
1874 if sr and strategy is _cursor._DEFAULT_FETCH:
1875 strategy = _cursor.BufferedRowCursorFetchStrategy(
1876 self.cursor, self.execution_options
1877 )
1878 cursor_description: _DBAPICursorDescription = (
1879 strategy.alternate_cursor_description
1880 or self.cursor.description
1881 )
1882 if cursor_description is None:
1883 strategy = _cursor._NO_CURSOR_DQL
1884
1885 result = _cursor.CursorResult(self, strategy, cursor_description)
1886
1887 compiled = self.compiled
1888
1889 if (
1890 compiled
1891 and not self.isddl
1892 and cast(SQLCompiler, compiled).has_out_parameters
1893 ):
1894 self._setup_out_parameters(result)
1895
1896 self._soft_closed = result._soft_closed
1897
1898 if yp:
1899 result = result.yield_per(yp)
1900
1901 return result
1902
1903 def _setup_out_parameters(self, result):
1904 compiled = cast(SQLCompiler, self.compiled)
1905
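        # collect bindparam() objects flagged with isoutparam=True, retrieve
        # their raw values via the dialect-specific get_out_parameter_values()
        # hook, run each through its type's result processor, and expose the
        # converted values as result.out_parameters.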
1906 out_bindparams = [
1907 (param, name)
1908 for param, name in compiled.bind_names.items()
1909 if param.isoutparam
1910 ]
1911 out_parameters = {}
1912
1913 for bindparam, raw_value in zip(
1914 [param for param, name in out_bindparams],
1915 self.get_out_parameter_values(
1916 [name for param, name in out_bindparams]
1917 ),
1918 ):
1919 type_ = bindparam.type
1920 impl_type = type_.dialect_impl(self.dialect)
1921 dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
1922 result_processor = impl_type.result_processor(
1923 self.dialect, dbapi_type
1924 )
1925 if result_processor is not None:
1926 raw_value = result_processor(raw_value)
1927 out_parameters[bindparam.key] = raw_value
1928
1929 result.out_parameters = out_parameters
1930
1931 def _setup_dml_or_text_result(self):
1932 compiled = cast(SQLCompiler, self.compiled)
1933
1934 strategy: ResultFetchStrategy = self.cursor_fetch_strategy
1935
1936 if self.isinsert:
1937 if (
1938 self.execute_style is ExecuteStyle.INSERTMANYVALUES
1939 and compiled.effective_returning
1940 ):
1941 strategy = _cursor.FullyBufferedCursorFetchStrategy(
1942 self.cursor,
1943 initial_buffer=self._insertmanyvalues_rows,
1944 # maintain alt cursor description if set by the
1945 # dialect, e.g. mssql preserves it
1946 alternate_description=(
1947 strategy.alternate_cursor_description
1948 ),
1949 )
1950
1951 if compiled.postfetch_lastrowid:
1952 self.inserted_primary_key_rows = (
1953 self._setup_ins_pk_from_lastrowid()
1954 )
1955 # else if not self._is_implicit_returning,
1956 # the default inserted_primary_key_rows accessor will
1957 # return an "empty" primary key collection when accessed.
1958
1959 if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
1960 strategy = _cursor.BufferedRowCursorFetchStrategy(
1961 self.cursor, self.execution_options
1962 )
1963
1964 if strategy is _cursor._NO_CURSOR_DML:
1965 cursor_description = None
1966 else:
1967 cursor_description = (
1968 strategy.alternate_cursor_description
1969 or self.cursor.description
1970 )
1971
1972 if cursor_description is None:
1973 strategy = _cursor._NO_CURSOR_DML
1974 elif self._num_sentinel_cols:
1975 assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
1976 # the sentinel columns are handled in CursorResult._init_metadata
1977 # using essentially _reduce
1978
1979 result: _cursor.CursorResult[Any] = _cursor.CursorResult(
1980 self, strategy, cursor_description
1981 )
1982
1983 if self.isinsert:
1984 if self._is_implicit_returning:
1985 rows = result.all()
1986
1987 self.returned_default_rows = rows
1988
1989 self.inserted_primary_key_rows = (
1990 self._setup_ins_pk_from_implicit_returning(result, rows)
1991 )
1992
                # test that the result has accurate cursor metadata.  the
                # first row will have been fetched; the current assumption
                # is that the result has only one row, until executemany()
                # support is added here.
1997 assert result._metadata.returns_rows
1998
1999 # Insert statement has both return_defaults() and
2000 # returning(). rewind the result on the list of rows
2001 # we just used.
2002 if self._is_supplemental_returning:
2003 result._rewind(rows)
2004 else:
2005 result._soft_close()
2006 elif not self._is_explicit_returning:
2007 result._soft_close()
2008
                # we assume here the result does not return any rows.
                # *usually* this will be true.  However, some dialects,
                # such as MSSQL with pyodbc, need to SELECT a post-fetch
                # function, so this is not necessarily true.
2013 # assert not result.returns_rows
2014
2015 elif self._is_implicit_returning:
2016 rows = result.all()
2017
2018 if rows:
2019 self.returned_default_rows = rows
2020 self._rowcount = len(rows)
2021
2022 if self._is_supplemental_returning:
2023 result._rewind(rows)
2024 else:
2025 result._soft_close()
2026
            # test that the result has accurate cursor metadata; the rows
            # have all been fetched at this point, however.
2029 assert result._metadata.returns_rows
2030
2031 elif not result._metadata.returns_rows:
2032 # no results, get rowcount
2033 # (which requires open cursor on some drivers)
2034 if self._rowcount is None:
2035 self._rowcount = self.cursor.rowcount
2036 result._soft_close()
2037 elif self.isupdate or self.isdelete:
2038 if self._rowcount is None:
2039 self._rowcount = self.cursor.rowcount
2040 return result
2041
2042 @util.memoized_property
2043 def inserted_primary_key_rows(self):
2044 # if no specific "get primary key" strategy was set up
2045 # during execution, return a "default" primary key based
2046 # on what's in the compiled_parameters and nothing else.
2047 return self._setup_ins_pk_from_empty()
2048
2049 def _setup_ins_pk_from_lastrowid(self):
2050 getter = cast(
2051 SQLCompiler, self.compiled
2052 )._inserted_primary_key_from_lastrowid_getter
2053 lastrowid = self.get_lastrowid()
2054 return [getter(lastrowid, self.compiled_parameters[0])]
2055
2056 def _setup_ins_pk_from_empty(self):
2057 getter = cast(
2058 SQLCompiler, self.compiled
2059 )._inserted_primary_key_from_lastrowid_getter
2060 return [getter(None, param) for param in self.compiled_parameters]
2061
2062 def _setup_ins_pk_from_implicit_returning(self, result, rows):
2063 if not rows:
2064 return []
2065
2066 getter = cast(
2067 SQLCompiler, self.compiled
2068 )._inserted_primary_key_from_returning_getter
2069 compiled_params = self.compiled_parameters
2070
2071 return [
2072 getter(row, param) for row, param in zip(rows, compiled_params)
2073 ]
2074
2075 def lastrow_has_defaults(self) -> bool:
2076 return (self.isinsert or self.isupdate) and bool(
2077 cast(SQLCompiler, self.compiled).postfetch
2078 )
2079
2080 def _prepare_set_input_sizes(
2081 self,
2082 ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
2083 """Given a cursor and ClauseParameters, prepare arguments
2084 in order to call the appropriate
2085 style of ``setinputsizes()`` on the cursor, using DB-API types
2086 from the bind parameter's ``TypeEngine`` objects.
2087
        This method is only called by those dialects which set the
        :attr:`.Dialect.bind_typing` attribute to
        :attr:`.BindTyping.SETINPUTSIZES`. The python-oracledb and cx_Oracle
        DBAPIs are the only ones that require setinputsizes(); pyodbc offers
        it as an option.
2093
        Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
        for pg8000 and asyncpg; those dialects have since been changed to
        use inline rendering of casts.
2097
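        The ``inputsizes`` mapping assembled by this method is also passed to
        the :meth:`.DialectEvents.do_setinputsizes` event hook, where it may
        be altered in place; an illustrative sketch (``engine`` and the
        ``CLOB`` DBAPI type stand in for application-specific objects)::

            from sqlalchemy import event

            @event.listens_for(engine, "do_setinputsizes")
            def _remove_clob(
                inputsizes, cursor, statement, parameters, context
            ):
                for bindparam, dbapitype in list(inputsizes.items()):
                    if dbapitype is CLOB:
                        del inputsizes[bindparam]
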
2098 """
2099 if self.isddl or self.is_text:
2100 return None
2101
2102 compiled = cast(SQLCompiler, self.compiled)
2103
2104 inputsizes = compiled._get_set_input_sizes_lookup()
2105
2106 if inputsizes is None:
2107 return None
2108
2109 dialect = self.dialect
2110
2111 # all of the rest of this... cython?
2112
2113 if dialect._has_events:
2114 inputsizes = dict(inputsizes)
2115 dialect.dispatch.do_setinputsizes(
2116 inputsizes, self.cursor, self.statement, self.parameters, self
2117 )
2118
2119 if compiled.escaped_bind_names:
2120 escaped_bind_names = compiled.escaped_bind_names
2121 else:
2122 escaped_bind_names = None
2123
2124 if dialect.positional:
2125 items = [
2126 (key, compiled.binds[key])
2127 for key in compiled.positiontup or ()
2128 ]
2129 else:
2130 items = [
2131 (key, bindparam)
2132 for bindparam, key in compiled.bind_names.items()
2133 ]
2134
2135 generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
2136 for key, bindparam in items:
2137 if bindparam in compiled.literal_execute_params:
2138 continue
2139
2140 if key in self._expanded_parameters:
2141 if is_tuple_type(bindparam.type):
2142 num = len(bindparam.type.types)
2143 dbtypes = inputsizes[bindparam]
2144 generic_inputsizes.extend(
2145 (
2146 (
2147 escaped_bind_names.get(paramname, paramname)
2148 if escaped_bind_names is not None
2149 else paramname
2150 ),
2151 dbtypes[idx % num],
2152 bindparam.type.types[idx % num],
2153 )
2154 for idx, paramname in enumerate(
2155 self._expanded_parameters[key]
2156 )
2157 )
2158 else:
2159 dbtype = inputsizes.get(bindparam, None)
2160 generic_inputsizes.extend(
2161 (
2162 (
2163 escaped_bind_names.get(paramname, paramname)
2164 if escaped_bind_names is not None
2165 else paramname
2166 ),
2167 dbtype,
2168 bindparam.type,
2169 )
2170 for paramname in self._expanded_parameters[key]
2171 )
2172 else:
2173 dbtype = inputsizes.get(bindparam, None)
2174
2175 escaped_name = (
2176 escaped_bind_names.get(key, key)
2177 if escaped_bind_names is not None
2178 else key
2179 )
2180
2181 generic_inputsizes.append(
2182 (escaped_name, dbtype, bindparam.type)
2183 )
2184
2185 return generic_inputsizes
2186
2187 def _exec_default(self, column, default, type_):
2188 if default.is_sequence:
2189 return self.fire_sequence(default, type_)
2190 elif default.is_callable:
2191 # this codepath is not normally used as it's inlined
2192 # into _process_execute_defaults
2193 self.current_column = column
2194 return default.arg(self)
2195 elif default.is_clause_element:
2196 return self._exec_default_clause_element(column, default, type_)
2197 else:
2198 # this codepath is not normally used as it's inlined
2199 # into _process_execute_defaults
2200 return default.arg
2201
2202 def _exec_default_clause_element(self, column, default, type_):
2203 # execute a default that's a complete clause element. Here, we have
2204 # to re-implement a miniature version of the compile->parameters->
2205 # cursor.execute() sequence, since we don't want to modify the state
2206 # of the connection / result in progress or create new connection/
2207 # result objects etc.
2208 # .. versionchanged:: 1.4
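        #
        # e.g. (illustrative): a column default given as a SQL expression,
        # such as Column("ts", DateTime, default=func.now()), takes this path
        # when its value must be pre-executed rather than rendered inline;
        # the expression is compiled and executed below as a standalone
        # scalar SELECT.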
2209
2210 if not default._arg_is_typed:
2211 default_arg = expression.type_coerce(default.arg, type_)
2212 else:
2213 default_arg = default.arg
2214 compiled = expression.select(default_arg).compile(dialect=self.dialect)
2215 compiled_params = compiled.construct_params()
2216 processors = compiled._bind_processors
2217 if compiled.positional:
2218 parameters = self.dialect.execute_sequence_format(
2219 [
2220 (
2221 processors[key](compiled_params[key]) # type: ignore
2222 if key in processors
2223 else compiled_params[key]
2224 )
2225 for key in compiled.positiontup or ()
2226 ]
2227 )
2228 else:
2229 parameters = {
2230 key: (
2231 processors[key](compiled_params[key]) # type: ignore
2232 if key in processors
2233 else compiled_params[key]
2234 )
2235 for key in compiled_params
2236 }
2237 return self._execute_scalar(
2238 str(compiled), type_, parameters=parameters
2239 )
2240
2241 current_parameters: Optional[_CoreSingleExecuteParams] = None
2242 """A dictionary of parameters applied to the current row.
2243
2244 This attribute is only available in the context of a user-defined default
2245 generation function, e.g. as described at :ref:`context_default_functions`.
2246 It consists of a dictionary which includes entries for each column/value
2247 pair that is to be part of the INSERT or UPDATE statement. The keys of the
    dictionary will be the key value of each :class:`_schema.Column`, which is
    usually synonymous with the name.
2251
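    Within such a function the dictionary is read directly from the context;
    an illustrative sketch (``related_value`` is an assumed column key)::

        def my_default(context):
            return context.current_parameters["related_value"]
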
2252 Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
    does not accommodate the "multi-values" feature of the
2254 :meth:`_expression.Insert.values` method. The
2255 :meth:`.DefaultExecutionContext.get_current_parameters` method should be
2256 preferred.
2257
2258 .. seealso::
2259
2260 :meth:`.DefaultExecutionContext.get_current_parameters`
2261
2262 :ref:`context_default_functions`
2263
2264 """
2265
2266 def get_current_parameters(self, isolate_multiinsert_groups=True):
2267 """Return a dictionary of parameters applied to the current row.
2268
2269 This method can only be used in the context of a user-defined default
2270 generation function, e.g. as described at
2271 :ref:`context_default_functions`. When invoked, a dictionary is
2272 returned which includes entries for each column/value pair that is part
2273 of the INSERT or UPDATE statement. The keys of the dictionary will be
        the key value of each :class:`_schema.Column`, which is usually
        synonymous with the name.
2277
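        E.g., a Python-side default function can derive its value from other
        parameters in the same row; a sketch along the lines of
        :ref:`context_default_functions` (the ``counter`` column name is an
        assumed example)::

            def calculate_special_value(context):
                return context.get_current_parameters()["counter"] + 12
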
        :param isolate_multiinsert_groups=True: indicates that multi-valued
         INSERT constructs created using :meth:`_expression.Insert.values`
         should be handled by returning only the subset of parameters that
         are local to the current column default invocation.  When ``False``,
         the raw parameters of the statement are returned, including the
         naming convention used in the case of multi-valued INSERT.
2285
2286 .. seealso::
2287
2288 :attr:`.DefaultExecutionContext.current_parameters`
2289
2290 :ref:`context_default_functions`
2291
2292 """
2293 try:
2294 parameters = self.current_parameters
2295 column = self.current_column
2296 except AttributeError:
2297 raise exc.InvalidRequestError(
2298 "get_current_parameters() can only be invoked in the "
2299 "context of a Python side column default function"
2300 )
2301 else:
2302 assert column is not None
2303 assert parameters is not None
2304 compile_state = cast(
2305 "DMLState", cast(SQLCompiler, self.compiled).compile_state
2306 )
2307 assert compile_state is not None
2308 if (
2309 isolate_multiinsert_groups
2310 and dml.isinsert(compile_state)
2311 and compile_state._has_multi_parameters
2312 ):
2313 if column._is_multiparam_column:
2314 index = column.index + 1
2315 d = {column.original.key: parameters[column.key]}
2316 else:
2317 d = {column.key: parameters[column.key]}
2318 index = 0
2319 assert compile_state._dict_parameters is not None
2320 keys = compile_state._dict_parameters.keys()
2321 d.update(
2322 (key, parameters["%s_m%d" % (key, index)]) for key in keys
2323 )
2324 return d
2325 else:
2326 return parameters
2327
2328 def get_insert_default(self, column):
2329 if column.default is None:
2330 return None
2331 else:
2332 return self._exec_default(column, column.default, column.type)
2333
2334 def get_update_default(self, column):
2335 if column.onupdate is None:
2336 return None
2337 else:
2338 return self._exec_default(column, column.onupdate, column.type)
2339
2340 def _process_execute_defaults(self):
2341 compiled = cast(SQLCompiler, self.compiled)
2342
2343 key_getter = compiled._within_exec_param_key_getter
2344
2345 sentinel_counter = 0
2346
2347 if compiled.insert_prefetch:
2348 prefetch_recs = [
2349 (
2350 c,
2351 key_getter(c),
2352 c._default_description_tuple,
2353 self.get_insert_default,
2354 )
2355 for c in compiled.insert_prefetch
2356 ]
2357 elif compiled.update_prefetch:
2358 prefetch_recs = [
2359 (
2360 c,
2361 key_getter(c),
2362 c._onupdate_description_tuple,
2363 self.get_update_default,
2364 )
2365 for c in compiled.update_prefetch
2366 ]
2367 else:
2368 prefetch_recs = []
2369
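        # for each compiled parameter set, apply the prefetched defaults:
        # sentinel columns receive an incrementing counter, scalar defaults
        # are assigned directly, callable defaults are invoked with this
        # context as their sole argument, and anything else falls back to
        # get_insert_default() / get_update_default().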
2370 for param in self.compiled_parameters:
2371 self.current_parameters = param
2372
2373 for (
2374 c,
2375 param_key,
2376 (arg, is_scalar, is_callable, is_sentinel),
2377 fallback,
2378 ) in prefetch_recs:
2379 if is_sentinel:
2380 param[param_key] = sentinel_counter
2381 sentinel_counter += 1
2382 elif is_scalar:
2383 param[param_key] = arg
2384 elif is_callable:
2385 self.current_column = c
2386 param[param_key] = arg(self)
2387 else:
2388 val = fallback(c)
2389 if val is not None:
2390 param[param_key] = val
2391
2392 del self.current_parameters
2393
2394
2395DefaultDialect.execution_ctx_cls = DefaultExecutionContext