1# engine/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`."""
8from __future__ import annotations
9
10import contextlib
11import sys
12import typing
13from typing import Any
14from typing import Callable
15from typing import cast
16from typing import Iterable
17from typing import Iterator
18from typing import List
19from typing import Mapping
20from typing import NoReturn
21from typing import Optional
22from typing import overload
23from typing import Tuple
24from typing import Type
25from typing import TypeVar
26from typing import Union
27
28from .interfaces import BindTyping
29from .interfaces import ConnectionEventsTarget
30from .interfaces import DBAPICursor
31from .interfaces import ExceptionContext
32from .interfaces import ExecuteStyle
33from .interfaces import ExecutionContext
34from .interfaces import IsolationLevel
35from .util import _distill_params_20
36from .util import _distill_raw_params
37from .util import TransactionalContext
38from .. import exc
39from .. import inspection
40from .. import log
41from .. import util
42from ..sql import compiler
43from ..sql import util as sql_util
44from ..util.typing import Never
45from ..util.typing import TupleAny
46from ..util.typing import TypeVarTuple
47from ..util.typing import Unpack
48
49if typing.TYPE_CHECKING:
50 from . import CursorResult
51 from . import ScalarResult
52 from .interfaces import _AnyExecuteParams
53 from .interfaces import _AnyMultiExecuteParams
54 from .interfaces import _CoreAnyExecuteParams
55 from .interfaces import _CoreMultiExecuteParams
56 from .interfaces import _CoreSingleExecuteParams
57 from .interfaces import _DBAPIAnyExecuteParams
58 from .interfaces import _DBAPISingleExecuteParams
59 from .interfaces import _ExecuteOptions
60 from .interfaces import CompiledCacheType
61 from .interfaces import CoreExecuteOptionsParameter
62 from .interfaces import Dialect
63 from .interfaces import SchemaTranslateMapType
64 from .reflection import Inspector # noqa
65 from .url import URL
66 from ..event import dispatcher
67 from ..log import _EchoFlagType
68 from ..pool import _ConnectionFairy
69 from ..pool import Pool
70 from ..pool import PoolProxiedConnection
71 from ..sql import Executable
72 from ..sql._typing import _InfoType
73 from ..sql.compiler import Compiled
74 from ..sql.ddl import ExecutableDDLElement
75 from ..sql.ddl import InvokeDDLBase
76 from ..sql.functions import FunctionElement
77 from ..sql.schema import DefaultGenerator
78 from ..sql.schema import HasSchemaAttr
79 from ..sql.schema import SchemaVisitable
80 from ..sql.selectable import TypedReturnsRows
81
82
# Generic type variable for a single value; bound to ``Any``.
_T = TypeVar("_T", bound=Any)
# Variadic type variable (see ``TypeVarTuple``).
_Ts = TypeVarTuple("_Ts")
# Module-level "no options" defaults; both names refer to the same
# ``util.EMPTY_DICT`` object.
_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT
NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT
87
88
89class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
90 """Provides high-level functionality for a wrapped DB-API connection.
91
92 The :class:`_engine.Connection` object is procured by calling the
93 :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine`
94 object, and provides services for execution of SQL statements as well
95 as transaction control.
96
97 The Connection object is **not** thread-safe. While a Connection can be
98 shared among threads using properly synchronized access, it is still
99 possible that the underlying DBAPI connection may not support shared
100 access between threads. Check the DBAPI documentation for details.
101
    The Connection object represents a single DBAPI connection checked out
    from the connection pool. In this state, the connection pool has no
    effect upon the connection, including its expiration or timeout state.
    For the connection pool to properly manage connections, connections
    should be returned to the connection pool (i.e. ``connection.close()``)
    whenever the connection is not in use.
108
109 .. index::
110 single: thread safety; Connection
111
112 """
113
# dialect in use; assigned from ``engine.dialect`` in ``__init__``
dialect: Dialect
# event dispatcher; ``__init__`` joins it to the engine's dispatcher
# unless ``_has_events`` was passed explicitly
dispatch: dispatcher[ConnectionEventsTarget]

_sqla_logger_namespace = "sqlalchemy.engine.Connection"

# used by sqlalchemy.engine.util.TransactionalContext
_trans_context_manager: Optional[TransactionalContext] = None

# legacy as of 2.0, should be eventually deprecated and
# removed.  was used in the "pre_ping" recipe that's been in the docs
# a long time
should_close_with_result = False

# pool-proxied DBAPI connection; set to None by ``invalidate()``
_dbapi_connection: Optional[PoolProxiedConnection]

# current execution options; starts out as the engine's options and is
# replaced by ``execution_options()``
_execution_options: _ExecuteOptions

# transaction state; both are reset to None in ``__init__`` and
# ``_transaction`` is populated by ``begin()``
_transaction: Optional[RootTransaction]
_nested_transaction: Optional[NestedTransaction]
133
def __init__(
    self,
    engine: Engine,
    connection: Optional[PoolProxiedConnection] = None,
    _has_events: Optional[bool] = None,
    _allow_revalidate: bool = True,
    _allow_autobegin: bool = True,
):
    """Construct a new Connection.

    :param engine: the :class:`_engine.Engine` this connection belongs to;
     supplies the dialect, the initial execution options and the logger.
    :param connection: an already checked-out pool-proxied connection.
     When ``None``, one is procured via ``engine.raw_connection()``.
    :param _has_events: when ``None``, this connection's dispatcher is
     joined to the engine's and the engine's ``_has_events`` flag is
     adopted.  When given explicitly, the engine's dispatcher is not
     joined.
    :param _allow_revalidate: whether a new DBAPI connection may be
     procured after this one has been invalidated.
    :param _allow_autobegin: whether ``_autobegin()`` is allowed to start
     a transaction implicitly.

    """
    dialect = engine.dialect
    self.engine = engine
    self.dialect = dialect

    if connection is not None:
        self._dbapi_connection = connection
    else:
        try:
            self._dbapi_connection = engine.raw_connection()
        except dialect.loaded_dbapi.Error as err:
            # give the engine-level error handling a chance to translate
            # the DBAPI error; if it returns, re-raise the original
            Connection._handle_dbapi_exception_noconnection(
                err, dialect, engine
            )
            raise

    # no transaction of any kind is in progress yet
    self._transaction = None
    self._nested_transaction = None
    self.__savepoint_seq = 0
    self.__in_begin = False

    self.__can_reconnect = _allow_revalidate
    self._allow_autobegin = _allow_autobegin
    self._echo = engine._should_log_info()

    if _has_events is None:
        # only join the engine's dispatch when the caller did not state a
        # preference; if _has_events is sent explicitly as False we don't
        # want to handle any of the engine's events.
        self.dispatch = self.dispatch._join(engine.dispatch)
        self._has_events = engine._has_events
    else:
        # any falsy explicit value is stored as plain ``False``
        self._has_events = _has_events or False

    self._execution_options = engine._execution_options

    events_active = self._has_events or self.engine._has_events
    if events_active:
        self.dispatch.engine_connect(self)
178
# optional callable applied to each message by ``_log_info()`` and
# ``_log_debug()`` before it is logged; a falsy value means messages are
# logged unchanged.
# this can be assigned differently via
# characteristics.LoggingTokenCharacteristic
_message_formatter: Any = None
182
def _log_info(self, message: str, *arg: Any, **kw: Any) -> None:
    """Log ``message`` at INFO level on the engine's logger.

    The message is first passed through ``_message_formatter`` when one is
    set.  Positional and keyword arguments are forwarded to the logger;
    ``stacklevel`` is set in ``kw`` when ``log.STACKLEVEL`` is enabled.

    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw.update(stacklevel=1 + log.STACKLEVEL_OFFSET)

    self.engine.logger.info(text, *arg, **kw)
193
def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None:
    """Log ``message`` at DEBUG level on the engine's logger.

    The message is first passed through ``_message_formatter`` when one is
    set.  Positional and keyword arguments are forwarded to the logger;
    ``stacklevel`` is set in ``kw`` when ``log.STACKLEVEL`` is enabled.

    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw.update(stacklevel=1 + log.STACKLEVEL_OFFSET)

    self.engine.logger.debug(text, *arg, **kw)
204
@property
def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]:
    """The ``schema_translate_map`` execution option currently in effect,
    or ``None`` if the option is not set.

    """
    return self._execution_options.get("schema_translate_map", None)
212
def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]:
    """Return the effective schema name for ``obj``.

    The object's own ``schema`` is returned unless a
    ``schema_translate_map`` execution option is present, contains that
    schema name as a key, and the object opts in via ``_use_schema_map``;
    in that case the mapped name is returned instead.

    """
    schema_name = obj.schema
    translate_map: Optional[SchemaTranslateMapType] = (
        self._execution_options.get("schema_translate_map", None)
    )

    # each guard falls back to the untranslated name; the checks run in
    # the same order as a single short-circuiting ``and`` chain would
    if not translate_map:
        return schema_name
    if schema_name not in translate_map:
        return schema_name
    if not obj._use_schema_map:
        return schema_name

    return translate_map[schema_name]
232
def __enter__(self) -> Connection:
    """Enter a ``with`` block; the connection itself is the bound value."""
    return self
235
def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
    """Leave a ``with`` block by calling ``close()``.

    The exception details are not inspected and nothing is returned, so
    an exception raised inside the block is not suppressed.

    """
    self.close()
238
@overload
def execution_options(
    self,
    *,
    compiled_cache: Optional[CompiledCacheType] = ...,
    logging_token: str = ...,
    isolation_level: IsolationLevel = ...,
    no_parameters: bool = False,
    stream_results: bool = False,
    max_row_buffer: int = ...,
    yield_per: int = ...,
    insertmanyvalues_page_size: int = ...,
    schema_translate_map: Optional[SchemaTranslateMapType] = ...,
    preserve_rowcount: bool = False,
    driver_column_names: bool = False,
    **opt: Any,
) -> Connection: ...

@overload
def execution_options(self, **opt: Any) -> Connection: ...

