1# engine/default.py
2# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: http://www.opensource.org/licenses/mit-license.php
7
8"""Default implementations of per-dialect sqlalchemy.engine classes.
9
10These are semi-private implementation classes which are only of importance
11to database dialect authors; dialects will usually use the classes here
12as the base class for their own corresponding classes.
13
14"""
15
16import codecs
17import random
18import re
19import weakref
20
21from . import interfaces
22from . import reflection
23from . import result
24from .. import event
25from .. import exc
26from .. import pool
27from .. import processors
28from .. import types as sqltypes
29from .. import util
30from ..sql import compiler
31from ..sql import expression
32from ..sql import schema
33from ..sql.elements import quoted_name
34
35
36AUTOCOMMIT_REGEXP = re.compile(
37 r"\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER)", re.I | re.UNICODE
38)
39
40# When we're handed literal SQL, ensure it's a SELECT query
41SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)
42
43
44class DefaultDialect(interfaces.Dialect):
45 """Default implementation of Dialect"""
46
47 statement_compiler = compiler.SQLCompiler
48 ddl_compiler = compiler.DDLCompiler
49 type_compiler = compiler.GenericTypeCompiler
50 preparer = compiler.IdentifierPreparer
51 supports_alter = True
52 supports_comments = False
53 inline_comments = False
54
55 # the first value we'd get for an autoincrement
56 # column.
57 default_sequence_base = 1
58
59 # most DBAPIs happy with this for execute().
60 # not cx_oracle.
61 execute_sequence_format = tuple
62
63 supports_views = True
64 supports_sequences = False
65 sequences_optional = False
66 preexecute_autoincrement_sequences = False
67 postfetch_lastrowid = True
68 implicit_returning = False
69
70 supports_right_nested_joins = True
71 cte_follows_insert = False
72
73 supports_native_enum = False
74 supports_native_boolean = False
75 non_native_boolean_check_constraint = True
76
77 supports_simple_order_by_label = True
78
79 tuple_in_values = False
80
81 engine_config_types = util.immutabledict(
82 [
83 ("convert_unicode", util.bool_or_str("force")),
84 ("pool_timeout", util.asint),
85 ("echo", util.bool_or_str("debug")),
86 ("echo_pool", util.bool_or_str("debug")),
87 ("pool_recycle", util.asint),
88 ("pool_size", util.asint),
89 ("max_overflow", util.asint),
90 ("pool_threadlocal", util.asbool),
91 ]
92 )
93
94 # if the NUMERIC type
95 # returns decimal.Decimal.
96 # *not* the FLOAT type however.
97 supports_native_decimal = False
98
99 if util.py3k:
100 supports_unicode_statements = True
101 supports_unicode_binds = True
102 returns_unicode_strings = True
103 description_encoding = None
104 else:
105 supports_unicode_statements = False
106 supports_unicode_binds = False
107 returns_unicode_strings = False
108 description_encoding = "use_encoding"
109
110 name = "default"
111
112 # length at which to truncate
113 # any identifier.
114 max_identifier_length = 9999
115 _user_defined_max_identifier_length = None
116
117 # sub-categories of max_identifier_length.
118 # currently these accommodate for MySQL which allows alias names
119 # of 255 but DDL names only of 64.
120 max_index_name_length = None
121 max_constraint_name_length = None
122
123 supports_sane_rowcount = True
124 supports_sane_multi_rowcount = True
125 colspecs = {}
126 default_paramstyle = "named"
127 supports_default_values = False
128 supports_empty_insert = True
129 supports_multivalues_insert = False
130
131 supports_is_distinct_from = True
132
133 supports_server_side_cursors = False
134
135 # extra record-level locking features (#4860)
136 supports_for_update_of = False
137
138 server_version_info = None
139
140 construct_arguments = None
141 """Optional set of argument specifiers for various SQLAlchemy
142 constructs, typically schema items.
143
144 To implement, establish as a series of tuples, as in::
145
146 construct_arguments = [
147 (schema.Index, {
148 "using": False,
149 "where": None,
150 "ops": None
151 })
152 ]
153
154 If the above construct is established on the PostgreSQL dialect,
155 the :class:`.Index` construct will now accept the keyword arguments
156 ``postgresql_using``, ``postgresql_where``, nad ``postgresql_ops``.
157 Any other argument specified to the constructor of :class:`.Index`
158 which is prefixed with ``postgresql_`` will raise :class:`.ArgumentError`.
159
160 A dialect which does not include a ``construct_arguments`` member will
161 not participate in the argument validation system. For such a dialect,
162 any argument name is accepted by all participating constructs, within
163 the namespace of arguments prefixed with that dialect name. The rationale
164 here is so that third-party dialects that haven't yet implemented this
165 feature continue to function in the old way.
166
167 .. versionadded:: 0.9.2
168
169 .. seealso::
170
171 :class:`.DialectKWArgs` - implementing base class which consumes
172 :attr:`.DefaultDialect.construct_arguments`
173
174
175 """
176
177 # indicates symbol names are
178 # UPPERCASEd if they are case insensitive
179 # within the database.
180 # if this is True, the methods normalize_name()
181 # and denormalize_name() must be provided.
182 requires_name_normalize = False
183
184 reflection_options = ()
185
186 dbapi_exception_translation_map = util.immutabledict()
187 """mapping used in the extremely unusual case that a DBAPI's
188 published exceptions don't actually have the __name__ that they
189 are linked towards.
190
191 .. versionadded:: 1.0.5
192
193 """
194
195 @util.deprecated_params(
196 convert_unicode=(
197 "1.3",
198 "The :paramref:`_sa.create_engine.convert_unicode` parameter "
199 "and corresponding dialect-level parameters are deprecated, "
200 "and will be removed in a future release. Modern DBAPIs support "
201 "Python Unicode natively and this parameter is unnecessary.",
202 )
203 )
204 def __init__(
205 self,
206 convert_unicode=False,
207 encoding="utf-8",
208 paramstyle=None,
209 dbapi=None,
210 implicit_returning=None,
211 supports_right_nested_joins=None,
212 case_sensitive=True,
213 supports_native_boolean=None,
214 empty_in_strategy="static",
215 max_identifier_length=None,
216 label_length=None,
217 **kwargs
218 ):
219
220 if not getattr(self, "ported_sqla_06", True):
221 util.warn(
222 "The %s dialect is not yet ported to the 0.6 format"
223 % self.name
224 )
225
226 self.convert_unicode = convert_unicode
227 self.encoding = encoding
228 self.positional = False
229 self._ischema = None
230 self.dbapi = dbapi
231 if paramstyle is not None:
232 self.paramstyle = paramstyle
233 elif self.dbapi is not None:
234 self.paramstyle = self.dbapi.paramstyle
235 else:
236 self.paramstyle = self.default_paramstyle
237 if implicit_returning is not None:
238 self.implicit_returning = implicit_returning
239 self.positional = self.paramstyle in ("qmark", "format", "numeric")
240 self.identifier_preparer = self.preparer(self)
241 self.type_compiler = self.type_compiler(self)
242 if supports_right_nested_joins is not None:
243 self.supports_right_nested_joins = supports_right_nested_joins
244 if supports_native_boolean is not None:
245 self.supports_native_boolean = supports_native_boolean
246 self.case_sensitive = case_sensitive
247
248 self.empty_in_strategy = empty_in_strategy
249 if empty_in_strategy == "static":
250 self._use_static_in = True
251 elif empty_in_strategy in ("dynamic", "dynamic_warn"):
252 self._use_static_in = False
253 self._warn_on_empty_in = empty_in_strategy == "dynamic_warn"
254 else:
255 raise exc.ArgumentError(
256 "empty_in_strategy may be 'static', "
257 "'dynamic', or 'dynamic_warn'"
258 )
259
260 self._user_defined_max_identifier_length = max_identifier_length
261 if self._user_defined_max_identifier_length:
262 self.max_identifier_length = (
263 self._user_defined_max_identifier_length
264 )
265 self.label_length = label_length
266
267 if self.description_encoding == "use_encoding":
268 self._description_decoder = (
269 processors.to_unicode_processor_factory
270 )(encoding)
271 elif self.description_encoding is not None:
272 self._description_decoder = (
273 processors.to_unicode_processor_factory
274 )(self.description_encoding)
275 self._encoder = codecs.getencoder(self.encoding)
276 self._decoder = processors.to_unicode_processor_factory(self.encoding)
277
278 @util.memoized_property
279 def _type_memos(self):
280 return weakref.WeakKeyDictionary()
281
282 @property
283 def dialect_description(self):
284 return self.name + "+" + self.driver
285
286 @property
287 def supports_sane_rowcount_returning(self):
288 """True if this dialect supports sane rowcount even if RETURNING is
289 in use.
290
291 For dialects that don't support RETURNING, this is synonymous with
292 ``supports_sane_rowcount``.
293
294 """
295 return self.supports_sane_rowcount
296
297 @classmethod
298 def get_pool_class(cls, url):
299 return getattr(cls, "poolclass", pool.QueuePool)
300
301 @classmethod
302 def load_provisioning(cls):
303 package = ".".join(cls.__module__.split(".")[0:-1])
304 try:
305 __import__(package + ".provision")
306 except ImportError:
307 pass
308
309 def initialize(self, connection):
310 try:
311 self.server_version_info = self._get_server_version_info(
312 connection
313 )
314 except NotImplementedError:
315 self.server_version_info = None
316 try:
317 self.default_schema_name = self._get_default_schema_name(
318 connection
319 )
320 except NotImplementedError:
321 self.default_schema_name = None
322
323 try:
324 self.default_isolation_level = self.get_default_isolation_level(
325 connection.connection
326 )
327 except NotImplementedError:
328 self.default_isolation_level = None
329
330 self.returns_unicode_strings = self._check_unicode_returns(connection)
331
332 if (
333 self.description_encoding is not None
334 and self._check_unicode_description(connection)
335 ):
336 self._description_decoder = self.description_encoding = None
337
338 if not self._user_defined_max_identifier_length:
339 max_ident_length = self._check_max_identifier_length(connection)
340 if max_ident_length:
341 self.max_identifier_length = max_ident_length
342
343 if (
344 self.label_length
345 and self.label_length > self.max_identifier_length
346 ):
347 raise exc.ArgumentError(
348 "Label length of %d is greater than this dialect's"
349 " maximum identifier length of %d"
350 % (self.label_length, self.max_identifier_length)
351 )
352
353 def on_connect(self):
354 # inherits the docstring from interfaces.Dialect.on_connect
355 return None
356
357 def _check_max_identifier_length(self, connection):
358 """Perform a connection / server version specific check to determine
359 the max_identifier_length.
360
361 If the dialect's class level max_identifier_length should be used,
362 can return None.
363
364 .. versionadded:: 1.3.9
365
366 """
367 return None
368
369 def get_default_isolation_level(self, dbapi_conn):
370 """Given a DBAPI connection, return its isolation level, or
371 a default isolation level if one cannot be retrieved.
372
373 May be overridden by subclasses in order to provide a
374 "fallback" isolation level for databases that cannot reliably
375 retrieve the actual isolation level.
376
377 By default, calls the :meth:`_engine.Interfaces.get_isolation_level`
378 method, propagating any exceptions raised.
379
380 .. versionadded:: 1.3.22
381
382 """
383 return self.get_isolation_level(dbapi_conn)
384
385 def _check_unicode_returns(self, connection, additional_tests=None):
386 if util.py2k and not self.supports_unicode_statements:
387 cast_to = util.binary_type
388 else:
389 cast_to = util.text_type
390
391 if self.positional:
392 parameters = self.execute_sequence_format()
393 else:
394 parameters = {}
395
396 def check_unicode(test):
397 statement = cast_to(
398 expression.select([test]).compile(dialect=self)
399 )
400 try:
401 cursor = connection.connection.cursor()
402 connection._cursor_execute(cursor, statement, parameters)
403 row = cursor.fetchone()
404 cursor.close()
405 except exc.DBAPIError as de:
406 # note that _cursor_execute() will have closed the cursor
407 # if an exception is thrown.
408 util.warn(
409 "Exception attempting to "
410 "detect unicode returns: %r" % de
411 )
412 return False
413 else:
414 return isinstance(row[0], util.text_type)
415
416 tests = [
417 # detect plain VARCHAR
418 expression.cast(
419 expression.literal_column("'test plain returns'"),
420 sqltypes.VARCHAR(60),
421 ),
422 # detect if there's an NVARCHAR type with different behavior
423 # available
424 expression.cast(
425 expression.literal_column("'test unicode returns'"),
426 sqltypes.Unicode(60),
427 ),
428 ]
429
430 if additional_tests:
431 tests += additional_tests
432
433 results = {check_unicode(test) for test in tests}
434
435 if results.issuperset([True, False]):
436 return "conditional"
437 else:
438 return results == {True}
439
440 def _check_unicode_description(self, connection):
441 # all DBAPIs on Py2K return cursor.description as encoded,
442 # until pypy2.1beta2 with sqlite, so let's just check it -
443 # it's likely others will start doing this too in Py2k.
444
445 if util.py2k and not self.supports_unicode_statements:
446 cast_to = util.binary_type
447 else:
448 cast_to = util.text_type
449
450 cursor = connection.connection.cursor()
451 try:
452 cursor.execute(
453 cast_to(
454 expression.select(
455 [expression.literal_column("'x'").label("some_label")]
456 ).compile(dialect=self)
457 )
458 )
459 return isinstance(cursor.description[0][0], util.text_type)
460 finally:
461 cursor.close()
462
463 def type_descriptor(self, typeobj):
464 """Provide a database-specific :class:`.TypeEngine` object, given
465 the generic object which comes from the types module.
466
467 This method looks for a dictionary called
468 ``colspecs`` as a class or instance-level variable,
469 and passes on to :func:`_types.adapt_type`.
470
471 """
472 return sqltypes.adapt_type(typeobj, self.colspecs)
473
474 def reflecttable(
475 self,
476 connection,
477 table,
478 include_columns,
479 exclude_columns,
480 resolve_fks,
481 **opts
482 ):
483 insp = reflection.Inspector.from_engine(connection)
484 return insp.reflecttable(
485 table, include_columns, exclude_columns, resolve_fks, **opts
486 )
487
488 def get_pk_constraint(self, conn, table_name, schema=None, **kw):
489 """Compatibility method, adapts the result of get_primary_keys()
490 for those dialects which don't implement get_pk_constraint().
491
492 """
493 return {
494 "constrained_columns": self.get_primary_keys(
495 conn, table_name, schema=schema, **kw
496 )
497 }
498
499 def validate_identifier(self, ident):
500 if len(ident) > self.max_identifier_length:
501 raise exc.IdentifierError(
502 "Identifier '%s' exceeds maximum length of %d characters"
503 % (ident, self.max_identifier_length)
504 )
505
506 def connect(self, *cargs, **cparams):
507 # inherits the docstring from interfaces.Dialect.connect
508 return self.dbapi.connect(*cargs, **cparams)
509
510 def create_connect_args(self, url):
511 # inherits the docstring from interfaces.Dialect.create_connect_args
512 opts = url.translate_connect_args()
513 opts.update(url.query)
514 return [[], opts]
515
516 def set_engine_execution_options(self, engine, opts):
517 if "isolation_level" in opts:
518 isolation_level = opts["isolation_level"]
519
520 @event.listens_for(engine, "engine_connect")
521 def set_isolation(connection, branch):
522 if not branch:
523 self._set_connection_isolation(connection, isolation_level)
524
525 if "schema_translate_map" in opts:
526 getter = schema._schema_getter(opts["schema_translate_map"])
527 engine.schema_for_object = getter
528
529 @event.listens_for(engine, "engine_connect")
530 def set_schema_translate_map(connection, branch):
531 connection.schema_for_object = getter
532
533 def set_connection_execution_options(self, connection, opts):
534 if "isolation_level" in opts:
535 self._set_connection_isolation(connection, opts["isolation_level"])
536
537 if "schema_translate_map" in opts:
538 getter = schema._schema_getter(opts["schema_translate_map"])
539 connection.schema_for_object = getter
540
541 def _set_connection_isolation(self, connection, level):
542 if connection.in_transaction():
543 util.warn(
544 "Connection is already established with a Transaction; "
545 "setting isolation_level may implicitly rollback or commit "
546 "the existing transaction, or have no effect until "
547 "next transaction"
548 )
549 self.set_isolation_level(connection.connection, level)
550 connection.connection._connection_record.finalize_callback.append(
551 self.reset_isolation_level
552 )
553
554 def do_begin(self, dbapi_connection):
555 pass
556
557 def do_rollback(self, dbapi_connection):
558 dbapi_connection.rollback()
559
560 def do_commit(self, dbapi_connection):
561 dbapi_connection.commit()
562
563 def do_close(self, dbapi_connection):
564 dbapi_connection.close()
565
566 @util.memoized_property
567 def _dialect_specific_select_one(self):
568 return str(expression.select([1]).compile(dialect=self))
569
570 def do_ping(self, dbapi_connection):
571 cursor = None
572 try:
573 cursor = dbapi_connection.cursor()
574 try:
575 cursor.execute(self._dialect_specific_select_one)
576 finally:
577 cursor.close()
578 except self.dbapi.Error as err:
579 if self.is_disconnect(err, dbapi_connection, cursor):
580 return False
581 else:
582 raise
583 else:
584 return True
585
586 def create_xid(self):
587 """Create a random two-phase transaction ID.
588
589 This id will be passed to do_begin_twophase(), do_rollback_twophase(),
590 do_commit_twophase(). Its format is unspecified.
591 """
592
593 return "_sa_%032x" % random.randint(0, 2 ** 128)
594
595 def do_savepoint(self, connection, name):
596 connection.execute(expression.SavepointClause(name))
597
598 def do_rollback_to_savepoint(self, connection, name):
599 connection.execute(expression.RollbackToSavepointClause(name))
600
601 def do_release_savepoint(self, connection, name):
602 connection.execute(expression.ReleaseSavepointClause(name))
603
604 def do_executemany(self, cursor, statement, parameters, context=None):
605 cursor.executemany(statement, parameters)
606
607 def do_execute(self, cursor, statement, parameters, context=None):
608 cursor.execute(statement, parameters)
609
610 def do_execute_no_params(self, cursor, statement, context=None):
611 cursor.execute(statement)
612
613 def is_disconnect(self, e, connection, cursor):
614 return False
615
616 def reset_isolation_level(self, dbapi_conn):
617 # default_isolation_level is read from the first connection
618 # after the initial set of 'isolation_level', if any, so is
619 # the configured default of this dialect.
620 self.set_isolation_level(dbapi_conn, self.default_isolation_level)
621
622 def normalize_name(self, name):
623 if name is None:
624 return None
625 if util.py2k:
626 if isinstance(name, str):
627 name = name.decode(self.encoding)
628
629 name_lower = name.lower()
630 name_upper = name.upper()
631
632 if name_upper == name_lower:
633 # name has no upper/lower conversion, e.g. non-european characters.
634 # return unchanged
635 return name
636 elif name_upper == name and not (
637 self.identifier_preparer._requires_quotes
638 )(name_lower):
639 # name is all uppercase and doesn't require quoting; normalize
640 # to all lower case
641 return name_lower
642 elif name_lower == name:
643 # name is all lower case, which if denormalized means we need to
644 # force quoting on it
645 return quoted_name(name, quote=True)
646 else:
647 # name is mixed case, means it will be quoted in SQL when used
648 # later, no normalizes
649 return name
650
651 def denormalize_name(self, name):
652 if name is None:
653 return None
654
655 name_lower = name.lower()
656 name_upper = name.upper()
657
658 if name_upper == name_lower:
659 # name has no upper/lower conversion, e.g. non-european characters.
660 # return unchanged
661 return name
662 elif name_lower == name and not (
663 self.identifier_preparer._requires_quotes
664 )(name_lower):
665 name = name_upper
666 if util.py2k:
667 if not self.supports_unicode_binds:
668 name = name.encode(self.encoding)
669 else:
670 name = unicode(name) # noqa
671 return name
672
673
674class _RendersLiteral(object):
675 def literal_processor(self, dialect):
676 def process(value):
677 return "'%s'" % value
678
679 return process
680
681
682class _StrDateTime(_RendersLiteral, sqltypes.DateTime):
683 pass
684
685
686class _StrDate(_RendersLiteral, sqltypes.Date):
687 pass
688
689
690class _StrTime(_RendersLiteral, sqltypes.Time):
691 pass
692
693
694class StrCompileDialect(DefaultDialect):
695
696 statement_compiler = compiler.StrSQLCompiler
697 ddl_compiler = compiler.DDLCompiler
698 type_compiler = compiler.StrSQLTypeCompiler
699 preparer = compiler.IdentifierPreparer
700
701 supports_sequences = True
702 sequences_optional = True
703 preexecute_autoincrement_sequences = False
704 implicit_returning = False
705
706 supports_native_boolean = True
707
708 supports_simple_order_by_label = True
709
710 colspecs = {
711 sqltypes.DateTime: _StrDateTime,
712 sqltypes.Date: _StrDate,
713 sqltypes.Time: _StrTime,
714 }
715
716
717class DefaultExecutionContext(interfaces.ExecutionContext):
718 isinsert = False
719 isupdate = False
720 isdelete = False
721 is_crud = False
722 is_text = False
723 isddl = False
724 executemany = False
725 compiled = None
726 statement = None
727 result_column_struct = None
728 returned_defaults = None
729 _is_implicit_returning = False
730 _is_explicit_returning = False
731
732 # a hook for SQLite's translation of
733 # result column names
734 _translate_colname = None
735
736 _expanded_parameters = util.immutabledict()
737
738 @classmethod
739 def _init_ddl(cls, dialect, connection, dbapi_connection, compiled_ddl):
740 """Initialize execution context for a DDLElement construct."""
741
742 self = cls.__new__(cls)
743 self.root_connection = connection
744 self._dbapi_connection = dbapi_connection
745 self.dialect = connection.dialect
746
747 self.compiled = compiled = compiled_ddl
748 self.isddl = True
749
750 self.execution_options = compiled.execution_options
751 if connection._execution_options:
752 self.execution_options = dict(self.execution_options)
753 self.execution_options.update(connection._execution_options)
754
755 if not dialect.supports_unicode_statements:
756 self.unicode_statement = util.text_type(compiled)
757 self.statement = dialect._encoder(self.unicode_statement)[0]
758 else:
759 self.statement = self.unicode_statement = util.text_type(compiled)
760
761 self.cursor = self.create_cursor()
762 self.compiled_parameters = []
763
764 if dialect.positional:
765 self.parameters = [dialect.execute_sequence_format()]
766 else:
767 self.parameters = [{}]
768
769 return self
770
771 @classmethod
772 def _init_compiled(
773 cls, dialect, connection, dbapi_connection, compiled, parameters
774 ):
775 """Initialize execution context for a Compiled construct."""
776
777 self = cls.__new__(cls)
778 self.root_connection = connection
779 self._dbapi_connection = dbapi_connection
780 self.dialect = connection.dialect
781
782 self.compiled = compiled
783
784 # this should be caught in the engine before
785 # we get here
786 assert compiled.can_execute
787
788 self.execution_options = compiled.execution_options.union(
789 connection._execution_options
790 )
791
792 self.result_column_struct = (
793 compiled._result_columns,
794 compiled._ordered_columns,
795 compiled._textual_ordered_columns,
796 )
797
798 self.unicode_statement = util.text_type(compiled)
799 if not dialect.supports_unicode_statements:
800 self.statement = self.unicode_statement.encode(
801 self.dialect.encoding
802 )
803 else:
804 self.statement = self.unicode_statement
805
806 self.isinsert = compiled.isinsert
807 self.isupdate = compiled.isupdate
808 self.isdelete = compiled.isdelete
809 self.is_text = compiled.isplaintext
810
811 if not parameters:
812 self.compiled_parameters = [compiled.construct_params()]
813 else:
814 self.compiled_parameters = [
815 compiled.construct_params(m, _group_number=grp)
816 for grp, m in enumerate(parameters)
817 ]
818
819 self.executemany = len(parameters) > 1
820
821 self.cursor = self.create_cursor()
822
823 if self.isinsert or self.isupdate or self.isdelete:
824 self.is_crud = True
825 self._is_explicit_returning = bool(compiled.statement._returning)
826 self._is_implicit_returning = bool(
827 compiled.returning and not compiled.statement._returning
828 )
829
830 if self.compiled.insert_prefetch or self.compiled.update_prefetch:
831 if self.executemany:
832 self._process_executemany_defaults()
833 else:
834 self._process_executesingle_defaults()
835
836 processors = compiled._bind_processors
837
838 if compiled.contains_expanding_parameters:
839 # copy processors for this case as they will be mutated
840 processors = dict(processors)
841 positiontup = self._expand_in_parameters(compiled, processors)
842 elif compiled.positional:
843 positiontup = self.compiled.positiontup
844
845 # Convert the dictionary of bind parameter values
846 # into a dict or list to be sent to the DBAPI's
847 # execute() or executemany() method.
848 parameters = []
849 if compiled.positional:
850 for compiled_params in self.compiled_parameters:
851 param = []
852 for key in positiontup:
853 if key in processors:
854 param.append(processors[key](compiled_params[key]))
855 else:
856 param.append(compiled_params[key])
857 parameters.append(dialect.execute_sequence_format(param))
858 else:
859 encode = not dialect.supports_unicode_statements
860 for compiled_params in self.compiled_parameters:
861
862 if encode:
863 param = dict(
864 (
865 dialect._encoder(key)[0],
866 processors[key](compiled_params[key])
867 if key in processors
868 else compiled_params[key],
869 )
870 for key in compiled_params
871 )
872 else:
873 param = dict(
874 (
875 key,
876 processors[key](compiled_params[key])
877 if key in processors
878 else compiled_params[key],
879 )
880 for key in compiled_params
881 )
882
883 parameters.append(param)
884
885 self.parameters = dialect.execute_sequence_format(parameters)
886
887 return self
888
889 def _expand_in_parameters(self, compiled, processors):
890 """handle special 'expanding' parameters, IN tuples that are rendered
891 on a per-parameter basis for an otherwise fixed SQL statement string.
892
893 """
894 if self.executemany:
895 raise exc.InvalidRequestError(
896 "'expanding' parameters can't be used with " "executemany()"
897 )
898
899 if self.compiled.positional and self.compiled._numeric_binds:
900 # I'm not familiar with any DBAPI that uses 'numeric'
901 raise NotImplementedError(
902 "'expanding' bind parameters not supported with "
903 "'numeric' paramstyle at this time."
904 )
905
906 self._expanded_parameters = {}
907
908 compiled_params = self.compiled_parameters[0]
909 if compiled.positional:
910 positiontup = []
911 else:
912 positiontup = None
913
914 replacement_expressions = {}
915 to_update_sets = {}
916
917 for name in (
918 self.compiled.positiontup
919 if compiled.positional
920 else self.compiled.binds
921 ):
922 parameter = self.compiled.binds[name]
923 if parameter.expanding:
924
925 if name in replacement_expressions:
926 to_update = to_update_sets[name]
927 else:
928 # we are removing the parameter from compiled_params
929 # because it is a list value, which is not expected by
930 # TypeEngine objects that would otherwise be asked to
931 # process it. the single name is being replaced with
932 # individual numbered parameters for each value in the
933 # param.
934 values = compiled_params.pop(name)
935
936 if not values:
937 to_update = to_update_sets[name] = []
938 replacement_expressions[
939 name
940 ] = self.compiled.visit_empty_set_expr(
941 parameter._expanding_in_types
942 if parameter._expanding_in_types
943 else [parameter.type]
944 )
945
946 elif isinstance(values[0], (tuple, list)):
947 to_update = to_update_sets[name] = [
948 ("%s_%s_%s" % (name, i, j), value)
949 for i, tuple_element in enumerate(values, 1)
950 for j, value in enumerate(tuple_element, 1)
951 ]
952 replacement_expressions[name] = (
953 "VALUES " if self.dialect.tuple_in_values else ""
954 ) + ", ".join(
955 "(%s)"
956 % ", ".join(
957 self.compiled.bindtemplate
958 % {
959 "name": to_update[
960 i * len(tuple_element) + j
961 ][0]
962 }
963 for j, value in enumerate(tuple_element)
964 )
965 for i, tuple_element in enumerate(values)
966 )
967 else:
968 to_update = to_update_sets[name] = [
969 ("%s_%s" % (name, i), value)
970 for i, value in enumerate(values, 1)
971 ]
972 replacement_expressions[name] = ", ".join(
973 self.compiled.bindtemplate % {"name": key}
974 for key, value in to_update
975 )
976
977 compiled_params.update(to_update)
978 processors.update(
979 (key, processors[name])
980 for key, value in to_update
981 if name in processors
982 )
983 if compiled.positional:
984 positiontup.extend(name for name, value in to_update)
985 self._expanded_parameters[name] = [
986 expand_key for expand_key, value in to_update
987 ]
988 elif compiled.positional:
989 positiontup.append(name)
990
991 def process_expanding(m):
992 return replacement_expressions[m.group(1)]
993
994 self.statement = re.sub(
995 r"\[EXPANDING_(\S+)\]", process_expanding, self.statement
996 )
997 return positiontup
998
999 @classmethod
1000 def _init_statement(
1001 cls, dialect, connection, dbapi_connection, statement, parameters
1002 ):
1003 """Initialize execution context for a string SQL statement."""
1004
1005 self = cls.__new__(cls)
1006 self.root_connection = connection
1007 self._dbapi_connection = dbapi_connection
1008 self.dialect = connection.dialect
1009 self.is_text = True
1010
1011 # plain text statement
1012 self.execution_options = connection._execution_options
1013
1014 if not parameters:
1015 if self.dialect.positional:
1016 self.parameters = [dialect.execute_sequence_format()]
1017 else:
1018 self.parameters = [{}]
1019 elif isinstance(parameters[0], dialect.execute_sequence_format):
1020 self.parameters = parameters
1021 elif isinstance(parameters[0], dict):
1022 if dialect.supports_unicode_statements:
1023 self.parameters = parameters
1024 else:
1025 self.parameters = [
1026 {dialect._encoder(k)[0]: d[k] for k in d}
1027 for d in parameters
1028 ] or [{}]
1029 else:
1030 self.parameters = [
1031 dialect.execute_sequence_format(p) for p in parameters
1032 ]
1033
1034 self.executemany = len(parameters) > 1
1035
1036 if not dialect.supports_unicode_statements and isinstance(
1037 statement, util.text_type
1038 ):
1039 self.unicode_statement = statement
1040 self.statement = dialect._encoder(statement)[0]
1041 else:
1042 self.statement = self.unicode_statement = statement
1043
1044 self.cursor = self.create_cursor()
1045 return self
1046
1047 @classmethod
1048 def _init_default(cls, dialect, connection, dbapi_connection):
1049 """Initialize execution context for a ColumnDefault construct."""
1050
1051 self = cls.__new__(cls)
1052 self.root_connection = connection
1053 self._dbapi_connection = dbapi_connection
1054 self.dialect = connection.dialect
1055 self.execution_options = connection._execution_options
1056 self.cursor = self.create_cursor()
1057 return self
1058
1059 @util.memoized_property
1060 def identifier_preparer(self):
1061 if self.compiled:
1062 return self.compiled.preparer
1063 elif "schema_translate_map" in self.execution_options:
1064 return self.dialect.identifier_preparer._with_schema_translate(
1065 self.execution_options["schema_translate_map"]
1066 )
1067 else:
1068 return self.dialect.identifier_preparer
1069
1070 @util.memoized_property
1071 def engine(self):
1072 return self.root_connection.engine
1073
1074 @util.memoized_property
1075 def postfetch_cols(self):
1076 return self.compiled.postfetch
1077
1078 @util.memoized_property
1079 def prefetch_cols(self):
1080 if self.isinsert:
1081 return self.compiled.insert_prefetch
1082 elif self.isupdate:
1083 return self.compiled.update_prefetch
1084 else:
1085 return ()
1086
1087 @util.memoized_property
1088 def returning_cols(self):
1089 self.compiled.returning
1090
1091 @util.memoized_property
1092 def no_parameters(self):
1093 return self.execution_options.get("no_parameters", False)
1094
1095 @util.memoized_property
1096 def should_autocommit(self):
1097 autocommit = self.execution_options.get(
1098 "autocommit",
1099 not self.compiled
1100 and self.statement
1101 and expression.PARSE_AUTOCOMMIT
1102 or False,
1103 )
1104
1105 if autocommit is expression.PARSE_AUTOCOMMIT:
1106 return self.should_autocommit_text(self.unicode_statement)
1107 else:
1108 return autocommit
1109
1110 def _execute_scalar(self, stmt, type_):
1111 """Execute a string statement on the current cursor, returning a
1112 scalar result.
1113
1114 Used to fire off sequences, default phrases, and "select lastrowid"
1115 types of statements individually or in the context of a parent INSERT
1116 or UPDATE statement.
1117
1118 """
1119
1120 conn = self.root_connection
1121 if (
1122 isinstance(stmt, util.text_type)
1123 and not self.dialect.supports_unicode_statements
1124 ):
1125 stmt = self.dialect._encoder(stmt)[0]
1126
1127 if self.dialect.positional:
1128 default_params = self.dialect.execute_sequence_format()
1129 else:
1130 default_params = {}
1131
1132 conn._cursor_execute(self.cursor, stmt, default_params, context=self)
1133 r = self.cursor.fetchone()[0]
1134 if type_ is not None:
1135 # apply type post processors to the result
1136 proc = type_._cached_result_processor(
1137 self.dialect, self.cursor.description[0][1]
1138 )
1139 if proc:
1140 return proc(r)
1141 return r
1142
1143 @property
1144 def connection(self):
1145 return self.root_connection._branch()
1146
1147 def should_autocommit_text(self, statement):
1148 return AUTOCOMMIT_REGEXP.match(statement)
1149
1150 def _use_server_side_cursor(self):
1151 if not self.dialect.supports_server_side_cursors:
1152 return False
1153
1154 if self.dialect.server_side_cursors:
1155 use_server_side = self.execution_options.get(
1156 "stream_results", True
1157 ) and (
1158 (
1159 self.compiled
1160 and isinstance(
1161 self.compiled.statement, expression.Selectable
1162 )
1163 or (
1164 (
1165 not self.compiled
1166 or isinstance(
1167 self.compiled.statement, expression.TextClause
1168 )
1169 )
1170 and self.statement
1171 and SERVER_SIDE_CURSOR_RE.match(self.statement)
1172 )
1173 )
1174 )
1175 else:
1176 use_server_side = self.execution_options.get(
1177 "stream_results", False
1178 )
1179
1180 return use_server_side
1181
1182 def create_cursor(self):
1183 if self._use_server_side_cursor():
1184 self._is_server_side = True
1185 return self.create_server_side_cursor()
1186 else:
1187 self._is_server_side = False
1188 return self._dbapi_connection.cursor()
1189
1190 def create_server_side_cursor(self):
1191 raise NotImplementedError()
1192
1193 def pre_exec(self):
1194 pass
1195
1196 def post_exec(self):
1197 pass
1198
1199 def get_result_processor(self, type_, colname, coltype):
1200 """Return a 'result processor' for a given type as present in
1201 cursor.description.
1202
1203 This has a default implementation that dialects can override
1204 for context-sensitive result type handling.
1205
1206 """
1207 return type_._cached_result_processor(self.dialect, coltype)
1208
1209 def get_lastrowid(self):
1210 """return self.cursor.lastrowid, or equivalent, after an INSERT.
1211
1212 This may involve calling special cursor functions,
1213 issuing a new SELECT on the cursor (or a new one),
1214 or returning a stored value that was
1215 calculated within post_exec().
1216
1217 This function will only be called for dialects
1218 which support "implicit" primary key generation,
1219 keep preexecute_autoincrement_sequences set to False,
1220 and when no explicit id value was bound to the
1221 statement.
1222
1223 The function is called once, directly after
1224 post_exec() and before the transaction is committed
1225 or ResultProxy is generated. If the post_exec()
1226 method assigns a value to `self._lastrowid`, the
1227 value is used in place of calling get_lastrowid().
1228
1229 Note that this method is *not* equivalent to the
1230 ``lastrowid`` method on ``ResultProxy``, which is a
1231 direct proxy to the DBAPI ``lastrowid`` accessor
1232 in all cases.
1233
1234 """
1235 return self.cursor.lastrowid
1236
1237 def handle_dbapi_exception(self, e):
1238 pass
1239
1240 def get_result_proxy(self):
1241 if self._is_server_side:
1242 return result.BufferedRowResultProxy(self)
1243 else:
1244 return result.ResultProxy(self)
1245
1246 @property
1247 def rowcount(self):
1248 return self.cursor.rowcount
1249
1250 def supports_sane_rowcount(self):
1251 return self.dialect.supports_sane_rowcount
1252
1253 def supports_sane_multi_rowcount(self):
1254 return self.dialect.supports_sane_multi_rowcount
1255
1256 def _setup_crud_result_proxy(self):
1257 if self.isinsert and not self.executemany:
1258 if (
1259 not self._is_implicit_returning
1260 and not self.compiled.inline
1261 and self.dialect.postfetch_lastrowid
1262 ):
1263
1264 self._setup_ins_pk_from_lastrowid()
1265
1266 elif not self._is_implicit_returning:
1267 self._setup_ins_pk_from_empty()
1268
1269 result = self.get_result_proxy()
1270
1271 if self.isinsert:
1272 if self._is_implicit_returning:
1273 row = result.fetchone()
1274 self.returned_defaults = row
1275 self._setup_ins_pk_from_implicit_returning(row)
1276 result._soft_close()
1277 result._metadata = None
1278 elif not self._is_explicit_returning:
1279 result._soft_close()
1280 result._metadata = None
1281 elif self.isupdate and self._is_implicit_returning:
1282 row = result.fetchone()
1283 self.returned_defaults = row
1284 result._soft_close()
1285 result._metadata = None
1286
1287 elif result._metadata is None:
1288 # no results, get rowcount
1289 # (which requires open cursor on some drivers
1290 # such as kintersbasdb, mxodbc)
1291 result.rowcount
1292 result._soft_close()
1293 return result
1294
1295 def _setup_ins_pk_from_lastrowid(self):
1296 key_getter = self.compiled._key_getters_for_crud_column[2]
1297 table = self.compiled.statement.table
1298 compiled_params = self.compiled_parameters[0]
1299
1300 lastrowid = self.get_lastrowid()
1301 if lastrowid is not None:
1302 autoinc_col = table._autoincrement_column
1303 if autoinc_col is not None:
1304 # apply type post processors to the lastrowid
1305 proc = autoinc_col.type._cached_result_processor(
1306 self.dialect, None
1307 )
1308 if proc is not None:
1309 lastrowid = proc(lastrowid)
1310 self.inserted_primary_key = [
1311 lastrowid
1312 if c is autoinc_col
1313 else compiled_params.get(key_getter(c), None)
1314 for c in table.primary_key
1315 ]
1316 else:
1317 # don't have a usable lastrowid, so
1318 # do the same as _setup_ins_pk_from_empty
1319 self.inserted_primary_key = [
1320 compiled_params.get(key_getter(c), None)
1321 for c in table.primary_key
1322 ]
1323
1324 def _setup_ins_pk_from_empty(self):
1325 key_getter = self.compiled._key_getters_for_crud_column[2]
1326 table = self.compiled.statement.table
1327 compiled_params = self.compiled_parameters[0]
1328 self.inserted_primary_key = [
1329 compiled_params.get(key_getter(c), None) for c in table.primary_key
1330 ]
1331
1332 def _setup_ins_pk_from_implicit_returning(self, row):
1333 if row is None:
1334 self.inserted_primary_key = None
1335 return
1336
1337 key_getter = self.compiled._key_getters_for_crud_column[2]
1338 table = self.compiled.statement.table
1339 compiled_params = self.compiled_parameters[0]
1340 self.inserted_primary_key = [
1341 row[col] if value is None else value
1342 for col, value in [
1343 (col, compiled_params.get(key_getter(col), None))
1344 for col in table.primary_key
1345 ]
1346 ]
1347
1348 def lastrow_has_defaults(self):
1349 return (self.isinsert or self.isupdate) and bool(
1350 self.compiled.postfetch
1351 )
1352
1353 def set_input_sizes(
1354 self, translate=None, include_types=None, exclude_types=None
1355 ):
1356 """Given a cursor and ClauseParameters, call the appropriate
1357 style of ``setinputsizes()`` on the cursor, using DB-API types
1358 from the bind parameter's ``TypeEngine`` objects.
1359
1360 This method only called by those dialects which require it,
1361 currently cx_oracle.
1362
1363 """
1364
1365 if not hasattr(self.compiled, "bind_names"):
1366 return
1367
1368 inputsizes = {}
1369 for bindparam in self.compiled.bind_names:
1370
1371 dialect_impl = bindparam.type._unwrapped_dialect_impl(self.dialect)
1372 dialect_impl_cls = type(dialect_impl)
1373 dbtype = dialect_impl.get_dbapi_type(self.dialect.dbapi)
1374
1375 if (
1376 dbtype is not None
1377 and (
1378 not exclude_types
1379 or dbtype not in exclude_types
1380 and dialect_impl_cls not in exclude_types
1381 )
1382 and (
1383 not include_types
1384 or dbtype in include_types
1385 or dialect_impl_cls in include_types
1386 )
1387 ):
1388 inputsizes[bindparam] = dbtype
1389 else:
1390 inputsizes[bindparam] = None
1391
1392 if self.dialect._has_events:
1393 self.dialect.dispatch.do_setinputsizes(
1394 inputsizes, self.cursor, self.statement, self.parameters, self
1395 )
1396
1397 if self.dialect.positional:
1398 positional_inputsizes = []
1399 for key in self.compiled.positiontup:
1400 bindparam = self.compiled.binds[key]
1401 dbtype = inputsizes.get(bindparam, None)
1402 if dbtype is not None:
1403 if key in self._expanded_parameters:
1404 positional_inputsizes.extend(
1405 [dbtype] * len(self._expanded_parameters[key])
1406 )
1407 else:
1408 positional_inputsizes.append(dbtype)
1409 try:
1410 self.cursor.setinputsizes(*positional_inputsizes)
1411 except BaseException as e:
1412 self.root_connection._handle_dbapi_exception(
1413 e, None, None, None, self
1414 )
1415 else:
1416 keyword_inputsizes = {}
1417 for bindparam, key in self.compiled.bind_names.items():
1418 dbtype = inputsizes.get(bindparam, None)
1419 if dbtype is not None:
1420 if translate:
1421 # TODO: this part won't work w/ the
1422 # expanded_parameters feature, e.g. for cx_oracle
1423 # quoted bound names
1424 key = translate.get(key, key)
1425 if not self.dialect.supports_unicode_binds:
1426 key = self.dialect._encoder(key)[0]
1427 if key in self._expanded_parameters:
1428 keyword_inputsizes.update(
1429 (expand_key, dbtype)
1430 for expand_key in self._expanded_parameters[key]
1431 )
1432 else:
1433 keyword_inputsizes[key] = dbtype
1434 try:
1435 self.cursor.setinputsizes(**keyword_inputsizes)
1436 except BaseException as e:
1437 self.root_connection._handle_dbapi_exception(
1438 e, None, None, None, self
1439 )
1440
1441 def _exec_default(self, column, default, type_):
1442 if default.is_sequence:
1443 return self.fire_sequence(default, type_)
1444 elif default.is_callable:
1445 self.current_column = column
1446 return default.arg(self)
1447 elif default.is_clause_element:
1448 # TODO: expensive branching here should be
1449 # pulled into _exec_scalar()
1450 conn = self.connection
1451 if not default._arg_is_typed:
1452 default_arg = expression.type_coerce(default.arg, type_)
1453 else:
1454 default_arg = default.arg
1455 c = expression.select([default_arg]).compile(bind=conn)
1456 return conn._execute_compiled(c, (), {}).scalar()
1457 else:
1458 return default.arg
1459
1460 current_parameters = None
1461 """A dictionary of parameters applied to the current row.
1462
1463 This attribute is only available in the context of a user-defined default
1464 generation function, e.g. as described at :ref:`context_default_functions`.
1465 It consists of a dictionary which includes entries for each column/value
1466 pair that is to be part of the INSERT or UPDATE statement. The keys of the
1467 dictionary will be the key value of each :class:`_schema.Column`,
1468 which is usually
1469 synonymous with the name.
1470
1471 Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
1472 does not accommodate for the "multi-values" feature of the
1473 :meth:`_expression.Insert.values` method. The
1474 :meth:`.DefaultExecutionContext.get_current_parameters` method should be
1475 preferred.
1476
1477 .. seealso::
1478
1479 :meth:`.DefaultExecutionContext.get_current_parameters`
1480
1481 :ref:`context_default_functions`
1482
1483 """
1484
1485 def get_current_parameters(self, isolate_multiinsert_groups=True):
1486 """Return a dictionary of parameters applied to the current row.
1487
1488 This method can only be used in the context of a user-defined default
1489 generation function, e.g. as described at
1490 :ref:`context_default_functions`. When invoked, a dictionary is
1491 returned which includes entries for each column/value pair that is part
1492 of the INSERT or UPDATE statement. The keys of the dictionary will be
1493 the key value of each :class:`_schema.Column`,
1494 which is usually synonymous
1495 with the name.
1496
1497 :param isolate_multiinsert_groups=True: indicates that multi-valued
1498 INSERT constructs created using :meth:`_expression.Insert.values`
1499 should be
1500 handled by returning only the subset of parameters that are local
1501 to the current column default invocation. When ``False``, the
1502 raw parameters of the statement are returned including the
1503 naming convention used in the case of multi-valued INSERT.
1504
1505 .. versionadded:: 1.2 added
1506 :meth:`.DefaultExecutionContext.get_current_parameters`
1507 which provides more functionality over the existing
1508 :attr:`.DefaultExecutionContext.current_parameters`
1509 attribute.
1510
1511 .. seealso::
1512
1513 :attr:`.DefaultExecutionContext.current_parameters`
1514
1515 :ref:`context_default_functions`
1516
1517 """
1518 try:
1519 parameters = self.current_parameters
1520 column = self.current_column
1521 except AttributeError:
1522 raise exc.InvalidRequestError(
1523 "get_current_parameters() can only be invoked in the "
1524 "context of a Python side column default function"
1525 )
1526 if (
1527 isolate_multiinsert_groups
1528 and self.isinsert
1529 and self.compiled.statement._has_multi_parameters
1530 ):
1531 if column._is_multiparam_column:
1532 index = column.index + 1
1533 d = {column.original.key: parameters[column.key]}
1534 else:
1535 d = {column.key: parameters[column.key]}
1536 index = 0
1537 keys = self.compiled.statement.parameters[0].keys()
1538 d.update(
1539 (key, parameters["%s_m%d" % (key, index)]) for key in keys
1540 )
1541 return d
1542 else:
1543 return parameters
1544
1545 def get_insert_default(self, column):
1546 if column.default is None:
1547 return None
1548 else:
1549 return self._exec_default(column, column.default, column.type)
1550
1551 def get_update_default(self, column):
1552 if column.onupdate is None:
1553 return None
1554 else:
1555 return self._exec_default(column, column.onupdate, column.type)
1556
1557 def _process_executemany_defaults(self):
1558 key_getter = self.compiled._key_getters_for_crud_column[2]
1559
1560 scalar_defaults = {}
1561
1562 insert_prefetch = self.compiled.insert_prefetch
1563 update_prefetch = self.compiled.update_prefetch
1564
1565 # pre-determine scalar Python-side defaults
1566 # to avoid many calls of get_insert_default()/
1567 # get_update_default()
1568 for c in insert_prefetch:
1569 if c.default and c.default.is_scalar:
1570 scalar_defaults[c] = c.default.arg
1571 for c in update_prefetch:
1572 if c.onupdate and c.onupdate.is_scalar:
1573 scalar_defaults[c] = c.onupdate.arg
1574
1575 for param in self.compiled_parameters:
1576 self.current_parameters = param
1577 for c in insert_prefetch:
1578 if c in scalar_defaults:
1579 val = scalar_defaults[c]
1580 else:
1581 val = self.get_insert_default(c)
1582 if val is not None:
1583 param[key_getter(c)] = val
1584 for c in update_prefetch:
1585 if c in scalar_defaults:
1586 val = scalar_defaults[c]
1587 else:
1588 val = self.get_update_default(c)
1589 if val is not None:
1590 param[key_getter(c)] = val
1591
1592 del self.current_parameters
1593
1594 def _process_executesingle_defaults(self):
1595 key_getter = self.compiled._key_getters_for_crud_column[2]
1596 self.current_parameters = (
1597 compiled_parameters
1598 ) = self.compiled_parameters[0]
1599
1600 for c in self.compiled.insert_prefetch:
1601 if c.default and not c.default.is_sequence and c.default.is_scalar:
1602 val = c.default.arg
1603 else:
1604 val = self.get_insert_default(c)
1605
1606 if val is not None:
1607 compiled_parameters[key_getter(c)] = val
1608
1609 for c in self.compiled.update_prefetch:
1610 val = self.get_update_default(c)
1611
1612 if val is not None:
1613 compiled_parameters[key_getter(c)] = val
1614 del self.current_parameters
1615
1616
1617DefaultDialect.execution_ctx_cls = DefaultExecutionContext