Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py: 33%
852 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# engine/default.py
2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
8"""Default implementations of per-dialect sqlalchemy.engine classes.
10These are semi-private implementation classes which are only of importance
11to database dialect authors; dialects will usually use the classes here
12as the base class for their own corresponding classes.
14"""
16import codecs
17import functools
18import random
19import re
20import weakref
22from . import characteristics
23from . import cursor as _cursor
24from . import interfaces
25from .base import Connection
26from .. import event
27from .. import exc
28from .. import pool
29from .. import processors
30from .. import types as sqltypes
31from .. import util
32from ..sql import compiler
33from ..sql import expression
34from ..sql.elements import quoted_name
36AUTOCOMMIT_REGEXP = re.compile(
37 r"\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER)", re.I | re.UNICODE
38)
40# When we're handed literal SQL, ensure it's a SELECT query
41SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)
44CACHE_HIT = util.symbol("CACHE_HIT")
45CACHE_MISS = util.symbol("CACHE_MISS")
46CACHING_DISABLED = util.symbol("CACHING_DISABLED")
47NO_CACHE_KEY = util.symbol("NO_CACHE_KEY")
48NO_DIALECT_SUPPORT = util.symbol("NO_DIALECT_SUPPORT")
51class DefaultDialect(interfaces.Dialect):
52 """Default implementation of Dialect"""
54 statement_compiler = compiler.SQLCompiler
55 ddl_compiler = compiler.DDLCompiler
56 type_compiler = compiler.GenericTypeCompiler
57 preparer = compiler.IdentifierPreparer
58 supports_alter = True
59 supports_comments = False
60 inline_comments = False
61 use_setinputsizes = False
62 supports_statement_cache = True
64 # the first value we'd get for an autoincrement
65 # column.
66 default_sequence_base = 1
68 # most DBAPIs happy with this for execute().
69 # not cx_oracle.
70 execute_sequence_format = tuple
72 supports_schemas = True
73 supports_views = True
74 supports_sequences = False
75 sequences_optional = False
76 preexecute_autoincrement_sequences = False
77 supports_identity_columns = False
78 postfetch_lastrowid = True
79 implicit_returning = False
80 full_returning = False
81 insert_executemany_returning = False
83 cte_follows_insert = False
85 supports_native_enum = False
86 supports_native_boolean = False
87 non_native_boolean_check_constraint = True
89 supports_simple_order_by_label = True
91 tuple_in_values = False
93 connection_characteristics = util.immutabledict(
94 {"isolation_level": characteristics.IsolationLevelCharacteristic()}
95 )
97 engine_config_types = util.immutabledict(
98 [
99 ("convert_unicode", util.bool_or_str("force")),
100 ("pool_timeout", util.asint),
101 ("echo", util.bool_or_str("debug")),
102 ("echo_pool", util.bool_or_str("debug")),
103 ("pool_recycle", util.asint),
104 ("pool_size", util.asint),
105 ("max_overflow", util.asint),
106 ("future", util.asbool),
107 ]
108 )
110 # if the NUMERIC type
111 # returns decimal.Decimal.
112 # *not* the FLOAT type however.
113 supports_native_decimal = False
115 if util.py3k:
116 supports_unicode_statements = True
117 supports_unicode_binds = True
118 returns_unicode_strings = sqltypes.String.RETURNS_UNICODE
119 description_encoding = None
120 else:
121 supports_unicode_statements = False
122 supports_unicode_binds = False
123 returns_unicode_strings = sqltypes.String.RETURNS_UNKNOWN
124 description_encoding = "use_encoding"
126 name = "default"
128 # length at which to truncate
129 # any identifier.
130 max_identifier_length = 9999
131 _user_defined_max_identifier_length = None
133 isolation_level = None
135 # sub-categories of max_identifier_length.
136 # currently these accommodate for MySQL which allows alias names
137 # of 255 but DDL names only of 64.
138 max_index_name_length = None
139 max_constraint_name_length = None
141 supports_sane_rowcount = True
142 supports_sane_multi_rowcount = True
143 colspecs = {}
144 default_paramstyle = "named"
146 supports_default_values = False
147 """dialect supports INSERT... DEFAULT VALUES syntax"""
149 supports_default_metavalue = False
150 """dialect supports INSERT... VALUES (DEFAULT) syntax"""
152 # not sure if this is a real thing but the compiler will deliver it
153 # if this is the only flag enabled.
154 supports_empty_insert = True
155 """dialect supports INSERT () VALUES ()"""
157 supports_multivalues_insert = False
159 supports_is_distinct_from = True
161 supports_server_side_cursors = False
163 server_side_cursors = False
165 # extra record-level locking features (#4860)
166 supports_for_update_of = False
168 server_version_info = None
170 default_schema_name = None
172 construct_arguments = None
173 """Optional set of argument specifiers for various SQLAlchemy
174 constructs, typically schema items.
176 To implement, establish as a series of tuples, as in::
178 construct_arguments = [
179 (schema.Index, {
180 "using": False,
181 "where": None,
182 "ops": None
183 })
184 ]
186 If the above construct is established on the PostgreSQL dialect,
187 the :class:`.Index` construct will now accept the keyword arguments
188 ``postgresql_using``, ``postgresql_where``, nad ``postgresql_ops``.
189 Any other argument specified to the constructor of :class:`.Index`
190 which is prefixed with ``postgresql_`` will raise :class:`.ArgumentError`.
192 A dialect which does not include a ``construct_arguments`` member will
193 not participate in the argument validation system. For such a dialect,
194 any argument name is accepted by all participating constructs, within
195 the namespace of arguments prefixed with that dialect name. The rationale
196 here is so that third-party dialects that haven't yet implemented this
197 feature continue to function in the old way.
199 .. versionadded:: 0.9.2
201 .. seealso::
203 :class:`.DialectKWArgs` - implementing base class which consumes
204 :attr:`.DefaultDialect.construct_arguments`
207 """
209 # indicates symbol names are
210 # UPPERCASEd if they are case insensitive
211 # within the database.
212 # if this is True, the methods normalize_name()
213 # and denormalize_name() must be provided.
214 requires_name_normalize = False
216 reflection_options = ()
218 dbapi_exception_translation_map = util.immutabledict()
219 """mapping used in the extremely unusual case that a DBAPI's
220 published exceptions don't actually have the __name__ that they
221 are linked towards.
223 .. versionadded:: 1.0.5
225 """
227 is_async = False
229 CACHE_HIT = CACHE_HIT
230 CACHE_MISS = CACHE_MISS
231 CACHING_DISABLED = CACHING_DISABLED
232 NO_CACHE_KEY = NO_CACHE_KEY
233 NO_DIALECT_SUPPORT = NO_DIALECT_SUPPORT
234 has_terminate = False
236 @util.deprecated_params(
237 convert_unicode=(
238 "1.3",
239 "The :paramref:`_sa.create_engine.convert_unicode` parameter "
240 "and corresponding dialect-level parameters are deprecated, "
241 "and will be removed in a future release. Modern DBAPIs support "
242 "Python Unicode natively and this parameter is unnecessary.",
243 ),
244 empty_in_strategy=(
245 "1.4",
246 "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
247 "deprecated, and no longer has any effect. All IN expressions "
248 "are now rendered using "
249 'the "expanding parameter" strategy which renders a set of bound'
250 'expressions, or an "empty set" SELECT, at statement execution'
251 "time.",
252 ),
253 case_sensitive=(
254 "1.4",
255 "The :paramref:`_sa.create_engine.case_sensitive` parameter "
256 "is deprecated and will be removed in a future release. "
257 "Applications should work with result column names in a case "
258 "sensitive fashion.",
259 ),
260 server_side_cursors=(
261 "1.4",
262 "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
263 "is deprecated and will be removed in a future release. Please "
264 "use the "
265 ":paramref:`_engine.Connection.execution_options.stream_results` "
266 "parameter.",
267 ),
268 )
269 def __init__(
270 self,
271 convert_unicode=False,
272 encoding="utf-8",
273 paramstyle=None,
274 dbapi=None,
275 implicit_returning=None,
276 case_sensitive=True,
277 supports_native_boolean=None,
278 max_identifier_length=None,
279 label_length=None,
280 # int() is because the @deprecated_params decorator cannot accommodate
281 # the direct reference to the "NO_LINTING" object
282 compiler_linting=int(compiler.NO_LINTING),
283 server_side_cursors=False,
284 **kwargs
285 ):
287 if not getattr(self, "ported_sqla_06", True):
288 util.warn(
289 "The %s dialect is not yet ported to the 0.6 format"
290 % self.name
291 )
293 if server_side_cursors:
294 if not self.supports_server_side_cursors:
295 raise exc.ArgumentError(
296 "Dialect %s does not support server side cursors" % self
297 )
298 else:
299 self.server_side_cursors = True
301 self.convert_unicode = convert_unicode
302 self.encoding = encoding
303 self.positional = False
304 self._ischema = None
305 self.dbapi = dbapi
306 if paramstyle is not None:
307 self.paramstyle = paramstyle
308 elif self.dbapi is not None:
309 self.paramstyle = self.dbapi.paramstyle
310 else:
311 self.paramstyle = self.default_paramstyle
312 if implicit_returning is not None:
313 self.implicit_returning = implicit_returning
314 self.positional = self.paramstyle in ("qmark", "format", "numeric")
315 self.identifier_preparer = self.preparer(self)
316 self.type_compiler = self.type_compiler(self)
317 if supports_native_boolean is not None:
318 self.supports_native_boolean = supports_native_boolean
319 self.case_sensitive = case_sensitive
321 self._user_defined_max_identifier_length = max_identifier_length
322 if self._user_defined_max_identifier_length:
323 self.max_identifier_length = (
324 self._user_defined_max_identifier_length
325 )
326 self.label_length = label_length
327 self.compiler_linting = compiler_linting
328 if self.description_encoding == "use_encoding":
329 self._description_decoder = (
330 processors.to_unicode_processor_factory
331 )(encoding)
332 elif self.description_encoding is not None:
333 self._description_decoder = (
334 processors.to_unicode_processor_factory
335 )(self.description_encoding)
336 self._encoder = codecs.getencoder(self.encoding)
337 self._decoder = processors.to_unicode_processor_factory(self.encoding)
339 def _ensure_has_table_connection(self, arg):
341 if not isinstance(arg, Connection):
342 raise exc.ArgumentError(
343 "The argument passed to Dialect.has_table() should be a "
344 "%s, got %s. "
345 "Additionally, the Dialect.has_table() method is for "
346 "internal dialect "
347 "use only; please use "
348 "``inspect(some_engine).has_table(<tablename>>)`` "
349 "for public API use." % (Connection, type(arg))
350 )
352 @util.memoized_property
353 def _supports_statement_cache(self):
354 ssc = self.__class__.__dict__.get("supports_statement_cache", None)
355 if ssc is None:
356 util.warn(
357 "Dialect %s:%s will not make use of SQL compilation caching "
358 "as it does not set the 'supports_statement_cache' attribute "
359 "to ``True``. This can have "
360 "significant performance implications including some "
361 "performance degradations in comparison to prior SQLAlchemy "
362 "versions. Dialect maintainers should seek to set this "
363 "attribute to True after appropriate development and testing "
364 "for SQLAlchemy 1.4 caching support. Alternatively, this "
365 "attribute may be set to False which will disable this "
366 "warning." % (self.name, self.driver),
367 code="cprf",
368 )
370 return bool(ssc)
372 @util.memoized_property
373 def _type_memos(self):
374 return weakref.WeakKeyDictionary()
376 @property
377 def dialect_description(self):
378 return self.name + "+" + self.driver
380 @property
381 def supports_sane_rowcount_returning(self):
382 """True if this dialect supports sane rowcount even if RETURNING is
383 in use.
385 For dialects that don't support RETURNING, this is synonymous with
386 ``supports_sane_rowcount``.
388 """
389 return self.supports_sane_rowcount
391 @classmethod
392 def get_pool_class(cls, url):
393 return getattr(cls, "poolclass", pool.QueuePool)
395 def get_dialect_pool_class(self, url):
396 return self.get_pool_class(url)
398 @classmethod
399 def load_provisioning(cls):
400 package = ".".join(cls.__module__.split(".")[0:-1])
401 try:
402 __import__(package + ".provision")
403 except ImportError:
404 pass
406 def initialize(self, connection):
407 try:
408 self.server_version_info = self._get_server_version_info(
409 connection
410 )
411 except NotImplementedError:
412 self.server_version_info = None
413 try:
414 self.default_schema_name = self._get_default_schema_name(
415 connection
416 )
417 except NotImplementedError:
418 self.default_schema_name = None
420 try:
421 self.default_isolation_level = self.get_default_isolation_level(
422 connection.connection
423 )
424 except NotImplementedError:
425 self.default_isolation_level = None
427 if self.returns_unicode_strings is sqltypes.String.RETURNS_UNKNOWN:
428 if util.py3k:
429 raise exc.InvalidRequestError(
430 "RETURNS_UNKNOWN is unsupported in Python 3"
431 )
432 self.returns_unicode_strings = self._check_unicode_returns(
433 connection
434 )
436 if (
437 self.description_encoding is not None
438 and self._check_unicode_description(connection)
439 ):
440 self._description_decoder = self.description_encoding = None
442 if not self._user_defined_max_identifier_length:
443 max_ident_length = self._check_max_identifier_length(connection)
444 if max_ident_length:
445 self.max_identifier_length = max_ident_length
447 if (
448 self.label_length
449 and self.label_length > self.max_identifier_length
450 ):
451 raise exc.ArgumentError(
452 "Label length of %d is greater than this dialect's"
453 " maximum identifier length of %d"
454 % (self.label_length, self.max_identifier_length)
455 )
457 def on_connect(self):
458 # inherits the docstring from interfaces.Dialect.on_connect
459 return None
461 def _check_max_identifier_length(self, connection):
462 """Perform a connection / server version specific check to determine
463 the max_identifier_length.
465 If the dialect's class level max_identifier_length should be used,
466 can return None.
468 .. versionadded:: 1.3.9
470 """
471 return None
473 def get_default_isolation_level(self, dbapi_conn):
474 """Given a DBAPI connection, return its isolation level, or
475 a default isolation level if one cannot be retrieved.
477 May be overridden by subclasses in order to provide a
478 "fallback" isolation level for databases that cannot reliably
479 retrieve the actual isolation level.
481 By default, calls the :meth:`_engine.Interfaces.get_isolation_level`
482 method, propagating any exceptions raised.
484 .. versionadded:: 1.3.22
486 """
487 return self.get_isolation_level(dbapi_conn)
489 def _check_unicode_returns(self, connection, additional_tests=None):
490 # this now runs in py2k only and will be removed in 2.0; disabled for
491 # Python 3 in all cases under #5315
492 if util.py2k and not self.supports_unicode_statements:
493 cast_to = util.binary_type
494 else:
495 cast_to = util.text_type
497 if self.positional:
498 parameters = self.execute_sequence_format()
499 else:
500 parameters = {}
502 def check_unicode(test):
503 statement = cast_to(expression.select(test).compile(dialect=self))
504 try:
505 cursor = connection.connection.cursor()
506 connection._cursor_execute(cursor, statement, parameters)
507 row = cursor.fetchone()
508 cursor.close()
509 except exc.DBAPIError as de:
510 # note that _cursor_execute() will have closed the cursor
511 # if an exception is thrown.
512 util.warn(
513 "Exception attempting to "
514 "detect unicode returns: %r" % de
515 )
516 return False
517 else:
518 return isinstance(row[0], util.text_type)
520 tests = [
521 # detect plain VARCHAR
522 expression.cast(
523 expression.literal_column("'test plain returns'"),
524 sqltypes.VARCHAR(60),
525 ),
526 # detect if there's an NVARCHAR type with different behavior
527 # available
528 expression.cast(
529 expression.literal_column("'test unicode returns'"),
530 sqltypes.Unicode(60),
531 ),
532 ]
534 if additional_tests:
535 tests += additional_tests
537 results = {check_unicode(test) for test in tests}
539 if results.issuperset([True, False]):
540 return sqltypes.String.RETURNS_CONDITIONAL
541 else:
542 return (
543 sqltypes.String.RETURNS_UNICODE
544 if results == {True}
545 else sqltypes.String.RETURNS_BYTES
546 )
548 def _check_unicode_description(self, connection):
549 # all DBAPIs on Py2K return cursor.description as encoded
551 if util.py2k and not self.supports_unicode_statements:
552 cast_to = util.binary_type
553 else:
554 cast_to = util.text_type
556 cursor = connection.connection.cursor()
557 try:
558 cursor.execute(
559 cast_to(
560 expression.select(
561 expression.literal_column("'x'").label("some_label")
562 ).compile(dialect=self)
563 )
564 )
565 return isinstance(cursor.description[0][0], util.text_type)
566 finally:
567 cursor.close()
569 def type_descriptor(self, typeobj):
570 """Provide a database-specific :class:`.TypeEngine` object, given
571 the generic object which comes from the types module.
573 This method looks for a dictionary called
574 ``colspecs`` as a class or instance-level variable,
575 and passes on to :func:`_types.adapt_type`.
577 """
578 return sqltypes.adapt_type(typeobj, self.colspecs)
580 def has_index(self, connection, table_name, index_name, schema=None):
581 if not self.has_table(connection, table_name, schema=schema):
582 return False
583 for idx in self.get_indexes(connection, table_name, schema=schema):
584 if idx["name"] == index_name:
585 return True
586 else:
587 return False
589 def validate_identifier(self, ident):
590 if len(ident) > self.max_identifier_length:
591 raise exc.IdentifierError(
592 "Identifier '%s' exceeds maximum length of %d characters"
593 % (ident, self.max_identifier_length)
594 )
596 def connect(self, *cargs, **cparams):
597 # inherits the docstring from interfaces.Dialect.connect
598 return self.dbapi.connect(*cargs, **cparams)
600 def create_connect_args(self, url):
601 # inherits the docstring from interfaces.Dialect.create_connect_args
602 opts = url.translate_connect_args()
603 opts.update(url.query)
604 return [[], opts]
606 def set_engine_execution_options(self, engine, opts):
607 supported_names = set(self.connection_characteristics).intersection(
608 opts
609 )
610 if supported_names:
611 characteristics = util.immutabledict(
612 (name, opts[name]) for name in supported_names
613 )
615 @event.listens_for(engine, "engine_connect")
616 def set_connection_characteristics(connection, branch):
617 if not branch:
618 self._set_connection_characteristics(
619 connection, characteristics
620 )
622 def set_connection_execution_options(self, connection, opts):
623 supported_names = set(self.connection_characteristics).intersection(
624 opts
625 )
626 if supported_names:
627 characteristics = util.immutabledict(
628 (name, opts[name]) for name in supported_names
629 )
630 self._set_connection_characteristics(connection, characteristics)
632 def _set_connection_characteristics(self, connection, characteristics):
634 characteristic_values = [
635 (name, self.connection_characteristics[name], value)
636 for name, value in characteristics.items()
637 ]
639 if connection.in_transaction():
640 trans_objs = [
641 (name, obj)
642 for name, obj, value in characteristic_values
643 if obj.transactional
644 ]
645 if trans_objs:
646 if connection._is_future:
647 raise exc.InvalidRequestError(
648 "This connection has already initialized a SQLAlchemy "
649 "Transaction() object via begin() or autobegin; "
650 "%s may not be altered unless rollback() or commit() "
651 "is called first."
652 % (", ".join(name for name, obj in trans_objs))
653 )
654 else:
655 util.warn(
656 "Connection is already established with a "
657 "Transaction; "
658 "setting %s may implicitly rollback or "
659 "commit "
660 "the existing transaction, or have no effect until "
661 "next transaction"
662 % (", ".join(name for name, obj in trans_objs))
663 )
665 dbapi_connection = connection.connection.dbapi_connection
666 for name, characteristic, value in characteristic_values:
667 characteristic.set_characteristic(self, dbapi_connection, value)
668 connection.connection._connection_record.finalize_callback.append(
669 functools.partial(self._reset_characteristics, characteristics)
670 )
672 def _reset_characteristics(self, characteristics, dbapi_connection):
673 for characteristic_name in characteristics:
674 characteristic = self.connection_characteristics[
675 characteristic_name
676 ]
677 characteristic.reset_characteristic(self, dbapi_connection)
679 def do_begin(self, dbapi_connection):
680 pass
682 def do_rollback(self, dbapi_connection):
683 dbapi_connection.rollback()
685 def do_commit(self, dbapi_connection):
686 dbapi_connection.commit()
688 def do_terminate(self, dbapi_connection):
689 self.do_close(dbapi_connection)
691 def do_close(self, dbapi_connection):
692 dbapi_connection.close()
694 @util.memoized_property
695 def _dialect_specific_select_one(self):
696 return str(expression.select(1).compile(dialect=self))
698 def do_ping(self, dbapi_connection):
699 cursor = None
700 try:
701 cursor = dbapi_connection.cursor()
702 try:
703 cursor.execute(self._dialect_specific_select_one)
704 finally:
705 cursor.close()
706 except self.dbapi.Error as err:
707 if self.is_disconnect(err, dbapi_connection, cursor):
708 return False
709 else:
710 raise
711 else:
712 return True
714 def create_xid(self):
715 """Create a random two-phase transaction ID.
717 This id will be passed to do_begin_twophase(), do_rollback_twophase(),
718 do_commit_twophase(). Its format is unspecified.
719 """
721 return "_sa_%032x" % random.randint(0, 2 ** 128)
723 def do_savepoint(self, connection, name):
724 connection.execute(expression.SavepointClause(name))
726 def do_rollback_to_savepoint(self, connection, name):
727 connection.execute(expression.RollbackToSavepointClause(name))
729 def do_release_savepoint(self, connection, name):
730 connection.execute(expression.ReleaseSavepointClause(name))
732 def do_executemany(self, cursor, statement, parameters, context=None):
733 cursor.executemany(statement, parameters)
735 def do_execute(self, cursor, statement, parameters, context=None):
736 cursor.execute(statement, parameters)
738 def do_execute_no_params(self, cursor, statement, context=None):
739 cursor.execute(statement)
741 def is_disconnect(self, e, connection, cursor):
742 return False
744 def reset_isolation_level(self, dbapi_conn):
745 # default_isolation_level is read from the first connection
746 # after the initial set of 'isolation_level', if any, so is
747 # the configured default of this dialect.
748 self.set_isolation_level(dbapi_conn, self.default_isolation_level)
750 def normalize_name(self, name):
751 if name is None:
752 return None
753 if util.py2k:
754 if isinstance(name, str):
755 name = name.decode(self.encoding)
757 name_lower = name.lower()
758 name_upper = name.upper()
760 if name_upper == name_lower:
761 # name has no upper/lower conversion, e.g. non-european characters.
762 # return unchanged
763 return name
764 elif name_upper == name and not (
765 self.identifier_preparer._requires_quotes
766 )(name_lower):
767 # name is all uppercase and doesn't require quoting; normalize
768 # to all lower case
769 return name_lower
770 elif name_lower == name:
771 # name is all lower case, which if denormalized means we need to
772 # force quoting on it
773 return quoted_name(name, quote=True)
774 else:
775 # name is mixed case, means it will be quoted in SQL when used
776 # later, no normalizes
777 return name
779 def denormalize_name(self, name):
780 if name is None:
781 return None
783 name_lower = name.lower()
784 name_upper = name.upper()
786 if name_upper == name_lower:
787 # name has no upper/lower conversion, e.g. non-european characters.
788 # return unchanged
789 return name
790 elif name_lower == name and not (
791 self.identifier_preparer._requires_quotes
792 )(name_lower):
793 name = name_upper
794 if util.py2k:
795 if not self.supports_unicode_binds:
796 name = name.encode(self.encoding)
797 else:
798 name = unicode(name) # noqa
799 return name
801 def get_driver_connection(self, connection):
802 return connection
805class _RendersLiteral(object):
806 def literal_processor(self, dialect):
807 def process(value):
808 return "'%s'" % value
810 return process
813class _StrDateTime(_RendersLiteral, sqltypes.DateTime):
814 pass
817class _StrDate(_RendersLiteral, sqltypes.Date):
818 pass
821class _StrTime(_RendersLiteral, sqltypes.Time):
822 pass
825class StrCompileDialect(DefaultDialect):
827 statement_compiler = compiler.StrSQLCompiler
828 ddl_compiler = compiler.DDLCompiler
829 type_compiler = compiler.StrSQLTypeCompiler
830 preparer = compiler.IdentifierPreparer
832 supports_statement_cache = True
834 supports_identity_columns = True
836 supports_sequences = True
837 sequences_optional = True
838 preexecute_autoincrement_sequences = False
839 implicit_returning = False
841 supports_native_boolean = True
843 supports_multivalues_insert = True
844 supports_simple_order_by_label = True
846 colspecs = {
847 sqltypes.DateTime: _StrDateTime,
848 sqltypes.Date: _StrDate,
849 sqltypes.Time: _StrTime,
850 }
853class DefaultExecutionContext(interfaces.ExecutionContext):
854 isinsert = False
855 isupdate = False
856 isdelete = False
857 is_crud = False
858 is_text = False
859 isddl = False
860 executemany = False
861 compiled = None
862 statement = None
863 result_column_struct = None
864 returned_default_rows = None
865 execution_options = util.immutabledict()
867 include_set_input_sizes = None
868 exclude_set_input_sizes = None
870 cursor_fetch_strategy = _cursor._DEFAULT_FETCH
872 cache_stats = None
873 invoked_statement = None
875 _is_implicit_returning = False
876 _is_explicit_returning = False
877 _is_future_result = False
878 _is_server_side = False
880 _soft_closed = False
882 # a hook for SQLite's translation of
883 # result column names
884 # NOTE: pyhive is using this hook, can't remove it :(
885 _translate_colname = None
887 _expanded_parameters = util.immutabledict()
889 cache_hit = NO_CACHE_KEY
891 @classmethod
892 def _init_ddl(
893 cls,
894 dialect,
895 connection,
896 dbapi_connection,
897 execution_options,
898 compiled_ddl,
899 ):
900 """Initialize execution context for a DDLElement construct."""
902 self = cls.__new__(cls)
903 self.root_connection = connection
904 self._dbapi_connection = dbapi_connection
905 self.dialect = connection.dialect
907 self.compiled = compiled = compiled_ddl
908 self.isddl = True
910 self.execution_options = execution_options
912 self._is_future_result = (
913 connection._is_future
914 or self.execution_options.get("future_result", False)
915 )
917 self.unicode_statement = util.text_type(compiled)
918 if compiled.schema_translate_map:
919 schema_translate_map = self.execution_options.get(
920 "schema_translate_map", {}
921 )
923 rst = compiled.preparer._render_schema_translates
924 self.unicode_statement = rst(
925 self.unicode_statement, schema_translate_map
926 )
928 if not dialect.supports_unicode_statements:
929 self.statement = dialect._encoder(self.unicode_statement)[0]
930 else:
931 self.statement = self.unicode_statement
933 self.cursor = self.create_cursor()
934 self.compiled_parameters = []
936 if dialect.positional:
937 self.parameters = [dialect.execute_sequence_format()]
938 else:
939 self.parameters = [{}]
941 return self
943 @classmethod
944 def _init_compiled(
945 cls,
946 dialect,
947 connection,
948 dbapi_connection,
949 execution_options,
950 compiled,
951 parameters,
952 invoked_statement,
953 extracted_parameters,
954 cache_hit=CACHING_DISABLED,
955 ):
956 """Initialize execution context for a Compiled construct."""
958 self = cls.__new__(cls)
959 self.root_connection = connection
960 self._dbapi_connection = dbapi_connection
961 self.dialect = connection.dialect
962 self.extracted_parameters = extracted_parameters
963 self.invoked_statement = invoked_statement
964 self.compiled = compiled
965 self.cache_hit = cache_hit
967 self.execution_options = execution_options
969 self._is_future_result = (
970 connection._is_future
971 or self.execution_options.get("future_result", False)
972 )
974 self.result_column_struct = (
975 compiled._result_columns,
976 compiled._ordered_columns,
977 compiled._textual_ordered_columns,
978 compiled._ad_hoc_textual,
979 compiled._loose_column_name_matching,
980 )
981 self.isinsert = compiled.isinsert
982 self.isupdate = compiled.isupdate
983 self.isdelete = compiled.isdelete
984 self.is_text = compiled.isplaintext
986 if self.isinsert or self.isupdate or self.isdelete:
987 self.is_crud = True
988 self._is_explicit_returning = bool(compiled.statement._returning)
989 self._is_implicit_returning = bool(
990 compiled.returning and not compiled.statement._returning
991 )
993 if not parameters:
994 self.compiled_parameters = [
995 compiled.construct_params(
996 extracted_parameters=extracted_parameters,
997 escape_names=False,
998 )
999 ]
1000 else:
1001 self.compiled_parameters = [
1002 compiled.construct_params(
1003 m,
1004 escape_names=False,
1005 _group_number=grp,
1006 extracted_parameters=extracted_parameters,
1007 )
1008 for grp, m in enumerate(parameters)
1009 ]
1011 self.executemany = len(parameters) > 1
1013 # this must occur before create_cursor() since the statement
1014 # has to be regexed in some cases for server side cursor
1015 if util.py2k:
1016 self.unicode_statement = util.text_type(compiled.string)
1017 else:
1018 self.unicode_statement = compiled.string
1020 self.cursor = self.create_cursor()
1022 if self.compiled.insert_prefetch or self.compiled.update_prefetch:
1023 if self.executemany:
1024 self._process_executemany_defaults()
1025 else:
1026 self._process_executesingle_defaults()
1028 processors = compiled._bind_processors
1030 if compiled.literal_execute_params or compiled.post_compile_params:
1031 if self.executemany:
1032 raise exc.InvalidRequestError(
1033 "'literal_execute' or 'expanding' parameters can't be "
1034 "used with executemany()"
1035 )
1037 expanded_state = compiled._process_parameters_for_postcompile(
1038 self.compiled_parameters[0]
1039 )
1041 # re-assign self.unicode_statement
1042 self.unicode_statement = expanded_state.statement
1044 # used by set_input_sizes() which is needed for Oracle
1045 self._expanded_parameters = expanded_state.parameter_expansion
1047 processors = dict(processors)
1048 processors.update(expanded_state.processors)
1049 positiontup = expanded_state.positiontup
1050 elif compiled.positional:
1051 positiontup = self.compiled.positiontup
1053 if compiled.schema_translate_map:
1054 schema_translate_map = self.execution_options.get(
1055 "schema_translate_map", {}
1056 )
1057 rst = compiled.preparer._render_schema_translates
1058 self.unicode_statement = rst(
1059 self.unicode_statement, schema_translate_map
1060 )
1062 # final self.unicode_statement is now assigned, encode if needed
1063 # by dialect
1064 if not dialect.supports_unicode_statements:
1065 self.statement = self.unicode_statement.encode(
1066 self.dialect.encoding
1067 )
1068 else:
1069 self.statement = self.unicode_statement
1071 # Convert the dictionary of bind parameter values
1072 # into a dict or list to be sent to the DBAPI's
1073 # execute() or executemany() method.
1074 parameters = []
1075 if compiled.positional:
1076 for compiled_params in self.compiled_parameters:
1077 param = [
1078 processors[key](compiled_params[key])
1079 if key in processors
1080 else compiled_params[key]
1081 for key in positiontup
1082 ]
1083 parameters.append(dialect.execute_sequence_format(param))
1084 else:
1085 encode = not dialect.supports_unicode_statements
1086 if encode:
1087 encoder = dialect._encoder
1088 for compiled_params in self.compiled_parameters:
1089 escaped_bind_names = compiled.escaped_bind_names
1091 if encode:
1092 if escaped_bind_names:
1093 param = {
1094 encoder(escaped_bind_names.get(key, key))[
1095 0
1096 ]: processors[key](compiled_params[key])
1097 if key in processors
1098 else compiled_params[key]
1099 for key in compiled_params
1100 }
1101 else:
1102 param = {
1103 encoder(key)[0]: processors[key](
1104 compiled_params[key]
1105 )
1106 if key in processors
1107 else compiled_params[key]
1108 for key in compiled_params
1109 }
1110 else:
1111 if escaped_bind_names:
1112 param = {
1113 escaped_bind_names.get(key, key): processors[key](
1114 compiled_params[key]
1115 )
1116 if key in processors
1117 else compiled_params[key]
1118 for key in compiled_params
1119 }
1120 else:
1121 param = {
1122 key: processors[key](compiled_params[key])
1123 if key in processors
1124 else compiled_params[key]
1125 for key in compiled_params
1126 }
1128 parameters.append(param)
1130 self.parameters = dialect.execute_sequence_format(parameters)
1132 return self
1134 @classmethod
1135 def _init_statement(
1136 cls,
1137 dialect,
1138 connection,
1139 dbapi_connection,
1140 execution_options,
1141 statement,
1142 parameters,
1143 ):
1144 """Initialize execution context for a string SQL statement."""
1146 self = cls.__new__(cls)
1147 self.root_connection = connection
1148 self._dbapi_connection = dbapi_connection
1149 self.dialect = connection.dialect
1150 self.is_text = True
1152 self.execution_options = execution_options
1154 self._is_future_result = (
1155 connection._is_future
1156 or self.execution_options.get("future_result", False)
1157 )
1159 if not parameters:
1160 if self.dialect.positional:
1161 self.parameters = [dialect.execute_sequence_format()]
1162 else:
1163 self.parameters = [{}]
1164 elif isinstance(parameters[0], dialect.execute_sequence_format):
1165 self.parameters = parameters
1166 elif isinstance(parameters[0], dict):
1167 if dialect.supports_unicode_statements:
1168 self.parameters = parameters
1169 else:
1170 self.parameters = [
1171 {dialect._encoder(k)[0]: d[k] for k in d}
1172 for d in parameters
1173 ] or [{}]
1174 else:
1175 self.parameters = [
1176 dialect.execute_sequence_format(p) for p in parameters
1177 ]
1179 self.executemany = len(parameters) > 1
1181 if not dialect.supports_unicode_statements and isinstance(
1182 statement, util.text_type
1183 ):
1184 self.unicode_statement = statement
1185 self.statement = dialect._encoder(statement)[0]
1186 else:
1187 self.statement = self.unicode_statement = statement
1189 self.cursor = self.create_cursor()
1190 return self
1192 @classmethod
1193 def _init_default(
1194 cls, dialect, connection, dbapi_connection, execution_options
1195 ):
1196 """Initialize execution context for a ColumnDefault construct."""
1198 self = cls.__new__(cls)
1199 self.root_connection = connection
1200 self._dbapi_connection = dbapi_connection
1201 self.dialect = connection.dialect
1203 self.execution_options = execution_options
1205 self._is_future_result = (
1206 connection._is_future
1207 or self.execution_options.get("future_result", False)
1208 )
1210 self.cursor = self.create_cursor()
1211 return self
1213 def _get_cache_stats(self):
1214 if self.compiled is None:
1215 return "raw sql"
1217 now = util.perf_counter()
1219 ch = self.cache_hit
1221 if ch is NO_CACHE_KEY:
1222 return "no key %.5fs" % (now - self.compiled._gen_time,)
1223 elif ch is CACHE_HIT:
1224 return "cached since %.4gs ago" % (now - self.compiled._gen_time,)
1225 elif ch is CACHE_MISS:
1226 return "generated in %.5fs" % (now - self.compiled._gen_time,)
1227 elif ch is CACHING_DISABLED:
1228 return "caching disabled %.5fs" % (now - self.compiled._gen_time,)
1229 elif ch is NO_DIALECT_SUPPORT:
1230 return "dialect %s+%s does not support caching %.5fs" % (
1231 self.dialect.name,
1232 self.dialect.driver,
1233 now - self.compiled._gen_time,
1234 )
1235 else:
1236 return "unknown"
1238 @util.memoized_property
1239 def identifier_preparer(self):
1240 if self.compiled:
1241 return self.compiled.preparer
1242 elif "schema_translate_map" in self.execution_options:
1243 return self.dialect.identifier_preparer._with_schema_translate(
1244 self.execution_options["schema_translate_map"]
1245 )
1246 else:
1247 return self.dialect.identifier_preparer
1249 @util.memoized_property
1250 def engine(self):
1251 return self.root_connection.engine
1253 @util.memoized_property
1254 def postfetch_cols(self):
1255 return self.compiled.postfetch
1257 @util.memoized_property
1258 def prefetch_cols(self):
1259 if self.isinsert:
1260 return self.compiled.insert_prefetch
1261 elif self.isupdate:
1262 return self.compiled.update_prefetch
1263 else:
1264 return ()
1266 @util.memoized_property
1267 def returning_cols(self):
1268 self.compiled.returning
1270 @util.memoized_property
1271 def no_parameters(self):
1272 return self.execution_options.get("no_parameters", False)
1274 @util.memoized_property
1275 def should_autocommit(self):
1276 autocommit = self.execution_options.get(
1277 "autocommit",
1278 not self.compiled
1279 and self.statement
1280 and expression.PARSE_AUTOCOMMIT
1281 or False,
1282 )
1284 if autocommit is expression.PARSE_AUTOCOMMIT:
1285 return self.should_autocommit_text(self.unicode_statement)
1286 else:
1287 return autocommit
1289 def _execute_scalar(self, stmt, type_, parameters=None):
1290 """Execute a string statement on the current cursor, returning a
1291 scalar result.
1293 Used to fire off sequences, default phrases, and "select lastrowid"
1294 types of statements individually or in the context of a parent INSERT
1295 or UPDATE statement.
1297 """
1299 conn = self.root_connection
1300 if (
1301 isinstance(stmt, util.text_type)
1302 and not self.dialect.supports_unicode_statements
1303 ):
1304 stmt = self.dialect._encoder(stmt)[0]
1306 if "schema_translate_map" in self.execution_options:
1307 schema_translate_map = self.execution_options.get(
1308 "schema_translate_map", {}
1309 )
1311 rst = self.identifier_preparer._render_schema_translates
1312 stmt = rst(stmt, schema_translate_map)
1314 if not parameters:
1315 if self.dialect.positional:
1316 parameters = self.dialect.execute_sequence_format()
1317 else:
1318 parameters = {}
1320 conn._cursor_execute(self.cursor, stmt, parameters, context=self)
1321 r = self.cursor.fetchone()[0]
1322 if type_ is not None:
1323 # apply type post processors to the result
1324 proc = type_._cached_result_processor(
1325 self.dialect, self.cursor.description[0][1]
1326 )
1327 if proc:
1328 return proc(r)
1329 return r
1331 @property
1332 def connection(self):
1333 conn = self.root_connection
1334 if conn._is_future:
1335 return conn
1336 else:
1337 return conn._branch()
1339 def should_autocommit_text(self, statement):
1340 return AUTOCOMMIT_REGEXP.match(statement)
1342 def _use_server_side_cursor(self):
1343 if not self.dialect.supports_server_side_cursors:
1344 return False
1346 if self.dialect.server_side_cursors:
1347 # this is deprecated
1348 use_server_side = self.execution_options.get(
1349 "stream_results", True
1350 ) and (
1351 (
1352 self.compiled
1353 and isinstance(
1354 self.compiled.statement, expression.Selectable
1355 )
1356 or (
1357 (
1358 not self.compiled
1359 or isinstance(
1360 self.compiled.statement, expression.TextClause
1361 )
1362 )
1363 and self.unicode_statement
1364 and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
1365 )
1366 )
1367 )
1368 else:
1369 use_server_side = self.execution_options.get(
1370 "stream_results", False
1371 )
1373 return use_server_side
1375 def create_cursor(self):
1376 if (
1377 # inlining initial preference checks for SS cursors
1378 self.dialect.supports_server_side_cursors
1379 and (
1380 self.execution_options.get("stream_results", False)
1381 or (
1382 self.dialect.server_side_cursors
1383 and self._use_server_side_cursor()
1384 )
1385 )
1386 ):
1387 self._is_server_side = True
1388 return self.create_server_side_cursor()
1389 else:
1390 self._is_server_side = False
1391 return self.create_default_cursor()
1393 def create_default_cursor(self):
1394 return self._dbapi_connection.cursor()
1396 def create_server_side_cursor(self):
1397 raise NotImplementedError()
1399 def pre_exec(self):
1400 pass
1402 def get_out_parameter_values(self, names):
1403 raise NotImplementedError(
1404 "This dialect does not support OUT parameters"
1405 )
1407 def post_exec(self):
1408 pass
1410 def get_result_processor(self, type_, colname, coltype):
1411 """Return a 'result processor' for a given type as present in
1412 cursor.description.
1414 This has a default implementation that dialects can override
1415 for context-sensitive result type handling.
1417 """
1418 return type_._cached_result_processor(self.dialect, coltype)
1420 def get_lastrowid(self):
1421 """return self.cursor.lastrowid, or equivalent, after an INSERT.
1423 This may involve calling special cursor functions, issuing a new SELECT
1424 on the cursor (or a new one), or returning a stored value that was
1425 calculated within post_exec().
1427 This function will only be called for dialects which support "implicit"
1428 primary key generation, keep preexecute_autoincrement_sequences set to
1429 False, and when no explicit id value was bound to the statement.
1431 The function is called once for an INSERT statement that would need to
1432 return the last inserted primary key for those dialects that make use
1433 of the lastrowid concept. In these cases, it is called directly after
1434 :meth:`.ExecutionContext.post_exec`.
1436 """
1437 return self.cursor.lastrowid
1439 def handle_dbapi_exception(self, e):
1440 pass
1442 @property
1443 def rowcount(self):
1444 return self.cursor.rowcount
1446 def supports_sane_rowcount(self):
1447 return self.dialect.supports_sane_rowcount
1449 def supports_sane_multi_rowcount(self):
1450 return self.dialect.supports_sane_multi_rowcount
1452 def _setup_result_proxy(self):
1453 exec_opt = self.execution_options
1455 if self.is_crud or self.is_text:
1456 result = self._setup_dml_or_text_result()
1457 yp = sr = False
1458 else:
1459 yp = exec_opt.get("yield_per", None)
1460 sr = self._is_server_side or exec_opt.get("stream_results", False)
1461 strategy = self.cursor_fetch_strategy
1462 if sr and strategy is _cursor._DEFAULT_FETCH:
1463 strategy = _cursor.BufferedRowCursorFetchStrategy(
1464 self.cursor, self.execution_options
1465 )
1466 cursor_description = (
1467 strategy.alternate_cursor_description
1468 or self.cursor.description
1469 )
1470 if cursor_description is None:
1471 strategy = _cursor._NO_CURSOR_DQL
1473 if self._is_future_result:
1474 if self.root_connection.should_close_with_result:
1475 raise exc.InvalidRequestError(
1476 "can't use future_result=True with close_with_result"
1477 )
1478 result = _cursor.CursorResult(
1479 self, strategy, cursor_description
1480 )
1481 else:
1482 result = _cursor.LegacyCursorResult(
1483 self, strategy, cursor_description
1484 )
1486 if (
1487 self.compiled
1488 and not self.isddl
1489 and self.compiled.has_out_parameters
1490 ):
1491 self._setup_out_parameters(result)
1493 self._soft_closed = result._soft_closed
1495 if yp:
1496 result = result.yield_per(yp)
1498 return result
1500 def _setup_out_parameters(self, result):
1502 out_bindparams = [
1503 (param, name)
1504 for param, name in self.compiled.bind_names.items()
1505 if param.isoutparam
1506 ]
1507 out_parameters = {}
1509 for bindparam, raw_value in zip(
1510 [param for param, name in out_bindparams],
1511 self.get_out_parameter_values(
1512 [name for param, name in out_bindparams]
1513 ),
1514 ):
1516 type_ = bindparam.type
1517 impl_type = type_.dialect_impl(self.dialect)
1518 dbapi_type = impl_type.get_dbapi_type(self.dialect.dbapi)
1519 result_processor = impl_type.result_processor(
1520 self.dialect, dbapi_type
1521 )
1522 if result_processor is not None:
1523 raw_value = result_processor(raw_value)
1524 out_parameters[bindparam.key] = raw_value
1526 result.out_parameters = out_parameters
1528 def _setup_dml_or_text_result(self):
1529 if self.isinsert:
1530 if self.compiled.postfetch_lastrowid:
1531 self.inserted_primary_key_rows = (
1532 self._setup_ins_pk_from_lastrowid()
1533 )
1534 # else if not self._is_implicit_returning,
1535 # the default inserted_primary_key_rows accessor will
1536 # return an "empty" primary key collection when accessed.
1538 strategy = self.cursor_fetch_strategy
1539 if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
1540 strategy = _cursor.BufferedRowCursorFetchStrategy(
1541 self.cursor, self.execution_options
1542 )
1543 cursor_description = (
1544 strategy.alternate_cursor_description or self.cursor.description
1545 )
1546 if cursor_description is None:
1547 strategy = _cursor._NO_CURSOR_DML
1549 if self._is_future_result:
1550 result = _cursor.CursorResult(self, strategy, cursor_description)
1551 else:
1552 result = _cursor.LegacyCursorResult(
1553 self, strategy, cursor_description
1554 )
1556 if self.isinsert:
1557 if self._is_implicit_returning:
1558 rows = result.all()
1560 self.returned_default_rows = rows
1562 self.inserted_primary_key_rows = (
1563 self._setup_ins_pk_from_implicit_returning(result, rows)
1564 )
1566 # test that it has a cursor metadata that is accurate. the
1567 # first row will have been fetched and current assumptions
1568 # are that the result has only one row, until executemany()
1569 # support is added here.
1570 assert result._metadata.returns_rows
1571 result._soft_close()
1572 elif not self._is_explicit_returning:
1573 result._soft_close()
1575 # we assume here the result does not return any rows.
1576 # *usually*, this will be true. However, some dialects
1577 # such as that of MSSQL/pyodbc need to SELECT a post fetch
1578 # function so this is not necessarily true.
1579 # assert not result.returns_rows
1581 elif self.isupdate and self._is_implicit_returning:
1582 row = result.fetchone()
1583 self.returned_default_rows = [row]
1584 result._soft_close()
1586 # test that it has a cursor metadata that is accurate.
1587 # the rows have all been fetched however.
1588 assert result._metadata.returns_rows
1590 elif not result._metadata.returns_rows:
1591 # no results, get rowcount
1592 # (which requires open cursor on some drivers
1593 # such as kintersbasdb, mxodbc)
1594 result.rowcount
1595 result._soft_close()
1596 return result
1598 @util.memoized_property
1599 def inserted_primary_key_rows(self):
1600 # if no specific "get primary key" strategy was set up
1601 # during execution, return a "default" primary key based
1602 # on what's in the compiled_parameters and nothing else.
1603 return self._setup_ins_pk_from_empty()
1605 def _setup_ins_pk_from_lastrowid(self):
1606 getter = self.compiled._inserted_primary_key_from_lastrowid_getter
1608 lastrowid = self.get_lastrowid()
1609 return [getter(lastrowid, self.compiled_parameters[0])]
1611 def _setup_ins_pk_from_empty(self):
1612 getter = self.compiled._inserted_primary_key_from_lastrowid_getter
1613 return [getter(None, param) for param in self.compiled_parameters]
1615 def _setup_ins_pk_from_implicit_returning(self, result, rows):
1617 if not rows:
1618 return []
1620 getter = self.compiled._inserted_primary_key_from_returning_getter
1621 compiled_params = self.compiled_parameters
1623 return [
1624 getter(row, param) for row, param in zip(rows, compiled_params)
1625 ]
1627 def lastrow_has_defaults(self):
1628 return (self.isinsert or self.isupdate) and bool(
1629 self.compiled.postfetch
1630 )
1632 def _set_input_sizes(self):
1633 """Given a cursor and ClauseParameters, call the appropriate
1634 style of ``setinputsizes()`` on the cursor, using DB-API types
1635 from the bind parameter's ``TypeEngine`` objects.
1637 This method only called by those dialects which require it,
1638 currently cx_oracle, asyncpg and pg8000.
1640 """
1641 if self.isddl or self.is_text:
1642 return
1644 inputsizes = self.compiled._get_set_input_sizes_lookup(
1645 include_types=self.include_set_input_sizes,
1646 exclude_types=self.exclude_set_input_sizes,
1647 )
1649 if inputsizes is None:
1650 return
1652 if self.dialect._has_events:
1653 inputsizes = dict(inputsizes)
1654 self.dialect.dispatch.do_setinputsizes(
1655 inputsizes, self.cursor, self.statement, self.parameters, self
1656 )
1658 has_escaped_names = bool(self.compiled.escaped_bind_names)
1659 if has_escaped_names:
1660 escaped_bind_names = self.compiled.escaped_bind_names
1662 if self.dialect.positional:
1663 items = [
1664 (key, self.compiled.binds[key])
1665 for key in self.compiled.positiontup
1666 ]
1667 else:
1668 items = [
1669 (key, bindparam)
1670 for bindparam, key in self.compiled.bind_names.items()
1671 ]
1673 generic_inputsizes = []
1674 for key, bindparam in items:
1675 if bindparam in self.compiled.literal_execute_params:
1676 continue
1678 if key in self._expanded_parameters:
1679 if bindparam.type._is_tuple_type:
1680 num = len(bindparam.type.types)
1681 dbtypes = inputsizes[bindparam]
1682 generic_inputsizes.extend(
1683 (
1684 (
1685 escaped_bind_names.get(paramname, paramname)
1686 if has_escaped_names
1687 else paramname
1688 ),
1689 dbtypes[idx % num],
1690 bindparam.type.types[idx % num],
1691 )
1692 for idx, paramname in enumerate(
1693 self._expanded_parameters[key]
1694 )
1695 )
1696 else:
1697 dbtype = inputsizes.get(bindparam, None)
1698 generic_inputsizes.extend(
1699 (
1700 (
1701 escaped_bind_names.get(paramname, paramname)
1702 if has_escaped_names
1703 else paramname
1704 ),
1705 dbtype,
1706 bindparam.type,
1707 )
1708 for paramname in self._expanded_parameters[key]
1709 )
1710 else:
1711 dbtype = inputsizes.get(bindparam, None)
1713 escaped_name = (
1714 escaped_bind_names.get(key, key)
1715 if has_escaped_names
1716 else key
1717 )
1719 generic_inputsizes.append(
1720 (escaped_name, dbtype, bindparam.type)
1721 )
1722 try:
1723 self.dialect.do_set_input_sizes(
1724 self.cursor, generic_inputsizes, self
1725 )
1726 except BaseException as e:
1727 self.root_connection._handle_dbapi_exception(
1728 e, None, None, None, self
1729 )
1731 def _exec_default(self, column, default, type_):
1732 if default.is_sequence:
1733 return self.fire_sequence(default, type_)
1734 elif default.is_callable:
1735 self.current_column = column
1736 return default.arg(self)
1737 elif default.is_clause_element:
1738 return self._exec_default_clause_element(column, default, type_)
1739 else:
1740 return default.arg
1742 def _exec_default_clause_element(self, column, default, type_):
1743 # execute a default that's a complete clause element. Here, we have
1744 # to re-implement a miniature version of the compile->parameters->
1745 # cursor.execute() sequence, since we don't want to modify the state
1746 # of the connection / result in progress or create new connection/
1747 # result objects etc.
1748 # .. versionchanged:: 1.4
1750 if not default._arg_is_typed:
1751 default_arg = expression.type_coerce(default.arg, type_)
1752 else:
1753 default_arg = default.arg
1754 compiled = expression.select(default_arg).compile(dialect=self.dialect)
1755 compiled_params = compiled.construct_params()
1756 processors = compiled._bind_processors
1757 if compiled.positional:
1758 positiontup = compiled.positiontup
1759 parameters = self.dialect.execute_sequence_format(
1760 [
1761 processors[key](compiled_params[key])
1762 if key in processors
1763 else compiled_params[key]
1764 for key in positiontup
1765 ]
1766 )
1767 else:
1768 parameters = dict(
1769 (
1770 key,
1771 processors[key](compiled_params[key])
1772 if key in processors
1773 else compiled_params[key],
1774 )
1775 for key in compiled_params
1776 )
1777 return self._execute_scalar(
1778 util.text_type(compiled), type_, parameters=parameters
1779 )
1781 current_parameters = None
1782 """A dictionary of parameters applied to the current row.
1784 This attribute is only available in the context of a user-defined default
1785 generation function, e.g. as described at :ref:`context_default_functions`.
1786 It consists of a dictionary which includes entries for each column/value
1787 pair that is to be part of the INSERT or UPDATE statement. The keys of the
1788 dictionary will be the key value of each :class:`_schema.Column`,
1789 which is usually
1790 synonymous with the name.
1792 Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
1793 does not accommodate for the "multi-values" feature of the
1794 :meth:`_expression.Insert.values` method. The
1795 :meth:`.DefaultExecutionContext.get_current_parameters` method should be
1796 preferred.
1798 .. seealso::
1800 :meth:`.DefaultExecutionContext.get_current_parameters`
1802 :ref:`context_default_functions`
1804 """
1806 def get_current_parameters(self, isolate_multiinsert_groups=True):
1807 """Return a dictionary of parameters applied to the current row.
1809 This method can only be used in the context of a user-defined default
1810 generation function, e.g. as described at
1811 :ref:`context_default_functions`. When invoked, a dictionary is
1812 returned which includes entries for each column/value pair that is part
1813 of the INSERT or UPDATE statement. The keys of the dictionary will be
1814 the key value of each :class:`_schema.Column`,
1815 which is usually synonymous
1816 with the name.
1818 :param isolate_multiinsert_groups=True: indicates that multi-valued
1819 INSERT constructs created using :meth:`_expression.Insert.values`
1820 should be
1821 handled by returning only the subset of parameters that are local
1822 to the current column default invocation. When ``False``, the
1823 raw parameters of the statement are returned including the
1824 naming convention used in the case of multi-valued INSERT.
1826 .. versionadded:: 1.2 added
1827 :meth:`.DefaultExecutionContext.get_current_parameters`
1828 which provides more functionality over the existing
1829 :attr:`.DefaultExecutionContext.current_parameters`
1830 attribute.
1832 .. seealso::
1834 :attr:`.DefaultExecutionContext.current_parameters`
1836 :ref:`context_default_functions`
1838 """
1839 try:
1840 parameters = self.current_parameters
1841 column = self.current_column
1842 except AttributeError:
1843 raise exc.InvalidRequestError(
1844 "get_current_parameters() can only be invoked in the "
1845 "context of a Python side column default function"
1846 )
1848 compile_state = self.compiled.compile_state
1849 if (
1850 isolate_multiinsert_groups
1851 and self.isinsert
1852 and compile_state._has_multi_parameters
1853 ):
1854 if column._is_multiparam_column:
1855 index = column.index + 1
1856 d = {column.original.key: parameters[column.key]}
1857 else:
1858 d = {column.key: parameters[column.key]}
1859 index = 0
1860 keys = compile_state._dict_parameters.keys()
1861 d.update(
1862 (key, parameters["%s_m%d" % (key, index)]) for key in keys
1863 )
1864 return d
1865 else:
1866 return parameters
1868 def get_insert_default(self, column):
1869 if column.default is None:
1870 return None
1871 else:
1872 return self._exec_default(column, column.default, column.type)
1874 def get_update_default(self, column):
1875 if column.onupdate is None:
1876 return None
1877 else:
1878 return self._exec_default(column, column.onupdate, column.type)
1880 def _process_executemany_defaults(self):
1881 key_getter = self.compiled._within_exec_param_key_getter
1883 scalar_defaults = {}
1885 insert_prefetch = self.compiled.insert_prefetch
1886 update_prefetch = self.compiled.update_prefetch
1888 # pre-determine scalar Python-side defaults
1889 # to avoid many calls of get_insert_default()/
1890 # get_update_default()
1891 for c in insert_prefetch:
1892 if c.default and not c.default.is_sequence and c.default.is_scalar:
1893 scalar_defaults[c] = c.default.arg
1895 for c in update_prefetch:
1896 if c.onupdate and c.onupdate.is_scalar:
1897 scalar_defaults[c] = c.onupdate.arg
1899 for param in self.compiled_parameters:
1900 self.current_parameters = param
1901 for c in insert_prefetch:
1902 if c in scalar_defaults:
1903 val = scalar_defaults[c]
1904 else:
1905 val = self.get_insert_default(c)
1906 if val is not None:
1907 param[key_getter(c)] = val
1908 for c in update_prefetch:
1909 if c in scalar_defaults:
1910 val = scalar_defaults[c]
1911 else:
1912 val = self.get_update_default(c)
1913 if val is not None:
1914 param[key_getter(c)] = val
1916 del self.current_parameters
1918 def _process_executesingle_defaults(self):
1919 key_getter = self.compiled._within_exec_param_key_getter
1920 self.current_parameters = (
1921 compiled_parameters
1922 ) = self.compiled_parameters[0]
1924 for c in self.compiled.insert_prefetch:
1925 if c.default and not c.default.is_sequence and c.default.is_scalar:
1926 val = c.default.arg
1927 else:
1928 val = self.get_insert_default(c)
1930 if val is not None:
1931 compiled_parameters[key_getter(c)] = val
1933 for c in self.compiled.update_prefetch:
1934 val = self.get_update_default(c)
1936 if val is not None:
1937 compiled_parameters[key_getter(c)] = val
1938 del self.current_parameters
1941DefaultDialect.execution_ctx_cls = DefaultExecutionContext