def execution_options(self, **opt: Any) -> Connection:
    r"""Set non-SQL options for the connection which take effect
    during execution.

    This method modifies this :class:`_engine.Connection` **in-place**;
    the return value is the same :class:`_engine.Connection` object
    upon which the method is called.   Note that this is in contrast
    to the behavior of the ``execution_options`` methods on other
    objects such as :meth:`_engine.Engine.execution_options` and
    :meth:`_sql.Executable.execution_options`.  The rationale is that many
    such execution options necessarily modify the state of the base
    DBAPI connection in any case so there is no feasible means of
    keeping the effect of such an option localized to a "sub" connection.

    .. versionchanged:: 2.0  The :meth:`_engine.Connection.execution_options`
       method, in contrast to other objects with this method, modifies
       the connection in-place without creating a copy of it.

    As discussed elsewhere, the :meth:`_engine.Connection.execution_options`
    method accepts any arbitrary parameters including user defined names.
    All parameters given are consumable in a number of ways including
    by using the :meth:`_engine.Connection.get_execution_options` method.
    See the examples at :meth:`_sql.Executable.execution_options`
    and :meth:`_engine.Engine.execution_options`.

    The keywords that are currently recognized by SQLAlchemy itself
    include all those listed under :meth:`.Executable.execution_options`,
    as well as others that are specific to :class:`_engine.Connection`.

    :param compiled_cache: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      A dictionary where :class:`.Compiled` objects
      will be cached when the :class:`_engine.Connection`
      compiles a clause
      expression into a :class:`.Compiled` object.  This dictionary will
      supersede the statement cache that may be configured on the
      :class:`_engine.Engine` itself.   If set to None, caching
      is disabled, even if the engine has a configured cache size.

      Note that the ORM makes use of its own "compiled" caches for
      some operations, including flush operations.  The caching
      used by the ORM internally supersedes a cache dictionary
      specified here.

    :param logging_token: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      Adds the specified string token surrounded by brackets in log
      messages logged by the connection, i.e. the logging that's enabled
      either via the :paramref:`_sa.create_engine.echo` flag or via the
      ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a
      per-connection or per-sub-engine token to be available which is
      useful for debugging concurrent connection scenarios.

      .. versionadded:: 1.4.0b2

      .. seealso::

        :ref:`dbengine_logging_tokens` - usage example

        :paramref:`_sa.create_engine.logging_name` - adds a name to the
        name used by the Python logger object itself.

    :param isolation_level: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      Set the transaction isolation level for the lifespan of this
      :class:`_engine.Connection` object.
      Valid values include those string
      values accepted by the :paramref:`_sa.create_engine.isolation_level`
      parameter passed to :func:`_sa.create_engine`.  These levels are
      semi-database specific; see individual dialect documentation for
      valid levels.

      The isolation level option applies the isolation level by emitting
      statements on the DBAPI connection, and **necessarily affects the
      original Connection object overall**. The isolation level will remain
      at the given setting until explicitly changed, or when the DBAPI
      connection itself is :term:`released` to the connection pool, i.e. the
      :meth:`_engine.Connection.close` method is called, at which time an
      event handler will emit additional statements on the DBAPI connection
      in order to revert the isolation level change.

      .. note:: The ``isolation_level`` execution option may only be
         established before the :meth:`_engine.Connection.begin` method is
         called, as well as before any SQL statements are emitted which
         would otherwise trigger "autobegin", or directly after a call to
         :meth:`_engine.Connection.commit` or
         :meth:`_engine.Connection.rollback`. A database cannot change the
         isolation level on a transaction in progress.

      .. note:: The ``isolation_level`` execution option is implicitly
         reset if the :class:`_engine.Connection` is invalidated, e.g. via
         the :meth:`_engine.Connection.invalidate` method, or if a
         disconnection error occurs. The new connection produced after the
         invalidation will **not** have the selected isolation level
         re-applied to it automatically.

      .. seealso::

            :ref:`dbapi_autocommit`

            :meth:`_engine.Connection.get_isolation_level`
            - view current actual level

    :param no_parameters: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      When ``True``, if the final parameter
      list or dictionary is totally empty, will invoke the
      statement on the cursor as ``cursor.execute(statement)``,
      not passing the parameter collection at all.
      Some DBAPIs such as psycopg2 and mysql-python consider
      percent signs as significant only when parameters are
      present; this option allows code to generate SQL
      containing percent signs (and possibly other characters)
      that is neutral regarding whether it's executed by the DBAPI
      or piped into a script that's later invoked by
      command line tools.

    :param stream_results: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      Indicate to the dialect that results should be "streamed" and not
      pre-buffered, if possible.  For backends such as PostgreSQL, MySQL
      and MariaDB, this indicates the use of a "server side cursor" as
      opposed to a client side cursor.  Other backends such as that of
      Oracle Database may already use server side cursors by default.

      The usage of
      :paramref:`_engine.Connection.execution_options.stream_results` is
      usually combined with setting a fixed number of rows to be fetched
      in batches, to allow for efficient iteration of database rows while
      at the same time not loading all result rows into memory at once;
      this can be configured on a :class:`_engine.Result` object using the
      :meth:`_engine.Result.yield_per` method, after execution has
      returned a new :class:`_engine.Result`.   If
      :meth:`_engine.Result.yield_per` is not used,
      the :paramref:`_engine.Connection.execution_options.stream_results`
      mode of operation will instead use a dynamically sized buffer
      which buffers sets of rows at a time, growing on each batch
      based on a fixed growth size up until a limit which may
      be configured using the
      :paramref:`_engine.Connection.execution_options.max_row_buffer`
      parameter.

      When using the ORM to fetch ORM mapped objects from a result,
      :meth:`_engine.Result.yield_per` should always be used with
      :paramref:`_engine.Connection.execution_options.stream_results`,
      so that the ORM does not fetch all rows into new ORM objects at once.

      For typical use, the
      :paramref:`_engine.Connection.execution_options.yield_per` execution
      option should be preferred, which sets up both
      :paramref:`_engine.Connection.execution_options.stream_results` and
      :meth:`_engine.Result.yield_per` at once. This option is supported
      both at a core level by :class:`_engine.Connection` as well as by the
      ORM :class:`_engine.Session`; the latter is described at
      :ref:`orm_queryguide_yield_per`.

      .. seealso::

        :ref:`engine_stream_results` - background on
        :paramref:`_engine.Connection.execution_options.stream_results`

        :paramref:`_engine.Connection.execution_options.max_row_buffer`

        :paramref:`_engine.Connection.execution_options.yield_per`

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param max_row_buffer: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.  Sets a maximum
      buffer size to use when the
      :paramref:`_engine.Connection.execution_options.stream_results`
      execution option is used on a backend that supports server side
      cursors.  The default value if not specified is 1000.

      .. seealso::

        :paramref:`_engine.Connection.execution_options.stream_results`

        :ref:`engine_stream_results`


    :param yield_per: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.  Integer value applied which will
      set the :paramref:`_engine.Connection.execution_options.stream_results`
      execution option and invoke :meth:`_engine.Result.yield_per`
      automatically at once.  Allows equivalent functionality as
      is present when using this parameter with the ORM.

      .. versionadded:: 1.4.40

      .. seealso::

        :ref:`engine_stream_results` - background and examples
        on using server side cursors with Core.

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`,
        :class:`_engine.Engine`. Number of rows to format into an
        INSERT statement when the statement uses "insertmanyvalues" mode,
        which is a paged form of bulk insert that is used for many backends
        when using :term:`executemany` execution typically in conjunction
        with RETURNING. Defaults to 1000. May also be modified on a
        per-engine basis using the
        :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter.

        .. versionadded:: 2.0

        .. seealso::

            :ref:`engine_insertmanyvalues`

    :param schema_translate_map: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      A dictionary mapping schema names to schema names, that will be
      applied to the :paramref:`_schema.Table.schema` element of each
      :class:`_schema.Table`
      encountered when SQL or DDL expression elements
      are compiled into strings; the resulting schema name will be
      converted based on presence in the map of the original name.

      .. seealso::

        :ref:`schema_translating`

    :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount``
      attribute will be unconditionally memoized within the result and
      made available via the :attr:`.CursorResult.rowcount` attribute.
      Normally, this attribute is only preserved for UPDATE and DELETE
      statements.  Using this option, the DBAPI's rowcount value can
      be accessed for other kinds of statements such as INSERT and SELECT,
      to the degree that the DBAPI supports these statements.  See
      :attr:`.CursorResult.rowcount` for notes regarding the behavior
      of this attribute.

      .. versionadded:: 2.0.28

    .. seealso::

        :meth:`_engine.Engine.execution_options`

        :meth:`.Executable.execution_options`

        :meth:`_engine.Connection.get_execution_options`

        :ref:`orm_queryguide_execution_options` - documentation on all
        ORM-specific execution options

    :param driver_column_names: When True, the returned
      :class:`_engine.CursorResult` will use the column names as written in
      ``cursor.description`` to set up the keys for the result set,
      including the names of columns for the :class:`_engine.Row` object as
      well as the dictionary keys when using :attr:`_engine.Row._mapping`.
      On backends that use "name normalization" such as Oracle Database to
      correct for lower case names being converted to all uppercase, this
      behavior is turned off and the raw UPPERCASE names in
      cursor.description will be present.

      .. versionadded:: 2.1

    """  # noqa
    # event listeners see the new options before they are merged in
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.set_connection_execution_options(self, opt)

    # merge into the existing options, then let the dialect react to
    # only the options passed in this call
    merged_options = self._execution_options.union(opt)
    self._execution_options = merged_options
    self.dialect.set_connection_execution_options(self, opt)
    return self
534
def get_execution_options(self) -> _ExecuteOptions:
    """Return the non-SQL execution options currently in effect for
    this connection.

    .. seealso::

        :meth:`_engine.Connection.execution_options`
    """
    current_options = self._execution_options
    return current_options
543
@property
def _still_open_and_dbapi_connection_is_valid(self) -> bool:
    """True when a pool-proxied connection is present and it reports
    itself as valid via ``is_valid``.

    """
    proxied = self._dbapi_connection
    if proxied is None:
        return False
    return proxied.is_valid
551
@property
def closed(self) -> bool:
    """Return True if this connection is closed."""

    # "closed" means no DBAPI connection is held *and* a new one may not
    # be procured; if one may be procured, the state is "invalidated".
    if self._dbapi_connection is not None:
        return False
    return not self.__can_reconnect
557
@property
def invalidated(self) -> bool:
    """Return True if this connection was invalidated.

    This does not indicate whether or not the connection was
    invalidated at the pool level, however

    """

    # prior to 1.4, "invalid" was stored as a state independent of
    # "closed", meaning an invalidated connection could be "closed",
    # the _dbapi_connection would be None and closed=True, yet the
    # "invalid" flag would stay True.  This meant that there were
    # three separate states (open/valid, closed/valid, closed/invalid)
    # when there is really no reason for that; a connection that's
    # "closed" does not need to be "invalid".  So the state is now
    # represented by the two facts alone.

    if self._dbapi_connection is not None:
        return False
    return self.__can_reconnect
578
@property
def connection(self) -> PoolProxiedConnection:
    """The underlying DB-API connection managed by this Connection.

    This is a SQLAlchemy connection-pool proxied connection
    which then has the attribute
    :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the
    actual driver connection.

    If no connection is currently held, an attempt is made to procure a
    new one before returning.

    .. seealso::


        :ref:`dbapi_connections`

    """

    proxied = self._dbapi_connection
    if proxied is not None:
        return proxied

    try:
        return self._revalidate_connection()
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these describe this Connection's own state; pass them through
        # without routing them to the DBAPI error handling
        raise
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)
604
def get_isolation_level(self) -> IsolationLevel:
    """Return the current **actual** isolation level that's present on
    the database within the scope of this connection.

    This method performs a live SQL operation against the database
    in order to procure the current isolation level, so the value returned
    is the actual level on the underlying DBAPI connection regardless of
    how this state was set. This will be one of the four actual isolation
    modes ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
    ``SERIALIZABLE``. It will **not** include the ``AUTOCOMMIT`` isolation
    level setting. Third party dialects may also feature additional
    isolation level settings.

    .. note::  This method **will not report** on the ``AUTOCOMMIT``
      isolation level, which is a separate :term:`dbapi` setting that's
      independent of **actual** isolation level.  When ``AUTOCOMMIT`` is
      in use, the database connection still has a "traditional" isolation
      mode in effect, that is typically one of the four values
      ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
      ``SERIALIZABLE``.

    Compare to the :attr:`_engine.Connection.default_isolation_level`
    accessor which returns the isolation level that is present on the
    database at initial connection time.

    .. seealso::

        :attr:`_engine.Connection.default_isolation_level`
        - view default level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    # the dialect is handed the driver-level connection, not the pool
    # proxy that wraps it
    driver_connection = self.connection.dbapi_connection
    assert driver_connection is not None
    try:
        return self.dialect.get_isolation_level(driver_connection)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)
648
@property
def default_isolation_level(self) -> Optional[IsolationLevel]:
    """The initial-connection time isolation level associated with the
    :class:`_engine.Dialect` in use.

    This value is independent of the
    :paramref:`.Connection.execution_options.isolation_level` and
    :paramref:`.Engine.execution_options.isolation_level` execution
    options, and is determined by the :class:`_engine.Dialect` when the
    first connection is created, by performing a SQL query against the
    database for the current isolation level before any additional commands
    have been emitted.

    Calling this accessor does not invoke any new SQL queries; the value
    is read from the dialect.

    .. seealso::

        :meth:`_engine.Connection.get_isolation_level`
        - view current actual isolation level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    dialect = self.dialect
    return dialect.default_isolation_level
677
def _invalid_transaction(self) -> NoReturn:
    """Raise :class:`~sqlalchemy.exc.PendingRollbackError` indicating
    that a transaction must be rolled back before reconnecting.

    The message refers to a "savepoint transaction" when a nested
    transaction is present.

    """
    scope = "savepoint " if self._nested_transaction is not None else ""
    raise exc.PendingRollbackError(
        "Can't reconnect until invalid %stransaction is rolled "
        "back. Please rollback() fully before proceeding" % (scope,),
        code="8s2b",
    )
685
def _revalidate_connection(self) -> PoolProxiedConnection:
    """Procure and return a new pool-proxied connection for an
    invalidated Connection.

    Raises :class:`~sqlalchemy.exc.ResourceClosedError` when the
    connection may not reconnect or is not in the invalidated state.
    When a transaction is still present, ``_invalid_transaction()`` is
    invoked, which raises instead of reconnecting.

    """
    if not (self.__can_reconnect and self.invalidated):
        raise exc.ResourceClosedError("This Connection is closed")

    if self._transaction is not None:
        self._invalid_transaction()

    self._dbapi_connection = self.engine.raw_connection()
    return self._dbapi_connection
693
@property
def info(self) -> _InfoType:
    """Info dictionary associated with the underlying DBAPI connection
    referred to by this :class:`_engine.Connection`, allowing user-defined
    data to be associated with the connection.

    The data here will follow along with the DBAPI connection including
    after it is returned to the connection pool and used again
    in subsequent instances of :class:`_engine.Connection`.

    """

    # the dictionary lives on the pool-proxied connection rather than on
    # this object
    proxied = self.connection
    return proxied.info
707
def invalidate(self, exception: Optional[BaseException] = None) -> None:
    """Invalidate the underlying DBAPI connection associated with
    this :class:`_engine.Connection`.

    An attempt will be made to close the underlying DBAPI connection
    immediately; however if this operation fails, the error is logged
    but not raised.  The connection is then discarded whether or not
    close() succeeded.

    Upon the next use (where "use" typically means using the
    :meth:`_engine.Connection.execute` method or similar),
    this :class:`_engine.Connection` will attempt to
    procure a new DBAPI connection using the services of the
    :class:`_pool.Pool` as a source of connectivity (e.g.
    a "reconnection").

    If a transaction was in progress (e.g. the
    :meth:`_engine.Connection.begin` method has been called) when
    :meth:`_engine.Connection.invalidate` method is called, at the DBAPI
    level all state associated with this transaction is lost, as
    the DBAPI connection is closed.  The :class:`_engine.Connection`
    will not allow a reconnection to proceed until the
    :class:`.Transaction` object is ended, by calling the
    :meth:`.Transaction.rollback` method; until that point, any attempt at
    continuing to use the :class:`_engine.Connection` will raise an
    :class:`~sqlalchemy.exc.InvalidRequestError`.
    This is to prevent applications from accidentally
    continuing an ongoing transactional operation despite the
    fact that the transaction has been lost due to an
    invalidation.

    The :meth:`_engine.Connection.invalidate` method,
    just like auto-invalidation,
    will at the connection pool level invoke the
    :meth:`_events.PoolEvents.invalidate` event.

    Calling this method on a connection that is already invalidated is a
    no-op; calling it on a closed connection raises
    :class:`~sqlalchemy.exc.ResourceClosedError`.

    :param exception: an optional ``Exception`` instance that's the
      reason for the invalidation.  This is passed along to event handlers
      and logging functions.

    .. seealso::

        :ref:`pool_connection_invalidation`

    """

    if self.invalidated:
        return

    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    if self._still_open_and_dbapi_connection_is_valid:
        proxied = self._dbapi_connection
        assert proxied is not None
        proxied.invalidate(exception)

    # drop the reference whether or not the pool-level invalidation ran
    self._dbapi_connection = None
766
def detach(self) -> None:
    """Detach the underlying DB-API connection from its connection pool.

    E.g.::

        with engine.connect() as conn:
            conn.detach()
            conn.execute(text("SET search_path TO schema1, schema2"))

            # work with connection

        # connection is fully closed (since we used "with:", can
        # also call .close())

    This :class:`_engine.Connection` instance will remain usable.
    When closed
    (or exited from a context manager context as above),
    the DB-API connection will be literally closed and not
    returned to its originating pool.

    This method can be used to insulate the rest of an application
    from a modified state on a connection (such as a transaction
    isolation level or similar).

    Raises :class:`~sqlalchemy.exc.ResourceClosedError` if the connection
    is closed, and :class:`~sqlalchemy.exc.InvalidRequestError` if it has
    been invalidated.

    """

    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    proxied = self._dbapi_connection
    if proxied is None:
        # not closed, yet no connection is held: the invalidated state
        raise exc.InvalidRequestError(
            "Can't detach an invalidated Connection"
        )

    proxied.detach()
802
def _autobegin(self) -> None:
    """Call ``begin()`` implicitly, unless autobegin is disallowed for
    this connection or the "in begin" flag is currently set.

    """
    if not self._allow_autobegin:
        return
    if self.__in_begin:
        return
    self.begin()
806
def begin(self) -> RootTransaction:
    """Begin a transaction prior to autobegin occurring.

    E.g.::

        with engine.connect() as conn:
            with conn.begin() as trans:
                conn.execute(table.insert(), {"username": "sandy"})

    The returned object is an instance of :class:`_engine.RootTransaction`.
    This object represents the "scope" of the transaction,
    which completes when either the :meth:`_engine.Transaction.rollback`
    or :meth:`_engine.Transaction.commit` method is called; the object
    also works as a context manager as illustrated above.

    The :meth:`_engine.Connection.begin` method begins a
    transaction that normally will be begun in any case when the connection
    is first used to execute a statement.  The reason this method might be
    used would be to invoke the :meth:`_events.ConnectionEvents.begin`
    event at a specific time, or to organize code within the scope of a
    connection checkout in terms of context managed blocks, such as::

        with engine.connect() as conn:
            with conn.begin():
                conn.execute(...)
                conn.execute(...)

            with conn.begin():
                conn.execute(...)
                conn.execute(...)

    The above code is not fundamentally any different in its behavior than
    the following code which does not use
    :meth:`_engine.Connection.begin`; the below style is known
    as "commit as you go" style::

        with engine.connect() as conn:
            conn.execute(...)
            conn.execute(...)
            conn.commit()

            conn.execute(...)
            conn.execute(...)
            conn.commit()

    From a database point of view, the :meth:`_engine.Connection.begin`
    method does not emit any SQL or change the state of the underlying
    DBAPI connection in any way; the Python DBAPI does not have any
    concept of explicit transaction begin.

    If a transaction has already been established on this connection,
    :class:`~sqlalchemy.exc.InvalidRequestError` is raised.

    .. seealso::

        :ref:`tutorial_working_with_transactions` - in the
        :ref:`unified_tutorial`

        :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT

        :meth:`_engine.Connection.begin_twophase` -
        use a two phase /XID transaction

        :meth:`_engine.Engine.begin` - context manager available from
        :class:`_engine.Engine`

    """
    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "This connection has already initialized a SQLAlchemy "
            "Transaction() object via begin() or autobegin; can't "
            "call begin() here unless rollback() or commit() "
            "is called first."
        )

    self._transaction = RootTransaction(self)
    return self._transaction
881
def begin_nested(self) -> NestedTransaction:
    """Begin a nested transaction (i.e. SAVEPOINT) and return a transaction
    handle that controls the scope of the SAVEPOINT.

    E.g.::

        with engine.begin() as connection:
            with connection.begin_nested():
                connection.execute(table.insert(), {"username": "sandy"})

    The returned object is an instance of
    :class:`_engine.NestedTransaction`, which includes transactional
    methods :meth:`_engine.NestedTransaction.commit` and
    :meth:`_engine.NestedTransaction.rollback`; for a nested transaction,
    these methods correspond to the operations "RELEASE SAVEPOINT <name>"
    and "ROLLBACK TO SAVEPOINT <name>". The name of the savepoint is local
    to the :class:`_engine.NestedTransaction` object and is generated
    automatically. Like any other :class:`_engine.Transaction`, the
    :class:`_engine.NestedTransaction` may be used as a context manager as
    illustrated above, which will "release" or "rollback" depending on
    whether the operation within the block was successful or raised an
    exception.

    Nested transactions require SAVEPOINT support in the underlying
    database, else the behavior is undefined. SAVEPOINT is commonly used to
    run operations within a transaction that may fail, while continuing the
    outer transaction. E.g.::

        from sqlalchemy import exc

        with engine.begin() as connection:
            trans = connection.begin_nested()
            try:
                connection.execute(table.insert(), {"username": "sandy"})
                trans.commit()
            except exc.IntegrityError:  # catch for duplicate username
                trans.rollback()  # rollback to savepoint

            # outer transaction continues
            connection.execute(...)

    If :meth:`_engine.Connection.begin_nested` is called without first
    calling :meth:`_engine.Connection.begin` or
    :meth:`_engine.Engine.begin`, the :class:`_engine.Connection` object
    will "autobegin" the outer transaction first. This outer transaction
    may be committed using "commit-as-you-go" style, e.g.::

        with engine.connect() as connection:  # begin() wasn't called

            with connection.begin_nested():  # will auto-"begin()" first
                connection.execute(...)
            # savepoint is released

            connection.execute(...)

            # explicitly commit outer transaction
            connection.commit()

            # can continue working with connection here

    .. versionchanged:: 2.0

        :meth:`_engine.Connection.begin_nested` will now participate
        in the connection "autobegin" behavior that is new as of
        2.0 / "future" style connections in 1.4.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :ref:`session_begin_nested` - ORM support for SAVEPOINT

    """
    # a SAVEPOINT needs an enclosing transaction; autobegin one if the
    # caller has not started it explicitly
    outer_missing = self._transaction is None
    if outer_missing:
        self._autobegin()

    savepoint = NestedTransaction(self)
    return savepoint
959
def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction:
    """Begin a two-phase or XA transaction and return a transaction
    handle.

    The returned object is an instance of :class:`.TwoPhaseTransaction`,
    which in addition to the methods provided by
    :class:`.Transaction`, also provides a
    :meth:`~.TwoPhaseTransaction.prepare` method.

    :param xid: the two phase transaction id.  If not supplied, an id
      is obtained from the dialect's ``create_xid()`` method.

    :raises sqlalchemy.exc.InvalidRequestError: if a transaction is
      already present on this connection.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_nested`

    """
    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "Cannot start a two phase transaction when a transaction "
            "is already in progress."
        )

    if xid is None:
        effective_xid = self.engine.dialect.create_xid()
    else:
        effective_xid = xid
    return TwoPhaseTransaction(self, effective_xid)
988
def commit(self) -> None:
    """Commit the transaction that is currently in progress.

    This method commits the current transaction if one has been started.
    If no transaction was started, the method has no effect, assuming
    the connection is in a non-invalidated state.

    A transaction is begun on a :class:`_engine.Connection` automatically
    whenever a statement is first executed, or when the
    :meth:`_engine.Connection.begin` method is called.

    .. note:: The :meth:`_engine.Connection.commit` method only acts upon
      the primary database transaction that is linked to the
      :class:`_engine.Connection` object.  It does not operate upon a
      SAVEPOINT that would have been invoked from the
      :meth:`_engine.Connection.begin_nested` method; for control of a
      SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the
      :class:`_engine.NestedTransaction` that is returned by the
      :meth:`_engine.Connection.begin_nested` method itself.

    """
    root = self._transaction
    if not root:
        # nothing was begun; committing is a no-op
        return
    root.commit()
1013
def rollback(self) -> None:
    """Roll back the transaction that is currently in progress.

    This method rolls back the current transaction if one has been started.
    If no transaction was started, the method has no effect.  If a
    transaction was started and the connection is in an invalidated state,
    the transaction is cleared using this method.

    A transaction is begun on a :class:`_engine.Connection` automatically
    whenever a statement is first executed, or when the
    :meth:`_engine.Connection.begin` method is called.

    .. note:: The :meth:`_engine.Connection.rollback` method only acts
      upon the primary database transaction that is linked to the
      :class:`_engine.Connection` object.  It does not operate upon a
      SAVEPOINT that would have been invoked from the
      :meth:`_engine.Connection.begin_nested` method; for control of a
      SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the
      :class:`_engine.NestedTransaction` that is returned by the
      :meth:`_engine.Connection.begin_nested` method itself.

    """
    root = self._transaction
    if not root:
        # nothing was begun; rolling back is a no-op
        return
    root.rollback()
1039
def recover_twophase(self) -> List[Any]:
    """Return the result of the dialect's ``do_recover_twophase()`` for
    this connection.

    The elements are presumably the identifiers of prepared two-phase
    transactions; the exact content is defined by the dialect.

    """
    dialect = self.engine.dialect
    return dialect.do_recover_twophase(self)
1042
def rollback_prepared(self, xid: Any, recover: bool = False) -> None:
    """Roll back the two-phase transaction identified by ``xid`` by
    delegating to the dialect's ``do_rollback_twophase()``.

    :param xid: the two-phase transaction identifier.
    :param recover: passed through to the dialect as the ``recover``
      keyword argument.

    """
    dialect = self.engine.dialect
    dialect.do_rollback_twophase(self, xid, recover=recover)
1045
def commit_prepared(self, xid: Any, recover: bool = False) -> None:
    """Commit the two-phase transaction identified by ``xid`` by
    delegating to the dialect's ``do_commit_twophase()``.

    :param xid: the two-phase transaction identifier.
    :param recover: passed through to the dialect as the ``recover``
      keyword argument.

    """
    dialect = self.engine.dialect
    dialect.do_commit_twophase(self, xid, recover=recover)
1048
def in_transaction(self) -> bool:
    """Return True if a root transaction exists and is active."""
    root = self._transaction
    if root is None:
        return False
    return root.is_active
1052
def in_nested_transaction(self) -> bool:
    """Return True if a nested transaction exists and is active."""
    nested = self._nested_transaction
    if nested is None:
        return False
    return nested.is_active
1059
1060 def _is_autocommit_isolation(self) -> bool:
1061 opt_iso = self._execution_options.get("isolation_level", None)
1062 return bool(
1063 opt_iso == "AUTOCOMMIT"
1064 or (
1065 opt_iso is None
1066 and self.engine.dialect._on_connect_isolation_level
1067 == "AUTOCOMMIT"
1068 )
1069 )
1070
1071 def _get_required_transaction(self) -> RootTransaction:
1072 trans = self._transaction
1073 if trans is None:
1074 raise exc.InvalidRequestError("connection is not in a transaction")
1075 return trans
1076
1077 def _get_required_nested_transaction(self) -> NestedTransaction:
1078 trans = self._nested_transaction
1079 if trans is None:
1080 raise exc.InvalidRequestError(
1081 "connection is not in a nested transaction"
1082 )
1083 return trans
1084
def get_transaction(self) -> Optional[RootTransaction]:
    """Return the current root transaction in progress, or ``None`` if
    there is none.

    .. versionadded:: 1.4

    """
    current = self._transaction
    return current
1093
def get_nested_transaction(self) -> Optional[NestedTransaction]:
    """Return the current nested transaction in progress, or ``None`` if
    there is none.

    .. versionadded:: 1.4

    """
    current = self._nested_transaction
    return current
1101
1102 def _begin_impl(self, transaction: RootTransaction) -> None:
1103 if self._echo:
1104 if self._is_autocommit_isolation():
1105 self._log_info(
1106 "BEGIN (implicit; DBAPI should not BEGIN due to "
1107 "autocommit mode)"
1108 )
1109 else:
1110 self._log_info("BEGIN (implicit)")
1111
1112 self.__in_begin = True
1113
1114 if self._has_events or self.engine._has_events:
1115 self.dispatch.begin(self)
1116
1117 try:
1118 self.engine.dialect.do_begin(self.connection)
1119 except BaseException as e:
1120 self._handle_dbapi_exception(e, None, None, None, None)
1121 finally:
1122 self.__in_begin = False
1123
1124 def _rollback_impl(self) -> None:
1125 if self._has_events or self.engine._has_events:
1126 self.dispatch.rollback(self)
1127
1128 if self._still_open_and_dbapi_connection_is_valid:
1129 if self._echo:
1130 if self._is_autocommit_isolation():
1131 if self.dialect.skip_autocommit_rollback:
1132 self._log_info(
1133 "ROLLBACK will be skipped by "
1134 "skip_autocommit_rollback"
1135 )
1136 else:
1137 self._log_info(
1138 "ROLLBACK using DBAPI connection.rollback(); "
1139 "set skip_autocommit_rollback to prevent fully"
1140 )
1141 else:
1142 self._log_info("ROLLBACK")
1143 try:
1144 self.engine.dialect.do_rollback(self.connection)
1145 except BaseException as e:
1146 self._handle_dbapi_exception(e, None, None, None, None)
1147
1148 def _commit_impl(self) -> None:
1149 if self._has_events or self.engine._has_events:
1150 self.dispatch.commit(self)
1151
1152 if self._echo:
1153 if self._is_autocommit_isolation():
1154 self._log_info(
1155 "COMMIT using DBAPI connection.commit(), "
1156 "has no effect due to autocommit mode"
1157 )
1158 else:
1159 self._log_info("COMMIT")
1160 try:
1161 self.engine.dialect.do_commit(self.connection)
1162 except BaseException as e:
1163 self._handle_dbapi_exception(e, None, None, None, None)
1164
1165 def _savepoint_impl(self, name: Optional[str] = None) -> str:
1166 if self._has_events or self.engine._has_events:
1167 self.dispatch.savepoint(self, name)
1168
1169 if name is None:
1170 self.__savepoint_seq += 1
1171 name = "sa_savepoint_%s" % self.__savepoint_seq
1172 self.engine.dialect.do_savepoint(self, name)
1173 return name
1174
1175 def _rollback_to_savepoint_impl(self, name: str) -> None:
1176 if self._has_events or self.engine._has_events:
1177 self.dispatch.rollback_savepoint(self, name, None)
1178
1179 if self._still_open_and_dbapi_connection_is_valid:
1180 self.engine.dialect.do_rollback_to_savepoint(self, name)
1181
1182 def _release_savepoint_impl(self, name: str) -> None:
1183 if self._has_events or self.engine._has_events:
1184 self.dispatch.release_savepoint(self, name, None)
1185
1186 self.engine.dialect.do_release_savepoint(self, name)
1187
1188 def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None:
1189 if self._echo:
1190 self._log_info("BEGIN TWOPHASE (implicit)")
1191 if self._has_events or self.engine._has_events:
1192 self.dispatch.begin_twophase(self, transaction.xid)
1193
1194 self.__in_begin = True
1195 try:
1196 self.engine.dialect.do_begin_twophase(self, transaction.xid)
1197 except BaseException as e:
1198 self._handle_dbapi_exception(e, None, None, None, None)
1199 finally:
1200 self.__in_begin = False
1201
def _prepare_twophase_impl(self, xid: Any) -> None:
    """Run the "prepare" step of a two-phase transaction.

    Dispatches the ``prepare_twophase`` event when listeners are
    present, then invokes the dialect's ``do_prepare_twophase()``.
    The current transaction is asserted to be a
    :class:`.TwoPhaseTransaction`.

    """
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.prepare_twophase(self, xid)

    assert isinstance(self._transaction, TwoPhaseTransaction)
    try:
        dialect = self.engine.dialect
        dialect.do_prepare_twophase(self, xid)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)
1211
def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
    """Run the "rollback" step of a two-phase transaction.

    The ``rollback_twophase`` event is dispatched whenever listeners
    are present; the dialect's ``do_rollback_twophase()`` is only
    invoked while the connection is still open with a valid DBAPI
    connection.

    """
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.rollback_twophase(self, xid, is_prepared)

    if not self._still_open_and_dbapi_connection_is_valid:
        return

    assert isinstance(self._transaction, TwoPhaseTransaction)
    try:
        dialect = self.engine.dialect
        dialect.do_rollback_twophase(self, xid, is_prepared)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)
1224
def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
    """Run the "commit" step of a two-phase transaction.

    Dispatches the ``commit_twophase`` event when listeners are
    present, then invokes the dialect's ``do_commit_twophase()``.
    The current transaction is asserted to be a
    :class:`.TwoPhaseTransaction`.

    """
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.commit_twophase(self, xid, is_prepared)

    assert isinstance(self._transaction, TwoPhaseTransaction)
    try:
        dialect = self.engine.dialect
        dialect.do_commit_twophase(self, xid, is_prepared)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)
1234
def close(self) -> None:
    """Close this :class:`_engine.Connection`.

    This results in a release of the underlying database
    resources, that is, the DBAPI connection referenced
    internally. The DBAPI connection is typically restored
    back to the connection-holding :class:`_pool.Pool` referenced
    by the :class:`_engine.Engine` that produced this
    :class:`_engine.Connection`. Any transactional state present on
    the DBAPI connection is also unconditionally released via
    the DBAPI connection's ``rollback()`` method, regardless
    of any :class:`.Transaction` object that may be
    outstanding with regards to this :class:`_engine.Connection`.

    This has the effect of also calling :meth:`_engine.Connection.rollback`
    if any transaction is in place.

    After :meth:`_engine.Connection.close` is called, the
    :class:`_engine.Connection` is permanently in a closed state,
    and will allow no further operations.

    """
    transaction_was_closed = bool(self._transaction)
    if transaction_was_closed:
        self._transaction.close()

    pooled = self._dbapi_connection
    if pooled is not None:
        if transaction_was_closed:
            # the transaction was just closed above, so hand the
            # connection back to the pool without an additional reset
            cast("_ConnectionFairy", pooled)._close_special(
                transaction_reset=True
            )
        else:
            pooled.close()

        # closing may in rare cases have triggered an invalidation that
        # already cleared _dbapi_connection; normally it is still set
        # (and in a "closed" state) at this point, so clear it either way
        self._dbapi_connection = None

    self.__can_reconnect = False
1282
# special case to handle mypy issue:
# https://github.com/python/mypy/issues/20651
@overload
def scalar(
    self,
    statement: TypedReturnsRows[Never],
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Optional[Any]: ...

@overload
def scalar(
    self,
    statement: TypedReturnsRows[_T],
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Optional[_T]: ...

@overload
def scalar(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Any: ...

def scalar(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Any:
    r"""Executes a SQL statement construct and returns a scalar object.

    This method is shorthand for invoking the
    :meth:`_engine.Result.scalar` method after invoking the
    :meth:`_engine.Connection.execute` method.  Parameters are equivalent.

    :return: a scalar Python value representing the first column of the
     first row returned.

    :raises sqlalchemy.exc.ObjectNotExecutableError: if ``statement``
     does not provide the ``_execute_on_scalar`` hook.

    """
    normalized_params = _distill_params_20(parameters)

    # the statement itself knows how to run in "scalar" mode; an object
    # lacking the hook is not an executable construct
    try:
        run_scalar = statement._execute_on_scalar
    except AttributeError as err:
        raise exc.ObjectNotExecutableError(statement) from err

    options = execution_options or NO_OPTIONS
    return run_scalar(self, normalized_params, options)
1340
@overload
def scalars(
    self,
    statement: TypedReturnsRows[_T],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[_T]: ...

@overload
def scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[Any]: ...

def scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[Any]:
    """Executes and returns a scalar result set, which yields scalar values
    from the first column of each row.

    This method is equivalent to calling :meth:`_engine.Connection.execute`
    to receive a :class:`_result.Result` object, then invoking the
    :meth:`_result.Result.scalars` method to produce a
    :class:`_result.ScalarResult` instance.

    :return: a :class:`_result.ScalarResult`

    .. versionadded:: 1.4.24

    """
    result = self.execute(
        statement, parameters, execution_options=execution_options
    )
    return result.scalars()
1383
@overload
def execute(
    self,
    statement: TypedReturnsRows[Unpack[_Ts]],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[_Ts]]: ...

@overload
def execute(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]: ...

def execute(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]:
    r"""Executes a SQL statement construct and returns a
    :class:`_engine.CursorResult`.

    :param statement: The statement to be executed.  This is always
     an object that is in both the :class:`_expression.ClauseElement` and
     :class:`_expression.Executable` hierarchies, including:

     * :class:`_expression.Select`
     * :class:`_expression.Insert`, :class:`_expression.Update`,
       :class:`_expression.Delete`
     * :class:`_expression.TextClause` and
       :class:`_expression.TextualSelect`
     * :class:`_schema.DDL` and objects which inherit from
       :class:`_schema.ExecutableDDLElement`

    :param parameters: parameters which will be bound into the statement.
     This may be either a dictionary of parameter names to values,
     or a mutable sequence (e.g. a list) of dictionaries.  When a
     list of dictionaries is passed, the underlying statement execution
     will make use of the DBAPI ``cursor.executemany()`` method.
     When a single dictionary is passed, the DBAPI ``cursor.execute()``
     method will be used.

    :param execution_options: optional dictionary of execution options,
     which will be associated with the statement execution.  This
     dictionary can provide a subset of the options that are accepted
     by :meth:`_engine.Connection.execution_options`.

    :return: a :class:`_engine.Result` object.

    :raises sqlalchemy.exc.ObjectNotExecutableError: if ``statement``
     does not provide the ``_execute_on_connection`` hook.

    """
    normalized_params = _distill_params_20(parameters)

    # dispatch is driven by the statement object; an object lacking the
    # hook is not an executable construct
    try:
        run_on_connection = statement._execute_on_connection
    except AttributeError as err:
        raise exc.ObjectNotExecutableError(statement) from err

    options = execution_options or NO_OPTIONS
    return run_on_connection(self, normalized_params, options)
1451
1452 def _execute_function(
1453 self,
1454 func: FunctionElement[Any],
1455 distilled_parameters: _CoreMultiExecuteParams,
1456 execution_options: CoreExecuteOptionsParameter,
1457 ) -> CursorResult[Unpack[TupleAny]]:
1458 """Execute a sql.FunctionElement object."""
1459
1460 return self._execute_clauseelement(
1461 func.select(), distilled_parameters, execution_options
1462 )
1463
1464 def _execute_default(
1465 self,
1466 default: DefaultGenerator,
1467 distilled_parameters: _CoreMultiExecuteParams,
1468 execution_options: CoreExecuteOptionsParameter,
1469 ) -> Any:
1470 """Execute a schema.ColumnDefault object."""
1471
1472 exec_opts = self._execution_options.merge_with(execution_options)
1473
1474 event_multiparams: Optional[_CoreMultiExecuteParams]
1475 event_params: Optional[_CoreAnyExecuteParams]
1476
1477 # note for event handlers, the "distilled parameters" which is always
1478 # a list of dicts is broken out into separate "multiparams" and
1479 # "params" collections, which allows the handler to distinguish
1480 # between an executemany and execute style set of parameters.
1481 if self._has_events or self.engine._has_events:
1482 (
1483 default,
1484 distilled_parameters,
1485 event_multiparams,
1486 event_params,
1487 ) = self._invoke_before_exec_event(
1488 default, distilled_parameters, exec_opts
1489 )
1490 else:
1491 event_multiparams = event_params = None
1492
1493 try:
1494 conn = self._dbapi_connection
1495 if conn is None:
1496 conn = self._revalidate_connection()
1497
1498 dialect = self.dialect
1499 ctx = dialect.execution_ctx_cls._init_default(
1500 dialect, self, conn, exec_opts
1501 )
1502 except (exc.PendingRollbackError, exc.ResourceClosedError):
1503 raise
1504 except BaseException as e:
1505 self._handle_dbapi_exception(e, None, None, None, None)
1506
1507 ret = ctx._exec_default(None, default, None)
1508
1509 if self._has_events or self.engine._has_events:
1510 self.dispatch.after_execute(
1511 self,
1512 default,
1513 event_multiparams,
1514 event_params,
1515 exec_opts,
1516 ret,
1517 )
1518
1519 return ret
1520
1521 def _execute_ddl(
1522 self,
1523 ddl: ExecutableDDLElement,
1524 distilled_parameters: _CoreMultiExecuteParams,
1525 execution_options: CoreExecuteOptionsParameter,
1526 ) -> CursorResult[Unpack[TupleAny]]:
1527 """Execute a schema.DDL object."""
1528
1529 exec_opts = ddl._execution_options.merge_with(
1530 self._execution_options, execution_options
1531 )
1532
1533 event_multiparams: Optional[_CoreMultiExecuteParams]
1534 event_params: Optional[_CoreSingleExecuteParams]
1535
1536 if self._has_events or self.engine._has_events:
1537 (
1538 ddl,
1539 distilled_parameters,
1540 event_multiparams,
1541 event_params,
1542 ) = self._invoke_before_exec_event(
1543 ddl, distilled_parameters, exec_opts
1544 )
1545 else:
1546 event_multiparams = event_params = None
1547
1548 schema_translate_map = exec_opts.get("schema_translate_map", None)
1549
1550 dialect = self.dialect
1551
1552 compiled = ddl.compile(
1553 dialect=dialect, schema_translate_map=schema_translate_map
1554 )
1555 ret = self._execute_context(
1556 dialect,
1557 dialect.execution_ctx_cls._init_ddl,
1558 compiled,
1559 None,
1560 exec_opts,
1561 compiled,
1562 )
1563 if self._has_events or self.engine._has_events:
1564 self.dispatch.after_execute(
1565 self,
1566 ddl,
1567 event_multiparams,
1568 event_params,
1569 exec_opts,
1570 ret,
1571 )
1572 return ret
1573
1574 def _invoke_before_exec_event(
1575 self,
1576 elem: Any,
1577 distilled_params: _CoreMultiExecuteParams,
1578 execution_options: _ExecuteOptions,
1579 ) -> Tuple[
1580 Any,
1581 _CoreMultiExecuteParams,
1582 _CoreMultiExecuteParams,
1583 _CoreSingleExecuteParams,
1584 ]:
1585 event_multiparams: _CoreMultiExecuteParams
1586 event_params: _CoreSingleExecuteParams
1587
1588 if len(distilled_params) == 1:
1589 event_multiparams, event_params = [], distilled_params[0]
1590 else:
1591 event_multiparams, event_params = distilled_params, {}
1592
1593 for fn in self.dispatch.before_execute:
1594 elem, event_multiparams, event_params = fn(
1595 self,
1596 elem,
1597 event_multiparams,
1598 event_params,
1599 execution_options,
1600 )
1601
1602 if event_multiparams:
1603 distilled_params = list(event_multiparams)
1604 if event_params:
1605 raise exc.InvalidRequestError(
1606 "Event handler can't return non-empty multiparams "
1607 "and params at the same time"
1608 )
1609 elif event_params:
1610 distilled_params = [event_params]
1611 else:
1612 distilled_params = []
1613
1614 return elem, distilled_params, event_multiparams, event_params
1615
def _execute_clauseelement(
    self,
    elem: Executable,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Unpack[TupleAny]]:
    """Execute a sql.ClauseElement object.

    The element is compiled through ``elem._compile_w_cache()`` using
    this connection's dialect and the effective compiled cache, then
    run through :meth:`_execute_context`.  ``before_execute`` /
    ``after_execute`` listeners are invoked around the execution when
    any are present on the connection or its engine.

    :param elem: the executable construct to run.
    :param distilled_parameters: parameters already normalized into a
      list of parameter sets.
    :param execution_options: per-call execution options; merged with
      those of the element and of this connection.
    :return: the result returned by :meth:`_execute_context`.

    """

    exec_opts = elem._execution_options.merge_with(
        self._execution_options, execution_options
    )

    # evaluated once so that the before/after hooks below are paired
    has_events = self._has_events or self.engine._has_events
    if has_events:
        # listeners may replace both the element and its parameters
        (
            elem,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            elem, distilled_parameters, exec_opts
        )

    if distilled_parameters:
        # ensure we don't retain a link to the view object for keys()
        # which links to the values, which we don't want to cache
        keys = sorted(distilled_parameters[0])
        # more than one parameter set means an executemany-style call
        for_executemany = len(distilled_parameters) > 1
    else:
        keys = []
        for_executemany = False

    dialect = self.dialect

    schema_translate_map = exec_opts.get("schema_translate_map", None)

    # a "compiled_cache" execution option overrides the engine-level cache
    compiled_cache: Optional[CompiledCacheType] = exec_opts.get(
        "compiled_cache", self.engine._compiled_cache
    )

    compiled_sql, extracted_params, param_dict, cache_hit = (
        elem._compile_w_cache(
            dialect=dialect,
            compiled_cache=compiled_cache,
            column_keys=keys,
            for_executemany=for_executemany,
            schema_translate_map=schema_translate_map,
            linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
        )
    )
    # positional arguments after exec_opts, plus the keyword arguments,
    # are forwarded by _execute_context() to the context constructor
    ret = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_compiled,
        compiled_sql,
        distilled_parameters,
        exec_opts,
        compiled_sql,
        distilled_parameters,
        elem,
        extracted_params,
        cache_hit=cache_hit,
        param_dict=param_dict,
    )
    if has_events:
        self.dispatch.after_execute(
            self,
            elem,
            event_multiparams,
            event_params,
            exec_opts,
            ret,
        )
    return ret
1689
def exec_driver_sql(
    self,
    statement: str,
    parameters: Optional[_DBAPIAnyExecuteParams] = None,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]:
    r"""Executes a string SQL statement on the DBAPI cursor directly,
    without any SQL compilation steps.

    This can be used to pass any string directly to the
    ``cursor.execute()`` method of the DBAPI in use.

    :param statement: The statement str to be executed.   Bound parameters
     must use the underlying DBAPI's paramstyle, such as "qmark",
     "pyformat", "format", etc.

    :param parameters: represent bound parameter values to be used in the
     execution.  The format is one of:   a dictionary of named parameters,
     a tuple of positional parameters, or a list containing either
     dictionaries or tuples for multiple-execute support.

    :param execution_options: optional execution options for this call;
     they are merged with the options already established on this
     :class:`_engine.Connection`.

    :return: a :class:`_engine.CursorResult`.

     E.g. multiple dictionaries::


         conn.exec_driver_sql(
             "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
             [{"id": 1, "value": "v1"}, {"id": 2, "value": "v2"}],
         )

     Single dictionary::

         conn.exec_driver_sql(
             "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
             dict(id=1, value="v1"),
         )

     Single tuple::

         conn.exec_driver_sql(
             "INSERT INTO table (id, value) VALUES (?, ?)", (1, "v1")
         )

     .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does
         not participate in the
         :meth:`_events.ConnectionEvents.before_execute` and
         :meth:`_events.ConnectionEvents.after_execute` events.   To
         intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use
         :meth:`_events.ConnectionEvents.before_cursor_execute` and
         :meth:`_events.ConnectionEvents.after_cursor_execute`.

     .. seealso::

        :pep:`249`

    """
    raw_parameters = _distill_raw_params(parameters)
    merged_options = self._execution_options.merge_with(execution_options)

    dialect = self.dialect
    return self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_statement,
        statement,
        None,
        merged_options,
        statement,
        raw_parameters,
    )
1764
def _execute_context(
    self,
    dialect: Dialect,
    constructor: Callable[..., ExecutionContext],
    statement: Union[str, Compiled],
    parameters: Optional[_AnyMultiExecuteParams],
    execution_options: _ExecuteOptions,
    *args: Any,
    **kw: Any,
) -> CursorResult[Unpack[TupleAny]]:
    """Create an :class:`.ExecutionContext` and execute, returning
    a :class:`_engine.CursorResult`.

    :param dialect: the dialect in use; passed to ``constructor`` and
      to the downstream execution methods.
    :param constructor: callable that builds the execution context; it
      is invoked as ``constructor(dialect, self, dbapi_connection,
      execution_options, *args, **kw)``.
    :param statement: the statement being run; used here for error
      reporting (as ``str(statement)``) and passed on to
      ``_exec_single_context()``.
    :param parameters: parameters used for error reporting and passed
      on to ``_exec_single_context()``.
    :param execution_options: the merged execution options for this
      execution.
    :param \\*args: additional positional arguments for ``constructor``.
    :param \\**kw: additional keyword arguments for ``constructor``.

    """

    if execution_options:
        # a truthy "yield_per" implies streaming, with the row buffer
        # capped at the same size
        yp = execution_options.get("yield_per", None)
        if yp:
            execution_options = execution_options.union(
                {"stream_results": True, "max_row_buffer": yp}
            )
    try:
        # obtain a DBAPI connection, revalidating if none is present
        conn = self._dbapi_connection
        if conn is None:
            conn = self._revalidate_connection()

        context = constructor(
            dialect, self, conn, execution_options, *args, **kw
        )
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these propagate unchanged rather than going through the
        # DBAPI exception handler
        raise
    except BaseException as e:
        self._handle_dbapi_exception(
            e, str(statement), parameters, None, None
        )

    # a root or nested transaction that exists but is no longer active
    # must be dealt with before anything further is executed
    if (
        self._transaction
        and not self._transaction.is_active
        or (
            self._nested_transaction
            and not self._nested_transaction.is_active
        )
    ):
        self._invalid_transaction()

    elif self._trans_context_manager:
        TransactionalContext._trans_ctx_check(self)

    # implicitly begin a transaction if none has been started
    if self._transaction is None:
        self._autobegin()

    context.pre_exec()

    # "insertmanyvalues" has its own execution path; everything else is
    # a single DBAPI execute / executemany
    if context.execute_style is ExecuteStyle.INSERTMANYVALUES:
        return self._exec_insertmany_context(dialect, context)
    else:
        return self._exec_single_context(
            dialect, context, statement, parameters
        )
1823
    def _exec_single_context(
        self,
        dialect: Dialect,
        context: ExecutionContext,
        statement: Union[str, Compiled],
        parameters: Optional[_AnyMultiExecuteParams],
    ) -> CursorResult[Unpack[TupleAny]]:
        """continue the _execute_context() method for a single DBAPI
        cursor.execute() or cursor.executemany() call.

        :param dialect: dialect whose ``bind_typing`` setting decides whether
         input sizes are set on the cursor before executing.
        :param context: execution context supplying the cursor, the string
         statement and the parameters that are actually executed.
        :param statement: statement as received by ``_execute_context()``;
         used here only for the error report if setting input sizes fails.
        :param parameters: parameters as received by ``_execute_context()``;
         used for that same error report, then rebound to
         ``context.parameters``.
        :return: the object returned by ``context._setup_result_proxy()``.

        Exceptions raised while setting input sizes, executing, running the
        ``after_cursor_execute`` hooks, ``context.post_exec()`` or building
        the result are passed to ``_handle_dbapi_exception()``, which always
        raises.

        """
        if dialect.bind_typing is BindTyping.SETINPUTSIZES:
            generic_setinputsizes = context._prepare_set_input_sizes()

            if generic_setinputsizes:
                try:
                    dialect.do_set_input_sizes(
                        context.cursor, generic_setinputsizes, context
                    )
                except BaseException as e:
                    self._handle_dbapi_exception(
                        e, str(statement), parameters, None, context
                    )

        # from here on, work with what the context actually holds; note
        # this rebinds the "parameters" argument
        cursor, str_statement, parameters = (
            context.cursor,
            context.statement,
            context.parameters,
        )

        effective_parameters: Optional[_AnyExecuteParams]

        # a non-executemany call uses only the first parameter set
        if not context.executemany:
            effective_parameters = parameters[0]
        else:
            effective_parameters = parameters

        # each before_cursor_execute hook may replace both the statement
        # and the parameters; hooks are chained in order
        if self._has_events or self.engine._has_events:
            for fn in self.dispatch.before_cursor_execute:
                str_statement, effective_parameters = fn(
                    self,
                    cursor,
                    str_statement,
                    effective_parameters,
                    context,
                    context.executemany,
                )

        if self._echo:
            self._log_info(str_statement)

            stats = context._get_cache_stats()

            if not self.engine.hide_parameters:
                self._log_info(
                    "[%s] %r",
                    stats,
                    sql_util._repr_params(
                        effective_parameters,
                        batches=10,
                        ismulti=context.executemany,
                    ),
                )
            else:
                self._log_info(
                    "[%s] [SQL parameters hidden due to hide_parameters=True]",
                    stats,
                )

        # set True when a dialect-level event hook reports (via a truthy
        # return) that it handled the execution itself, in which case the
        # dialect's default do_* method is skipped
        evt_handled: bool = False
        try:
            if context.execute_style is ExecuteStyle.EXECUTEMANY:
                effective_parameters = cast(
                    "_CoreMultiExecuteParams", effective_parameters
                )
                if self.dialect._has_events:
                    for fn in self.dialect.dispatch.do_executemany:
                        if fn(
                            cursor,
                            str_statement,
                            effective_parameters,
                            context,
                        ):
                            evt_handled = True
                            break
                if not evt_handled:
                    self.dialect.do_executemany(
                        cursor,
                        str_statement,
                        effective_parameters,
                        context,
                    )
            elif not effective_parameters and context.no_parameters:
                # no parameters at all and the context asks for the
                # parameter-less execution form
                if self.dialect._has_events:
                    for fn in self.dialect.dispatch.do_execute_no_params:
                        if fn(cursor, str_statement, context):
                            evt_handled = True
                            break
                if not evt_handled:
                    self.dialect.do_execute_no_params(
                        cursor, str_statement, context
                    )
            else:
                effective_parameters = cast(
                    "_CoreSingleExecuteParams", effective_parameters
                )
                if self.dialect._has_events:
                    for fn in self.dialect.dispatch.do_execute:
                        if fn(
                            cursor,
                            str_statement,
                            effective_parameters,
                            context,
                        ):
                            evt_handled = True
                            break
                if not evt_handled:
                    self.dialect.do_execute(
                        cursor, str_statement, effective_parameters, context
                    )

            if self._has_events or self.engine._has_events:
                self.dispatch.after_cursor_execute(
                    self,
                    cursor,
                    str_statement,
                    effective_parameters,
                    context,
                    context.executemany,
                )

            context.post_exec()

            result = context._setup_result_proxy()

        except BaseException as e:
            # _handle_dbapi_exception() is declared NoReturn, so "result"
            # is always bound when the return below is reached
            self._handle_dbapi_exception(
                e, str_statement, effective_parameters, cursor, context
            )

        return result
1965
    def _exec_insertmany_context(
        self,
        dialect: Dialect,
        context: ExecutionContext,
    ) -> CursorResult[Unpack[TupleAny]]:
        """continue the _execute_context() method for an "insertmanyvalues"
        operation, which will invoke DBAPI
        cursor.execute() one or more times with individual log and
        event hook calls.

        :param dialect: dialect that produces the batches via
         ``_deliver_insertmanyvalues_batches()`` and executes each of them.
        :param context: execution context supplying the cursor, statement,
         parameters and execution options.
        :return: the object returned by ``context._setup_result_proxy()``.

        Errors raised for an individual batch are reported to
        ``_handle_dbapi_exception()`` with ``is_sub_exec=True`` and that
        batch's statement / parameters; errors after the loop are reported
        with the statement / parameters of the whole operation.

        """

        if dialect.bind_typing is BindTyping.SETINPUTSIZES:
            generic_setinputsizes = context._prepare_set_input_sizes()
        else:
            generic_setinputsizes = None

        cursor, str_statement, parameters = (
            context.cursor,
            context.statement,
            context.parameters,
        )

        effective_parameters = parameters

        # event flags / listener collections are looked up once, before
        # the batch loop
        engine_events = self._has_events or self.engine._has_events
        if self.dialect._has_events:
            do_execute_dispatch: Iterable[Any] = (
                self.dialect.dispatch.do_execute
            )
        else:
            do_execute_dispatch = ()

        # "stats" is bound only when echo is on; it is read again only
        # inside the "if self._echo:" section of the loop below
        if self._echo:
            stats = context._get_cache_stats() + " (insertmanyvalues)"

        preserve_rowcount = context.execution_options.get(
            "preserve_rowcount", False
        )
        rowcount = 0

        for imv_batch in dialect._deliver_insertmanyvalues_batches(
            self,
            cursor,
            str_statement,
            effective_parameters,
            generic_setinputsizes,
            context,
        ):
            if imv_batch.processed_setinputsizes:
                try:
                    dialect.do_set_input_sizes(
                        context.cursor,
                        imv_batch.processed_setinputsizes,
                        context,
                    )
                except BaseException as e:
                    self._handle_dbapi_exception(
                        e,
                        sql_util._long_statement(imv_batch.replaced_statement),
                        imv_batch.replaced_parameters,
                        None,
                        context,
                        is_sub_exec=True,
                    )

            sub_stmt = imv_batch.replaced_statement
            sub_params = imv_batch.replaced_parameters

            # hooks may rewrite this batch's statement and parameters.
            # NOTE(review): the "executemany" argument is a literal True
            # here, while after_cursor_execute below receives
            # context.executemany - confirm this difference is intended
            if engine_events:
                for fn in self.dispatch.before_cursor_execute:
                    sub_stmt, sub_params = fn(
                        self,
                        cursor,
                        sub_stmt,
                        sub_params,
                        context,
                        True,
                    )

            if self._echo:
                self._log_info(sql_util._long_statement(sub_stmt))

                # renders as " <batchnum>/<total> (<ordered|unordered>
                # [; batch not supported])"; the line breaks sit inside
                # the replacement fields and add nothing to the text
                imv_stats = f""" {imv_batch.batchnum}/{
                            imv_batch.total_batches
                } ({
                    'ordered'
                    if imv_batch.rows_sorted else 'unordered'
                }{
                    '; batch not supported'
                    if imv_batch.is_downgraded
                    else ''
                })"""

                # the first batch appends to the cache stats text; later
                # batches replace it with a shorter prefix
                if imv_batch.batchnum == 1:
                    stats += imv_stats
                else:
                    stats = f"insertmanyvalues{imv_stats}"

                if not self.engine.hide_parameters:
                    self._log_info(
                        "[%s] %r",
                        stats,
                        sql_util._repr_params(
                            sub_params,
                            batches=10,
                            ismulti=False,
                        ),
                    )
                else:
                    self._log_info(
                        "[%s] [SQL parameters hidden due to "
                        "hide_parameters=True]",
                        stats,
                    )

            try:
                # a do_execute listener returning a truthy value takes
                # over the execution; otherwise (for/else) the dialect's
                # own do_execute() runs
                for fn in do_execute_dispatch:
                    if fn(
                        cursor,
                        sub_stmt,
                        sub_params,
                        context,
                    ):
                        break
                else:
                    dialect.do_execute(
                        cursor,
                        sub_stmt,
                        sub_params,
                        context,
                    )

            except BaseException as e:
                self._handle_dbapi_exception(
                    e,
                    sql_util._long_statement(sub_stmt),
                    sub_params,
                    cursor,
                    context,
                    is_sub_exec=True,
                )

            if engine_events:
                self.dispatch.after_cursor_execute(
                    self,
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                    context.executemany,
                )

            # accumulate batch sizes so that the total can be stored on
            # the context once all batches have run
            if preserve_rowcount:
                rowcount += imv_batch.current_batch_size

        try:
            context.post_exec()

            if preserve_rowcount:
                context._rowcount = rowcount  # type: ignore[attr-defined]

            result = context._setup_result_proxy()

        except BaseException as e:
            # _handle_dbapi_exception() is declared NoReturn, so "result"
            # is always bound when the return below is reached
            self._handle_dbapi_exception(
                e, str_statement, effective_parameters, cursor, context
            )

        return result
2135
2136 def _cursor_execute(
2137 self,
2138 cursor: DBAPICursor,
2139 statement: str,
2140 parameters: _DBAPISingleExecuteParams,
2141 context: Optional[ExecutionContext] = None,
2142 ) -> None:
2143 """Execute a statement + params on the given cursor.
2144
2145 Adds appropriate logging and exception handling.
2146
2147 This method is used by DefaultDialect for special-case
2148 executions, such as for sequences and column defaults.
2149 The path of statement execution in the majority of cases
2150 terminates at _execute_context().
2151
2152 """
2153 if self._has_events or self.engine._has_events:
2154 for fn in self.dispatch.before_cursor_execute:
2155 statement, parameters = fn(
2156 self, cursor, statement, parameters, context, False
2157 )
2158
2159 if self._echo:
2160 self._log_info(statement)
2161 self._log_info("[raw sql] %r", parameters)
2162 try:
2163 for fn in (
2164 ()
2165 if not self.dialect._has_events
2166 else self.dialect.dispatch.do_execute
2167 ):
2168 if fn(cursor, statement, parameters, context):
2169 break
2170 else:
2171 self.dialect.do_execute(cursor, statement, parameters, context)
2172 except BaseException as e:
2173 self._handle_dbapi_exception(
2174 e, statement, parameters, cursor, context
2175 )
2176
2177 if self._has_events or self.engine._has_events:
2178 self.dispatch.after_cursor_execute(
2179 self, cursor, statement, parameters, context, False
2180 )
2181
2182 def _safe_close_cursor(self, cursor: DBAPICursor) -> None:
2183 """Close the given cursor, catching exceptions
2184 and turning into log warnings.
2185
2186 """
2187 try:
2188 cursor.close()
2189 except Exception:
2190 # log the error through the connection pool's logger.
2191 self.engine.pool.logger.error(
2192 "Error closing cursor", exc_info=True
2193 )
2194
    # class-level defaults for two flags used by _handle_dbapi_exception().
    # The method sets them on the instance while it works and removes the
    # instance attributes again with "del", at which point lookups fall
    # back to these False values.
    _reentrant_error = False
    _is_disconnect = False

    def _handle_dbapi_exception(
        self,
        e: BaseException,
        statement: Optional[str],
        parameters: Optional[_AnyExecuteParams],
        cursor: Optional[DBAPICursor],
        context: Optional[ExecutionContext],
        is_sub_exec: bool = False,
    ) -> NoReturn:
        """Process an exception raised during execution and re-raise it.

        This method never returns; it always raises one of:

        * an exception returned or raised by a ``handle_error`` listener,
        * a :class:`.DBAPIError` (or subclass) built by
          ``exc.DBAPIError.instance()`` when the error should be wrapped,
        * the exception currently being handled, as reported by
          ``sys.exc_info()``.

        :param e: the exception that was caught.
        :param statement: string statement in progress, if any.
        :param parameters: parameters in progress, if any.
        :param cursor: DBAPI cursor in use, if any; closed via
         ``_safe_close_cursor()`` when the error is not a disconnect.
        :param context: execution context, if one was created.
        :param is_sub_exec: when True, the ``ismulti`` flag given to the
         wrapping exception is False regardless of ``context.executemany``.

        """
        exc_info = sys.exc_info()

        is_exit_exception = util.is_exit_exception(e)

        # a disconnect is either a DBAPI error the dialect identifies as
        # one, or an "exit" exception, in both cases only while this
        # connection is not closed
        if not self._is_disconnect:
            self._is_disconnect = (
                isinstance(e, self.dialect.loaded_dbapi.Error)
                and not self.closed
                and self.dialect.is_disconnect(
                    e,
                    self._dbapi_connection if not self.invalidated else None,
                    cursor,
                )
            ) or (is_exit_exception and not self.closed)

        invalidate_pool_on_disconnect = not is_exit_exception

        # parsed as: (not is_sub_exec and context.executemany) if a
        # context is present, else False
        ismulti: bool = (
            not is_sub_exec and context.executemany
            if context is not None
            else False
        )
        # this method was entered again while an earlier call on this
        # connection is still in progress; wrap and raise right away,
        # skipping event handlers and cleanup
        if self._reentrant_error:
            raise exc.DBAPIError.instance(
                statement,
                parameters,
                e,
                self.dialect.loaded_dbapi.Error,
                hide_parameters=self.engine.hide_parameters,
                dialect=self.dialect,
                ismulti=ismulti,
            ).with_traceback(exc_info[2]) from e
        self._reentrant_error = True
        try:
            # non-DBAPI error - if we already got a context,
            # or there's no string statement, don't wrap it
            should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or (
                statement is not None
                and context is None
                and not is_exit_exception
            )

            if should_wrap:
                sqlalchemy_exception = exc.DBAPIError.instance(
                    statement,
                    parameters,
                    cast(Exception, e),
                    self.dialect.loaded_dbapi.Error,
                    hide_parameters=self.engine.hide_parameters,
                    connection_invalidated=self._is_disconnect,
                    dialect=self.dialect,
                    ismulti=ismulti,
                )
            else:
                sqlalchemy_exception = None

            # replacement exception supplied by a handle_error listener,
            # if any
            newraise = None

            if (self.dialect._has_events) and not self._execution_options.get(
                "skip_user_error_events", False
            ):
                ctx = ExceptionContextImpl(
                    e,
                    sqlalchemy_exception,
                    self.engine,
                    self.dialect,
                    self,
                    cursor,
                    statement,
                    parameters,
                    context,
                    self._is_disconnect,
                    invalidate_pool_on_disconnect,
                    False,
                )

                for fn in self.dialect.dispatch.handle_error:
                    try:
                        # handler returns an exception;
                        # call next handler in a chain
                        per_fn = fn(ctx)
                        if per_fn is not None:
                            ctx.chained_exception = newraise = per_fn
                    except Exception as _raised:
                        # handler raises an exception - stop processing
                        newraise = _raised
                        break

                # a listener may have changed the disconnect decision on
                # the context; carry that back to this connection and to
                # the wrapping exception
                if self._is_disconnect != ctx.is_disconnect:
                    self._is_disconnect = ctx.is_disconnect
                    if sqlalchemy_exception:
                        sqlalchemy_exception.connection_invalidated = (
                            ctx.is_disconnect
                        )

                # set up potentially user-defined value for
                # invalidate pool.
                invalidate_pool_on_disconnect = (
                    ctx.invalidate_pool_on_disconnect
                )

            if should_wrap and context:
                context.handle_dbapi_exception(e)

            if not self._is_disconnect:
                if cursor:
                    self._safe_close_cursor(cursor)
                # "autorollback" was mostly relevant in 1.x series.
                # It's very unlikely to reach here, as the connection
                # does autobegin so when we are here, we are usually
                # in an explicit / semi-explicit transaction.
                # however we have a test which manufactures this
                # scenario in any case using an event handler.
                # test/engine/test_execute.py-> test_actual_autorollback
                if not self.in_transaction():
                    self._rollback_impl()

            # precedence: listener-supplied exception, then the wrapped
            # exception, then the exception currently being handled
            if newraise:
                raise newraise.with_traceback(exc_info[2]) from e
            elif should_wrap:
                assert sqlalchemy_exception is not None
                raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
            else:
                assert exc_info[1] is not None
                raise exc_info[1].with_traceback(exc_info[2])
        finally:
            # drop the instance-level flag; lookups fall back to the
            # class-level False
            del self._reentrant_error
            if self._is_disconnect:
                del self._is_disconnect
                if not self.invalidated:
                    dbapi_conn_wrapper = self._dbapi_connection
                    assert dbapi_conn_wrapper is not None
                    # the pool itself is invalidated only when allowed;
                    # this connection is invalidated in either case
                    if invalidate_pool_on_disconnect:
                        self.engine.pool._invalidate(dbapi_conn_wrapper, e)
                    self.invalidate(e)
2342
2343 @classmethod
2344 def _handle_dbapi_exception_noconnection(
2345 cls,
2346 e: BaseException,
2347 dialect: Dialect,
2348 engine: Optional[Engine] = None,
2349 is_disconnect: Optional[bool] = None,
2350 invalidate_pool_on_disconnect: bool = True,
2351 is_pre_ping: bool = False,
2352 ) -> NoReturn:
2353 exc_info = sys.exc_info()
2354
2355 if is_disconnect is None:
2356 is_disconnect = isinstance(
2357 e, dialect.loaded_dbapi.Error
2358 ) and dialect.is_disconnect(e, None, None)
2359
2360 should_wrap = isinstance(e, dialect.loaded_dbapi.Error)
2361
2362 if should_wrap:
2363 sqlalchemy_exception = exc.DBAPIError.instance(
2364 None,
2365 None,
2366 cast(Exception, e),
2367 dialect.loaded_dbapi.Error,
2368 hide_parameters=(
2369 engine.hide_parameters if engine is not None else False
2370 ),
2371 connection_invalidated=is_disconnect,
2372 dialect=dialect,
2373 )
2374 else:
2375 sqlalchemy_exception = None
2376
2377 newraise = None
2378
2379 if dialect._has_events:
2380 ctx = ExceptionContextImpl(
2381 e,
2382 sqlalchemy_exception,
2383 engine,
2384 dialect,
2385 None,
2386 None,
2387 None,
2388 None,
2389 None,
2390 is_disconnect,
2391 invalidate_pool_on_disconnect,
2392 is_pre_ping,
2393 )
2394 for fn in dialect.dispatch.handle_error:
2395 try:
2396 # handler returns an exception;
2397 # call next handler in a chain
2398 per_fn = fn(ctx)
2399 if per_fn is not None:
2400 ctx.chained_exception = newraise = per_fn
2401 except Exception as _raised:
2402 # handler raises an exception - stop processing
2403 newraise = _raised
2404 break
2405
2406 if sqlalchemy_exception and is_disconnect != ctx.is_disconnect:
2407 sqlalchemy_exception.connection_invalidated = ctx.is_disconnect
2408
2409 if newraise:
2410 raise newraise.with_traceback(exc_info[2]) from e
2411 elif should_wrap:
2412 assert sqlalchemy_exception is not None
2413 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
2414 else:
2415 assert exc_info[1] is not None
2416 raise exc_info[1].with_traceback(exc_info[2])
2417
2418 def _run_ddl_visitor(
2419 self,
2420 visitorcallable: Type[InvokeDDLBase],
2421 element: SchemaVisitable,
2422 **kwargs: Any,
2423 ) -> None:
2424 """run a DDL visitor.
2425
2426 This method is only here so that the MockConnection can change the
2427 options given to the visitor so that "checkfirst" is skipped.
2428
2429 """
2430 visitorcallable(
2431 dialect=self.dialect, connection=self, **kwargs
2432 ).traverse_single(element)
2433
2434
class ExceptionContextImpl(ExceptionContext):
    """Implement the :class:`.ExceptionContext` interface.

    Instances are plain value holders handed to ``handle_error`` event
    listeners; every name listed in ``__slots__`` is assigned in
    ``__init__`` so that listeners can read any of them safely.

    """

    __slots__ = (
        "connection",
        "engine",
        "dialect",
        "cursor",
        "statement",
        "parameters",
        "original_exception",
        "sqlalchemy_exception",
        "chained_exception",
        "execution_context",
        "is_disconnect",
        "invalidate_pool_on_disconnect",
        "is_pre_ping",
    )

    def __init__(
        self,
        exception: BaseException,
        sqlalchemy_exception: Optional[exc.StatementError],
        engine: Optional[Engine],
        dialect: Dialect,
        connection: Optional[Connection],
        cursor: Optional[DBAPICursor],
        statement: Optional[str],
        parameters: Optional[_DBAPIAnyExecuteParams],
        context: Optional[ExecutionContext],
        is_disconnect: bool,
        invalidate_pool_on_disconnect: bool,
        is_pre_ping: bool,
    ):
        """Store the details of an error condition in progress.

        :param exception: the exception that was caught; stored as
         ``original_exception``.
        :param sqlalchemy_exception: the wrapping exception, or ``None``
         when the error is not being wrapped.
        :param engine: the engine in use, if any.
        :param dialect: the dialect in use.
        :param connection: the connection in use, if any.
        :param cursor: the DBAPI cursor in use, if any.
        :param statement: the string statement in progress, if any.
        :param parameters: the parameters in progress, if any.
        :param context: the execution context, if any; stored as
         ``execution_context``.
        :param is_disconnect: whether the error is considered a disconnect.
        :param invalidate_pool_on_disconnect: whether the pool should be
         invalidated on disconnect.
        :param is_pre_ping: whether the error occurred during a pre-ping.

        """
        self.engine = engine
        self.dialect = dialect
        self.connection = connection
        # previously the "cursor" argument was accepted but dropped, which
        # left the declared slot unset
        self.cursor = cursor
        self.sqlalchemy_exception = sqlalchemy_exception
        self.original_exception = exception
        # nothing has been chained yet; the dispatch loop assigns this
        # once a handler returns an exception.  initialising it keeps the
        # slot readable for the first handler in the chain.
        self.chained_exception = None
        self.execution_context = context
        self.statement = statement
        self.parameters = parameters
        self.is_disconnect = is_disconnect
        self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
        self.is_pre_ping = is_pre_ping
2480
2481
class Transaction(TransactionalContext):
    """Represent a database transaction in progress.

    A :class:`.Transaction` is obtained from the
    :meth:`_engine.Connection.begin` method of
    :class:`_engine.Connection`::

        from sqlalchemy import create_engine

        engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
        connection = engine.connect()
        trans = connection.begin()
        connection.execute(text("insert into x (a, b) values (1, 2)"))
        trans.commit()

    Transaction boundaries are controlled with the :meth:`.rollback` and
    :meth:`.commit` methods.  The object is also a context manager, so the
    Python ``with`` statement can be combined with
    :meth:`_engine.Connection.begin`::

        with connection.begin():
            connection.execute(text("insert into x (a, b) values (1, 2)"))

    The Transaction object is **not** threadsafe.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_twophase`

        :meth:`_engine.Connection.begin_nested`

    .. index::
      single: thread safety; Transaction
    """  # noqa

    __slots__ = ()

    _is_root: bool = False
    is_active: bool
    connection: Connection

    def __init__(self, connection: Connection):
        # this base class cannot be constructed directly
        raise NotImplementedError()

    @property
    def _deactivated_from_connection(self) -> bool:
        """True if this transaction is totally deactivated from the connection
        and therefore can no longer affect its state.

        Must be provided by subclasses.

        """
        raise NotImplementedError()

    def _do_close(self) -> None:
        """Subclass hook invoked by :meth:`.close`."""
        raise NotImplementedError()

    def _do_rollback(self) -> None:
        """Subclass hook invoked by :meth:`.rollback`."""
        raise NotImplementedError()

    def _do_commit(self) -> None:
        """Subclass hook invoked by :meth:`.commit`."""
        raise NotImplementedError()

    @property
    def is_valid(self) -> bool:
        """True when this transaction is active and its connection has not
        been invalidated."""
        if not self.is_active:
            return False
        return not self.connection.invalidated

    def close(self) -> None:
        """Close this :class:`.Transaction`.

        If this transaction is the base transaction in a begin/commit
        nesting, the transaction will rollback().  Otherwise, the
        method returns.

        This allows a Transaction to be cancelled without affecting the
        scope of an enclosing transaction.

        """
        try:
            self._do_close()
        finally:
            # whatever the outcome, the transaction must be inactive now
            assert not self.is_active

    def rollback(self) -> None:
        """Roll back this :class:`.Transaction`.

        What this means depends on the type of transaction in use:

        * For a simple database transaction (e.g. :class:`.RootTransaction`),
          it corresponds to a ROLLBACK.

        * For a :class:`.NestedTransaction`, it corresponds to a
          "ROLLBACK TO SAVEPOINT" operation.

        * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two
          phase transactions may be used.

        """
        try:
            self._do_rollback()
        finally:
            # whatever the outcome, the transaction must be inactive now
            assert not self.is_active

    def commit(self) -> None:
        """Commit this :class:`.Transaction`.

        What this means depends on the type of transaction in use:

        * For a simple database transaction (e.g. :class:`.RootTransaction`),
          it corresponds to a COMMIT.

        * For a :class:`.NestedTransaction`, it corresponds to a
          "RELEASE SAVEPOINT" operation.

        * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two
          phase transactions may be used.

        """
        try:
            self._do_commit()
        finally:
            # whatever the outcome, the transaction must be inactive now
            assert not self.is_active

    def _get_subject(self) -> Connection:
        """Return the connection this transaction belongs to."""
        return self.connection

    def _transaction_is_active(self) -> bool:
        """Return the current value of ``is_active``."""
        return self.is_active

    def _transaction_is_closed(self) -> bool:
        """Return the negation of ``_deactivated_from_connection``."""
        # NOTE(review): the method name reads as "closed", yet the value is
        # True while the transaction is still associated with the
        # connection - confirm against how TransactionalContext uses it
        return not self._deactivated_from_connection

    def _rollback_can_be_called(self) -> bool:
        """Always True for this class hierarchy."""
        # for RootTransaction / NestedTransaction, it's safe to call
        # rollback() even if the transaction is deactive and no warnings
        # will be emitted.  tested in
        # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)?
        return True
2624
2625
class RootTransaction(Transaction):
    """Represent the "root" transaction on a :class:`_engine.Connection`.

    This corresponds to the current "BEGIN/COMMIT/ROLLBACK" that's occurring
    for the :class:`_engine.Connection`. The :class:`_engine.RootTransaction`
    is created by calling upon the :meth:`_engine.Connection.begin` method, and
    remains associated with the :class:`_engine.Connection` throughout its
    active span. The current :class:`_engine.RootTransaction` in use is
    accessible via the :attr:`_engine.Connection.get_transaction` method of
    :class:`_engine.Connection`.

    In :term:`2.0 style` use, the :class:`_engine.Connection` also employs
    "autobegin" behavior that will create a new
    :class:`_engine.RootTransaction` whenever a connection in a
    non-transactional state is used to emit commands on the DBAPI connection.
    The scope of the :class:`_engine.RootTransaction` in 2.0 style
    use can be controlled using the :meth:`_engine.Connection.commit` and
    :meth:`_engine.Connection.rollback` methods.


    """

    _is_root = True

    __slots__ = ("connection", "is_active")

    def __init__(self, connection: Connection):
        """Begin a root transaction on ``connection``.

        The connection must not already have a root transaction.  The
        "begin" is emitted first; only afterwards is this object installed
        as ``connection._transaction`` and marked active.

        """
        assert connection._transaction is None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        self._connection_begin_impl()
        connection._transaction = self

        self.is_active = True

    def _deactivate_from_connection(self) -> None:
        """Mark this transaction inactive.

        If it is already inactive and is no longer the connection's current
        transaction, a warning is emitted instead.

        """
        if self.is_active:
            assert self.connection._transaction is self
            self.is_active = False

        elif self.connection._transaction is not self:
            util.warn("transaction already deassociated from connection")

    @property
    def _deactivated_from_connection(self) -> bool:
        """True when the connection's current transaction is not this
        object."""
        return self.connection._transaction is not self

    # the three hooks below delegate to the connection; subclasses may
    # override them to use different connection methods

    def _connection_begin_impl(self) -> None:
        self.connection._begin_impl(self)

    def _connection_rollback_impl(self) -> None:
        self.connection._rollback_impl()

    def _connection_commit_impl(self) -> None:
        self.connection._commit_impl()

    def _close_impl(self, try_deactivate: bool = False) -> None:
        """Roll back if still active, cancel nested transactions and detach
        this transaction from the connection.

        :param try_deactivate: when True, ``_deactivate_from_connection()``
         is called even if this transaction is already inactive.

        """
        try:
            if self.is_active:
                self._connection_rollback_impl()

            if self.connection._nested_transaction:
                self.connection._nested_transaction._cancel()
        finally:
            # runs whether or not the rollback / cancel above raised
            if self.is_active or try_deactivate:
                self._deactivate_from_connection()
            if self.connection._transaction is self:
                self.connection._transaction = None

        assert not self.is_active
        assert self.connection._transaction is not self

    def _do_close(self) -> None:
        self._close_impl()

    def _do_rollback(self) -> None:
        self._close_impl(try_deactivate=True)

    def _do_commit(self) -> None:
        """Commit if active; otherwise report the invalid state.

        When inactive but still the connection's current transaction,
        ``connection._invalid_transaction()`` is called; when inactive and
        no longer associated, :class:`.InvalidRequestError` is raised.

        """
        if self.is_active:
            assert self.connection._transaction is self

            try:
                self._connection_commit_impl()
            finally:
                # whether or not commit succeeds, cancel any
                # nested transactions, make this transaction "inactive"
                # and remove it as a reset agent
                if self.connection._nested_transaction:
                    self.connection._nested_transaction._cancel()

                self._deactivate_from_connection()

            # ...however only remove as the connection's current transaction
            # if commit succeeded.  otherwise it stays on so that a rollback
            # needs to occur.
            self.connection._transaction = None
        else:
            if self.connection._transaction is self:
                self.connection._invalid_transaction()
            else:
                raise exc.InvalidRequestError("This transaction is inactive")

        assert not self.is_active
        assert self.connection._transaction is not self
2732
2733
class NestedTransaction(Transaction):
    """Represent a 'nested', or SAVEPOINT transaction.

    The :class:`.NestedTransaction` object is created by calling the
    :meth:`_engine.Connection.begin_nested` method of
    :class:`_engine.Connection`.

    When using :class:`.NestedTransaction`, the semantics of "begin" /
    "commit" / "rollback" are as follows:

    * the "begin" operation corresponds to the "BEGIN SAVEPOINT" command, where
      the savepoint is given an explicit name that is part of the state
      of this object.

    * The :meth:`.NestedTransaction.commit` method corresponds to a
      "RELEASE SAVEPOINT" operation, using the savepoint identifier associated
      with this :class:`.NestedTransaction`.

    * The :meth:`.NestedTransaction.rollback` method corresponds to a
      "ROLLBACK TO SAVEPOINT" operation, using the savepoint identifier
      associated with this :class:`.NestedTransaction`.

    The rationale for mimicking the semantics of an outer transaction in
    terms of savepoints is so that code may deal with a "savepoint"
    transaction and an "outer" transaction in an agnostic way.

    .. seealso::

        :ref:`session_begin_nested` - ORM version of the SAVEPOINT API.

    """

    __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")

    # savepoint identifier returned by connection._savepoint_impl()
    _savepoint: str

    def __init__(self, connection: Connection):
        """Create a savepoint on ``connection`` and install this object as
        its current nested transaction.

        The connection must already have a root transaction.  The nested
        transaction that was current beforehand, if any, is remembered in
        ``_previous_nested``.

        """
        assert connection._transaction is not None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        self._savepoint = self.connection._savepoint_impl()
        self.is_active = True
        self._previous_nested = connection._nested_transaction
        connection._nested_transaction = self

    def _deactivate_from_connection(self, warn: bool = True) -> None:
        """Restore the previously current nested transaction on the
        connection.

        :param warn: when this object is not the connection's current
         nested transaction, emit a warning if True, do nothing if False.

        """
        if self.connection._nested_transaction is self:
            self.connection._nested_transaction = self._previous_nested
        elif warn:
            util.warn(
                "nested transaction already deassociated from connection"
            )

    @property
    def _deactivated_from_connection(self) -> bool:
        """True when the connection's current nested transaction is not
        this object."""
        return self.connection._nested_transaction is not self

    def _cancel(self) -> None:
        # called by RootTransaction when the outer transaction is
        # committed, rolled back, or closed to cancel all savepoints
        # without any action being taken
        self.is_active = False
        self._deactivate_from_connection()
        # walk down the chain of enclosing savepoints
        if self._previous_nested:
            self._previous_nested._cancel()

    def _close_impl(
        self, deactivate_from_connection: bool, warn_already_deactive: bool
    ) -> None:
        """Roll back to the savepoint if possible and mark this transaction
        inactive.

        The rollback is emitted only when this transaction is active and
        the connection has an active root transaction.

        :param deactivate_from_connection: when True, also detach this
         object from the connection.
        :param warn_already_deactive: passed as ``warn`` to
         ``_deactivate_from_connection()``.

        """
        try:
            if (
                self.is_active
                and self.connection._transaction
                and self.connection._transaction.is_active
            ):
                self.connection._rollback_to_savepoint_impl(self._savepoint)
        finally:
            # runs whether or not the rollback above raised
            self.is_active = False

            if deactivate_from_connection:
                self._deactivate_from_connection(warn=warn_already_deactive)

        assert not self.is_active
        if deactivate_from_connection:
            assert self.connection._nested_transaction is not self

    def _do_close(self) -> None:
        # close: detach without warning if already detached
        self._close_impl(True, False)

    def _do_rollback(self) -> None:
        # rollback: detach, warning if already detached
        self._close_impl(True, True)

    def _do_commit(self) -> None:
        """Release the savepoint if active; otherwise report the invalid
        state.

        When inactive but still the connection's current nested
        transaction, ``connection._invalid_transaction()`` is called; when
        inactive and no longer associated, :class:`.InvalidRequestError`
        is raised.

        """
        if self.is_active:
            try:
                self.connection._release_savepoint_impl(self._savepoint)
            finally:
                # nested trans becomes inactive on failed release
                # unconditionally.  this prevents it from trying to
                # emit SQL when it rolls back.
                self.is_active = False

            # but only de-associate from connection if it succeeded
            self._deactivate_from_connection()
        else:
            if self.connection._nested_transaction is self:
                self.connection._invalid_transaction()
            else:
                raise exc.InvalidRequestError(
                    "This nested transaction is inactive"
                )
2846
2847
class TwoPhaseTransaction(RootTransaction):
    """Represent a two-phase transaction.

    A :class:`.TwoPhaseTransaction` is obtained from the
    :meth:`_engine.Connection.begin_twophase` method.

    It offers the same interface as :class:`.Transaction`, plus the
    :meth:`prepare` method.

    """

    __slots__ = ("xid", "_is_prepared")

    xid: Any

    def __init__(self, connection: Connection, xid: Any):
        # both attributes are assigned before the base constructor runs,
        # since that constructor emits the "begin"
        self._is_prepared = False
        self.xid = xid
        super().__init__(connection)

    def prepare(self) -> None:
        """Prepare this :class:`.TwoPhaseTransaction`.

        After a PREPARE, the transaction can be committed.

        :raises: :class:`.InvalidRequestError` if the transaction is not
         active.

        """
        if self.is_active:
            self.connection._prepare_twophase_impl(self.xid)
            self._is_prepared = True
        else:
            raise exc.InvalidRequestError("This transaction is inactive")

    # the hooks below route begin / rollback / commit to the two-phase
    # methods of the connection

    def _connection_begin_impl(self) -> None:
        conn = self.connection
        conn._begin_twophase_impl(self)

    def _connection_rollback_impl(self) -> None:
        conn = self.connection
        conn._rollback_twophase_impl(self.xid, self._is_prepared)

    def _connection_commit_impl(self) -> None:
        conn = self.connection
        conn._commit_twophase_impl(self.xid, self._is_prepared)
2887
2888
class Engine(
    ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"]
):
    """Tie a :class:`~sqlalchemy.pool.Pool` to a
    :class:`~sqlalchemy.engine.interfaces.Dialect`, acting as the
    starting point for database connectivity and behavior.

    Application code does not construct an :class:`_engine.Engine`
    directly; the public way to obtain one is the
    :func:`~sqlalchemy.create_engine` function.

    .. seealso::

        :doc:`/core/engines`

        :ref:`connections_toplevel`

    """

    # dispatcher through which engine-level events are emitted
    dispatch: dispatcher[ConnectionEventsTarget]

    # built-in compiled cache, or None when ``query_cache_size`` is 0
    _compiled_cache: Optional[CompiledCacheType]

    # engine-wide default execution options; rebound to a new object by
    # update_execution_options() instead of being modified in place
    _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS
    _has_events: bool = False
    # class that connect() instantiates
    _connection_cls: Type[Connection] = Connection
    _sqla_logger_namespace: str = "sqlalchemy.engine.Engine"
    _is_future: bool = False

    _schema_translate_map: Optional[SchemaTranslateMapType] = None
    # class that execution_options() instantiates; it is assigned at
    # module level once OptionEngine exists
    _option_cls: Type[OptionEngine]

    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool

    def __init__(
        self,
        pool: Pool,
        dialect: Dialect,
        url: URL,
        logging_name: Optional[str] = None,
        echo: Optional[_EchoFlagType] = None,
        query_cache_size: int = 500,
        execution_options: Optional[Mapping[str, Any]] = None,
        hide_parameters: bool = False,
    ):
        """Set up the engine around an existing pool and dialect.

        :param pool: connection pool, stored as :attr:`pool`.
        :param dialect: dialect, stored as :attr:`dialect`.
        :param url: database URL, stored as :attr:`url`.
        :param logging_name: stored as ``logging_name`` only when a
         non-empty value is given.
        :param echo: echo flag; assigned to :attr:`echo` and also handed
         to ``log.instance_logger()``.
        :param query_cache_size: capacity of the built-in compiled
         cache.  A value of ``0`` turns the built-in cache off.
        :param execution_options: initial default execution options;
         when non-empty they are applied through
         :meth:`update_execution_options`.
        :param hide_parameters: stored as :attr:`hide_parameters`.
        """
        self.pool, self.url, self.dialect = pool, url, dialect

        # NOTE(review): kept ahead of the ``echo`` assignment on purpose;
        # presumably the logger name is derived from it -- confirm in the
        # ``log`` module before reordering.
        if logging_name:
            self.logging_name = logging_name
        self.echo = echo
        self.hide_parameters = hide_parameters

        self._compiled_cache = (
            util.LRUCache(query_cache_size, size_alert=self._lru_size_alert)
            if query_cache_size != 0
            else None
        )

        log.instance_logger(self, echoflag=echo)
        if execution_options:
            self.update_execution_options(**execution_options)

    def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None:
        """Size-alert callback given to the built-in compiled cache.

        Emits an INFO log message reporting the cache's current length
        and its capacity, provided that INFO logging is enabled for this
        engine; otherwise does nothing.
        """
        if not self._should_log_info():
            return
        self.logger.info(
            "Compiled cache size pruning from %d items to %d. "
            "Increase cache size to reduce the frequency of pruning.",
            len(cache),
            cache.capacity,
        )

    @property
    def engine(self) -> Engine:
        """This :class:`.Engine` itself.

        Exists for legacy code that holds either a :class:`.Connection`
        or an :class:`.Engine` in one variable and reads ``.engine``
        from it.

        """
        return self

    def clear_compiled_cache(self) -> None:
        """Empty the compiled cache held by this :class:`_engine.Engine`.

        **Only** the built-in cache, the one sized by the
        :paramref:`_engine.create_engine.query_cache_size` parameter, is
        affected.  Dictionary caches supplied through the
        :paramref:`.Connection.execution_options.compiled_cache`
        parameter are left alone.

        .. versionadded:: 1.4

        """
        cache = self._compiled_cache
        # truthiness test: nothing is done for a missing or empty cache
        if cache:
            cache.clear()

    def update_execution_options(self, **opt: Any) -> None:
        """Merge the given options into the default execution options
        of this :class:`_engine.Engine`, in place.

        The keys/values passed as ``**opt`` are combined with the
        defaults that apply to all connections of this engine.  The
        starting contents of those defaults may be supplied with the
        ``execution_options`` parameter of :func:`_sa.create_engine`.

        The ``set_engine_execution_options`` event is dispatched first,
        then the stored defaults are replaced by their union with
        ``opt``, and finally the dialect's
        ``set_engine_execution_options()`` hook is invoked.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :meth:`_engine.Engine.execution_options`

        """
        self.dispatch.set_engine_execution_options(self, opt)
        self._execution_options = self._execution_options.union(opt)
        self.dialect.set_engine_execution_options(self, opt)

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        **opt: Any,
    ) -> OptionEngine: ...

    @overload
    def execution_options(self, **opt: Any) -> OptionEngine: ...

    def execution_options(self, **opt: Any) -> OptionEngine:
        """Produce a new :class:`_engine.Engine` whose
        :class:`_engine.Connection` objects carry the given execution
        options.

        The engine returned here is not independent of the one it was
        made from; the two stay linked in the following ways:

        * Both use the very same :class:`_pool.Pool` instance.  Calling
          :meth:`_engine.Engine.dispose` on the new engine swaps out the
          pool of the parent engine as well.
        * Event listeners "cascade": everything listening on the parent
          also applies to the new :class:`_engine.Engine`, and further
          listeners may be attached to the new engine individually.
        * The logging configuration, including ``logging_name``, is
          taken from the parent :class:`_engine.Engine`.

        This method exists to support arrangements in which several
        :class:`_engine.Engine` objects sit on top of one connection
        pool yet differ in some execution-level setting.  A common case
        is a pair of "reader" and "writer" engines, where one of them
        runs at a lower :term:`isolation level` or has transactions
        turned off altogether by means of "autocommit"; see
        :ref:`dbapi_autocommit_multiple` for such a setup.

        A second case uses a custom option, ``shard_id``, which an event
        handler reads in order to select the schema in use on a database
        connection::

            from sqlalchemy import event
            from sqlalchemy.engine import Engine

            primary_engine = create_engine("mysql+mysqldb://")
            shard1 = primary_engine.execution_options(shard_id="shard1")
            shard2 = primary_engine.execution_options(shard_id="shard2")

            shards = {"default": "base", "shard1": "db1", "shard2": "db2"}


            @event.listens_for(Engine, "before_cursor_execute")
            def _switch_shard(conn, cursor, stmt, params, context, executemany):
                shard_id = conn.get_execution_options().get("shard_id", "default")
                current_shard = conn.info.get("current_shard", None)

                if current_shard != shard_id:
                    cursor.execute("use %s" % shards[shard_id])
                    conn.info["current_shard"] = shard_id

        In this recipe, two :class:`_engine.Engine` objects each act as
        a factory for :class:`_engine.Connection` objects that come with
        a "shard_id" execution option already in place.  The
        :meth:`_events.ConnectionEvents.before_cursor_execute` handler
        reads that option and emits a MySQL ``use`` statement to change
        databases ahead of statement execution, recording the database
        last selected in the :attr:`_engine.Connection.info` dictionary.

        .. seealso::

            :meth:`_engine.Connection.execution_options`
            - update execution options
            on a :class:`_engine.Connection` object.

            :meth:`_engine.Engine.update_execution_options`
            - update the execution
            options for a given :class:`_engine.Engine` in place.

            :meth:`_engine.Engine.get_execution_options`


        """  # noqa: E501
        option_engine = self._option_cls(self, opt)
        return option_engine

    def get_execution_options(self) -> _ExecuteOptions:
        """Return the non-SQL options that take effect during execution.

        .. seealso::

            :meth:`_engine.Engine.execution_options`
        """
        return self._execution_options

    @property
    def name(self) -> str:
        """The string name of the
        :class:`~sqlalchemy.engine.interfaces.Dialect` that this
        :class:`Engine` uses.

        """

        return self.dialect.name

    @property
    def driver(self) -> str:
        """The driver name of the
        :class:`~sqlalchemy.engine.interfaces.Dialect` that this
        :class:`Engine` uses.

        """

        return self.dialect.driver

    # NOTE(review): presumably a descriptor that reconfigures logging when
    # ``engine.echo`` is assigned -- confirm in the ``log`` module.
    echo = log.echo_property()

    def __repr__(self) -> str:
        return f"Engine({self.url!r})"

    def dispose(self, close: bool = True) -> None:
        """Discard the connection pool in use by this
        :class:`_engine.Engine` and install a fresh one.

        The replacement pool is created right after the old one is let
        go.  The old pool can be let go in one of two ways: actively,
        meaning every connection currently checked in to it is closed;
        or passively, meaning the engine merely drops its reference and
        closes nothing.  The passive form is the better fit for an
        initializer that runs inside a forked Python process.

        Listeners that were attached to the old pool through
        :class:`.PoolEvents` are **carried over to the new pool**.  This
        allows :class:`.PoolEvents` to be established in terms of the
        owning :class:`.Engine` with no need to reference the
        :class:`.Pool` itself.

        :param close: when ``True``, the default, all database
         connections that are **currently checked in** are fully closed.
         Connections that remain checked out are **not** closed; they do
         however lose their association with this
         :class:`_engine.Engine`, so that once each is closed
         individually, the :class:`_pool.Pool` it belongs to is
         eventually garbage collected and the connection is closed out
         completely, if that did not already happen at checkin.

         When ``False``, the previous pool is only de-referenced and is
         left untouched in every other respect.

         .. versionadded:: 1.4.33 Added the
            :paramref:`.Engine.dispose.close` parameter, which lets a
            child process replace its connection pool without disturbing
            the connections that the parent process is using.


        .. seealso::

            :ref:`engine_disposal`

            :ref:`pooling_multiprocessing`

            :meth:`.ConnectionEvents.engine_disposed`

        """
        if close:
            self.pool.dispose()
        replacement = self.pool.recreate()
        self.pool = replacement
        self.dispatch.engine_disposed(self)

    @contextlib.contextmanager
    def _optional_conn_ctx_manager(
        self, connection: Optional[Connection] = None
    ) -> Iterator[Connection]:
        """Yield ``connection`` if one is given, else a new connection.

        A connection supplied by the caller is yielded as is and is not
        closed here.  When none is supplied, one is obtained from
        :meth:`connect` and used as a context manager around the yield.
        """
        if connection is not None:
            yield connection
            return

        with self.connect() as conn:
            yield conn

    @contextlib.contextmanager
    def begin(self) -> Iterator[Connection]:
        """Return a context manager that delivers a
        :class:`_engine.Connection` on which a :class:`.Transaction` has
        been started.

        E.g.::

            with engine.begin() as conn:
                conn.execute(text("insert into table (x, y, z) values (1, 2, 3)"))
                conn.execute(text("my_special_procedure(5)"))

        If the block finishes without error, the :class:`.Transaction`
        is committed.  If the block raises, the :class:`.Transaction` is
        rolled back.

        .. seealso::

            :meth:`_engine.Engine.connect` - procure a
            :class:`_engine.Connection` from
            an :class:`_engine.Engine`.

            :meth:`_engine.Connection.begin` - start a :class:`.Transaction`
            for a particular :class:`_engine.Connection`.

        """  # noqa: E501
        with self.connect() as conn, conn.begin():
            yield conn

    def _run_ddl_visitor(
        self,
        visitorcallable: Type[InvokeDDLBase],
        element: SchemaVisitable,
        **kwargs: Any,
    ) -> None:
        """Run a DDL visitor against ``element`` within a transaction.

        A connection with a transaction in progress is obtained from
        :meth:`begin`, and the call is forwarded, keyword arguments
        included, to that connection's ``_run_ddl_visitor()``.
        """
        with self.begin() as conn:
            conn._run_ddl_visitor(visitorcallable, element, **kwargs)

    def connect(self) -> Connection:
        """Create and return a new :class:`_engine.Connection`.

        Because :class:`_engine.Connection` is a Python context manager,
        this method is normally used like so::

            with engine.connect() as connection:
                connection.execute(text("insert into table values ('foo')"))
                connection.commit()

        Once the block above ends, the connection is "closed" and the
        DBAPI resources behind it go back to the connection pool.
        Closing also rolls back any transaction that was begun
        explicitly or through autobegin, and emits the
        :meth:`_events.ConnectionEvents.rollback` event when such a
        transaction had been started and was still in progress.

        .. seealso::

            :meth:`_engine.Engine.begin`

        """

        return self._connection_cls(self)

    def raw_connection(self) -> PoolProxiedConnection:
        """Check out a "raw" DBAPI connection from the connection pool.

        What is returned is a proxy around the DBAPI connection object
        of the underlying driver.  It behaves just like the real DBAPI
        connection, with one difference: calling its ``close()`` method
        returns the connection to the pool and does not actually close
        it.

        Use this method for the special situations in which direct DBAPI
        connection access is wanted and the API of
        :class:`_engine.Connection` is not needed.  If a
        :class:`_engine.Connection` is already at hand, its DBAPI
        connection can be reached through the
        :attr:`_engine.Connection.connection` accessor.

        .. seealso::

            :ref:`dbapi_connections`

        """
        return self.pool.connect()
3281
3282
class OptionEngineMixin(log.Identified):
    """Mixin that lets an engine class act as an option-carrying view of
    another :class:`_engine.Engine`.

    An instance wraps a "proxied" parent engine.  The URL, dialect, logging
    name, echo flag, compiled cache and ``hide_parameters`` flag are copied
    from the parent at construction time; the connection pool is read from
    and written to the parent on every access; and the event dispatcher is
    joined to the parent's.  Execution options start out as the parent's
    and are then updated with the options given to the constructor.

    A concrete class must override :meth:`update_execution_options`; the
    version defined here only raises ``NotImplementedError``.

    """

    # events are shared with the parent engine through the joined
    # dispatcher, so class-level events are not applied to this class
    # directly (see the note in ``__init__``)
    _sa_propagate_class_events = False

    dispatch: dispatcher[ConnectionEventsTarget]
    _compiled_cache: Optional[CompiledCacheType]
    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool
    echo: log.echo_property

    def __init__(
        self, proxied: Engine, execution_options: CoreExecuteOptionsParameter
    ):
        """Build the option engine on top of ``proxied``.

        :param proxied: the parent :class:`_engine.Engine` whose pool,
         events and settings are shared or copied.
        :param execution_options: options applied on top of the parent's
         execution options via :meth:`update_execution_options`.
        """
        # must be assigned first: the ``pool`` and ``_has_events``
        # properties below read from ``self._proxied``
        self._proxied = proxied
        self.url = proxied.url
        self.dialect = proxied.dialect
        self.logging_name = proxied.logging_name
        self.echo = proxied.echo
        # the very same cache object as the parent, not a copy
        self._compiled_cache = proxied._compiled_cache
        self.hide_parameters = proxied.hide_parameters
        log.instance_logger(self, echoflag=self.echo)

        # note: this will propagate events that are assigned to the parent
        # engine after this OptionEngine is created.   Since we share
        # the events of the parent we also disallow class-level events
        # to apply to the OptionEngine class directly.
        #
        # the other way this can work would be to transfer existing
        # events only, using:
        # self.dispatch._update(proxied.dispatch)
        #
        # that might be more appropriate however it would be a behavioral
        # change for logic that assigns events to the parent engine and
        # would like it to take effect for the already-created sub-engine.
        self.dispatch = self.dispatch._join(proxied.dispatch)

        # begin with the parent's options, then layer the given ones on
        # top; this has to come after ``dispatch`` is set up, since the
        # Engine implementation of update_execution_options() dispatches
        # an event
        self._execution_options = proxied._execution_options
        self.update_execution_options(**execution_options)

    def update_execution_options(self, **opt: Any) -> None:
        """Placeholder that concrete classes must override.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError()

    # The properties below exist at runtime only and are hidden from type
    # checkers, which see the plain attribute annotations instead; the
    # linked discussion has the background.
    if not typing.TYPE_CHECKING:
        # https://github.com/python/typing/discussions/1095

        @property
        def pool(self) -> Pool:
            """The connection pool of the proxied engine."""
            return self._proxied.pool

        @pool.setter
        def pool(self, pool: Pool) -> None:
            # assigning the pool here replaces it on the proxied engine,
            # so the parent and this engine always use the same pool
            self._proxied.pool = pool

        @property
        def _has_events(self) -> bool:
            """True if the proxied engine has events, or if the flag has
            been set on this instance itself."""
            return self._proxied._has_events or self.__dict__.get(
                "_has_events", False
            )

        @_has_events.setter
        def _has_events(self, value: bool) -> None:
            # stored in the instance dict, where the getter looks it up;
            # the proxied engine's own flag is not modified
            self.__dict__["_has_events"] = value
3346
3347
class OptionEngine(OptionEngineMixin, Engine):
    """:class:`_engine.Engine` subclass that carries its own execution
    options while proxying a parent :class:`_engine.Engine`.

    This is the class that :meth:`_engine.Engine.execution_options`
    instantiates through ``Engine._option_cls``.

    """

    def update_execution_options(self, **opt: Any) -> None:
        """Update this engine's default execution options in place, using
        the :class:`_engine.Engine` implementation.
        """
        # Engine's implementation is called explicitly; the mixin's version
        # of this method only raises NotImplementedError
        Engine.update_execution_options(self, **opt)
3351
3352
# OptionEngine subclasses Engine, so it can only be installed as the class
# that Engine.execution_options() instantiates after both are defined.
Engine._option_cls = OptionEngine